mirror of
https://github.com/mjl-/mox.git
synced 2025-07-12 11:44:38 +03:00
queue: deliver to multiple recipients in a single smtp transaction
transferring the data only once. we only do this when the recipient domains are the same. when queuing, we now take care to set the same NextAttempt timestamp, so queued messages are actually eligable for combined delivery. this adds a DeliverMultiple to the smtp client. for pipelined requests, it will send all RCPT TO (and MAIL and DATA) in one go, and handles the various responses and error conditions, returning either an overal error, or per recipient smtp responses. the results of the smtp LIMITS extension are also available in the smtp client now. this also takes the "LIMITS RCPTMAX" smtp extension into account: if the server only accepts a single recipient, we won't send multiple. if a server doesn't announce a RCPTMAX limit, but still has one (like mox does for non-spf-verified transactions), we'll recognize code 452 and 552 (for historic reasons) as temporary error, and try again in a separate transaction immediately after. we don't yet implement "LIMITS MAILMAX", doesn't seem likely in practice.
This commit is contained in:
204
queue/queue.go
204
queue/queue.go
@ -5,6 +5,7 @@ package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
@ -81,9 +82,10 @@ type Msg struct {
|
||||
ID int64
|
||||
|
||||
// A message for multiple recipients will get a BaseID that is identical to the
|
||||
// first Msg.ID queued. They may be delivered in a single SMTP transaction if they
|
||||
// are going to the same mail server. For messages with a single recipient, this
|
||||
// field will be 0.
|
||||
// first Msg.ID queued. The message contents will be identical for each recipient,
|
||||
// including MsgPrefix. If other properties are identical too, including recipient
|
||||
// domain, multiple Msgs may be delivered in a single SMTP transaction. For
|
||||
// messages with a single recipient, this field will be 0.
|
||||
BaseID int64 `bstore:"index"`
|
||||
|
||||
Queued time.Time `bstore:"default now"`
|
||||
@ -215,22 +217,21 @@ func Count(ctx context.Context) (int, error) {
|
||||
}
|
||||
|
||||
// MakeMsg is a convenience function that sets the commonly used fields for a Msg.
|
||||
func MakeMsg(sender, recipient smtp.Path, has8bit, smtputf8 bool, size int64, messageID string, prefix []byte, requireTLS *bool) Msg {
|
||||
now := time.Now()
|
||||
func MakeMsg(sender, recipient smtp.Path, has8bit, smtputf8 bool, size int64, messageID string, prefix []byte, requireTLS *bool, next time.Time) Msg {
|
||||
return Msg{
|
||||
SenderLocalpart: sender.Localpart,
|
||||
SenderDomain: sender.IPDomain,
|
||||
RecipientLocalpart: recipient.Localpart,
|
||||
RecipientDomain: recipient.IPDomain,
|
||||
RecipientDomainStr: formatIPDomain(recipient.IPDomain),
|
||||
Has8bit: has8bit,
|
||||
SMTPUTF8: smtputf8,
|
||||
Size: size,
|
||||
MessageID: messageID,
|
||||
MsgPrefix: prefix,
|
||||
RequireTLS: requireTLS,
|
||||
Queued: now,
|
||||
NextAttempt: now,
|
||||
RecipientDomainStr: formatIPDomain(recipient.IPDomain),
|
||||
Queued: time.Now(),
|
||||
NextAttempt: next,
|
||||
}
|
||||
}
|
||||
|
||||
@ -354,8 +355,8 @@ func formatIPDomain(d dns.IPDomain) string {
|
||||
}
|
||||
|
||||
var (
|
||||
kick = make(chan struct{}, 1)
|
||||
deliveryResult = make(chan string, 1)
|
||||
kick = make(chan struct{}, 1)
|
||||
deliveryResults = make(chan string, 1)
|
||||
)
|
||||
|
||||
func queuekick() {
|
||||
@ -489,7 +490,7 @@ func Start(resolver dns.Resolver, done chan struct{}) error {
|
||||
return
|
||||
case <-kick:
|
||||
case <-timer.C:
|
||||
case domain := <-deliveryResult:
|
||||
case domain := <-deliveryResults:
|
||||
delete(busyDomains, domain)
|
||||
}
|
||||
|
||||
@ -537,7 +538,16 @@ func launchWork(log mlog.Log, resolver dns.Resolver, busyDomains map[string]stru
|
||||
}
|
||||
q.FilterNotEqual("RecipientDomainStr", doms...)
|
||||
}
|
||||
msgs, err := q.List()
|
||||
var msgs []Msg
|
||||
seen := map[string]bool{}
|
||||
err := q.ForEach(func(m Msg) error {
|
||||
dom := m.RecipientDomainStr
|
||||
if _, ok := busyDomains[dom]; !ok && !seen[dom] {
|
||||
seen[dom] = true
|
||||
msgs = append(msgs, m)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
log.Errorx("querying for work in queue", err)
|
||||
mox.Sleep(mox.Shutdown, 1*time.Second)
|
||||
@ -545,24 +555,37 @@ func launchWork(log mlog.Log, resolver dns.Resolver, busyDomains map[string]stru
|
||||
}
|
||||
|
||||
for _, m := range msgs {
|
||||
busyDomains[formatIPDomain(m.RecipientDomain)] = struct{}{}
|
||||
busyDomains[m.RecipientDomainStr] = struct{}{}
|
||||
go deliver(log, resolver, m)
|
||||
}
|
||||
return len(msgs)
|
||||
}
|
||||
|
||||
// Remove message from queue in database and file system.
|
||||
func queueDelete(ctx context.Context, msgID int64) error {
|
||||
if err := DB.Delete(ctx, &Msg{ID: msgID}); err != nil {
|
||||
func queueDelete(ctx context.Context, msgIDs ...int64) error {
|
||||
err := DB.Write(ctx, func(tx *bstore.Tx) error {
|
||||
for _, id := range msgIDs {
|
||||
if err := tx.Delete(&Msg{ID: id}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// If removing from database fails, we'll also leave the file in the file system.
|
||||
|
||||
p := mox.DataDirPath(filepath.Join("queue", store.MessagePath(msgID)))
|
||||
if err := os.Remove(p); err != nil {
|
||||
return fmt.Errorf("removing queue message from file system: %v", err)
|
||||
var errs []string
|
||||
for _, id := range msgIDs {
|
||||
p := mox.DataDirPath(filepath.Join("queue", store.MessagePath(id)))
|
||||
if err := os.Remove(p); err != nil {
|
||||
errs = append(errs, fmt.Sprintf("%s: %v", p, err))
|
||||
}
|
||||
}
|
||||
if len(errs) > 0 {
|
||||
return fmt.Errorf("removing message files from queue: %s", strings.Join(errs, "; "))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -572,17 +595,16 @@ func queueDelete(ctx context.Context, msgID int64) error {
|
||||
func deliver(log mlog.Log, resolver dns.Resolver, m Msg) {
|
||||
ctx := mox.Shutdown
|
||||
|
||||
qlog := log.WithCid(mox.Cid()).With(slog.Any("from", m.Sender()),
|
||||
slog.Any("recipient", m.Recipient()),
|
||||
slog.Int("attempts", m.Attempts),
|
||||
slog.Int64("msgid", m.ID))
|
||||
qlog := log.WithCid(mox.Cid()).With(
|
||||
slog.Any("from", m.Sender()),
|
||||
slog.Int("attempts", m.Attempts))
|
||||
|
||||
defer func() {
|
||||
deliveryResult <- formatIPDomain(m.RecipientDomain)
|
||||
deliveryResults <- formatIPDomain(m.RecipientDomain)
|
||||
|
||||
x := recover()
|
||||
if x != nil {
|
||||
qlog.Error("deliver panic", slog.Any("panic", x))
|
||||
qlog.Error("deliver panic", slog.Any("panic", x), slog.Int64("msgid", m.ID), slog.Any("recipient", m.Recipient()))
|
||||
debug.PrintStack()
|
||||
metrics.PanicInc(metrics.Queue)
|
||||
}
|
||||
@ -600,6 +622,7 @@ func deliver(log mlog.Log, resolver dns.Resolver, m Msg) {
|
||||
backoff *= time.Duration(2)
|
||||
}
|
||||
m.Attempts++
|
||||
origNextAttempt := m.NextAttempt
|
||||
now := time.Now()
|
||||
m.LastAttempt = &now
|
||||
m.NextAttempt = now.Add(backoff)
|
||||
@ -607,26 +630,30 @@ func deliver(log mlog.Log, resolver dns.Resolver, m Msg) {
|
||||
qup.FilterID(m.ID)
|
||||
update := Msg{Attempts: m.Attempts, NextAttempt: m.NextAttempt, LastAttempt: m.LastAttempt}
|
||||
if _, err := qup.UpdateNonzero(update); err != nil {
|
||||
qlog.Errorx("storing delivery attempt", err)
|
||||
qlog.Errorx("storing delivery attempt", err, slog.Int64("msgid", m.ID), slog.Any("recipient", m.Recipient()))
|
||||
return
|
||||
}
|
||||
|
||||
// Find route for transport to use for delivery attempt.
|
||||
var transport config.Transport
|
||||
var transportName string
|
||||
if m.Transport != "" {
|
||||
var ok bool
|
||||
transport, ok = mox.Conf.Static.Transports[m.Transport]
|
||||
if !ok {
|
||||
var remoteMTA dsn.NameIP // Zero value, will not be included in DSN. ../rfc/3464:1027
|
||||
fail(ctx, qlog, m, backoff, false, remoteMTA, "", fmt.Sprintf("cannot find transport %q", m.Transport), "", nil)
|
||||
return
|
||||
resolveTransport := func(mm Msg) (string, config.Transport, bool) {
|
||||
if mm.Transport != "" {
|
||||
transport, ok := mox.Conf.Static.Transports[mm.Transport]
|
||||
if !ok {
|
||||
return "", config.Transport{}, false
|
||||
}
|
||||
return mm.Transport, transport, ok
|
||||
}
|
||||
transportName = m.Transport
|
||||
} else {
|
||||
route := findRoute(m.Attempts-1, m)
|
||||
transport = route.ResolvedTransport
|
||||
transportName = route.Transport
|
||||
route := findRoute(mm.Attempts, mm)
|
||||
return route.Transport, route.ResolvedTransport, true
|
||||
}
|
||||
|
||||
// Find route for transport to use for delivery attempt.
|
||||
m.Attempts--
|
||||
transportName, transport, transportOK := resolveTransport(m)
|
||||
m.Attempts++
|
||||
if !transportOK {
|
||||
var remoteMTA dsn.NameIP // Zero value, will not be included in DSN. ../rfc/3464:1027
|
||||
fail(ctx, qlog, []*Msg{&m}, m.DialedIPs, backoff, remoteMTA, fmt.Errorf("cannot find transport %q", m.Transport))
|
||||
return
|
||||
}
|
||||
|
||||
if transportName != "" {
|
||||
@ -634,8 +661,62 @@ func deliver(log mlog.Log, resolver dns.Resolver, m Msg) {
|
||||
qlog.Debug("delivering with transport")
|
||||
}
|
||||
|
||||
// Attempt to gather more recipients for this identical message, only with the same
|
||||
// recipient domain, and under the same conditions (recipientdomain, attempts,
|
||||
// requiretls, transport). ../rfc/5321:3759
|
||||
msgs := []*Msg{&m}
|
||||
if m.BaseID != 0 {
|
||||
err := DB.Write(mox.Shutdown, func(tx *bstore.Tx) error {
|
||||
q := bstore.QueryTx[Msg](tx)
|
||||
q.FilterNonzero(Msg{BaseID: m.BaseID, RecipientDomainStr: m.RecipientDomainStr, Attempts: m.Attempts - 1})
|
||||
q.FilterNotEqual("ID", m.ID)
|
||||
q.FilterLessEqual("NextAttempt", origNextAttempt)
|
||||
err := q.ForEach(func(xm Msg) error {
|
||||
mrtls := m.RequireTLS != nil
|
||||
xmrtls := xm.RequireTLS != nil
|
||||
if mrtls != xmrtls || mrtls && *m.RequireTLS != *xm.RequireTLS {
|
||||
return nil
|
||||
}
|
||||
tn, _, ok := resolveTransport(xm)
|
||||
if ok && tn == transportName {
|
||||
msgs = append(msgs, &xm)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("looking up more recipients: %v", err)
|
||||
}
|
||||
|
||||
// Mark these additional messages as attempted too.
|
||||
for _, mm := range msgs[1:] {
|
||||
mm.Attempts++
|
||||
mm.NextAttempt = m.NextAttempt
|
||||
mm.LastAttempt = m.LastAttempt
|
||||
if err := tx.Update(mm); err != nil {
|
||||
return fmt.Errorf("updating more message recipients for smtp transaction: %v", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
qlog.Errorx("error finding more recipients for message, will attempt to send to single recipient", err)
|
||||
msgs = msgs[:1]
|
||||
}
|
||||
}
|
||||
if len(msgs) > 1 {
|
||||
ids := make([]int64, len(msgs))
|
||||
rcpts := make([]smtp.Path, len(msgs))
|
||||
for i, m := range msgs {
|
||||
ids[i] = m.ID
|
||||
rcpts[i] = m.Recipient()
|
||||
}
|
||||
qlog.Debug("delivering to multiple recipients", slog.Any("msgids", ids), slog.Any("recipients", rcpts))
|
||||
} else {
|
||||
qlog.Debug("delivering to single recipient", slog.Any("msgid", m.ID), slog.Any("recipient", m.Recipient()))
|
||||
}
|
||||
|
||||
// We gather TLS connection successes and failures during delivery, and we store
|
||||
// them in tlsrptb. Every 24 hours we send an email with a report to the recipient
|
||||
// them in tlsrptdb. Every 24 hours we send an email with a report to the recipient
|
||||
// domains that opt in via a TLSRPT DNS record. For us, the tricky part is
|
||||
// collecting all reporting information. We've got several TLS modes
|
||||
// (opportunistic, DANE and/or MTA-STS (PKIX), overrides due to Require TLS).
|
||||
@ -719,28 +800,28 @@ func deliver(log mlog.Log, resolver dns.Resolver, m Msg) {
|
||||
|
||||
var dialer smtpclient.Dialer = &net.Dialer{}
|
||||
if transport.Submissions != nil {
|
||||
deliverSubmit(qlog, resolver, dialer, m, backoff, transportName, transport.Submissions, true, 465)
|
||||
deliverSubmit(qlog, resolver, dialer, msgs, backoff, transportName, transport.Submissions, true, 465)
|
||||
} else if transport.Submission != nil {
|
||||
deliverSubmit(qlog, resolver, dialer, m, backoff, transportName, transport.Submission, false, 587)
|
||||
deliverSubmit(qlog, resolver, dialer, msgs, backoff, transportName, transport.Submission, false, 587)
|
||||
} else if transport.SMTP != nil {
|
||||
// todo future: perhaps also gather tlsrpt results for submissions.
|
||||
deliverSubmit(qlog, resolver, dialer, m, backoff, transportName, transport.SMTP, false, 25)
|
||||
deliverSubmit(qlog, resolver, dialer, msgs, backoff, transportName, transport.SMTP, false, 25)
|
||||
} else {
|
||||
ourHostname := mox.Conf.Static.HostnameDomain
|
||||
if transport.Socks != nil {
|
||||
socksdialer, err := proxy.SOCKS5("tcp", transport.Socks.Address, nil, &net.Dialer{})
|
||||
if err != nil {
|
||||
fail(ctx, qlog, m, backoff, false, dsn.NameIP{}, "", fmt.Sprintf("socks dialer: %v", err), "", nil)
|
||||
fail(ctx, qlog, msgs, msgs[0].DialedIPs, backoff, dsn.NameIP{}, fmt.Errorf("socks dialer: %v", err))
|
||||
return
|
||||
} else if d, ok := socksdialer.(smtpclient.Dialer); !ok {
|
||||
fail(ctx, qlog, m, backoff, false, dsn.NameIP{}, "", "socks dialer is not a contextdialer", "", nil)
|
||||
fail(ctx, qlog, msgs, msgs[0].DialedIPs, backoff, dsn.NameIP{}, fmt.Errorf("socks dialer is not a contextdialer"))
|
||||
return
|
||||
} else {
|
||||
dialer = d
|
||||
}
|
||||
ourHostname = transport.Socks.Hostname
|
||||
}
|
||||
recipientDomainResult, hostResults = deliverDirect(qlog, resolver, dialer, ourHostname, transportName, m, backoff)
|
||||
recipientDomainResult, hostResults = deliverDirect(qlog, resolver, dialer, ourHostname, transportName, msgs, backoff)
|
||||
}
|
||||
}
|
||||
|
||||
@ -782,3 +863,30 @@ func routeMatchDomain(l []string, d dns.Domain) bool {
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Returns string representing delivery result for err, and number of delivered and
|
||||
// failed messages.
|
||||
//
|
||||
// Values: ok, okpartial, timeout, canceled, temperror, permerror, error.
|
||||
func deliveryResult(err error, delivered, failed int) string {
|
||||
var cerr smtpclient.Error
|
||||
switch {
|
||||
case err == nil:
|
||||
if delivered == 0 {
|
||||
return "error"
|
||||
} else if failed > 0 {
|
||||
return "okpartial"
|
||||
}
|
||||
return "ok"
|
||||
case errors.Is(err, os.ErrDeadlineExceeded), errors.Is(err, context.DeadlineExceeded):
|
||||
return "timeout"
|
||||
case errors.Is(err, context.Canceled):
|
||||
return "canceled"
|
||||
case errors.As(err, &cerr):
|
||||
if cerr.Permanent {
|
||||
return "permerror"
|
||||
}
|
||||
return "temperror"
|
||||
}
|
||||
return "error"
|
||||
}
|
||||
|
Reference in New Issue
Block a user