update dependencies, including bolt with stability fixes

This commit is contained in:
Mechiel Lukkien
2023-02-17 18:55:01 +01:00
parent fb3794e31b
commit 6df4b454d5
69 changed files with 1242 additions and 462 deletions

382
vendor/go.etcd.io/bbolt/tx.go generated vendored
View File

@ -6,6 +6,7 @@ import (
"os"
"sort"
"strings"
"sync/atomic"
"time"
"unsafe"
)
@ -151,17 +152,19 @@ func (tx *Tx) Commit() error {
// Rebalance nodes which have had deletions.
var startTime = time.Now()
tx.root.rebalance()
if tx.stats.Rebalance > 0 {
tx.stats.RebalanceTime += time.Since(startTime)
if tx.stats.GetRebalance() > 0 {
tx.stats.IncRebalanceTime(time.Since(startTime))
}
opgid := tx.meta.pgid
// spill data onto dirty pages.
startTime = time.Now()
if err := tx.root.spill(); err != nil {
tx.rollback()
return err
}
tx.stats.SpillTime += time.Since(startTime)
tx.stats.IncSpillTime(time.Since(startTime))
// Free the old root bucket.
tx.meta.root.root = tx.root.root
@ -180,6 +183,14 @@ func (tx *Tx) Commit() error {
tx.meta.freelist = pgidNoFreelist
}
// If the high water mark has moved up then attempt to grow the database.
if tx.meta.pgid > opgid {
if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {
tx.rollback()
return err
}
}
// Write dirty pages to disk.
startTime = time.Now()
if err := tx.write(); err != nil {
@ -208,7 +219,7 @@ func (tx *Tx) Commit() error {
tx.rollback()
return err
}
tx.stats.WriteTime += time.Since(startTime)
tx.stats.IncWriteTime(time.Since(startTime))
// Finalize the transaction.
tx.close()
@ -224,7 +235,6 @@ func (tx *Tx) Commit() error {
func (tx *Tx) commitFreelist() error {
// Allocate new pages for the new free list. This will overestimate
// the size of the freelist but not underestimate the size (which would be bad).
opgid := tx.meta.pgid
p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
if err != nil {
tx.rollback()
@ -235,13 +245,6 @@ func (tx *Tx) commitFreelist() error {
return err
}
tx.meta.freelist = p.id
// If the high water mark has moved up then attempt to grow the database.
if tx.meta.pgid > opgid {
if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {
tx.rollback()
return err
}
}
return nil
}
@ -275,13 +278,17 @@ func (tx *Tx) rollback() {
}
if tx.writable {
tx.db.freelist.rollback(tx.meta.txid)
if !tx.db.hasSyncedFreelist() {
// Reconstruct free page list by scanning the DB to get the whole free page list.
// Note: scaning the whole db is heavy if your db size is large in NoSyncFreeList mode.
tx.db.freelist.noSyncReload(tx.db.freepages())
} else {
// Read free page list from freelist page.
tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist))
// When mmap fails, the `data`, `dataref` and `datasz` may be reset to
// zero values, and there is no way to reload free page IDs in this case.
if tx.db.data != nil {
if !tx.db.hasSyncedFreelist() {
// Reconstruct free page list by scanning the DB to get the whole free page list.
// Note: scaning the whole db is heavy if your db size is large in NoSyncFreeList mode.
tx.db.freelist.noSyncReload(tx.db.freepages())
} else {
// Read free page list from freelist page.
tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist))
}
}
}
tx.close()
@ -400,98 +407,6 @@ func (tx *Tx) CopyFile(path string, mode os.FileMode) error {
return f.Close()
}
// Check performs several consistency checks on the database for this transaction.
// An error is returned if any inconsistency is found.
//
// It can be safely run concurrently on a writable transaction. However, this
// incurs a high cost for large databases and databases with a lot of subbuckets
// because of caching. This overhead can be removed if running on a read-only
// transaction, however, it is not safe to execute other writer transactions at
// the same time.
func (tx *Tx) Check() <-chan error {
ch := make(chan error)
go tx.check(ch)
return ch
}
func (tx *Tx) check(ch chan error) {
// Force loading free list if opened in ReadOnly mode.
tx.db.loadFreelist()
// Check if any pages are double freed.
freed := make(map[pgid]bool)
all := make([]pgid, tx.db.freelist.count())
tx.db.freelist.copyall(all)
for _, id := range all {
if freed[id] {
ch <- fmt.Errorf("page %d: already freed", id)
}
freed[id] = true
}
// Track every reachable page.
reachable := make(map[pgid]*page)
reachable[0] = tx.page(0) // meta0
reachable[1] = tx.page(1) // meta1
if tx.meta.freelist != pgidNoFreelist {
for i := uint32(0); i <= tx.page(tx.meta.freelist).overflow; i++ {
reachable[tx.meta.freelist+pgid(i)] = tx.page(tx.meta.freelist)
}
}
// Recursively check buckets.
tx.checkBucket(&tx.root, reachable, freed, ch)
// Ensure all pages below high water mark are either reachable or freed.
for i := pgid(0); i < tx.meta.pgid; i++ {
_, isReachable := reachable[i]
if !isReachable && !freed[i] {
ch <- fmt.Errorf("page %d: unreachable unfreed", int(i))
}
}
// Close the channel to signal completion.
close(ch)
}
func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bool, ch chan error) {
// Ignore inline buckets.
if b.root == 0 {
return
}
// Check every page used by this bucket.
b.tx.forEachPage(b.root, 0, func(p *page, _ int) {
if p.id > tx.meta.pgid {
ch <- fmt.Errorf("page %d: out of bounds: %d", int(p.id), int(b.tx.meta.pgid))
}
// Ensure each page is only referenced once.
for i := pgid(0); i <= pgid(p.overflow); i++ {
var id = p.id + i
if _, ok := reachable[id]; ok {
ch <- fmt.Errorf("page %d: multiple references", int(id))
}
reachable[id] = p
}
// We should only encounter un-freed leaf and branch pages.
if freed[p.id] {
ch <- fmt.Errorf("page %d: reachable freed", int(p.id))
} else if (p.flags&branchPageFlag) == 0 && (p.flags&leafPageFlag) == 0 {
ch <- fmt.Errorf("page %d: invalid type: %s", int(p.id), p.typ())
}
})
// Check each bucket within this bucket.
_ = b.ForEach(func(k, v []byte) error {
if child := b.Bucket(k); child != nil {
tx.checkBucket(child, reachable, freed, ch)
}
return nil
})
}
// allocate returns a contiguous block of memory starting at a given page.
func (tx *Tx) allocate(count int) (*page, error) {
p, err := tx.db.allocate(tx.meta.txid, count)
@ -503,8 +418,8 @@ func (tx *Tx) allocate(count int) (*page, error) {
tx.pages[p.id] = p
// Update statistics.
tx.stats.PageCount += count
tx.stats.PageAlloc += count * tx.db.pageSize
tx.stats.IncPageCount(int64(count))
tx.stats.IncPageAlloc(int64(count * tx.db.pageSize))
return p, nil
}
@ -539,7 +454,7 @@ func (tx *Tx) write() error {
}
// Update statistics.
tx.stats.Write++
tx.stats.IncWrite(1)
// Exit inner for loop if we've written all the chunks.
rem -= sz
@ -574,7 +489,7 @@ func (tx *Tx) write() error {
for i := range buf {
buf[i] = 0
}
tx.db.pagePool.Put(buf)
tx.db.pagePool.Put(buf) //nolint:staticcheck
}
return nil
@ -598,7 +513,7 @@ func (tx *Tx) writeMeta() error {
}
// Update statistics.
tx.stats.Write++
tx.stats.IncWrite(1)
return nil
}
@ -609,26 +524,35 @@ func (tx *Tx) page(id pgid) *page {
// Check the dirty pages first.
if tx.pages != nil {
if p, ok := tx.pages[id]; ok {
p.fastCheck(id)
return p
}
}
// Otherwise return directly from the mmap.
return tx.db.page(id)
p := tx.db.page(id)
p.fastCheck(id)
return p
}
// forEachPage iterates over every page within a given page and executes a function.
func (tx *Tx) forEachPage(pgid pgid, depth int, fn func(*page, int)) {
p := tx.page(pgid)
func (tx *Tx) forEachPage(pgidnum pgid, fn func(*page, int, []pgid)) {
stack := make([]pgid, 10)
stack[0] = pgidnum
tx.forEachPageInternal(stack[:1], fn)
}
func (tx *Tx) forEachPageInternal(pgidstack []pgid, fn func(*page, int, []pgid)) {
p := tx.page(pgidstack[len(pgidstack)-1])
// Execute function.
fn(p, depth)
fn(p, len(pgidstack)-1, pgidstack)
// Recursively loop over children.
if (p.flags & branchPageFlag) != 0 {
for i := 0; i < int(p.count); i++ {
elem := p.branchPageElement(uint16(i))
tx.forEachPage(elem.pgid, depth+1, fn)
tx.forEachPageInternal(append(pgidstack, elem.pgid), fn)
}
}
}
@ -642,6 +566,10 @@ func (tx *Tx) Page(id int) (*PageInfo, error) {
return nil, nil
}
if tx.db.freelist == nil {
return nil, ErrFreePagesNotLoaded
}
// Build the page info.
p := tx.db.page(pgid(id))
info := &PageInfo{
@ -663,43 +591,61 @@ func (tx *Tx) Page(id int) (*PageInfo, error) {
// TxStats represents statistics about the actions performed by the transaction.
type TxStats struct {
// Page statistics.
PageCount int // number of page allocations
PageAlloc int // total bytes allocated
//
// DEPRECATED: Use GetPageCount() or IncPageCount()
PageCount int64 // number of page allocations
// DEPRECATED: Use GetPageAlloc() or IncPageAlloc()
PageAlloc int64 // total bytes allocated
// Cursor statistics.
CursorCount int // number of cursors created
//
// DEPRECATED: Use GetCursorCount() or IncCursorCount()
CursorCount int64 // number of cursors created
// Node statistics
NodeCount int // number of node allocations
NodeDeref int // number of node dereferences
//
// DEPRECATED: Use GetNodeCount() or IncNodeCount()
NodeCount int64 // number of node allocations
// DEPRECATED: Use GetNodeDeref() or IncNodeDeref()
NodeDeref int64 // number of node dereferences
// Rebalance statistics.
Rebalance int // number of node rebalances
//
// DEPRECATED: Use GetRebalance() or IncRebalance()
Rebalance int64 // number of node rebalances
// DEPRECATED: Use GetRebalanceTime() or IncRebalanceTime()
RebalanceTime time.Duration // total time spent rebalancing
// Split/Spill statistics.
Split int // number of nodes split
Spill int // number of nodes spilled
//
// DEPRECATED: Use GetSplit() or IncSplit()
Split int64 // number of nodes split
// DEPRECATED: Use GetSpill() or IncSpill()
Spill int64 // number of nodes spilled
// DEPRECATED: Use GetSpillTime() or IncSpillTime()
SpillTime time.Duration // total time spent spilling
// Write statistics.
Write int // number of writes performed
//
// DEPRECATED: Use GetWrite() or IncWrite()
Write int64 // number of writes performed
// DEPRECATED: Use GetWriteTime() or IncWriteTime()
WriteTime time.Duration // total time spent writing to disk
}
func (s *TxStats) add(other *TxStats) {
s.PageCount += other.PageCount
s.PageAlloc += other.PageAlloc
s.CursorCount += other.CursorCount
s.NodeCount += other.NodeCount
s.NodeDeref += other.NodeDeref
s.Rebalance += other.Rebalance
s.RebalanceTime += other.RebalanceTime
s.Split += other.Split
s.Spill += other.Spill
s.SpillTime += other.SpillTime
s.Write += other.Write
s.WriteTime += other.WriteTime
s.IncPageCount(other.GetPageCount())
s.IncPageAlloc(other.GetPageAlloc())
s.IncCursorCount(other.GetCursorCount())
s.IncNodeCount(other.GetNodeCount())
s.IncNodeDeref(other.GetNodeDeref())
s.IncRebalance(other.GetRebalance())
s.IncRebalanceTime(other.GetRebalanceTime())
s.IncSplit(other.GetSplit())
s.IncSpill(other.GetSpill())
s.IncSpillTime(other.GetSpillTime())
s.IncWrite(other.GetWrite())
s.IncWriteTime(other.GetWriteTime())
}
// Sub calculates and returns the difference between two sets of transaction stats.
@ -707,17 +653,145 @@ func (s *TxStats) add(other *TxStats) {
// you need the performance counters that occurred within that time span.
func (s *TxStats) Sub(other *TxStats) TxStats {
var diff TxStats
diff.PageCount = s.PageCount - other.PageCount
diff.PageAlloc = s.PageAlloc - other.PageAlloc
diff.CursorCount = s.CursorCount - other.CursorCount
diff.NodeCount = s.NodeCount - other.NodeCount
diff.NodeDeref = s.NodeDeref - other.NodeDeref
diff.Rebalance = s.Rebalance - other.Rebalance
diff.RebalanceTime = s.RebalanceTime - other.RebalanceTime
diff.Split = s.Split - other.Split
diff.Spill = s.Spill - other.Spill
diff.SpillTime = s.SpillTime - other.SpillTime
diff.Write = s.Write - other.Write
diff.WriteTime = s.WriteTime - other.WriteTime
diff.PageCount = s.GetPageCount() - other.GetPageCount()
diff.PageAlloc = s.GetPageAlloc() - other.GetPageAlloc()
diff.CursorCount = s.GetCursorCount() - other.GetCursorCount()
diff.NodeCount = s.GetNodeCount() - other.GetNodeCount()
diff.NodeDeref = s.GetNodeDeref() - other.GetNodeDeref()
diff.Rebalance = s.GetRebalance() - other.GetRebalance()
diff.RebalanceTime = s.GetRebalanceTime() - other.GetRebalanceTime()
diff.Split = s.GetSplit() - other.GetSplit()
diff.Spill = s.GetSpill() - other.GetSpill()
diff.SpillTime = s.GetSpillTime() - other.GetSpillTime()
diff.Write = s.GetWrite() - other.GetWrite()
diff.WriteTime = s.GetWriteTime() - other.GetWriteTime()
return diff
}
// GetPageCount returns PageCount atomically.
func (s *TxStats) GetPageCount() int64 {
return atomic.LoadInt64(&s.PageCount)
}
// IncPageCount increases PageCount atomically and returns the new value.
func (s *TxStats) IncPageCount(delta int64) int64 {
return atomic.AddInt64(&s.PageCount, delta)
}
// GetPageAlloc returns PageAlloc atomically.
func (s *TxStats) GetPageAlloc() int64 {
return atomic.LoadInt64(&s.PageAlloc)
}
// IncPageAlloc increases PageAlloc atomically and returns the new value.
func (s *TxStats) IncPageAlloc(delta int64) int64 {
return atomic.AddInt64(&s.PageAlloc, delta)
}
// GetCursorCount returns CursorCount atomically.
func (s *TxStats) GetCursorCount() int64 {
return atomic.LoadInt64(&s.CursorCount)
}
// IncCursorCount increases CursorCount atomically and return the new value.
func (s *TxStats) IncCursorCount(delta int64) int64 {
return atomic.AddInt64(&s.CursorCount, delta)
}
// GetNodeCount returns NodeCount atomically.
func (s *TxStats) GetNodeCount() int64 {
return atomic.LoadInt64(&s.NodeCount)
}
// IncNodeCount increases NodeCount atomically and returns the new value.
func (s *TxStats) IncNodeCount(delta int64) int64 {
return atomic.AddInt64(&s.NodeCount, delta)
}
// GetNodeDeref returns NodeDeref atomically.
func (s *TxStats) GetNodeDeref() int64 {
return atomic.LoadInt64(&s.NodeDeref)
}
// IncNodeDeref increases NodeDeref atomically and returns the new value.
func (s *TxStats) IncNodeDeref(delta int64) int64 {
return atomic.AddInt64(&s.NodeDeref, delta)
}
// GetRebalance returns Rebalance atomically.
func (s *TxStats) GetRebalance() int64 {
return atomic.LoadInt64(&s.Rebalance)
}
// IncRebalance increases Rebalance atomically and returns the new value.
func (s *TxStats) IncRebalance(delta int64) int64 {
return atomic.AddInt64(&s.Rebalance, delta)
}
// GetRebalanceTime returns RebalanceTime atomically.
func (s *TxStats) GetRebalanceTime() time.Duration {
return atomicLoadDuration(&s.RebalanceTime)
}
// IncRebalanceTime increases RebalanceTime atomically and returns the new value.
func (s *TxStats) IncRebalanceTime(delta time.Duration) time.Duration {
return atomicAddDuration(&s.RebalanceTime, delta)
}
// GetSplit returns Split atomically.
func (s *TxStats) GetSplit() int64 {
return atomic.LoadInt64(&s.Split)
}
// IncSplit increases Split atomically and returns the new value.
func (s *TxStats) IncSplit(delta int64) int64 {
return atomic.AddInt64(&s.Split, delta)
}
// GetSpill returns Spill atomically.
func (s *TxStats) GetSpill() int64 {
return atomic.LoadInt64(&s.Spill)
}
// IncSpill increases Spill atomically and returns the new value.
func (s *TxStats) IncSpill(delta int64) int64 {
return atomic.AddInt64(&s.Spill, delta)
}
// GetSpillTime returns SpillTime atomically.
func (s *TxStats) GetSpillTime() time.Duration {
return atomicLoadDuration(&s.SpillTime)
}
// IncSpillTime increases SpillTime atomically and returns the new value.
func (s *TxStats) IncSpillTime(delta time.Duration) time.Duration {
return atomicAddDuration(&s.SpillTime, delta)
}
// GetWrite returns Write atomically.
func (s *TxStats) GetWrite() int64 {
return atomic.LoadInt64(&s.Write)
}
// IncWrite increases Write atomically and returns the new value.
func (s *TxStats) IncWrite(delta int64) int64 {
return atomic.AddInt64(&s.Write, delta)
}
// GetWriteTime returns WriteTime atomically.
func (s *TxStats) GetWriteTime() time.Duration {
return atomicLoadDuration(&s.WriteTime)
}
// IncWriteTime increases WriteTime atomically and returns the new value.
func (s *TxStats) IncWriteTime(delta time.Duration) time.Duration {
return atomicAddDuration(&s.WriteTime, delta)
}
func atomicAddDuration(ptr *time.Duration, du time.Duration) time.Duration {
return time.Duration(atomic.AddInt64((*int64)(unsafe.Pointer(ptr)), int64(du)))
}
func atomicLoadDuration(ptr *time.Duration) time.Duration {
return time.Duration(atomic.LoadInt64((*int64)(unsafe.Pointer(ptr))))
}