diff --git a/go.mod b/go.mod index 53e7d5b..5c0aad9 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.22.0 require ( github.com/mjl-/adns v0.0.0-20250321173553-ab04b05bdfea github.com/mjl-/autocert v0.0.0-20250321204043-abab2b936e31 - github.com/mjl-/bstore v0.0.6 + github.com/mjl-/bstore v0.0.8 github.com/mjl-/flate v0.0.0-20250221133712-6372d09eb978 github.com/mjl-/sconf v0.0.7 github.com/mjl-/sherpa v0.6.7 diff --git a/go.sum b/go.sum index a27a3b0..6c23e31 100644 --- a/go.sum +++ b/go.sum @@ -28,8 +28,8 @@ github.com/mjl-/adns v0.0.0-20250321173553-ab04b05bdfea h1:8dftsVL1tHhRksXzFZRhS github.com/mjl-/adns v0.0.0-20250321173553-ab04b05bdfea/go.mod h1:rWZMqGA2HoBm5b5q/A5J8u1sSVuEYh6zBz9tMoVs+RU= github.com/mjl-/autocert v0.0.0-20250321204043-abab2b936e31 h1:6MFGOLPGf6VzHWkKv8waSzJMMS98EFY2LVKPRHffCyo= github.com/mjl-/autocert v0.0.0-20250321204043-abab2b936e31/go.mod h1:taMFU86abMxKLPV4Bynhv8enbYmS67b8LG80qZv2Qus= -github.com/mjl-/bstore v0.0.6 h1:ntlu9MkfCkpm2XfBY4+Ws4KK9YzXzewr3+lCueFB+9c= -github.com/mjl-/bstore v0.0.6/go.mod h1:/cD25FNBaDfvL/plFRxI3Ba3E+wcB0XVOS8nJDqndg0= +github.com/mjl-/bstore v0.0.8 h1:9pojrmRTJZZOx9yAUbldq6CZCBuRbwNtk+N+SRCEUVk= +github.com/mjl-/bstore v0.0.8/go.mod h1:92/K8lyfA4z5W0P9O0TqCenNnC76p2YFdWJSZChzBkk= github.com/mjl-/flate v0.0.0-20250221133712-6372d09eb978 h1:Eg5DfI3/00URzGErujKus6a3O0kyXzF8vjoDZzH/gig= github.com/mjl-/flate v0.0.0-20250221133712-6372d09eb978/go.mod h1:QBkFtjai3AiQQuUu7pVh6PA06Vd3oa68E+vddf/UBOs= github.com/mjl-/sconf v0.0.7 h1:bdBcSFZCDFMm/UdBsgNCsjkYmKrSgYwp7rAOoufwHe4= diff --git a/vendor/github.com/mjl-/bstore/exec.go b/vendor/github.com/mjl-/bstore/exec.go index 93a59a9..b0fcdfa 100644 --- a/vendor/github.com/mjl-/bstore/exec.go +++ b/vendor/github.com/mjl-/bstore/exec.go @@ -37,8 +37,16 @@ type exec[T any] struct { ib *bolt.Bucket rb *bolt.Bucket - // Of last element in data. For finding end of group through prefix-match during - // partial index ordering for remaining in-memory sort. 
+ // For index or record bucket, for full/index scans. + cursor *bolt.Cursor + + // Whether to reseek the cursor. Set after changes to the ib or rb. + reseek bool + + // Last cursor key we read, for reseeking after changes to the bucket. + lastck []byte + + // Last index key used to collect pairs in data. lastik []byte // If not nil, row that was scanned previously, to use instead of calling forward. @@ -250,6 +258,14 @@ func (e *exec[T]) nextKey(write, value bool) ([]byte, T, error) { // wouldn't do this, a query that doesn't return any matches won't get canceled // until it is finished. keysSeen := 0 + + var statsKV *StatsKV + if e.plan.idx == nil { + statsKV = &q.stats.Records + } else { + statsKV = &q.stats.Index + } + for { var xk, xv []byte keysSeen++ @@ -266,15 +282,12 @@ func (e *exec[T]) nextKey(write, value bool) ([]byte, T, error) { if e.forward == nil { // First time we are in this loop, we set up a cursor and e.forward. - var c *bolt.Cursor - var statsKV *StatsKV if e.plan.idx == nil { - c = e.rb.Cursor() - statsKV = &q.stats.Records + e.cursor = e.rb.Cursor() } else { - c = e.ib.Cursor() - statsKV = &q.stats.Index + e.cursor = e.ib.Cursor() } + c := e.cursor if !e.plan.desc { e.forward = c.Next if e.plan.start != nil { @@ -321,12 +334,47 @@ func (e *exec[T]) nextKey(write, value bool) ([]byte, T, error) { // Resume with previously seen key/value. xk, xv = e.stowedbk, e.stowedbv e.stowedbk, e.stowedbv = nil, nil - } else { - if e.plan.idx == nil { - q.stats.Records.Cursor++ - } else { - q.stats.Index.Cursor++ + + if e.reseek { + e.reseek = false + q.stats.Reseek++ + statsKV.Cursor++ + // We haven't processed xk yet. It may still exist, or may be gone which causes us to end up at the key after it. + xk, xv = e.cursor.Seek(xk) + if e.plan.desc && xk == nil { + // Our key is gone, the cursor advanced, but there is no more key left. So start + // again at the end. If there is still anything, the check below will match and + // we'll move. 
+ statsKV.Cursor++ + xk, _ = e.cursor.Last() + } + // If we have a key, and we operate descending, we may have seeked to a key we've + // already handled when collecting data previously. + if xk != nil && e.plan.desc && bytes.Compare(xk, e.lastik) >= 0 { + statsKV.Cursor++ + xk, xv = e.forward() + } } + } else if e.reseek { + e.reseek = false + q.stats.Reseek++ + statsKV.Cursor++ + xk, xv = e.cursor.Seek(e.lastck) + // We may have seeked to the end, beyond our starting point in case of descending + // order. So start again at the end. If there is any key left, the check below will + // match and we'll move. + if e.plan.desc && xk == nil { + statsKV.Cursor++ + xk, _ = e.cursor.Last() + } + // If xk is the same as e.lastck, we already handled it. Also, the Last() above may + // have seeked to beyond what we've already handled. + if xk != nil && (bytes.Equal(xk, e.lastck) || e.plan.desc && bytes.Compare(xk, e.lastck) > 0) { + statsKV.Cursor++ + xk, xv = e.forward() + } + } else { + statsKV.Cursor++ xk, xv = e.forward() // log.Printf("forwarded, %x %x", xk, xv) } @@ -335,6 +383,8 @@ func (e *exec[T]) nextKey(write, value bool) ([]byte, T, error) { break } + e.lastck = xk + if e.plan.start != nil && !e.plan.startInclusive && bytes.HasPrefix(xk, e.plan.start) { continue } diff --git a/vendor/github.com/mjl-/bstore/query.go b/vendor/github.com/mjl-/bstore/query.go index 2b7efa1..5914e35 100644 --- a/vendor/github.com/mjl-/bstore/query.go +++ b/vendor/github.com/mjl-/bstore/query.go @@ -5,6 +5,9 @@ import ( "errors" "fmt" "reflect" + "slices" + + bolt "go.etcd.io/bbolt" ) // The convention for handling a errors on a Query: methods that return a bool @@ -29,7 +32,7 @@ type Query[T any] struct { ctxDone <-chan struct{} // ctx.Done(), kept here for fast access. st storeType // Of T. pkType reflect.Type // Shortcut for st.Current.Fields[0]. - xtx *Tx // If nil, a new transaction is automatically created from db. Using a tx goes through tx() one exists. 
+ xtx *Tx // If nil, a new transaction is automatically created from db. Using a tx goes through tx() to ensure one exists. xdb *DB // If not nil, xtx was created to execute the operation and is when the operation finishes (also on error). err error // If set, returned by operations. For indicating failed filters, or that an operation has finished. xfilterIDs *filterIDs[T] // Kept separately from filters because these filters make us use the PK without further index planning. @@ -189,6 +192,9 @@ func QueryTx[T any](tx *Tx) *Query[T] { return q } q.init(tx.ctx, tx.db) + if q.err == nil { + q.xtx.queries = append(q.xtx.queries, q) + } return q } @@ -239,6 +245,7 @@ func (q *Query[T]) tx(write bool) (*Tx, error) { return nil, q.err } q.xtx = &Tx{ctx: q.ctx, db: q.xdb, btx: tx} + q.xtx.queries = append(q.xtx.queries, q) if write { q.stats.Writes++ } else { @@ -265,6 +272,7 @@ func (q *Query[T]) error(err error) { q.xdb = nil } if q.xtx != nil { + q.xtx.queries = slices.DeleteFunc(q.xtx.queries, func(oq bucketReseeker) bool { return oq == q }) q.txAddStats() } // This is the only place besides init that sets an error on query. @@ -276,6 +284,12 @@ func (q *Query[T]) errorf(format string, args ...any) { q.error(fmt.Errorf(format, args...)) } +func (q *Query[T]) bucketReseek(b *bolt.Bucket) { + if q.err == nil && q.exec != nil && q.exec.cursor != nil && q.exec.cursor.Bucket() == b { + q.exec.reseek = true + } +} + // Close closes a Query. Must always be called for Queries on which Next or // NextID was called. Other operations call Close themselves. func (q *Query[T]) Close() error { @@ -924,28 +938,15 @@ func (q *Query[T]) Delete() (deleted int, rerr error) { return 0, q.err } - // We collect the records to delete first, then delete them. 
- type work struct { - bk []byte - rov reflect.Value - } - var deletes []work + n := 0 err := q.foreachKey(true, true, func(bk []byte, ov T) error { + n++ rov := reflect.ValueOf(ov) q.gather(ov, rov) - deletes = append(deletes, work{bk, rov}) - return nil - }) - if err != nil { - return 0, err - } - for _, w := range deletes { q.stats.Delete++ - if err := q.xtx.delete(q.exec.rb, q.st, w.bk, w.rov); err != nil { - return 0, err - } - } - return len(deletes), nil + return q.xtx.delete(q.exec.rb, q.st, bk, rov) + }) + return n, err } // Get returns the single selected record. @@ -1094,37 +1095,21 @@ next: } func (q *Query[T]) update(fields []reflect.StructField, values []reflect.Value) (int, error) { - // todo: we could check if the updated fields are not relevant for the cursor (not in filter/sort query) and update inside foreach. - // We first gather all records to be updated (using the query), then update the - // records. - type work struct { - bk []byte - rv reflect.Value - ov reflect.Value - } - var updates []work + n := 0 + ov := reflect.New(q.st.Type).Elem() err := q.foreachKey(true, true, func(bk []byte, v T) error { + n++ rv := reflect.ValueOf(&v).Elem() - ov := reflect.New(q.st.Type).Elem() ov.Set(rv) for i, sf := range fields { frv := rv.FieldByIndex(sf.Index) frv.Set(values[i]) } q.gather(v, rv) - updates = append(updates, work{bk, rv, ov}) - return nil - }) - if err != nil { - return 0, err - } - for _, w := range updates { q.stats.Update++ - if err := q.xtx.update(q.exec.rb, q.st, w.rv, w.ov, w.bk); err != nil { - return 0, err - } - } - return len(updates), nil + return q.xtx.update(q.exec.rb, q.st, rv, ov, bk) + }) + return n, err } // IDs sets idsptr to the primary keys of selected records. Idptrs must be a @@ -1228,11 +1213,15 @@ func (q *Query[T]) Exists() (exists bool, rerr error) { var StopForEach error = errors.New("stop foreach") // ForEach calls fn on each selected record. 
+// // If fn returns StopForEach, ForEach stops iterating, so no longer calls fn, // and returns nil. -// Fn must not update values, the internal cursor is not repositioned between -// invocations of fn, which would cause undefined behaviour (in practice, -// matching values could be skipped). +// +// ForEach automatically repositions the iteration cursor when data is changed by +// fn. Changes to data by fn aren't necessarily reflected in the values fn is +// called with. This can happen when a sort order isn't executed using an index: +// ForEach gathers and sorts (a subset of) values before yielding them and does not +// reapply filtering and does not necessarily yield the updated values. func (q *Query[T]) ForEach(fn func(value T) error) (rerr error) { defer q.finish(&rerr) q.checkNotNext() diff --git a/vendor/github.com/mjl-/bstore/stats.go b/vendor/github.com/mjl-/bstore/stats.go index 9b556e4..77551d7 100644 --- a/vendor/github.com/mjl-/bstore/stats.go +++ b/vendor/github.com/mjl-/bstore/stats.go @@ -36,6 +36,7 @@ type Stats struct { LastIndex string // Last index for LastType used for a query, or empty. LastOrdered bool // Whether last scan (PK or index) use was ordered, e.g. for sorting or because of a comparison filter. LastAsc bool // If ordered, whether last index scan was ascending. + Reseek uint // Number of cursor reseeks due to updates during queries. } func (skv *StatsKV) add(n StatsKV) { @@ -77,6 +78,7 @@ func (st *Stats) add(n Stats) { st.LastIndex = n.LastIndex st.LastOrdered = n.LastOrdered st.LastAsc = n.LastAsc + st.Reseek += n.Reseek } // Sub returns st with the counters from o subtracted. 
@@ -100,6 +102,7 @@ func (st Stats) Sub(o Stats) Stats { st.PlanPKScan -= o.PlanPKScan st.PlanIndexScan -= o.PlanIndexScan st.Sort -= o.Sort + st.Reseek -= o.Reseek return st } diff --git a/vendor/github.com/mjl-/bstore/store.go b/vendor/github.com/mjl-/bstore/store.go index b723366..cd7ecc3 100644 --- a/vendor/github.com/mjl-/bstore/store.go +++ b/vendor/github.com/mjl-/bstore/store.go @@ -75,9 +75,18 @@ type Tx struct { bucketCache map[bucketKey]*bolt.Bucket + // We need to keep track of queries to set reseek on their execs when updating + // (put/delete) the record/index bucket for their cursors. + queries []bucketReseeker + stats Stats } +type bucketReseeker interface { + // bucketReseek is called on queries when a bucket changed (put/delete). + bucketReseek(b *bolt.Bucket) +} + // bucketKey represents a subbucket for a type. type bucketKey struct { typeName string diff --git a/vendor/github.com/mjl-/bstore/tx.go b/vendor/github.com/mjl-/bstore/tx.go index 305695b..640eea3 100644 --- a/vendor/github.com/mjl-/bstore/tx.go +++ b/vendor/github.com/mjl-/bstore/tx.go @@ -108,6 +108,7 @@ func (tx *Tx) updateIndices(tv *typeVersion, pk []byte, ov, v reflect.Value) err if err != nil { return err } + var modified bool if remove { ikl, err := idx.packKey(ov, pk) if err != nil { @@ -121,6 +122,7 @@ func (tx *Tx) updateIndices(tv *typeVersion, pk []byte, ov, v reflect.Value) err return fmt.Errorf("%w: key missing from index", ErrStore) } } + modified = true if err := ib.Delete(ik.full); err != nil { return fmt.Errorf("%w: removing from index: %s", ErrStore, err) } @@ -140,11 +142,15 @@ func (tx *Tx) updateIndices(tv *typeVersion, pk []byte, ov, v reflect.Value) err } tx.stats.Index.Put++ + modified = true if err := ib.Put(ik.full, []byte{}); err != nil { return fmt.Errorf("inserting into index: %w", err) } } } + if modified { + tx.bucketReseek(ib) + } } return nil } @@ -286,6 +292,7 @@ func (tx *Tx) delete(rb *bolt.Bucket, st storeType, k []byte, rov reflect.Value) } 
tx.stats.Records.Delete++ + tx.bucketReseek(rb) return rb.Delete(k) } @@ -403,6 +410,7 @@ func (tx *Tx) insert(rb *bolt.Bucket, st storeType, rv, krv reflect.Value, k []b return fmt.Errorf("updating indices for inserted value: %w", err) } tx.stats.Records.Put++ + tx.bucketReseek(rb) if err := rb.Put(k, v); err != nil { return err } @@ -427,9 +435,18 @@ func (tx *Tx) update(rb *bolt.Bucket, st storeType, rv, rov reflect.Value, k []b return fmt.Errorf("updating indices for updated record: %w", err) } tx.stats.Records.Put++ + tx.bucketReseek(rb) return rb.Put(k, v) } +// bucketReseek marks queries as needing a reseek on their cursor due to changes to +// the bucket. +func (tx *Tx) bucketReseek(b *bolt.Bucket) { + for _, q := range tx.queries { + q.bucketReseek(b) + } +} + // Begin starts a transaction. // // If writable is true, the transaction allows modifications. Only one writable diff --git a/vendor/modules.txt b/vendor/modules.txt index 784ddfd..4a49939 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -16,8 +16,8 @@ github.com/mjl-/adns/internal/singleflight # github.com/mjl-/autocert v0.0.0-20250321204043-abab2b936e31 ## explicit; go 1.20 github.com/mjl-/autocert -# github.com/mjl-/bstore v0.0.6 -## explicit; go 1.19 +# github.com/mjl-/bstore v0.0.8 +## explicit; go 1.22 github.com/mjl-/bstore # github.com/mjl-/flate v0.0.0-20250221133712-6372d09eb978 ## explicit; go 1.21