diff --git a/ctl.go b/ctl.go index 1f66441..8598394 100644 --- a/ctl.go +++ b/ctl.go @@ -1706,8 +1706,6 @@ func servectlcmd(ctx context.Context, xctl *ctl, cid int64, shutdown func()) { xctl.xwriteok() xw := xctl.writer() - const batchSize = 100 - xreparseAccount := func(accName string) { acc, err := store.OpenAccount(log, accName, false) xctl.xcheck(err, "open account") @@ -1716,43 +1714,11 @@ func servectlcmd(ctx context.Context, xctl *ctl, cid int64, shutdown func()) { log.Check(err, "closing account after reparsing messages") }() - total := 0 - var lastID int64 - for { - var n int - // Don't process all message in one transaction, we could block the account for too long. - err := acc.DB.Write(ctx, func(tx *bstore.Tx) error { - q := bstore.QueryTx[store.Message](tx) - q.FilterEqual("Expunged", false) - q.FilterGreater("ID", lastID) - q.Limit(batchSize) - q.SortAsc("ID") - return q.ForEach(func(m store.Message) error { - lastID = m.ID - mr := acc.MessageReader(m) - p, err := message.EnsurePart(log.Logger, false, mr, m.Size) - if err != nil { - fmt.Fprintf(xw, "parsing message %d: %v (continuing)\n", m.ID, err) - } - m.ParsedBuf, err = json.Marshal(p) - if err != nil { - return fmt.Errorf("marshal parsed message: %v", err) - } - total++ - n++ - if err := tx.Update(&m); err != nil { - return fmt.Errorf("update message: %v", err) - } - return nil - }) + start := time.Now() + total, err := acc.ReparseMessages(ctx, log) + xctl.xcheck(err, "reparse messages") - }) - xctl.xcheck(err, "update messages with parsed mime structure") - if n < batchSize { - break - } - } - fmt.Fprintf(xw, "%d message(s) reparsed for account %s\n", total, accName) + fmt.Fprintf(xw, "%d message(s) reparsed for account %s in %dms\n", total, accName, time.Since(start)/time.Millisecond) } if accountOpt != "" { diff --git a/store/account.go b/store/account.go index 2406b2b..b1c4507 100644 --- a/store/account.go +++ b/store/account.go @@ -975,20 +975,24 @@ type Account struct { } type Upgrade struct { - ID byte - Threads byte // 0: None, 1: Adding MessageID's completed, 2: Adding ThreadID's completed. - MailboxModSeq bool // Whether mailboxes have been assigned modseqs. - MailboxParentID bool // Setting ParentID on mailboxes. - MailboxCounts bool // Global flag about whether we have mailbox flags. Instead of previous per-mailbox boolean. + ID byte + Threads byte // 0: None, 1: Adding MessageID's completed, 2: Adding ThreadID's completed. + MailboxModSeq bool // Whether mailboxes have been assigned modseqs. + MailboxParentID bool // Setting ParentID on mailboxes. + MailboxCounts bool // Global flag about whether we have mailbox flags. Instead of previous per-mailbox boolean. + MessageParseVersion int // If different than latest, all messages will be reparsed. } -// upgradeInit is the value to for new account database, that don't need any upgrading. +const MessageParseVersionLatest = 1 + +// upgradeInit is the value for new account database, which don't need any upgrading. var upgradeInit = Upgrade{ - ID: 1, // Singleton. - Threads: 2, - MailboxModSeq: true, - MailboxParentID: true, - MailboxCounts: true, + ID: 1, // Singleton. + Threads: 2, + MailboxModSeq: true, + MailboxParentID: true, + MailboxCounts: true, + MessageParseVersion: MessageParseVersionLatest, } // InitialUIDValidity returns a UIDValidity used for initializing an account. @@ -1132,6 +1136,8 @@ func openAccount(log mlog.Log, name string) (a *Account, rerr error) { // or error. Only exported for use by subcommands that verify the database file. // Almost all account opens must go through OpenAccount/OpenEmail/OpenEmailAuth. func OpenAccountDB(log mlog.Log, accountDir, accountName string) (a *Account, rerr error) { + log = log.With(slog.String("account", accountName)) + dbpath := filepath.Join(accountDir, "index.db") // Create account if it doesn't exist yet. @@ -1501,6 +1507,68 @@ func OpenAccountDB(log mlog.Log, accountDir, accountName string) (a *Account, re } } + if up.MessageParseVersion != MessageParseVersionLatest { + log.Debug("upgrade: reparsing message for mime structures for new message parse version", slog.Int("current", up.MessageParseVersion), slog.Int("latest", MessageParseVersionLatest)) + + // Unless we also need to upgrade threading, we'll be reparsing messages in the + // background so opening of the account is quick. + done := make(chan error, 1) + bg := up.Threads == 2 + + // Increase account use before holding on to account in background. + // Caller holds the lock. The goroutine below decreases nused by calling + // closeAccount. + acc.nused++ + + go func() { + start := time.Now() + + var rerr error + defer func() { + x := recover() + if x != nil { + rerr = fmt.Errorf("unhandled panic: %v", x) + log.Error("unhandled panic reparsing messages", slog.Any("err", x)) + debug.PrintStack() + metrics.PanicInc(metrics.Store) + } + + if bg && rerr != nil { + log.Errorx("upgrade failed: reparsing message for mime structures for new message parse version", rerr, slog.Duration("duration", time.Since(start))) + } + done <- rerr + + // Must be done at end of defer. Our parent context/goroutine has openAccounts lock + // held, so we won't make progress until after the enclosing method has returned. + err := closeAccount(acc) + log.Check(err, "closing account after reparsing messages") + }() + + var total int + total, rerr = acc.ReparseMessages(mox.Shutdown, log) + if rerr != nil { + rerr = fmt.Errorf("reparsing messages and updating mime structures in message index: %w", rerr) + return + } + + up.MessageParseVersion = MessageParseVersionLatest + rerr = acc.DB.Update(context.TODO(), &up) + if rerr != nil { + rerr = fmt.Errorf("marking latest message parse version: %w", rerr) + return + } + + log.Info("upgrade completed: reparsing message for mime structures for new message parse version", slog.Int("total", total), slog.Duration("duration", time.Since(start))) + }() + + if !bg { + err := <-done + if err != nil { + return nil, err + } + } + } + if up.Threads == 2 { close(acc.threadsCompleted) return acc, nil @@ -1514,11 +1582,11 @@ func OpenAccountDB(log mlog.Log, accountDir, accountName string) (a *Account, re // Ensure all messages have a MessageID and SubjectBase, which are needed when // matching threads. // Then assign messages to threads, in the same way we do during imports. - log.Info("upgrading account for threading, in background", slog.String("account", acc.Name)) + log.Info("upgrading account for threading, in background") go func() { defer func() { err := closeAccount(acc) - log.Check(err, "closing use of account after upgrading account storage for threads", slog.String("account", a.Name)) + log.Check(err, "closing use of account after upgrading account storage for threads") // Mark that upgrade has finished, possibly error is indicated in threadsErr. close(acc.threadsCompleted) @@ -1537,9 +1605,9 @@ func OpenAccountDB(log mlog.Log, accountDir, accountName string) (a *Account, re err := upgradeThreads(mox.Shutdown, log, acc, up) if err != nil { a.threadsErr = err - log.Errorx("upgrading account for threading, aborted", err, slog.String("account", a.Name)) + log.Errorx("upgrading account for threading, aborted", err) } else { - log.Info("upgrading account for threading, completed", slog.String("account", a.Name)) + log.Info("upgrading account for threading, completed") } }() return acc, nil diff --git a/store/reparse.go b/store/reparse.go new file mode 100644 index 0000000..aa6acb2 --- /dev/null +++ b/store/reparse.go @@ -0,0 +1,142 @@ +package store + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "runtime/debug" + + "github.com/mjl-/bstore" + + "github.com/mjl-/mox/message" + "github.com/mjl-/mox/metrics" + "github.com/mjl-/mox/mlog" +) + +// We process messages in database transactions in batches. Otherwise, for accounts +// with many messages, we would get slowdown with many unwritten blocks in memory. +var reparseMessageBatchSize = 10000 + +// ReparseMessages reparses all messages, updating the MIME structure in +// Message.ParsedBuf. +// +// Typically called during automatic account upgrade, or manually. +// +// Returns total number of messages, all of which were reparsed. +func (a *Account) ReparseMessages(ctx context.Context, log mlog.Log) (int, error) { + type Result struct { + Message *Message + Buf []byte + Err error + } + + // We'll have multiple goroutines that pick up messages to parse. The assumption is + // that reads of messages from disk are the bottleneck. + nprog := 10 + work := make(chan *Message, nprog) + results := make(chan Result, nprog) + + processMessage := func(m *Message) { + r := Result{Message: m} + + defer func() { + x := recover() + if x != nil { + r.Err = fmt.Errorf("unhandled panic parsing message: %v", x) + log.Error("processMessage panic", slog.Any("err", x)) + debug.PrintStack() + metrics.PanicInc(metrics.Store) + } + + results <- r + }() + + mr := a.MessageReader(*m) + p, err := message.EnsurePart(log.Logger, false, mr, m.Size) + if err != nil { + // note: p is still set to a usable part + log.Debugx("reparsing message", err, slog.Int64("msgid", m.ID)) + } + r.Buf, r.Err = json.Marshal(p) + } + + // Start goroutines that parse messages. + for range nprog { + go func() { + for { + m, ok := <-work + if !ok { + return + } + + processMessage(m) + } + }() + } + defer close(work) // Stop goroutines when done. + + total := 0 + var lastID int64 // Each db transaction starts after lastID. + for { + var n int + err := a.DB.Write(ctx, func(tx *bstore.Tx) error { + var busy int + + q := bstore.QueryTx[Message](tx) + q.FilterEqual("Expunged", false) + q.FilterGreater("ID", lastID) + q.Limit(reparseMessageBatchSize) + q.SortAsc("ID") + err := q.ForEach(func(m Message) error { + lastID = m.ID + n++ + + for { + select { + case work <- &m: + busy++ + return nil + + case r := <-results: + busy-- + if r.Err != nil { + log.Errorx("marshal parsed form of message", r.Err, slog.Int64("msgid", r.Message.ID)) + } else { + if err := tx.Update(r.Message); err != nil { + return fmt.Errorf("update message: %w", err) + } + } + } + } + }) + if err != nil { + return fmt.Errorf("reparsing messages: %w", err) + } + + // Drain remaining reparses. + for ; busy > 0; busy-- { + r := <-results + if r.Err != nil { + log.Errorx("marshal parsed form of message", r.Err, slog.Int64("msgid", r.Message.ID)) + } else { + if err := tx.Update(r.Message); err != nil { + return fmt.Errorf("update message with id %d: %w", r.Message.ID, err) + } + } + } + + return nil + }) + total += n + if err != nil { + return total, fmt.Errorf("update messages with parsed mime structure: %w", err) + } + log.Debug("reparse message progress", slog.Int("total", total)) + if n < reparseMessageBatchSize { + break + } + } + + return total, nil +} diff --git a/store/reparse_test.go b/store/reparse_test.go new file mode 100644 index 0000000..3503564 --- /dev/null +++ b/store/reparse_test.go @@ -0,0 +1,99 @@ +package store + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/mjl-/bstore" + + "github.com/mjl-/mox/message" + "github.com/mjl-/mox/mlog" + "github.com/mjl-/mox/mox-" +) + +func TestReparse(t *testing.T) { + log := mlog.New("store", nil) + os.RemoveAll("../testdata/store/data") + mox.ConfigStaticPath = filepath.FromSlash("../testdata/store/mox.conf") + mox.MustLoadConfig(true, false) + err := Init(ctxbg) + tcheck(t, err, "init") + defer func() { + err := Close() + tcheck(t, err, "close") + }() + defer Switchboard()() + + orig := reparseMessageBatchSize + reparseMessageBatchSize = 2 + defer func() { + reparseMessageBatchSize = orig + }() + + acc, err := OpenAccount(log, "mjl", false) + tcheck(t, err, "open account") + + // Prepare message to add later. + msgFile, err := CreateMessageTemp(log, "account-test") + tcheck(t, err, "create temp message file") + defer CloseRemoveTempFile(log, msgFile, "temp message file") + msgWriter := message.NewWriter(msgFile) + _, err = msgWriter.Write([]byte(" message")) + tcheck(t, err, "write message") + + msgPrefix := []byte("From: \r\nCc: Subject: test\r\nMessage-Id: \r\n\r\n") + m := Message{ + Received: time.Now(), + Size: int64(len(msgPrefix)) + msgWriter.Size, + MsgPrefix: msgPrefix, + } + + // Add messages. + acc.WithRLock(func() { + conf, _ := acc.Conf() + for range 10 { + nm := m + err := acc.DeliverDestination(log, conf.Destinations["mjl"], &nm, msgFile) + tcheck(t, err, "deliver") + } + }) + + // Reparse explicitly. + total, err := acc.ReparseMessages(ctxbg, log) + tcheck(t, err, "reparsing messages") + tcompare(t, total, 10) + + // Ensure a next reopen will reparse messages in the background. + _, err = bstore.QueryDB[Upgrade](ctxbg, acc.DB).UpdateNonzero(Upgrade{MessageParseVersion: MessageParseVersionLatest + 1}) + tcheck(t, err, "change") + + // Close account, and wait until really closed. + err = acc.Close() + tcheck(t, err, "closing account") + acc.WaitClosed() + + // Reopen account, should trigger reparse. We immediately Close again, account DB + // should be kept open. + acc, err = OpenAccount(log, "mjl", false) + tcheck(t, err, "open account") + err = acc.Close() + tcheck(t, err, "closing account") + acc.WaitClosed() + + // Check that the reparse is finished. + acc, err = OpenAccount(log, "mjl", false) + tcheck(t, err, "open account") + for range 10 { + up, err := bstore.QueryDB[Upgrade](ctxbg, acc.DB).Get() + tcheck(t, err, "change") + if up.MessageParseVersion == MessageParseVersionLatest { + break + } + time.Sleep(time.Second / 10) + } + err = acc.Close() + tcheck(t, err, "closing account") + acc.WaitClosed() +}