mirror of
https://github.com/mjl-/mox.git
synced 2025-07-12 17:04:39 +03:00
add a "backup" subcommand to make consistent backups, and a "verifydata" subcommand to verify a backup before restoring, and add tests for future upgrades
the backup command will make consistent snapshots of all the database files. i had been copying the db files before, and it usually works. but if the file is modified during the backup, it is inconsistent and is likely to generate errors when reading (can be at any moment in the future, when reading some db page). "mox backup" opens the database file and writes out a copy in a transaction. it also duplicates the message files. before doing a restore, you could run "mox verifydata" on the to-be-restored "data" directory. it check the database files, and compares the message files with the database. the new "gentestdata" subcommand generates a basic "data" directory, with a queue and a few accounts. we will use it in the future along with "verifydata" to test upgrades from old version to the latest version. both when going to the next version, and when skipping several versions. the script test-upgrades.sh executes these tests and doesn't do anything at the moment, because no releases have this subcommand yet. inspired by a failed upgrade attempt of a pre-release version.
This commit is contained in:
@ -70,7 +70,8 @@ var dial = func(ctx context.Context, timeout time.Duration, addr string, laddr n
|
||||
|
||||
var jitter = mox.NewRand()
|
||||
|
||||
var queueDB *bstore.DB
|
||||
var DBTypes = []any{Msg{}} // Types stored in DB.
|
||||
var DB *bstore.DB // Exported for making backups.
|
||||
|
||||
// Set for mox localserve, to prevent queueing.
|
||||
var Localserve bool
|
||||
@ -122,7 +123,7 @@ func Init() error {
|
||||
}
|
||||
|
||||
var err error
|
||||
queueDB, err = bstore.Open(mox.Shutdown, qpath, &bstore.Options{Timeout: 5 * time.Second, Perm: 0660}, Msg{})
|
||||
DB, err = bstore.Open(mox.Shutdown, qpath, &bstore.Options{Timeout: 5 * time.Second, Perm: 0660}, DBTypes...)
|
||||
if err != nil {
|
||||
if isNew {
|
||||
os.Remove(qpath)
|
||||
@ -134,15 +135,15 @@ func Init() error {
|
||||
|
||||
// Shutdown closes the queue database. The delivery process isn't stopped. For tests only.
|
||||
func Shutdown() {
|
||||
err := queueDB.Close()
|
||||
err := DB.Close()
|
||||
xlog.Check(err, "closing queue db")
|
||||
queueDB = nil
|
||||
DB = nil
|
||||
}
|
||||
|
||||
// List returns all messages in the delivery queue.
|
||||
// Ordered by earliest delivery attempt first.
|
||||
func List(ctx context.Context) ([]Msg, error) {
|
||||
qmsgs, err := bstore.QueryDB[Msg](ctx, queueDB).List()
|
||||
qmsgs, err := bstore.QueryDB[Msg](ctx, DB).List()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -166,7 +167,7 @@ func List(ctx context.Context) ([]Msg, error) {
|
||||
|
||||
// Count returns the number of messages in the delivery queue.
|
||||
func Count(ctx context.Context) (int, error) {
|
||||
return bstore.QueryDB[Msg](ctx, queueDB).Count()
|
||||
return bstore.QueryDB[Msg](ctx, DB).Count()
|
||||
}
|
||||
|
||||
// Add a new message to the queue. The queue is kicked immediately to start a
|
||||
@ -187,7 +188,7 @@ func Add(ctx context.Context, log *mlog.Log, senderAccount string, mailFrom, rcp
|
||||
return fmt.Errorf("no queuing with localserve")
|
||||
}
|
||||
|
||||
tx, err := queueDB.Begin(ctx, true)
|
||||
tx, err := DB.Begin(ctx, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("begin transaction: %w", err)
|
||||
}
|
||||
@ -288,7 +289,7 @@ func queuekick() {
|
||||
// are zero, all messages are kicked.
|
||||
// Returns number of messages queued for immediate delivery.
|
||||
func Kick(ctx context.Context, ID int64, toDomain string, recipient string) (int, error) {
|
||||
q := bstore.QueryDB[Msg](ctx, queueDB)
|
||||
q := bstore.QueryDB[Msg](ctx, DB)
|
||||
if ID > 0 {
|
||||
q.FilterID(ID)
|
||||
}
|
||||
@ -312,7 +313,7 @@ func Kick(ctx context.Context, ID int64, toDomain string, recipient string) (int
|
||||
// If all parameters are zero, all messages are removed.
|
||||
// Returns number of messages removed.
|
||||
func Drop(ctx context.Context, ID int64, toDomain string, recipient string) (int, error) {
|
||||
q := bstore.QueryDB[Msg](ctx, queueDB)
|
||||
q := bstore.QueryDB[Msg](ctx, DB)
|
||||
if ID > 0 {
|
||||
q.FilterID(ID)
|
||||
}
|
||||
@ -347,7 +348,7 @@ type ReadReaderAtCloser interface {
|
||||
// OpenMessage opens a message present in the queue.
|
||||
func OpenMessage(ctx context.Context, id int64) (ReadReaderAtCloser, error) {
|
||||
qm := Msg{ID: id}
|
||||
err := queueDB.Get(ctx, &qm)
|
||||
err := DB.Get(ctx, &qm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -397,7 +398,7 @@ func Start(resolver dns.Resolver, done chan struct{}) error {
|
||||
}
|
||||
|
||||
func nextWork(ctx context.Context, busyDomains map[string]struct{}) time.Duration {
|
||||
q := bstore.QueryDB[Msg](ctx, queueDB)
|
||||
q := bstore.QueryDB[Msg](ctx, DB)
|
||||
if len(busyDomains) > 0 {
|
||||
var doms []any
|
||||
for d := range busyDomains {
|
||||
@ -418,7 +419,7 @@ func nextWork(ctx context.Context, busyDomains map[string]struct{}) time.Duratio
|
||||
}
|
||||
|
||||
func launchWork(resolver dns.Resolver, busyDomains map[string]struct{}) int {
|
||||
q := bstore.QueryDB[Msg](mox.Shutdown, queueDB)
|
||||
q := bstore.QueryDB[Msg](mox.Shutdown, DB)
|
||||
q.FilterLessEqual("NextAttempt", time.Now())
|
||||
q.SortAsc("NextAttempt")
|
||||
q.Limit(maxConcurrentDeliveries)
|
||||
@ -445,7 +446,7 @@ func launchWork(resolver dns.Resolver, busyDomains map[string]struct{}) int {
|
||||
|
||||
// Remove message from queue in database and file system.
|
||||
func queueDelete(ctx context.Context, msgID int64) error {
|
||||
if err := queueDB.Delete(ctx, &Msg{ID: msgID}); err != nil {
|
||||
if err := DB.Delete(ctx, &Msg{ID: msgID}); err != nil {
|
||||
return err
|
||||
}
|
||||
// If removing from database fails, we'll also leave the file in the file system.
|
||||
@ -491,7 +492,7 @@ func deliver(resolver dns.Resolver, m Msg) {
|
||||
now := time.Now()
|
||||
m.LastAttempt = &now
|
||||
m.NextAttempt = now.Add(backoff)
|
||||
qup := bstore.QueryDB[Msg](mox.Shutdown, queueDB)
|
||||
qup := bstore.QueryDB[Msg](mox.Shutdown, DB)
|
||||
qup.FilterID(m.ID)
|
||||
update := Msg{Attempts: m.Attempts, NextAttempt: m.NextAttempt, LastAttempt: m.LastAttempt}
|
||||
if _, err := qup.UpdateNonzero(update); err != nil {
|
||||
@ -510,7 +511,7 @@ func deliver(resolver dns.Resolver, m Msg) {
|
||||
return
|
||||
}
|
||||
|
||||
qup := bstore.QueryDB[Msg](context.Background(), queueDB)
|
||||
qup := bstore.QueryDB[Msg](context.Background(), DB)
|
||||
qup.FilterID(m.ID)
|
||||
if _, err := qup.UpdateNonzero(Msg{LastError: errmsg, DialedIPs: m.DialedIPs}); err != nil {
|
||||
qlog.Errorx("storing delivery error", err, mlog.Field("deliveryerror", errmsg))
|
||||
|
@ -138,7 +138,7 @@ func TestQueue(t *testing.T) {
|
||||
case <-dialed:
|
||||
i := 0
|
||||
for {
|
||||
m, err := bstore.QueryDB[Msg](ctxbg, queueDB).Get()
|
||||
m, err := bstore.QueryDB[Msg](ctxbg, DB).Get()
|
||||
tcheck(t, err, "get")
|
||||
if m.Attempts == 1 {
|
||||
break
|
||||
@ -288,7 +288,7 @@ func TestQueue(t *testing.T) {
|
||||
for i := 1; i < 8; i++ {
|
||||
go func() { <-deliveryResult }() // Deliver sends here.
|
||||
deliver(resolver, msg)
|
||||
err = queueDB.Get(ctxbg, &msg)
|
||||
err = DB.Get(ctxbg, &msg)
|
||||
tcheck(t, err, "get msg")
|
||||
if msg.Attempts != i {
|
||||
t.Fatalf("got attempt %d, expected %d", msg.Attempts, i)
|
||||
@ -311,7 +311,7 @@ func TestQueue(t *testing.T) {
|
||||
// Trigger final failure.
|
||||
go func() { <-deliveryResult }() // Deliver sends here.
|
||||
deliver(resolver, msg)
|
||||
err = queueDB.Get(ctxbg, &msg)
|
||||
err = DB.Get(ctxbg, &msg)
|
||||
if err != bstore.ErrAbsent {
|
||||
t.Fatalf("attempt to fetch delivered and removed message from queue, got err %v, expected ErrAbsent", err)
|
||||
}
|
||||
|
Reference in New Issue
Block a user