mirror of
https://github.com/mjl-/mox.git
synced 2025-07-10 06:34:40 +03:00
update to latest bstore, with a bugfix for queries with multiple orders that were partially handled by an index
causing returned order to be incorrect. was triggered by new code i'm working on.
This commit is contained in:
105
vendor/github.com/mjl-/bstore/exec.go
generated
vendored
105
vendor/github.com/mjl-/bstore/exec.go
generated
vendored
@ -21,14 +21,31 @@ type exec[T any] struct {
|
||||
// See plan.keys. We remove items from the list when we looked one up, but we keep the slice non-nil.
|
||||
keys [][]byte
|
||||
|
||||
// If -1, no limit is set. This is different from Query where 0 means
|
||||
// no limit. We count back and 0 means the end.
|
||||
// If non-empty, serve nextKey requests from here. Used when we need to do
|
||||
// in-memory sort. After reading from here, and limit isn't reached yet, we may do
|
||||
// another fill & sort of data to serve from, for orderings partially from an
|
||||
// index. When filling data, limit (below) is accounted for, so all elements can be
|
||||
// returned to caller.
|
||||
data []pair[T]
|
||||
|
||||
// If -1, no limit is set. This is different from Query where 0 means no limit. We
|
||||
// count back and 0 means the end. Also set from -1 to 0 when end of execution is
|
||||
// reached.
|
||||
limit int
|
||||
|
||||
data []pair[T] // If not nil (even if empty), serve nextKey requests from here.
|
||||
ib *bolt.Bucket
|
||||
rb *bolt.Bucket
|
||||
forward func() (bk, bv []byte) // Once we start scanning, we prepare forward to next/prev to the following value.
|
||||
// Index and record buckets loaded when first needed.
|
||||
ib *bolt.Bucket
|
||||
rb *bolt.Bucket
|
||||
|
||||
// Of last element in data. For finding end of group through prefix-match during
|
||||
// partial index ordering for remaining in-memory sort.
|
||||
lastik []byte
|
||||
|
||||
// If not nil, row that was scanned previously, to use instead of calling forward.
|
||||
stowedbk, stowedbv []byte
|
||||
|
||||
// Once we start scanning, we prepare forward to next/prev to the following value.
|
||||
forward func() (bk, bv []byte)
|
||||
}
|
||||
|
||||
// exec creates a new execution for the plan, registering statistics.
|
||||
@ -54,7 +71,7 @@ func (p *plan[T]) exec(q *Query[T]) *exec[T] {
|
||||
if len(p.orders) > 0 {
|
||||
q.stats.Sort++
|
||||
}
|
||||
q.stats.LastOrdered = p.start != nil || p.stop != nil
|
||||
q.stats.LastOrdered = p.start != nil || p.stop != nil || p.norderidxuse > 0
|
||||
q.stats.LastAsc = !p.desc
|
||||
|
||||
limit := -1
|
||||
@ -107,12 +124,9 @@ func (e *exec[T]) nextKey(write, value bool) ([]byte, T, error) {
|
||||
return nil, zero, q.err
|
||||
}
|
||||
|
||||
// We collected & sorted data previously. Return from it until done.
|
||||
// Limit was already applied.
|
||||
if e.data != nil {
|
||||
if len(e.data) == 0 {
|
||||
return nil, zero, ErrAbsent
|
||||
}
|
||||
// We collected & sorted data previously.
|
||||
// Limit was already applied/updated, so we can serve these without checking.
|
||||
if len(e.data) > 0 {
|
||||
p := e.data[0]
|
||||
e.data = e.data[1:]
|
||||
var v T
|
||||
@ -127,6 +141,7 @@ func (e *exec[T]) nextKey(write, value bool) ([]byte, T, error) {
|
||||
return p.bk, v, nil
|
||||
}
|
||||
|
||||
// Limit is 0 when we hit the limit or at end of processing the execution.
|
||||
if e.limit == 0 {
|
||||
return nil, zero, ErrAbsent
|
||||
}
|
||||
@ -153,10 +168,8 @@ func (e *exec[T]) nextKey(write, value bool) ([]byte, T, error) {
|
||||
// List of IDs (records) or full unique index equality match.
|
||||
// We can get the records/index value by a simple "get" on the key.
|
||||
if e.keys != nil {
|
||||
// If we need to sort, we collect all elements and prevent further querying.
|
||||
collect := len(e.plan.orders) > 0
|
||||
if collect {
|
||||
e.data = []pair[T]{} // Must be non-nil to get into e.data branch!
|
||||
}
|
||||
for i, xk := range e.keys {
|
||||
var bk, bv []byte
|
||||
|
||||
@ -217,6 +230,7 @@ func (e *exec[T]) nextKey(write, value bool) ([]byte, T, error) {
|
||||
return bk, v, nil
|
||||
}
|
||||
if !collect {
|
||||
e.limit = 0
|
||||
return nil, zero, ErrAbsent
|
||||
}
|
||||
// Restart, now with data.
|
||||
@ -225,14 +239,13 @@ func (e *exec[T]) nextKey(write, value bool) ([]byte, T, error) {
|
||||
if e.limit > 0 && len(e.data) > e.limit {
|
||||
e.data = e.data[:e.limit]
|
||||
}
|
||||
e.limit = 0
|
||||
return q.nextKey(write, value)
|
||||
}
|
||||
|
||||
// We are going to do a scan, either over the records or an index. We may have a start and stop key.
|
||||
// We are going to do a scan, either over the records or (a part of) an index. We
|
||||
// may have a start and stop key.
|
||||
collect := len(e.plan.orders) > 0
|
||||
if collect {
|
||||
e.data = []pair[T]{} // Must be non-nil to get into e.data branch on function restart.
|
||||
}
|
||||
// Every 1k keys we've seen, we'll check if the context has been canceled. If we
|
||||
// wouldn't do this, a query that doesn't return any matches won't get canceled
|
||||
// until it is finished.
|
||||
@ -304,6 +317,10 @@ func (e *exec[T]) nextKey(write, value bool) ([]byte, T, error) {
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if e.stowedbk != nil {
|
||||
// Resume with previously seen key/value.
|
||||
xk, xv = e.stowedbk, e.stowedbv
|
||||
e.stowedbk, e.stowedbv = nil, nil
|
||||
} else {
|
||||
if e.plan.idx == nil {
|
||||
q.stats.Records.Cursor++
|
||||
@ -331,24 +348,37 @@ func (e *exec[T]) nextKey(write, value bool) ([]byte, T, error) {
|
||||
}
|
||||
|
||||
var pk, bv []byte
|
||||
ordersidxPartial := e.plan.norderidxuse > 0 && len(e.plan.orders) > 0
|
||||
var idxkeys [][]byte // Only set when we have partial ordering from index.
|
||||
if e.plan.idx == nil {
|
||||
pk = xk
|
||||
bv = xv
|
||||
} else {
|
||||
var err error
|
||||
pk, _, err = e.plan.idx.parseKey(xk, false)
|
||||
pk, idxkeys, err = e.plan.idx.parseKey(xk, ordersidxPartial, true)
|
||||
if err != nil {
|
||||
q.error(err)
|
||||
return nil, zero, err
|
||||
}
|
||||
}
|
||||
|
||||
// If we have a parial order from the index, and this new value has a different
|
||||
// index ordering key prefix than the last value, we stop collecting, sort the data we
|
||||
// have by the remaining ordering, return that data, and continue collecting in the
|
||||
// next round. We stow the new value so we don't have to revert the forward() from
|
||||
// earlier.
|
||||
if ordersidxPartial && len(e.data) > 0 && !prefixMatch(e.lastik, e.plan.norderidxuse, idxkeys, pk) {
|
||||
e.stowedbk, e.stowedbv = xk, xv
|
||||
break
|
||||
}
|
||||
|
||||
p := pair[T]{pk, bv, nil}
|
||||
if ok, err := e.checkFilter(&p); err != nil {
|
||||
return nil, zero, err
|
||||
} else if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
//log.Printf("have kv, %x %x", p.bk, p.bv)
|
||||
var v T
|
||||
var err error
|
||||
@ -361,6 +391,7 @@ func (e *exec[T]) nextKey(write, value bool) ([]byte, T, error) {
|
||||
}
|
||||
if collect {
|
||||
e.data = append(e.data, p)
|
||||
e.lastik = xk
|
||||
continue
|
||||
}
|
||||
if e.limit > 0 {
|
||||
@ -368,17 +399,39 @@ func (e *exec[T]) nextKey(write, value bool) ([]byte, T, error) {
|
||||
}
|
||||
return p.bk, v, nil
|
||||
}
|
||||
if !collect {
|
||||
if !collect || len(e.data) == 0 {
|
||||
e.limit = 0
|
||||
return nil, zero, ErrAbsent
|
||||
}
|
||||
// Restart, now with data.
|
||||
e.sort()
|
||||
if e.limit > 0 && len(e.data) > e.limit {
|
||||
e.data = e.data[:e.limit]
|
||||
if e.limit > 0 {
|
||||
if len(e.data) > e.limit {
|
||||
e.data = e.data[:e.limit]
|
||||
}
|
||||
e.limit -= len(e.data)
|
||||
}
|
||||
return e.nextKey(write, value)
|
||||
}
|
||||
|
||||
// prefixMatch returns whether ik (index key) starts with the bytes from n elements
|
||||
// from field keys kl and primary key pk.
|
||||
func prefixMatch(ik []byte, n int, kl [][]byte, pk []byte) bool {
|
||||
for i := 0; i < n; i++ {
|
||||
var k []byte
|
||||
if i < len(kl) {
|
||||
k = kl[i]
|
||||
} else {
|
||||
k = pk
|
||||
}
|
||||
if !bytes.HasPrefix(ik, k) {
|
||||
return false
|
||||
}
|
||||
ik = ik[len(k):]
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// checkFilter checks against the filters for the plan.
|
||||
func (e *exec[T]) checkFilter(p *pair[T]) (rok bool, rerr error) {
|
||||
q := e.q
|
||||
@ -574,6 +627,10 @@ func compare(k kind, a, b reflect.Value) int {
|
||||
}
|
||||
|
||||
func (e *exec[T]) sort() {
|
||||
if len(e.data) <= 1 {
|
||||
return
|
||||
}
|
||||
|
||||
// todo: We should check whether we actually need to load values. We're
|
||||
// always loading it for the time being because SortStableFunc isn't
|
||||
// going to give us a *pair (even though it could because of the slice)
|
||||
|
Reference in New Issue
Block a user