implement message threading in backend and webmail

we match messages to their parents based on the "references" and "in-reply-to"
headers (requiring the same base subject), and in absence of those headers we
also match by base subject only (against messages received max 4 weeks ago).

we store a threadid with messages. all messages in a thread have the same
threadid.  messages also have a "thread parent ids", which holds all id's of
parent messages up to the thread root.  then there is "thread missing link",
which is set when a referenced immediate parent wasn't found (but possibly
earlier ancestors can still be found and will be in thread parent ids).

threads can be muted: newly delivered messages are automatically marked as
read/seen.  threads can be marked as collapsed: if set, the webmail collapses
the thread to a single item in the basic threading view (default is to expand
threads).  the muted and collapsed fields are copied from their parent on
message delivery.

the threading is implemented in the webmail. the non-threading mode still works
as before. the new default threading mode "unread" automatically expands only
the threads with at least one unread (not seen) message. the basic threading
mode "on" expands all threads except when explicitly collapsed (as saved in the
thread collapsed field). new shortcuts for navigation/interaction threads have
been added, e.g. go to previous/next thread root, toggle collapse/expand of
thread (or double click), toggle mute of thread. some previous shortcuts have
changed, see the help for details.

the message threading is added with an explicit account upgrade step,
automatically started when an account is opened. the upgrade is done in the
background because it will take too long for large mailboxes to block account
operations. the upgrade takes two steps: 1. updating all message records in the
database to add a normalized message-id and thread base subject (with "re:",
"fwd:" and several other schemes stripped). 2. going through all messages in
the database again, reading the "references" and "in-reply-to" headers from
disk, and matching against their parents. this second step is also done at the
end of each import of mbox/maildir mailboxes. new deliveries are matched
immediately against other existing messages, currently no attempt is made to
rematch previously delivered messages (which could be useful for related
messages being delivered out of order).

the threading is not yet exposed over imap.
This commit is contained in:
Mechiel Lukkien
2023-09-13 08:51:50 +02:00
parent b754b5f9ac
commit 3fb41ff073
44 changed files with 5930 additions and 821 deletions

View File

