Before moving message files in imapserver and webmail API, ensure the message destination directory for the newly assigned IDs exist.

Example symptom, when deleting a message in the webmail (which moves to Trash):

        l=error m="duplicating message in old mailbox for current sessions" err="link data/accounts/mjl/msg/I/368638 data/accounts/mjl/msg/J/368640: no such file or directory" pkg=webmail

Problem introduced a few weeks ago, where moving messages starting duplicating
the message first, and the copy is erased once all references (in IMAP
sessions) to the old mailbox have been removed.
This commit is contained in:
Mechiel Lukkien 2025-03-21 10:18:39 +01:00
parent 99f9eb438f
commit 75036c3a71
No known key found for this signature in database
6 changed files with 61 additions and 9 deletions

View File

@ -4263,6 +4263,9 @@ func (c *conn) xmoveMessages(tx *bstore.Tx, q *bstore.Query[store.Message], expe
xcheckf(fmt.Errorf("moved %d messages, expected %d", len(l), expectCount), "move messages") xcheckf(fmt.Errorf("moved %d messages, expected %d", len(l), expectCount), "move messages")
} }
// For newly created message directories that we sync after hardlinking/copying files.
syncDirs := map[string]struct{}{}
for _, om := range l { for _, om := range l {
nm := om nm := om
nm.MailboxID = mbDst.ID nm.MailboxID = mbDst.ID
@ -4294,7 +4297,14 @@ func (c *conn) xmoveMessages(tx *bstore.Tx, q *bstore.Query[store.Message], expe
err = tx.Insert(&om) err = tx.Insert(&om)
xcheckf(err, "inserting expunged message in old mailbox") xcheckf(err, "inserting expunged message in old mailbox")
err = moxio.LinkOrCopy(c.log, c.account.MessagePath(om.ID), c.account.MessagePath(nm.ID), nil, false) dstPath := c.account.MessagePath(om.ID)
dstDir := filepath.Dir(dstPath)
if _, ok := syncDirs[dstDir]; !ok {
os.MkdirAll(dstDir, 0770)
syncDirs[dstDir] = struct{}{}
}
err = moxio.LinkOrCopy(c.log, dstPath, c.account.MessagePath(nm.ID), nil, false)
xcheckf(err, "duplicating message in old mailbox for current sessions") xcheckf(err, "duplicating message in old mailbox for current sessions")
newIDs = append(newIDs, nm.ID) newIDs = append(newIDs, nm.ID)
// We don't sync the directory. In case of a crash and files disappearing, the // We don't sync the directory. In case of a crash and files disappearing, the
@ -4321,6 +4331,11 @@ func (c *conn) xmoveMessages(tx *bstore.Tx, q *bstore.Query[store.Message], expe
} }
xcheckf(err, "move messages") xcheckf(err, "move messages")
for dir := range syncDirs {
err := moxio.SyncDir(c.log, dir)
xcheckf(err, "sync directory")
}
changes = append(changes, changeRemoveUIDs, mbSrc.ChangeCounts()) changes = append(changes, changeRemoveUIDs, mbSrc.ChangeCounts())
err = tx.Update(mbSrc) err = tx.Update(mbSrc)

View File

@ -459,6 +459,7 @@ func main() {
// mox server should never use it. But integration tests enable it again with a // mox server should never use it. But integration tests enable it again with a
// flag. // flag.
store.CheckConsistencyOnClose = false store.CheckConsistencyOnClose = false
store.MsgFilesPerDirShiftSet(13) // For 1<<13 = 8k message files per directory.
ctxbg := context.Background() ctxbg := context.Background()
mox.Shutdown = ctxbg mox.Shutdown = ctxbg

View File

@ -713,14 +713,25 @@ func Add(ctx context.Context, log mlog.Log, senderAccount string, msgFile *os.Fi
} }
}() }()
syncDirs := map[string]struct{}{}
for _, qm := range qml { for _, qm := range qml {
dst := qm.MessagePath() dst := qm.MessagePath()
paths = append(paths, dst) paths = append(paths, dst)
dstDir := filepath.Dir(dst) dstDir := filepath.Dir(dst)
os.MkdirAll(dstDir, 0770) if _, ok := syncDirs[dstDir]; !ok {
os.MkdirAll(dstDir, 0770)
syncDirs[dstDir] = struct{}{}
}
if err := moxio.LinkOrCopy(log, dst, msgFile.Name(), nil, true); err != nil { if err := moxio.LinkOrCopy(log, dst, msgFile.Name(), nil, true); err != nil {
return fmt.Errorf("linking/copying message to new file: %s", err) return fmt.Errorf("linking/copying message to new file: %s", err)
} else if err := moxio.SyncDir(log, dstDir); err != nil { }
}
for dir := range syncDirs {
if err := moxio.SyncDir(log, dir); err != nil {
return fmt.Errorf("sync directory: %v", err) return fmt.Errorf("sync directory: %v", err)
} }
} }

View File

