add "mox localserve" subcommand, for running mox locally for email-related testing/developing

localserve creates a config for listening on localhost for
imap/smtp/submission/http, on port numbers 1000 + the common service port
numbers. all incoming email is accepted (if checks pass), and a few pattern in
localparts are recognized and result in delivery errors.
This commit is contained in:
Mechiel Lukkien
2023-03-12 10:38:02 +01:00
parent bddc8e4062
commit 0099197d00
12 changed files with 694 additions and 108 deletions

View File

@ -19,6 +19,7 @@ import (
"net"
"os"
"runtime/debug"
"strconv"
"strings"
"sync"
"time"
@ -61,6 +62,10 @@ var xlog = mlog.New("smtpserver")
// These errors signal the connection must be closed.
var errIO = errors.New("fatal io error")
// If set, regular delivery/submit is sidestepped, email is accepted and
// delivered to the account named mox.
var Localserve bool
var limiterConnectionRate, limiterConnections *ratelimit.Limiter
// For delivery rate limiting. Variable because changed during tests.
@ -1287,6 +1292,11 @@ func (c *conn) cmdMail(p *parser) {
c.log.Info("delivery from address without domain", mlog.Field("mailfrom", rpath.String()))
xsmtpUserErrorf(smtp.C550MailboxUnavail, smtp.SePol7Other0, "domain name required")
}
if Localserve && strings.HasPrefix(string(rpath.Localpart), "mailfrom") {
c.xlocalserveError(rpath.Localpart)
}
c.mailFrom = &rpath
c.bwritecodeline(smtp.C250Completed, smtp.SeAddr1Other0, "looking good", nil)
@ -1347,7 +1357,7 @@ func (c *conn) cmdRcpt(p *parser) {
// ../rfc/5321:3598
// ../rfc/5321:4045
// Also see ../rfc/7489:2214
if !c.submission && len(c.recipients) == 1 {
if !c.submission && len(c.recipients) == 1 && !Localserve {
// note: because of check above, mailFrom cannot be the null address.
var pass bool
d := c.mailFrom.IPDomain.Domain
@ -1376,7 +1386,18 @@ func (c *conn) cmdRcpt(p *parser) {
}
}
if len(fpath.IPDomain.IP) > 0 {
if Localserve {
if strings.HasPrefix(string(fpath.Localpart), "rcptto") {
c.xlocalserveError(fpath.Localpart)
}
// If account or destination doesn't exist, it will be handled during delivery. For
// submissions, which is the common case, we'll deliver to the logged in user,
// which is typically the mox user.
acc, _ := mox.Conf.Account("mox")
dest := acc.Destinations["mox@localhost"]
c.recipients = append(c.recipients, rcptAccount{fpath, true, "mox", dest, "mox@localhost"})
} else if len(fpath.IPDomain.IP) > 0 {
if !c.submission {
xsmtpUserErrorf(smtp.C550MailboxUnavail, smtp.SeAddr1UnknownDestMailbox1, "not accepting email for ip")
}
@ -1674,23 +1695,95 @@ func (c *conn) submit(ctx context.Context, recvHdrFor func(string) string, msgWr
}
msgPrefix = append(msgPrefix, []byte(authResults.Header())...)
for i, rcptAcc := range c.recipients {
xmsgPrefix := append([]byte(recvHdrFor(rcptAcc.rcptTo.String())), msgPrefix...)
// todo: don't convert the headers to a body? it seems the body part is optional. does this have consequences for us in other places? ../rfc/5322:343
if !msgWriter.HaveHeaders {
xmsgPrefix = append(xmsgPrefix, "\r\n"...)
if Localserve {
var timeout bool
c.account.WithWLock(func() {
for i, rcptAcc := range c.recipients {
var code int
code, timeout = localserveNeedsError(rcptAcc.rcptTo.Localpart)
if timeout {
// Get out of wlock, and sleep there.
return
} else if code != 0 {
c.log.Info("failure due to special localpart", mlog.Field("code", code))
xsmtpServerErrorf(codes{code, smtp.SeOther00}, "failure with code %d due to special localpart", code)
}
xmsgPrefix := append([]byte(recvHdrFor(rcptAcc.rcptTo.String())), msgPrefix...)
// todo: don't convert the headers to a body? it seems the body part is optional. does this have consequences for us in other places? ../rfc/5322:343
if !msgWriter.HaveHeaders {
xmsgPrefix = append(xmsgPrefix, "\r\n"...)
}
msgSize := int64(len(xmsgPrefix)) + msgWriter.Size
ipmasked1, ipmasked2, ipmasked3 := ipmasked(c.remoteIP)
m := store.Message{
Received: time.Now(),
RemoteIP: c.remoteIP.String(),
RemoteIPMasked1: ipmasked1,
RemoteIPMasked2: ipmasked2,
RemoteIPMasked3: ipmasked3,
EHLODomain: c.hello.Domain.Name(),
MailFrom: c.mailFrom.String(),
MailFromLocalpart: c.mailFrom.Localpart,
MailFromDomain: c.mailFrom.IPDomain.Domain.Name(),
RcptToLocalpart: rcptAcc.rcptTo.Localpart,
RcptToDomain: rcptAcc.rcptTo.IPDomain.Domain.Name(),
MsgFromLocalpart: msgFrom.Localpart,
MsgFromDomain: msgFrom.Domain.Name(),
MsgFromOrgDomain: publicsuffix.Lookup(ctx, msgFrom.Domain).Name(),
EHLOValidated: true,
MailFromValidated: true,
MsgFromValidated: true,
EHLOValidation: store.ValidationPass,
MailFromValidation: store.ValidationRelaxed,
MsgFromValidation: store.ValidationRelaxed,
DKIMDomains: nil,
Size: msgSize,
MsgPrefix: xmsgPrefix,
}
if err := c.account.Deliver(c.log, rcptAcc.destination, &m, dataFile, i == len(c.recipients)-1); err != nil {
// Aborting the transaction is not great. But continuing and generating DSNs will
// probably result in errors as well...
metricSubmission.WithLabelValues("localserveerror").Inc()
c.log.Errorx("delivering message", err)
xsmtpServerErrorf(errCodes(smtp.C451LocalErr, smtp.SeSys3Other0, err), "error delivering message: %v", err)
}
metricSubmission.WithLabelValues("ok").Inc()
c.log.Info("submitted message delivered", mlog.Field("mailfrom", *c.mailFrom), mlog.Field("rcptto", rcptAcc.rcptTo), mlog.Field("smtputf8", c.smtputf8), mlog.Field("msgsize", msgSize))
}
})
if timeout {
c.log.Info("timing out submission due to special localpart")
mox.Sleep(mox.Context, time.Hour)
xsmtpServerErrorf(codes{smtp.C451LocalErr, smtp.SeSys3Other0}, "timing out submission due to special localpart")
}
msgSize := int64(len(xmsgPrefix)) + msgWriter.Size
if err := queue.Add(c.log, c.account.Name, *c.mailFrom, rcptAcc.rcptTo, msgWriter.Has8bit, c.smtputf8, msgSize, xmsgPrefix, dataFile, nil, i == len(c.recipients)-1); err != nil {
// Aborting the transaction is not great. But continuing and generating DSNs will
// probably result in errors as well...
metricSubmission.WithLabelValues("queueerror").Inc()
c.log.Errorx("queuing message", err)
xsmtpServerErrorf(errCodes(smtp.C451LocalErr, smtp.SeSys3Other0, err), "error delivering message: %v", err)
} else {
// We always deliver through the queue. It would be more efficient to deliver
// directly, but we don't want to circumvent all the anti-spam measures. Accounts
// on a single mox instance should be allowed to block each other.
for i, rcptAcc := range c.recipients {
xmsgPrefix := append([]byte(recvHdrFor(rcptAcc.rcptTo.String())), msgPrefix...)
// todo: don't convert the headers to a body? it seems the body part is optional. does this have consequences for us in other places? ../rfc/5322:343
if !msgWriter.HaveHeaders {
xmsgPrefix = append(xmsgPrefix, "\r\n"...)
}
msgSize := int64(len(xmsgPrefix)) + msgWriter.Size
if err := queue.Add(c.log, c.account.Name, *c.mailFrom, rcptAcc.rcptTo, msgWriter.Has8bit, c.smtputf8, msgSize, xmsgPrefix, dataFile, nil, i == len(c.recipients)-1); err != nil {
// Aborting the transaction is not great. But continuing and generating DSNs will
// probably result in errors as well...
metricSubmission.WithLabelValues("queueerror").Inc()
c.log.Errorx("queuing message", err)
xsmtpServerErrorf(errCodes(smtp.C451LocalErr, smtp.SeSys3Other0, err), "error delivering message: %v", err)
}
metricSubmission.WithLabelValues("ok").Inc()
c.log.Info("message queued for delivery", mlog.Field("mailfrom", *c.mailFrom), mlog.Field("rcptto", rcptAcc.rcptTo), mlog.Field("smtputf8", c.smtputf8), mlog.Field("msgsize", msgSize))
}
metricSubmission.WithLabelValues("ok").Inc()
c.log.Info("message queued for delivery", mlog.Field("mailfrom", *c.mailFrom), mlog.Field("rcptto", rcptAcc.rcptTo), mlog.Field("smtputf8", c.smtputf8), mlog.Field("msgsize", msgSize))
}
err = dataFile.Close()
c.log.Check(err, "closing file after submission")
@ -1703,6 +1796,55 @@ func (c *conn) submit(ctx context.Context, recvHdrFor func(string) string, msgWr
c.writecodeline(smtp.C250Completed, smtp.SeMailbox2Other0, "it is done", nil)
}
func ipmasked(ip net.IP) (string, string, string) {
if ip.To4() != nil {
m1 := ip.String()
m2 := ip.Mask(net.CIDRMask(26, 32)).String()
m3 := ip.Mask(net.CIDRMask(21, 32)).String()
return m1, m2, m3
}
m1 := ip.Mask(net.CIDRMask(64, 128)).String()
m2 := ip.Mask(net.CIDRMask(48, 128)).String()
m3 := ip.Mask(net.CIDRMask(32, 128)).String()
return m1, m2, m3
}
func localserveNeedsError(lp smtp.Localpart) (code int, timeout bool) {
s := string(lp)
if strings.HasSuffix(s, "temperror") {
return smtp.C451LocalErr, false
} else if strings.HasSuffix(s, "permerror") {
return smtp.C550MailboxUnavail, false
} else if strings.HasSuffix(s, "timeout") {
return 0, true
}
if len(s) < 3 {
return 0, false
}
s = s[len(s)-3:]
v, err := strconv.ParseInt(s, 10, 32)
if err != nil {
return 0, false
}
if v < 400 || v > 600 {
return 0, false
}
return int(v), false
}
func (c *conn) xlocalserveError(lp smtp.Localpart) {
code, timeout := localserveNeedsError(lp)
if timeout {
c.log.Info("timing out due to special localpart")
mox.Sleep(mox.Context, time.Hour)
xsmtpServerErrorf(codes{smtp.C451LocalErr, smtp.SeSys3Other0}, "timing out command due to special localpart")
} else if code != 0 {
c.log.Info("failure due to special localpart", mlog.Field("code", code))
metricDelivery.WithLabelValues("delivererror", "localserve").Inc()
xsmtpServerErrorf(codes{code, smtp.SeOther00}, "failure with code %d due to special localpart", code)
}
}
// deliver is called for incoming messages from external, typically untrusted
// sources. i.e. not submitted by authenticated users.
func (c *conn) deliver(ctx context.Context, recvHdrFor func(string) string, msgWriter *message.Writer, iprevStatus iprev.Status, pdataFile **os.File) {
@ -1968,16 +2110,7 @@ func (c *conn) deliver(ctx context.Context, recvHdrFor func(string) string, msgW
c.log.Debug("dmarc verification", mlog.Field("result", dmarcResult.Status), mlog.Field("domain", msgFrom.Domain))
// Prepare for analyzing content, calculating reputation.
var ipmasked1, ipmasked2, ipmasked3 string
if c.remoteIP.To4() != nil {
ipmasked1 = c.remoteIP.String()
ipmasked2 = c.remoteIP.Mask(net.CIDRMask(26, 32)).String()
ipmasked3 = c.remoteIP.Mask(net.CIDRMask(21, 32)).String()
} else {
ipmasked1 = c.remoteIP.Mask(net.CIDRMask(64, 128)).String()
ipmasked2 = c.remoteIP.Mask(net.CIDRMask(48, 128)).String()
ipmasked3 = c.remoteIP.Mask(net.CIDRMask(32, 128)).String()
}
ipmasked1, ipmasked2, ipmasked3 := ipmasked(c.remoteIP)
var verifiedDKIMDomains []string
for _, r := range dkimResults {
// A message can have multiple signatures for the same identity. For example when
@ -2019,7 +2152,6 @@ func (c *conn) deliver(ctx context.Context, recvHdrFor func(string) string, msgW
// For each recipient, do final spam analysis and delivery.
for _, rcptAcc := range c.recipients {
log := c.log.Fields(mlog.Field("mailfrom", c.mailFrom), mlog.Field("rcptto", rcptAcc.rcptTo))
// If this is not a valid local user, we send back a DSN. This can only happen when
@ -2238,34 +2370,47 @@ func (c *conn) deliver(ctx context.Context, recvHdrFor func(string) string, msgW
mox.Sleep(mox.Context, reputationlessSenderDeliveryDelay)
}
acc.WithWLock(func() {
// Gather the message-id before we deliver and the file may be consumed.
if !parsedMessageID {
if p, err := message.Parse(store.FileMsgReader(m.MsgPrefix, dataFile)); err != nil {
log.Infox("parsing message for message-id", err)
} else if header, err := p.Header(); err != nil {
log.Infox("parsing message header for message-id", err)
} else {
messageID = header.Get("Message-Id")
}
// Gather the message-id before we deliver and the file may be consumed.
if !parsedMessageID {
if p, err := message.Parse(store.FileMsgReader(m.MsgPrefix, dataFile)); err != nil {
log.Infox("parsing message for message-id", err)
} else if header, err := p.Header(); err != nil {
log.Infox("parsing message header for message-id", err)
} else {
messageID = header.Get("Message-Id")
}
}
if err := acc.Deliver(log, rcptAcc.destination, m, dataFile, false); err != nil {
log.Errorx("delivering", err)
metricDelivery.WithLabelValues("delivererror", a.reason).Inc()
addError(rcptAcc, smtp.C451LocalErr, smtp.SeSys3Other0, false, "error processing")
return
if Localserve {
code, timeout := localserveNeedsError(rcptAcc.rcptTo.Localpart)
if timeout {
c.log.Info("timing out due to special localpart")
mox.Sleep(mox.Context, time.Hour)
xsmtpServerErrorf(codes{smtp.C451LocalErr, smtp.SeOther00}, "timing out delivery due to special localpart")
} else if code != 0 {
c.log.Info("failure due to special localpart", mlog.Field("code", code))
metricDelivery.WithLabelValues("delivererror", "localserve").Inc()
addError(rcptAcc, code, smtp.SeOther00, false, fmt.Sprintf("failure with code %d due to special localpart", code))
}
metricDelivery.WithLabelValues("delivered", a.reason).Inc()
log.Info("incoming message delivered", mlog.Field("reason", a.reason))
conf, _ := acc.Conf()
if conf.RejectsMailbox != "" && messageID != "" {
if err := acc.RejectsRemove(log, conf.RejectsMailbox, messageID); err != nil {
log.Errorx("removing message from rejects mailbox", err, mlog.Field("messageid", messageID))
} else {
acc.WithWLock(func() {
if err := acc.Deliver(log, rcptAcc.destination, m, dataFile, false); err != nil {
log.Errorx("delivering", err)
metricDelivery.WithLabelValues("delivererror", a.reason).Inc()
addError(rcptAcc, smtp.C451LocalErr, smtp.SeSys3Other0, false, "error processing")
return
}
}
})
metricDelivery.WithLabelValues("delivered", a.reason).Inc()
log.Info("incoming message delivered", mlog.Field("reason", a.reason))
conf, _ := acc.Conf()
if conf.RejectsMailbox != "" && messageID != "" {
if err := acc.RejectsRemove(log, conf.RejectsMailbox, messageID); err != nil {
log.Errorx("removing message from rejects mailbox", err, mlog.Field("messageid", messageID))
}
}
})
}
err = acc.Close()
log.Check(err, "closing account after delivering")
@ -2347,7 +2492,9 @@ func (c *conn) deliver(ctx context.Context, recvHdrFor func(string) string, msgW
}
dsnMsg.Original = header
if err := queueDSN(c, *c.mailFrom, dsnMsg); err != nil {
if Localserve {
c.log.Error("not queueing dsn for incoming delivery due to localserve")
} else if err := queueDSN(c, *c.mailFrom, dsnMsg); err != nil {
metricServerErrors.WithLabelValues("queuedsn").Inc()
c.log.Errorx("queuing DSN for incoming delivery, no DSN sent", err)
}