mirror of
https://github.com/mjl-/mox.git
synced 2025-07-12 17:44:35 +03:00
In storage consistency checks, verify the junk filter has the expected word counts
Fix up a test or two. Simplify the XOR logic when we train the junk filter: Only if junk or nonjunk is set, but not when both (or none are set). i.e. when the values aren't the same. Locking the account when we do consistency checks prevents spurious test failures that may have been introduced in the previous commit.
This commit is contained in:
@ -37,6 +37,7 @@ import (
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"runtime/debug"
|
||||
"slices"
|
||||
"sort"
|
||||
@ -53,6 +54,7 @@ import (
|
||||
|
||||
"github.com/mjl-/mox/config"
|
||||
"github.com/mjl-/mox/dns"
|
||||
"github.com/mjl-/mox/junk"
|
||||
"github.com/mjl-/mox/message"
|
||||
"github.com/mjl-/mox/metrics"
|
||||
"github.com/mjl-/mox/mlog"
|
||||
@ -654,7 +656,7 @@ func (m Message) LoadPart(r io.ReaderAt) (message.Part, error) {
|
||||
func (m Message) NeedsTraining() bool {
|
||||
untrain := m.TrainedJunk != nil
|
||||
untrainJunk := untrain && *m.TrainedJunk
|
||||
train := m.Junk || m.Notjunk && !(m.Junk && m.Notjunk)
|
||||
train := m.Junk != m.Notjunk
|
||||
trainJunk := m.Junk
|
||||
return untrain != train || untrain && train && untrainJunk != trainJunk
|
||||
}
|
||||
@ -1306,6 +1308,7 @@ func (a *Account) Close() error {
|
||||
// - Message ModSeq > 0, CreateSeq > 0, CreateSeq <= ModSeq.
|
||||
// - All messages have a nonzero ThreadID, and no cycles in ThreadParentID, and parent messages the same ThreadParentIDs tail.
|
||||
// - Annotations must have ModSeq > 0, CreateSeq > 0, ModSeq >= CreateSeq.
|
||||
// - Recalculate junk filter (words and counts) and check they are the same.
|
||||
func (a *Account) CheckConsistency() error {
|
||||
var uidErrors []string // With a limit, could be many.
|
||||
var modseqErrors []string // With limit.
|
||||
@ -1315,16 +1318,18 @@ func (a *Account) CheckConsistency() error {
|
||||
var threadAncestorErrors []string // With limit.
|
||||
var errmsgs []string
|
||||
|
||||
err := a.DB.Read(context.Background(), func(tx *bstore.Tx) error {
|
||||
ctx := context.Background()
|
||||
log := mlog.New("store", nil)
|
||||
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
err := a.DB.Read(ctx, func(tx *bstore.Tx) error {
|
||||
nuv := NextUIDValidity{ID: 1}
|
||||
err := tx.Get(&nuv)
|
||||
if err != nil {
|
||||
return fmt.Errorf("fetching next uid validity: %v", err)
|
||||
}
|
||||
|
||||
// All message id's from database. For checking for unexpected files afterwards.
|
||||
messageIDs := map[int64]struct{}{}
|
||||
|
||||
mailboxes := map[int64]Mailbox{}
|
||||
err = bstore.QueryTx[Mailbox](tx).ForEach(func(mb Mailbox) error {
|
||||
mailboxes[mb.ID] = mb
|
||||
@ -1368,6 +1373,29 @@ func (a *Account) CheckConsistency() error {
|
||||
return fmt.Errorf("checking mailbox annotations: %v", err)
|
||||
}
|
||||
|
||||
// All message id's from database. For checking for unexpected files afterwards.
|
||||
messageIDs := map[int64]struct{}{}
|
||||
|
||||
// If configured, we'll be building up the junk filter for the messages, to compare
|
||||
// against the on-disk junk filter.
|
||||
var jf *junk.Filter
|
||||
conf, _ := a.Conf()
|
||||
if conf.JunkFilter != nil {
|
||||
random := make([]byte, 16)
|
||||
cryptorand.Read(random)
|
||||
dbpath := filepath.Join(mox.DataDirPath("tmp"), fmt.Sprintf("junkfilter-check-%x.db", random))
|
||||
bloompath := filepath.Join(mox.DataDirPath("tmp"), fmt.Sprintf("junkfilter-check-%x.bloom", random))
|
||||
os.MkdirAll(filepath.Dir(dbpath), 0700)
|
||||
defer os.Remove(dbpath)
|
||||
defer os.Remove(bloompath)
|
||||
jf, err = junk.NewFilter(ctx, log, conf.JunkFilter.Params, dbpath, bloompath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("new junk filter: %v", err)
|
||||
}
|
||||
defer jf.Close()
|
||||
}
|
||||
var ntrained int
|
||||
|
||||
counts := map[int64]MailboxCounts{}
|
||||
err = bstore.QueryTx[Message](tx).ForEach(func(m Message) error {
|
||||
mc := counts[m.MailboxID]
|
||||
@ -1387,6 +1415,7 @@ func (a *Account) CheckConsistency() error {
|
||||
if m.Expunged {
|
||||
return nil
|
||||
}
|
||||
|
||||
messageIDs[m.ID] = struct{}{}
|
||||
p := a.MessagePath(m.ID)
|
||||
st, err := os.Stat(p)
|
||||
@ -1419,6 +1448,16 @@ func (a *Account) CheckConsistency() error {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if jf != nil {
|
||||
if m.Junk != m.Notjunk {
|
||||
ntrained++
|
||||
if _, err := a.TrainMessage(ctx, log, jf, m); err != nil {
|
||||
return fmt.Errorf("train message: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
@ -1470,6 +1509,45 @@ func (a *Account) CheckConsistency() error {
|
||||
errmsgs = append(errmsgs, errmsg)
|
||||
}
|
||||
|
||||
// Compare on-disk junk filter with our recalculated filter.
|
||||
if jf != nil {
|
||||
load := func(f *junk.Filter) (map[junk.Wordscore]struct{}, error) {
|
||||
words := map[junk.Wordscore]struct{}{}
|
||||
err := bstore.QueryDB[junk.Wordscore](ctx, f.DB()).ForEach(func(w junk.Wordscore) error {
|
||||
if w.Ham != 0 || w.Spam != 0 {
|
||||
words[w] = struct{}{}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read junk filter wordscores: %v", err)
|
||||
}
|
||||
return words, nil
|
||||
}
|
||||
if err := jf.Save(); err != nil {
|
||||
return fmt.Errorf("save recalculated junk filter: %v", err)
|
||||
}
|
||||
wordsExp, err := load(jf)
|
||||
if err != nil {
|
||||
return fmt.Errorf("read recalculated junk filter: %v", err)
|
||||
}
|
||||
|
||||
ajf, _, err := a.OpenJunkFilter(ctx, log)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open account junk filter: %v", err)
|
||||
}
|
||||
defer ajf.Close()
|
||||
wordsGot, err := load(ajf)
|
||||
if err != nil {
|
||||
return fmt.Errorf("read account junk filter: %v", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(wordsGot, wordsExp) {
|
||||
errmsg := fmt.Sprintf("unexpected values in junk filter, trained %d of %d\ngot:\n%v\nexpected:\n%v", ntrained, len(messageIDs), wordsGot, wordsExp)
|
||||
errmsgs = append(errmsgs, errmsg)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
|
Reference in New Issue
Block a user