mirror of
https://github.com/mjl-/mox.git
synced 2025-06-27 21:48:16 +03:00
145 lines
3.3 KiB
Go
145 lines
3.3 KiB
Go
package store
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"runtime/debug"
|
|
|
|
"github.com/mjl-/bstore"
|
|
|
|
"github.com/mjl-/mox/message"
|
|
"github.com/mjl-/mox/metrics"
|
|
"github.com/mjl-/mox/mlog"
|
|
)
|
|
|
|
// We process messages in database transactions in batches. Otherwise, for accounts
// with many messages, we would get a slowdown from the many unwritten blocks held in memory.
|
|
// reparseMessageBatchSize is the maximum number of messages that ReparseMessages
// queries and updates within a single database transaction.
var reparseMessageBatchSize = 1000
|
|
|
|
// ReparseMessages reparses all messages, updating the MIME structure in
|
|
// Message.ParsedBuf.
|
|
//
|
|
// Typically called during automatic account upgrade, or manually.
|
|
//
|
|
// Returns total number of messages, all of which were reparsed.
|
|
func (a *Account) ReparseMessages(ctx context.Context, log mlog.Log) (int, error) {
|
|
type Result struct {
|
|
Message *Message
|
|
Buf []byte
|
|
Err error
|
|
}
|
|
|
|
// We'll have multiple goroutines that pick up messages to parse. The assumption is
|
|
// that reads of messages from disk are the bottleneck.
|
|
nprog := 10
|
|
work := make(chan *Message, nprog)
|
|
results := make(chan Result, nprog)
|
|
|
|
processMessage := func(m *Message) {
|
|
r := Result{Message: m}
|
|
|
|
defer func() {
|
|
x := recover()
|
|
if x != nil {
|
|
r.Err = fmt.Errorf("unhandled panic parsing message: %v", x)
|
|
log.Error("processMessage panic", slog.Any("err", x))
|
|
debug.PrintStack()
|
|
metrics.PanicInc(metrics.Store)
|
|
}
|
|
|
|
results <- r
|
|
}()
|
|
|
|
mr := a.MessageReader(*m)
|
|
p, err := message.EnsurePart(log.Logger, false, mr, m.Size)
|
|
if err != nil {
|
|
// note: p is still set to a usable part
|
|
log.Debugx("reparsing message", err, slog.Int64("msgid", m.ID))
|
|
}
|
|
r.Buf, r.Err = json.Marshal(p)
|
|
}
|
|
|
|
// Start goroutines that parse messages.
|
|
for range nprog {
|
|
go func() {
|
|
for {
|
|
m, ok := <-work
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
processMessage(m)
|
|
}
|
|
}()
|
|
}
|
|
defer close(work) // Stop goroutines when done.
|
|
|
|
total := 0
|
|
var lastID int64 // Each db transaction starts after lastID.
|
|
for {
|
|
var n int
|
|
err := a.DB.Write(ctx, func(tx *bstore.Tx) error {
|
|
var busy int
|
|
|
|
q := bstore.QueryTx[Message](tx)
|
|
q.FilterEqual("Expunged", false)
|
|
q.FilterGreater("ID", lastID)
|
|
q.Limit(reparseMessageBatchSize)
|
|
q.SortAsc("ID")
|
|
err := q.ForEach(func(m Message) error {
|
|
lastID = m.ID
|
|
n++
|
|
|
|
for {
|
|
select {
|
|
case work <- &m:
|
|
busy++
|
|
return nil
|
|
|
|
case r := <-results:
|
|
busy--
|
|
if r.Err != nil {
|
|
log.Errorx("marshal parsed form of message", r.Err, slog.Int64("msgid", r.Message.ID))
|
|
} else {
|
|
r.Message.ParsedBuf = r.Buf
|
|
if err := tx.Update(r.Message); err != nil {
|
|
return fmt.Errorf("update message: %w", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("reparsing messages: %w", err)
|
|
}
|
|
|
|
// Drain remaining reparses.
|
|
for ; busy > 0; busy-- {
|
|
r := <-results
|
|
if r.Err != nil {
|
|
log.Errorx("marshal parsed form of message", r.Err, slog.Int64("msgid", r.Message.ID))
|
|
} else {
|
|
r.Message.ParsedBuf = r.Buf
|
|
if err := tx.Update(r.Message); err != nil {
|
|
return fmt.Errorf("update message with id %d: %w", r.Message.ID, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
total += n
|
|
if err != nil {
|
|
return total, fmt.Errorf("update messages with parsed mime structure: %w", err)
|
|
}
|
|
log.Debug("reparse message progress", slog.Int("total", total))
|
|
if n < reparseMessageBatchSize {
|
|
break
|
|
}
|
|
}
|
|
|
|
return total, nil
|
|
}
|