imapserver: Don't keep account write-locked during IMAP FETCH command

We effectively held the account write-locked by using a writable transaction
while processing the FETCH command. We did this because we may have to update
\Seen flags for non-PEEK attribute fetches. This meant other FETCHes would
block, as would any other write access to the account.

We now read the messages in a read-only transaction. We gather messages that
need marking as \Seen, and make that change in one (much shorter) database
transaction at the end of the FETCH command.

In practice, it doesn't seem too sensible to mark messages as seen
automatically. Most clients probably use the PEEK-variant of attribute fetches.

Related to issue #128.
Author: Mechiel Lukkien, 2025-02-27 09:35:14 +01:00
parent caaace403a
commit b822533df3
4 changed files with 215 additions and 138 deletions
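
In short, the FETCH path moves from "one long writable transaction" to "snapshot reads, then one short write". Below is a minimal, self-contained sketch of that pattern, using a hypothetical Store guarded by an RWMutex; the actual mox code in the diff instead uses a read-only bstore transaction as its snapshot and releases the account lock entirely while streaming.

package main

import (
	"fmt"
	"sync"
)

// Hypothetical store: message ID -> \Seen flag, guarded by an RWMutex.
type Store struct {
	mu   sync.RWMutex
	seen map[int64]bool
}

// Fetch streams messages while holding only a read lock, so concurrent
// FETCHes (and, in the bstore version, writers) are not blocked for the
// whole command. IDs that need \Seen are gathered and written at the end,
// in one short write section, mirroring the single write transaction in
// this commit.
func (s *Store) Fetch(ids []int64, markSeen bool) {
	var updateSeen []int64

	s.mu.RLock()
	for _, id := range ids {
		fmt.Println("sending message", id) // Potentially slow client I/O.
		if markSeen && !s.seen[id] {
			updateSeen = append(updateSeen, id)
		}
	}
	s.mu.RUnlock()

	// All writes batched into one short critical section.
	s.mu.Lock()
	for _, id := range updateSeen {
		s.seen[id] = true
	}
	s.mu.Unlock()
}

func main() {
	s := &Store{seen: map[int64]bool{}}
	s.Fetch([]int64{1, 2, 3}, true)
	fmt.Println(s.seen) // map[1:true 2:true 3:true]
}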


@@ -4,6 +4,7 @@ package imapserver
import (
"bytes"
"context"
"errors"
"fmt"
"io"
@@ -25,18 +26,16 @@ import (
// functions to handle fetch attribute requests are defined on fetchCmd.
type fetchCmd struct {
conn *conn
mailboxID int64
uid store.UID
tx *bstore.Tx // Writable tx, for storing message when first parsed as mime parts.
changes []store.Change // For updated Seen flag.
markSeen bool
needFlags bool
needModseq bool // Whether untagged responses need modseq.
expungeIssued bool // Set if a message cannot be read. Can happen for expunged messages.
modseq store.ModSeq // Initialized on first change, for marking messages as seen.
isUID bool // If this is a UID FETCH command.
hasChangedSince bool // Whether CHANGEDSINCE was set. Enables MODSEQ in response.
deltaCounts store.MailboxCounts // By marking \Seen, the number of unread/unseen messages will go down. We update counts at the end.
isUID bool // If this is a UID FETCH command.
rtx *bstore.Tx // Read-only transaction, kept open while processing all messages.
updateSeen []int64 // IDs of messages to mark as seen, after processing all messages.
hasChangedSince bool // Whether CHANGEDSINCE was set. Enables MODSEQ in response.
expungeIssued bool // Set if any message cannot be read. Can happen for expunged messages.
uid store.UID // UID currently being processed.
markSeen bool
needFlags bool
needModseq bool // Whether untagged responses need modseq.
// Loaded when first needed, closed when message was processed.
m *store.Message // Message currently being processed.
@@ -125,31 +124,49 @@ func (c *conn) cmdxFetch(isUID bool, tag, cmdstr string, p *parser) {
}
p.xempty()
// We don't use c.account.WithRLock because we write to the client while reading messages.
// We get the rlock, then we check the mailbox, release the lock and read the messages.
// The db transaction still locks out any changes to the database...
c.account.RLock()
runlock := c.account.RUnlock
// Note: we call runlock in a closure because we replace it below.
// We hold the account read lock only briefly, for the initial checks and for
// listing the uids. Then we unlock and work without a lock. So changes to the
// store can happen, and we need to deal with that. If we need to mark messages as
// seen, we do so after processing the fetch for all messages, in a single write
// transaction. We don't send untagged changes for those \Seen flag changes before
// finishing this command, because we have to sequence all changes properly, and
// since we don't (want to) hold a wlock while processing messages (there can be
// many!), other changes may have happened to the store. So instead, we silently
// mark messages as seen (the client should know this is happening anyway!), then
// broadcast the changes to everyone, including ourselves. A noop/idle command
// that comes next will return the \Seen flag changes, in the correct order, with
// the correct modseq. We also cannot just apply pending changes while processing:
// that is not allowed at all for a non-uid fetch, and it would make life more
// complicated, e.g. we would perhaps have to check whether newly added messages
// also match the uid fetch set that was requested.
var uids []store.UID
var vanishedUIDs []store.UID
cmd := &fetchCmd{conn: c, isUID: isUID, hasChangedSince: haveChangedSince}
defer func() {
runlock()
if cmd.rtx == nil {
return
}
err := cmd.rtx.Rollback()
c.log.Check(err, "rollback rtx")
cmd.rtx = nil
}()
var vanishedUIDs []store.UID
cmd := &fetchCmd{conn: c, mailboxID: c.mailboxID, isUID: isUID, hasChangedSince: haveChangedSince}
c.xdbwrite(func(tx *bstore.Tx) {
cmd.tx = tx
c.account.WithRLock(func() {
var err error
cmd.rtx, err = c.account.DB.Begin(context.TODO(), false)
cmd.xcheckf(err, "begin transaction")
// Ensure the mailbox still exists.
mb := c.xmailboxID(tx, c.mailboxID)
var uids []store.UID
c.xmailboxID(cmd.rtx, c.mailboxID)
// With changedSince, the client is likely asking for a small set of changes. Use a
// database query to trim down the uids we need to look at.
// ../rfc/7162:871
if changedSince > 0 {
q := bstore.QueryTx[store.Message](tx)
q := bstore.QueryTx[store.Message](cmd.rtx)
q.FilterNonzero(store.Message{MailboxID: c.mailboxID})
q.FilterGreater("ModSeq", store.ModSeqFromClient(changedSince))
if !vanished {
@@ -176,81 +193,131 @@ func (c *conn) cmdxFetch(isUID bool, tag, cmdstr string, p *parser) {
}
// Send vanished for all missing requested UIDs. ../rfc/7162:1718
if vanished {
delModSeq, err := c.account.HighestDeletedModSeq(tx)
xcheckf(err, "looking up highest deleted modseq")
if changedSince < delModSeq.Client() {
// First sort the uids we already found, for fast lookup.
sort.Slice(vanishedUIDs, func(i, j int) bool {
return vanishedUIDs[i] < vanishedUIDs[j]
})
if !vanished {
return
}
// We'll be gathering any more vanished uids in more.
more := map[store.UID]struct{}{}
checkVanished := func(uid store.UID) {
if uidSearch(c.uids, uid) <= 0 && uidSearch(vanishedUIDs, uid) <= 0 {
more[uid] = struct{}{}
}
}
// Now look through the requested uids. We may have a searchResult, handle it
// separately from a numset with potential stars, over which we can more easily
// iterate.
if nums.searchResult {
for _, uid := range c.searchResult {
checkVanished(uid)
}
} else {
iter := nums.interpretStar(c.uids).newIter()
for {
num, ok := iter.Next()
if !ok {
break
}
checkVanished(store.UID(num))
}
}
vanishedUIDs = append(vanishedUIDs, maps.Keys(more)...)
delModSeq, err := c.account.HighestDeletedModSeq(cmd.rtx)
xcheckf(err, "looking up highest deleted modseq")
if changedSince >= delModSeq.Client() {
return
}
// First sort the uids we already found, for fast lookup.
sort.Slice(vanishedUIDs, func(i, j int) bool {
return vanishedUIDs[i] < vanishedUIDs[j]
})
// We'll be gathering any more vanished uids in more.
more := map[store.UID]struct{}{}
checkVanished := func(uid store.UID) {
if uidSearch(c.uids, uid) <= 0 && uidSearch(vanishedUIDs, uid) <= 0 {
more[uid] = struct{}{}
}
}
// Release the account lock.
runlock()
runlock = func() {} // Prevent defer from unlocking again.
// First report all vanished UIDs. ../rfc/7162:1714
if len(vanishedUIDs) > 0 {
// Mention all vanished UIDs in compact numset form.
// ../rfc/7162:1985
sort.Slice(vanishedUIDs, func(i, j int) bool {
return vanishedUIDs[i] < vanishedUIDs[j]
})
// No hard limit on response sizes, but clients are recommended to not send more
// than 8k. We send a more conservative max 4k.
for _, s := range compactUIDSet(vanishedUIDs).Strings(4*1024 - 32) {
c.bwritelinef("* VANISHED (EARLIER) %s", s)
// Now look through the requested uids. We may have a searchResult, handle it
// separately from a numset with potential stars, over which we can more easily
// iterate.
if nums.searchResult {
for _, uid := range c.searchResult {
checkVanished(uid)
}
} else {
iter := nums.interpretStar(c.uids).newIter()
for {
num, ok := iter.Next()
if !ok {
break
}
checkVanished(store.UID(num))
}
}
for _, uid := range uids {
cmd.uid = uid
cmd.conn.log.Debug("processing uid", slog.Any("uid", uid))
cmd.process(atts)
}
var zeromc store.MailboxCounts
if cmd.deltaCounts != zeromc || cmd.modseq != 0 {
mb.Add(cmd.deltaCounts) // Unseen/Unread will be <= 0.
mb.ModSeq = cmd.modseq
err := tx.Update(&mb)
xcheckf(err, "updating mailbox counts")
cmd.changes = append(cmd.changes, mb.ChangeCounts())
// No need to update account total message size.
}
vanishedUIDs = append(vanishedUIDs, maps.Keys(more)...)
})
// We are continuing without a lock, working off our snapshot of uids to process.
if len(cmd.changes) > 0 {
// Broadcast seen updates to other connections.
c.broadcast(cmd.changes)
// First report all vanished UIDs. ../rfc/7162:1714
if len(vanishedUIDs) > 0 {
// Mention all vanished UIDs in compact numset form.
// ../rfc/7162:1985
sort.Slice(vanishedUIDs, func(i, j int) bool {
return vanishedUIDs[i] < vanishedUIDs[j]
})
// No hard limit on response sizes, but clients are recommended to not send more
// than 8k. We send a more conservative max 4k.
for _, s := range compactUIDSet(vanishedUIDs).Strings(4*1024 - 32) {
c.bwritelinef("* VANISHED (EARLIER) %s", s)
}
}
for _, cmd.uid = range uids {
cmd.conn.log.Debug("processing uid", slog.Any("uid", cmd.uid))
cmd.process(atts)
}
// We've returned all data. Now we mark messages as seen in one go, in a new write
// transaction. We don't send untagged messages for the changes, since there may be
unprocessed pending changes. Instead, we broadcast them to ourselves too, so a
// next noop/idle will return the flags to the client.
err := cmd.rtx.Rollback()
c.log.Check(err, "fetch read tx rollback")
cmd.rtx = nil
// ../rfc/9051:4432 We mark all messages that need it as seen at the end of the
// command, in a single transaction.
if len(cmd.updateSeen) > 0 {
c.account.WithWLock(func() {
changes := make([]store.Change, 0, len(cmd.updateSeen)+1)
c.xdbwrite(func(wtx *bstore.Tx) {
mb := store.Mailbox{ID: c.mailboxID}
err = wtx.Get(&mb)
xcheckf(err, "get mailbox for updating counts after marking as seen")
var modseq store.ModSeq
for _, id := range cmd.updateSeen {
m := store.Message{ID: id}
err := wtx.Get(&m)
xcheckf(err, "get message")
if m.Expunged {
// Message has been deleted in the meantime.
cmd.expungeIssued = true
continue
}
if m.Seen {
// Message already marked as seen by another process.
continue
}
if modseq == 0 {
modseq, err = c.account.NextModSeq(wtx)
xcheckf(err, "get next mod seq")
}
oldFlags := m.Flags
mb.Sub(m.MailboxCounts())
m.Seen = true
mb.Add(m.MailboxCounts())
changes = append(changes, m.ChangeFlags(oldFlags))
m.ModSeq = modseq
err = wtx.Update(&m)
xcheckf(err, "mark message as seen")
}
changes = append(changes, mb.ChangeCounts())
mb.ModSeq = modseq
err = wtx.Update(&mb)
xcheckf(err, "update mailbox with counts and modseq")
})
// Broadcast these changes also to ourselves, so we'll send the updated flags, but
// in the correct order, after other changes.
store.BroadcastChanges(c.account, changes)
})
}
if cmd.expungeIssued {
@@ -261,22 +328,13 @@ func (c *conn) cmdxFetch(isUID bool, tag, cmdstr string, p *parser) {
}
}
func (cmd *fetchCmd) xmodseq() store.ModSeq {
if cmd.modseq == 0 {
var err error
cmd.modseq, err = cmd.conn.account.NextModSeq(cmd.tx)
cmd.xcheckf(err, "assigning next modseq")
}
return cmd.modseq
}
func (cmd *fetchCmd) xensureMessage() *store.Message {
if cmd.m != nil {
return cmd.m
}
q := bstore.QueryTx[store.Message](cmd.tx)
q.FilterNonzero(store.Message{MailboxID: cmd.mailboxID, UID: cmd.uid})
q := bstore.QueryTx[store.Message](cmd.rtx)
q.FilterNonzero(store.Message{MailboxID: cmd.conn.mailboxID, UID: cmd.uid})
q.FilterEqual("Expunged", false)
m, err := q.Get()
cmd.xcheckf(err, "get message for uid %d", cmd.uid)
@@ -344,16 +402,7 @@ func (cmd *fetchCmd) process(atts []fetchAtt) {
if cmd.markSeen {
m := cmd.xensureMessage()
cmd.deltaCounts.Sub(m.MailboxCounts())
origFlags := m.Flags
m.Seen = true
cmd.deltaCounts.Add(m.MailboxCounts())
m.ModSeq = cmd.xmodseq()
err := cmd.tx.Update(m)
xcheckf(err, "marking message as seen")
// No need to update account total message size.
cmd.changes = append(cmd.changes, m.ChangeFlags(origFlags))
cmd.updateSeen = append(cmd.updateSeen, m.ID)
}
if cmd.needFlags {
@@ -376,7 +425,7 @@ func (cmd *fetchCmd) process(atts []fetchAtt) {
// other mentioning of cases elsewhere in the RFC would be too superfluous.
//
// ../rfc/7162:877 ../rfc/7162:388 ../rfc/7162:909 ../rfc/7162:1426
if cmd.needModseq || cmd.hasChangedSince || cmd.conn.enabled[capQresync] && (cmd.isUID || cmd.markSeen) {
if cmd.needModseq || cmd.hasChangedSince || cmd.conn.enabled[capQresync] && cmd.isUID {
m := cmd.xensureMessage()
data = append(data, bare("MODSEQ"), listspace{bare(fmt.Sprintf("%d", m.ModSeq.Client()))})
}
@@ -395,6 +444,7 @@ func (cmd *fetchCmd) xprocessAtt(a fetchAtt) []token {
case "UID":
// Always present.
return nil
case "ENVELOPE":
_, part := cmd.xensureParsed()
envelope := xenvelope(part)
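
One subtlety in the fetch.go changes above: the \Seen updates are not written to the client inline. They are broadcast via store.BroadcastChanges to all sessions on the account, including the one that made them, so a following NOOP/IDLE delivers them in the globally correct order with the right modseq. A toy illustration of that self-broadcast ordering trick follows, with hypothetical types; this is not mox's actual store/comm machinery.

package main

import "fmt"

// Change is a toy stand-in for store.Change.
type Change struct{ Desc string }

// Session is a toy IMAP session with a buffered feed of pending changes.
type Session struct {
	name    string
	pending chan Change
}

// broadcast sends a change to every session, originator included. Because
// all changes flow through the same ordered feed, a session never emits
// its own \Seen update ahead of earlier changes it hasn't processed yet.
func broadcast(sessions []*Session, ch Change) {
	for _, s := range sessions {
		s.pending <- ch
	}
}

// Noop drains pending changes in broadcast order, like the untagged
// responses a real NOOP returns.
func (s *Session) Noop() {
	for {
		select {
		case ch := <-s.pending:
			fmt.Printf("%s: * %s\n", s.name, ch.Desc)
		default:
			return
		}
	}
}

func main() {
	a := &Session{name: "fetcher", pending: make(chan Change, 8)}
	b := &Session{name: "other", pending: make(chan Change, 8)}
	// The fetching session broadcasts its own \Seen change instead of
	// writing it to its client directly.
	broadcast([]*Session{a, b}, Change{`1 FETCH (FLAGS (\Seen))`})
	a.Noop() // Originator sees its own change on the next NOOP.
	b.Noop()
}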


@@ -98,26 +98,37 @@ func TestFetch(t *testing.T) {
// Should be returned unmodified, because there is no content-transfer-encoding.
tc.transactf("ok", "fetch 1 binary[]")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, binary1, flagsSeen}})
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, binary1, noflags}})
tc.transactf("ok", "noop")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}})
tc.transactf("ok", "fetch 1 binary[1]")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, binarypart1}}) // Seen flag not changed.
tc.client.StoreFlagsClear("1", true, `\Seen`)
tc.transactf("ok", "fetch 1 binary[]<1.1>")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, binarypartial1, flagsSeen}})
tc.transactf("ok", "uid fetch 1 binary[]<1.1>")
tc.xuntagged(
imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, binarypartial1, noflags}},
imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}}, // For UID FETCH, we get the flags during the command.
)
tc.client.StoreFlagsClear("1", true, `\Seen`)
tc.transactf("ok", "fetch 1 binary[1]<1.1>")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, binarypartpartial1, flagsSeen}})
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, binarypartpartial1, noflags}})
tc.transactf("ok", "noop")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}})
tc.client.StoreFlagsClear("1", true, `\Seen`)
tc.transactf("ok", "fetch 1 binary[]<10000.10001>")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, binaryend1, flagsSeen}})
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, binaryend1, noflags}})
tc.transactf("ok", "noop")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}})
tc.client.StoreFlagsClear("1", true, `\Seen`)
tc.transactf("ok", "fetch 1 binary[1]<10000.10001>")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, binarypartend1, flagsSeen}})
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, binarypartend1, noflags}})
tc.transactf("ok", "noop")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}})
tc.client.StoreFlagsClear("1", true, `\Seen`)
tc.transactf("ok", "fetch 1 binary.size[]")
@@ -128,29 +139,43 @@ func TestFetch(t *testing.T) {
tc.client.StoreFlagsClear("1", true, `\Seen`)
tc.transactf("ok", "fetch 1 body[]")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, body1, flagsSeen}})
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, body1, noflags}})
tc.transactf("ok", "noop")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}})
tc.transactf("ok", "fetch 1 body[]<1.2>")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, bodyoff1}}) // Already seen.
tc.transactf("ok", "noop")
tc.xuntagged() // Already seen.
tc.client.StoreFlagsClear("1", true, `\Seen`)
tc.transactf("ok", "fetch 1 body[1]")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, bodypart1, flagsSeen}})
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, bodypart1, noflags}})
tc.transactf("ok", "noop")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}})
tc.client.StoreFlagsClear("1", true, `\Seen`)
tc.transactf("ok", "fetch 1 body[1]<1.2>")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, body1off1, flagsSeen}})
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, body1off1, noflags}})
tc.transactf("ok", "noop")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}})
tc.client.StoreFlagsClear("1", true, `\Seen`)
tc.transactf("ok", "fetch 1 body[1]<100000.100000>")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, bodyend1, flagsSeen}})
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, bodyend1, noflags}})
tc.transactf("ok", "noop")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}})
tc.client.StoreFlagsClear("1", true, `\Seen`)
tc.transactf("ok", "fetch 1 body[header]")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, bodyheader1, flagsSeen}})
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, bodyheader1, noflags}})
tc.transactf("ok", "noop")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}})
tc.client.StoreFlagsClear("1", true, `\Seen`)
tc.transactf("ok", "fetch 1 body[text]")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, bodytext1, flagsSeen}})
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, bodytext1, noflags}})
tc.transactf("ok", "noop")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}})
// equivalent to body.peek[header], ../rfc/3501:3183
tc.client.StoreFlagsClear("1", true, `\Seen`)
@@ -160,12 +185,16 @@ func TestFetch(t *testing.T) {
// equivalent to body[text], ../rfc/3501:3199
tc.client.StoreFlagsClear("1", true, `\Seen`)
tc.transactf("ok", "fetch 1 rfc822.text")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, rfctext1, flagsSeen}})
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, rfctext1, noflags}})
tc.transactf("ok", "noop")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}})
// equivalent to body[], ../rfc/3501:3179
tc.client.StoreFlagsClear("1", true, `\Seen`)
tc.transactf("ok", "fetch 1 rfc822")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, rfc1, flagsSeen}})
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, rfc1, noflags}})
tc.transactf("ok", "noop")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}})
// With PEEK, we should not get the \Seen flag.
tc.client.StoreFlagsClear("1", true, `\Seen`)
@@ -194,7 +223,9 @@ func TestFetch(t *testing.T) {
tc.transactf("bad", "fetch 2 body[]")
tc.transactf("ok", "fetch 1:1 body[]")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, body1, flagsSeen}})
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, body1, noflags}})
tc.transactf("ok", "noop")
tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}})
// UID fetch
tc.transactf("ok", "uid fetch 1 body[]")


@@ -337,9 +337,7 @@ func (c *conn) cmdxReplace(isUID bool, tag, cmd string, p *parser) {
})
// Fetch pending changes, possibly with new UIDs, so we can apply them before adding our own new UID.
if c.comm != nil {
pendingChanges = c.comm.Get()
}
pendingChanges = c.comm.Get()
if oldMsgExpunged {
return


@@ -3565,9 +3565,7 @@ func (c *conn) cmdAppend(tag, cmd string, p *parser) {
committed = true
// Fetch pending changes, possibly with new UIDs, so we can apply them before adding our own new UID.
if c.comm != nil {
pendingChanges = c.comm.Get()
}
pendingChanges = c.comm.Get()
// Broadcast the change to other connections.
for _, a := range appends {