mirror of
https://github.com/mjl-/mox.git
synced 2025-07-13 00:54:38 +03:00
Improve expunged message/UID tracking in IMAP sessions, track synchronization history for mailboxes/annotations.
Keeping the message files around, and the message details in the database, is useful for IMAP sessions that haven't seen/processed the removal of a message yet and try to fetch it. Before, we would return errors. Similarly, a session that has selected a mailbox that is then removed can (at least in theory) still read its messages. The mechanics for this require keeping removed mailboxes around too. JMAP needs that anyway, so we now keep modseq/createseq/expunged history for mailboxes too. And while we're at it, for annotations as well. For future JMAP support, we now also keep the mailbox parent id around for a mailbox, with an upgrade step that sets the field for existing mailboxes and fixes up potentially missing parents (which could possibly have happened in an obscure corner case that I doubt anyone ran into).
This commit is contained in:
1054
store/account.go
1054
store/account.go
File diff suppressed because it is too large
Load Diff
@ -46,7 +46,7 @@ func TestMailbox(t *testing.T) {
|
||||
defer func() {
|
||||
err = acc.Close()
|
||||
tcheck(t, err, "closing account")
|
||||
acc.CheckClosed()
|
||||
acc.WaitClosed()
|
||||
}()
|
||||
defer Switchboard()()
|
||||
|
||||
@ -162,6 +162,8 @@ func TestMailbox(t *testing.T) {
|
||||
|
||||
var modseq ModSeq
|
||||
acc.WithWLock(func() {
|
||||
var changes []Change
|
||||
|
||||
err := acc.DB.Write(ctxbg, func(tx *bstore.Tx) error {
|
||||
_, _, err := acc.MailboxEnsure(tx, "Testbox", true, SpecialUse{}, &modseq)
|
||||
return err
|
||||
@ -200,27 +202,33 @@ func TestMailbox(t *testing.T) {
|
||||
t.Fatalf("did not find Testbox2")
|
||||
}
|
||||
|
||||
changes, err := acc.SubscriptionEnsure(tx, "Testbox2")
|
||||
nchanges, err := acc.SubscriptionEnsure(tx, "Testbox2")
|
||||
tcheck(t, err, "ensuring new subscription")
|
||||
if len(changes) == 0 {
|
||||
if len(nchanges) == 0 {
|
||||
t.Fatalf("new subscription did not result in changes")
|
||||
}
|
||||
changes, err = acc.SubscriptionEnsure(tx, "Testbox2")
|
||||
changes = append(changes, nchanges...)
|
||||
nchanges, err = acc.SubscriptionEnsure(tx, "Testbox2")
|
||||
tcheck(t, err, "ensuring already present subscription")
|
||||
if len(changes) != 0 {
|
||||
if len(nchanges) != 0 {
|
||||
t.Fatalf("already present subscription resulted in changes")
|
||||
}
|
||||
|
||||
// todo: check that messages are removed.
|
||||
mbRej, err := bstore.QueryTx[Mailbox](tx).FilterNonzero(Mailbox{Name: "Rejects"}).Get()
|
||||
tcheck(t, err, "get rejects mailbox")
|
||||
nchanges, hasSpace, err := acc.TidyRejectsMailbox(log, tx, &mbRej)
|
||||
tcheck(t, err, "tidy rejects mailbox")
|
||||
changes = append(changes, nchanges...)
|
||||
if !hasSpace {
|
||||
t.Fatalf("no space for more rejects")
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
tcheck(t, err, "write tx")
|
||||
|
||||
// todo: check that messages are removed.
|
||||
hasSpace, err := acc.TidyRejectsMailbox(log, "Rejects")
|
||||
tcheck(t, err, "tidy rejects mailbox")
|
||||
if !hasSpace {
|
||||
t.Fatalf("no space for more rejects")
|
||||
}
|
||||
BroadcastChanges(acc, changes)
|
||||
|
||||
acc.RejectsRemove(log, "Rejects", "m01@mox.example")
|
||||
})
|
||||
|
@ -10,7 +10,6 @@ import (
|
||||
"io"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
@ -18,6 +17,7 @@ import (
|
||||
"github.com/mjl-/bstore"
|
||||
|
||||
"github.com/mjl-/mox/mlog"
|
||||
"github.com/mjl-/mox/mox-"
|
||||
)
|
||||
|
||||
// Archiver can archive multiple mailboxes and their messages.
|
||||
@ -158,9 +158,10 @@ func ExportMessages(ctx context.Context, log mlog.Log, db *bstore.DB, accountDir
|
||||
var trimPrefix string
|
||||
if mailboxOpt != "" {
|
||||
// If exporting a specific mailbox, trim its parent path from stored file names.
|
||||
trimPrefix = path.Dir(mailboxOpt) + "/"
|
||||
trimPrefix = mox.ParentMailboxName(mailboxOpt) + "/"
|
||||
}
|
||||
q := bstore.QueryTx[Mailbox](tx)
|
||||
q.FilterEqual("Expunged", false)
|
||||
q.FilterFn(func(mb Mailbox) bool {
|
||||
return mailboxOpt == "" || mb.Name == mailboxOpt || recursive && strings.HasPrefix(mb.Name, prefix)
|
||||
})
|
||||
|
345
store/state.go
345
store/state.go
@ -1,14 +1,24 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/mjl-/bstore"
|
||||
|
||||
"github.com/mjl-/mox/metrics"
|
||||
"github.com/mjl-/mox/mlog"
|
||||
"github.com/mjl-/mox/mox-"
|
||||
)
|
||||
|
||||
var (
|
||||
register = make(chan *Comm)
|
||||
unregister = make(chan *Comm)
|
||||
broadcast = make(chan changeReq)
|
||||
applied = make(chan removalApplied)
|
||||
)
|
||||
|
||||
type changeReq struct {
|
||||
@ -18,6 +28,11 @@ type changeReq struct {
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// removalApplied is sent on the applied channel to tell the switchboard that the
// removal of the listed messages has been applied by a consumer.
type removalApplied struct {
|
||||
Account *Account // Account the removed messages belong to.
|
||||
MsgIDs []int64 // IDs of the removed messages (taken from ChangeRemoveUIDs.MsgIDs).
|
||||
}
|
||||
|
||||
// UID is an IMAP UID, represented as an unsigned 32-bit integer.
type UID uint32 // IMAP UID.
|
||||
|
||||
// Change to mailboxes/subscriptions/messages in an account. One of the Change*
|
||||
@ -38,6 +53,7 @@ type ChangeRemoveUIDs struct {
|
||||
MailboxID int64
|
||||
UIDs []UID // Must be in increasing UID order, for IMAP.
|
||||
ModSeq ModSeq
|
||||
MsgIDs []int64 // Message.ID, for erasing, order does not necessarily correspond with UIDs!
|
||||
}
|
||||
|
||||
// ChangeFlags is sent for an update to flags for a message, e.g. "Seen".
|
||||
@ -118,61 +134,295 @@ type ChangeAnnotation struct {
|
||||
ModSeq ModSeq
|
||||
}
|
||||
|
||||
func messageEraser(donec chan struct{}, cleanc chan map[*Account][]int64) {
|
||||
log := mlog.New("store", nil)
|
||||
|
||||
for {
|
||||
clean, ok := <-cleanc
|
||||
if !ok {
|
||||
donec <- struct{}{}
|
||||
return
|
||||
}
|
||||
|
||||
for acc, ids := range clean {
|
||||
eraseMessages(log, acc, ids)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func eraseMessages(log mlog.Log, acc *Account, ids []int64) {
|
||||
// We are responsible for closing the accounts.
|
||||
defer func() {
|
||||
err := acc.Close()
|
||||
log.Check(err, "close account after erasing expunged messages", slog.String("account", acc.Name))
|
||||
}()
|
||||
|
||||
acc.Lock()
|
||||
defer acc.Unlock()
|
||||
err := acc.DB.Write(mox.Context, func(tx *bstore.Tx) error {
|
||||
du := DiskUsage{ID: 1}
|
||||
if err := tx.Get(&du); err != nil {
|
||||
return fmt.Errorf("get disk usage: %v", err)
|
||||
}
|
||||
var duchanged bool
|
||||
|
||||
for _, id := range ids {
|
||||
me := MessageErase{ID: id}
|
||||
if err := tx.Get(&me); err != nil {
|
||||
return fmt.Errorf("delete message erase record %d: %v", id, err)
|
||||
}
|
||||
|
||||
m := Message{ID: id}
|
||||
if err := tx.Get(&m); err != nil {
|
||||
return fmt.Errorf("get message %d to erase: %v", id, err)
|
||||
} else if !m.Expunged {
|
||||
return fmt.Errorf("message %d to erase is not marked expunged", id)
|
||||
}
|
||||
if !me.SkipUpdateDiskUsage {
|
||||
du.MessageSize -= m.Size
|
||||
duchanged = true
|
||||
}
|
||||
m.erase()
|
||||
if err := tx.Update(&m); err != nil {
|
||||
return fmt.Errorf("mark message %d erase in database: %v", id, err)
|
||||
}
|
||||
|
||||
if err := tx.Delete(&me); err != nil {
|
||||
return fmt.Errorf("deleting message erase record %d: %v", id, err)
|
||||
}
|
||||
}
|
||||
|
||||
if duchanged {
|
||||
if err := tx.Update(&du); err != nil {
|
||||
return fmt.Errorf("update disk usage after erasing: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
log.Errorx("erasing expunged messages", err,
|
||||
slog.String("account", acc.Name),
|
||||
slog.Any("ids", ids),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
// We remove the files after the database commit. It's better to have the files
|
||||
// still around without being referenced from the database than references in the
|
||||
// database to non-existent files.
|
||||
for _, id := range ids {
|
||||
p := acc.MessagePath(id)
|
||||
err := os.Remove(p)
|
||||
log.Check(err, "removing expunged message file from disk", slog.String("path", p))
|
||||
}
|
||||
}
|
||||
|
||||
// switchboard is the main loop that distributes account changes to registered
// Comms and tracks which expunged messages are still referenced by sessions. It
// handles Comm registration and unregistration, broadcasts of changes, and
// notifications that a removal was applied. Message IDs that are no longer
// referenced are handed to the eraser over cleanc. A receive on stopc makes it
// send any pending erase IDs to cleanc, close the accounts still held through
// eraseRefs, close cleanc and send on donec before returning.
func switchboard(stopc, donec chan struct{}, cleanc chan map[*Account][]int64) {
|
||||
regs := map[*Account]map[*Comm]struct{}{}
|
||||
|
||||
// We don't remove message files or clear fields in the Message stored in the
|
||||
// database until all references, from all sessions have gone away. When we see
|
||||
// an expunge of a message, we count how many comms are active (i.e. how many
|
||||
// sessions reference the message). We require each of them to tell us they are no
|
||||
// longer referencing that message. Once we've seen that from all Comms, we remove
|
||||
// the on-disk file and the fields from the database.
|
||||
//
|
||||
// During the initial account open (when there are no active sessions/Comms yet,
|
||||
// and we open the message database file), the message erases will also be applied.
|
||||
//
|
||||
// When we add an account to eraseRefs, we increase the refcount, and we decrease
|
||||
// it again when removing the account.
|
||||
eraseRefs := map[*Account]map[int64]int{}
|
||||
|
||||
// We collect which messages can be erased per account, for sending them off to the
|
||||
// eraser goroutine. When an account is added to this map, its refcount is
|
||||
// increased. It is decreased again by the eraser goroutine.
|
||||
eraseIDs := map[*Account][]int64{}
|
||||
|
||||
addEraseIDs := func(acc *Account, ids ...int64) {
|
||||
if _, ok := eraseIDs[acc]; !ok {
|
||||
openAccounts.Lock()
|
||||
acc.nused++
|
||||
openAccounts.Unlock()
|
||||
}
|
||||
eraseIDs[acc] = append(eraseIDs[acc], ids...)
|
||||
}
|
||||
|
||||
decreaseEraseRefs := func(acc *Account, ids ...int64) {
|
||||
for _, id := range ids {
|
||||
v := eraseRefs[acc][id] - 1
|
||||
if v < 0 {
|
||||
metrics.PanicInc(metrics.Store) // For tests.
|
||||
panic(fmt.Sprintf("negative expunged message references for account %q, message id %d", acc.Name, id))
|
||||
}
|
||||
if v > 0 {
|
||||
eraseRefs[acc][id] = v
|
||||
continue
|
||||
}
|
||||
|
||||
addEraseIDs(acc, id)
|
||||
delete(eraseRefs[acc], id)
|
||||
if len(eraseRefs[acc]) > 0 {
|
||||
continue
|
||||
}
|
||||
delete(eraseRefs, acc)
|
||||
// Note: cannot use acc.Close, it tries to lock acc, but someone broadcasting to
|
||||
// this goroutine will likely have the lock.
|
||||
openAccounts.Lock()
|
||||
acc.nused--
|
||||
n := acc.nused
|
||||
openAccounts.Unlock()
|
||||
if n < 0 {
|
||||
metrics.PanicInc(metrics.Store) // For tests.
|
||||
panic(fmt.Sprintf("negative reference count for account %q, after removing message id %d", acc.Name, id))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
// If we have messages to clean, try sending to the eraser.
|
||||
cc := cleanc
|
||||
if len(eraseIDs) == 0 {
|
||||
cc = nil // A send on a nil channel never proceeds, so the send case below is disabled.
|
||||
}
|
||||
|
||||
select {
|
||||
case cc <- eraseIDs:
|
||||
eraseIDs = map[*Account][]int64{} // The sent map is with the eraser now, continue with a fresh one.
|
||||
|
||||
case c := <-register:
|
||||
if _, ok := regs[c.acc]; !ok {
|
||||
regs[c.acc] = map[*Comm]struct{}{}
|
||||
}
|
||||
regs[c.acc][c] = struct{}{}
|
||||
|
||||
case c := <-unregister:
|
||||
// Drain any ChangeRemoveUIDs references from the comm, to update our eraseRefs and
|
||||
// possibly queue messages for cleaning. No need to take a lock, the caller does
|
||||
// not use the comm anymore.
|
||||
for _, ch := range c.changes {
|
||||
rem, ok := ch.(ChangeRemoveUIDs)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
decreaseEraseRefs(c.acc, rem.MsgIDs...)
|
||||
}
|
||||
|
||||
delete(regs[c.acc], c)
|
||||
if len(regs[c.acc]) == 0 {
|
||||
delete(regs, c.acc)
|
||||
}
|
||||
|
||||
case chReq := <-broadcast:
|
||||
acc := chReq.acc
|
||||
|
||||
// Track references to removed messages in sessions (mostly IMAP) so we can pass
|
||||
// them to the eraser.
|
||||
for _, ch := range chReq.changes {
|
||||
rem, ok := ch.(ChangeRemoveUIDs)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
refs := len(regs[acc])
|
||||
if chReq.comm != nil {
|
||||
// The sender does not get this change and doesn't have to notify us of having
|
||||
// processed the removal.
|
||||
refs--
|
||||
}
|
||||
if refs <= 0 {
|
||||
addEraseIDs(acc, rem.MsgIDs...)
|
||||
continue
|
||||
}
|
||||
|
||||
// Comms/sessions still reference these messages, track how many.
|
||||
for _, id := range rem.MsgIDs {
|
||||
if _, ok := eraseRefs[acc]; !ok {
|
||||
openAccounts.Lock()
|
||||
acc.nused++
|
||||
openAccounts.Unlock()
|
||||
|
||||
eraseRefs[acc] = map[int64]int{}
|
||||
}
|
||||
if _, ok := eraseRefs[acc][id]; ok {
|
||||
metrics.PanicInc(metrics.Store) // For tests.
|
||||
panic(fmt.Sprintf("already have eraseRef for message id %d, account %q", id, acc.Name))
|
||||
}
|
||||
eraseRefs[acc][id] = refs
|
||||
}
|
||||
}
|
||||
|
||||
for c := range regs[acc] {
|
||||
// Do not send the broadcaster back their own changes. chReq.comm is nil if not
|
||||
// originating from a comm, so won't match in that case.
|
||||
if c == chReq.comm {
|
||||
continue
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
c.changes = append(c.changes, chReq.changes...)
|
||||
c.Unlock()
|
||||
|
||||
select {
|
||||
case c.Pending <- struct{}{}:
|
||||
default: // Non-blocking: skip the notification if the send cannot proceed right now.
|
||||
}
|
||||
}
|
||||
chReq.done <- struct{}{} // Signal the broadcaster that its request has been processed.
|
||||
|
||||
case removal := <-applied:
|
||||
acc := removal.Account
|
||||
|
||||
// Decrease references of messages, queueing for erasure when the last reference
|
||||
// goes away.
|
||||
decreaseEraseRefs(acc, removal.MsgIDs...)
|
||||
|
||||
case <-stopc:
|
||||
// We may still have eraseRefs, messages currently referenced in a session. Those
|
||||
// messages will be erased when the database file is opened again in the future. If
|
||||
// we have messages ready to erase now, we'll do that first.
|
||||
|
||||
if len(eraseIDs) > 0 {
|
||||
cleanc <- eraseIDs
|
||||
eraseIDs = nil
|
||||
}
|
||||
|
||||
for acc := range eraseRefs {
|
||||
err := acc.Close()
|
||||
log := mlog.New("store", nil)
|
||||
log.Check(err, "closing account")
|
||||
}
|
||||
|
||||
close(cleanc) // Tell eraser to stop.
|
||||
donec <- struct{}{} // Say we are now done.
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var switchboardBusy atomic.Bool
|
||||
|
||||
// Switchboard distributes changes to accounts to interested listeners. See Comm and Change.
|
||||
func Switchboard() (stop func()) {
|
||||
regs := map[*Account]map[*Comm]struct{}{}
|
||||
done := make(chan struct{})
|
||||
|
||||
if !switchboardBusy.CompareAndSwap(false, true) {
|
||||
panic("switchboard already busy")
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case c := <-register:
|
||||
if _, ok := regs[c.acc]; !ok {
|
||||
regs[c.acc] = map[*Comm]struct{}{}
|
||||
}
|
||||
regs[c.acc][c] = struct{}{}
|
||||
stopc := make(chan struct{})
|
||||
donec := make(chan struct{})
|
||||
cleanc := make(chan map[*Account][]int64)
|
||||
|
||||
case c := <-unregister:
|
||||
delete(regs[c.acc], c)
|
||||
if len(regs[c.acc]) == 0 {
|
||||
delete(regs, c.acc)
|
||||
}
|
||||
go messageEraser(donec, cleanc)
|
||||
go switchboard(stopc, donec, cleanc)
|
||||
|
||||
case chReq := <-broadcast:
|
||||
acc := chReq.acc
|
||||
for c := range regs[acc] {
|
||||
// Do not send the broadcaster back their own changes. chReq.comm is nil if not
|
||||
// originating from a comm, so won't match in that case.
|
||||
if c == chReq.comm {
|
||||
continue
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
c.changes = append(c.changes, chReq.changes...)
|
||||
c.Unlock()
|
||||
|
||||
select {
|
||||
case c.Pending <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
chReq.done <- struct{}{}
|
||||
|
||||
case <-done:
|
||||
done <- struct{}{}
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
return func() {
|
||||
done <- struct{}{}
|
||||
<-done
|
||||
stopc <- struct{}{}
|
||||
|
||||
// Wait for switchboard and eraser goroutines to be done.
|
||||
<-donec
|
||||
<-donec
|
||||
|
||||
if !switchboardBusy.CompareAndSwap(true, false) {
|
||||
panic("switchboard already unregistered?")
|
||||
}
|
||||
@ -225,6 +475,13 @@ func (c *Comm) Get() []Change {
|
||||
return l
|
||||
}
|
||||
|
||||
// RemovalSeen must be called by consumers when they have applied the removal to
|
||||
// their session. The switchboard tracks references of expunged messages, and
|
||||
// removes/cleans the message up when the last reference is gone.
|
||||
func (c *Comm) RemovalSeen(ch ChangeRemoveUIDs) {
|
||||
applied <- removalApplied{c.acc, ch.MsgIDs}
|
||||
}
|
||||
|
||||
// BroadcastChanges ensures changes are sent to all listeners on the account.
|
||||
func BroadcastChanges(acc *Account, ch []Change) {
|
||||
if len(ch) == 0 {
|
||||
|
@ -357,6 +357,8 @@ func (a *Account) AssignThreads(ctx context.Context, log mlog.Log, txOpt *bstore
|
||||
m := Message{ID: mi.ID}
|
||||
if err := tx.Get(&m); err != nil {
|
||||
return fmt.Errorf("get message %d for resolving pending thread for message-id %s, %d: %w", mi.ID, tm.MessageID, tm.ID, err)
|
||||
} else if m.Expunged {
|
||||
return fmt.Errorf("message %d marked as expunged", mi.ID)
|
||||
}
|
||||
if m.ThreadID != 0 {
|
||||
// ThreadID already set because this is a cyclic message. If we would assign a
|
||||
|
@ -155,11 +155,7 @@ func (a *Account) RetrainMessage(ctx context.Context, log mlog.Log, tx *bstore.T
|
||||
|
||||
// TrainMessage trains the junk filter based on the current m.Junk/m.Notjunk flags,
|
||||
// disregarding m.TrainedJunk and not updating that field.
|
||||
func (a *Account) TrainMessage(ctx context.Context, log mlog.Log, jf *junk.Filter, m Message) (bool, error) {
|
||||
if m.Junk == m.Notjunk {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (a *Account) TrainMessage(ctx context.Context, log mlog.Log, jf *junk.Filter, ham bool, m Message) (bool, error) {
|
||||
mr := a.MessageReader(m)
|
||||
defer func() {
|
||||
err := mr.Close()
|
||||
@ -178,5 +174,5 @@ func (a *Account) TrainMessage(ctx context.Context, log mlog.Log, jf *junk.Filte
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return true, jf.Train(ctx, m.Notjunk, words)
|
||||
return true, jf.Train(ctx, ham, words)
|
||||
}
|
||||
|
Reference in New Issue
Block a user