new feature: when delivering messages from the queue, make it possible to use a "transport"

the default transport is still just "direct delivery", where we connect to the
destination domain's MX servers.

other transports are:

- regular smtp without authentication, this is relaying to a smarthost.
- submission with authentication, e.g. to a third party email sending service.
- direct delivery, but with with connections going through a socks proxy. this
  can be helpful if your ip is blocked, you need to get email out, and you have
  another IP that isn't blocked.

keep in mind that for all of the above, appropriate SPF/DKIM settings have to
be configured. the "dnscheck" for a domain does a check for any SOCKS IP in the
SPF record. SPF for smtp/submission (ranges? includes?) and any DKIM
requirements cannot really be checked.

which transport is used can be configured through routes. routes can be set on
an account, a domain, or globally. the routes are evaluated in that order, with
the first match selecting the transport. these routes are evaluated for each
delivery attempt. common selection criteria are recipient domain and sender
domain, but also which delivery attempt this is. you could configured mox to
attempt sending through a 3rd party from the 4th attempt onwards.

routes and transports are optional. if no route matches, or an empty/zero
transport is selected, normal direct delivery is done.

we could already "submit" emails with 3rd party accounts with "sendmail". but
we now support more SASL authentication mechanisms with SMTP (not only PLAIN,
but also SCRAM-SHA-256, SCRAM-SHA-1 and CRAM-MD5), which sendmail now also
supports. sendmail will use the most secure mechanism supported by the server,
or the explicitly configured mechanism.

for issue #36 by dmikushin. also based on earlier discussion on hackernews.
This commit is contained in:
Mechiel Lukkien
2023-06-16 18:38:28 +02:00
parent 2eecf38842
commit 8096441f67
36 changed files with 3125 additions and 650 deletions

366
queue/direct.go Normal file
View File

