mirror of
https://github.com/mjl-/mox.git
synced 2025-07-13 06:14:38 +03:00
imapserver: implement MULTIAPPEND extension, rfc 3502
MULTIAPPEND modifies the existing APPEND command to allow multiple messages. it is somewhat more involved than a regular append of a single message since the operation (of adding multiple messages) must be atomic. either all are added, or none are. we check as early as possible if the messages won't cause an over-quota error.
This commit is contained in:
@ -28,7 +28,6 @@ non-ASCII UTF-8. Until that's enabled, we do use UTF-7 for mailbox names. See
|
||||
- todo: do not return binary data for a fetch body. at least not for imap4rev1. we should be encoding it as base64?
|
||||
- todo: on expunge we currently remove the message even if other sessions still have a reference to the uid. if they try to query the uid, they'll get an error. we could be nicer and only actually remove the message when the last reference has gone. we could add a new flag to store.Message marking the message as expunged, not give new session access to such messages, and make store remove them at startup, and clean them when the last session referencing the session goes. however, it will get much more complicated. renaming messages would need special handling. and should we do the same for removed mailboxes?
|
||||
- todo: try to recover from syntax errors when the last command line ends with a }, i.e. a literal. we currently abort the entire connection. we may want to read some amount of literal data and continue with a next command.
|
||||
- todo future: more extensions: OBJECTID, MULTISEARCH, REPLACE, NOTIFY, CATENATE, MULTIAPPEND, SORT, THREAD.
|
||||
*/
|
||||
|
||||
import (
|
||||
@ -165,12 +164,13 @@ var authFailDelay = time.Second // After authentication failure.
|
||||
// NAMESPACE: ../rfc/2342
|
||||
// COMPRESS=DEFLATE: ../rfc/4978
|
||||
// LIST-METADATA: ../rfc/9590
|
||||
// MULTIAPPEND: ../rfc/3502
|
||||
//
|
||||
// We always announce support for SCRAM PLUS-variants, also on connections without
|
||||
// TLS. The client should not be selecting PLUS variants on non-TLS connections,
|
||||
// instead opting to do the bare SCRAM variant without indicating the server claims
|
||||
// to support the PLUS variant (skipping the server downgrade detection check).
|
||||
const serverCapabilities = "IMAP4rev2 IMAP4rev1 ENABLE LITERAL+ IDLE SASL-IR BINARY UNSELECT UIDPLUS ESEARCH SEARCHRES MOVE UTF8=ACCEPT LIST-EXTENDED SPECIAL-USE CREATE-SPECIAL-USE LIST-STATUS AUTH=SCRAM-SHA-256-PLUS AUTH=SCRAM-SHA-256 AUTH=SCRAM-SHA-1-PLUS AUTH=SCRAM-SHA-1 AUTH=CRAM-MD5 ID APPENDLIMIT=9223372036854775807 CONDSTORE QRESYNC STATUS=SIZE QUOTA QUOTA=RES-STORAGE METADATA SAVEDATE WITHIN NAMESPACE COMPRESS=DEFLATE LIST-METADATA"
|
||||
const serverCapabilities = "IMAP4rev2 IMAP4rev1 ENABLE LITERAL+ IDLE SASL-IR BINARY UNSELECT UIDPLUS ESEARCH SEARCHRES MOVE UTF8=ACCEPT LIST-EXTENDED SPECIAL-USE CREATE-SPECIAL-USE LIST-STATUS AUTH=SCRAM-SHA-256-PLUS AUTH=SCRAM-SHA-256 AUTH=SCRAM-SHA-1-PLUS AUTH=SCRAM-SHA-1 AUTH=CRAM-MD5 ID APPENDLIMIT=9223372036854775807 CONDSTORE QRESYNC STATUS=SIZE QUOTA QUOTA=RES-STORAGE METADATA SAVEDATE WITHIN NAMESPACE COMPRESS=DEFLATE LIST-METADATA MULTIAPPEND"
|
||||
|
||||
type conn struct {
|
||||
cid int64
|
||||
@ -3298,145 +3298,278 @@ func flaglist(fl store.Flags, keywords []string) listspace {
|
||||
}
|
||||
|
||||
// Append adds a message to a mailbox.
|
||||
// The MULTIAPPEND extension is implemented, allowing multiple flags/datetime/data
|
||||
// sets.
|
||||
//
|
||||
// State: Authenticated and selected.
|
||||
func (c *conn) cmdAppend(tag, cmd string, p *parser) {
|
||||
// Command: ../rfc/9051:3406 ../rfc/6855:204 ../rfc/3501:2527
|
||||
// Examples: ../rfc/9051:3482 ../rfc/3501:2589
|
||||
// Command: ../rfc/9051:3406 ../rfc/6855:204 ../rfc/3501:2527 ../rfc/3502:95
|
||||
// Examples: ../rfc/9051:3482 ../rfc/3501:2589 ../rfc/3502:175
|
||||
|
||||
// Request syntax: ../rfc/9051:6325 ../rfc/6855:219 ../rfc/3501:4547
|
||||
// A message that we've (partially) read from the client, and will be delivering to
|
||||
// the mailbox once we have them all. ../rfc/3502:49
|
||||
type appendMsg struct {
|
||||
storeFlags store.Flags
|
||||
keywords []string
|
||||
time time.Time
|
||||
|
||||
file *os.File // Message file we are appending. Can be nil if we are writing to a nopWriteCloser due to being over quota.
|
||||
path string // Path if an actual file, either a temporary file, or of the message file stored in the account.
|
||||
|
||||
mw *message.Writer
|
||||
m store.Message
|
||||
}
|
||||
|
||||
var appends []*appendMsg
|
||||
var committed bool
|
||||
defer func() {
|
||||
for _, a := range appends {
|
||||
if a.file != nil {
|
||||
err := a.file.Close()
|
||||
c.xsanity(err, "closing APPEND temporary file")
|
||||
}
|
||||
|
||||
if !committed && a.path != "" {
|
||||
err := os.Remove(a.path)
|
||||
c.xsanity(err, "removing APPEND temporary file")
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Request syntax: ../rfc/9051:6325 ../rfc/6855:219 ../rfc/3501:4547 ../rfc/3502:218
|
||||
p.xspace()
|
||||
name := p.xmailbox()
|
||||
p.xspace()
|
||||
var storeFlags store.Flags
|
||||
var keywords []string
|
||||
if p.hasPrefix("(") {
|
||||
// Error must be a syntax error, to properly abort the connection due to literal.
|
||||
var err error
|
||||
storeFlags, keywords, err = store.ParseFlagsKeywords(p.xflagList())
|
||||
if err != nil {
|
||||
xsyntaxErrorf("parsing flags: %v", err)
|
||||
|
||||
// Check how much quota space is available. We'll keep track of remaining quota as
|
||||
// we accept multiple messages.
|
||||
quotaMsgMax := c.account.QuotaMessageSize()
|
||||
quotaUnlimited := quotaMsgMax == 0
|
||||
var quotaAvail int64
|
||||
var totalSize int64
|
||||
if !quotaUnlimited {
|
||||
c.account.WithRLock(func() {
|
||||
c.xdbread(func(tx *bstore.Tx) {
|
||||
du := store.DiskUsage{ID: 1}
|
||||
err := tx.Get(&du)
|
||||
xcheckf(err, "get quota disk usage")
|
||||
quotaAvail = quotaMsgMax - du.MessageSize
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
var overQuota bool // For response code.
|
||||
var cancel bool // In case we've seen zero-sized message append.
|
||||
|
||||
for {
|
||||
// Append msg early, for potential cleanup.
|
||||
var a appendMsg
|
||||
appends = append(appends, &a)
|
||||
|
||||
if p.hasPrefix("(") {
|
||||
// Error must be a syntax error, to properly abort the connection due to literal.
|
||||
var err error
|
||||
a.storeFlags, a.keywords, err = store.ParseFlagsKeywords(p.xflagList())
|
||||
if err != nil {
|
||||
xsyntaxErrorf("parsing flags: %v", err)
|
||||
}
|
||||
p.xspace()
|
||||
}
|
||||
p.xspace()
|
||||
}
|
||||
var tm time.Time
|
||||
if p.hasPrefix(`"`) {
|
||||
tm = p.xdateTime()
|
||||
p.xspace()
|
||||
} else {
|
||||
tm = time.Now()
|
||||
}
|
||||
// todo: only with utf8 should we we accept message headers with utf-8. we currently always accept them.
|
||||
// todo: this is only relevant if we also support the CATENATE extension?
|
||||
// ../rfc/6855:204
|
||||
utf8 := p.take("UTF8 (")
|
||||
size, sync := p.xliteralSize(utf8, false)
|
||||
if p.hasPrefix(`"`) {
|
||||
a.time = p.xdateTime()
|
||||
p.xspace()
|
||||
} else {
|
||||
a.time = time.Now()
|
||||
}
|
||||
// todo: only with utf8 should we we accept message headers with utf-8. we currently always accept them.
|
||||
// todo: this is only relevant if we also support the CATENATE extension?
|
||||
// ../rfc/6855:204
|
||||
utf8 := p.take("UTF8 (")
|
||||
size, synclit := p.xliteralSize(utf8, false)
|
||||
|
||||
name = xcheckmailboxname(name, true)
|
||||
c.xdbread(func(tx *bstore.Tx) {
|
||||
c.xmailbox(tx, name, "TRYCREATE")
|
||||
})
|
||||
if sync {
|
||||
c.writelinef("+ ")
|
||||
}
|
||||
if !quotaUnlimited && !overQuota {
|
||||
quotaAvail -= size
|
||||
overQuota = quotaAvail < 0
|
||||
}
|
||||
if size == 0 {
|
||||
cancel = true
|
||||
}
|
||||
|
||||
// Read the message into a temporary file.
|
||||
msgFile, err := store.CreateMessageTemp(c.log, "imap-append")
|
||||
xcheckf(err, "creating temp file for message")
|
||||
defer func() {
|
||||
p := msgFile.Name()
|
||||
err := msgFile.Close()
|
||||
c.xsanity(err, "closing APPEND temporary file")
|
||||
err = os.Remove(p)
|
||||
c.xsanity(err, "removing APPEND temporary file")
|
||||
}()
|
||||
defer c.xtrace(mlog.LevelTracedata)()
|
||||
mw := message.NewWriter(msgFile)
|
||||
msize, err := io.Copy(mw, io.LimitReader(c.br, size))
|
||||
c.xtrace(mlog.LevelTrace) // Restore.
|
||||
if err != nil {
|
||||
// Cannot use xcheckf due to %w handling of errIO.
|
||||
panic(fmt.Errorf("reading literal message: %s (%w)", err, errIO))
|
||||
}
|
||||
if msize != size {
|
||||
xserverErrorf("read %d bytes for message, expected %d (%w)", msize, size, errIO)
|
||||
}
|
||||
var f io.Writer
|
||||
if synclit {
|
||||
// Check for mailbox on first iteration.
|
||||
if len(appends) <= 1 {
|
||||
name = xcheckmailboxname(name, true)
|
||||
c.xdbread(func(tx *bstore.Tx) {
|
||||
c.xmailbox(tx, name, "TRYCREATE")
|
||||
})
|
||||
}
|
||||
|
||||
if overQuota {
|
||||
// ../rfc/9051:5155 ../rfc/9208:472
|
||||
xusercodeErrorf("OVERQUOTA", "account over maximum total message size %d", quotaMsgMax)
|
||||
}
|
||||
|
||||
// ../rfc/3502:140
|
||||
if cancel {
|
||||
xuserErrorf("empty message, cancelling append")
|
||||
}
|
||||
|
||||
// Read the message into a temporary file.
|
||||
var err error
|
||||
a.file, err = store.CreateMessageTemp(c.log, "imap-append")
|
||||
xcheckf(err, "creating temp file for message")
|
||||
a.path = a.file.Name()
|
||||
f = a.file
|
||||
|
||||
c.writelinef("+ ")
|
||||
} else {
|
||||
// We'll discard the message and return an error as soon as we can (possible
|
||||
// synchronizing literal of next message, or after we've seen all messages).
|
||||
if overQuota || cancel {
|
||||
f = io.Discard
|
||||
} else {
|
||||
var err error
|
||||
a.file, err = store.CreateMessageTemp(c.log, "imap-append")
|
||||
xcheckf(err, "creating temp file for message")
|
||||
a.path = a.file.Name()
|
||||
f = a.file
|
||||
}
|
||||
}
|
||||
|
||||
defer c.xtrace(mlog.LevelTracedata)()
|
||||
a.mw = message.NewWriter(f)
|
||||
msize, err := io.Copy(a.mw, io.LimitReader(c.br, size))
|
||||
c.xtrace(mlog.LevelTrace) // Restore.
|
||||
if err != nil {
|
||||
// Cannot use xcheckf due to %w handling of errIO.
|
||||
panic(fmt.Errorf("reading literal message: %s (%w)", err, errIO))
|
||||
}
|
||||
if msize != size {
|
||||
xserverErrorf("read %d bytes for message, expected %d (%w)", msize, size, errIO)
|
||||
}
|
||||
totalSize += msize
|
||||
|
||||
if utf8 {
|
||||
line := c.readline(false)
|
||||
np := newParser(line, c)
|
||||
np.xtake(")")
|
||||
np.xempty()
|
||||
} else {
|
||||
line := c.readline(false)
|
||||
np := newParser(line, c)
|
||||
np.xempty()
|
||||
p = newParser(line, c)
|
||||
if utf8 {
|
||||
p.xtake(")")
|
||||
}
|
||||
|
||||
// The MULTIAPPEND extension allows more appends.
|
||||
if !p.space() {
|
||||
break
|
||||
}
|
||||
}
|
||||
p.xempty()
|
||||
if !sync {
|
||||
name = xcheckmailboxname(name, true)
|
||||
|
||||
name = xcheckmailboxname(name, true)
|
||||
|
||||
if overQuota {
|
||||
// ../rfc/9208:472
|
||||
xusercodeErrorf("OVERQUOTA", "account over maximum total message size %d", quotaMsgMax)
|
||||
}
|
||||
|
||||
// ../rfc/3502:140
|
||||
if cancel {
|
||||
xuserErrorf("empty message, cancelling append")
|
||||
}
|
||||
|
||||
var mb store.Mailbox
|
||||
var m store.Message
|
||||
var pendingChanges []store.Change
|
||||
|
||||
// Append all messages in a single atomic transaction. ../rfc/3502:143
|
||||
|
||||
c.account.WithWLock(func() {
|
||||
var changes []store.Change
|
||||
|
||||
c.xdbwrite(func(tx *bstore.Tx) {
|
||||
mb = c.xmailbox(tx, name, "TRYCREATE")
|
||||
|
||||
// Ensure keywords are stored in mailbox.
|
||||
// Check quota for all messages at once.
|
||||
ok, maxSize, err := c.account.CanAddMessageSize(tx, totalSize)
|
||||
xcheckf(err, "checking quota")
|
||||
if !ok {
|
||||
// ../rfc/9208:472
|
||||
xusercodeErrorf("OVERQUOTA", "account over maximum total message size %d", maxSize)
|
||||
}
|
||||
|
||||
modseq, err := c.account.NextModSeq(tx)
|
||||
xcheckf(err, "get next mod seq")
|
||||
|
||||
var mbKwChanged bool
|
||||
mb.Keywords, mbKwChanged = store.MergeKeywords(mb.Keywords, keywords)
|
||||
for _, a := range appends {
|
||||
// Ensure keywords are stored in mailbox.
|
||||
var kwch bool
|
||||
mb.Keywords, kwch = store.MergeKeywords(mb.Keywords, a.keywords)
|
||||
mbKwChanged = mbKwChanged || kwch
|
||||
}
|
||||
if mbKwChanged {
|
||||
changes = append(changes, mb.ChangeKeywords())
|
||||
}
|
||||
|
||||
m = store.Message{
|
||||
MailboxID: mb.ID,
|
||||
MailboxOrigID: mb.ID,
|
||||
Received: tm,
|
||||
Flags: storeFlags,
|
||||
Keywords: keywords,
|
||||
Size: mw.Size,
|
||||
for _, a := range appends {
|
||||
a.m = store.Message{
|
||||
MailboxID: mb.ID,
|
||||
MailboxOrigID: mb.ID,
|
||||
Received: a.time,
|
||||
Flags: a.storeFlags,
|
||||
Keywords: a.keywords,
|
||||
Size: a.mw.Size,
|
||||
ModSeq: modseq,
|
||||
CreateSeq: modseq,
|
||||
}
|
||||
mb.Add(a.m.MailboxCounts())
|
||||
}
|
||||
|
||||
ok, maxSize, err := c.account.CanAddMessageSize(tx, m.Size)
|
||||
xcheckf(err, "checking quota")
|
||||
if !ok {
|
||||
// ../rfc/9051:5155 ../rfc/9208:472
|
||||
xusercodeErrorf("OVERQUOTA", "account over maximum total message size %d", maxSize)
|
||||
}
|
||||
|
||||
mb.Add(m.MailboxCounts())
|
||||
|
||||
// Update mailbox before delivering, which updates uidnext which we mustn't overwrite.
|
||||
mb.ModSeq = modseq
|
||||
err = tx.Update(&mb)
|
||||
xcheckf(err, "updating mailbox counts")
|
||||
|
||||
err = c.account.DeliverMessage(c.log, tx, &m, msgFile, true, false, false, true)
|
||||
xcheckf(err, "delivering message")
|
||||
for _, a := range appends {
|
||||
err = c.account.DeliverMessage(c.log, tx, &a.m, a.file, true, false, false, true)
|
||||
xcheckf(err, "delivering message")
|
||||
|
||||
// Update path to what is stored in the account. We may still have to clean it up on errors.
|
||||
a.path = c.account.MessagePath(a.m.ID)
|
||||
}
|
||||
})
|
||||
|
||||
// Success, make sure messages aren't cleaned up anymore.
|
||||
committed = true
|
||||
|
||||
// Fetch pending changes, possibly with new UIDs, so we can apply them before adding our own new UID.
|
||||
if c.comm != nil {
|
||||
pendingChanges = c.comm.Get()
|
||||
}
|
||||
|
||||
// Broadcast the change to other connections.
|
||||
changes = append(changes, m.ChangeAddUID(), mb.ChangeCounts())
|
||||
for _, a := range appends {
|
||||
changes = append(changes, a.m.ChangeAddUID())
|
||||
}
|
||||
changes = append(changes, mb.ChangeCounts())
|
||||
c.broadcast(changes)
|
||||
})
|
||||
|
||||
if c.mailboxID == mb.ID {
|
||||
c.applyChanges(pendingChanges, false)
|
||||
c.uidAppend(m.UID)
|
||||
for _, a := range appends {
|
||||
c.uidAppend(a.m.UID)
|
||||
}
|
||||
// todo spec: with condstore/qresync, is there a mechanism to the client know the modseq for the appended uid? in theory an untagged fetch with the modseq after the OK APPENDUID could make sense, but this probably isn't allowed.
|
||||
c.bwritelinef("* %d EXISTS", len(c.uids))
|
||||
}
|
||||
|
||||
c.writeresultf("%s OK [APPENDUID %d %d] appended", tag, mb.UIDValidity, m.UID)
|
||||
// ../rfc/4315:289 ../rfc/3502:236 APPENDUID
|
||||
// ../rfc/4315:276 ../rfc/4315:310 UID, and UID set for multiappend
|
||||
var uidset string
|
||||
if len(appends) == 1 {
|
||||
uidset = fmt.Sprintf("%d", appends[0].m.UID)
|
||||
} else {
|
||||
uidset = fmt.Sprintf("%d:%d", appends[0].m.UID, appends[len(appends)-1].m.UID)
|
||||
}
|
||||
c.writeresultf("%s OK [APPENDUID %d %s] appended", tag, mb.UIDValidity, uidset)
|
||||
}
|
||||
|
||||
// Idle makes a client wait until the server sends untagged updates, e.g. about
|
||||
|
Reference in New Issue
Block a user