update to latest bstore (with support for an index on a []string: Message.DKIMDomains), and cyclic data types (to be used for Message.Part soon); also adds a context.Context to database operations.

This commit is contained in:
Mechiel Lukkien
2023-05-22 14:40:36 +02:00
parent f6ed860ccb
commit e81930ba20
58 changed files with 1970 additions and 1035 deletions

View File

@ -122,7 +122,7 @@ func Init() error {
}
var err error
queueDB, err = bstore.Open(qpath, &bstore.Options{Timeout: 5 * time.Second, Perm: 0660}, Msg{})
queueDB, err = bstore.Open(mox.Shutdown, qpath, &bstore.Options{Timeout: 5 * time.Second, Perm: 0660}, Msg{})
if err != nil {
if isNew {
os.Remove(qpath)
@ -141,8 +141,8 @@ func Shutdown() {
// List returns all messages in the delivery queue.
// Ordered by earliest delivery attempt first.
func List() ([]Msg, error) {
qmsgs, err := bstore.QueryDB[Msg](queueDB).List()
func List(ctx context.Context) ([]Msg, error) {
qmsgs, err := bstore.QueryDB[Msg](ctx, queueDB).List()
if err != nil {
return nil, err
}
@ -165,8 +165,8 @@ func List() ([]Msg, error) {
}
// Count returns the number of messages in the delivery queue.
func Count() (int, error) {
return bstore.QueryDB[Msg](queueDB).Count()
func Count(ctx context.Context) (int, error) {
return bstore.QueryDB[Msg](ctx, queueDB).Count()
}
// Add a new message to the queue. The queue is kicked immediately to start a
@ -179,7 +179,7 @@ func Count() (int, error) {
// this data is used as the message when delivering the DSN and the remote SMTP
// server supports SMTPUTF8. If the remote SMTP server does not support SMTPUTF8,
// the regular non-utf8 message is delivered.
func Add(log *mlog.Log, senderAccount string, mailFrom, rcptTo smtp.Path, has8bit, smtputf8 bool, size int64, msgPrefix []byte, msgFile *os.File, dsnutf8Opt []byte, consumeFile bool) error {
func Add(ctx context.Context, log *mlog.Log, senderAccount string, mailFrom, rcptTo smtp.Path, has8bit, smtputf8 bool, size int64, msgPrefix []byte, msgFile *os.File, dsnutf8Opt []byte, consumeFile bool) error {
// todo: Add should accept multiple rcptTo if they are for the same domain. so we can queue them for delivery in one (or just a few) session(s), transferring the data only once. ../rfc/5321:3759
if Localserve {
@ -187,7 +187,7 @@ func Add(log *mlog.Log, senderAccount string, mailFrom, rcptTo smtp.Path, has8bi
return fmt.Errorf("no queuing with localserve")
}
tx, err := queueDB.Begin(true)
tx, err := queueDB.Begin(ctx, true)
if err != nil {
return fmt.Errorf("begin transaction: %w", err)
}
@ -287,8 +287,8 @@ func queuekick() {
// and kicks the queue, attempting delivery of those messages. If all parameters
// are zero, all messages are kicked.
// Returns number of messages queued for immediate delivery.
func Kick(ID int64, toDomain string, recipient string) (int, error) {
q := bstore.QueryDB[Msg](queueDB)
func Kick(ctx context.Context, ID int64, toDomain string, recipient string) (int, error) {
q := bstore.QueryDB[Msg](ctx, queueDB)
if ID > 0 {
q.FilterID(ID)
}
@ -311,8 +311,8 @@ func Kick(ID int64, toDomain string, recipient string) (int, error) {
// Drop removes messages from the queue that match all nonzero parameters.
// If all parameters are zero, all messages are removed.
// Returns number of messages removed.
func Drop(ID int64, toDomain string, recipient string) (int, error) {
q := bstore.QueryDB[Msg](queueDB)
func Drop(ctx context.Context, ID int64, toDomain string, recipient string) (int, error) {
q := bstore.QueryDB[Msg](ctx, queueDB)
if ID > 0 {
q.FilterID(ID)
}
@ -337,9 +337,9 @@ type ReadReaderAtCloser interface {
}
// OpenMessage opens a message present in the queue.
func OpenMessage(id int64) (ReadReaderAtCloser, error) {
func OpenMessage(ctx context.Context, id int64) (ReadReaderAtCloser, error) {
qm := Msg{ID: id}
err := queueDB.Get(&qm)
err := queueDB.Get(ctx, &qm)
if err != nil {
return nil, err
}
@ -382,14 +382,14 @@ func Start(resolver dns.Resolver, done chan struct{}) error {
}
launchWork(resolver, busyDomains)
timer.Reset(nextWork(busyDomains))
timer.Reset(nextWork(mox.Shutdown, busyDomains))
}
}()
return nil
}
func nextWork(busyDomains map[string]struct{}) time.Duration {
q := bstore.QueryDB[Msg](queueDB)
func nextWork(ctx context.Context, busyDomains map[string]struct{}) time.Duration {
q := bstore.QueryDB[Msg](ctx, queueDB)
if len(busyDomains) > 0 {
var doms []any
for d := range busyDomains {
@ -410,7 +410,7 @@ func nextWork(busyDomains map[string]struct{}) time.Duration {
}
func launchWork(resolver dns.Resolver, busyDomains map[string]struct{}) int {
q := bstore.QueryDB[Msg](queueDB)
q := bstore.QueryDB[Msg](mox.Shutdown, queueDB)
q.FilterLessEqual("NextAttempt", time.Now())
q.SortAsc("NextAttempt")
q.Limit(maxConcurrentDeliveries)
@ -424,7 +424,7 @@ func launchWork(resolver dns.Resolver, busyDomains map[string]struct{}) int {
msgs, err := q.List()
if err != nil {
xlog.Errorx("querying for work in queue", err)
mox.Sleep(mox.Context, 1*time.Second)
mox.Sleep(mox.Shutdown, 1*time.Second)
return -1
}
@ -436,8 +436,8 @@ func launchWork(resolver dns.Resolver, busyDomains map[string]struct{}) int {
}
// Remove message from queue in database and file system.
func queueDelete(msgID int64) error {
if err := queueDB.Delete(&Msg{ID: msgID}); err != nil {
func queueDelete(ctx context.Context, msgID int64) error {
if err := queueDB.Delete(ctx, &Msg{ID: msgID}); err != nil {
return err
}
// If removing from database fails, we'll also leave the file in the file system.
@ -483,7 +483,7 @@ func deliver(resolver dns.Resolver, m Msg) {
now := time.Now()
m.LastAttempt = &now
m.NextAttempt = now.Add(backoff)
qup := bstore.QueryDB[Msg](queueDB)
qup := bstore.QueryDB[Msg](mox.Shutdown, queueDB)
qup.FilterID(m.ID)
update := Msg{Attempts: m.Attempts, NextAttempt: m.NextAttempt, LastAttempt: m.LastAttempt}
if _, err := qup.UpdateNonzero(update); err != nil {
@ -496,13 +496,13 @@ func deliver(resolver dns.Resolver, m Msg) {
qlog.Errorx("permanent failure delivering from queue", errors.New(errmsg))
queueDSNFailure(qlog, m, remoteMTA, secodeOpt, errmsg)
if err := queueDelete(m.ID); err != nil {
if err := queueDelete(context.Background(), m.ID); err != nil {
qlog.Errorx("deleting message from queue after permanent failure", err)
}
return
}
qup := bstore.QueryDB[Msg](queueDB)
qup := bstore.QueryDB[Msg](context.Background(), queueDB)
qup.FilterID(m.ID)
if _, err := qup.UpdateNonzero(Msg{LastError: errmsg, DialedIPs: m.DialedIPs}); err != nil {
qlog.Errorx("storing delivery error", err, mlog.Field("deliveryerror", errmsg))
@ -534,7 +534,7 @@ func deliver(resolver dns.Resolver, m Msg) {
var policy *mtasts.Policy
tlsModeDefault := smtpclient.TLSOpportunistic
if !effectiveDomain.IsZero() {
cidctx := context.WithValue(mox.Context, mlog.CidKey, cid)
cidctx := context.WithValue(mox.Shutdown, mlog.CidKey, cid)
policy, policyFresh, err = mtastsdb.Get(cidctx, resolver, effectiveDomain)
if err != nil {
// No need to refuse to deliver if we have some mtasts error.
@ -586,7 +586,7 @@ func deliver(resolver dns.Resolver, m Msg) {
}
if ok {
nqlog.Info("delivered from queue")
if err := queueDelete(m.ID); err != nil {
if err := queueDelete(context.Background(), m.ID); err != nil {
nqlog.Errorx("deleting message from queue after delivery", err)
}
return