queue: implement adding a message to the queue that gets sent to multiple recipients

and in a way that allows us to send that message to multiple recipients in a
single smtp transaction.
This commit is contained in:
Mechiel Lukkien
2024-03-05 20:10:28 +01:00
parent 15e450df61
commit 47ebfa8152
15 changed files with 346 additions and 276 deletions

View File

@ -78,7 +78,14 @@ var Localserve bool
// Use MakeMsg to make a message with fields that Add needs. Add will further set
// queueing related fields.
type Msg struct {
ID int64
ID int64
// A message for multiple recipients will get a BaseID that is identical to the
// first Msg.ID queued. They may be delivered in a single SMTP transaction if they
// are going to the same mail server. For messages with a single recipient, this
// field will be 0.
BaseID int64 `bstore:"index"`
Queued time.Time `bstore:"default now"`
SenderAccount string // Failures are delivered back to this local account. Also used for routing.
SenderLocalpart smtp.Localpart // Should be a local user and domain.
@ -208,10 +215,9 @@ func Count(ctx context.Context) (int, error) {
}
// MakeMsg is a convenience function that sets the commonly used fields for a Msg.
func MakeMsg(senderAccount string, sender, recipient smtp.Path, has8bit, smtputf8 bool, size int64, messageID string, prefix []byte, requireTLS *bool) Msg {
func MakeMsg(sender, recipient smtp.Path, has8bit, smtputf8 bool, size int64, messageID string, prefix []byte, requireTLS *bool) Msg {
now := time.Now()
return Msg{
SenderAccount: senderAccount,
SenderLocalpart: sender.Localpart,
SenderDomain: sender.IPDomain,
RecipientLocalpart: recipient.Localpart,
@ -228,25 +234,31 @@ func MakeMsg(senderAccount string, sender, recipient smtp.Path, has8bit, smtputf
}
}
// Add a new message to the queue. The queue is kicked immediately to start a
// first delivery attempt.
// Add one or more new messages to the queue. They'll get the same BaseID, so they
// can be delivered in a single SMTP transaction, with a single DATA command, but
// may be split into multiple transactions if errors/limits are encountered. The
// queue is kicked immediately to start a first delivery attempt.
//
// ID must be 0 and will be set after inserting in the queue.
// ID of the messagse must be 0 and will be set after inserting in the queue.
//
// Add sets derived fields like RecipientDomainStr, and fields related to queueing,
// such as Queued, NextAttempt, LastAttempt, LastError.
func Add(ctx context.Context, log mlog.Log, qm *Msg, msgFile *os.File) error {
// todo: Add should accept multiple rcptTo if they are for the same domain. so we can queue them for delivery in one (or just a few) session(s), transferring the data only once. ../rfc/5321:3759
func Add(ctx context.Context, log mlog.Log, senderAccount string, msgFile *os.File, qml ...Msg) error {
if len(qml) == 0 {
return fmt.Errorf("must queue at least one message")
}
if qm.ID != 0 {
return fmt.Errorf("id of queued message must be 0")
for _, qm := range qml {
if qm.ID != 0 {
return fmt.Errorf("id of queued messages must be 0")
}
}
if Localserve {
if qm.SenderAccount == "" {
if senderAccount == "" {
return fmt.Errorf("cannot queue with localserve without local account")
}
acc, err := store.OpenAccount(log, qm.SenderAccount)
acc, err := store.OpenAccount(log, senderAccount)
if err != nil {
return fmt.Errorf("opening sender account for immediate delivery with localserve: %v", err)
}
@ -254,17 +266,24 @@ func Add(ctx context.Context, log mlog.Log, qm *Msg, msgFile *os.File) error {
err := acc.Close()
log.Check(err, "closing account")
}()
m := store.Message{Size: qm.Size, MsgPrefix: qm.MsgPrefix}
conf, _ := acc.Conf()
dest := conf.Destinations[qm.Sender().String()]
err = nil
acc.WithWLock(func() {
err = acc.DeliverDestination(log, dest, &m, msgFile)
for i, qm := range qml {
qml[i].SenderAccount = senderAccount
m := store.Message{Size: qm.Size, MsgPrefix: qm.MsgPrefix}
dest := conf.Destinations[qm.Sender().String()]
err = acc.DeliverDestination(log, dest, &m, msgFile)
if err != nil {
err = fmt.Errorf("delivering message: %v", err)
return // Returned again outside WithWLock.
}
}
})
if err != nil {
return fmt.Errorf("delivering message: %v", err)
if err == nil {
log.Debug("immediately delivered from queue to sender")
}
log.Debug("immediately delivered from queue to sender")
return nil
return err
}
tx, err := DB.Begin(ctx, true)
@ -279,30 +298,49 @@ func Add(ctx context.Context, log mlog.Log, qm *Msg, msgFile *os.File) error {
}
}()
if err := tx.Insert(qm); err != nil {
return err
// Insert messages into queue. If there are multiple messages, they all get a
// non-zero BaseID that is the Msg.ID of the first message inserted.
var baseID int64
for i := range qml {
qml[i].SenderAccount = senderAccount
qml[i].BaseID = baseID
if err := tx.Insert(&qml[i]); err != nil {
return err
}
if i == 0 && len(qml) > 1 {
baseID = qml[i].ID
qml[i].BaseID = baseID
if err := tx.Update(&qml[i]); err != nil {
return err
}
}
}
dst := qm.MessagePath()
var paths []string
defer func() {
if dst != "" {
err := os.Remove(dst)
log.Check(err, "removing destination message file for queue", slog.String("path", dst))
for _, p := range paths {
err := os.Remove(p)
log.Check(err, "removing destination message file for queue", slog.String("path", p))
}
}()
dstDir := filepath.Dir(dst)
os.MkdirAll(dstDir, 0770)
if err := moxio.LinkOrCopy(log, dst, msgFile.Name(), nil, true); err != nil {
return fmt.Errorf("linking/copying message to new file: %s", err)
} else if err := moxio.SyncDir(log, dstDir); err != nil {
return fmt.Errorf("sync directory: %v", err)
for _, qm := range qml {
dst := qm.MessagePath()
paths = append(paths, dst)
dstDir := filepath.Dir(dst)
os.MkdirAll(dstDir, 0770)
if err := moxio.LinkOrCopy(log, dst, msgFile.Name(), nil, true); err != nil {
return fmt.Errorf("linking/copying message to new file: %s", err)
} else if err := moxio.SyncDir(log, dstDir); err != nil {
return fmt.Errorf("sync directory: %v", err)
}
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("commit transaction: %s", err)
}
tx = nil
dst = ""
paths = nil
queuekick()
return nil

View File

@ -117,12 +117,12 @@ func TestQueue(t *testing.T) {
var qm Msg
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil)
err = Add(ctxbg, pkglog, &qm, mf)
qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil)
err = Add(ctxbg, pkglog, "mjl", mf, qm)
tcheck(t, err, "add message to queue for delivery")
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil)
err = Add(ctxbg, pkglog, &qm, mf)
qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil)
err = Add(ctxbg, pkglog, "mjl", mf, qm)
tcheck(t, err, "add message to queue for delivery")
msgs, err = List(ctxbg)
@ -451,8 +451,8 @@ func TestQueue(t *testing.T) {
// Add a message to be delivered with submit because of its route.
topath := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "submit.example"}}}
qm = MakeMsg("mjl", path, topath, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil)
err = Add(ctxbg, pkglog, &qm, mf)
qm = MakeMsg(path, topath, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil)
err = Add(ctxbg, pkglog, "mjl", mf, qm)
tcheck(t, err, "add message to queue for delivery")
wasNetDialer = testDeliver(fakeSubmitServer)
if !wasNetDialer {
@ -460,11 +460,11 @@ func TestQueue(t *testing.T) {
}
// Add a message to be delivered with submit because of explicitly configured transport, that uses TLS.
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil)
err = Add(ctxbg, pkglog, &qm, mf)
qml := []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil)}
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
tcheck(t, err, "add message to queue for delivery")
transportSubmitTLS := "submittls"
n, err = Kick(ctxbg, qm.ID, "", "", &transportSubmitTLS)
n, err = Kick(ctxbg, qml[0].ID, "", "", &transportSubmitTLS)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
@ -509,11 +509,11 @@ func TestQueue(t *testing.T) {
}
// Add a message to be delivered with socks.
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<socks@localhost>", nil, nil)
err = Add(ctxbg, pkglog, &qm, mf)
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<socks@localhost>", nil, nil)}
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
tcheck(t, err, "add message to queue for delivery")
transportSocks := "socks"
n, err = Kick(ctxbg, qm.ID, "", "", &transportSocks)
n, err = Kick(ctxbg, qml[0].ID, "", "", &transportSocks)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
@ -525,10 +525,10 @@ func TestQueue(t *testing.T) {
// Add message to be delivered with opportunistic TLS verification.
clearTLSResults(t)
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<opportunistictls@localhost>", nil, nil)
err = Add(ctxbg, pkglog, &qm, mf)
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<opportunistictls@localhost>", nil, nil)}
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
tcheck(t, err, "add message to queue for delivery")
n, err = Kick(ctxbg, qm.ID, "", "", nil)
n, err = Kick(ctxbg, qml[0].ID, "", "", nil)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
@ -539,10 +539,10 @@ func TestQueue(t *testing.T) {
// Test fallback to plain text with TLS handshake fails.
clearTLSResults(t)
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<badtls@localhost>", nil, nil)
err = Add(ctxbg, pkglog, &qm, mf)
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<badtls@localhost>", nil, nil)}
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
tcheck(t, err, "add message to queue for delivery")
n, err = Kick(ctxbg, qm.ID, "", "", nil)
n, err = Kick(ctxbg, qml[0].ID, "", "", nil)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
@ -559,10 +559,10 @@ func TestQueue(t *testing.T) {
{Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeFull, CertAssoc: moxCert.Leaf.RawSubjectPublicKeyInfo},
},
}
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<dane@localhost>", nil, nil)
err = Add(ctxbg, pkglog, &qm, mf)
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<dane@localhost>", nil, nil)}
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
tcheck(t, err, "add message to queue for delivery")
n, err = Kick(ctxbg, qm.ID, "", "", nil)
n, err = Kick(ctxbg, qml[0].ID, "", "", nil)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
@ -580,10 +580,10 @@ func TestQueue(t *testing.T) {
// Add message to be delivered with verified TLS and REQUIRETLS.
yes := true
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<opportunistictls@localhost>", nil, &yes)
err = Add(ctxbg, pkglog, &qm, mf)
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<opportunistictls@localhost>", nil, &yes)}
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
tcheck(t, err, "add message to queue for delivery")
n, err = Kick(ctxbg, qm.ID, "", "", nil)
n, err = Kick(ctxbg, qml[0].ID, "", "", nil)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
@ -597,10 +597,10 @@ func TestQueue(t *testing.T) {
{},
},
}
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<daneunusable@localhost>", nil, nil)
err = Add(ctxbg, pkglog, &qm, mf)
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<daneunusable@localhost>", nil, nil)}
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
tcheck(t, err, "add message to queue for delivery")
n, err = Kick(ctxbg, qm.ID, "", "", nil)
n, err = Kick(ctxbg, qml[0].ID, "", "", nil)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
@ -618,10 +618,10 @@ func TestQueue(t *testing.T) {
{Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeFull, CertAssoc: make([]byte, sha256.Size)},
},
}
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<daneinsecure@localhost>", nil, nil)
err = Add(ctxbg, pkglog, &qm, mf)
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<daneinsecure@localhost>", nil, nil)}
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
tcheck(t, err, "add message to queue for delivery")
n, err = Kick(ctxbg, qm.ID, "", "", nil)
n, err = Kick(ctxbg, qml[0].ID, "", "", nil)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
@ -640,10 +640,10 @@ func TestQueue(t *testing.T) {
// Check that message is delivered with TLS-Required: No and non-matching DANE record.
no := false
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<tlsrequirednostarttls@localhost>", nil, &no)
err = Add(ctxbg, pkglog, &qm, mf)
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednostarttls@localhost>", nil, &no)}
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
tcheck(t, err, "add message to queue for delivery")
n, err = Kick(ctxbg, qm.ID, "", "", nil)
n, err = Kick(ctxbg, qml[0].ID, "", "", nil)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
@ -651,10 +651,10 @@ func TestQueue(t *testing.T) {
testDeliver(fakeSMTPSTARTTLSServer)
// Check that message is delivered with TLS-Required: No and bad TLS, falling back to plain text.
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<tlsrequirednoplaintext@localhost>", nil, &no)
err = Add(ctxbg, pkglog, &qm, mf)
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednoplaintext@localhost>", nil, &no)}
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
tcheck(t, err, "add message to queue for delivery")
n, err = Kick(ctxbg, qm.ID, "", "", nil)
n, err = Kick(ctxbg, qml[0].ID, "", "", nil)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
@ -662,10 +662,10 @@ func TestQueue(t *testing.T) {
testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
// Add message with requiretls that fails immediately due to no REQUIRETLS support in all servers.
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<tlsrequiredunsupported@localhost>", nil, &yes)
err = Add(ctxbg, pkglog, &qm, mf)
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequiredunsupported@localhost>", nil, &yes)}
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
tcheck(t, err, "add message to queue for delivery")
n, err = Kick(ctxbg, qm.ID, "", "", nil)
n, err = Kick(ctxbg, qml[0].ID, "", "", nil)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
@ -677,10 +677,10 @@ func TestQueue(t *testing.T) {
resolver.TLSA = nil
// Add message with requiretls that fails immediately due to no verification policy for recipient domain.
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<tlsrequirednopolicy@localhost>", nil, &yes)
err = Add(ctxbg, pkglog, &qm, mf)
qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednopolicy@localhost>", nil, &yes)}
err = Add(ctxbg, pkglog, "mjl", mf, qml...)
tcheck(t, err, "add message to queue for delivery")
n, err = Kick(ctxbg, qm.ID, "", "", nil)
n, err = Kick(ctxbg, qml[0].ID, "", "", nil)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
@ -692,8 +692,8 @@ func TestQueue(t *testing.T) {
})
// Add another message that we'll fail to deliver entirely.
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil)
err = Add(ctxbg, pkglog, &qm, mf)
qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil)
err = Add(ctxbg, pkglog, "mjl", mf, qm)
tcheck(t, err, "add message to queue for delivery")
msgs, err = List(ctxbg)
@ -883,8 +883,8 @@ func TestQueueStart(t *testing.T) {
mf := prepareFile(t)
defer os.Remove(mf.Name())
defer mf.Close()
qm := MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil)
err = Add(ctxbg, pkglog, &qm, mf)
qm := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil)
err = Add(ctxbg, pkglog, "mjl", mf, qm)
tcheck(t, err, "add message to queue for delivery")
checkDialed(true)