Refactor how messages are added to mailboxes

DeliverMessage() is now MessageAdd(), and it takes a Mailbox object that it
modifies but doesn't write to the database: the caller must do that, and can
often do it more efficiently, e.g. once for multiple messages. The new
AddOpts let the caller influence how many of the checks, and how much of the
work, MessageAdd() does. The zero-value AddOpts enables all checks and all
the work, but callers can take responsibility for some of the checks/work if
they can do it more efficiently themselves.
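
For illustration, a minimal sketch of the new calling convention (not code
from this commit; the helper name addOne and the parameter types, such as
mlog.Log and *bstore.Tx, are assumptions based on the surrounding mox code):

	// Add one message with the zero-value AddOpts, i.e. all checks and all
	// work enabled. MessageAdd modifies mb in memory only (counts, uidnext,
	// keywords); the caller persists it, possibly once for many messages.
	func addOne(log mlog.Log, acc *store.Account, tx *bstore.Tx, mb *store.Mailbox, m *store.Message, mf *os.File) ([]store.Change, error) {
		if err := acc.MessageAdd(log, tx, mb, m, mf, store.AddOpts{}); err != nil {
			return nil, fmt.Errorf("add message: %w", err)
		}
		if err := tx.Update(mb); err != nil {
			return nil, fmt.Errorf("update mailbox: %w", err)
		}
		// The caller typically also broadcasts these changes.
		return []store.Change{m.ChangeAddUID(), mb.ChangeCounts()}, nil
	}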

This simplifies the code in most places and makes it more efficient. The
checks for updating per-mailbox keywords are a bit simpler now too.

We are also more careful to close the junk filter without saving it in case of
errors.
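
In the import code below that looks roughly like this (a condensed sketch of
the diff, with log standing in for ctl.log): the deferred cleanup discards an
aborted import's training, and only the success path saves the filter:

	jf, _, err := a.OpenJunkFilter(ctx, log)
	if err != nil && !errors.Is(err, store.ErrNoJunkFilter) {
		// ... handle unexpected error ...
	}
	defer func() {
		if jf != nil {
			// Error/panic path: close without writing changes to disk.
			err := jf.CloseDiscard()
			log.Check(err, "close junk filter")
		}
	}()

	// ... import messages, training jf along the way ...

	// Success path: save the trained filter and disarm the deferred discard.
	if jf != nil {
		err := jf.Close()
		log.Check(err, "close junk filter")
		jf = nil
	}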

Still part of more upcoming changes.
Mechiel Lukkien
2025-03-01 16:06:01 +01:00
parent 7855a32852
commit 2beb30cc20
13 changed files with 410 additions and 362 deletions

import.go (161 lines changed)

@@ -16,12 +16,11 @@ import (
"strings"
"time"
"golang.org/x/exp/maps"
"github.com/mjl-/mox/config"
"github.com/mjl-/mox/message"
"github.com/mjl-/mox/metrics"
"github.com/mjl-/mox/mox-"
"github.com/mjl-/mox/moxio"
"github.com/mjl-/mox/store"
)
@@ -237,72 +236,57 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) {
msgreader = store.NewMaildirReader(ctl.log, store.CreateMessageTemp, mdnewf, mdcurf)
}
tx, err := a.DB.Begin(ctx, true)
ctl.xcheck(err, "begin transaction")
defer func() {
if tx != nil {
err := tx.Rollback()
ctl.log.Check(err, "rolling back transaction")
}
}()
// All preparations done. Good to go.
ctl.xwriteok()
// We will be delivering messages. If we fail halfway, we need to remove the created msg files.
var deliveredIDs []int64
defer func() {
x := recover()
if x == nil {
return
}
if x != ctl.x {
ctl.log.Error("import error", slog.String("panic", fmt.Sprintf("%v", x)))
debug.PrintStack()
metrics.PanicInc(metrics.Import)
} else {
ctl.log.Error("import error")
}
for _, id := range deliveredIDs {
p := a.MessagePath(id)
err := os.Remove(p)
ctl.log.Check(err, "closing message file after import error", slog.String("path", p))
}
ctl.xerror(fmt.Sprintf("import error: %v", x))
}()
var changes []store.Change
var modseq store.ModSeq // Assigned on first delivered messages, used for all messages.
xdeliver := func(m *store.Message, mf *os.File) {
// todo: possibly set dmarcdomain to the domain of the from address? at least for non-spams that have been seen. otherwise user would start without any reputations. the assumption would be that the user has accepted email and deemed it legit, coming from the indicated sender.
const sync = false
const notrain = true
const nothreads = true
const updateDiskUsage = false
err := a.DeliverMessage(ctl.log, tx, m, mf, sync, notrain, nothreads, updateDiskUsage)
ctl.xcheck(err, "delivering message")
deliveredIDs = append(deliveredIDs, m.ID)
ctl.log.Debug("delivered message", slog.Int64("id", m.ID))
changes = append(changes, m.ChangeAddUID())
}
// todo: one goroutine for reading messages, one for parsing the message, one adding to database, one for junk filter training.
n := 0
a.WithWLock(func() {
var changes []store.Change
tx, err := a.DB.Begin(ctx, true)
ctl.xcheck(err, "begin transaction")
defer func() {
if tx != nil {
err := tx.Rollback()
ctl.log.Check(err, "rolling back transaction")
}
}()
// All preparations done. Good to go.
ctl.xwriteok()
// We will be delivering messages. If we fail halfway, we need to remove the created msg files.
var deliveredIDs []int64
defer func() {
x := recover()
if x == nil {
return
}
if x != ctl.x {
ctl.log.Error("import error", slog.String("panic", fmt.Sprintf("%v", x)))
debug.PrintStack()
metrics.PanicInc(metrics.Import)
} else {
ctl.log.Error("import error")
}
for _, id := range deliveredIDs {
p := a.MessagePath(id)
err := os.Remove(p)
ctl.log.Check(err, "closing message file after import error", slog.String("path", p))
}
deliveredIDs = nil
ctl.xerror(fmt.Sprintf("import error: %v", x))
}()
var modseq store.ModSeq // Assigned on first delivered messages, used for all messages.
// Ensure mailbox exists.
var mb store.Mailbox
mb, changes, err = a.MailboxEnsure(tx, mailbox, true, store.SpecialUse{}, &modseq)
ctl.xcheck(err, "ensuring mailbox exists")
// We ensure keywords in messages make it to the mailbox as well.
mailboxKeywords := map[string]bool{}
nkeywords := len(mb.Keywords)
jf, _, err := a.OpenJunkFilter(ctx, ctl.log)
if err != nil && !errors.Is(err, store.ErrNoJunkFilter) {
@@ -310,7 +294,7 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) {
}
defer func() {
if jf != nil {
err = jf.Close()
err = jf.CloseDiscard()
ctl.xcheck(err, "close junk filter")
}
}()
@@ -323,6 +307,8 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) {
err = tx.Get(&du)
ctl.xcheck(err, "get disk usage")
msgDirs := map[string]struct{}{}
process := func(m *store.Message, msgf *os.File, origPath string) {
defer store.CloseRemoveTempFile(ctl.log, msgf, "message to import")
@@ -331,11 +317,6 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) {
ctl.xcheck(fmt.Errorf("account over maximum total message size %d", maxSize), "checking quota")
}
for _, kw := range m.Keywords {
mailboxKeywords[kw] = true
}
mb.Add(m.MailboxCounts())
// Parse message and store parsed information for later fast retrieval.
p, err := message.EnsurePart(ctl.log.Logger, false, msgf, m.Size)
if err != nil {
@@ -344,7 +325,7 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) {
m.ParsedBuf, err = json.Marshal(p)
ctl.xcheck(err, "marshal parsed message structure")
// Set fields needed for future threading. By doing it now, DeliverMessage won't
// Set fields needed for future threading. By doing it now, MessageAdd won't
// have to parse the Part again.
p.SetReaderAt(store.FileMsgReader(m.MsgPrefix, msgf))
m.PrepareThreading(ctl.log, &p)
@@ -357,9 +338,6 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) {
}
}
// We set the flags that Deliver would set now and train ourselves. This prevents
// Deliver from training, which would open the junk filter, change it, and write it
// back to disk, for each message (slow).
m.JunkFlagsForMailbox(mb, conf)
if jf != nil && m.NeedsTraining() {
if words, err := jf.ParseMessage(p); err != nil {
@@ -375,13 +353,28 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) {
var err error
modseq, err = a.NextModSeq(tx)
ctl.xcheck(err, "assigning next modseq")
mb.ModSeq = modseq
}
m.MailboxID = mb.ID
m.MailboxOrigID = mb.ID
m.CreateSeq = modseq
m.ModSeq = modseq
xdeliver(m, msgf)
// todo: possibly set dmarcdomain to the domain of the from address? at least for non-spams that have been seen. otherwise user would start without any reputations. the assumption would be that the user has accepted email and deemed it legit, coming from the indicated sender.
opts := store.AddOpts{
SkipDirSync: true,
SkipTraining: true,
SkipThreads: true, // We do this efficiently when we have all messages.
SkipUpdateDiskUsage: true, // We do this once at the end.
SkipCheckQuota: true, // We check before.
}
err = a.MessageAdd(ctl.log, tx, &mb, m, msgf, opts)
ctl.xcheck(err, "delivering message")
deliveredIDs = append(deliveredIDs, m.ID)
changes = append(changes, m.ChangeAddUID())
msgDirs[filepath.Dir(a.MessagePath(m.ID))] = struct{}{}
n++
if n%1000 == 0 {
@@ -405,25 +398,27 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) {
ctl.xcheck(err, "assigning messages to threads")
}
// Get mailbox again, uidnext is likely updated.
mc := mb.MailboxCounts
err = tx.Get(&mb)
ctl.xcheck(err, "get mailbox")
mb.MailboxCounts = mc
// If there are any new keywords, update the mailbox.
var mbKwChanged bool
mb.Keywords, mbKwChanged = store.MergeKeywords(mb.Keywords, maps.Keys(mailboxKeywords))
if mbKwChanged {
changes = append(changes, mb.ChangeCounts())
if nkeywords != len(mb.Keywords) {
changes = append(changes, mb.ChangeKeywords())
}
err = tx.Update(&mb)
ctl.xcheck(err, "updating message counts and keywords in mailbox")
changes = append(changes, mb.ChangeCounts())
err = a.AddMessageSize(ctl.log, tx, addSize)
xcheckf(err, "updating total message size")
ctl.xcheck(err, "updating total message size")
for msgDir := range msgDirs {
err := moxio.SyncDir(ctl.log, msgDir)
ctl.xcheck(err, "sync dir")
}
if jf != nil {
err := jf.Close()
ctl.log.Check(err, "close junk filter")
jf = nil
}
err = tx.Commit()
ctl.xcheck(err, "commit")