switch to slog.Logger for logging, for easier reuse of packages by external software

we don't want external software to include internal details like mlog.
slog.Logger is/will be the standard.

we still have mlog for its helper functions, and its handler that logs in
concise logfmt used by mox.

packages that are not meant for reuse still pass around mlog.Log for
convenience.

we use golang.org/x/exp/slog because we also support the previous Go toolchain
version. with the next Go release, we'll switch to the builtin slog.
This commit is contained in:
Mechiel Lukkien
2023-12-05 13:35:58 +01:00
parent 56b2a9d980
commit 5b20cba50a
150 changed files with 5176 additions and 1898 deletions

View File

@ -15,6 +15,7 @@ import (
"strings"
"time"
"golang.org/x/exp/slog"
"golang.org/x/net/proxy"
"github.com/prometheus/client_golang/prometheus"
@ -36,8 +37,6 @@ import (
"github.com/mjl-/mox/tlsrptdb"
)
var xlog = mlog.New("queue")
var (
metricConnection = promauto.NewCounterVec(
prometheus.CounterOpts{
@ -163,7 +162,9 @@ func Init() error {
// Shutdown closes the queue database. The delivery process isn't stopped. For tests only.
func Shutdown() {
err := DB.Close()
xlog.Check(err, "closing queue db")
if err != nil {
mlog.New("queue", nil).Errorx("closing queue db", err)
}
DB = nil
}
@ -221,7 +222,7 @@ func MakeMsg(senderAccount string, sender, recipient smtp.Path, has8bit, smtputf
//
// Add sets derived fields like RecipientDomainStr, and fields related to queueing,
// such as Queued, NextAttempt, LastAttempt, LastError.
func Add(ctx context.Context, log *mlog.Log, qm *Msg, msgFile *os.File) error {
func Add(ctx context.Context, log mlog.Log, qm *Msg, msgFile *os.File) error {
// todo: Add should accept multiple rcptTo if they are for the same domain. so we can queue them for delivery in one (or just a few) session(s), transferring the data only once. ../rfc/5321:3759
if qm.ID != 0 {
@ -238,7 +239,7 @@ func Add(ctx context.Context, log *mlog.Log, qm *Msg, msgFile *os.File) error {
if qm.SenderAccount == "" {
return fmt.Errorf("cannot queue with localserve without local account")
}
acc, err := store.OpenAccount(qm.SenderAccount)
acc, err := store.OpenAccount(log, qm.SenderAccount)
if err != nil {
return fmt.Errorf("opening sender account for immediate delivery with localserve: %v", err)
}
@ -279,14 +280,14 @@ func Add(ctx context.Context, log *mlog.Log, qm *Msg, msgFile *os.File) error {
defer func() {
if dst != "" {
err := os.Remove(dst)
log.Check(err, "removing destination message file for queue", mlog.Field("path", dst))
log.Check(err, "removing destination message file for queue", slog.String("path", dst))
}
}()
dstDir := filepath.Dir(dst)
os.MkdirAll(dstDir, 0770)
if err := moxio.LinkOrCopy(log, dst, msgFile.Name(), nil, true); err != nil {
return fmt.Errorf("linking/copying message to new file: %s", err)
} else if err := moxio.SyncDir(dstDir); err != nil {
} else if err := moxio.SyncDir(log, dstDir); err != nil {
return fmt.Errorf("sync directory: %v", err)
}
@ -359,7 +360,7 @@ func Kick(ctx context.Context, ID int64, toDomain, recipient string, transport *
// Drop removes messages from the queue that match all nonzero parameters.
// If all parameters are zero, all messages are removed.
// Returns number of messages removed.
func Drop(ctx context.Context, ID int64, toDomain string, recipient string) (int, error) {
func Drop(ctx context.Context, log mlog.Log, ID int64, toDomain string, recipient string) (int, error) {
q := bstore.QueryDB[Msg](ctx, DB)
if ID > 0 {
q.FilterID(ID)
@ -381,7 +382,7 @@ func Drop(ctx context.Context, ID int64, toDomain string, recipient string) (int
for _, m := range msgs {
p := m.MessagePath()
if err := os.Remove(p); err != nil {
xlog.WithContext(ctx).Errorx("removing queue message from file system", err, mlog.Field("queuemsgid", m.ID), mlog.Field("path", p))
log.Errorx("removing queue message from file system", err, slog.Int64("queuemsgid", m.ID), slog.String("path", p))
}
}
return n, nil
@ -427,6 +428,8 @@ func Start(resolver dns.Resolver, done chan struct{}) error {
return err
}
log := mlog.New("queue", nil)
// High-level delivery strategy advice: ../rfc/5321:3685
go func() {
// Map keys are either dns.Domain.Name()'s, or string-formatted IP addresses.
@ -449,14 +452,14 @@ func Start(resolver dns.Resolver, done chan struct{}) error {
continue
}
launchWork(resolver, busyDomains)
timer.Reset(nextWork(mox.Shutdown, busyDomains))
launchWork(log, resolver, busyDomains)
timer.Reset(nextWork(mox.Shutdown, log, busyDomains))
}
}()
return nil
}
func nextWork(ctx context.Context, busyDomains map[string]struct{}) time.Duration {
func nextWork(ctx context.Context, log mlog.Log, busyDomains map[string]struct{}) time.Duration {
q := bstore.QueryDB[Msg](ctx, DB)
if len(busyDomains) > 0 {
var doms []any
@ -471,13 +474,13 @@ func nextWork(ctx context.Context, busyDomains map[string]struct{}) time.Duratio
if err == bstore.ErrAbsent {
return 24 * time.Hour
} else if err != nil {
xlog.Errorx("finding time for next delivery attempt", err)
log.Errorx("finding time for next delivery attempt", err)
return 1 * time.Minute
}
return time.Until(qm.NextAttempt)
}
func launchWork(resolver dns.Resolver, busyDomains map[string]struct{}) int {
func launchWork(log mlog.Log, resolver dns.Resolver, busyDomains map[string]struct{}) int {
q := bstore.QueryDB[Msg](mox.Shutdown, DB)
q.FilterLessEqual("NextAttempt", time.Now())
q.SortAsc("NextAttempt")
@ -491,14 +494,14 @@ func launchWork(resolver dns.Resolver, busyDomains map[string]struct{}) int {
}
msgs, err := q.List()
if err != nil {
xlog.Errorx("querying for work in queue", err)
log.Errorx("querying for work in queue", err)
mox.Sleep(mox.Shutdown, 1*time.Second)
return -1
}
for _, m := range msgs {
busyDomains[formatIPDomain(m.RecipientDomain)] = struct{}{}
go deliver(resolver, m)
go deliver(log, resolver, m)
}
return len(msgs)
}
@ -521,16 +524,15 @@ func queueDelete(ctx context.Context, msgID int64) error {
// deliver attempts to deliver a message.
// The queue is updated, either by removing a delivered or permanently failed
// message, or updating the time for the next attempt. A DSN may be sent.
func deliver(resolver dns.Resolver, m Msg) {
cid := mox.Cid()
qlog := xlog.WithCid(cid).Fields(mlog.Field("from", m.Sender()), mlog.Field("recipient", m.Recipient()), mlog.Field("attempts", m.Attempts), mlog.Field("msgid", m.ID))
func deliver(log mlog.Log, resolver dns.Resolver, m Msg) {
qlog := log.WithCid(mox.Cid()).With(slog.Any("from", m.Sender()), slog.Any("recipient", m.Recipient()), slog.Int("attempts", m.Attempts), slog.Int64("msgid", m.ID))
defer func() {
deliveryResult <- formatIPDomain(m.RecipientDomain)
x := recover()
if x != nil {
qlog.Error("deliver panic", mlog.Field("panic", x))
qlog.Error("deliver panic", slog.Any("panic", x))
debug.PrintStack()
metrics.PanicInc(metrics.Queue)
}
@ -578,8 +580,8 @@ func deliver(resolver dns.Resolver, m Msg) {
}
if transportName != "" {
qlog = qlog.Fields(mlog.Field("transport", transportName))
qlog.Debug("delivering with transport", mlog.Field("transport", transportName))
qlog = qlog.With(slog.String("transport", transportName))
qlog.Debug("delivering with transport")
}
// We gather TLS connection successes and failures during delivery, and we store
@ -630,7 +632,7 @@ func deliver(resolver dns.Resolver, m Msg) {
// Ensure we store policy domain in unicode in database.
policyDomain, err := dns.ParseDomain(r.Policy.Domain)
if err != nil {
qlog.Errorx("parsing policy domain for tls result", err, mlog.Field("policydomain", r.Policy.Domain))
qlog.Errorx("parsing policy domain for tls result", err, slog.String("policydomain", r.Policy.Domain))
return
}
@ -667,12 +669,12 @@ func deliver(resolver dns.Resolver, m Msg) {
var dialer smtpclient.Dialer = &net.Dialer{}
if transport.Submissions != nil {
deliverSubmit(cid, qlog, resolver, dialer, m, backoff, transportName, transport.Submissions, true, 465)
deliverSubmit(qlog, resolver, dialer, m, backoff, transportName, transport.Submissions, true, 465)
} else if transport.Submission != nil {
deliverSubmit(cid, qlog, resolver, dialer, m, backoff, transportName, transport.Submission, false, 587)
deliverSubmit(qlog, resolver, dialer, m, backoff, transportName, transport.Submission, false, 587)
} else if transport.SMTP != nil {
// todo future: perhaps also gather tlsrpt results for submissions.
deliverSubmit(cid, qlog, resolver, dialer, m, backoff, transportName, transport.SMTP, false, 25)
deliverSubmit(qlog, resolver, dialer, m, backoff, transportName, transport.SMTP, false, 25)
} else {
ourHostname := mox.Conf.Static.HostnameDomain
if transport.Socks != nil {
@ -688,7 +690,7 @@ func deliver(resolver dns.Resolver, m Msg) {
}
ourHostname = transport.Socks.Hostname
}
recipientDomainResult, hostResults = deliverDirect(cid, qlog, resolver, dialer, ourHostname, transportName, m, backoff)
recipientDomainResult, hostResults = deliverDirect(qlog, resolver, dialer, ourHostname, transportName, m, backoff)
}
}