diff --git a/imapserver/fetch.go b/imapserver/fetch.go index e550919..8ddbbde 100644 --- a/imapserver/fetch.go +++ b/imapserver/fetch.go @@ -4,6 +4,7 @@ package imapserver import ( "bytes" + "context" "errors" "fmt" "io" @@ -25,18 +26,16 @@ import ( // functions to handle fetch attribute requests are defined on fetchCmd. type fetchCmd struct { conn *conn - mailboxID int64 - uid store.UID - tx *bstore.Tx // Writable tx, for storing message when first parsed as mime parts. - changes []store.Change // For updated Seen flag. - markSeen bool - needFlags bool - needModseq bool // Whether untagged responses needs modseq. - expungeIssued bool // Set if a message cannot be read. Can happen for expunged messages. - modseq store.ModSeq // Initialized on first change, for marking messages as seen. - isUID bool // If this is a UID FETCH command. - hasChangedSince bool // Whether CHANGEDSINCE was set. Enables MODSEQ in response. - deltaCounts store.MailboxCounts // By marking \Seen, the number of unread/unseen messages will go down. We update counts at the end. + isUID bool // If this is a UID FETCH command. + rtx *bstore.Tx // Read-only transaction, kept open while processing all messages. + updateSeen []int64 // IDs of messages to mark as seen, after processing all messages. + hasChangedSince bool // Whether CHANGEDSINCE was set. Enables MODSEQ in response. + expungeIssued bool // Set if any message cannot be read. Can happen for expunged messages. + + uid store.UID // UID currently processing. + markSeen bool + needFlags bool + needModseq bool // Whether untagged responses needs modseq. // Loaded when first needed, closed when message was processed. m *store.Message // Message currently being processed. @@ -125,31 +124,49 @@ func (c *conn) cmdxFetch(isUID bool, tag, cmdstr string, p *parser) { } p.xempty() - // We don't use c.account.WithRLock because we write to the client while reading messages. - // We get the rlock, then we check the mailbox, release the lock and read the messages. - // The db transaction still locks out any changes to the database... - c.account.RLock() - runlock := c.account.RUnlock - // Note: we call runlock in a closure because we replace it below. + // We only keep a wlock, only for initial checks and listing the uids. Then we + // unlock and work without a lock. So changes to the store can happen, and we need + // to deal with that. If we need to mark messages as seen, we do so after + // processing the fetch for all messages, in a single write transaction. We don't + // send untagged changes for those \seen flag changes before finishing this + // command, because we have to sequence all changes properly, and since we don't + // (want to) hold a wlock while processing messages (can be many!), other changes + // may have happened to the store. So instead, we'll silently mark messages as seen + // (the client should know this is happening anyway!), then broadcast the changes + // to everyone, including ourselves. A noop/idle command that may come next will + // return the \seen flag changes, in the correct order, with the correct modseq. We + // also cannot just apply pending changes while processing. It is not allowed at + // all for non-uid-fetch. It would also make life more complicated, e.g. we would + // perhaps have to check if newly added messages also match uid fetch set that was + // requested. + + var uids []store.UID + var vanishedUIDs []store.UID + + cmd := &fetchCmd{conn: c, isUID: isUID, hasChangedSince: haveChangedSince} + defer func() { - runlock() + if cmd.rtx == nil { + return + } + err := cmd.rtx.Rollback() + c.log.Check(err, "rollback rtx") + cmd.rtx = nil }() - var vanishedUIDs []store.UID - cmd := &fetchCmd{conn: c, mailboxID: c.mailboxID, isUID: isUID, hasChangedSince: haveChangedSince} - c.xdbwrite(func(tx *bstore.Tx) { - cmd.tx = tx + c.account.WithRLock(func() { + var err error + cmd.rtx, err = c.account.DB.Begin(context.TODO(), false) + cmd.xcheckf(err, "begin transaction") // Ensure the mailbox still exists. - mb := c.xmailboxID(tx, c.mailboxID) - - var uids []store.UID + c.xmailboxID(cmd.rtx, c.mailboxID) // With changedSince, the client is likely asking for a small set of changes. Use a // database query to trim down the uids we need to look at. // ../rfc/7162:871 if changedSince > 0 { - q := bstore.QueryTx[store.Message](tx) + q := bstore.QueryTx[store.Message](cmd.rtx) q.FilterNonzero(store.Message{MailboxID: c.mailboxID}) q.FilterGreater("ModSeq", store.ModSeqFromClient(changedSince)) if !vanished { @@ -176,81 +193,131 @@ func (c *conn) cmdxFetch(isUID bool, tag, cmdstr string, p *parser) { } // Send vanished for all missing requested UIDs. ../rfc/7162:1718 - if vanished { - delModSeq, err := c.account.HighestDeletedModSeq(tx) - xcheckf(err, "looking up highest deleted modseq") - if changedSince < delModSeq.Client() { - // First sort the uids we already found, for fast lookup. - sort.Slice(vanishedUIDs, func(i, j int) bool { - return vanishedUIDs[i] < vanishedUIDs[j] - }) + if !vanished { + return + } - // We'll be gathering any more vanished uids in more. - more := map[store.UID]struct{}{} - checkVanished := func(uid store.UID) { - if uidSearch(c.uids, uid) <= 0 && uidSearch(vanishedUIDs, uid) <= 0 { - more[uid] = struct{}{} - } - } - // Now look through the requested uids. We may have a searchResult, handle it - // separately from a numset with potential stars, over which we can more easily - // iterate. - if nums.searchResult { - for _, uid := range c.searchResult { - checkVanished(uid) - } - } else { - iter := nums.interpretStar(c.uids).newIter() - for { - num, ok := iter.Next() - if !ok { - break - } - checkVanished(store.UID(num)) - } - } - vanishedUIDs = append(vanishedUIDs, maps.Keys(more)...) + delModSeq, err := c.account.HighestDeletedModSeq(cmd.rtx) + xcheckf(err, "looking up highest deleted modseq") + if changedSince >= delModSeq.Client() { + return + } + + // First sort the uids we already found, for fast lookup. + sort.Slice(vanishedUIDs, func(i, j int) bool { + return vanishedUIDs[i] < vanishedUIDs[j] + }) + + // We'll be gathering any more vanished uids in more. + more := map[store.UID]struct{}{} + checkVanished := func(uid store.UID) { + if uidSearch(c.uids, uid) <= 0 && uidSearch(vanishedUIDs, uid) <= 0 { + more[uid] = struct{}{} } } - - // Release the account lock. - runlock() - runlock = func() {} // Prevent defer from unlocking again. - - // First report all vanished UIDs. ../rfc/7162:1714 - if len(vanishedUIDs) > 0 { - // Mention all vanished UIDs in compact numset form. - // ../rfc/7162:1985 - sort.Slice(vanishedUIDs, func(i, j int) bool { - return vanishedUIDs[i] < vanishedUIDs[j] - }) - // No hard limit on response sizes, but clients are recommended to not send more - // than 8k. We send a more conservative max 4k. - for _, s := range compactUIDSet(vanishedUIDs).Strings(4*1024 - 32) { - c.bwritelinef("* VANISHED (EARLIER) %s", s) + // Now look through the requested uids. We may have a searchResult, handle it + // separately from a numset with potential stars, over which we can more easily + // iterate. + if nums.searchResult { + for _, uid := range c.searchResult { + checkVanished(uid) + } + } else { + iter := nums.interpretStar(c.uids).newIter() + for { + num, ok := iter.Next() + if !ok { + break + } + checkVanished(store.UID(num)) } } - - for _, uid := range uids { - cmd.uid = uid - cmd.conn.log.Debug("processing uid", slog.Any("uid", uid)) - cmd.process(atts) - } - - var zeromc store.MailboxCounts - if cmd.deltaCounts != zeromc || cmd.modseq != 0 { - mb.Add(cmd.deltaCounts) // Unseen/Unread will be <= 0. - mb.ModSeq = cmd.modseq - err := tx.Update(&mb) - xcheckf(err, "updating mailbox counts") - cmd.changes = append(cmd.changes, mb.ChangeCounts()) - // No need to update account total message size. - } + vanishedUIDs = append(vanishedUIDs, maps.Keys(more)...) }) + // We are continuing without a lock, working off our snapshot of uids to process. - if len(cmd.changes) > 0 { - // Broadcast seen updates to other connections. - c.broadcast(cmd.changes) + // First report all vanished UIDs. ../rfc/7162:1714 + if len(vanishedUIDs) > 0 { + // Mention all vanished UIDs in compact numset form. + // ../rfc/7162:1985 + sort.Slice(vanishedUIDs, func(i, j int) bool { + return vanishedUIDs[i] < vanishedUIDs[j] + }) + // No hard limit on response sizes, but clients are recommended to not send more + // than 8k. We send a more conservative max 4k. + for _, s := range compactUIDSet(vanishedUIDs).Strings(4*1024 - 32) { + c.bwritelinef("* VANISHED (EARLIER) %s", s) + } + } + + for _, cmd.uid = range uids { + cmd.conn.log.Debug("processing uid", slog.Any("uid", cmd.uid)) + cmd.process(atts) + } + + // We've returned all data. Now we mark messages as seen in one go, in a new write + // transaction. We don't send untagged messages for the changes, since there may be + // unprocessed pending changes. Instead, we broadcast them to ourselve too, so a + // next noop/idle will return the flags to the client. + + err := cmd.rtx.Rollback() + c.log.Check(err, "fetch read tx rollback") + cmd.rtx = nil + + // ../rfc/9051:4432 We mark all messages that need it as seen at the end of the + // command, in a single transaction. + if len(cmd.updateSeen) > 0 { + c.account.WithWLock(func() { + changes := make([]store.Change, 0, len(cmd.updateSeen)+1) + + c.xdbwrite(func(wtx *bstore.Tx) { + mb := store.Mailbox{ID: c.mailboxID} + err = wtx.Get(&mb) + xcheckf(err, "get mailbox for updating counts after marking as seen") + + var modseq store.ModSeq + + for _, id := range cmd.updateSeen { + m := store.Message{ID: id} + err := wtx.Get(&m) + xcheckf(err, "get message") + if m.Expunged { + // Message has been deleted in the mean time. + cmd.expungeIssued = true + continue + } + if m.Seen { + // Message already marked as seen by another process. + continue + } + + if modseq == 0 { + modseq, err = c.account.NextModSeq(wtx) + xcheckf(err, "get next mod seq") + } + + oldFlags := m.Flags + mb.Sub(m.MailboxCounts()) + m.Seen = true + mb.Add(m.MailboxCounts()) + changes = append(changes, m.ChangeFlags(oldFlags)) + + m.ModSeq = modseq + err = wtx.Update(&m) + xcheckf(err, "mark message as seen") + } + + changes = append(changes, mb.ChangeCounts()) + + mb.ModSeq = modseq + err = wtx.Update(&mb) + xcheckf(err, "update mailbox with counts and modseq") + }) + + // Broadcast these changes also to ourselves, so we'll send the updated flags, but + // in the correct order, after other changes. + store.BroadcastChanges(c.account, changes) + }) } if cmd.expungeIssued { @@ -261,22 +328,13 @@ func (c *conn) cmdxFetch(isUID bool, tag, cmdstr string, p *parser) { } } -func (cmd *fetchCmd) xmodseq() store.ModSeq { - if cmd.modseq == 0 { - var err error - cmd.modseq, err = cmd.conn.account.NextModSeq(cmd.tx) - cmd.xcheckf(err, "assigning next modseq") - } - return cmd.modseq -} - func (cmd *fetchCmd) xensureMessage() *store.Message { if cmd.m != nil { return cmd.m } - q := bstore.QueryTx[store.Message](cmd.tx) - q.FilterNonzero(store.Message{MailboxID: cmd.mailboxID, UID: cmd.uid}) + q := bstore.QueryTx[store.Message](cmd.rtx) + q.FilterNonzero(store.Message{MailboxID: cmd.conn.mailboxID, UID: cmd.uid}) q.FilterEqual("Expunged", false) m, err := q.Get() cmd.xcheckf(err, "get message for uid %d", cmd.uid) @@ -344,16 +402,7 @@ func (cmd *fetchCmd) process(atts []fetchAtt) { if cmd.markSeen { m := cmd.xensureMessage() - cmd.deltaCounts.Sub(m.MailboxCounts()) - origFlags := m.Flags - m.Seen = true - cmd.deltaCounts.Add(m.MailboxCounts()) - m.ModSeq = cmd.xmodseq() - err := cmd.tx.Update(m) - xcheckf(err, "marking message as seen") - // No need to update account total message size. - - cmd.changes = append(cmd.changes, m.ChangeFlags(origFlags)) + cmd.updateSeen = append(cmd.updateSeen, m.ID) } if cmd.needFlags { @@ -376,7 +425,7 @@ func (cmd *fetchCmd) process(atts []fetchAtt) { // other mentioning of cases elsewhere in the RFC would be too superfluous. // // ../rfc/7162:877 ../rfc/7162:388 ../rfc/7162:909 ../rfc/7162:1426 - if cmd.needModseq || cmd.hasChangedSince || cmd.conn.enabled[capQresync] && (cmd.isUID || cmd.markSeen) { + if cmd.needModseq || cmd.hasChangedSince || cmd.conn.enabled[capQresync] && cmd.isUID { m := cmd.xensureMessage() data = append(data, bare("MODSEQ"), listspace{bare(fmt.Sprintf("%d", m.ModSeq.Client()))}) } @@ -395,6 +444,7 @@ func (cmd *fetchCmd) xprocessAtt(a fetchAtt) []token { case "UID": // Always present. return nil + case "ENVELOPE": _, part := cmd.xensureParsed() envelope := xenvelope(part) diff --git a/imapserver/fetch_test.go b/imapserver/fetch_test.go index fdde292..4d091f5 100644 --- a/imapserver/fetch_test.go +++ b/imapserver/fetch_test.go @@ -98,26 +98,37 @@ func TestFetch(t *testing.T) { // Should be returned unmodified, because there is no content-transfer-encoding. tc.transactf("ok", "fetch 1 binary[]") - tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, binary1, flagsSeen}}) + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, binary1, noflags}}) + tc.transactf("ok", "noop") + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}}) tc.transactf("ok", "fetch 1 binary[1]") tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, binarypart1}}) // Seen flag not changed. tc.client.StoreFlagsClear("1", true, `\Seen`) - tc.transactf("ok", "fetch 1 binary[]<1.1>") - tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, binarypartial1, flagsSeen}}) + tc.transactf("ok", "uid fetch 1 binary[]<1.1>") + tc.xuntagged( + imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, binarypartial1, noflags}}, + imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}}, // For UID FETCH, we get the flags during the command. + ) tc.client.StoreFlagsClear("1", true, `\Seen`) tc.transactf("ok", "fetch 1 binary[1]<1.1>") - tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, binarypartpartial1, flagsSeen}}) + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, binarypartpartial1, noflags}}) + tc.transactf("ok", "noop") + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}}) tc.client.StoreFlagsClear("1", true, `\Seen`) tc.transactf("ok", "fetch 1 binary[]<10000.10001>") - tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, binaryend1, flagsSeen}}) + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, binaryend1, noflags}}) + tc.transactf("ok", "noop") + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}}) tc.client.StoreFlagsClear("1", true, `\Seen`) tc.transactf("ok", "fetch 1 binary[1]<10000.10001>") - tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, binarypartend1, flagsSeen}}) + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, binarypartend1, noflags}}) + tc.transactf("ok", "noop") + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}}) tc.client.StoreFlagsClear("1", true, `\Seen`) tc.transactf("ok", "fetch 1 binary.size[]") @@ -128,29 +139,43 @@ func TestFetch(t *testing.T) { tc.client.StoreFlagsClear("1", true, `\Seen`) tc.transactf("ok", "fetch 1 body[]") - tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, body1, flagsSeen}}) + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, body1, noflags}}) + tc.transactf("ok", "noop") + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}}) tc.transactf("ok", "fetch 1 body[]<1.2>") tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, bodyoff1}}) // Already seen. + tc.transactf("ok", "noop") + tc.xuntagged() // Already seen. tc.client.StoreFlagsClear("1", true, `\Seen`) tc.transactf("ok", "fetch 1 body[1]") - tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, bodypart1, flagsSeen}}) + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, bodypart1, noflags}}) + tc.transactf("ok", "noop") + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}}) tc.client.StoreFlagsClear("1", true, `\Seen`) tc.transactf("ok", "fetch 1 body[1]<1.2>") - tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, body1off1, flagsSeen}}) + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, body1off1, noflags}}) + tc.transactf("ok", "noop") + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}}) tc.client.StoreFlagsClear("1", true, `\Seen`) tc.transactf("ok", "fetch 1 body[1]<100000.100000>") - tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, bodyend1, flagsSeen}}) + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, bodyend1, noflags}}) + tc.transactf("ok", "noop") + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}}) tc.client.StoreFlagsClear("1", true, `\Seen`) tc.transactf("ok", "fetch 1 body[header]") - tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, bodyheader1, flagsSeen}}) + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, bodyheader1, noflags}}) + tc.transactf("ok", "noop") + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}}) tc.client.StoreFlagsClear("1", true, `\Seen`) tc.transactf("ok", "fetch 1 body[text]") - tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, bodytext1, flagsSeen}}) + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, bodytext1, noflags}}) + tc.transactf("ok", "noop") + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}}) // equivalent to body.peek[header], ../rfc/3501:3183 tc.client.StoreFlagsClear("1", true, `\Seen`) @@ -160,12 +185,16 @@ func TestFetch(t *testing.T) { // equivalent to body[text], ../rfc/3501:3199 tc.client.StoreFlagsClear("1", true, `\Seen`) tc.transactf("ok", "fetch 1 rfc822.text") - tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, rfctext1, flagsSeen}}) + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, rfctext1, noflags}}) + tc.transactf("ok", "noop") + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}}) // equivalent to body[], ../rfc/3501:3179 tc.client.StoreFlagsClear("1", true, `\Seen`) tc.transactf("ok", "fetch 1 rfc822") - tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, rfc1, flagsSeen}}) + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, rfc1, noflags}}) + tc.transactf("ok", "noop") + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}}) // With PEEK, we should not get the \Seen flag. tc.client.StoreFlagsClear("1", true, `\Seen`) @@ -194,7 +223,9 @@ func TestFetch(t *testing.T) { tc.transactf("bad", "fetch 2 body[]") tc.transactf("ok", "fetch 1:1 body[]") - tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, body1, flagsSeen}}) + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, body1, noflags}}) + tc.transactf("ok", "noop") + tc.xuntagged(imapclient.UntaggedFetch{Seq: 1, Attrs: []imapclient.FetchAttr{uid1, flagsSeen}}) // UID fetch tc.transactf("ok", "uid fetch 1 body[]") diff --git a/imapserver/replace.go b/imapserver/replace.go index aae1a0c..2fb63c2 100644 --- a/imapserver/replace.go +++ b/imapserver/replace.go @@ -337,9 +337,7 @@ func (c *conn) cmdxReplace(isUID bool, tag, cmd string, p *parser) { }) // Fetch pending changes, possibly with new UIDs, so we can apply them before adding our own new UID. - if c.comm != nil { - pendingChanges = c.comm.Get() - } + pendingChanges = c.comm.Get() if oldMsgExpunged { return diff --git a/imapserver/server.go b/imapserver/server.go index 7580d3d..b79cac1 100644 --- a/imapserver/server.go +++ b/imapserver/server.go @@ -3565,9 +3565,7 @@ func (c *conn) cmdAppend(tag, cmd string, p *parser) { committed = true // Fetch pending changes, possibly with new UIDs, so we can apply them before adding our own new UID. - if c.comm != nil { - pendingChanges = c.comm.Get() - } + pendingChanges = c.comm.Get() // Broadcast the change to other connections. for _, a := range appends {