@ -3198,9 +3198,19 @@ func OpenEmail(log mlog.Log, email string, checkLoginDisabled bool) (*Account, s
return acc, accountName, dest, nil return acc, accountName, dest, nil
} }
// We store max 1<<shift files in each subdir of an account "msg" directory.
// Defaults to 1 for easy use in tests. Set to 13, for 8k message files, in main
// for normal operation.
var msgFilesPerDirShift = 1
var msgFilesPerDir int64 = 1 << msgFilesPerDirShift
func MsgFilesPerDirShiftSet(shift int) {
msgFilesPerDirShift = shift
msgFilesPerDir = 1 << shift
}
// 64 characters, must be power of 2 for MessagePath // 64 characters, must be power of 2 for MessagePath
const msgDirChars = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ-_" const msgDirChars = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ-_"
const msgFilesPerDir = 8 * 1024
// MessagePath returns the filename of the on-disk filename, relative to the // MessagePath returns the filename of the on-disk filename, relative to the
// containing directory such as <account>/msg or queue. // containing directory such as <account>/msg or queue.
@ -3212,7 +3222,7 @@ func MessagePath(messageID int64) string {
// messagePathElems returns the elems, for a single join without intermediate // messagePathElems returns the elems, for a single join without intermediate
// string allocations. // string allocations.
func messagePathElems(messageID int64) []string { func messagePathElems(messageID int64) []string {
v := messageID >> 13 // 8k files per directory. v := messageID >> msgFilesPerDirShift
dir := "" dir := ""
for { for {
dir += string(msgDirChars[int(v)&(len(msgDirChars)-1)]) dir += string(msgDirChars[int(v)&(len(msgDirChars)-1)])

View File

@ -394,7 +394,7 @@ func TestNextMessageID(t *testing.T) {
acc, err = OpenAccount(log, "mjl", false) acc, err = OpenAccount(log, "mjl", false)
tcheck(t, err, "open account") tcheck(t, err, "open account")
// Deliver a message. It should get ID 8*1024+1. // Deliver a message. It should get ID $msgFilesPerDir+1.
mf, err = CreateMessageTemp(log, "account-test") mf, err = CreateMessageTemp(log, "account-test")
tcheck(t, err, "creating temp message file") tcheck(t, err, "creating temp message file")
_, err = mf.Write(msgData) _, err = mf.Write(msgData)
@ -405,8 +405,8 @@ func TestNextMessageID(t *testing.T) {
} }
err = acc.DeliverMailbox(log, "Inbox", &m, mf) err = acc.DeliverMailbox(log, "Inbox", &m, mf)
tcheck(t, err, "deliver mailbox") tcheck(t, err, "deliver mailbox")
if m.ID != 8*1024+1 { if m.ID != msgFilesPerDir+1 {
t.Fatalf("got message id %d, expected 8*1024+1", m.ID) t.Fatalf("got message id %d, expected $msgFilesPerDir+1", m.ID)
} }
err = acc.Close() err = acc.Close()

View File

@ -8,6 +8,7 @@ import (
"io" "io"
"log/slog" "log/slog"
"os" "os"
"path/filepath"
"slices" "slices"
"sort" "sort"
"time" "time"
@ -451,6 +452,8 @@ func (x XOps) MessageMoveTx(ctx context.Context, log mlog.Log, acc *store.Accoun
nkeywords := len(mbDst.Keywords) nkeywords := len(mbDst.Keywords)
now := time.Now() now := time.Now()
syncDirs := map[string]struct{}{}
for _, om := range l { for _, om := range l {
if om.MailboxID != mbSrc.ID { if om.MailboxID != mbSrc.ID {
if mbSrc.ID != 0 { if mbSrc.ID != 0 {
@ -494,7 +497,14 @@ func (x XOps) MessageMoveTx(ctx context.Context, log mlog.Log, acc *store.Accoun
err = tx.Insert(&om) err = tx.Insert(&om)
x.Checkf(ctx, err, "inserting expunged message in old mailbox") x.Checkf(ctx, err, "inserting expunged message in old mailbox")
err = moxio.LinkOrCopy(log, acc.MessagePath(om.ID), acc.MessagePath(nm.ID), nil, false) dstPath := acc.MessagePath(om.ID)
dstDir := filepath.Dir(dstPath)
if _, ok := syncDirs[dstDir]; !ok {
os.MkdirAll(dstDir, 0770)
syncDirs[dstDir] = struct{}{}
}
err = moxio.LinkOrCopy(log, dstPath, acc.MessagePath(nm.ID), nil, false)
x.Checkf(ctx, err, "duplicating message in old mailbox for current sessions") x.Checkf(ctx, err, "duplicating message in old mailbox for current sessions")
newIDs = append(newIDs, nm.ID) newIDs = append(newIDs, nm.ID)
// We don't sync the directory. In case of a crash and files disappearing, the // We don't sync the directory. In case of a crash and files disappearing, the
@ -520,6 +530,11 @@ func (x XOps) MessageMoveTx(ctx context.Context, log mlog.Log, acc *store.Accoun
changes = append(changes, nm.ChangeAddUID()) changes = append(changes, nm.ChangeAddUID())
} }
for dir := range syncDirs {
err := moxio.SyncDir(log, dir)
x.Checkf(ctx, err, "sync directory")
}
xflushMailbox() xflushMailbox()
changes = append(changes, mbDst.ChangeCounts()) changes = append(changes, mbDst.ChangeCounts())