implement outgoing dmarc aggregate reporting

in smtpserver, we store dmarc evaluations (under the right conditions).
in dmarcdb, we periodically (hourly) send dmarc reports if there are
evaluations. for failed deliveries, we deliver the dsn quietly to a submailbox
of the postmaster mailbox.

this is on by default, but can be disabled in mox.conf.
This commit is contained in:
Mechiel Lukkien
2023-11-01 17:55:40 +01:00
parent d1e93020d8
commit e7699708ef
40 changed files with 2689 additions and 245 deletions

View File

@ -92,7 +92,7 @@ func fail(qlog *mlog.Log, m Msg, backoff time.Duration, permanent bool, remoteMT
// todo future: when we implement relaying, and a dsn cannot be delivered, and requiretls was active, we cannot drop the message. instead deliver to local postmaster? though ../rfc/8689:383 may intend to say the dsn should be delivered without requiretls?
// todo future: when we implement smtp dsn extension, parameter RET=FULL must be disregarded for messages with REQUIRETLS. ../rfc/8689:379
if permanent || m.Attempts >= 8 {
if permanent || m.MaxAttempts == 0 && m.Attempts >= 8 || m.MaxAttempts > 0 && m.Attempts >= m.MaxAttempts {
qlog.Errorx("permanent failure delivering from queue", errors.New(errmsg))
deliverDSNFailure(qlog, m, remoteMTA, secodeOpt, errmsg)
@ -230,12 +230,13 @@ func deliverDirect(cid int64, qlog *mlog.Log, resolver dns.Resolver, dialer smtp
enforceMTASTS := policy != nil && policy.Mode == mtasts.ModeEnforce
permanent, daneRequired, badTLS, secodeOpt, remoteIP, errmsg, ok = deliverHost(nqlog, resolver, dialer, cid, ourHostname, transportName, h, enforceMTASTS, haveMX, origNextHopAuthentic, origNextHop, expandedNextHopAuthentic, expandedNextHop, &m, tlsMode)
// If we had a TLS-related failure when doing TLS, and we don't have a requirement for MTA-STS/DANE,
// we try again without TLS. This could be an old
// server that only does ancient TLS versions, or has a misconfiguration. Note that
// If we had a TLS-related failure when doing TLS, and we don't have a requirement
// for MTA-STS/DANE, we try again without TLS. This could be an old server that
// only does ancient TLS versions, or has a misconfiguration. Note that
// opportunistic TLS does not do regular certificate verification, so that can't be
// the problem.
if !ok && badTLS && (!enforceMTASTS && tlsMode == smtpclient.TLSOpportunistic && !daneRequired || m.RequireTLS != nil && !*m.RequireTLS) {
// We don't fall back to plain text for DMARC reports. ../rfc/7489:1768 ../rfc/7489:2683
if !ok && badTLS && (!enforceMTASTS && tlsMode == smtpclient.TLSOpportunistic && !daneRequired && !m.IsDMARCReport || m.RequireTLS != nil && !*m.RequireTLS) {
metricPlaintextFallback.Inc()
if m.RequireTLS != nil && !*m.RequireTLS {
metricTLSRequiredNoIgnored.WithLabelValues("badtls").Inc()

View File

@ -6,6 +6,9 @@ import (
"os"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/mjl-/mox/dns"
"github.com/mjl-/mox/dsn"
"github.com/mjl-/mox/message"
@ -15,6 +18,15 @@ import (
"github.com/mjl-/mox/store"
)
var (
metricDMARCReportFailure = promauto.NewCounter(
prometheus.CounterOpts{
Name: "mox_queue_dmarcreport_failure_total",
Help: "Permanent failures to deliver a DMARC report.",
},
)
)
func deliverDSNFailure(log *mlog.Log, m Msg, remoteMTA dsn.NameIP, secodeOpt, errmsg string) {
const subject = "mail delivery failed"
message := fmt.Sprintf(`
@ -33,6 +45,12 @@ Error during the last delivery attempt:
}
func deliverDSNDelay(log *mlog.Log, m Msg, remoteMTA dsn.NameIP, secodeOpt, errmsg string, retryUntil time.Time) {
// Should not happen, but doesn't hurt to prevent sending delayed delivery
// notifications for DMARC reports. We don't want to waste postmaster attention.
if m.IsDMARCReport {
return
}
const subject = "mail delivery delayed"
message := fmt.Sprintf(`
Delivery has been delayed of your email to:
@ -133,7 +151,12 @@ func deliverDSN(log *mlog.Log, m Msg, remoteMTA dsn.NameIP, secodeOpt, errmsg st
msgData = append(msgData, []byte("Return-Path: <"+dsnMsg.From.XString(m.SMTPUTF8)+">\r\n")...)
mailbox := "Inbox"
acc, err := store.OpenAccount(m.SenderAccount)
senderAccount := m.SenderAccount
if m.IsDMARCReport {
// senderAccount should already by postmaster, but doesn't hurt to ensure it.
senderAccount = mox.Conf.Static.Postmaster.Account
}
acc, err := store.OpenAccount(senderAccount)
if err != nil {
acc, err = store.OpenAccount(mox.Conf.Static.Postmaster.Account)
if err != nil {
@ -171,6 +194,17 @@ func deliverDSN(log *mlog.Log, m Msg, remoteMTA dsn.NameIP, secodeOpt, errmsg st
Size: msgWriter.Size,
MsgPrefix: []byte{},
}
// If this is a DMARC report, deliver it as seen message to a submailbox of the
// postmaster mailbox. We mark it as seen so it doesn't waste postmaster attention,
// but we deliver them so they can be checked in case of problems.
if m.IsDMARCReport {
mailbox = fmt.Sprintf("%s/dmarc", mox.Conf.Static.Postmaster.Mailbox)
msg.Seen = true
metricDMARCReportFailure.Inc()
log.Info("delivering dsn for failure to deliver outgoing dmarc report")
}
acc.WithWLock(func() {
if err := acc.DeliverMailbox(log, mailbox, msg, msgFile); err != nil {
qlog("delivering dsn to mailbox", err)

View File

@ -70,6 +70,9 @@ var DB *bstore.DB // Exported for making backups.
var Localserve bool
// Msg is a message in the queue.
//
// Use MakeMsg to make a message with fields that Add needs. Add will further set
// queueing related fields.
type Msg struct {
ID int64
Queued time.Time `bstore:"default now"`
@ -80,15 +83,19 @@ type Msg struct {
RecipientDomain dns.IPDomain
RecipientDomainStr string // For filtering.
Attempts int // Next attempt is based on last attempt and exponential back off based on attempts.
MaxAttempts int // Max number of attempts before giving up. If 0, then the default of 8 attempts is used instead.
DialedIPs map[string][]net.IP // For each host, the IPs that were dialed. Used for IP selection for later attempts.
NextAttempt time.Time // For scheduling.
LastAttempt *time.Time
LastError string
Has8bit bool // Whether message contains bytes with high bit set, determines whether 8BITMIME SMTP extension is needed.
SMTPUTF8 bool // Whether message requires use of SMTPUTF8.
Size int64 // Full size of message, combined MsgPrefix with contents of message file.
MessageID string // Used when composing a DSN, in its References header.
MsgPrefix []byte
Has8bit bool // Whether message contains bytes with high bit set, determines whether 8BITMIME SMTP extension is needed.
SMTPUTF8 bool // Whether message requires use of SMTPUTF8.
IsDMARCReport bool // Delivery failures for DMARC reports are handled differently.
IsTLSReport bool // Delivery failures for TLS reports are handled differently.
Size int64 // Full size of message, combined MsgPrefix with contents of message file.
MessageID string // Used when composing a DSN, in its References header.
MsgPrefix []byte
// If set, this message is a DSN and this is a version using utf-8, for the case
// the remote MTA supports smtputf8. In this case, Size and MsgPrefix are not
@ -188,44 +195,71 @@ func Count(ctx context.Context) (int, error) {
return bstore.QueryDB[Msg](ctx, DB).Count()
}
// MakeMsg is a convenience function that sets the commonly used fields for a Msg.
func MakeMsg(senderAccount string, sender, recipient smtp.Path, has8bit, smtputf8 bool, size int64, messageID string, prefix []byte, requireTLS *bool) Msg {
return Msg{
SenderAccount: mox.Conf.Static.Postmaster.Account,
SenderLocalpart: sender.Localpart,
SenderDomain: sender.IPDomain,
RecipientLocalpart: recipient.Localpart,
RecipientDomain: recipient.IPDomain,
Has8bit: has8bit,
SMTPUTF8: smtputf8,
Size: size,
MessageID: messageID,
MsgPrefix: prefix,
RequireTLS: requireTLS,
}
}
// Add a new message to the queue. The queue is kicked immediately to start a
// first delivery attempt.
//
// dnsutf8Opt is a utf8-version of the message, to be used only for DNSs. If set,
// this data is used as the message when delivering the DSN and the remote SMTP
// server supports SMTPUTF8. If the remote SMTP server does not support SMTPUTF8,
// the regular non-utf8 message is delivered.
func Add(ctx context.Context, log *mlog.Log, senderAccount string, mailFrom, rcptTo smtp.Path, has8bit, smtputf8 bool, size int64, messageID string, msgPrefix []byte, msgFile *os.File, dsnutf8Opt []byte, requireTLS *bool) (int64, error) {
// ID must be 0 and will be set after inserting in the queue.
//
// Add sets derived fields like RecipientDomainStr, and fields related to queueing,
// such as Queued, NextAttempt, LastAttempt, LastError.
func Add(ctx context.Context, log *mlog.Log, qm *Msg, msgFile *os.File) error {
// todo: Add should accept multiple rcptTo if they are for the same domain. so we can queue them for delivery in one (or just a few) session(s), transferring the data only once. ../rfc/5321:3759
if qm.ID != 0 {
return fmt.Errorf("id of queued message must be 0")
}
qm.Queued = time.Now()
qm.DialedIPs = nil
qm.NextAttempt = qm.Queued
qm.LastAttempt = nil
qm.LastError = ""
qm.RecipientDomainStr = formatIPDomain(qm.RecipientDomain)
if Localserve {
if senderAccount == "" {
return 0, fmt.Errorf("cannot queue with localserve without local account")
if qm.SenderAccount == "" {
return fmt.Errorf("cannot queue with localserve without local account")
}
acc, err := store.OpenAccount(senderAccount)
acc, err := store.OpenAccount(qm.SenderAccount)
if err != nil {
return 0, fmt.Errorf("opening sender account for immediate delivery with localserve: %v", err)
return fmt.Errorf("opening sender account for immediate delivery with localserve: %v", err)
}
defer func() {
err := acc.Close()
log.Check(err, "closing account")
}()
m := store.Message{Size: size, MsgPrefix: msgPrefix}
m := store.Message{Size: qm.Size, MsgPrefix: qm.MsgPrefix}
conf, _ := acc.Conf()
dest := conf.Destinations[mailFrom.String()]
dest := conf.Destinations[qm.Sender().String()]
acc.WithWLock(func() {
err = acc.DeliverDestination(log, dest, &m, msgFile)
})
if err != nil {
return 0, fmt.Errorf("delivering message: %v", err)
return fmt.Errorf("delivering message: %v", err)
}
log.Debug("immediately delivered from queue to sender")
return 0, nil
return nil
}
tx, err := DB.Begin(ctx, true)
if err != nil {
return 0, fmt.Errorf("begin transaction: %w", err)
return fmt.Errorf("begin transaction: %w", err)
}
defer func() {
if tx != nil {
@ -235,11 +269,8 @@ func Add(ctx context.Context, log *mlog.Log, senderAccount string, mailFrom, rcp
}
}()
now := time.Now()
qm := Msg{0, now, senderAccount, mailFrom.Localpart, mailFrom.IPDomain, rcptTo.Localpart, rcptTo.IPDomain, formatIPDomain(rcptTo.IPDomain), 0, nil, now, nil, "", has8bit, smtputf8, size, messageID, msgPrefix, dsnutf8Opt, "", requireTLS}
if err := tx.Insert(&qm); err != nil {
return 0, err
if err := tx.Insert(qm); err != nil {
return err
}
dst := qm.MessagePath()
@ -252,19 +283,19 @@ func Add(ctx context.Context, log *mlog.Log, senderAccount string, mailFrom, rcp
dstDir := filepath.Dir(dst)
os.MkdirAll(dstDir, 0770)
if err := moxio.LinkOrCopy(log, dst, msgFile.Name(), nil, true); err != nil {
return 0, fmt.Errorf("linking/copying message to new file: %s", err)
return fmt.Errorf("linking/copying message to new file: %s", err)
} else if err := moxio.SyncDir(dstDir); err != nil {
return 0, fmt.Errorf("sync directory: %v", err)
return fmt.Errorf("sync directory: %v", err)
}
if err := tx.Commit(); err != nil {
return 0, fmt.Errorf("commit transaction: %s", err)
return fmt.Errorf("commit transaction: %s", err)
}
tx = nil
dst = ""
queuekick()
return qm.ID, nil
return nil
}
func formatIPDomain(d dns.IPDomain) string {

View File

@ -110,10 +110,14 @@ func TestQueue(t *testing.T) {
defer os.Remove(mf.Name())
defer mf.Close()
_, err = Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, mf, nil, nil)
var qm Msg
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil)
err = Add(ctxbg, xlog, &qm, mf)
tcheck(t, err, "add message to queue for delivery")
_, err = Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, mf, nil, nil)
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil)
err = Add(ctxbg, xlog, &qm, mf)
tcheck(t, err, "add message to queue for delivery")
msgs, err = List(ctxbg)
@ -440,7 +444,8 @@ func TestQueue(t *testing.T) {
// Add a message to be delivered with submit because of its route.
topath := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "submit.example"}}}
_, err = Add(ctxbg, xlog, "mjl", path, topath, false, false, int64(len(testmsg)), "<test@localhost>", nil, mf, nil, nil)
qm = MakeMsg("mjl", path, topath, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil)
err = Add(ctxbg, xlog, &qm, mf)
tcheck(t, err, "add message to queue for delivery")
wasNetDialer = testDeliver(fakeSubmitServer)
if !wasNetDialer {
@ -448,10 +453,11 @@ func TestQueue(t *testing.T) {
}
// Add a message to be delivered with submit because of explicitly configured transport, that uses TLS.
msgID, err := Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, mf, nil, nil)
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil)
err = Add(ctxbg, xlog, &qm, mf)
tcheck(t, err, "add message to queue for delivery")
transportSubmitTLS := "submittls"
n, err = Kick(ctxbg, msgID, "", "", &transportSubmitTLS)
n, err = Kick(ctxbg, qm.ID, "", "", &transportSubmitTLS)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
@ -472,10 +478,11 @@ func TestQueue(t *testing.T) {
}
// Add a message to be delivered with socks.
msgID, err = Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), "<socks@localhost>", nil, mf, nil, nil)
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<socks@localhost>", nil, nil)
err = Add(ctxbg, xlog, &qm, mf)
tcheck(t, err, "add message to queue for delivery")
transportSocks := "socks"
n, err = Kick(ctxbg, msgID, "", "", &transportSocks)
n, err = Kick(ctxbg, qm.ID, "", "", &transportSocks)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
@ -486,9 +493,10 @@ func TestQueue(t *testing.T) {
}
// Add message to be delivered with opportunistic TLS verification.
msgID, err = Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), "<opportunistictls@localhost>", nil, mf, nil, nil)
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<opportunistictls@localhost>", nil, nil)
err = Add(ctxbg, xlog, &qm, mf)
tcheck(t, err, "add message to queue for delivery")
n, err = Kick(ctxbg, msgID, "", "", nil)
n, err = Kick(ctxbg, qm.ID, "", "", nil)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
@ -496,9 +504,10 @@ func TestQueue(t *testing.T) {
testDeliver(fakeSMTPSTARTTLSServer)
// Test fallback to plain text with TLS handshake fails.
msgID, err = Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), "<badtls@localhost>", nil, mf, nil, nil)
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<badtls@localhost>", nil, nil)
err = Add(ctxbg, xlog, &qm, mf)
tcheck(t, err, "add message to queue for delivery")
n, err = Kick(ctxbg, msgID, "", "", nil)
n, err = Kick(ctxbg, qm.ID, "", "", nil)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
@ -512,9 +521,10 @@ func TestQueue(t *testing.T) {
{Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeFull, CertAssoc: moxCert.Leaf.RawSubjectPublicKeyInfo},
},
}
msgID, err = Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), "<dane@localhost>", nil, mf, nil, nil)
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<dane@localhost>", nil, nil)
err = Add(ctxbg, xlog, &qm, mf)
tcheck(t, err, "add message to queue for delivery")
n, err = Kick(ctxbg, msgID, "", "", nil)
n, err = Kick(ctxbg, qm.ID, "", "", nil)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
@ -530,9 +540,10 @@ func TestQueue(t *testing.T) {
// Add message to be delivered with verified TLS and REQUIRETLS.
yes := true
msgID, err = Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), "<opportunistictls@localhost>", nil, mf, nil, &yes)
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<opportunistictls@localhost>", nil, &yes)
err = Add(ctxbg, xlog, &qm, mf)
tcheck(t, err, "add message to queue for delivery")
n, err = Kick(ctxbg, msgID, "", "", nil)
n, err = Kick(ctxbg, qm.ID, "", "", nil)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
@ -545,9 +556,10 @@ func TestQueue(t *testing.T) {
{},
},
}
msgID, err = Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), "<daneunusable@localhost>", nil, mf, nil, nil)
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<daneunusable@localhost>", nil, nil)
err = Add(ctxbg, xlog, &qm, mf)
tcheck(t, err, "add message to queue for delivery")
n, err = Kick(ctxbg, msgID, "", "", nil)
n, err = Kick(ctxbg, qm.ID, "", "", nil)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
@ -562,9 +574,10 @@ func TestQueue(t *testing.T) {
{Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeFull, CertAssoc: make([]byte, sha256.Size)},
},
}
msgID, err = Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), "<daneinsecure@localhost>", nil, mf, nil, nil)
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<daneinsecure@localhost>", nil, nil)
err = Add(ctxbg, xlog, &qm, mf)
tcheck(t, err, "add message to queue for delivery")
n, err = Kick(ctxbg, msgID, "", "", nil)
n, err = Kick(ctxbg, qm.ID, "", "", nil)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
@ -581,9 +594,10 @@ func TestQueue(t *testing.T) {
// Check that message is delivered with TLS-Required: No and non-matching DANE record.
no := false
msgID, err = Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), "<tlsrequirednostarttls@localhost>", nil, mf, nil, &no)
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<tlsrequirednostarttls@localhost>", nil, &no)
err = Add(ctxbg, xlog, &qm, mf)
tcheck(t, err, "add message to queue for delivery")
n, err = Kick(ctxbg, msgID, "", "", nil)
n, err = Kick(ctxbg, qm.ID, "", "", nil)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
@ -591,9 +605,10 @@ func TestQueue(t *testing.T) {
testDeliver(fakeSMTPSTARTTLSServer)
// Check that message is delivered with TLS-Required: No and bad TLS, falling back to plain text.
msgID, err = Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), "<tlsrequirednoplaintext@localhost>", nil, mf, nil, &no)
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<tlsrequirednoplaintext@localhost>", nil, &no)
err = Add(ctxbg, xlog, &qm, mf)
tcheck(t, err, "add message to queue for delivery")
n, err = Kick(ctxbg, msgID, "", "", nil)
n, err = Kick(ctxbg, qm.ID, "", "", nil)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
@ -601,9 +616,10 @@ func TestQueue(t *testing.T) {
testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
// Add message with requiretls that fails immediately due to no REQUIRETLS support in all servers.
msgID, err = Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), "<tlsrequiredunsupported@localhost>", nil, mf, nil, &yes)
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<tlsrequiredunsupported@localhost>", nil, &yes)
err = Add(ctxbg, xlog, &qm, mf)
tcheck(t, err, "add message to queue for delivery")
n, err = Kick(ctxbg, msgID, "", "", nil)
n, err = Kick(ctxbg, qm.ID, "", "", nil)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
@ -615,9 +631,10 @@ func TestQueue(t *testing.T) {
resolver.TLSA = nil
// Add message with requiretls that fails immediately due to no verification policy for recipient domain.
msgID, err = Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), "<tlsrequirednopolicy@localhost>", nil, mf, nil, &yes)
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<tlsrequirednopolicy@localhost>", nil, &yes)
err = Add(ctxbg, xlog, &qm, mf)
tcheck(t, err, "add message to queue for delivery")
n, err = Kick(ctxbg, msgID, "", "", nil)
n, err = Kick(ctxbg, qm.ID, "", "", nil)
tcheck(t, err, "kick queue")
if n != 1 {
t.Fatalf("kick changed %d messages, expected 1", n)
@ -629,7 +646,8 @@ func TestQueue(t *testing.T) {
})
// Add another message that we'll fail to deliver entirely.
_, err = Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, mf, nil, nil)
qm = MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil)
err = Add(ctxbg, xlog, &qm, mf)
tcheck(t, err, "add message to queue for delivery")
msgs, err = List(ctxbg)
@ -788,7 +806,8 @@ func TestQueueStart(t *testing.T) {
mf := prepareFile(t)
defer os.Remove(mf.Name())
defer mf.Close()
_, err = Add(ctxbg, xlog, "mjl", path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, mf, nil, nil)
qm := MakeMsg("mjl", path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil)
err = Add(ctxbg, xlog, &qm, mf)
tcheck(t, err, "add message to queue for delivery")
checkDialed(true)