improve queue management

- add option to put messages in the queue "on hold", preventing delivery
  attempts until taken off hold again.
- add "hold rules", to automatically mark some/all submitted messages as "on
  hold", e.g. from a specific account or to a specific domain.
- add operation to "fail" a message, causing a DSN to be delivered to the
  sender. previously we could only drop a message from the queue.
- update admin page & add new cli tools for these operations, with new
  filtering rules for selecting the messages to operate on. in the admin
  interface, add filtering and checkboxes to select a set of messages to operate
  on.
This commit is contained in:
Mechiel Lukkien
2024-03-18 08:50:42 +01:00
parent 79f1054b64
commit 40ade995a5
19 changed files with 2554 additions and 565 deletions

View File

@ -61,12 +61,18 @@ var (
"result", // ok, timeout, canceled, temperror, permerror, error
},
)
metricHold = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "mox_queue_hold",
Help: "Messages in queue that are on hold.",
},
)
)
var jitter = mox.NewPseudoRand()
var DBTypes = []any{Msg{}} // Types stored in DB.
var DB *bstore.DB // Exported for making backups.
var DBTypes = []any{Msg{}, HoldRule{}} // Types stored in DB.
var DB *bstore.DB // Exported for making backups.
// Allow requesting delivery starting from up to this interval from time of submission.
const FutureReleaseIntervalMax = 60 * 24 * time.Hour
@ -74,6 +80,27 @@ const FutureReleaseIntervalMax = 60 * 24 * time.Hour
// Set for mox localserve, to prevent queueing.
var Localserve bool
// HoldRule is a set of conditions that cause a matching message to be marked as on
// hold when it is queued. A rule with all conditions empty matches all messages,
// effectively pausing the entire queue.
type HoldRule struct {
// Identifier of the rule. HoldRuleAdd resets it to 0 before inserting, and
// HoldRuleRemove looks up and deletes the rule by this value. Ignored by All.
ID int64
// Compared against Msg.SenderAccount.
Account string
// Domain conditions. HoldRuleAdd derives the *Str fields below from these by
// calling Name().
SenderDomain dns.Domain
RecipientDomain dns.Domain
// String forms of the domains, compared against Msg.SenderDomainStr and
// Msg.RecipientDomainStr.
SenderDomainStr string // Unicode.
RecipientDomainStr string // Unicode.
}
// All reports whether the rule has no conditions set, i.e. whether every field
// other than ID holds its zero value.
func (pr HoldRule) All() bool {
	// Comparing against a value that carries the same ID takes the ID out of
	// the comparison.
	return pr == HoldRule{ID: pr.ID}
}
// matches reports whether message m meets the conditions of the rule.
//
// A rule without conditions (see All) matches every message. Otherwise each
// non-empty condition (Account, SenderDomainStr, RecipientDomainStr) must equal
// the corresponding field of m. Empty conditions are skipped, so a rule that
// only sets a recipient domain does not accidentally match messages that happen
// to have an empty sender domain or account. Requiring all set conditions to
// match is the same selection HoldRuleAdd applies to messages already in the
// queue through FilterNonzero.
func (pr HoldRule) matches(m Msg) bool {
	if pr.All() {
		return true
	}
	conds := [...]struct{ want, got string }{
		{pr.Account, m.SenderAccount},
		{pr.SenderDomainStr, m.SenderDomainStr},
		{pr.RecipientDomainStr, m.RecipientDomainStr},
	}
	// A rule that is not All but has no usable string condition matches nothing.
	matched := false
	for _, c := range conds {
		if c.want == "" {
			continue
		}
		if c.want != c.got {
			return false
		}
		matched = true
	}
	return matched
}
// Msg is a message in the queue.
//
// Use MakeMsg to make a message with fields that Add needs. Add will further set
@ -89,12 +116,14 @@ type Msg struct {
BaseID int64 `bstore:"index"`
Queued time.Time `bstore:"default now"`
Hold bool // If set, delivery won't be attempted.
SenderAccount string // Failures are delivered back to this local account. Also used for routing.
SenderLocalpart smtp.Localpart // Should be a local user and domain.
SenderDomain dns.IPDomain
SenderDomainStr string // For filtering, unicode.
RecipientLocalpart smtp.Localpart // Typically a remote user and domain.
RecipientDomain dns.IPDomain
RecipientDomainStr string // For filtering.
RecipientDomainStr string // For filtering, unicode.
Attempts int // Next attempt is based on last attempt and exponential back off based on attempts.
MaxAttempts int // Max number of attempts before giving up. If 0, then the default of 8 attempts is used instead.
DialedIPs map[string][]net.IP // For each host, the IPs that were dialed. Used for IP selection for later attempts.
@ -174,9 +203,20 @@ func Init() error {
}
return fmt.Errorf("open queue database: %s", err)
}
metricHoldUpdate()
return nil
}
// metricHoldUpdate sets the mox_queue_hold gauge to the number of messages in
// the queue that have Hold set.
//
// When we update the gauge, we just get the full current value, not try to
// account for adds/removes. If the count cannot be determined, the error is
// logged and the gauge keeps its previous value instead of being set to zero.
func metricHoldUpdate() {
	count, err := bstore.QueryDB[Msg](context.Background(), DB).FilterNonzero(Msg{Hold: true}).Count()
	if err != nil {
		mlog.New("queue", nil).Errorx("querying number of queued messages that are on hold", err)
		return
	}
	metricHold.Set(float64(count))
}
// Shutdown closes the queue database. The delivery process isn't stopped. For tests only.
func Shutdown() {
err := DB.Close()
@ -186,10 +226,85 @@ func Shutdown() {
DB = nil
}
// Filter filters messages to list or operate on. Used by admin web interface
// and cli.
//
// Only non-empty/non-zero values are applied to the filter. Leaving all fields
// empty/zero matches all messages.
type Filter struct {
// If non-empty, only messages with one of these IDs are selected.
IDs []int64
// If non-empty, only messages whose SenderAccount equals this value.
Account string
// Substring to look for in the sender address, as formatted by
// Sender().XString(true).
From string
// Substring to look for in the recipient address, as formatted by
// Recipient().XString(true).
To string
// If non-nil, only messages whose Hold field equals the pointed-to value.
Hold *bool
// Submitted and NextAttempt are compared against the Queued and NextAttempt
// fields of a message. The duration is added to the current time, so a
// negative duration refers to a time in the past.
Submitted string // Whether submitted before/after a time relative to now. ">$duration" or "<$duration", also with "now" for duration.
NextAttempt string // ">$duration" or "<$duration", also with "now" for duration.
// If non-nil, only messages whose Transport field equals the pointed-to
// value, which may be the empty string.
Transport *string
}
// apply adds the conditions of filter f to query q. Only non-empty/non-zero
// fields add a condition, and all added conditions must hold for a message to
// be selected.
//
// Submitted and NextAttempt must have the form "<$duration" or ">$duration",
// where $duration is either "now" or a value accepted by time.ParseDuration.
// The duration is added to the current time and the Queued or NextAttempt field
// must be before ("<") or after (">") the resulting time.
//
// From and To are substring matches on the formatted sender and recipient
// address. When both are set, a message must match both, like every other
// combination of filter fields. Previously a message matching either one was
// selected, which widened the selection for operations like Drop and Fail.
//
// An error is returned if Submitted or NextAttempt cannot be parsed. Conditions
// applied before the failing field remain on q.
func (f Filter) apply(q *bstore.Query[Msg]) error {
	if len(f.IDs) > 0 {
		q.FilterIDs(f.IDs)
	}

	// applyTime parses a relative time specification and adds the comparison on
	// field to the query.
	applyTime := func(field string, s string) error {
		var before bool
		switch {
		case strings.HasPrefix(s, "<"):
			before = true
		case strings.HasPrefix(s, ">"):
			// After, the default.
		default:
			return fmt.Errorf(`must start with "<" for before or ">" for after a duration`)
		}

		var t time.Time
		if rel := s[1:]; rel == "now" {
			t = time.Now()
		} else {
			d, err := time.ParseDuration(rel)
			if err != nil {
				return fmt.Errorf("parsing duration %q: %w", s, err)
			}
			t = time.Now().Add(d)
		}

		if before {
			q.FilterLess(field, t)
		} else {
			q.FilterGreater(field, t)
		}
		return nil
	}

	if f.Hold != nil {
		q.FilterEqual("Hold", *f.Hold)
	}
	if f.Submitted != "" {
		if err := applyTime("Queued", f.Submitted); err != nil {
			return fmt.Errorf("applying filter for submitted: %w", err)
		}
	}
	if f.NextAttempt != "" {
		if err := applyTime("NextAttempt", f.NextAttempt); err != nil {
			return fmt.Errorf("applying filter for next attempt: %w", err)
		}
	}
	if f.Account != "" {
		q.FilterNonzero(Msg{SenderAccount: f.Account})
	}
	if f.Transport != nil {
		q.FilterEqual("Transport", *f.Transport)
	}
	if f.From != "" || f.To != "" {
		q.FilterFn(func(m Msg) bool {
			if f.From != "" && !strings.Contains(m.Sender().XString(true), f.From) {
				return false
			}
			if f.To != "" && !strings.Contains(m.Recipient().XString(true), f.To) {
				return false
			}
			return true
		})
	}
	return nil
}
// List returns all messages in the delivery queue.
// Ordered by earliest delivery attempt first.
func List(ctx context.Context) ([]Msg, error) {
qmsgs, err := bstore.QueryDB[Msg](ctx, DB).List()
func List(ctx context.Context, f Filter) ([]Msg, error) {
q := bstore.QueryDB[Msg](ctx, DB)
if err := f.apply(q); err != nil {
return nil, err
}
qmsgs, err := q.List()
if err != nil {
return nil, err
}
@ -216,6 +331,59 @@ func Count(ctx context.Context) (int, error) {
return bstore.QueryDB[Msg](ctx, DB).Count()
}
// HoldRuleList returns all hold rules stored in the queue database.
func HoldRuleList(ctx context.Context) ([]HoldRule, error) {
	q := bstore.QueryDB[HoldRule](ctx, DB)
	return q.List()
}
// HoldRuleAdd stores a new hold rule, so that newly submitted messages matching
// it are marked as "on hold". Messages already in the queue that match the rule
// are put on hold as well, in the same transaction.
//
// The ID of hr is ignored and the string forms of the domains are derived from
// SenderDomain and RecipientDomain. The rule as stored is returned. On error, a
// zero HoldRule is returned.
func HoldRuleAdd(ctx context.Context, log mlog.Log, hr HoldRule) (HoldRule, error) {
	// Normalize the rule before storing it.
	hr.ID = 0
	hr.SenderDomainStr = hr.SenderDomain.Name()
	hr.RecipientDomainStr = hr.RecipientDomain.Name()

	insertAndHold := func(tx *bstore.Tx) error {
		if err := tx.Insert(&hr); err != nil {
			return err
		}
		log.Info("adding hold rule", slog.Any("holdrule", hr))

		// Without conditions, the query is left unfiltered and all queued
		// messages are marked.
		existing := bstore.QueryTx[Msg](tx)
		if !hr.All() {
			cond := Msg{
				SenderAccount:      hr.Account,
				SenderDomainStr:    hr.SenderDomainStr,
				RecipientDomainStr: hr.RecipientDomainStr,
			}
			existing.FilterNonzero(cond)
		}
		marked, err := existing.UpdateField("Hold", true)
		if err != nil {
			return fmt.Errorf("marking existing matching messages in queue on hold: %v", err)
		}
		log.Info("marked messages in queue as on hold", slog.Int("messages", marked))
		return nil
	}

	if err := DB.Write(ctx, insertAndHold); err != nil {
		return HoldRule{}, err
	}
	queuekick()
	metricHoldUpdate()
	return hr, nil
}
// HoldRuleRemove removes the hold rule with the given ID. The Hold field of
// existing messages is not changed. An error is returned if the rule cannot be
// fetched, e.g. because it does not exist.
func HoldRuleRemove(ctx context.Context, log mlog.Log, holdRuleID int64) error {
	remove := func(tx *bstore.Tx) error {
		// Fetch the full rule first, so it can be logged.
		rule := HoldRule{ID: holdRuleID}
		err := tx.Get(&rule)
		if err != nil {
			return err
		}
		log.Info("removing hold rule", slog.Any("holdrule", rule))
		return tx.Delete(HoldRule{ID: holdRuleID})
	}
	return DB.Write(ctx, remove)
}
// MakeMsg is a convenience function that sets the commonly used fields for a Msg.
func MakeMsg(sender, recipient smtp.Path, has8bit, smtputf8 bool, size int64, messageID string, prefix []byte, requireTLS *bool, next time.Time) Msg {
return Msg{
@ -223,7 +391,6 @@ func MakeMsg(sender, recipient smtp.Path, has8bit, smtputf8 bool, size int64, me
SenderDomain: sender.IPDomain,
RecipientLocalpart: recipient.Localpart,
RecipientDomain: recipient.IPDomain,
RecipientDomainStr: formatIPDomain(recipient.IPDomain),
Has8bit: has8bit,
SMTPUTF8: smtputf8,
Size: size,
@ -242,25 +409,20 @@ func MakeMsg(sender, recipient smtp.Path, has8bit, smtputf8 bool, size int64, me
//
// ID of the messages must be 0 and will be set after inserting in the queue.
//
// Add sets derived fields like RecipientDomainStr, and fields related to queueing,
// such as Queued, NextAttempt, LastAttempt, LastError.
// Add sets derived fields like SenderDomainStr and RecipientDomainStr, and fields
// related to queueing, such as Queued, NextAttempt, LastAttempt, LastError.
func Add(ctx context.Context, log mlog.Log, senderAccount string, msgFile *os.File, qml ...Msg) error {
if len(qml) == 0 {
return fmt.Errorf("must queue at least one message")
}
for _, qm := range qml {
for i, qm := range qml {
if qm.ID != 0 {
return fmt.Errorf("id of queued messages must be 0")
}
if qm.RecipientDomainStr == "" {
return fmt.Errorf("recipient domain cannot be empty")
}
// Sanity check, internal consistency.
rcptDom := formatIPDomain(qm.RecipientDomain)
if qm.RecipientDomainStr != rcptDom {
return fmt.Errorf("mismatch between recipient domain and string form of domain")
}
qml[i].SenderDomainStr = formatIPDomain(qm.SenderDomain)
qml[i].RecipientDomainStr = formatIPDomain(qm.RecipientDomain)
}
if Localserve {
@ -307,12 +469,24 @@ func Add(ctx context.Context, log mlog.Log, senderAccount string, msgFile *os.Fi
}
}()
// Mark messages Hold if they match a hold rule.
holdRules, err := bstore.QueryTx[HoldRule](tx).List()
if err != nil {
return fmt.Errorf("getting queue hold rules")
}
// Insert messages into queue. If there are multiple messages, they all get a
// non-zero BaseID that is the Msg.ID of the first message inserted.
var baseID int64
for i := range qml {
qml[i].SenderAccount = senderAccount
qml[i].BaseID = baseID
for _, hr := range holdRules {
if hr.matches(qml[i]) {
qml[i].Hold = true
break
}
}
if err := tx.Insert(&qml[i]); err != nil {
return err
}
@ -351,7 +525,15 @@ func Add(ctx context.Context, log mlog.Log, senderAccount string, msgFile *os.Fi
tx = nil
paths = nil
for _, m := range qml {
if m.Hold {
metricHoldUpdate()
break
}
}
queuekick()
return nil
}
@ -374,36 +556,43 @@ func queuekick() {
}
}
// Kick sets the NextAttempt for messages matching all filter parameters (ID,
// toDomain, recipient) that are nonzero, and kicks the queue, attempting delivery
// of those messages. If all parameters are zero, all messages are kicked. If
// transport is set, the delivery attempts for the matching messages will use the
// transport. An empty string is the default transport, i.e. direct delivery.
// Returns number of messages queued for immediate delivery.
func Kick(ctx context.Context, ID int64, toDomain, recipient string, transport *string) (int, error) {
q := bstore.QueryDB[Msg](ctx, DB)
if ID > 0 {
q.FilterID(ID)
}
if toDomain != "" {
q.FilterEqual("RecipientDomainStr", toDomain)
}
if recipient != "" {
q.FilterFn(func(qm Msg) bool {
return qm.Recipient().XString(true) == recipient
})
}
up := map[string]any{"NextAttempt": time.Now()}
if transport != nil {
if *transport != "" {
_, ok := mox.Conf.Static.Transports[*transport]
if !ok {
return 0, fmt.Errorf("unknown transport %q", *transport)
// NextAttemptAdd adds a duration to the NextAttempt for all matching messages, and
// kicks the queue.
func NextAttemptAdd(ctx context.Context, f Filter, d time.Duration) (affected int, err error) {
err = DB.Write(ctx, func(tx *bstore.Tx) error {
q := bstore.QueryDB[Msg](ctx, DB)
if err := f.apply(q); err != nil {
return err
}
var msgs []Msg
msgs, err := q.List()
if err != nil {
return fmt.Errorf("listing matching messages: %v", err)
}
for _, m := range msgs {
m.NextAttempt = m.NextAttempt.Add(d)
if err := tx.Update(&m); err != nil {
return err
}
}
up["Transport"] = *transport
affected = len(msgs)
return nil
})
if err != nil {
return 0, err
}
n, err := q.UpdateFields(up)
queuekick()
return affected, nil
}
// NextAttemptSet sets NextAttempt for all matching messages to a new time, and
// kicks the queue.
func NextAttemptSet(ctx context.Context, f Filter, t time.Time) (affected int, err error) {
q := bstore.QueryDB[Msg](ctx, DB)
if err := f.apply(q); err != nil {
return 0, err
}
n, err := q.UpdateNonzero(Msg{NextAttempt: t})
if err != nil {
return 0, fmt.Errorf("selecting and updating messages in queue: %v", err)
}
@ -411,21 +600,74 @@ func Kick(ctx context.Context, ID int64, toDomain, recipient string, transport *
return n, nil
}
// Drop removes messages from the queue that match all nonzero parameters.
// If all parameters are zero, all messages are removed.
// Returns number of messages removed.
func Drop(ctx context.Context, log mlog.Log, ID int64, toDomain string, recipient string) (int, error) {
// HoldSet sets Hold for all matching messages and kicks the queue.
func HoldSet(ctx context.Context, f Filter, hold bool) (affected int, err error) {
q := bstore.QueryDB[Msg](ctx, DB)
if ID > 0 {
q.FilterID(ID)
if err := f.apply(q); err != nil {
return 0, err
}
if toDomain != "" {
q.FilterEqual("RecipientDomainStr", toDomain)
n, err := q.UpdateFields(map[string]any{"Hold": hold})
if err != nil {
return 0, fmt.Errorf("selecting and updating messages in queue: %v", err)
}
if recipient != "" {
q.FilterFn(func(qm Msg) bool {
return qm.Recipient().XString(true) == recipient
})
queuekick()
metricHoldUpdate()
return n, nil
}
// TransportSet sets the Transport field of all messages matching filter f to
// transport and kicks the queue. It returns the number of messages updated.
func TransportSet(ctx context.Context, f Filter, transport string) (affected int, err error) {
	query := bstore.QueryDB[Msg](ctx, DB)
	if err = f.apply(query); err != nil {
		return 0, err
	}

	changes := map[string]any{"Transport": transport}
	affected, err = query.UpdateFields(changes)
	if err != nil {
		return 0, fmt.Errorf("selecting and updating messages in queue: %v", err)
	}

	queuekick()
	return affected, nil
}
// Fail marks matching messages as failed for delivery and delivers DSNs to the
// sender.
//
// The messages matching filter f are deleted from the queue in a single
// transaction, and deliverDSNFailure is called for each of them with the reason
// "delivery canceled by admin". Messages that were never attempted get the
// current time as LastAttempt for that call. Afterwards the queue is kicked and
// the hold metric is refreshed. The number of deleted messages is returned.
func Fail(ctx context.Context, log mlog.Log, f Filter) (affected int, err error) {
	failMatching := func(tx *bstore.Tx) error {
		query := bstore.QueryTx[Msg](tx)
		if err := f.apply(query); err != nil {
			return err
		}

		// Collect the messages being deleted, they are needed for the DSNs.
		var removed []Msg
		query.Gather(&removed)
		count, err := query.Delete()
		if err != nil {
			return fmt.Errorf("selecting and deleting messages from queue: %v", err)
		}

		// No remote MTA was involved, pass the zero value.
		var remoteMTA dsn.NameIP
		for i := range removed {
			m := removed[i]
			if m.LastAttempt == nil {
				now := time.Now()
				m.LastAttempt = &now
			}
			deliverDSNFailure(ctx, log, m, remoteMTA, "", "delivery canceled by admin", nil)
		}

		affected = count
		return nil
	}

	if err = DB.Write(ctx, failMatching); err != nil {
		return 0, fmt.Errorf("selecting and updating messages in queue: %v", err)
	}
	queuekick()
	metricHoldUpdate()
	return affected, nil
}
// Drop removes matching messages from the queue.
// Returns number of messages removed.
func Drop(ctx context.Context, log mlog.Log, f Filter) (affected int, err error) {
q := bstore.QueryDB[Msg](ctx, DB)
if err := f.apply(q); err != nil {
return 0, err
}
var msgs []Msg
q.Gather(&msgs)
@ -439,19 +681,20 @@ func Drop(ctx context.Context, log mlog.Log, ID int64, toDomain string, recipien
log.Errorx("removing queue message from file system", err, slog.Int64("queuemsgid", m.ID), slog.String("path", p))
}
}
queuekick()
metricHoldUpdate()
return n, nil
}
// SaveRequireTLS updates the RequireTLS field of the message with id.
func SaveRequireTLS(ctx context.Context, id int64, requireTLS *bool) error {
return DB.Write(ctx, func(tx *bstore.Tx) error {
m := Msg{ID: id}
if err := tx.Get(&m); err != nil {
return fmt.Errorf("get message: %w", err)
}
m.RequireTLS = requireTLS
return tx.Update(&m)
})
// RequireTLSSet updates the RequireTLS field of matching messages.
func RequireTLSSet(ctx context.Context, f Filter, requireTLS *bool) (affected int, err error) {
q := bstore.QueryDB[Msg](ctx, DB)
if err := f.apply(q); err != nil {
return 0, err
}
n, err := q.UpdateFields(map[string]any{"RequireTLS": requireTLS})
queuekick()
return n, err
}
type ReadReaderAtCloser interface {
@ -522,6 +765,7 @@ func nextWork(ctx context.Context, log mlog.Log, busyDomains map[string]struct{}
}
q.FilterNotEqual("RecipientDomainStr", doms...)
}
q.FilterEqual("Hold", false)
q.SortAsc("NextAttempt")
q.Limit(1)
qm, err := q.Get()
@ -537,6 +781,7 @@ func nextWork(ctx context.Context, log mlog.Log, busyDomains map[string]struct{}
func launchWork(log mlog.Log, resolver dns.Resolver, busyDomains map[string]struct{}) int {
q := bstore.QueryDB[Msg](mox.Shutdown, DB)
q.FilterLessEqual("NextAttempt", time.Now())
q.FilterEqual("Hold", false)
q.SortAsc("NextAttempt")
q.Limit(maxConcurrentDeliveries)
if len(busyDomains) > 0 {
@ -679,6 +924,7 @@ func deliver(log mlog.Log, resolver dns.Resolver, m Msg) {
q.FilterNonzero(Msg{BaseID: m.BaseID, RecipientDomainStr: m.RecipientDomainStr, Attempts: m.Attempts - 1})
q.FilterNotEqual("ID", m.ID)
q.FilterLessEqual("NextAttempt", origNextAttempt)
q.FilterEqual("Hold", false)
err := q.ForEach(func(xm Msg) error {
mrtls := m.RequireTLS != nil
xmrtls := xm.RequireTLS != nil