update to latest bstore, which now properly handles modifications during Query.ForEach

This commit is contained in:
Mechiel Lukkien
2025-03-24 10:01:57 +01:00
parent 88ec5c6fbe
commit 04b1f030b7
8 changed files with 130 additions and 62 deletions

View File

@ -5,6 +5,9 @@ import (
"errors"
"fmt"
"reflect"
"slices"
bolt "go.etcd.io/bbolt"
)
// The convention for handling a errors on a Query: methods that return a bool
@ -29,7 +32,7 @@ type Query[T any] struct {
ctxDone <-chan struct{} // ctx.Done(), kept here for fast access.
st storeType // Of T.
pkType reflect.Type // Shortcut for st.Current.Fields[0].
xtx *Tx // If nil, a new transaction is automatically created from db. Using a tx goes through tx() one exists.
xtx *Tx // If nil, a new transaction is automatically created from db. Using a tx goes through tx() to ensure one exists.
xdb *DB // If not nil, xtx was created to execute the operation and is when the operation finishes (also on error).
err error // If set, returned by operations. For indicating failed filters, or that an operation has finished.
xfilterIDs *filterIDs[T] // Kept separately from filters because these filters make us use the PK without further index planning.
@ -189,6 +192,9 @@ func QueryTx[T any](tx *Tx) *Query[T] {
return q
}
q.init(tx.ctx, tx.db)
if q.err == nil {
q.xtx.queries = append(q.xtx.queries, q)
}
return q
}
@ -239,6 +245,7 @@ func (q *Query[T]) tx(write bool) (*Tx, error) {
return nil, q.err
}
q.xtx = &Tx{ctx: q.ctx, db: q.xdb, btx: tx}
q.xtx.queries = append(q.xtx.queries, q)
if write {
q.stats.Writes++
} else {
@ -265,6 +272,7 @@ func (q *Query[T]) error(err error) {
q.xdb = nil
}
if q.xtx != nil {
q.xtx.queries = slices.DeleteFunc(q.xtx.queries, func(oq bucketReseeker) bool { return oq == q })
q.txAddStats()
}
// This is the only place besides init that sets an error on query.
@ -276,6 +284,12 @@ func (q *Query[T]) errorf(format string, args ...any) {
q.error(fmt.Errorf(format, args...))
}
func (q *Query[T]) bucketReseek(b *bolt.Bucket) {
if q.err == nil && q.exec != nil && q.exec.cursor != nil && q.exec.cursor.Bucket() == b {
q.exec.reseek = true
}
}
// Close closes a Query. Must always be called for Queries on which Next or
// NextID was called. Other operations call Close themselves.
func (q *Query[T]) Close() error {
@ -924,28 +938,15 @@ func (q *Query[T]) Delete() (deleted int, rerr error) {
return 0, q.err
}
// We collect the records to delete first, then delete them.
type work struct {
bk []byte
rov reflect.Value
}
var deletes []work
n := 0
err := q.foreachKey(true, true, func(bk []byte, ov T) error {
n++
rov := reflect.ValueOf(ov)
q.gather(ov, rov)
deletes = append(deletes, work{bk, rov})
return nil
})
if err != nil {
return 0, err
}
for _, w := range deletes {
q.stats.Delete++
if err := q.xtx.delete(q.exec.rb, q.st, w.bk, w.rov); err != nil {
return 0, err
}
}
return len(deletes), nil
return q.xtx.delete(q.exec.rb, q.st, bk, rov)
})
return n, err
}
// Get returns the single selected record.
@ -1094,37 +1095,21 @@ next:
}
func (q *Query[T]) update(fields []reflect.StructField, values []reflect.Value) (int, error) {
// todo: we could check if the updated fields are not relevant for the cursor (not in filter/sort query) and update inside foreach.
// We first gather all records to be updated (using the query), then update the
// records.
type work struct {
bk []byte
rv reflect.Value
ov reflect.Value
}
var updates []work
n := 0
ov := reflect.New(q.st.Type).Elem()
err := q.foreachKey(true, true, func(bk []byte, v T) error {
n++
rv := reflect.ValueOf(&v).Elem()
ov := reflect.New(q.st.Type).Elem()
ov.Set(rv)
for i, sf := range fields {
frv := rv.FieldByIndex(sf.Index)
frv.Set(values[i])
}
q.gather(v, rv)
updates = append(updates, work{bk, rv, ov})
return nil
})
if err != nil {
return 0, err
}
for _, w := range updates {
q.stats.Update++
if err := q.xtx.update(q.exec.rb, q.st, w.rv, w.ov, w.bk); err != nil {
return 0, err
}
}
return len(updates), nil
return q.xtx.update(q.exec.rb, q.st, rv, ov, bk)
})
return n, err
}
// IDs sets idsptr to the primary keys of selected records. Idptrs must be a
@ -1228,11 +1213,15 @@ func (q *Query[T]) Exists() (exists bool, rerr error) {
var StopForEach error = errors.New("stop foreach")
// ForEach calls fn on each selected record.
//
// If fn returns StopForEach, ForEach stops iterating, so no longer calls fn,
// and returns nil.
// Fn must not update values, the internal cursor is not repositioned between
// invocations of fn, which would cause undefined behaviour (in practice,
// matching values could be skipped).
//
// ForEach automatically respositions the iteration cursor when data is changed by
// fn. Changes to data by fn aren't necessarily reflected in the values fn is
// called with. This can happen when a sort order isn't executed using an index:
// ForEach gathers and sorts (a subset of) values before yielding them and does not
// reapply filtering and does not necessarily yield the updated values.
func (q *Query[T]) ForEach(fn func(value T) error) (rerr error) {
defer q.finish(&rerr)
q.checkNotNext()