From 75036c3a7180be0bf67fdb6f29894d8435fcf9f2 Mon Sep 17 00:00:00 2001 From: Mechiel Lukkien Date: Fri, 21 Mar 2025 10:18:39 +0100 Subject: [PATCH] Before moving message files in imapserver and webmail API, ensure the message destination directory for the newly assigned IDs exist. Example symptom, when deleting a message in the webmail (which moves to Trash): l=error m="duplicating message in old mailbox for current sessions" err="link data/accounts/mjl/msg/I/368638 data/accounts/mjl/msg/J/368640: no such file or directory" pkg=webmail Problem introduced a few weeks ago, where moving messages starting duplicating the message first, and the copy is erased once all references (in IMAP sessions) to the old mailbox have been removed. --- imapserver/server.go | 17 ++++++++++++++++- main.go | 1 + queue/queue.go | 15 +++++++++++++-- store/account.go | 14 ++++++++++++-- store/account_test.go | 6 +++--- webops/xops.go | 17 ++++++++++++++++- 6 files changed, 61 insertions(+), 9 deletions(-) diff --git a/imapserver/server.go b/imapserver/server.go index 723b724..e911988 100644 --- a/imapserver/server.go +++ b/imapserver/server.go @@ -4263,6 +4263,9 @@ func (c *conn) xmoveMessages(tx *bstore.Tx, q *bstore.Query[store.Message], expe xcheckf(fmt.Errorf("moved %d messages, expected %d", len(l), expectCount), "move messages") } + // For newly created message directories that we sync after hardlinking/copying files. + syncDirs := map[string]struct{}{} + for _, om := range l { nm := om nm.MailboxID = mbDst.ID @@ -4294,7 +4297,14 @@ func (c *conn) xmoveMessages(tx *bstore.Tx, q *bstore.Query[store.Message], expe err = tx.Insert(&om) xcheckf(err, "inserting expunged message in old mailbox") - err = moxio.LinkOrCopy(c.log, c.account.MessagePath(om.ID), c.account.MessagePath(nm.ID), nil, false) + dstPath := c.account.MessagePath(om.ID) + dstDir := filepath.Dir(dstPath) + if _, ok := syncDirs[dstDir]; !ok { + os.MkdirAll(dstDir, 0770) + syncDirs[dstDir] = struct{}{} + } + + err = moxio.LinkOrCopy(c.log, dstPath, c.account.MessagePath(nm.ID), nil, false) xcheckf(err, "duplicating message in old mailbox for current sessions") newIDs = append(newIDs, nm.ID) // We don't sync the directory. In case of a crash and files disappearing, the @@ -4321,6 +4331,11 @@ func (c *conn) xmoveMessages(tx *bstore.Tx, q *bstore.Query[store.Message], expe } xcheckf(err, "move messages") + for dir := range syncDirs { + err := moxio.SyncDir(c.log, dir) + xcheckf(err, "sync directory") + } + changes = append(changes, changeRemoveUIDs, mbSrc.ChangeCounts()) err = tx.Update(mbSrc) diff --git a/main.go b/main.go index bd04037..b3e03d6 100644 --- a/main.go +++ b/main.go @@ -459,6 +459,7 @@ func main() { // mox server should never use it. But integration tests enable it again with a // flag. store.CheckConsistencyOnClose = false + store.MsgFilesPerDirShiftSet(13) // For 1<<13 = 8k message files per directory. ctxbg := context.Background() mox.Shutdown = ctxbg diff --git a/queue/queue.go b/queue/queue.go index 2fdcf33..07c5949 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -713,14 +713,25 @@ func Add(ctx context.Context, log mlog.Log, senderAccount string, msgFile *os.Fi } }() + syncDirs := map[string]struct{}{} + for _, qm := range qml { dst := qm.MessagePath() paths = append(paths, dst) + dstDir := filepath.Dir(dst) - os.MkdirAll(dstDir, 0770) + if _, ok := syncDirs[dstDir]; !ok { + os.MkdirAll(dstDir, 0770) + syncDirs[dstDir] = struct{}{} + } + if err := moxio.LinkOrCopy(log, dst, msgFile.Name(), nil, true); err != nil { return fmt.Errorf("linking/copying message to new file: %s", err) - } else if err := moxio.SyncDir(log, dstDir); err != nil { + } + } + + for dir := range syncDirs { + if err := moxio.SyncDir(log, dir); err != nil { return fmt.Errorf("sync directory: %v", err) } } diff --git a/store/account.go b/store/account.go index 1691f40..fe1871a 100644 --- a/store/account.go +++ b/store/account.go @@ -3198,9 +3198,19 @@ func OpenEmail(log mlog.Log, email string, checkLoginDisabled bool) (*Account, s return acc, accountName, dest, nil } +// We store max 1</msg or queue. @@ -3212,7 +3222,7 @@ func MessagePath(messageID int64) string { // messagePathElems returns the elems, for a single join without intermediate // string allocations. func messagePathElems(messageID int64) []string { - v := messageID >> 13 // 8k files per directory. + v := messageID >> msgFilesPerDirShift dir := "" for { dir += string(msgDirChars[int(v)&(len(msgDirChars)-1)]) diff --git a/store/account_test.go b/store/account_test.go index bb30de6..7c679a7 100644 --- a/store/account_test.go +++ b/store/account_test.go @@ -394,7 +394,7 @@ func TestNextMessageID(t *testing.T) { acc, err = OpenAccount(log, "mjl", false) tcheck(t, err, "open account") - // Deliver a message. It should get ID 8*1024+1. + // Deliver a message. It should get ID $msgFilesPerDir+1. mf, err = CreateMessageTemp(log, "account-test") tcheck(t, err, "creating temp message file") _, err = mf.Write(msgData) @@ -405,8 +405,8 @@ func TestNextMessageID(t *testing.T) { } err = acc.DeliverMailbox(log, "Inbox", &m, mf) tcheck(t, err, "deliver mailbox") - if m.ID != 8*1024+1 { - t.Fatalf("got message id %d, expected 8*1024+1", m.ID) + if m.ID != msgFilesPerDir+1 { + t.Fatalf("got message id %d, expected $msgFilesPerDir+1", m.ID) } err = acc.Close() diff --git a/webops/xops.go b/webops/xops.go index e83fe4a..e0af97c 100644 --- a/webops/xops.go +++ b/webops/xops.go @@ -8,6 +8,7 @@ import ( "io" "log/slog" "os" + "path/filepath" "slices" "sort" "time" @@ -451,6 +452,8 @@ func (x XOps) MessageMoveTx(ctx context.Context, log mlog.Log, acc *store.Accoun nkeywords := len(mbDst.Keywords) now := time.Now() + syncDirs := map[string]struct{}{} + for _, om := range l { if om.MailboxID != mbSrc.ID { if mbSrc.ID != 0 { @@ -494,7 +497,14 @@ func (x XOps) MessageMoveTx(ctx context.Context, log mlog.Log, acc *store.Accoun err = tx.Insert(&om) x.Checkf(ctx, err, "inserting expunged message in old mailbox") - err = moxio.LinkOrCopy(log, acc.MessagePath(om.ID), acc.MessagePath(nm.ID), nil, false) + dstPath := acc.MessagePath(om.ID) + dstDir := filepath.Dir(dstPath) + if _, ok := syncDirs[dstDir]; !ok { + os.MkdirAll(dstDir, 0770) + syncDirs[dstDir] = struct{}{} + } + + err = moxio.LinkOrCopy(log, dstPath, acc.MessagePath(nm.ID), nil, false) x.Checkf(ctx, err, "duplicating message in old mailbox for current sessions") newIDs = append(newIDs, nm.ID) // We don't sync the directory. In case of a crash and files disappearing, the @@ -520,6 +530,11 @@ func (x XOps) MessageMoveTx(ctx context.Context, log mlog.Log, acc *store.Accoun changes = append(changes, nm.ChangeAddUID()) } + for dir := range syncDirs { + err := moxio.SyncDir(log, dir) + x.Checkf(ctx, err, "sync directory") + } + xflushMailbox() changes = append(changes, mbDst.ChangeCounts())