better thread matching for dsns

keep track of whether a message is a dsn, and match dsn's against their sent
message by ignoring the message subject.
This commit is contained in:
Mechiel Lukkien
2024-03-04 16:40:27 +01:00
parent f6497b1aaf
commit 13923e4b7b
9 changed files with 39 additions and 13 deletions

View File

@ -484,6 +484,10 @@ type Message struct {
// filtering).
IsMailingList bool
// If this message is a DSN. For DSNs, we don't look at the subject when matching
// threads.
DSN bool
ReceivedTLSVersion uint16 // 0 if unknown, 1 if plaintext/no TLS, otherwise TLS cipher suite.
ReceivedTLSCipherSuite uint16
ReceivedRequireTLS bool // Whether RequireTLS was known to be used for incoming delivery.
@ -574,9 +578,11 @@ func (m *Message) PrepareExpunge() {
}
}
// PrepareThreading sets MessageID and SubjectBase (used in threading) based on the
// envelope in part.
// PrepareThreading sets MessageID, SubjectBase and DSN (used in threading) based
// on the part.
func (m *Message) PrepareThreading(log mlog.Log, part *message.Part) {
m.DSN = part.IsDSN()
if part.Envelope == nil {
return
}

View File

@ -58,7 +58,7 @@ func assignThread(log mlog.Log, tx *bstore.Tx, m *Message, part *message.Part) e
if messageID == m.MessageID {
continue
}
tm, _, err := lookupThreadMessage(tx, m.ID, messageID, m.SubjectBase)
tm, _, err := lookupThreadMessage(tx, m.ID, messageID, m.SubjectBase, m.DSN)
if err != nil {
return fmt.Errorf("looking up thread message for new message: %v", err)
} else if tm != nil {
@ -302,7 +302,7 @@ func (a *Account) AssignThreads(ctx context.Context, log mlog.Log, txOpt *bstore
if messageID == m.MessageID {
continue
}
tm, exists, err := lookupThreadMessage(tx, m.ID, messageID, m.SubjectBase)
tm, exists, err := lookupThreadMessage(tx, m.ID, messageID, m.SubjectBase, m.DSN)
if err != nil {
return false, fmt.Errorf("lookup up thread by message-id %s for message id %d: %w", messageID, m.ID, err)
} else if tm != nil {
@ -485,7 +485,7 @@ func (a *Account) AssignThreads(ctx context.Context, log mlog.Log, txOpt *bstore
return nil
}
// Use multiple worker goroutines to read parse headers from on-disk messages.
// Use multiple worker goroutines to parse headers from on-disk messages.
procs := runtime.GOMAXPROCS(0)
wq := moxio.NewWorkQueue[Message, threadPrep](2*procs, 4*procs, prepareMessages, processMessage)
@ -674,16 +674,18 @@ func (a *Account) AssignThreads(ctx context.Context, log mlog.Log, txOpt *bstore
return nil
}
// lookupThreadMessage tries to find the parent message with messageID that must
// have a matching subjectBase.
// lookupThreadMessage tries to find the parent message with messageID, that must
// have a matching subjectBase (unless it is a DSN).
//
// If the message isn't present (with a valid thread id), a nil message and nil
// error is returned. The bool return value indicates if a message with the
// message-id exists at all.
func lookupThreadMessage(tx *bstore.Tx, mID int64, messageID, subjectBase string) (*Message, bool, error) {
func lookupThreadMessage(tx *bstore.Tx, mID int64, messageID, subjectBase string, isDSN bool) (*Message, bool, error) {
q := bstore.QueryTx[Message](tx)
q.FilterNonzero(Message{MessageID: messageID})
q.FilterEqual("SubjectBase", subjectBase)
if !isDSN {
q.FilterEqual("SubjectBase", subjectBase)
}
q.FilterEqual("Expunged", false)
q.FilterNotEqual("ID", mID)
q.SortAsc("ID")