imapserver: simplify and fix logic around processing changes while opening a mailbox (with SELECT or EXAMINE)

We were first getting UIDs in a transaction with a lock. Then getting the
changes and processing them in a special way. And then processing for qresync
in a new transaction. The special processing of changes is now gone, it seems
to have skipped adding/removing uids to the session, which can't be correct.
The new approach is just using a lock and transaction and process the whole
opening of the mailbox, and not processing any changes as part of the open, and
getting rid of the special "initial" mode processing a mailbox.
This commit is contained in:
Mechiel Lukkien 2025-04-11 20:28:35 +02:00
parent fd5167fdb3
commit af3e9351bc
No known key found for this signature in database
3 changed files with 195 additions and 234 deletions

View File

@ -276,7 +276,7 @@ func (c *conn) cmdNotify(tag, cmd string, p *parser) {
select {
case <-c.comm.Pending:
overflow, changes := c.comm.Get()
c.xapplyChanges(overflow, changes, true, true)
c.xapplyChanges(overflow, changes, true)
default:
}

View File

@ -329,7 +329,7 @@ func (c *conn) cmdxReplace(isUID bool, tag, cmd string, p *parser) {
// Must update our msgseq/uids tracking with latest pending changes.
l := pendingChanges
pendingChanges = nil
c.xapplyChanges(overflow, l, false, false)
c.xapplyChanges(overflow, l, false)
// If we couldn't find the message, send a NO response. We've just applied pending
// changes, which should have expunged the absent message.

View File

@ -208,7 +208,7 @@ type conn struct {
baseTLSConfig *tls.Config // Base TLS config to use for handshake.
remoteIP net.IP
noRequireSTARTTLS bool
cmd string // Currently executing, for deciding to applyChanges and logging.
cmd string // Currently executing, for deciding to xapplyChanges and logging.
cmdMetric string // Currently executing, for metrics.
cmdStart time.Time
ncmds int // Number of commands processed. Used to abort connection when first incoming command is unknown/invalid.
@ -675,10 +675,14 @@ func (c *conn) xbwriteresultf(format string, args ...any) {
switch c.cmd {
case "fetch", "store", "search":
// ../rfc/9051:5862 ../rfc/7162:2033
case "select", "examine":
// We don't send changes before having confirmed opening the mailbox, to prevent
// clients from trying to interpret changes when it considers there isn't a
// selected mailbox yet.
default:
if c.comm != nil {
overflow, changes := c.comm.Get()
c.xapplyChanges(overflow, changes, false, true)
c.xapplyChanges(overflow, changes, true)
}
}
c.xbwritelinef(format, args...)
@ -1304,7 +1308,7 @@ func (c *conn) command() {
case <-c.comm.Pending:
overflow, changes := c.comm.Get()
c.xapplyChanges(overflow, changes, false, false)
c.xapplyChanges(overflow, changes, false)
c.xflush()
case <-mox.Shutdown.Done():
@ -1770,11 +1774,9 @@ func (c *conn) xmailboxID(tx *bstore.Tx, id int64) store.Mailbox {
}
// Apply changes to our session state.
// If initial is false, updates like EXISTS and EXPUNGE are written to the client.
// If initial is true, we only apply the changes.
// Should not be called while holding locks, as changes are written to client connections, which can block.
// Does not flush output.
func (c *conn) xapplyChanges(overflow bool, changes []store.Change, initial, sendDelayed bool) {
func (c *conn) xapplyChanges(overflow bool, changes []store.Change, sendDelayed bool) {
// If more changes were generated than we can process, we send a
// NOTIFICATIONOVERFLOW as defined in the NOTIFY extension. ../rfc/5465:712
if overflow {
@ -1786,9 +1788,6 @@ func (c *conn) xapplyChanges(overflow bool, changes []store.Change, initial, sen
// NOTIFY, but we also follow this for IDLE. ../rfc/5465:717
c.notify = &notify{}
c.xbwritelinef("* OK [NOTIFICATIONOVERFLOW] out of sync after too many pending changes")
if !initial {
return
}
changes = nil
}
@ -1861,16 +1860,10 @@ func (c *conn) xapplyChanges(overflow bool, changes []store.Change, initial, sen
if !ok {
break
}
if initial && !c.uidonly && c.sequence(ch.UID) > 0 {
continue
}
c.uidAppend(ch.UID)
adds = append(adds, ch)
}
if len(adds) > 0 {
if initial {
continue
}
// Write the exists, and the UID and flags as well. Hopefully the client waits for
// long enough after the EXISTS to see these messages, and doesn't request them
// again with a FETCH.
@ -1900,31 +1893,19 @@ func (c *conn) xapplyChanges(overflow bool, changes []store.Change, initial, sen
for _, uid := range ch.UIDs {
// With uidonly, we must always return VANISHED. ../rfc/9586:232
if c.uidonly {
if !initial {
c.exists--
vanishedUIDs.append(uint32(uid))
}
continue
}
var seq msgseq
if initial {
seq = c.sequence(uid)
if seq <= 0 {
continue
}
} else {
seq = c.xsequence(uid)
}
seq := c.xsequence(uid)
c.sequenceRemove(seq, uid)
if !initial {
if qresync {
vanishedUIDs.append(uint32(uid))
} else {
c.xbwritelinef("* %d EXPUNGE", seq)
}
}
}
if !vanishedUIDs.empty() {
// VANISHED without EARLIER. ../rfc/7162:2004
for _, s := range vanishedUIDs.Strings(4*1024 - 32) {
@ -1933,9 +1914,6 @@ func (c *conn) xapplyChanges(overflow bool, changes []store.Change, initial, sen
}
case store.ChangeFlags:
if initial {
continue
}
var modseqStr string
if condstore {
modseqStr = fmt.Sprintf(" MODSEQ (%d)", ch.ModSeq.Client())
@ -1987,9 +1965,10 @@ func (c *conn) xapplyChanges(overflow bool, changes []store.Change, initial, sen
}
}
// Like applyChanges, but for notify, with configurable mailboxes to notify about,
// and configurable events to send, including which fetch attributes to return.
// All calls must go through applyChanges, for overflow handling.
// xapplyChangesNotify is like xapplyChanges, but for NOTIFY, with configurable
// mailboxes to notify about, and configurable events to send, including which
// fetch attributes to return. All calls must go through xapplyChanges, for overflow
// handling.
func (c *conn) xapplyChangesNotify(changes []store.Change, sendDelayed bool) {
if sendDelayed && len(c.notify.Delayed) > 0 {
changes = append(c.notify.Delayed, changes...)
@ -3240,14 +3219,13 @@ func (c *conn) cmdSelectExamine(isselect bool, tag, cmd string, p *parser) {
name = xcheckmailboxname(name, true)
var highestModSeq store.ModSeq
var highDeletedModSeq store.ModSeq
var firstUnseen msgseq = 0
var mb store.Mailbox
c.account.WithRLock(func() {
c.xdbread(func(tx *bstore.Tx) {
mb = c.xmailbox(tx, name, "")
var firstUnseen msgseq = 0
c.uidnext = mb.UIDNext
if c.uidonly {
c.exists = uint32(mb.MailboxCounts.Total + mb.MailboxCounts.Deleted)
@ -3270,23 +3248,6 @@ func (c *conn) cmdSelectExamine(isselect bool, tag, cmd string, p *parser) {
c.exists = uint32(len(c.uids))
}
// Condstore extension, find the highest modseq.
if c.enabled[capCondstore] {
highestModSeq = mb.ModSeq
}
// For QRESYNC, we need to know the highest modset of deleted expunged records to
// maintain synchronization.
if c.enabled[capQresync] {
var err error
highDeletedModSeq, err = c.account.HighestDeletedModSeq(tx)
xcheckf(err, "getting highest deleted modseq")
}
})
})
overflow, changes := c.comm.Get()
c.xapplyChanges(overflow, changes, true, false)
var flags string
if len(mb.Keywords) > 0 {
flags = " " + strings.Join(mb.Keywords, " ")
@ -3307,7 +3268,7 @@ func (c *conn) cmdSelectExamine(isselect bool, tag, cmd string, p *parser) {
if c.enabled[capCondstore] {
// ../rfc/7162:417
// ../rfc/7162-eid5055 ../rfc/7162:484 ../rfc/7162:1167
c.xbwritelinef(`* OK [HIGHESTMODSEQ %d] x`, highestModSeq.Client())
c.xbwritelinef(`* OK [HIGHESTMODSEQ %d] x`, mb.ModSeq.Client())
}
// If QRESYNC uidvalidity matches, we send any changes. ../rfc/7162:1509
@ -3360,10 +3321,6 @@ func (c *conn) cmdSelectExamine(isselect bool, tag, cmd string, p *parser) {
// send expunge/vanished before the tagged OK.
// ../rfc/7162:1340
// We are reading without account lock. Similar to when we process FETCH/SEARCH
// requests. We don't have to reverify existence of the mailbox, so we don't
// rlock, even briefly.
c.xdbread(func(tx *bstore.Tx) {
if oldClientUID > 0 {
// The client sent a UID that is now removed. This is typically fine. But we check
// that it is consistent with the modseq the client sent. If the UID already didn't
@ -3390,7 +3347,7 @@ func (c *conn) cmdSelectExamine(isselect bool, tag, cmd string, p *parser) {
q.FilterNonzero(store.Message{MailboxID: mb.ID})
// Note: we don't filter by Expunged.
q.FilterGreater("ModSeq", store.ModSeqFromClient(qrmodseq))
q.FilterLessEqual("ModSeq", highestModSeq)
q.FilterLessEqual("ModSeq", mb.ModSeq)
q.FilterLess("UID", c.uidnext)
q.SortAsc("ModSeq")
err := q.ForEach(func(m store.Message) error {
@ -3415,6 +3372,9 @@ func (c *conn) cmdSelectExamine(isselect bool, tag, cmd string, p *parser) {
})
xcheckf(err, "listing changed messages")
highDeletedModSeq, err := c.account.HighestDeletedModSeq(tx)
xcheckf(err, "getting highest deleted modseq")
// If we don't have enough history, we go through all UIDs and look them up, and
// add them to the vanished list if they have disappeared.
if qrmodseq < highDeletedModSeq.Client() {
@ -3470,7 +3430,6 @@ func (c *conn) cmdSelectExamine(isselect bool, tag, cmd string, p *parser) {
}
}
}
})
// Now that we have all vanished UIDs, send them over compactly.
if len(vanishedUIDs) > 0 {
@ -3481,6 +3440,8 @@ func (c *conn) cmdSelectExamine(isselect bool, tag, cmd string, p *parser) {
}
}
}
})
})
if isselect {
c.xbwriteresultf("%s OK [READ-WRITE] x", tag)
@ -4242,7 +4203,7 @@ func (c *conn) cmdAppend(tag, cmd string, p *parser) {
if c.mailboxID == mb.ID {
l := pendingChanges
pendingChanges = nil
c.xapplyChanges(overflow, l, false, true)
c.xapplyChanges(overflow, l, true)
for _, a := range appends {
c.uidAppend(a.m.UID)
}
@ -4277,7 +4238,7 @@ func (c *conn) cmdIdle(tag, cmd string, p *parser) {
// With NOTIFY enabled, flush all pending changes.
if c.notify != nil && len(c.notify.Delayed) > 0 {
c.xapplyChanges(false, nil, false, true)
c.xapplyChanges(false, nil, true)
c.xflush()
}
@ -4303,7 +4264,7 @@ Wait:
break Wait
case <-c.comm.Pending:
overflow, changes := c.comm.Get()
c.xapplyChanges(overflow, changes, false, true)
c.xapplyChanges(overflow, changes, true)
c.xflush()
case <-mox.Shutdown.Done():
// ../rfc/9051:5375