@ -0,0 +1,366 @@
package queue
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
"os"
"strings"
"time"
"github.com/mjl-/bstore"
"github.com/mjl-/mox/dns"
"github.com/mjl-/mox/dsn"
"github.com/mjl-/mox/mlog"
"github.com/mjl-/mox/mox-"
"github.com/mjl-/mox/mtasts"
"github.com/mjl-/mox/mtastsdb"
"github.com/mjl-/mox/smtpclient"
"github.com/mjl-/mox/store"
)
// todo: rename function, perhaps put some of the params in a delivery struct so we don't pass all the params all the time?
func fail(qlog *mlog.Log, m Msg, backoff time.Duration, permanent bool, remoteMTA dsn.NameIP, secodeOpt, errmsg string) {
if permanent || m.Attempts >= 8 {
qlog.Errorx("permanent failure delivering from queue", errors.New(errmsg))
queueDSNFailure(qlog, m, remoteMTA, secodeOpt, errmsg)
if err := queueDelete(context.Background(), m.ID); err != nil {
qlog.Errorx("deleting message from queue after permanent failure", err)
}
return
}
qup := bstore.QueryDB[Msg](context.Background(), DB)
qup.FilterID(m.ID)
if _, err := qup.UpdateNonzero(Msg{LastError: errmsg, DialedIPs: m.DialedIPs}); err != nil {
qlog.Errorx("storing delivery error", err, mlog.Field("deliveryerror", errmsg))
}
if m.Attempts == 5 {
// We've attempted deliveries at these intervals: 0, 7.5m, 15m, 30m, 1h, 2u.
// Let sender know delivery is delayed.
qlog.Errorx("temporary failure delivering from queue, sending delayed dsn", errors.New(errmsg), mlog.Field("backoff", backoff))
retryUntil := m.LastAttempt.Add((4 + 8 + 16) * time.Hour)
queueDSNDelay(qlog, m, remoteMTA, secodeOpt, errmsg, retryUntil)
} else {
qlog.Errorx("temporary failure delivering from queue", errors.New(errmsg), mlog.Field("backoff", backoff), mlog.Field("nextattempt", m.NextAttempt))
}
}
// Delivery by directly dialing MX hosts for destination domain.
func deliverDirect(cid int64, qlog *mlog.Log, resolver dns.Resolver, dialer contextDialer, ourHostname dns.Domain, transportName string, m Msg, backoff time.Duration) {
hosts, effectiveDomain, permanent, err := gatherHosts(resolver, m, cid, qlog)
if err != nil {
fail(qlog, m, backoff, permanent, dsn.NameIP{}, "", err.Error())
return
}
// Check for MTA-STS policy and enforce it if needed. We have to check the
// effective domain (found after following CNAME record(s)): there will certainly
// not be an mtasts record for the original recipient domain, because that is not
// allowed when a CNAME record is present.
var policyFresh bool
var policy *mtasts.Policy
tlsModeDefault := smtpclient.TLSOpportunistic
if !effectiveDomain.IsZero() {
cidctx := context.WithValue(mox.Shutdown, mlog.CidKey, cid)
policy, policyFresh, err = mtastsdb.Get(cidctx, resolver, effectiveDomain)
if err != nil {
// No need to refuse to deliver if we have some mtasts error.
qlog.Infox("mtasts failed, continuing with strict tls requirement", err, mlog.Field("domain", effectiveDomain))
tlsModeDefault = smtpclient.TLSStrictStartTLS
}
// note: policy can be nil, if a domain does not implement MTA-STS or its the first
// time we fetch the policy and if we encountered an error.
}
// We try delivery to each record until we have success or a permanent failure. So
// for transient errors, we'll try the next MX record. For MX records pointing to a
// dual stack host, we turn a permanent failure due to policy on the first delivery
// attempt into a temporary failure and make sure to try the other address family
// the next attempt. This should reduce issues due to one of our IPs being on a
// block list. We won't try multiple IPs of the same address family. Surprisingly,
// RFC 5321 does not specify a clear algorithm, but common practicie is probably
// ../rfc/3974:268.
var remoteMTA dsn.NameIP
var secodeOpt, errmsg string
permanent = false
mtastsFailure := true
// todo: should make distinction between host permanently not accepting the message, and the message not being deliverable permanently. e.g. a mx host may have a size limit, or not accept 8bitmime, while another host in the list does accept the message. same for smtputf8, ../rfc/6531:555
for _, h := range hosts {
var badTLS, ok bool
// ../rfc/8461:913
if policy != nil && policy.Mode == mtasts.ModeEnforce && !policy.Matches(h.Domain) {
var policyHosts []string
for _, mx := range policy.MX {
policyHosts = append(policyHosts, mx.LogString())
}
errmsg = fmt.Sprintf("mx host %s does not match enforced mta-sts policy with hosts %s", h.Domain, strings.Join(policyHosts, ","))
qlog.Error("mx host does not match enforce mta-sts policy, skipping", mlog.Field("host", h.Domain), mlog.Field("policyhosts", policyHosts))
continue
}
qlog.Info("delivering to remote", mlog.Field("remote", h), mlog.Field("queuecid", cid))
cid := mox.Cid()
nqlog := qlog.WithCid(cid)
var remoteIP net.IP
tlsMode := tlsModeDefault
if policy != nil && policy.Mode == mtasts.ModeEnforce {
tlsMode = smtpclient.TLSStrictStartTLS
}
permanent, badTLS, secodeOpt, remoteIP, errmsg, ok = deliverHost(nqlog, resolver, dialer, cid, ourHostname, transportName, h, &m, tlsMode)
if !ok && badTLS && tlsMode == smtpclient.TLSOpportunistic {
// In case of failure with opportunistic TLS, try again without TLS. ../rfc/7435:459
// todo future: revisit this decision. perhaps it should be a configuration option that defaults to not doing this?
nqlog.Info("connecting again for delivery attempt without tls")
permanent, badTLS, secodeOpt, remoteIP, errmsg, ok = deliverHost(nqlog, resolver, dialer, cid, ourHostname, transportName, h, &m, smtpclient.TLSSkip)
}
if ok {
nqlog.Info("delivered from queue")
if err := queueDelete(context.Background(), m.ID); err != nil {
nqlog.Errorx("deleting message from queue after delivery", err)
}
return
}
remoteMTA = dsn.NameIP{Name: h.XString(false), IP: remoteIP}
if !badTLS {
mtastsFailure = false
}
if permanent {
break
}
}
if mtastsFailure && policyFresh {
permanent = true
}
fail(qlog, m, backoff, permanent, remoteMTA, secodeOpt, errmsg)
}
var (
errCNAMELoop = errors.New("cname loop")
errCNAMELimit = errors.New("too many cname records")
errNoRecord = errors.New("no dns record")
errDNS = errors.New("dns lookup error")
errNoMail = errors.New("domain does not accept email as indicated with single dot for mx record")
)
// Gather hosts to try to deliver to. We start with the straight-forward MX record.
// If that does not exist, we'll look for CNAME of the entire domain (following
// chains if needed). If a CNAME does not exist, but the domain name has an A or
// AAAA record, we'll try delivery directly to that host.
// ../rfc/5321:3824
func gatherHosts(resolver dns.Resolver, m Msg, cid int64, qlog *mlog.Log) (hosts []dns.IPDomain, effectiveDomain dns.Domain, permanent bool, err error) {
if len(m.RecipientDomain.IP) > 0 {
return []dns.IPDomain{m.RecipientDomain}, effectiveDomain, false, nil
}
// We start out delivering to the recipient domain. We follow CNAMEs a few times.
rcptDomain := m.RecipientDomain.Domain
// Domain we are actually delivering to, after following CNAME record(s).
effectiveDomain = rcptDomain
domainsSeen := map[string]bool{}
for i := 0; ; i++ {
if domainsSeen[effectiveDomain.ASCII] {
return nil, effectiveDomain, true, fmt.Errorf("%w: recipient domain %s: already saw %s", errCNAMELoop, rcptDomain, effectiveDomain)
}
domainsSeen[effectiveDomain.ASCII] = true
// note: The Go resolver returns the requested name if the domain has no CNAME record but has a host record.
if i == 16 {
// We have a maximum number of CNAME records we follow. There is no hard limit for
// DNS, and you might think folks wouldn't configure CNAME chains at all, but for
// (non-mail) domains, CNAME chains of 10 records have been encountered according
// to the internet.
return nil, effectiveDomain, true, fmt.Errorf("%w: recipient domain %s, last resolved domain %s", errCNAMELimit, rcptDomain, effectiveDomain)
}
cidctx := context.WithValue(mox.Context, mlog.CidKey, cid)
ctx, cancel := context.WithTimeout(cidctx, 30*time.Second)
defer cancel()
// Note: LookupMX can return an error and still return records: Invalid records are
// filtered out and an error returned. We must process any records that are valid.
// Only if all are unusable will we return an error. ../rfc/5321:3851
mxl, err := resolver.LookupMX(ctx, effectiveDomain.ASCII+".")
cancel()
if err != nil && len(mxl) == 0 {
if !dns.IsNotFound(err) {
return nil, effectiveDomain, false, fmt.Errorf("%w: mx lookup for %s: %v", errDNS, effectiveDomain, err)
}
// No MX record. First attempt CNAME lookup. ../rfc/5321:3838 ../rfc/3974:197
ctx, cancel = context.WithTimeout(cidctx, 30*time.Second)
defer cancel()
cname, err := resolver.LookupCNAME(ctx, effectiveDomain.ASCII+".")
cancel()
if err != nil && !dns.IsNotFound(err) {
return nil, effectiveDomain, false, fmt.Errorf("%w: cname lookup for %s: %v", errDNS, effectiveDomain, err)
}
if err == nil && cname != effectiveDomain.ASCII+"." {
d, err := dns.ParseDomain(strings.TrimSuffix(cname, "."))
if err != nil {
return nil, effectiveDomain, true, fmt.Errorf("%w: parsing cname domain %s: %v", errDNS, effectiveDomain, err)
}
effectiveDomain = d
// Start again with new domain.
continue
}
// See if the host exists. If so, attempt delivery directly to host. ../rfc/5321:3842
ctx, cancel = context.WithTimeout(cidctx, 30*time.Second)
defer cancel()
_, err = resolver.LookupHost(ctx, effectiveDomain.ASCII+".")
cancel()
if dns.IsNotFound(err) {
return nil, effectiveDomain, true, fmt.Errorf("%w: recipient domain/host %s", errNoRecord, effectiveDomain)
} else if err != nil {
return nil, effectiveDomain, false, fmt.Errorf("%w: looking up host %s because of no mx record: %v", errDNS, effectiveDomain, err)
}
hosts = []dns.IPDomain{{Domain: effectiveDomain}}
} else if err != nil {
qlog.Infox("partial mx failure, attempting delivery to valid mx records", err)
}
// ../rfc/7505:122
if err == nil && len(mxl) == 1 && mxl[0].Host == "." {
return nil, effectiveDomain, true, errNoMail
}
// The Go resolver already sorts by preference, randomizing records of same
// preference. ../rfc/5321:3885
for _, mx := range mxl {
host, err := dns.ParseDomain(strings.TrimSuffix(mx.Host, "."))
if err != nil {
// note: should not happen because Go resolver already filters these out.
return nil, effectiveDomain, true, fmt.Errorf("%w: invalid host name in mx record %q: %v", errDNS, mx.Host, err)
}
hosts = append(hosts, dns.IPDomain{Domain: host})
}
if len(hosts) > 0 {
err = nil
}
return hosts, effectiveDomain, false, err
}
}
// deliverHost attempts to deliver m to host.
// deliverHost updated m.DialedIPs, which must be saved in case of failure to deliver.
func deliverHost(log *mlog.Log, resolver dns.Resolver, dialer contextDialer, cid int64, ourHostname dns.Domain, transportName string, host dns.IPDomain, m *Msg, tlsMode smtpclient.TLSMode) (permanent, badTLS bool, secodeOpt string, remoteIP net.IP, errmsg string, ok bool) {
// About attempting delivery to multiple addresses of a host: ../rfc/5321:3898
start := time.Now()
var deliveryResult string
defer func() {
metricDelivery.WithLabelValues(fmt.Sprintf("%d", m.Attempts), transportName, string(tlsMode), deliveryResult).Observe(float64(time.Since(start)) / float64(time.Second))
log.Debug("queue deliverhost result", mlog.Field("host", host), mlog.Field("attempt", m.Attempts), mlog.Field("tlsmode", tlsMode), mlog.Field("permanent", permanent), mlog.Field("badtls", badTLS), mlog.Field("secodeopt", secodeOpt), mlog.Field("errmsg", errmsg), mlog.Field("ok", ok), mlog.Field("duration", time.Since(start)))
}()
f, err := os.Open(m.MessagePath())
if err != nil {
return false, false, "", nil, fmt.Sprintf("open message file: %s", err), false
}
msgr := store.FileMsgReader(m.MsgPrefix, f)
defer func() {
err := msgr.Close()
log.Check(err, "closing message after delivery attempt")
}()
cidctx := context.WithValue(mox.Context, mlog.CidKey, cid)
ctx, cancel := context.WithTimeout(cidctx, 30*time.Second)
defer cancel()
conn, ip, dualstack, err := dialHost(ctx, log, resolver, dialer, host, 25, m)
remoteIP = ip
cancel()
var result string
switch {
case err == nil:
result = "ok"
case errors.Is(err, os.ErrDeadlineExceeded), errors.Is(err, context.DeadlineExceeded):
result = "timeout"
case errors.Is(err, context.Canceled):
result = "canceled"
default:
result = "error"
}
metricConnection.WithLabelValues(result).Inc()
if err != nil {
log.Debugx("connecting to remote smtp", err, mlog.Field("host", host))
return false, false, "", ip, fmt.Sprintf("dialing smtp server: %v", err), false
}
var mailFrom string
if m.SenderLocalpart != "" || !m.SenderDomain.IsZero() {
mailFrom = m.Sender().XString(m.SMTPUTF8)
}
rcptTo := m.Recipient().XString(m.SMTPUTF8)
// todo future: get closer to timeouts specified in rfc? ../rfc/5321:3610
log = log.Fields(mlog.Field("remoteip", ip))
ctx, cancel = context.WithTimeout(cidctx, 30*time.Minute)
defer cancel()
mox.Connections.Register(conn, "smtpclient", "queue")
sc, err := smtpclient.New(ctx, log, conn, tlsMode, ourHostname, host.Domain, nil)
defer func() {
if sc == nil {
conn.Close()
} else {
sc.Close()
}
mox.Connections.Unregister(conn)
}()
if err == nil {
has8bit := m.Has8bit
smtputf8 := m.SMTPUTF8
var msg io.Reader = msgr
size := m.Size
if m.DSNUTF8 != nil && sc.Supports8BITMIME() && sc.SupportsSMTPUTF8() {
has8bit = true
smtputf8 = true
size = int64(len(m.DSNUTF8))
msg = bytes.NewReader(m.DSNUTF8)
}
err = sc.Deliver(ctx, mailFrom, rcptTo, size, msg, has8bit, smtputf8)
}
if err != nil {
log.Infox("delivery failed", err)
}
var cerr smtpclient.Error
switch {
case err == nil:
deliveryResult = "ok"
case errors.Is(err, os.ErrDeadlineExceeded), errors.Is(err, context.DeadlineExceeded):
deliveryResult = "timeout"
case errors.Is(err, context.Canceled):
deliveryResult = "canceled"
case errors.As(err, &cerr):
deliveryResult = "temperror"
if cerr.Permanent {
deliveryResult = "permerror"
}
default:
deliveryResult = "error"
}
if err == nil {
return false, false, "", ip, "", true
} else if cerr, ok := err.(smtpclient.Error); ok {
// If we are being rejected due to policy reasons on the first
// attempt and remote has both IPv4 and IPv6, we'll give it
// another try. Our first IP may be in a block list, the address for
// the other family perhaps is not.
permanent := cerr.Permanent
if permanent && m.Attempts == 1 && dualstack && strings.HasPrefix(cerr.Secode, "7.") {
permanent = false
}
return permanent, errors.Is(cerr, smtpclient.ErrTLS), cerr.Secode, ip, cerr.Error(), false
} else {
return false, errors.Is(cerr, smtpclient.ErrTLS), "", ip, err.Error(), false
}
}

