imapserver: implement PREVIEW extension (RFC 8970), and store previews in message database

We were already generating previews of plain text parts for the webmail
interface, but we didn't store them, so were generating the previews each time
messages were listed.

Now we store previews in the database for faster handling. And we also generate
previews for html parts if needed. We use the first part that has textual
content.

For IMAP, the previews can be requested by an IMAP client. When we get the
"LAZY" variant, which doesn't require us to generate a preview, we generate it
anyway, because it should be fast enough. So don't make clients first ask for
"PREVIEW (LAZY)" and then again a request for "PREVIEW".

We now also generate a preview when a message is added to the account. Except
for imports. It would slow us down, the previews aren't urgent, and they will
be generated on-demand at first-request.
This commit is contained in:
Mechiel Lukkien
2025-03-28 16:57:44 +01:00
parent 8b418a9ca2
commit aa631c604c
23 changed files with 735 additions and 187 deletions

View File

@ -166,12 +166,11 @@ type MessageEnvelope struct {
// message.Part, made for the needs of the message items in the message list.
// messages.
type MessageItem struct {
Message store.Message // Without ParsedBuf and MsgPrefix, for size.
Message store.Message // Without ParsedBuf and MsgPrefix, for size. With Preview, even if it isn't stored yet in the database.
Envelope MessageEnvelope
Attachments []Attachment
IsSigned bool
IsEncrypted bool
FirstLine string // Of message body, for showing as preview.
MatchQuery bool // If message does not match query, it can still be included because of threading.
MoreHeaders [][2]string // All headers from store.Settings.ShowHeaders that are present.
}
@ -204,7 +203,6 @@ type ParsedMessage struct {
attachments []Attachment
isSigned bool
isEncrypted bool
firstLine string
}
// EventStart is the first message sent on an SSE connection, giving the client
@ -816,6 +814,9 @@ func serveEvents(ctx context.Context, log mlog.Log, accountPath string, w http.R
xprocessChanges := func(changes []store.Change) {
taggedChanges := [][2]any{}
newPreviews := map[int64]string{}
defer storeNewPreviews(ctx, log, acc, newPreviews)
// We get a transaction first time we need it.
var xtx *bstore.Tx
defer func() {
@ -891,7 +892,7 @@ func serveEvents(ctx context.Context, log mlog.Log, accountPath string, w http.R
continue
}
state := msgState{acc: acc, log: log}
state := msgState{acc: acc, log: log, newPreviews: newPreviews}
mi, err := messageItem(log, m, &state, xmoreHeaders())
state.clear()
xcheckf(ctx, err, "make messageitem")
@ -901,7 +902,7 @@ func serveEvents(ctx context.Context, log mlog.Log, accountPath string, w http.R
if !thread && req.Query.Threading != ThreadOff {
err := ensureTx()
xcheckf(ctx, err, "transaction")
more, _, err := gatherThread(log, xtx, acc, v, m, 0, false, xmoreHeaders())
more, _, err := gatherThread(log, xtx, acc, v, m, 0, false, xmoreHeaders(), newPreviews)
xcheckf(ctx, err, "gathering thread messages for id %d, thread %d", m.ID, m.ThreadID)
mil = append(mil, more...)
v.threadIDs[m.ThreadID] = struct{}{}
@ -1265,18 +1266,55 @@ type msgResp struct {
pm *ParsedMessage // If m was the target page.DestMessageID, or this is the first match, this is the parsed message of mi.
}
func storeNewPreviews(ctx context.Context, log mlog.Log, acc *store.Account, newPreviews map[int64]string) {
if len(newPreviews) == 0 {
return
}
defer func() {
x := recover()
if x != nil {
log.Error("unhandled panic in storeNewPreviews", slog.Any("err", x))
debug.PrintStack()
metrics.PanicInc(metrics.Store)
}
}()
err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
for id, preview := range newPreviews {
m := store.Message{ID: id}
if err := tx.Get(&m); err != nil {
return fmt.Errorf("get message with id %d to store preview: %w", id, err)
} else if !m.Expunged {
m.Preview = &preview
if err := tx.Update(&m); err != nil {
return fmt.Errorf("updating message with id %d: %v", m.ID, err)
}
}
}
return nil
})
log.Check(err, "saving new previews with messages")
}
// viewRequestTx executes a request (query with filters, pagination) by
// launching a new goroutine with queryMessages, receiving results as msgResp,
// and sending Event* to the SSE connection.
//
// It always closes tx.
func viewRequestTx(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, msgc chan EventViewMsgs, errc chan EventViewErr, resetc chan EventViewReset, donec chan int64) {
// Newly generated previews which we'll save when the operation is done.
newPreviews := map[int64]string{}
defer func() {
err := tx.Rollback()
log.Check(err, "rolling back query transaction")
donec <- v.Request.ID
// ctx can be canceled, we still want to store the previews.
storeNewPreviews(context.Background(), log, acc, newPreviews)
x := recover() // Should not happen, but don't take program down if it does.
if x != nil {
log.WithContext(ctx).Error("viewRequestTx panic", slog.Any("err", x))
@ -1308,7 +1346,7 @@ func viewRequestTx(ctx context.Context, log mlog.Log, acc *store.Account, tx *bs
// todo: should probably rewrite code so we don't start yet another goroutine, but instead handle the query responses directly (through a struct that keeps state?) in the sse connection goroutine.
mrc := make(chan msgResp, 1)
go queryMessages(ctx, log, acc, tx, v, mrc)
go queryMessages(ctx, log, acc, tx, v, mrc, newPreviews)
for {
select {
@ -1358,7 +1396,8 @@ func viewRequestTx(ctx context.Context, log mlog.Log, acc *store.Account, tx *bs
// It sends on msgc, with several types of messages: errors, whether the view is
// reset due to missing AnchorMessageID, and when the end of the view was reached
// and/or for a message.
func queryMessages(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, mrc chan msgResp) {
// newPreviews is filled with previews, the caller must save them.
func queryMessages(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, mrc chan msgResp, newPreviews map[int64]string) {
defer func() {
x := recover() // Should not happen, but don't take program down if it does.
if x != nil {
@ -1453,7 +1492,7 @@ func queryMessages(ctx context.Context, log mlog.Log, acc *store.Account, tx *bs
// implement reporting errors, or anything else, just a bool. So when making the
// filter functions, we give them a place to store parsed message state, and an
// error. We check the error during and after query execution.
state := msgState{acc: acc, log: log}
state := msgState{acc: acc, log: log, newPreviews: newPreviews}
defer state.clear()
flagfilter := query.flagFilterFn()
@ -1530,7 +1569,7 @@ func queryMessages(ctx context.Context, log mlog.Log, acc *store.Account, tx *bs
// expected to read first, that would be the first unread, which we'll get below
// when gathering the thread.
found = true
xpm, err := parsedMessage(log, m, &state, true, false, false)
xpm, err := parsedMessage(log, &m, &state, true, false, false)
if err != nil && errors.Is(err, message.ErrHeader) {
log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
} else if err != nil {
@ -1552,7 +1591,7 @@ func queryMessages(ctx context.Context, log mlog.Log, acc *store.Account, tx *bs
}
mil := []MessageItem{mi}
if query.Threading != ThreadOff {
more, xpm, err := gatherThread(log, tx, acc, v, m, page.DestMessageID, page.AnchorMessageID == 0 && have == 0, moreHeaders)
more, xpm, err := gatherThread(log, tx, acc, v, m, page.DestMessageID, page.AnchorMessageID == 0 && have == 0, moreHeaders, state.newPreviews)
if err != nil {
return fmt.Errorf("gathering thread messages for id %d, thread %d: %v", m.ID, m.ThreadID, err)
}
@ -1621,7 +1660,7 @@ func queryMessages(ctx context.Context, log mlog.Log, acc *store.Account, tx *bs
}
}
func gatherThread(log mlog.Log, tx *bstore.Tx, acc *store.Account, v view, m store.Message, destMessageID int64, first bool, moreHeaders []string) ([]MessageItem, *ParsedMessage, error) {
func gatherThread(log mlog.Log, tx *bstore.Tx, acc *store.Account, v view, m store.Message, destMessageID int64, first bool, moreHeaders []string, newPreviews map[int64]string) ([]MessageItem, *ParsedMessage, error) {
if m.ThreadID == 0 {
// If we would continue, FilterNonzero would fail because there are no non-zero fields.
return nil, nil, fmt.Errorf("message has threadid 0, account is probably still being upgraded, try turning threading off until the upgrade is done")
@ -1643,7 +1682,7 @@ func gatherThread(log mlog.Log, tx *bstore.Tx, acc *store.Account, v view, m sto
var firstUnread bool
for _, tm := range tml {
err := func() error {
xstate := msgState{acc: acc, log: log}
xstate := msgState{acc: acc, log: log, newPreviews: newPreviews}
defer xstate.clear()
mi, err := messageItem(log, tm, &xstate, moreHeaders)
@ -1660,7 +1699,7 @@ func gatherThread(log mlog.Log, tx *bstore.Tx, acc *store.Account, v view, m sto
if tm.ID == destMessageID || destMessageID == 0 && first && (pm == nil || !firstUnread && !tm.Seen) {
firstUnread = !tm.Seen
xpm, err := parsedMessage(log, tm, &xstate, true, false, false)
xpm, err := parsedMessage(log, &tm, &xstate, true, false, false)
if err != nil && errors.Is(err, message.ErrHeader) {
log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
} else if err != nil {
@ -1681,7 +1720,7 @@ func gatherThread(log mlog.Log, tx *bstore.Tx, acc *store.Account, v view, m sto
if destMessageID == 0 && first && !m.Seen && !firstUnread {
xstate := msgState{acc: acc, log: log}
defer xstate.clear()
xpm, err := parsedMessage(log, m, &xstate, true, false, false)
xpm, err := parsedMessage(log, &m, &xstate, true, false, false)
if err != nil && errors.Is(err, message.ErrHeader) {
log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
} else if err != nil {
@ -1706,6 +1745,11 @@ type msgState struct {
part *message.Part // Will be without Reader when msgr is nil.
msgr *store.MsgReader
log mlog.Log
// If not nil, messages will get their Preview field filled when nil, and message
// id and preview added to newPreviews, and saved in a separate write transaction
// when the operation is done.
newPreviews map[int64]string
}
func (ms *msgState) clear() {
@ -1714,7 +1758,7 @@ func (ms *msgState) clear() {
ms.log.Check(err, "closing message reader from state")
ms.msgr = nil
}
*ms = msgState{acc: ms.acc, err: ms.err, log: ms.log}
*ms = msgState{acc: ms.acc, err: ms.err, log: ms.log, newPreviews: ms.newPreviews}
}
func (ms *msgState) ensureMsg(m store.Message) {
@ -1864,7 +1908,7 @@ var attachmentExtensions = map[string]AttachmentType{
func attachmentTypes(log mlog.Log, m store.Message, state *msgState) (map[AttachmentType]bool, error) {
types := map[AttachmentType]bool{}
pm, err := parsedMessage(log, m, state, false, false, false)
pm, err := parsedMessage(log, &m, state, false, false, false)
if err != nil {
return nil, fmt.Errorf("parsing message for attachments: %w", err)
}