automatically reparse all messages, in the background, after addition of header fields in the parsed mime form of messages in the message index database

With that recent change, we would keep track of Content-* headers of parsed
messages. We could ask admins to run a command to reparse messages for all
accounts. But instead we just do it automatically when opening the account. We
keep track whether we did the upgrade. And we do it in the background. Those
recent changes were to add optional fields to the IMAP fetch "bodystructure"
responses. There is a small chance that an IMAP client requests these fields
before they are properly populated with the reparse (only existing messages,
new incoming messages are parsed with the new code). We could try to detect
whether the upgrade has completed, and chance IMAP behaviour based on that. But
the complexity and long-term maintenance burden doesn't seem worth it. Worst
case, we'll temporarily claim some relatively unimportant headers aren't
present on a message. Most email clients won't even look at those fields, but
will parse them message themselves instead.
This commit is contained in:
Mechiel Lukkien 2025-04-16 17:37:39 +02:00
parent 07533252b3
commit 31c22618f5
No known key found for this signature in database
4 changed files with 328 additions and 53 deletions

42
ctl.go
View File

@ -1706,8 +1706,6 @@ func servectlcmd(ctx context.Context, xctl *ctl, cid int64, shutdown func()) {
xctl.xwriteok()
xw := xctl.writer()
const batchSize = 100
xreparseAccount := func(accName string) {
acc, err := store.OpenAccount(log, accName, false)
xctl.xcheck(err, "open account")
@ -1716,43 +1714,11 @@ func servectlcmd(ctx context.Context, xctl *ctl, cid int64, shutdown func()) {
log.Check(err, "closing account after reparsing messages")
}()
total := 0
var lastID int64
for {
var n int
// Don't process all message in one transaction, we could block the account for too long.
err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
q := bstore.QueryTx[store.Message](tx)
q.FilterEqual("Expunged", false)
q.FilterGreater("ID", lastID)
q.Limit(batchSize)
q.SortAsc("ID")
return q.ForEach(func(m store.Message) error {
lastID = m.ID
mr := acc.MessageReader(m)
p, err := message.EnsurePart(log.Logger, false, mr, m.Size)
if err != nil {
fmt.Fprintf(xw, "parsing message %d: %v (continuing)\n", m.ID, err)
}
m.ParsedBuf, err = json.Marshal(p)
if err != nil {
return fmt.Errorf("marshal parsed message: %v", err)
}
total++
n++
if err := tx.Update(&m); err != nil {
return fmt.Errorf("update message: %v", err)
}
return nil
})
start := time.Now()
total, err := acc.ReparseMessages(ctx, log)
xctl.xcheck(err, "reparse messages")
})
xctl.xcheck(err, "update messages with parsed mime structure")
if n < batchSize {
break
}
}
fmt.Fprintf(xw, "%d message(s) reparsed for account %s\n", total, accName)
fmt.Fprintf(xw, "%d message(s) reparsed for account %s in %dms\n", total, accName, time.Since(start)/time.Millisecond)
}
if accountOpt != "" {

View File

@ -980,15 +980,19 @@ type Upgrade struct {
MailboxModSeq bool // Whether mailboxes have been assigned modseqs.
MailboxParentID bool // Setting ParentID on mailboxes.
MailboxCounts bool // Global flag about whether we have mailbox flags. Instead of previous per-mailbox boolean.
MessageParseVersion int // If different than latest, all messages will be reparsed.
}
// upgradeInit is the value to for new account database, that don't need any upgrading.
const MessageParseVersionLatest = 1
// upgradeInit is the value for new account database, which don't need any upgrading.
var upgradeInit = Upgrade{
ID: 1, // Singleton.
Threads: 2,
MailboxModSeq: true,
MailboxParentID: true,
MailboxCounts: true,
MessageParseVersion: MessageParseVersionLatest,
}
// InitialUIDValidity returns a UIDValidity used for initializing an account.
@ -1132,6 +1136,8 @@ func openAccount(log mlog.Log, name string) (a *Account, rerr error) {
// or error. Only exported for use by subcommands that verify the database file.
// Almost all account opens must go through OpenAccount/OpenEmail/OpenEmailAuth.
func OpenAccountDB(log mlog.Log, accountDir, accountName string) (a *Account, rerr error) {
log = log.With(slog.String("account", accountName))
dbpath := filepath.Join(accountDir, "index.db")
// Create account if it doesn't exist yet.
@ -1501,6 +1507,68 @@ func OpenAccountDB(log mlog.Log, accountDir, accountName string) (a *Account, re
}
}
if up.MessageParseVersion != MessageParseVersionLatest {
log.Debug("upgrade: reparsing message for mime structures for new message parse version", slog.Int("current", up.MessageParseVersion), slog.Int("latest", MessageParseVersionLatest))
// Unless we also need to upgrade threading, we'll be reparsing messages in the
// background so opening of the account is quick.
done := make(chan error, 1)
bg := up.Threads == 2
// Increase account use before holding on to account in background.
// Caller holds the lock. The goroutine below decreases nused by calling
// closeAccount.
acc.nused++
go func() {
start := time.Now()
var rerr error
defer func() {
x := recover()
if x != nil {
rerr = fmt.Errorf("unhandled panic: %v", x)
log.Error("unhandled panic reparsing messages", slog.Any("err", x))
debug.PrintStack()
metrics.PanicInc(metrics.Store)
}
if bg && rerr != nil {
log.Errorx("upgrade failed: reparsing message for mime structures for new message parse version", rerr, slog.Duration("duration", time.Since(start)))
}
done <- rerr
// Must be done at end of defer. Our parent context/goroutine has openAccounts lock
// held, so we won't make progress until after the enclosing method has returned.
err := closeAccount(acc)
log.Check(err, "closing account after reparsing messages")
}()
var total int
total, rerr = acc.ReparseMessages(mox.Shutdown, log)
if rerr != nil {
rerr = fmt.Errorf("reparsing messages and updating mime structures in message index: %w", rerr)
return
}
up.MessageParseVersion = MessageParseVersionLatest
rerr = acc.DB.Update(context.TODO(), &up)
if rerr != nil {
rerr = fmt.Errorf("marking latest message parse version: %w", rerr)
return
}
log.Info("upgrade completed: reparsing message for mime structures for new message parse version", slog.Int("total", total), slog.Duration("duration", time.Since(start)))
}()
if !bg {
err := <-done
if err != nil {
return nil, err
}
}
}
if up.Threads == 2 {
close(acc.threadsCompleted)
return acc, nil
@ -1514,11 +1582,11 @@ func OpenAccountDB(log mlog.Log, accountDir, accountName string) (a *Account, re
// Ensure all messages have a MessageID and SubjectBase, which are needed when
// matching threads.
// Then assign messages to threads, in the same way we do during imports.
log.Info("upgrading account for threading, in background", slog.String("account", acc.Name))
log.Info("upgrading account for threading, in background")
go func() {
defer func() {
err := closeAccount(acc)
log.Check(err, "closing use of account after upgrading account storage for threads", slog.String("account", a.Name))
log.Check(err, "closing use of account after upgrading account storage for threads")
// Mark that upgrade has finished, possibly error is indicated in threadsErr.
close(acc.threadsCompleted)
@ -1537,9 +1605,9 @@ func OpenAccountDB(log mlog.Log, accountDir, accountName string) (a *Account, re
err := upgradeThreads(mox.Shutdown, log, acc, up)
if err != nil {
a.threadsErr = err
log.Errorx("upgrading account for threading, aborted", err, slog.String("account", a.Name))
log.Errorx("upgrading account for threading, aborted", err)
} else {
log.Info("upgrading account for threading, completed", slog.String("account", a.Name))
log.Info("upgrading account for threading, completed")
}
}()
return acc, nil

142
store/reparse.go Normal file
View File

@ -0,0 +1,142 @@
package store
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"runtime/debug"
"github.com/mjl-/bstore"
"github.com/mjl-/mox/message"
"github.com/mjl-/mox/metrics"
"github.com/mjl-/mox/mlog"
)
// We process messages in database transactions in batches. Otherwise, for accounts
// with many messages, we would get slowdown with many unwritten blocks in memory.
var reparseMessageBatchSize = 10000
// ReparseMessages reparses all messages, updating the MIME structure in
// Message.ParsedBuf.
//
// Typically called during automatic account upgrade, or manually.
//
// Returns total number of messages, all of which were reparsed.
func (a *Account) ReparseMessages(ctx context.Context, log mlog.Log) (int, error) {
type Result struct {
Message *Message
Buf []byte
Err error
}
// We'll have multiple goroutines that pick up messages to parse. The assumption is
// that reads of messages from disk are the bottleneck.
nprog := 10
work := make(chan *Message, nprog)
results := make(chan Result, nprog)
processMessage := func(m *Message) {
r := Result{Message: m}
defer func() {
x := recover()
if x != nil {
r.Err = fmt.Errorf("unhandled panic parsing message: %v", x)
log.Error("processMessage panic", slog.Any("err", x))
debug.PrintStack()
metrics.PanicInc(metrics.Store)
}
results <- r
}()
mr := a.MessageReader(*m)
p, err := message.EnsurePart(log.Logger, false, mr, m.Size)
if err != nil {
// note: p is still set to a usable part
log.Debugx("reparsing message", err, slog.Int64("msgid", m.ID))
}
r.Buf, r.Err = json.Marshal(p)
}
// Start goroutines that parse messages.
for range nprog {
go func() {
for {
m, ok := <-work
if !ok {
return
}
processMessage(m)
}
}()
}
defer close(work) // Stop goroutines when done.
total := 0
var lastID int64 // Each db transaction starts after lastID.
for {
var n int
err := a.DB.Write(ctx, func(tx *bstore.Tx) error {
var busy int
q := bstore.QueryTx[Message](tx)
q.FilterEqual("Expunged", false)
q.FilterGreater("ID", lastID)
q.Limit(reparseMessageBatchSize)
q.SortAsc("ID")
err := q.ForEach(func(m Message) error {
lastID = m.ID
n++
for {
select {
case work <- &m:
busy++
return nil
case r := <-results:
busy--
if r.Err != nil {
log.Errorx("marshal parsed form of message", r.Err, slog.Int64("msgid", r.Message.ID))
} else {
if err := tx.Update(r.Message); err != nil {
return fmt.Errorf("update message: %w", err)
}
}
}
}
})
if err != nil {
return fmt.Errorf("reparsing messages: %w", err)
}
// Drain remaining reparses.
for ; busy > 0; busy-- {
r := <-results
if r.Err != nil {
log.Errorx("marshal parsed form of message", r.Err, slog.Int64("msgid", r.Message.ID))
} else {
if err := tx.Update(r.Message); err != nil {
return fmt.Errorf("update message with id %d: %w", r.Message.ID, err)
}
}
}
return nil
})
total += n
if err != nil {
return total, fmt.Errorf("update messages with parsed mime structure: %w", err)
}
log.Debug("reparse message progress", slog.Int("total", total))
if n < reparseMessageBatchSize {
break
}
}
return total, nil
}

99
store/reparse_test.go Normal file
View File

@ -0,0 +1,99 @@
package store
import (
"os"
"path/filepath"
"testing"
"time"
"github.com/mjl-/bstore"
"github.com/mjl-/mox/message"
"github.com/mjl-/mox/mlog"
"github.com/mjl-/mox/mox-"
)
func TestReparse(t *testing.T) {
log := mlog.New("store", nil)
os.RemoveAll("../testdata/store/data")
mox.ConfigStaticPath = filepath.FromSlash("../testdata/store/mox.conf")
mox.MustLoadConfig(true, false)
err := Init(ctxbg)
tcheck(t, err, "init")
defer func() {
err := Close()
tcheck(t, err, "close")
}()
defer Switchboard()()
orig := reparseMessageBatchSize
reparseMessageBatchSize = 2
defer func() {
reparseMessageBatchSize = orig
}()
acc, err := OpenAccount(log, "mjl", false)
tcheck(t, err, "open account")
// Prepare message to add later.
msgFile, err := CreateMessageTemp(log, "account-test")
tcheck(t, err, "create temp message file")
defer CloseRemoveTempFile(log, msgFile, "temp message file")
msgWriter := message.NewWriter(msgFile)
_, err = msgWriter.Write([]byte(" message"))
tcheck(t, err, "write message")
msgPrefix := []byte("From: <mjl@mox.example\r\nTo: <mjl@mox.example>\r\nCc: <mjl@mox.example>Subject: test\r\nMessage-Id: <m01@mox.example>\r\n\r\n")
m := Message{
Received: time.Now(),
Size: int64(len(msgPrefix)) + msgWriter.Size,
MsgPrefix: msgPrefix,
}
// Add messages.
acc.WithRLock(func() {
conf, _ := acc.Conf()
for range 10 {
nm := m
err := acc.DeliverDestination(log, conf.Destinations["mjl"], &nm, msgFile)
tcheck(t, err, "deliver")
}
})
// Reparse explicitly.
total, err := acc.ReparseMessages(ctxbg, log)
tcheck(t, err, "reparsing messages")
tcompare(t, total, 10)
// Ensure a next reopen will reparse messages in the background.
_, err = bstore.QueryDB[Upgrade](ctxbg, acc.DB).UpdateNonzero(Upgrade{MessageParseVersion: MessageParseVersionLatest + 1})
tcheck(t, err, "change")
// Close account, and wait until really closed.
err = acc.Close()
tcheck(t, err, "closing account")
acc.WaitClosed()
// Reopen account, should trigger reparse. We immediately Close again, account DB
// should be kept open.
acc, err = OpenAccount(log, "mjl", false)
tcheck(t, err, "open account")
err = acc.Close()
tcheck(t, err, "closing account")
acc.WaitClosed()
// Check that the reparse is finished.
acc, err = OpenAccount(log, "mjl", false)
tcheck(t, err, "open account")
for range 10 {
up, err := bstore.QueryDB[Upgrade](ctxbg, acc.DB).Get()
tcheck(t, err, "change")
if up.MessageParseVersion == MessageParseVersionLatest {
break
}
time.Sleep(time.Second / 10)
}
err = acc.Close()
tcheck(t, err, "closing account")
acc.WaitClosed()
}