View File

@ -4,9 +4,7 @@
package queue
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
@ -17,21 +15,21 @@ import (
"strings"
"time"
"golang.org/x/net/proxy"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/mjl-/bstore"
"github.com/mjl-/mox/config"
"github.com/mjl-/mox/dns"
"github.com/mjl-/mox/dsn"
"github.com/mjl-/mox/metrics"
"github.com/mjl-/mox/mlog"
"github.com/mjl-/mox/mox-"
"github.com/mjl-/mox/moxio"
"github.com/mjl-/mox/mtasts"
"github.com/mjl-/mox/mtastsdb"
"github.com/mjl-/mox/smtp"
"github.com/mjl-/mox/smtpclient"
"github.com/mjl-/mox/store"
)
@ -47,24 +45,36 @@ var (
"result", // "ok", "timeout", "canceled", "error"
},
)
metricDeliveryHost = promauto.NewHistogramVec(
metricDelivery = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "mox_queue_delivery_duration_seconds",
Help: "SMTP client delivery attempt to single host.",
Buckets: []float64{0.01, 0.05, 0.100, 0.5, 1, 5, 10, 20, 30, 60, 120},
},
[]string{
"attempt", // Number of attempts.
"tlsmode", // strict, opportunistic, skip
"result", // ok, timeout, canceled, temperror, permerror, error
"attempt", // Number of attempts.
"transport", // empty for default direct delivery.
"tlsmode", // strict, opportunistic, skip
"result", // ok, timeout, canceled, temperror, permerror, error
},
)
)
type contextDialer interface {
DialContext(ctx context.Context, network, addr string) (c net.Conn, err error)
}
// Used to dial remote SMTP servers.
// Overridden for tests.
var dial = func(ctx context.Context, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
dialer := &net.Dialer{Timeout: timeout, LocalAddr: laddr}
var dial = func(ctx context.Context, dialer contextDialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
// If this is a net.Dialer, use its settings and add the timeout and localaddr.
// This is the typical case, but SOCKS5 support can use a different dialer.
if d, ok := dialer.(*net.Dialer); ok {
nd := *d
nd.Timeout = timeout
nd.LocalAddr = laddr
return nd.DialContext(ctx, "tcp", addr)
}
return dialer.DialContext(ctx, "tcp", addr)
}
@ -80,7 +90,7 @@ var Localserve bool
type Msg struct {
ID int64
Queued time.Time `bstore:"default now"`
SenderAccount string // Failures are delivered back to this local account.
SenderAccount string // Failures are delivered back to this local account. Also used for routing.
SenderLocalpart smtp.Localpart // Should be a local user and domain.
SenderDomain dns.IPDomain
RecipientLocalpart smtp.Localpart // Typically a remote user and domain.
@ -95,7 +105,16 @@ type Msg struct {
SMTPUTF8 bool // Whether message requires use of SMTPUTF8.
Size int64 // Full size of message, combined MsgPrefix with contents of message file.
MsgPrefix []byte
DSNUTF8 []byte // If set, this message is a DSN and this is a version using utf-8, for the case the remote MTA supports smtputf8. In this case, Size and MsgPrefix are not relevant.
// If set, this message is a DSN and this is a version using utf-8, for the case
// the remote MTA supports smtputf8. In this case, Size and MsgPrefix are not
// relevant.
DSNUTF8 []byte
// If non-empty, the transport to use for this message. Can be set through cli or
// admin interface. If empty (the default for a submitted message), regular routing
// rules apply.
Transport string
}
// Sender of message as used in MAIL FROM.
@ -180,17 +199,17 @@ func Count(ctx context.Context) (int, error) {
// this data is used as the message when delivering the DSN and the remote SMTP
// server supports SMTPUTF8. If the remote SMTP server does not support SMTPUTF8,
// the regular non-utf8 message is delivered.
func Add(ctx context.Context, log *mlog.Log, senderAccount string, mailFrom, rcptTo smtp.Path, has8bit, smtputf8 bool, size int64, msgPrefix []byte, msgFile *os.File, dsnutf8Opt []byte, consumeFile bool) error {
func Add(ctx context.Context, log *mlog.Log, senderAccount string, mailFrom, rcptTo smtp.Path, has8bit, smtputf8 bool, size int64, msgPrefix []byte, msgFile *os.File, dsnutf8Opt []byte, consumeFile bool) (int64, error) {
// todo: Add should accept multiple rcptTo if they are for the same domain. so we can queue them for delivery in one (or just a few) session(s), transferring the data only once. ../rfc/5321:3759
if Localserve {
// Safety measure, shouldn't happen.
return fmt.Errorf("no queuing with localserve")
return 0, fmt.Errorf("no queuing with localserve")
}
tx, err := DB.Begin(ctx, true)
if err != nil {
return fmt.Errorf("begin transaction: %w", err)
return 0, fmt.Errorf("begin transaction: %w", err)
}
defer func() {
if tx != nil {
@ -201,10 +220,10 @@ func Add(ctx context.Context, log *mlog.Log, senderAccount string, mailFrom, rcp
}()
now := time.Now()
qm := Msg{0, now, senderAccount, mailFrom.Localpart, mailFrom.IPDomain, rcptTo.Localpart, rcptTo.IPDomain, formatIPDomain(rcptTo.IPDomain), 0, nil, now, nil, "", has8bit, smtputf8, size, msgPrefix, dsnutf8Opt}
qm := Msg{0, now, senderAccount, mailFrom.Localpart, mailFrom.IPDomain, rcptTo.Localpart, rcptTo.IPDomain, formatIPDomain(rcptTo.IPDomain), 0, nil, now, nil, "", has8bit, smtputf8, size, msgPrefix, dsnutf8Opt, ""}
if err := tx.Insert(&qm); err != nil {
return err
return 0, err
}
dst := qm.MessagePath()
@ -219,27 +238,27 @@ func Add(ctx context.Context, log *mlog.Log, senderAccount string, mailFrom, rcp
if consumeFile {
if err := os.Rename(msgFile.Name(), dst); err != nil {
// Could be due to cross-filesystem rename. Users shouldn't configure their systems that way.
return fmt.Errorf("move message into queue dir: %w", err)
return 0, fmt.Errorf("move message into queue dir: %w", err)
}
} else if err := os.Link(msgFile.Name(), dst); err != nil {
// Assume file system does not support hardlinks. Copy it instead.
if err := writeFile(dst, &moxio.AtReader{R: msgFile}); err != nil {
return fmt.Errorf("copying message to new file: %s", err)
return 0, fmt.Errorf("copying message to new file: %s", err)
}
}
if err := moxio.SyncDir(dstDir); err != nil {
return fmt.Errorf("sync directory: %v", err)
return 0, fmt.Errorf("sync directory: %v", err)
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("commit transaction: %s", err)
return 0, fmt.Errorf("commit transaction: %s", err)
}
tx = nil
dst = ""
queuekick()
return nil
return qm.ID, nil
}
// write contents of r to new file dst, for delivering a message.
@ -284,11 +303,13 @@ func queuekick() {
}
}
// Kick sets the NextAttempt for messages matching all parameters that are nonzero,
// and kicks the queue, attempting delivery of those messages. If all parameters
// are zero, all messages are kicked.
// Kick sets the NextAttempt for messages matching all filter parameters (ID,
// toDomain, recipient) that are nonzero, and kicks the queue, attempting delivery
// of those messages. If all parameters are zero, all messages are kicked. If
// transport is set, the delivery attempts for the matching messages will use the
// transport. An empty string is the default transport, i.e. direct delivery.
// Returns number of messages queued for immediate delivery.
func Kick(ctx context.Context, ID int64, toDomain string, recipient string) (int, error) {
func Kick(ctx context.Context, ID int64, toDomain, recipient string, transport *string) (int, error) {
q := bstore.QueryDB[Msg](ctx, DB)
if ID > 0 {
q.FilterID(ID)
@ -301,7 +322,17 @@ func Kick(ctx context.Context, ID int64, toDomain string, recipient string) (int
return qm.Recipient().XString(true) == recipient
})
}
n, err := q.UpdateNonzero(Msg{NextAttempt: time.Now()})
up := map[string]any{"NextAttempt": time.Now()}
if transport != nil {
if *transport != "" {
_, ok := mox.Conf.Static.Transports[*transport]
if !ok {
return 0, fmt.Errorf("unknown transport %q", *transport)
}
}
up["Transport"] = *transport
}
n, err := q.UpdateFields(up)
if err != nil {
return 0, fmt.Errorf("selecting and updating messages in queue: %v", err)
}
@ -500,343 +531,92 @@ func deliver(resolver dns.Resolver, m Msg) {
return
}
fail := func(permanent bool, remoteMTA dsn.NameIP, secodeOpt, errmsg string) {
if permanent || m.Attempts >= 8 {
qlog.Errorx("permanent failure delivering from queue", errors.New(errmsg))
queueDSNFailure(qlog, m, remoteMTA, secodeOpt, errmsg)
if err := queueDelete(context.Background(), m.ID); err != nil {
qlog.Errorx("deleting message from queue after permanent failure", err)
}
// Find route for transport to use for delivery attempt.
var transport config.Transport
var transportName string
if m.Transport != "" {
var ok bool
transport, ok = mox.Conf.Static.Transports[m.Transport]
if !ok {
var remoteMTA dsn.NameIP // Zero value, will not be included in DSN. ../rfc/3464:1027
fail(qlog, m, backoff, false, remoteMTA, "", fmt.Sprintf("cannot find transport %q", m.Transport))
return
}
qup := bstore.QueryDB[Msg](context.Background(), DB)
qup.FilterID(m.ID)
if _, err := qup.UpdateNonzero(Msg{LastError: errmsg, DialedIPs: m.DialedIPs}); err != nil {
qlog.Errorx("storing delivery error", err, mlog.Field("deliveryerror", errmsg))
}
if m.Attempts == 5 {
// We've attempted deliveries at these intervals: 0, 7.5m, 15m, 30m, 1h, 2u.
// Let sender know delivery is delayed.
qlog.Errorx("temporary failure delivering from queue, sending delayed dsn", errors.New(errmsg), mlog.Field("backoff", backoff))
retryUntil := m.LastAttempt.Add((4 + 8 + 16) * time.Hour)
queueDSNDelay(qlog, m, remoteMTA, secodeOpt, errmsg, retryUntil)
} else {
qlog.Errorx("temporary failure delivering from queue", errors.New(errmsg), mlog.Field("backoff", backoff), mlog.Field("nextattempt", m.NextAttempt))
}
}
hosts, effectiveDomain, permanent, err := gatherHosts(resolver, m, cid, qlog)
if err != nil {
fail(permanent, dsn.NameIP{}, "", err.Error())
return
}
// Check for MTA-STS policy and enforce it if needed. We have to check the
// effective domain (found after following CNAME record(s)): there will certainly
// not be an mtasts record for the original recipient domain, because that is not
// allowed when a CNAME record is present.
var policyFresh bool
var policy *mtasts.Policy
tlsModeDefault := smtpclient.TLSOpportunistic
if !effectiveDomain.IsZero() {
cidctx := context.WithValue(mox.Shutdown, mlog.CidKey, cid)
policy, policyFresh, err = mtastsdb.Get(cidctx, resolver, effectiveDomain)
if err != nil {
// No need to refuse to deliver if we have some mtasts error.
qlog.Infox("mtasts failed, continuing with strict tls requirement", err, mlog.Field("domain", effectiveDomain))
tlsModeDefault = smtpclient.TLSStrict
}
// note: policy can be nil, if a domain does not implement MTA-STS or its the first
// time we fetch the policy and if we encountered an error.
}
// We try delivery to each record until we have success or a permanent failure. So
// for transient errors, we'll try the next MX record. For MX records pointing to a
// dual stack host, we turn a permanent failure due to policy on the first delivery
// attempt into a temporary failure and make sure to try the other address family
// the next attempt. This should reduce issues due to one of our IPs being on a
// block list. We won't try multiple IPs of the same address family. Surprisingly,
// RFC 5321 does not specify a clear algorithm, but common practicie is probably
// ../rfc/3974:268.
var remoteMTA dsn.NameIP
var secodeOpt, errmsg string
permanent = false
mtastsFailure := true
// todo: should make distinction between host permanently not accepting the message, and the message not being deliverable permanently. e.g. a mx host may have a size limit, or not accept 8bitmime, while another host in the list does accept the message. same for smtputf8, ../rfc/6531:555
for _, h := range hosts {
var badTLS, ok bool
// ../rfc/8461:913
if policy != nil && policy.Mode == mtasts.ModeEnforce && !policy.Matches(h.Domain) {
var policyHosts []string
for _, mx := range policy.MX {
policyHosts = append(policyHosts, mx.LogString())
}
errmsg = fmt.Sprintf("mx host %s does not match enforced mta-sts policy with hosts %s", h.Domain, strings.Join(policyHosts, ","))
qlog.Error("mx host does not match enforce mta-sts policy, skipping", mlog.Field("host", h.Domain), mlog.Field("policyhosts", policyHosts))
continue
}
qlog.Info("delivering to remote", mlog.Field("remote", h), mlog.Field("queuecid", cid))
cid := mox.Cid()
nqlog := qlog.WithCid(cid)
var remoteIP net.IP
tlsMode := tlsModeDefault
if policy != nil && policy.Mode == mtasts.ModeEnforce {
tlsMode = smtpclient.TLSStrict
}
permanent, badTLS, secodeOpt, remoteIP, errmsg, ok = deliverHost(nqlog, resolver, cid, h, &m, tlsMode)
if !ok && badTLS && tlsMode == smtpclient.TLSOpportunistic {
// In case of failure with opportunistic TLS, try again without TLS. ../rfc/7435:459
// todo future: revisit this decision. perhaps it should be a configuration option that defaults to not doing this?
nqlog.Info("connecting again for delivery attempt without tls")
permanent, badTLS, secodeOpt, remoteIP, errmsg, ok = deliverHost(nqlog, resolver, cid, h, &m, smtpclient.TLSSkip)
}
if ok {
nqlog.Info("delivered from queue")
if err := queueDelete(context.Background(), m.ID); err != nil {
nqlog.Errorx("deleting message from queue after delivery", err)
}
return
}
remoteMTA = dsn.NameIP{Name: h.XString(false), IP: remoteIP}
if !badTLS {
mtastsFailure = false
}
if permanent {
break
}
}
if mtastsFailure && policyFresh {
permanent = true
}
fail(permanent, remoteMTA, secodeOpt, errmsg)
}
var (
errCNAMELoop = errors.New("cname loop")
errCNAMELimit = errors.New("too many cname records")
errNoRecord = errors.New("no dns record")
errDNS = errors.New("dns lookup error")
errNoMail = errors.New("domain does not accept email as indicated with single dot for mx record")
)
// Gather hosts to try to deliver to. We start with the straight-forward MX record.
// If that does not exist, we'll look for CNAME of the entire domain (following
// chains if needed). If a CNAME does not exist, but the domain name has an A or
// AAAA record, we'll try delivery directly to that host.
// ../rfc/5321:3824
func gatherHosts(resolver dns.Resolver, m Msg, cid int64, qlog *mlog.Log) (hosts []dns.IPDomain, effectiveDomain dns.Domain, permanent bool, err error) {
if len(m.RecipientDomain.IP) > 0 {
return []dns.IPDomain{m.RecipientDomain}, effectiveDomain, false, nil
}
// We start out delivering to the recipient domain. We follow CNAMEs a few times.
rcptDomain := m.RecipientDomain.Domain
// Domain we are actually delivering to, after following CNAME record(s).
effectiveDomain = rcptDomain
domainsSeen := map[string]bool{}
for i := 0; ; i++ {
if domainsSeen[effectiveDomain.ASCII] {
return nil, effectiveDomain, true, fmt.Errorf("%w: recipient domain %s: already saw %s", errCNAMELoop, rcptDomain, effectiveDomain)
}
domainsSeen[effectiveDomain.ASCII] = true
// note: The Go resolver returns the requested name if the domain has no CNAME record but has a host record.
if i == 16 {
// We have a maximum number of CNAME records we follow. There is no hard limit for
// DNS, and you might think folks wouldn't configure CNAME chains at all, but for
// (non-mail) domains, CNAME chains of 10 records have been encountered according
// to the internet.
return nil, effectiveDomain, true, fmt.Errorf("%w: recipient domain %s, last resolved domain %s", errCNAMELimit, rcptDomain, effectiveDomain)
}
cidctx := context.WithValue(mox.Context, mlog.CidKey, cid)
ctx, cancel := context.WithTimeout(cidctx, 30*time.Second)
defer cancel()
// Note: LookupMX can return an error and still return records: Invalid records are
// filtered out and an error returned. We must process any records that are valid.
// Only if all are unusable will we return an error. ../rfc/5321:3851
mxl, err := resolver.LookupMX(ctx, effectiveDomain.ASCII+".")
cancel()
if err != nil && len(mxl) == 0 {
if !dns.IsNotFound(err) {
return nil, effectiveDomain, false, fmt.Errorf("%w: mx lookup for %s: %v", errDNS, effectiveDomain, err)
}
// No MX record. First attempt CNAME lookup. ../rfc/5321:3838 ../rfc/3974:197
ctx, cancel = context.WithTimeout(cidctx, 30*time.Second)
defer cancel()
cname, err := resolver.LookupCNAME(ctx, effectiveDomain.ASCII+".")
cancel()
if err != nil && !dns.IsNotFound(err) {
return nil, effectiveDomain, false, fmt.Errorf("%w: cname lookup for %s: %v", errDNS, effectiveDomain, err)
}
if err == nil && cname != effectiveDomain.ASCII+"." {
d, err := dns.ParseDomain(strings.TrimSuffix(cname, "."))
if err != nil {
return nil, effectiveDomain, true, fmt.Errorf("%w: parsing cname domain %s: %v", errDNS, effectiveDomain, err)
}
effectiveDomain = d
// Start again with new domain.
continue
}
// See if the host exists. If so, attempt delivery directly to host. ../rfc/5321:3842
ctx, cancel = context.WithTimeout(cidctx, 30*time.Second)
defer cancel()
_, err = resolver.LookupHost(ctx, effectiveDomain.ASCII+".")
cancel()
if dns.IsNotFound(err) {
return nil, effectiveDomain, true, fmt.Errorf("%w: recipient domain/host %s", errNoRecord, effectiveDomain)
} else if err != nil {
return nil, effectiveDomain, false, fmt.Errorf("%w: looking up host %s because of no mx record: %v", errDNS, effectiveDomain, err)
}
hosts = []dns.IPDomain{{Domain: effectiveDomain}}
} else if err != nil {
qlog.Infox("partial mx failure, attempting delivery to valid mx records", err)
}
// ../rfc/7505:122
if err == nil && len(mxl) == 1 && mxl[0].Host == "." {
return nil, effectiveDomain, true, errNoMail
}
// The Go resolver already sorts by preference, randomizing records of same
// preference. ../rfc/5321:3885
for _, mx := range mxl {
host, err := dns.ParseDomain(strings.TrimSuffix(mx.Host, "."))
if err != nil {
// note: should not happen because Go resolver already filters these out.
return nil, effectiveDomain, true, fmt.Errorf("%w: invalid host name in mx record %q: %v", errDNS, mx.Host, err)
}
hosts = append(hosts, dns.IPDomain{Domain: host})
}
if len(hosts) > 0 {
err = nil
}
return hosts, effectiveDomain, false, err
}
}
// deliverHost attempts to deliver m to host.
// deliverHost updated m.DialedIPs, which must be saved in case of failure to deliver.
func deliverHost(log *mlog.Log, resolver dns.Resolver, cid int64, host dns.IPDomain, m *Msg, tlsMode smtpclient.TLSMode) (permanent, badTLS bool, secodeOpt string, remoteIP net.IP, errmsg string, ok bool) {
// About attempting delivery to multiple addresses of a host: ../rfc/5321:3898
start := time.Now()
var deliveryResult string
defer func() {
metricDeliveryHost.WithLabelValues(fmt.Sprintf("%d", m.Attempts), string(tlsMode), deliveryResult).Observe(float64(time.Since(start)) / float64(time.Second))
log.Debug("queue deliverhost result", mlog.Field("host", host), mlog.Field("attempt", m.Attempts), mlog.Field("tlsmode", tlsMode), mlog.Field("permanent", permanent), mlog.Field("badtls", badTLS), mlog.Field("secodeopt", secodeOpt), mlog.Field("errmsg", errmsg), mlog.Field("ok", ok), mlog.Field("duration", time.Since(start)))
}()
f, err := os.Open(m.MessagePath())
if err != nil {
return false, false, "", nil, fmt.Sprintf("open message file: %s", err), false
}
msgr := store.FileMsgReader(m.MsgPrefix, f)
defer func() {
err := msgr.Close()
log.Check(err, "closing message after delivery attempt")
}()
cidctx := context.WithValue(mox.Context, mlog.CidKey, cid)
ctx, cancel := context.WithTimeout(cidctx, 30*time.Second)
defer cancel()
conn, ip, dualstack, err := dialHost(ctx, log, resolver, host, m)
remoteIP = ip
cancel()
var result string
switch {
case err == nil:
result = "ok"
case errors.Is(err, os.ErrDeadlineExceeded), errors.Is(err, context.DeadlineExceeded):
result = "timeout"
case errors.Is(err, context.Canceled):
result = "canceled"
default:
result = "error"
}
metricConnection.WithLabelValues(result).Inc()
if err != nil {
log.Debugx("connecting to remote smtp", err, mlog.Field("host", host))
return false, false, "", ip, fmt.Sprintf("dialing smtp server: %v", err), false
}
var mailFrom string
if m.SenderLocalpart != "" || !m.SenderDomain.IsZero() {
mailFrom = m.Sender().XString(m.SMTPUTF8)
}
rcptTo := m.Recipient().XString(m.SMTPUTF8)
// todo future: get closer to timeouts specified in rfc? ../rfc/5321:3610
log = log.Fields(mlog.Field("remoteip", ip))
ctx, cancel = context.WithTimeout(cidctx, 30*time.Minute)
defer cancel()
mox.Connections.Register(conn, "smtpclient", "queue")
sc, err := smtpclient.New(ctx, log, conn, tlsMode, host.String(), "")
defer func() {
if sc == nil {
conn.Close()
} else {
sc.Close()
}
mox.Connections.Unregister(conn)
}()
if err == nil {
has8bit := m.Has8bit
smtputf8 := m.SMTPUTF8
var msg io.Reader = msgr
size := m.Size
if m.DSNUTF8 != nil && sc.Supports8BITMIME() && sc.SupportsSMTPUTF8() {
has8bit = true
smtputf8 = true
size = int64(len(m.DSNUTF8))
msg = bytes.NewReader(m.DSNUTF8)
}
err = sc.Deliver(ctx, mailFrom, rcptTo, size, msg, has8bit, smtputf8)
}
if err != nil {
log.Infox("delivery failed", err)
}
var cerr smtpclient.Error
switch {
case err == nil:
deliveryResult = "ok"
case errors.Is(err, os.ErrDeadlineExceeded), errors.Is(err, context.DeadlineExceeded):
deliveryResult = "timeout"
case errors.Is(err, context.Canceled):
deliveryResult = "canceled"
case errors.As(err, &cerr):
deliveryResult = "temperror"
if cerr.Permanent {
deliveryResult = "permerror"
}
default:
deliveryResult = "error"
}
if err == nil {
return false, false, "", ip, "", true
} else if cerr, ok := err.(smtpclient.Error); ok {
// If we are being rejected due to policy reasons on the first
// attempt and remote has both IPv4 and IPv6, we'll give it
// another try. Our first IP may be in a block list, the address for
// the other family perhaps is not.
permanent := cerr.Permanent
if permanent && m.Attempts == 1 && dualstack && strings.HasPrefix(cerr.Secode, "7.") {
permanent = false
}
return permanent, errors.Is(cerr, smtpclient.ErrTLS), cerr.Secode, ip, cerr.Error(), false
transportName = m.Transport
} else {
return false, errors.Is(cerr, smtpclient.ErrTLS), "", ip, err.Error(), false
route := findRoute(m.Attempts-1, m)
transport = route.ResolvedTransport
transportName = route.Transport
}
if transportName != "" {
qlog = qlog.Fields(mlog.Field("transport", transportName))
qlog.Debug("delivering with transport", mlog.Field("transport", transportName))
}
var dialer contextDialer = &net.Dialer{}
if transport.Submissions != nil {
deliverSubmit(cid, qlog, resolver, dialer, m, backoff, transportName, transport.Submissions, true, 465)
} else if transport.Submission != nil {
deliverSubmit(cid, qlog, resolver, dialer, m, backoff, transportName, transport.Submission, false, 587)
} else if transport.SMTP != nil {
deliverSubmit(cid, qlog, resolver, dialer, m, backoff, transportName, transport.SMTP, false, 25)
} else {
ourHostname := mox.Conf.Static.HostnameDomain
if transport.Socks != nil {
socksdialer, err := proxy.SOCKS5("tcp", transport.Socks.Address, nil, &net.Dialer{})
if err != nil {
fail(qlog, m, backoff, false, dsn.NameIP{}, "", fmt.Sprintf("socks dialer: %v", err))
return
} else if d, ok := socksdialer.(contextDialer); !ok {
fail(qlog, m, backoff, false, dsn.NameIP{}, "", "socks dialer is not a contextdialer")
return
} else {
dialer = d
}
ourHostname = transport.Socks.Hostname
}
deliverDirect(cid, qlog, resolver, dialer, ourHostname, transportName, m, backoff)
}
}
func findRoute(attempt int, m Msg) config.Route {
routesAccount, routesDomain, routesGlobal := mox.Conf.Routes(m.SenderAccount, m.SenderDomain.Domain)
if r, ok := findRouteInList(attempt, m, routesAccount); ok {
return r
}
if r, ok := findRouteInList(attempt, m, routesDomain); ok {
return r
}
if r, ok := findRouteInList(attempt, m, routesGlobal); ok {
return r
}
return config.Route{}
}
func findRouteInList(attempt int, m Msg, routes []config.Route) (config.Route, bool) {
for _, r := range routes {
if routeMatch(attempt, m, r) {
return r, true
}
}
return config.Route{}, false
}
func routeMatch(attempt int, m Msg, r config.Route) bool {
return attempt >= r.MinimumAttempts && routeMatchDomain(r.FromDomainASCII, m.SenderDomain.Domain) && routeMatchDomain(r.ToDomainASCII, m.RecipientDomain.Domain)
}
func routeMatchDomain(l []string, d dns.Domain) bool {
if len(l) == 0 {
return true
}
for _, e := range l {
if d.ASCII == e || strings.HasPrefix(e, ".") && (d.ASCII == e[1:] || strings.HasSuffix(d.ASCII, e)) {
return true
}
}
return false
}
// dialHost dials host for delivering Msg, taking previous attempts into accounts.
@ -846,7 +626,7 @@ func deliverHost(log *mlog.Log, resolver dns.Resolver, cid int64, host dns.IPDom
// If we have fully specified local smtp listen IPs, we set those for the outgoing
// connection. The admin probably configured these same IPs in SPF, but others
// possibly not.
func dialHost(ctx context.Context, log *mlog.Log, resolver dns.Resolver, host dns.IPDomain, m *Msg) (conn net.Conn, ip net.IP, dualstack bool, rerr error) {
func dialHost(ctx context.Context, log *mlog.Log, resolver dns.Resolver, dialer contextDialer, host dns.IPDomain, port int, m *Msg) (conn net.Conn, ip net.IP, dualstack bool, rerr error) {
var ips []net.IP
if len(host.IP) > 0 {
ips = []net.IP{host.IP}
@ -906,8 +686,8 @@ func dialHost(ctx context.Context, log *mlog.Log, resolver dns.Resolver, host dn
var lastErr error
var lastIP net.IP
for _, ip := range ips {
addr := net.JoinHostPort(ip.String(), "25")
log.Debug("dialing remote smtp", mlog.Field("addr", addr))
addr := net.JoinHostPort(ip.String(), fmt.Sprintf("%d", port))
log.Debug("dialing remote host for delivery", mlog.Field("addr", addr))
var laddr net.Addr
for _, lip := range mox.Conf.Static.SpecifiedSMTPListenIPs {
ipIs4 := ip.To4() != nil
@ -917,7 +697,7 @@ func dialHost(ctx context.Context, log *mlog.Log, resolver dns.Resolver, host dn
break
}
}
conn, err := dial(ctx, timeout, addr, laddr)
conn, err := dial(ctx, dialer, timeout, addr, laddr)
if err == nil {
log.Debug("connected for smtp delivery", mlog.Field("host", host), mlog.Field("addr", addr), mlog.Field("laddr", laddr))
if m.DialedIPs == nil {

View File

@ -3,9 +3,14 @@ package queue
import (
"bufio"
"context"
"crypto/ed25519"
cryptorand "crypto/rand"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io"
"math/big"
"net"
"os"
"reflect"
@ -80,11 +85,11 @@ func TestQueue(t *testing.T) {
}
path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
err = Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), nil, prepareFile(t), nil, true)
_, err = Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), nil, prepareFile(t), nil, true)
tcheck(t, err, "add message to queue for delivery")
mf2 := prepareFile(t)
err = Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), nil, mf2, nil, false)
_, err = Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), nil, mf2, nil, false)
tcheck(t, err, "add message to queue for delivery")
os.Remove(mf2.Name())
@ -120,11 +125,14 @@ func TestQueue(t *testing.T) {
// Override dial function. We'll make connecting fail for now.
resolver := dns.MockResolver{
A: map[string][]string{"mox.example.": {"127.0.0.1"}},
A: map[string][]string{
"mox.example.": {"127.0.0.1"},
"submission.example.": {"127.0.0.1"},
},
MX: map[string][]*net.MX{"mox.example.": {{Host: "mox.example", Pref: 10}}},
}
dialed := make(chan struct{}, 1)
dial = func(ctx context.Context, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
dial = func(ctx context.Context, dialer contextDialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
dialed <- struct{}{}
return nil, fmt.Errorf("failure from test")
}
@ -167,26 +175,20 @@ func TestQueue(t *testing.T) {
t.Fatalf("message mismatch, got %q, expected %q", string(msgbuf), testmsg)
}
n, err = Kick(ctxbg, msg.ID+1, "", "")
n, err = Kick(ctxbg, msg.ID+1, "", "", nil)
tcheck(t, err, "kick")
if n != 0 {
t.Fatalf("kick %d, expected 0", n)
}
n, err = Kick(ctxbg, msg.ID, "", "")
n, err = Kick(ctxbg, msg.ID, "", "", nil)
tcheck(t, err, "kick")
if n != 1 {
t.Fatalf("kicked %d, expected 1", n)
}
// Setting up a pipe. We'll start a fake smtp server on the server-side. And return the
// client-side to the invocation dial, for the attempted delivery from the queue.
// The delivery should succeed.
server, client := net.Pipe()
defer server.Close()
defer client.Close()
smtpdone := make(chan struct{})
go func() {
fakeSMTPServer := func(server net.Conn) {
// We do a minimal fake smtp server. We cannot import smtpserver.Serve due to cyclic dependencies.
fmt.Fprintf(server, "220 mox.example\r\n")
br := bufio.NewReader(server)
@ -205,42 +207,139 @@ func TestQueue(t *testing.T) {
fmt.Fprintf(server, "221 ok\r\n")
smtpdone <- struct{}{}
}()
dial = func(ctx context.Context, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
dialed <- struct{}{}
return client, nil
}
launchWork(resolver, map[string]struct{}{})
timer.Reset(time.Second)
select {
case <-dialed:
select {
case <-smtpdone:
i := 0
for {
xmsgs, err := List(ctxbg)
tcheck(t, err, "list queue")
if len(xmsgs) == 0 {
break
}
i++
if i == 10 {
t.Fatalf("%d messages in queue, expected 0", len(xmsgs))
}
time.Sleep(100 * time.Millisecond)
}
case <-timer.C:
t.Fatalf("no deliver within 1s")
fakeSubmitServer := func(server net.Conn) {
// We do a minimal fake smtp server. We cannot import smtpserver.Serve due to cyclic dependencies.
fmt.Fprintf(server, "220 mox.example\r\n")
br := bufio.NewReader(server)
br.ReadString('\n') // Should be EHLO.
fmt.Fprintf(server, "250-localhost\r\n")
fmt.Fprintf(server, "250 AUTH PLAIN\r\n")
br.ReadString('\n') // Should be AUTH PLAIN
fmt.Fprintf(server, "235 2.7.0 auth ok\r\n")
br.ReadString('\n') // Should be MAIL FROM.
fmt.Fprintf(server, "250 ok\r\n")
br.ReadString('\n') // Should be RCPT TO.
fmt.Fprintf(server, "250 ok\r\n")
br.ReadString('\n') // Should be DATA.
fmt.Fprintf(server, "354 continue\r\n")
reader := smtp.NewDataReader(br)
io.Copy(io.Discard, reader)
fmt.Fprintf(server, "250 ok\r\n")
br.ReadString('\n') // Should be QUIT.
fmt.Fprintf(server, "221 ok\r\n")
smtpdone <- struct{}{}
}
testDeliver := func(fakeServer func(conn net.Conn)) bool {
t.Helper()
// Setting up a pipe. We'll start a fake smtp server on the server-side. And return the
// client-side to the invocation dial, for the attempted delivery from the queue.
// The delivery should succeed.
server, client := net.Pipe()
defer server.Close()
defer client.Close()
var wasNetDialer bool
dial = func(ctx context.Context, dialer contextDialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
_, wasNetDialer = dialer.(*net.Dialer)
dialed <- struct{}{}
return client, nil
}
case <-timer.C:
t.Fatalf("no dial within 1s")
waitDeliver := func() {
t.Helper()
timer.Reset(time.Second)
select {
case <-dialed:
select {
case <-smtpdone:
i := 0
for {
xmsgs, err := List(ctxbg)
tcheck(t, err, "list queue")
if len(xmsgs) == 0 {
break
}
i++
if i == 10 {
t.Fatalf("%d messages in queue, expected 0", len(xmsgs))
}
time.Sleep(100 * time.Millisecond)
}
case <-timer.C:
t.Fatalf("no deliver within 1s")
}
case <-timer.C:
t.Fatalf("no dial within 1s")
}
<-deliveryResult // Deliver sends here.
}
go fakeServer(server)
launchWork(resolver, map[string]struct{}{})
waitDeliver()
return wasNetDialer
}
// Test direct delivery.
wasNetDialer := testDeliver(fakeSMTPServer)
if !wasNetDialer {
t.Fatalf("expected net.Dialer as dialer")
}
// Add a message to be delivered with submit because of its route.
topath := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "submit.example"}}}
_, err = Add(ctxbg, xlog, "mjl", path, topath, false, false, int64(len(testmsg)), nil, prepareFile(t), nil, true)
tcheck(t, err, "add message to queue for delivery")
wasNetDialer = testDeliver(fakeSubmitServer)
if !wasNetDialer {
t.Fatalf("expected net.Dialer as dialer")
}
// Add a message to be delivered with submit because of explicitly configured transport, that uses TLS.
msgID, err := Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), nil, prepareFile(t), nil, true)
tcheck(t, err, "add message to queue for delivery")
transportSubmitTLS := "submittls"
n, err = Kick(ctxbg, msgID, "", "", &transportSubmitTLS)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
}
// Make fake cert, and make it trusted.
cert := fakeCert(t, "submission.example", false)
mox.Conf.Static.TLS.CertPool = x509.NewCertPool()
mox.Conf.Static.TLS.CertPool.AddCert(cert.Leaf)
tlsConfig := tls.Config{
Certificates: []tls.Certificate{cert},
}
wasNetDialer = testDeliver(func(conn net.Conn) {
conn = tls.Server(conn, &tlsConfig)
fakeSubmitServer(conn)
})
if !wasNetDialer {
t.Fatalf("expected net.Dialer as dialer")
}
// Add a message to be delivered with socks.
msgID, err = Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), nil, prepareFile(t), nil, true)
tcheck(t, err, "add message to queue for delivery")
transportSocks := "socks"
n, err = Kick(ctxbg, msgID, "", "", &transportSocks)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
}
wasNetDialer = testDeliver(fakeSMTPServer)
if wasNetDialer {
t.Fatalf("expected non-net.Dialer as dialer") // SOCKS5 dialer is a private type, we cannot check for it.
}
<-deliveryResult // Deliver sends here.
// Add another message that we'll fail to deliver entirely.
err = Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), nil, prepareFile(t), nil, true)
_, err = Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), nil, prepareFile(t), nil, true)
tcheck(t, err, "add message to queue for delivery")
msgs, err = List(ctxbg)
@ -270,7 +369,7 @@ func TestQueue(t *testing.T) {
}()
seq := 0
dial = func(ctx context.Context, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
dial = func(ctx context.Context, dialer contextDialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
seq++
switch seq {
default:
@ -337,7 +436,7 @@ func TestQueueStart(t *testing.T) {
MX: map[string][]*net.MX{"mox.example.": {{Host: "mox.example", Pref: 10}}},
}
dialed := make(chan struct{}, 1)
dial = func(ctx context.Context, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
dial = func(ctx context.Context, dialer contextDialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
dialed <- struct{}{}
return nil, fmt.Errorf("failure from test")
}
@ -374,7 +473,7 @@ func TestQueueStart(t *testing.T) {
}
path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
err = Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), nil, prepareFile(t), nil, true)
_, err = Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), nil, prepareFile(t), nil, true)
tcheck(t, err, "add message to queue for delivery")
checkDialed(true)
@ -383,7 +482,7 @@ func TestQueueStart(t *testing.T) {
checkDialed(false)
// Kick for real, should see another attempt.
n, err := Kick(ctxbg, 0, "mox.example", "")
n, err := Kick(ctxbg, 0, "mox.example", "", nil)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
@ -520,7 +619,7 @@ func TestDialHost(t *testing.T) {
},
}
dial = func(ctx context.Context, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
dial = func(ctx context.Context, dialer contextDialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
return nil, nil // No error, nil connection isn't used.
}
@ -529,12 +628,44 @@ func TestDialHost(t *testing.T) {
}
m := Msg{DialedIPs: map[string][]net.IP{}}
_, ip, dualstack, err := dialHost(ctxbg, xlog, resolver, ipdomain("dualstack.example"), &m)
_, ip, dualstack, err := dialHost(ctxbg, xlog, resolver, nil, ipdomain("dualstack.example"), 25, &m)
if err != nil || ip.String() != "10.0.0.1" || !dualstack {
t.Fatalf("expected err nil, address 10.0.0.1, dualstack true, got %v %v %v", err, ip, dualstack)
}
_, ip, dualstack, err = dialHost(ctxbg, xlog, resolver, ipdomain("dualstack.example"), &m)
_, ip, dualstack, err = dialHost(ctxbg, xlog, resolver, nil, ipdomain("dualstack.example"), 25, &m)
if err != nil || ip.String() != "2001:db8::1" || !dualstack {
t.Fatalf("expected err nil, address 2001:db8::1, dualstack true, got %v %v %v", err, ip, dualstack)
}
}
// Just a cert that appears valid.
func fakeCert(t *testing.T, name string, expired bool) tls.Certificate {
notAfter := time.Now()
if expired {
notAfter = notAfter.Add(-time.Hour)
} else {
notAfter = notAfter.Add(time.Hour)
}
privKey := ed25519.NewKeyFromSeed(make([]byte, ed25519.SeedSize)) // Fake key, don't use this for real!
template := &x509.Certificate{
SerialNumber: big.NewInt(1), // Required field...
DNSNames: []string{name},
NotBefore: time.Now().Add(-time.Hour),
NotAfter: notAfter,
}
localCertBuf, err := x509.CreateCertificate(cryptorand.Reader, template, template, privKey.Public(), privKey)
if err != nil {
t.Fatalf("making certificate: %s", err)
}
cert, err := x509.ParseCertificate(localCertBuf)
if err != nil {
t.Fatalf("parsing generated certificate: %s", err)
}
c := tls.Certificate{
Certificate: [][]byte{localCertBuf},
PrivateKey: privKey,
Leaf: cert,
}
return c
}

191
queue/submit.go Normal file
View File

@ -0,0 +1,191 @@
package queue
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
"os"
"time"
"github.com/mjl-/mox/config"
"github.com/mjl-/mox/dns"
"github.com/mjl-/mox/dsn"
"github.com/mjl-/mox/mlog"
"github.com/mjl-/mox/mox-"
"github.com/mjl-/mox/sasl"
"github.com/mjl-/mox/smtpclient"
"github.com/mjl-/mox/store"
)
// todo: reuse connection? do fewer concurrently (other than with direct delivery).
// deliver via another SMTP server, e.g. relaying to a smart host, possibly
// with authentication (submission).
func deliverSubmit(cid int64, qlog *mlog.Log, resolver dns.Resolver, dialer contextDialer, m Msg, backoff time.Duration, transportName string, transport *config.TransportSMTP, dialTLS bool, defaultPort int) {
// todo: configurable timeouts
port := transport.Port
if port == 0 {
port = defaultPort
}
tlsMode := smtpclient.TLSStrictStartTLS
if dialTLS {
tlsMode = smtpclient.TLSStrictImmediate
} else if transport.STARTTLSInsecureSkipVerify {
tlsMode = smtpclient.TLSOpportunistic
} else if transport.NoSTARTTLS {
tlsMode = smtpclient.TLSSkip
}
start := time.Now()
var deliveryResult string
var permanent bool
var secodeOpt string
var errmsg string
var success bool
defer func() {
metricDelivery.WithLabelValues(fmt.Sprintf("%d", m.Attempts), transportName, string(tlsMode), deliveryResult).Observe(float64(time.Since(start)) / float64(time.Second))
qlog.Debug("queue deliversubmit result", mlog.Field("host", transport.DNSHost), mlog.Field("port", port), mlog.Field("attempt", m.Attempts), mlog.Field("permanent", permanent), mlog.Field("secodeopt", secodeOpt), mlog.Field("errmsg", errmsg), mlog.Field("ok", success), mlog.Field("duration", time.Since(start)))
}()
dialctx, dialcancel := context.WithTimeout(context.Background(), 30*time.Second)
defer dialcancel()
addr := net.JoinHostPort(transport.Host, fmt.Sprintf("%d", port))
conn, _, _, err := dialHost(dialctx, qlog, resolver, dialer, dns.IPDomain{Domain: transport.DNSHost}, port, &m)
var result string
switch {
case err == nil:
result = "ok"
case errors.Is(err, os.ErrDeadlineExceeded), errors.Is(err, context.DeadlineExceeded):
result = "timeout"
case errors.Is(err, context.Canceled):
result = "canceled"
default:
result = "error"
}
metricConnection.WithLabelValues(result).Inc()
if err != nil {
if conn != nil {
err := conn.Close()
qlog.Check(err, "closing connection")
}
qlog.Errorx("dialing for submission", err, mlog.Field("remote", addr))
errmsg = fmt.Sprintf("transport %s: dialing %s for submission: %v", transportName, addr, err)
fail(qlog, m, backoff, false, dsn.NameIP{}, "", errmsg)
return
}
dialcancel()
var auth []sasl.Client
if transport.Auth != nil {
a := transport.Auth
for _, mech := range a.EffectiveMechanisms {
switch mech {
case "PLAIN":
auth = append(auth, sasl.NewClientPlain(a.Username, a.Password))
case "CRAM-MD5":
auth = append(auth, sasl.NewClientCRAMMD5(a.Username, a.Password))
case "SCRAM-SHA-1":
auth = append(auth, sasl.NewClientSCRAMSHA1(a.Username, a.Password))
case "SCRAM-SHA-256":
auth = append(auth, sasl.NewClientSCRAMSHA256(a.Username, a.Password))
default:
// Should not happen.
qlog.Error("missing smtp authentication mechanisms implementation", mlog.Field("mechanism", mech))
errmsg = fmt.Sprintf("transport %s: authentication mechanisms %q not implemented", transportName, mech)
fail(qlog, m, backoff, false, dsn.NameIP{}, "", errmsg)
return
}
}
}
clientctx, clientcancel := context.WithTimeout(context.Background(), 60*time.Second)
defer clientcancel()
client, err := smtpclient.New(clientctx, qlog, conn, tlsMode, mox.Conf.Static.HostnameDomain, transport.DNSHost, auth)
if err != nil {
smtperr, ok := err.(smtpclient.Error)
var remoteMTA dsn.NameIP
if ok {
remoteMTA.Name = transport.Host
}
qlog.Errorx("establishing smtp session for submission", err, mlog.Field("remote", addr))
errmsg = fmt.Sprintf("transport %s: establishing smtp session with %s for submission: %v", transportName, addr, err)
secodeOpt = smtperr.Secode
fail(qlog, m, backoff, false, remoteMTA, secodeOpt, errmsg)
return
}
defer func() {
err := client.Close()
qlog.Check(err, "closing smtp client after delivery")
}()
clientcancel()
var msgr io.ReadCloser
var size int64
var req8bit, reqsmtputf8 bool
if len(m.DSNUTF8) > 0 && client.SupportsSMTPUTF8() {
msgr = io.NopCloser(bytes.NewReader(m.DSNUTF8))
reqsmtputf8 = true
size = int64(len(m.DSNUTF8))
} else {
req8bit = m.Has8bit // todo: not require this, but just try to submit?
size = m.Size
p := m.MessagePath()
f, err := os.Open(p)
if err != nil {
qlog.Errorx("opening message for delivery", err, mlog.Field("remote", addr), mlog.Field("path", p))
errmsg = fmt.Sprintf("transport %s: opening message file for submission: %v", transportName, err)
fail(qlog, m, backoff, false, dsn.NameIP{}, "", errmsg)
return
}
msgr = store.FileMsgReader(m.MsgPrefix, f)
defer func() {
err := msgr.Close()
qlog.Check(err, "closing message after delivery attempt")
}()
}
deliverctx, delivercancel := context.WithTimeout(context.Background(), time.Duration(60+size/(1024*1024))*time.Second)
defer delivercancel()
err = client.Deliver(deliverctx, m.Sender().String(), m.Recipient().String(), size, msgr, req8bit, reqsmtputf8)
if err != nil {
qlog.Infox("delivery failed", err)
}
var cerr smtpclient.Error
switch {
case err == nil:
deliveryResult = "ok"
success = true
case errors.Is(err, os.ErrDeadlineExceeded), errors.Is(err, context.DeadlineExceeded):
deliveryResult = "timeout"
case errors.Is(err, context.Canceled):
deliveryResult = "canceled"
case errors.As(err, &cerr):
deliveryResult = "temperror"
if cerr.Permanent {
deliveryResult = "permerror"
}
default:
deliveryResult = "error"
}
if err != nil {
smtperr, ok := err.(smtpclient.Error)
var remoteMTA dsn.NameIP
if ok {
remoteMTA.Name = transport.Host
}
qlog.Errorx("submitting email", err, mlog.Field("remote", addr))
permanent = smtperr.Permanent
secodeOpt = smtperr.Secode
errmsg = fmt.Sprintf("transport %s: submitting email to %s: %v", transportName, addr, err)
fail(qlog, m, backoff, permanent, remoteMTA, secodeOpt, errmsg)
return
}
qlog.Info("delivered from queue with transport")
if err := queueDelete(context.Background(), m.ID); err != nil {
qlog.Errorx("deleting message from queue after delivery", err)
}
}