@ -34,7 +34,9 @@ import (
"io"
"os"
"path/filepath"
"runtime/debug"
"sort"
"strconv"
"strings"
"sync"
"time"
@ -48,6 +50,7 @@ import (
"github.com/mjl-/mox/config"
"github.com/mjl-/mox/dns"
"github.com/mjl-/mox/message"
"github.com/mjl-/mox/metrics"
"github.com/mjl-/mox/mlog"
"github.com/mjl-/mox/mox-"
"github.com/mjl-/mox/moxio"
@ -444,14 +447,41 @@ type Message struct {
OrigEHLODomain string
OrigDKIMDomains []string
// Value of Message-Id header. Only set for messages that were
// delivered to the rejects mailbox. For ensuring such messages are
// delivered only once. Value includes <>.
// Canonicalized Message-Id, always lower-case and normalized quoting, without
// <>'s. Empty if missing. Used for matching message threads, and to prevent
// duplicate reject delivery.
MessageID string `bstore:"index"`
// lower-case: ../rfc/5256:495
// Hash of message. For rejects delivery, so optional like MessageID.
// For matching threads in case there is no References/In-Reply-To header. It is
// lower-cased, white-space collapsed, mailing list tags and re/fwd tags removed.
SubjectBase string `bstore:"index"`
// ../rfc/5256:90
// Hash of message. For rejects delivery in case there is no Message-ID, only set
// when delivered as reject.
MessageHash []byte
// ID of message starting this thread.
ThreadID int64 `bstore:"index"`
// IDs of parent messages, from closest parent to the root message. Parent messages
// may be in a different mailbox, or may no longer exist. ThreadParentIDs must
// never contain the message id itself (a cycle), and parent messages must
// reference the same ancestors.
ThreadParentIDs []int64
// ThreadMissingLink is true if there is no match with a direct parent. E.g. first
// ID in ThreadParentIDs is not the direct ancestor (an intermediate message may
// have been deleted), or subject-based matching was done.
ThreadMissingLink bool
// If set, newly delivered child messages are automatically marked as read. This
// field is copied to new child messages. Changes are propagated to the webmail
// client.
ThreadMuted bool
// If set, this (sub)thread is collapsed in the webmail client, for threading mode
// "on" (mode "unread" ignores it). This field is copied to new child message.
// Changes are propagated to the webmail client.
ThreadCollapsed bool
Flags
// For keywords other than system flags or the basic well-known $-flags. Only in
// "atom" syntax (IMAP), they are case-insensitive, always stored in lower-case
@ -498,6 +528,10 @@ func (m Message) ChangeFlags(orig Flags) ChangeFlags {
return ChangeFlags{MailboxID: m.MailboxID, UID: m.UID, ModSeq: m.ModSeq, Mask: mask, Flags: m.Flags, Keywords: m.Keywords}
}
// ChangeThread returns a ChangeThread event for this message, carrying its
// current thread muted/collapsed state.
func (m Message) ChangeThread() ChangeThread {
	return ChangeThread{
		MessageIDs: []int64{m.ID},
		Muted:      m.ThreadMuted,
		Collapsed:  m.ThreadCollapsed,
	}
}
// ModSeq represents a modseq as stored in the database. ModSeq 0 in the
// database is sent to the client as 1, because modseq 0 is special in IMAP.
// ModSeq coming from the client are of type int64.
@ -533,6 +567,22 @@ func (m *Message) PrepareExpunge() {
}
}
// PrepareThreading sets MessageID and SubjectBase (used in threading) based on
// the envelope in part. If part has no envelope, the message is left unchanged.
func (m *Message) PrepareThreading(log *mlog.Log, part *message.Part) {
	if part.Envelope == nil {
		return
	}

	mid, raw, err := message.MessageIDCanonical(part.Envelope.MessageID)
	switch {
	case err != nil:
		log.Debugx("parsing message-id, ignoring", err, mlog.Field("messageid", part.Envelope.MessageID))
	case raw:
		log.Debug("could not parse message-id as address, continuing with raw value", mlog.Field("messageid", part.Envelope.MessageID))
	}
	m.MessageID = mid
	m.SubjectBase, _ = message.ThreadSubject(part.Envelope.Subject, false)
}
// LoadPart returns a message.Part by reading from m.ParsedBuf.
func (m Message) LoadPart(r io.ReaderAt) (message.Part, error) {
if m.ParsedBuf == nil {
@ -614,7 +664,7 @@ type Outgoing struct {
}
// Types stored in DB.
var DBTypes = []any{NextUIDValidity{}, Message{}, Recipient{}, Mailbox{}, Subscription{}, Outgoing{}, Password{}, Subjectpass{}, SyncState{}}
var DBTypes = []any{NextUIDValidity{}, Message{}, Recipient{}, Mailbox{}, Subscription{}, Outgoing{}, Password{}, Subjectpass{}, SyncState{}, Upgrade{}}
// Account holds the information about a user, includings mailboxes, messages, imap subscriptions.
type Account struct {
@ -623,6 +673,13 @@ type Account struct {
DBPath string // Path to database with mailboxes, messages, etc.
DB *bstore.DB // Open database connection.
// Channel that is closed if/when account has/gets "threads" accounting (see
// Upgrade.Threads).
threadsCompleted chan struct{}
// If threads upgrade completed with error, this is set. Used for warning during
// delivery, or aborting when importing.
threadsErr error
// Write lock must be held for account/mailbox modifications including message delivery.
// Read lock for reading mailboxes/messages.
// When making changes to mailboxes/messages, changes must be broadcasted before
@ -632,6 +689,11 @@ type Account struct {
nused int // Reference count, while >0, this account is alive and shared.
}
// Upgrade is a singleton database record (stored with ID 1) that tracks which
// one-time account data upgrades have been applied.
type Upgrade struct {
	ID byte
	Threads byte // 0: None, 1: Adding MessageID's completed, 2: Adding ThreadID's completed.
}
// InitialUIDValidity returns a UIDValidity used for initializing an account.
// It can be replaced during tests with a predictable value.
var InitialUIDValidity = func() uint32 {
@ -650,6 +712,7 @@ func closeAccount(acc *Account) (rerr error) {
acc.nused--
defer openAccounts.Unlock()
if acc.nused == 0 {
// threadsCompleted must be closed now because it increased nused.
rerr = acc.DB.Close()
acc.DB = nil
delete(openAccounts.names, acc.Name)
@ -714,46 +777,127 @@ func OpenAccountDB(accountDir, accountName string) (a *Account, rerr error) {
}
}()
acc := &Account{
Name: accountName,
Dir: accountDir,
DBPath: dbpath,
DB: db,
nused: 1,
threadsCompleted: make(chan struct{}),
}
if isNew {
if err := initAccount(db); err != nil {
return nil, fmt.Errorf("initializing account: %v", err)
}
} else {
// Ensure mailbox counts are set.
var mentioned bool
err := db.Write(context.TODO(), func(tx *bstore.Tx) error {
return bstore.QueryTx[Mailbox](tx).FilterEqual("HaveCounts", false).ForEach(func(mb Mailbox) error {
if !mentioned {
mentioned = true
xlog.Info("first calculation of mailbox counts for account", mlog.Field("account", accountName))
}
mc, err := mb.CalculateCounts(tx)
if err != nil {
return err
}
mb.HaveCounts = true
mb.MailboxCounts = mc
return tx.Update(&mb)
})
})
if err != nil {
return nil, fmt.Errorf("calculating counts for mailbox: %v", err)
}
close(acc.threadsCompleted)
return acc, nil
}
return &Account{
Name: accountName,
Dir: accountDir,
DBPath: dbpath,
DB: db,
nused: 1,
}, nil
// Ensure mailbox counts are set.
var mentioned bool
err = db.Write(context.TODO(), func(tx *bstore.Tx) error {
return bstore.QueryTx[Mailbox](tx).FilterEqual("HaveCounts", false).ForEach(func(mb Mailbox) error {
if !mentioned {
mentioned = true
xlog.Info("first calculation of mailbox counts for account", mlog.Field("account", accountName))
}
mc, err := mb.CalculateCounts(tx)
if err != nil {
return err
}
mb.HaveCounts = true
mb.MailboxCounts = mc
return tx.Update(&mb)
})
})
if err != nil {
return nil, fmt.Errorf("calculating counts for mailbox: %v", err)
}
// Start adding threading if needed.
up := Upgrade{ID: 1}
err = db.Write(context.TODO(), func(tx *bstore.Tx) error {
err := tx.Get(&up)
if err == bstore.ErrAbsent {
if err := tx.Insert(&up); err != nil {
return fmt.Errorf("inserting initial upgrade record: %v", err)
}
err = nil
}
return err
})
if err != nil {
return nil, fmt.Errorf("checking message threading: %v", err)
}
if up.Threads == 2 {
close(acc.threadsCompleted)
return acc, nil
}
// Increase account use before holding on to account in background.
// Caller holds the lock. The goroutine below decreases nused by calling
// closeAccount.
acc.nused++
// Ensure all messages have a MessageID and SubjectBase, which are needed when
// matching threads.
// Then assign messages to threads, in the same way we do during imports.
xlog.Info("upgrading account for threading, in background", mlog.Field("account", acc.Name))
go func() {
defer func() {
err := closeAccount(acc)
xlog.Check(err, "closing use of account after upgrading account storage for threads", mlog.Field("account", a.Name))
}()
defer func() {
x := recover() // Should not happen, but don't take program down if it does.
if x != nil {
xlog.Error("upgradeThreads panic", mlog.Field("err", x))
debug.PrintStack()
metrics.PanicInc("upgradeThreads")
acc.threadsErr = fmt.Errorf("panic during upgradeThreads: %v", x)
}
// Mark that upgrade has finished, possibly error is indicated in threadsErr.
close(acc.threadsCompleted)
}()
err := upgradeThreads(mox.Shutdown, acc, &up)
if err != nil {
a.threadsErr = err
xlog.Errorx("upgrading account for threading, aborted", err, mlog.Field("account", a.Name))
} else {
xlog.Info("upgrading account for threading, completed", mlog.Field("account", a.Name))
}
}()
return acc, nil
}
// ThreadingWait blocks until the one-time account threading upgrade for the
// account has completed, and returns an error if not successful.
//
// To be used before starting an import of messages.
func (a *Account) ThreadingWait(log *mlog.Log) error {
	// Fast path: if the upgrade already finished we return immediately, without
	// logging that we are waiting.
	select {
	case <-a.threadsCompleted:
		return a.threadsErr
	default:
	}
	log.Debug("waiting for account upgrade to complete")

	// threadsCompleted is closed when the background upgrade finishes (also on
	// error or panic); threadsErr then holds the outcome.
	<-a.threadsCompleted
	return a.threadsErr
}
func initAccount(db *bstore.DB) error {
return db.Write(context.TODO(), func(tx *bstore.Tx) error {
uidvalidity := InitialUIDValidity()
if err := tx.Insert(&Upgrade{ID: 1, Threads: 2}); err != nil {
return err
}
if len(mox.Conf.Static.DefaultMailboxes) > 0 {
// Deprecated in favor of InitialMailboxes.
defaultMailboxes := mox.Conf.Static.DefaultMailboxes
@ -862,10 +1006,14 @@ func (a *Account) Close() error {
// - Message with UID >= mailbox uid next.
// - Mailbox uidvalidity >= account uid validity.
// - ModSeq > 0, CreateSeq > 0, CreateSeq <= ModSeq.
// - All messages have a nonzero ThreadID, and no cycles in ThreadParentID, and parent messages the same ThreadParentIDs tail.
func (a *Account) CheckConsistency() error {
var uiderrors []string // With a limit, could be many.
var modseqerrors []string // With limit.
var fileerrors []string // With limit.
var uidErrors []string // With a limit, could be many.
var modseqErrors []string // With limit.
var fileErrors []string // With limit.
var threadidErrors []string // With limit.
var threadParentErrors []string // With limit.
var threadAncestorErrors []string // With limit.
var errors []string
err := a.DB.Read(context.Background(), func(tx *bstore.Tx) error {
@ -897,13 +1045,13 @@ func (a *Account) CheckConsistency() error {
mb := mailboxes[m.MailboxID]
if (m.ModSeq == 0 || m.CreateSeq == 0 || m.CreateSeq > m.ModSeq) && len(modseqerrors) < 20 {
if (m.ModSeq == 0 || m.CreateSeq == 0 || m.CreateSeq > m.ModSeq) && len(modseqErrors) < 20 {
modseqerr := fmt.Sprintf("message %d in mailbox %q (id %d) has invalid modseq %d or createseq %d, both must be > 0 and createseq <= modseq", m.ID, mb.Name, mb.ID, m.ModSeq, m.CreateSeq)
modseqerrors = append(modseqerrors, modseqerr)
modseqErrors = append(modseqErrors, modseqerr)
}
if m.UID >= mb.UIDNext && len(uiderrors) < 20 {
if m.UID >= mb.UIDNext && len(uidErrors) < 20 {
uiderr := fmt.Sprintf("message %d in mailbox %q (id %d) has uid %d >= mailbox uidnext %d", m.ID, mb.Name, mb.ID, m.UID, mb.UIDNext)
uiderrors = append(uiderrors, uiderr)
uidErrors = append(uidErrors, uiderr)
}
if m.Expunged {
return nil
@ -912,10 +1060,32 @@ func (a *Account) CheckConsistency() error {
st, err := os.Stat(p)
if err != nil {
existserr := fmt.Sprintf("message %d in mailbox %q (id %d) on-disk file %s: %v", m.ID, mb.Name, mb.ID, p, err)
fileerrors = append(fileerrors, existserr)
} else if len(fileerrors) < 20 && m.Size != int64(len(m.MsgPrefix))+st.Size() {
fileErrors = append(fileErrors, existserr)
} else if len(fileErrors) < 20 && m.Size != int64(len(m.MsgPrefix))+st.Size() {
sizeerr := fmt.Sprintf("message %d in mailbox %q (id %d) has size %d != len msgprefix %d + on-disk file size %d = %d", m.ID, mb.Name, mb.ID, m.Size, len(m.MsgPrefix), st.Size(), int64(len(m.MsgPrefix))+st.Size())
fileerrors = append(fileerrors, sizeerr)
fileErrors = append(fileErrors, sizeerr)
}
if m.ThreadID <= 0 && len(threadidErrors) < 20 {
err := fmt.Sprintf("message %d in mailbox %q (id %d) has threadid 0", m.ID, mb.Name, mb.ID)
threadidErrors = append(threadidErrors, err)
}
if slices.Contains(m.ThreadParentIDs, m.ID) && len(threadParentErrors) < 20 {
err := fmt.Sprintf("message %d in mailbox %q (id %d) references itself in threadparentids", m.ID, mb.Name, mb.ID)
threadParentErrors = append(threadParentErrors, err)
}
for i, pid := range m.ThreadParentIDs {
am := Message{ID: pid}
if err := tx.Get(&am); err == bstore.ErrAbsent {
continue
} else if err != nil {
return fmt.Errorf("get ancestor message: %v", err)
} else if !slices.Equal(m.ThreadParentIDs[i+1:], am.ThreadParentIDs) && len(threadAncestorErrors) < 20 {
err := fmt.Sprintf("message %d, thread %d has ancestor ids %v, and ancestor at index %d with id %d should have the same tail but has %v\n", m.ID, m.ThreadID, m.ThreadParentIDs, i, am.ID, am.ThreadParentIDs)
threadAncestorErrors = append(threadAncestorErrors, err)
} else {
break
}
}
return nil
})
@ -938,9 +1108,12 @@ func (a *Account) CheckConsistency() error {
if err != nil {
return err
}
errors = append(errors, uiderrors...)
errors = append(errors, modseqerrors...)
errors = append(errors, fileerrors...)
errors = append(errors, uidErrors...)
errors = append(errors, modseqErrors...)
errors = append(errors, fileErrors...)
errors = append(errors, threadidErrors...)
errors = append(errors, threadParentErrors...)
errors = append(errors, threadAncestorErrors...)
if len(errors) > 0 {
return fmt.Errorf("%s", strings.Join(errors, "; "))
}
@ -1031,7 +1204,7 @@ func (a *Account) WithRLock(fn func()) {
// Caller must broadcast new message.
//
// Caller must update mailbox counts.
func (a *Account) DeliverMessage(log *mlog.Log, tx *bstore.Tx, m *Message, msgFile *os.File, consumeFile, sync, notrain bool) error {
func (a *Account) DeliverMessage(log *mlog.Log, tx *bstore.Tx, m *Message, msgFile *os.File, consumeFile, sync, notrain, nothreads bool) error {
if m.Expunged {
return fmt.Errorf("cannot deliver expunged message")
}
@ -1049,9 +1222,9 @@ func (a *Account) DeliverMessage(log *mlog.Log, tx *bstore.Tx, m *Message, msgFi
conf, _ := a.Conf()
m.JunkFlagsForMailbox(mb.Name, conf)
mr := FileMsgReader(m.MsgPrefix, msgFile) // We don't close, it would close the msgFile.
var part *message.Part
if m.ParsedBuf == nil {
mr := FileMsgReader(m.MsgPrefix, msgFile) // We don't close, it would close the msgFile.
p, err := message.EnsurePart(log, false, mr, m.Size)
if err != nil {
log.Infox("parsing delivered message", err, mlog.Field("parse", ""), mlog.Field("message", m.ID))
@ -1063,6 +1236,13 @@ func (a *Account) DeliverMessage(log *mlog.Log, tx *bstore.Tx, m *Message, msgFi
return fmt.Errorf("marshal parsed message: %w", err)
}
m.ParsedBuf = buf
} else {
var p message.Part
if err := json.Unmarshal(m.ParsedBuf, &p); err != nil {
log.Errorx("unmarshal parsed message, continuing", err, mlog.Field("parse", ""))
} else {
part = &p
}
}
// If we are delivering to the originally intended mailbox, no need to store the mailbox ID again.
@ -1078,52 +1258,73 @@ func (a *Account) DeliverMessage(log *mlog.Log, tx *bstore.Tx, m *Message, msgFi
m.ModSeq = modseq
}
if part != nil && m.MessageID == "" && m.SubjectBase == "" {
m.PrepareThreading(log, part)
}
// Assign to thread (if upgrade has completed).
noThreadID := nothreads
if m.ThreadID == 0 && !nothreads && part != nil {
select {
case <-a.threadsCompleted:
if a.threadsErr != nil {
log.Info("not assigning threads for new delivery, upgrading to threads failed")
noThreadID = true
} else {
if err := assignThread(log, tx, m, part); err != nil {
return fmt.Errorf("assigning thread: %w", err)
}
}
default:
// note: since we have a write transaction to get here, we can't wait for the
// thread upgrade to finish.
// If we don't assign a threadid the upgrade process will do it.
log.Info("not assigning threads for new delivery, upgrading to threads in progress which will assign this message")
noThreadID = true
}
}
if err := tx.Insert(m); err != nil {
return fmt.Errorf("inserting message: %w", err)
}
if !noThreadID && m.ThreadID == 0 {
m.ThreadID = m.ID
if err := tx.Update(m); err != nil {
return fmt.Errorf("updating message for its own thread id: %w", err)
}
}
// todo: perhaps we should match the recipients based on smtp submission and a matching message-id? we now miss the addresses in bcc's. for webmail, we could insert the recipients directly.
if mb.Sent {
// Attempt to parse the message for its To/Cc/Bcc headers, which we insert into Recipient.
if part == nil {
var p message.Part
if err := json.Unmarshal(m.ParsedBuf, &p); err != nil {
log.Errorx("unmarshal parsed message for its to,cc,bcc headers, continuing", err, mlog.Field("parse", ""))
} else {
part = &p
}
if mb.Sent && part != nil && part.Envelope != nil {
e := part.Envelope
sent := e.Date
if sent.IsZero() {
sent = m.Received
}
if part != nil && part.Envelope != nil {
e := part.Envelope
sent := e.Date
if sent.IsZero() {
sent = m.Received
if sent.IsZero() {
sent = time.Now()
}
addrs := append(append(e.To, e.CC...), e.BCC...)
for _, addr := range addrs {
if addr.User == "" {
// Would trigger error because Recipient.Localpart must be nonzero. todo: we could allow empty localpart in db, and filter by not using FilterNonzero.
log.Info("to/cc/bcc address with empty localpart, not inserting as recipient", mlog.Field("address", addr))
continue
}
if sent.IsZero() {
sent = time.Now()
d, err := dns.ParseDomain(addr.Host)
if err != nil {
log.Debugx("parsing domain in to/cc/bcc address", err, mlog.Field("address", addr))
continue
}
addrs := append(append(e.To, e.CC...), e.BCC...)
for _, addr := range addrs {
if addr.User == "" {
// Would trigger error because Recipient.Localpart must be nonzero. todo: we could allow empty localpart in db, and filter by not using FilterNonzero.
log.Info("to/cc/bcc address with empty localpart, not inserting as recipient", mlog.Field("address", addr))
continue
}
d, err := dns.ParseDomain(addr.Host)
if err != nil {
log.Debugx("parsing domain in to/cc/bcc address", err, mlog.Field("address", addr))
continue
}
mr := Recipient{
MessageID: m.ID,
Localpart: smtp.Localpart(addr.User),
Domain: d.Name(),
OrgDomain: publicsuffix.Lookup(context.TODO(), d).Name(),
Sent: sent,
}
if err := tx.Insert(&mr); err != nil {
return fmt.Errorf("inserting sent message recipients: %w", err)
}
mr := Recipient{
MessageID: m.ID,
Localpart: smtp.Localpart(addr.User),
Domain: d.Name(),
OrgDomain: publicsuffix.Lookup(context.TODO(), d).Name(),
Sent: sent,
}
if err := tx.Insert(&mr); err != nil {
return fmt.Errorf("inserting sent message recipients: %w", err)
}
}
}
@ -1440,7 +1641,7 @@ ruleset:
// MessagePath returns the file system path of a message.
func (a *Account) MessagePath(messageID int64) string {
return filepath.Join(a.Dir, "msg", MessagePath(messageID))
return strings.Join(append([]string{a.Dir, "msg"}, messagePathElems(messageID)...), "/")
}
// MessageReader opens a message for reading, transparently combining the
@ -1489,7 +1690,7 @@ func (a *Account) DeliverMailbox(log *mlog.Log, mailbox string, m *Message, msgF
return fmt.Errorf("updating mailbox for delivery: %w", err)
}
if err := a.DeliverMessage(log, tx, m, msgFile, consumeFile, true, false); err != nil {
if err := a.DeliverMessage(log, tx, m, msgFile, consumeFile, true, false, false); err != nil {
return err
}
@ -1773,6 +1974,12 @@ const msgDirChars = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVW
// MessagePath returns the on-disk path for a message, relative to its
// containing directory such as <account>/msg or queue.
// Returns names like "AB/1".
func MessagePath(messageID int64) string {
	elems := messagePathElems(messageID)
	return strings.Join(elems, "/")
}
// messagePathElems returns the elems, for a single join without intermediate
// string allocations.
func messagePathElems(messageID int64) []string {
v := messageID >> 13 // 8k files per directory.
dir := ""
for {
@ -1782,7 +1989,7 @@ func MessagePath(messageID int64) string {
break
}
}
return fmt.Sprintf("%s/%d", dir, messageID)
return []string{dir, strconv.FormatInt(messageID, 10)}
}
// Set returns a copy of f, with each flag that is true in mask set to the

View File

@ -58,6 +58,8 @@ func TestMailbox(t *testing.T) {
MsgPrefix: msgPrefix,
}
msent := m
m.ThreadMuted = true
m.ThreadCollapsed = true
var mbsent Mailbox
mbrejects := Mailbox{Name: "Rejects", UIDValidity: 1, UIDNext: 1, HaveCounts: true}
mreject := m
@ -77,8 +79,11 @@ func TestMailbox(t *testing.T) {
tcheck(t, err, "sent mailbox")
msent.MailboxID = mbsent.ID
msent.MailboxOrigID = mbsent.ID
err = acc.DeliverMessage(xlog, tx, &msent, msgFile, false, true, false)
err = acc.DeliverMessage(xlog, tx, &msent, msgFile, false, true, false, false)
tcheck(t, err, "deliver message")
if !msent.ThreadMuted || !msent.ThreadCollapsed {
t.Fatalf("thread muted & collapsed should have been copied from parent (duplicate message-id) m")
}
err = tx.Get(&mbsent)
tcheck(t, err, "get mbsent")
@ -90,7 +95,7 @@ func TestMailbox(t *testing.T) {
tcheck(t, err, "insert rejects mailbox")
mreject.MailboxID = mbrejects.ID
mreject.MailboxOrigID = mbrejects.ID
err = acc.DeliverMessage(xlog, tx, &mreject, msgFile, false, true, false)
err = acc.DeliverMessage(xlog, tx, &mreject, msgFile, false, true, false, false)
tcheck(t, err, "deliver message")
err = tx.Get(&mbrejects)

View File

@ -50,6 +50,13 @@ type ChangeFlags struct {
Keywords []string // Non-system/well-known flags/keywords/labels.
}
// ChangeThread is sent when muted/collapsed changes.
type ChangeThread struct {
	MessageIDs []int64 // IDs of the messages affected by the change.
	Muted bool // If set, newly delivered child messages are automatically marked as read/seen.
	Collapsed bool // Whether the thread is shown collapsed in the webmail basic threading view.
}
// ChangeRemoveMailbox is sent for a removed mailbox.
type ChangeRemoveMailbox struct {
MailboxID int64

775
store/threads.go Normal file
View File

@ -0,0 +1,775 @@
package store
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"runtime"
"sort"
"time"
"golang.org/x/exp/slices"
"github.com/mjl-/bstore"
"github.com/mjl-/mox/message"
"github.com/mjl-/mox/mlog"
"github.com/mjl-/mox/moxio"
)
// Assign a new/incoming message to a thread. Message does not yet have an ID. If
// this isn't a response, ThreadID should remain 0 (unless this is a message with
// existing message-id) and the caller must set ThreadID to ID.
// If the account is still busy upgrading messages with threadids in the background, parents
// may have a threadid 0. That results in this message getting threadid 0, which
// will be handled by the background upgrade process assigning a threadid when it gets
// to this message.
func assignThread(log *mlog.Log, tx *bstore.Tx, m *Message, part *message.Part) error {
	if m.MessageID != "" {
		// Match against existing different message with same Message-ID. The oldest
		// (lowest ID) message with a nonzero ThreadID wins.
		q := bstore.QueryTx[Message](tx)
		q.FilterNonzero(Message{MessageID: m.MessageID})
		q.FilterEqual("Expunged", false)
		q.FilterNotEqual("ID", m.ID)
		q.FilterNotEqual("ThreadID", int64(0))
		q.SortAsc("ID")
		q.Limit(1)
		em, err := q.Get()
		if err != nil && err != bstore.ErrAbsent {
			return fmt.Errorf("looking up existing message with message-id: %v", err)
		} else if err == nil {
			// Duplicate Message-ID: become a child of the original message.
			assignParent(m, em, true)
			return nil
		}
	}

	// Gather ancestor message-ids from the References and In-Reply-To headers, read
	// from the on-disk message part. Errors are logged but not fatal: we fall
	// through with whatever ids were extracted.
	h, err := part.Header()
	if err != nil {
		log.Errorx("assigning threads: parsing references/in-reply-to headers, not matching by message-id", err, mlog.Field("msgid", m.ID))
	}
	messageIDs, err := message.ReferencedIDs(h.Values("References"), h.Values("In-Reply-To"))
	if err != nil {
		log.Errorx("assigning threads: parsing references/in-reply-to headers, not matching by message-id", err, mlog.Field("msgid", m.ID))
	}
	// Walk the referenced ids from the end of the list to the start, i.e. from the
	// closest ancestor to the furthest, taking the first match as parent.
	for i := len(messageIDs) - 1; i >= 0; i-- {
		messageID := messageIDs[i]
		if messageID == m.MessageID {
			// Don't make a message its own ancestor.
			continue
		}
		tm, _, err := lookupThreadMessage(tx, m.ID, messageID, m.SubjectBase)
		if err != nil {
			return fmt.Errorf("looking up thread message for new message: %v", err)
		} else if tm != nil {
			assignParent(m, *tm, true)
			return nil
		}
		// A referenced ancestor wasn't found; record that the direct link is missing.
		m.ThreadMissingLink = true
	}
	// If the message had references but none matched, don't fall back to
	// subject-based matching.
	if len(messageIDs) > 0 {
		return nil
	}

	// No references at all: try subject-based matching, but only if the subject
	// indicates a response (e.g. a "re:" prefix was stripped).
	var isResp bool
	if part != nil && part.Envelope != nil {
		m.SubjectBase, isResp = message.ThreadSubject(part.Envelope.Subject, false)
	}
	if !isResp || m.SubjectBase == "" {
		return nil
	}
	// Subject-based matching is always a missing link, there was no direct reference.
	m.ThreadMissingLink = true
	tm, err := lookupThreadMessageSubject(tx, *m, m.SubjectBase)
	if err != nil {
		return fmt.Errorf("looking up thread message by subject: %v", err)
	} else if tm != nil {
		assignParent(m, *tm, true)
	}
	return nil
}
// assignParent assigns threading fields to m that make it a child of parent
// message pm: ThreadID, ThreadParentIDs, ThreadMissingLink, and the
// ThreadMuted/ThreadCollapsed flags copied from the parent.
// updateSeen indicates if m.Seen should be set when pm is thread-muted.
func assignParent(m *Message, pm Message, updateSeen bool) {
	if pm.ThreadID == 0 {
		// Fixed format string: was "%d/d%q", printing a stray "d" before the message-id.
		panic(fmt.Sprintf("assigning message id %d/%q to parent message id %d/%q which has threadid 0", m.ID, m.MessageID, pm.ID, pm.MessageID))
	}
	if m.ID == pm.ID {
		panic(fmt.Sprintf("trying to make message id %d/%q its own parent", m.ID, m.MessageID))
	}
	m.ThreadID = pm.ThreadID
	// Make sure we don't add cycles: only inherit pm's ancestry if m isn't already
	// among pm's ancestors.
	if !slices.Contains(pm.ThreadParentIDs, m.ID) {
		m.ThreadParentIDs = append([]int64{pm.ID}, pm.ThreadParentIDs...)
	} else if pm.ID != m.ID {
		// m is among pm's ancestors; keep only the direct parent to break the cycle.
		m.ThreadParentIDs = []int64{pm.ID}
	} else {
		// Unreachable: m.ID == pm.ID panics above. Kept as a defensive fallback.
		m.ThreadParentIDs = nil
	}
	// An identical Message-ID means a duplicate message, not a real child; mark the
	// direct link as missing.
	if m.MessageID != "" && m.MessageID == pm.MessageID {
		m.ThreadMissingLink = true
	}
	m.ThreadMuted = pm.ThreadMuted
	m.ThreadCollapsed = pm.ThreadCollapsed
	if updateSeen && m.ThreadMuted {
		m.Seen = true
	}
}
// ResetThreading resets the MessageID and SubjectBase fields for all messages in
// the account. If clearIDs is true, all Thread* fields are also cleared. Changes
// are made in transactions of batchSize changes. The total number of updated
// messages is returned.
//
// ModSeq is not changed. Callers should bump the uid validity of the mailboxes
// to propagate the changes to IMAP clients.
func (a *Account) ResetThreading(ctx context.Context, log *mlog.Log, batchSize int, clearIDs bool) (int, error) {
	// todo: should this send Change events for ThreadMuted and ThreadCollapsed? worth it?

	// lastID is the highest message ID processed so far, the starting point for the
	// next batch. Batches continue until one comes up empty.
	var lastID int64
	total := 0
	for {
		n := 0

		// Worker function for the concurrent workqueue: parse the stored ParsedBuf
		// JSON and derive MessageID and SubjectBase for each message.
		prepareMessages := func(in, out chan moxio.Work[Message, Message]) {
			for {
				w, ok := <-in
				if !ok {
					return
				}

				m := w.In
				// We have the Message-ID and Subject headers in ParsedBuf. We use a partial part
				// struct so we don't generate so much garbage for the garbage collector to sift
				// through.
				var part struct {
					Envelope *message.Envelope
				}
				if err := json.Unmarshal(m.ParsedBuf, &part); err != nil {
					log.Errorx("unmarshal json parsedbuf for setting message-id, skipping", err, mlog.Field("msgid", m.ID))
				} else {
					m.MessageID = ""
					if part.Envelope != nil && part.Envelope.MessageID != "" {
						s, _, err := message.MessageIDCanonical(part.Envelope.MessageID)
						if err != nil {
							// Unparseable message-id: leave m.MessageID empty (s is "").
							log.Debugx("parsing message-id, skipping", err, mlog.Field("msgid", m.ID), mlog.Field("messageid", part.Envelope.MessageID))
						}
						m.MessageID = s
					}
					if part.Envelope != nil {
						m.SubjectBase, _ = message.ThreadSubject(part.Envelope.Subject, false)
					}
				}

				w.Out = m
				out <- w
			}
		}

		// Each batch is one write transaction, so we don't block other account
		// operations for too long.
		err := a.DB.Write(ctx, func(tx *bstore.Tx) error {
			// Called in-order with the prepared messages; stores the updated record.
			processMessage := func(in, m Message) error {
				if clearIDs {
					m.ThreadID = 0
					m.ThreadParentIDs = nil
					m.ThreadMissingLink = false
				}
				return tx.Update(&m)
			}

			// JSON parsing is relatively heavy, we benefit from multiple goroutines.
			procs := runtime.GOMAXPROCS(0)
			wq := moxio.NewWorkQueue[Message, Message](procs, 2*procs, prepareMessages, processMessage)

			q := bstore.QueryTx[Message](tx)
			q.FilterEqual("Expunged", false)
			q.FilterGreater("ID", lastID)
			q.SortAsc("ID")
			err := q.ForEach(func(m Message) error {
				// We process in batches so we don't block other operations for a long time.
				if n >= batchSize {
					return bstore.StopForEach
				}
				// Update starting point for next batch.
				lastID = m.ID

				n++
				return wq.Add(m)
			})
			if err == nil {
				// Wait for all queued work to be processed within this transaction.
				err = wq.Finish()
			}
			wq.Stop()

			return err
		})
		if err != nil {
			return total, fmt.Errorf("upgrading account to threads storage, step 1/2: %w", err)
		}
		total += n
		if n == 0 {
			// Empty batch: all messages have been processed.
			break
		}
	}
	return total, nil
}
// AssignThreads assigns thread-related fields to messages with ID >=
// startMessageID. Changes are committed each batchSize changes if txOpt is nil
// (i.e. during automatic account upgrade, we don't want to block database access
// for a long time). If txOpt is not nil, all changes are made in that
// transaction.
//
// When resetting thread assignments, the caller must first clear the existing
// thread fields.
//
// Messages are processed in order of ID, so when added to the account, not
// necessarily by received/date. Most threaded messages can immediately be matched
// to their parent message. If not, we keep track of the missing message-id and
// resolve as soon as we encounter it. At the end, we resolve all remaining
// messages, they start with a cycle.
//
// Does not set Seen flag for muted threads.
//
// Progress is written to progressWriter, every 100k messages.
func (a *Account) AssignThreads(ctx context.Context, log *mlog.Log, txOpt *bstore.Tx, startMessageID int64, batchSize int, progressWriter io.Writer) error {
	// We use a more basic version of the thread-matching algorithm described in:
	// ../rfc/5256:443
	// The algorithm assumes you'll select messages, then group into threads. We normally do
	// thread-calculation when messages are delivered. Here, we assign threads as soon
	// as we can, but will queue messages that reference known ancestors and resolve as
	// soon as we process them. We can handle large number of messages, but not very
	// quickly because we make lots of database queries.

	// childMsg is queued in "pending" below until its referenced parent message gets a
	// ThreadID assigned.
	type childMsg struct {
		ID                int64  // This message will be fetched and updated with the threading fields once the parent is resolved.
		MessageID         string // Of child message. Once child is resolved, its own children can be resolved too.
		ThreadMissingLink bool
	}

	// Messages that have a References/In-Reply-To that we want to set as parent, but
	// where the parent doesn't have a ThreadID yet are added to pending. The key is
	// the normalized MessageID of the parent, and the value is a list of messages that
	// can get resolved once the parent gets its ThreadID. The kids will get the same
	// ThreadIDs, and they themselves may be parents to kids, and so on.
	// For duplicate messages (messages with identical Message-ID), the second
	// Message-ID to be added to pending is added under its own message-id, so it gets
	// its original as parent.
	pending := map[string][]childMsg{}

	// Current tx. If not equal to txOpt, we clean it up before we leave.
	var tx *bstore.Tx
	defer func() {
		if tx != nil && tx != txOpt {
			err := tx.Rollback()
			log.Check(err, "rolling back transaction")
		}
	}()

	// Set thread-related fields for a single message. Caller must save the message,
	// only if not an error and not added to the pending list.
	// Returns pend=true when the message was queued in pending and must not be saved yet.
	assign := func(m *Message, references, inReplyTo []string, subject string) (pend bool, rerr error) {
		if m.MessageID != "" {
			// Attempt to match against existing different message with same Message-ID that
			// already has a threadid.
			// If there are multiple messages for a message-id a future call to assign may use
			// its threadid, or it may end up in pending and we resolve it when we need to.
			q := bstore.QueryTx[Message](tx)
			q.FilterNonzero(Message{MessageID: m.MessageID})
			q.FilterEqual("Expunged", false)
			q.FilterLess("ID", m.ID)
			q.SortAsc("ID")
			q.Limit(1)
			em, err := q.Get()
			if err != nil && err != bstore.ErrAbsent {
				return false, fmt.Errorf("looking up existing message with message-id: %v", err)
			} else if err == nil {
				if em.ThreadID == 0 {
					// Original not resolved yet either; queue this duplicate under its own
					// message-id so it gets the original as parent later on.
					pending[em.MessageID] = append(pending[em.MessageID], childMsg{m.ID, m.MessageID, true})
					return true, nil
				} else {
					assignParent(m, em, false)
					return false, nil
				}
			}
		}

		refids, err := message.ReferencedIDs(references, inReplyTo)
		if err != nil {
			log.Errorx("assigning threads: parsing references/in-reply-to headers, not matching by message-id", err, mlog.Field("msgid", m.ID))
		}

		// Walk the referenced ids from most-recent ancestor to oldest, looking for a
		// parent that already has a thread assigned.
		for i := len(refids) - 1; i >= 0; i-- {
			messageID := refids[i]
			if messageID == m.MessageID {
				// Ignore self-references.
				continue
			}
			tm, exists, err := lookupThreadMessage(tx, m.ID, messageID, m.SubjectBase)
			if err != nil {
				return false, fmt.Errorf("lookup up thread by message-id %s for message id %d: %w", messageID, m.ID, err)
			} else if tm != nil {
				assignParent(m, *tm, false)
				return false, nil
			} else if exists {
				// Parent is present but not yet resolved; wait for it.
				pending[messageID] = append(pending[messageID], childMsg{m.ID, m.MessageID, i < len(refids)-1})
				return true, nil
			}
		}

		var subjectBase string
		var isResp bool
		if subject != "" {
			subjectBase, isResp = message.ThreadSubject(subject, false)
		}
		if len(refids) > 0 || !isResp || subjectBase == "" {
			// Either we had references that didn't match anything (thread root with missing
			// link), or there's no subject to match against: start a new thread.
			m.ThreadID = m.ID
			m.ThreadMissingLink = len(refids) > 0
			return false, nil
		}

		// No references to use. If this is a reply/forward (based on subject), we'll match
		// against base subject, at most 4 weeks back so we don't match against ancient
		// messages and 1 day ahead so we can match against delayed deliveries.
		tm, err := lookupThreadMessageSubject(tx, *m, subjectBase)
		if err != nil {
			return false, fmt.Errorf("looking up recent messages by base subject %q: %w", subjectBase, err)
		} else if tm != nil {
			m.ThreadID = tm.ThreadID
			m.ThreadParentIDs = []int64{tm.ThreadID} // Always under root message with subject-match.
			m.ThreadMissingLink = true
			m.ThreadMuted = tm.ThreadMuted
			m.ThreadCollapsed = tm.ThreadCollapsed
		} else {
			m.ThreadID = m.ID
		}
		return false, nil
	}

	npendingResolved := 0

	// Resolve pending messages that wait on m.MessageID to be resolved, recursively.
	var resolvePending func(tm Message, cyclic bool) error
	resolvePending = func(tm Message, cyclic bool) error {
		if tm.MessageID == "" {
			return nil
		}
		l := pending[tm.MessageID]
		delete(pending, tm.MessageID)
		for _, mi := range l {
			m := Message{ID: mi.ID}
			if err := tx.Get(&m); err != nil {
				return fmt.Errorf("get message %d for resolving pending thread for message-id %s, %d: %w", mi.ID, tm.MessageID, tm.ID, err)
			}
			if m.ThreadID != 0 {
				// ThreadID already set because this is a cyclic message. If we would assign a
				// parent again, we would create a cycle.
				if m.MessageID != tm.MessageID && !cyclic {
					panic(fmt.Sprintf("threadid already set (%d) while handling non-cyclic message id %d/%q and with different message-id %q as parent message id %d", m.ThreadID, m.ID, m.MessageID, tm.MessageID, tm.ID))
				}
				continue
			}
			assignParent(&m, tm, false)
			m.ThreadMissingLink = mi.ThreadMissingLink
			if err := tx.Update(&m); err != nil {
				return fmt.Errorf("update message %d for resolving pending thread for message-id %s, %d: %w", mi.ID, tm.MessageID, tm.ID, err)
			}
			// The newly resolved message may itself be a parent that others wait for.
			if err := resolvePending(m, cyclic); err != nil {
				return err
			}
			npendingResolved++
		}
		return nil
	}

	// Output of the worker goroutines.
	type threadPrep struct {
		references []string
		inReplyTo  []string
		subject    string
	}

	// Single allocation.
	threadingFields := [][]byte{
		[]byte("references"),
		[]byte("in-reply-to"),
		[]byte("subject"),
	}

	// Worker goroutine function. We start with a reasonably large buffer for reading
	// the header into. And we have scratch space to copy the needed headers into. That
	// means we normally won't allocate any more buffers.
	prepareMessages := func(in, out chan moxio.Work[Message, threadPrep]) {
		headerbuf := make([]byte, 8*1024)
		scratch := make([]byte, 4*1024)
		for {
			w, ok := <-in
			if !ok {
				return
			}

			m := w.In
			// We only need the part metadata for the header offsets, not the full parsed
			// structure.
			var partialPart struct {
				HeaderOffset int64
				BodyOffset   int64
			}
			if err := json.Unmarshal(m.ParsedBuf, &partialPart); err != nil {
				w.Err = fmt.Errorf("unmarshal part: %v", err)
			} else {
				size := partialPart.BodyOffset - partialPart.HeaderOffset
				if int(size) > len(headerbuf) {
					headerbuf = make([]byte, size)
				}
				if size > 0 {
					buf := headerbuf[:int(size)]
					err := func() error {
						mr := a.MessageReader(m)
						defer mr.Close()

						// ReadAt returns whole buffer or error. Single read should be fast.
						n, err := mr.ReadAt(buf, partialPart.HeaderOffset)
						if err != nil || n != len(buf) {
							return fmt.Errorf("read header: %v", err)
						}
						return nil
					}()
					if err != nil {
						w.Err = err
					} else if h, err := message.ParseHeaderFields(buf, scratch, threadingFields); err != nil {
						w.Err = err
					} else {
						w.Out.references = h["References"]
						w.Out.inReplyTo = h["In-Reply-To"]
						l := h["Subject"]
						if len(l) > 0 {
							w.Out.subject = l[0]
						}
					}
				}
			}

			out <- w
		}
	}

	// Assign threads to messages, possibly in batches.
	nassigned := 0
	for {
		n := 0
		tx = txOpt
		if tx == nil {
			var err error
			tx, err = a.DB.Begin(ctx, true)
			if err != nil {
				return fmt.Errorf("begin transaction: %w", err)
			}
		}

		processMessage := func(m Message, prep threadPrep) error {
			pend, err := assign(&m, prep.references, prep.inReplyTo, prep.subject)
			if err != nil {
				return fmt.Errorf("for msgid %d: %w", m.ID, err)
			} else if pend {
				// Queued in pending; will be saved once its parent resolves.
				return nil
			}
			if m.ThreadID == 0 {
				panic(fmt.Sprintf("no threadid after assign of message id %d/%q", m.ID, m.MessageID))
			}
			// Fields have been set, store in database and resolve messages waiting for this MessageID.
			if slices.Contains(m.ThreadParentIDs, m.ID) {
				panic(fmt.Sprintf("message id %d/%q contains itself in parent ids %v", m.ID, m.MessageID, m.ThreadParentIDs))
			}
			if err := tx.Update(&m); err != nil {
				return err
			}
			if err := resolvePending(m, false); err != nil {
				return fmt.Errorf("resolving pending message-id: %v", err)
			}
			return nil
		}

		// Use multiple worker goroutines to read parse headers from on-disk messages.
		procs := runtime.GOMAXPROCS(0)
		wq := moxio.NewWorkQueue[Message, threadPrep](2*procs, 4*procs, prepareMessages, processMessage)

		// We assign threads in order by ID, so messages delivered in between our
		// transaction will get assigned threads too: they'll have the highest id's.
		q := bstore.QueryTx[Message](tx)
		q.FilterGreaterEqual("ID", startMessageID)
		q.FilterEqual("Expunged", false)
		q.SortAsc("ID")
		err := q.ForEach(func(m Message) error {
			// Batch number of changes, so we give other users of account a change to run.
			if txOpt == nil && n >= batchSize {
				return bstore.StopForEach
			}
			// Starting point for next batch.
			startMessageID = m.ID + 1
			// Don't process again. Can happen when earlier upgrade was aborted.
			if m.ThreadID != 0 {
				return nil
			}
			n++
			return wq.Add(m)
		})
		if err == nil {
			err = wq.Finish()
		}
		wq.Stop()

		if err == nil && txOpt == nil {
			err = tx.Commit()
			tx = nil
		}
		if err != nil {
			return fmt.Errorf("assigning threads: %w", err)
		}
		if n == 0 {
			break
		}
		nassigned += n
		if nassigned%100000 == 0 {
			log.Debug("assigning threads, progress", mlog.Field("count", nassigned), mlog.Field("unresolved", len(pending)))
			if _, err := fmt.Fprintf(progressWriter, "assigning threads, progress: %d messages\n", nassigned); err != nil {
				return fmt.Errorf("writing progress: %v", err)
			}
		}
	}
	if _, err := fmt.Fprintf(progressWriter, "assigning threads, done: %d messages\n", nassigned); err != nil {
		return fmt.Errorf("writing progress: %v", err)
	}

	log.Debug("assigning threads, mostly done, finishing with resolving of cyclic messages", mlog.Field("count", nassigned), mlog.Field("unresolved", len(pending)))

	if _, err := fmt.Fprintf(progressWriter, "assigning threads, resolving %d cyclic pending message-ids\n", len(pending)); err != nil {
		return fmt.Errorf("writing progress: %v", err)
	}

	// Remaining messages in pending have cycles and possibly tails. The cycle is at
	// the head of the thread. Once we resolve that, the rest of the thread can be
	// resolved too. Ignoring self-references (duplicate messages), there can only be
	// one cycle, and it is at the head. So we look for cycles, ignoring
	// self-references, and resolve a message as soon as we see the cycle.
	parent := map[string]string{} // Child Message-ID pointing to the parent Message-ID, excluding self-references.
	pendlist := []string{}
	for pmsgid, l := range pending {
		pendlist = append(pendlist, pmsgid)
		for _, k := range l {
			if k.MessageID == pmsgid {
				// No self-references for duplicate messages.
				continue
			}
			if _, ok := parent[k.MessageID]; !ok {
				parent[k.MessageID] = pmsgid
			}
			// else, this message should be resolved by following pending.
		}
	}
	// Sort for deterministic processing order (map iteration order is random).
	sort.Strings(pendlist)

	tx = txOpt
	if tx == nil {
		var err error
		tx, err = a.DB.Begin(ctx, true)
		if err != nil {
			return fmt.Errorf("begin transaction: %w", err)
		}
	}

	// We walk through all messages of pendlist, but some will already have been
	// resolved by the time we get to them.
	done := map[string]bool{}
	for _, msgid := range pendlist {
		if done[msgid] {
			continue
		}

		// We walk up to parent, until we see a message-id we've already seen, a cycle.
		seen := map[string]bool{}
		for {
			pmsgid, ok := parent[msgid]
			if !ok {
				panic(fmt.Sprintf("missing parent message-id %q, not a cycle?", msgid))
			}
			if !seen[pmsgid] {
				seen[pmsgid] = true
				msgid = pmsgid
				continue
			}

			// Cycle detected. Make this message-id the thread root.
			q := bstore.QueryTx[Message](tx)
			q.FilterNonzero(Message{MessageID: msgid})
			q.FilterEqual("ThreadID", int64(0))
			q.FilterEqual("Expunged", false)
			q.SortAsc("ID")
			l, err := q.List()
			if err == nil && len(l) == 0 {
				err = errors.New("no messages")
			}
			if err != nil {
				return fmt.Errorf("list message by message-id for cyclic thread root: %v", err)
			}
			// The oldest message (lowest ID) becomes the root; any duplicates become its
			// children.
			for i, m := range l {
				m.ThreadID = l[0].ID
				m.ThreadMissingLink = true
				if i == 0 {
					m.ThreadParentIDs = nil
					l[0] = m // For resolvePending below.
				} else {
					assignParent(&m, l[0], false)
				}
				if slices.Contains(m.ThreadParentIDs, m.ID) {
					panic(fmt.Sprintf("message id %d/%q contains itself in parents %v", m.ID, m.MessageID, m.ThreadParentIDs))
				}
				if err := tx.Update(&m); err != nil {
					return fmt.Errorf("assigning threadid to cyclic thread root: %v", err)
				}
			}

			// Mark all children as done so we don't process these messages again.
			walk := map[string]struct{}{msgid: {}}
			for len(walk) > 0 {
				for msgid := range walk {
					delete(walk, msgid)
					if done[msgid] {
						continue
					}
					done[msgid] = true
					for _, mi := range pending[msgid] {
						if !done[mi.MessageID] {
							walk[mi.MessageID] = struct{}{}
						}
					}
				}
			}

			// Resolve all messages in this thread.
			if err := resolvePending(l[0], true); err != nil {
				return fmt.Errorf("resolving cyclic children of cyclic thread root: %v", err)
			}

			break
		}
	}

	// Check that there are no more messages without threadid.
	q := bstore.QueryTx[Message](tx)
	q.FilterEqual("ThreadID", int64(0))
	q.FilterEqual("Expunged", false)
	l, err := q.List()
	if err == nil && len(l) > 0 {
		err = errors.New("found messages without threadid")
	}
	if err != nil {
		return fmt.Errorf("listing messages without threadid: %v", err)
	}

	if txOpt == nil {
		err := tx.Commit()
		tx = nil
		if err != nil {
			return fmt.Errorf("commit resolving cyclic thread roots: %v", err)
		}
	}
	return nil
}
// lookupThreadMessage tries to find the parent message with messageID that must
// have a matching subjectBase.
//
// If the message isn't present (with a valid thread id), a nil message and nil
// error is returned. The bool return value indicates if a message with the
// message-id exists at all.
func lookupThreadMessage(tx *bstore.Tx, mID int64, messageID, subjectBase string) (*Message, bool, error) {
	q := bstore.QueryTx[Message](tx)
	q.FilterNonzero(Message{MessageID: messageID})
	q.FilterEqual("SubjectBase", subjectBase)
	q.FilterEqual("Expunged", false)
	q.FilterNotEqual("ID", mID)
	q.SortAsc("ID")
	msgs, err := q.List()
	if err != nil {
		return nil, false, fmt.Errorf("message-id %s: %w", messageID, err)
	}
	// Return the oldest candidate that already has a thread assigned, if any.
	for i := range msgs {
		if msgs[i].ThreadID != 0 {
			return &msgs[i], true, nil
		}
	}
	return nil, len(msgs) > 0, nil
}
// lookupThreadMessageSubject looks up a parent/ancestor message for the message
// thread based on a matching base subject. The candidate must have been
// delivered to the same mailbox originally, must already be part of a thread,
// and must have been received at most 4 weeks before (or at most 1 day after,
// for delayed deliveries) message m.
//
// If no message (with a threadid) is found a nil message and nil error is returned.
func lookupThreadMessageSubject(tx *bstore.Tx, m Message, subjectBase string) (*Message, error) {
	q := bstore.QueryTx[Message](tx)
	q.FilterGreater("Received", m.Received.Add(-4*7*24*time.Hour))
	q.FilterLess("Received", m.Received.Add(1*24*time.Hour))
	q.FilterNonzero(Message{SubjectBase: subjectBase, MailboxOrigID: m.MailboxOrigID})
	q.FilterEqual("Expunged", false)
	q.FilterNotEqual("ID", m.ID)
	q.FilterNotEqual("ThreadID", int64(0))
	// Prefer the most recently received match.
	q.SortDesc("Received")
	q.Limit(1)
	tm, err := q.Get()
	switch {
	case err == bstore.ErrAbsent:
		return nil, nil
	case err != nil:
		return nil, err
	}
	return &tm, nil
}
// upgradeThreads runs the two-step account upgrade that adds message
// threading, tracking progress in up.Threads (0: not started, 1: step 1 done,
// 2: fully upgraded). Each step is skipped if already completed, so an aborted
// upgrade resumes where it left off.
func upgradeThreads(ctx context.Context, acc *Account, up *Upgrade) error {
	// Batch size for both steps, to not hold the database lock for too long.
	const batchSize = 10000

	log := xlog.Fields(mlog.Field("account", acc.Name))

	if up.Threads == 0 {
		// Step 1 in the threads upgrade is storing the canonicalized Message-ID for each
		// message and the base subject for thread matching. This allows efficient thread
		// lookup in the second step.
		log.Info("upgrading account for threading, step 1/2: updating all messages with message-id and base subject")
		start := time.Now()
		total, err := acc.ResetThreading(ctx, xlog, batchSize, true)
		if err != nil {
			return fmt.Errorf("resetting message threading fields: %v", err)
		}
		up.Threads = 1
		if err := acc.DB.Update(ctx, up); err != nil {
			// Revert in-memory state so a retry redoes this step.
			up.Threads = 0
			return fmt.Errorf("saving upgrade process while upgrading account to threads storage, step 1/2: %w", err)
		}
		log.Info("upgrading account for threading, step 1/2: completed", mlog.Field("duration", time.Since(start)), mlog.Field("messages", total))
	}

	if up.Threads == 1 {
		// Step 2 of the upgrade is going through all messages and assigning threadid's.
		// Lookup of messageid and base subject is now fast through indexed database
		// access.
		log.Info("upgrading account for threading, step 2/2: matching messages to threads")
		start := time.Now()
		if err := acc.AssignThreads(ctx, xlog, nil, 1, batchSize, io.Discard); err != nil {
			return fmt.Errorf("upgrading to threads storage, step 2/2: %w", err)
		}
		up.Threads = 2
		if err := acc.DB.Update(ctx, up); err != nil {
			// Revert in-memory state so a retry redoes this step.
			up.Threads = 1
			return fmt.Errorf("saving upgrade process for thread storage, step 2/2: %w", err)
		}
		log.Info("upgrading account for threading, step 2/2: completed", mlog.Field("duration", time.Since(start)))
	}

	// Note: Not bumping uidvalidity or setting modseq. Clients haven't been able to
	// use threadid's before, so there is nothing to be out of date.

	return nil
}

208
store/threads_test.go Normal file
View File

@ -0,0 +1,208 @@
package store
import (
"os"
"reflect"
"strings"
"testing"
"time"
"github.com/mjl-/bstore"
"github.com/mjl-/mox/mlog"
"github.com/mjl-/mox/mox-"
)
// TestThreadingUpgrade delivers a set of messages covering the thread-matching
// cases (references, in-reply-to, subject-only matching, cycles, duplicates,
// self-references), verifies threading at delivery time, then clears all
// threading state directly in the database and reopens the account to exercise
// the background upgrade path, checking that it reconstructs the same threads.
func TestThreadingUpgrade(t *testing.T) {
	os.RemoveAll("../testdata/store/data")
	mox.ConfigStaticPath = "../testdata/store/mox.conf"
	mox.MustLoadConfig(true, false)
	acc, err := OpenAccount("mjl")
	tcheck(t, err, "open account")
	defer func() {
		err = acc.Close()
		tcheck(t, err, "closing account")
	}()
	defer Switchboard()()

	log := mlog.New("store")

	// New account already has threading. Add some messages, check the threading.
	// expThreadID 0 means the message is expected to start its own thread.
	deliver := func(recv time.Time, s string, expThreadID int64) Message {
		t.Helper()
		f, err := CreateMessageTemp("account-test")
		tcheck(t, err, "temp file")
		defer f.Close()

		s = strings.ReplaceAll(s, "\n", "\r\n")
		m := Message{
			Size:      int64(len(s)),
			MsgPrefix: []byte(s),
			Received:  recv,
		}
		err = acc.DeliverMailbox(log, "Inbox", &m, f, true)
		tcheck(t, err, "deliver")
		if expThreadID == 0 {
			expThreadID = m.ID
		}
		if m.ThreadID != expThreadID {
			t.Fatalf("got threadid %d, expected %d", m.ThreadID, expThreadID)
		}
		return m
	}

	now := time.Now()
	m0 := deliver(now, "Message-ID: <m0@localhost>\nSubject: test1\n\ntest\n", 0)
	m1 := deliver(now, "Message-ID: <m1@localhost>\nReferences: <m0@localhost>\nSubject: test1\n\ntest\n", m0.ID)    // References.
	m2 := deliver(now, "Message-ID: <m2@localhost>\nReferences: <m0@localhost>\nSubject: other\n\ntest\n", 0)        // References, but different subject.
	m3 := deliver(now, "Message-ID: <m3@localhost>\nIn-Reply-To: <m0@localhost>\nSubject: test1\n\ntest\n", m0.ID)   // In-Reply-To.
	m4 := deliver(now, "Message-ID: <m4@localhost>\nSubject: re: test1\n\ntest\n", m0.ID)                            // Subject.
	m5 := deliver(now, "Message-ID: <m5@localhost>\nSubject: test1 (fwd)\n\ntest\n", m0.ID)                          // Subject.
	m6 := deliver(now, "Message-ID: <m6@localhost>\nSubject: [fwd: test1]\n\ntest\n", m0.ID)                         // Subject.
	m7 := deliver(now, "Message-ID: <m7@localhost>\nSubject: test1\n\ntest\n", 0)                                    // Only subject, but not a response.

	// Thread with a cyclic head, a self-referencing message.
	c1 := deliver(now, "Message-ID: <c1@localhost>\nReferences: <c2@localhost>\nSubject: cycle0\n\ntest\n", 0)       // Head cycle with m8.
	c2 := deliver(now, "Message-ID: <c2@localhost>\nReferences: <c1@localhost>\nSubject: cycle0\n\ntest\n", c1.ID)   // Head cycle with c1.
	c3 := deliver(now, "Message-ID: <c3@localhost>\nReferences: <c1@localhost>\nSubject: cycle0\n\ntest\n", c1.ID)   // Connected to one of the cycle elements.
	c4 := deliver(now, "Message-ID: <c4@localhost>\nReferences: <c2@localhost>\nSubject: cycle0\n\ntest\n", c1.ID)   // Connected to other cycle element.
	c5 := deliver(now, "Message-ID: <c5@localhost>\nReferences: <c4@localhost>\nSubject: cycle0\n\ntest\n", c1.ID)
	c5b := deliver(now, "Message-ID: <c5@localhost>\nReferences: <c4@localhost>\nSubject: cycle0\n\ntest\n", c1.ID)  // Duplicate, e.g. Sent item, internal cycle during upgrade.
	c6 := deliver(now, "Message-ID: <c6@localhost>\nReferences: <c5@localhost>\nSubject: cycle0\n\ntest\n", c1.ID)
	c7 := deliver(now, "Message-ID: <c7@localhost>\nReferences: <c5@localhost> <c7@localhost>\nSubject: cycle0\n\ntest\n", c1.ID) // Self-referencing message that also points to actual parent.

	// More than 2 messages to make a cycle.
	d0 := deliver(now, "Message-ID: <d0@localhost>\nReferences: <d2@localhost>\nSubject: cycle1\n\ntest\n", 0)
	d1 := deliver(now, "Message-ID: <d1@localhost>\nReferences: <d0@localhost>\nSubject: cycle1\n\ntest\n", d0.ID)
	d2 := deliver(now, "Message-ID: <d2@localhost>\nReferences: <d1@localhost>\nSubject: cycle1\n\ntest\n", d0.ID)

	// Cycle with messages delivered later. During import/upgrade, they will all be one thread.
	e0 := deliver(now, "Message-ID: <e0@localhost>\nReferences: <e1@localhost>\nSubject: cycle2\n\ntest\n", 0)
	e1 := deliver(now, "Message-ID: <e1@localhost>\nReferences: <e2@localhost>\nSubject: cycle2\n\ntest\n", 0)
	e2 := deliver(now, "Message-ID: <e2@localhost>\nReferences: <e0@localhost>\nSubject: cycle2\n\ntest\n", e0.ID)

	// Three messages in a cycle (f1, f2, f3), with one with an additional ancestor (f4) which is ignored due to the cycle. Has different threads during import.
	f0 := deliver(now, "Message-ID: <f0@localhost>\nSubject: cycle3\n\ntest\n", 0)
	f1 := deliver(now, "Message-ID: <f1@localhost>\nReferences: <f0@localhost> <f2@localhost>\nSubject: cycle3\n\ntest\n", f0.ID)
	f2 := deliver(now, "Message-ID: <f2@localhost>\nReferences: <f3@localhost>\nSubject: cycle3\n\ntest\n", 0)
	f3 := deliver(now, "Message-ID: <f3@localhost>\nReferences: <f1@localhost>\nSubject: cycle3\n\ntest\n", f0.ID)

	// Duplicate single message (no larger thread).
	g0 := deliver(now, "Message-ID: <g0@localhost>\nSubject: dup\n\ntest\n", 0)
	g0b := deliver(now, "Message-ID: <g0@localhost>\nSubject: dup\n\ntest\n", g0.ID)

	// Duplicate message with a child message.
	h0 := deliver(now, "Message-ID: <h0@localhost>\nSubject: dup2\n\ntest\n", 0)
	h0b := deliver(now, "Message-ID: <h0@localhost>\nSubject: dup2\n\ntest\n", h0.ID)
	h1 := deliver(now, "Message-ID: <h1@localhost>\nReferences: <h0@localhost>\nSubject: dup2\n\ntest\n", h0.ID)

	// Message has itself as reference.
	s0 := deliver(now, "Message-ID: <s0@localhost>\nReferences: <s0@localhost>\nSubject: self-referencing message\n\ntest\n", 0)

	// Message with \0 in subject, should get an empty base subject.
	b0 := deliver(now, "Message-ID: <b0@localhost>\nSubject: bad\u0000subject\n\ntest\n", 0)
	b1 := deliver(now, "Message-ID: <b1@localhost>\nSubject: bad\u0000subject\n\ntest\n", 0) // Not matched.

	// Interleaved duplicate threaded messages. First child, then parent, then duplicate parent, then duplicate child again.
	i0 := deliver(now, "Message-ID: <i0@localhost>\nReferences: <i1@localhost>\nSubject: interleaved duplicate\n\ntest\n", 0)
	i1 := deliver(now, "Message-ID: <i1@localhost>\nSubject: interleaved duplicate\n\ntest\n", 0)
	i2 := deliver(now, "Message-ID: <i1@localhost>\nSubject: interleaved duplicate\n\ntest\n", i1.ID)
	i3 := deliver(now, "Message-ID: <i0@localhost>\nReferences: <i1@localhost>\nSubject: interleaved duplicate\n\ntest\n", i0.ID)

	j0 := deliver(now, "Message-ID: <j0@localhost>\nReferences: <>\nSubject: empty id in references\n\ntest\n", 0)

	dbpath := acc.DBPath
	err = acc.Close()
	tcheck(t, err, "close account")

	// Now clear the threading upgrade, and the threading fields and close the account.
	// We open the database file directly, so we don't trigger the consistency checker.
	db, err := bstore.Open(ctxbg, dbpath, &bstore.Options{Timeout: 5 * time.Second, Perm: 0660}, DBTypes...)
	// Fix: the open error was previously not checked, which would cause a nil
	// pointer panic below instead of a clear failure.
	tcheck(t, err, "open database")
	err = db.Write(ctxbg, func(tx *bstore.Tx) error {
		up := Upgrade{ID: 1}
		err := tx.Delete(&up)
		tcheck(t, err, "delete upgrade")

		q := bstore.QueryTx[Message](tx)
		_, err = q.UpdateFields(map[string]any{
			"MessageID":         "",
			"SubjectBase":       "",
			"ThreadID":          int64(0),
			"ThreadParentIDs":   []int64(nil),
			"ThreadMissingLink": false,
		})
		return err
	})
	tcheck(t, err, "reset threading fields")
	err = db.Close()
	tcheck(t, err, "closing db")

	// Open the account again, that should get the account upgraded. Wait for upgrade to finish.
	acc, err = OpenAccount("mjl")
	tcheck(t, err, "open account")
	err = acc.ThreadingWait(log)
	tcheck(t, err, "wait for threading")

	// check fetches the message and verifies its post-upgrade threading fields.
	check := func(id int64, expThreadID int64, expParentIDs []int64, expMissingLink bool) {
		t.Helper()
		m := Message{ID: id}
		err := acc.DB.Get(ctxbg, &m)
		tcheck(t, err, "get message")
		if m.ThreadID != expThreadID || !reflect.DeepEqual(m.ThreadParentIDs, expParentIDs) || m.ThreadMissingLink != expMissingLink {
			t.Fatalf("got thread id %d, parent ids %v, missing link %v, expected %d %v %v", m.ThreadID, m.ThreadParentIDs, m.ThreadMissingLink, expThreadID, expParentIDs, expMissingLink)
		}
	}

	parents0 := []int64{m0.ID}
	check(m0.ID, m0.ID, nil, false)
	check(m1.ID, m0.ID, parents0, false)
	check(m2.ID, m2.ID, nil, true)
	check(m3.ID, m0.ID, parents0, false)
	check(m4.ID, m0.ID, parents0, true)
	check(m5.ID, m0.ID, parents0, true)
	check(m6.ID, m0.ID, parents0, true)
	check(m7.ID, m7.ID, nil, false)
	check(c1.ID, c1.ID, nil, true) // Head of cycle, hence missing link
	check(c2.ID, c1.ID, []int64{c1.ID}, false)
	check(c3.ID, c1.ID, []int64{c1.ID}, false)
	check(c4.ID, c1.ID, []int64{c2.ID, c1.ID}, false)
	check(c5.ID, c1.ID, []int64{c4.ID, c2.ID, c1.ID}, false)
	check(c5b.ID, c1.ID, []int64{c5.ID, c4.ID, c2.ID, c1.ID}, true)
	check(c6.ID, c1.ID, []int64{c5.ID, c4.ID, c2.ID, c1.ID}, false)
	check(c7.ID, c1.ID, []int64{c5.ID, c4.ID, c2.ID, c1.ID}, true)
	check(d0.ID, d0.ID, nil, true)
	check(d1.ID, d0.ID, []int64{d0.ID}, false)
	check(d2.ID, d0.ID, []int64{d1.ID, d0.ID}, false)
	check(e0.ID, e0.ID, nil, true)
	check(e1.ID, e0.ID, []int64{e2.ID, e0.ID}, false)
	check(e2.ID, e0.ID, []int64{e0.ID}, false)
	check(f0.ID, f0.ID, nil, false)
	check(f1.ID, f1.ID, nil, true)
	check(f2.ID, f1.ID, []int64{f3.ID, f1.ID}, false)
	check(f3.ID, f1.ID, []int64{f1.ID}, false)
	check(g0.ID, g0.ID, nil, false)
	check(g0b.ID, g0.ID, []int64{g0.ID}, true)
	check(h0.ID, h0.ID, nil, false)
	check(h0b.ID, h0.ID, []int64{h0.ID}, true)
	check(h1.ID, h0.ID, []int64{h0.ID}, false)
	check(s0.ID, s0.ID, nil, true)
	check(b0.ID, b0.ID, nil, false)
	check(b1.ID, b1.ID, nil, false)
	check(i0.ID, i1.ID, []int64{i1.ID}, false)
	check(i1.ID, i1.ID, nil, false)
	check(i2.ID, i1.ID, []int64{i1.ID}, true)
	check(i3.ID, i1.ID, []int64{i0.ID, i1.ID}, true)
	check(j0.ID, j0.ID, nil, false)
}