diff --git a/backup.go b/backup.go index 3f2ebfe..ae05335 100644 --- a/backup.go +++ b/backup.go @@ -27,7 +27,7 @@ import ( "github.com/mjl-/mox/tlsrptdb" ) -func backupctl(ctx context.Context, ctl *ctl) { +func xbackupctl(ctx context.Context, xctl *ctl) { /* protocol: > "backup" > destdir @@ -41,14 +41,14 @@ func backupctl(ctx context.Context, ctl *ctl) { // "src" or "dst" are incomplete paths relative to the source or destination data // directories. - dstDir := ctl.xread() - verbose := ctl.xread() == "verbose" + dstDir := xctl.xread() + verbose := xctl.xread() == "verbose" // Set when an error is encountered. At the end, we warn if set. var incomplete bool // We'll be writing output, and logging both to mox and the ctl stream. - writer := ctl.writer() + xwriter := xctl.writer() // Format easily readable output for the user. formatLog := func(prefix, text string, err error, attrs ...slog.Attr) []byte { @@ -67,10 +67,8 @@ func backupctl(ctx context.Context, ctl *ctl) { // Log an error to both the mox service as the user running "mox backup". pkglogx := func(prefix, text string, err error, attrs ...slog.Attr) { - ctl.log.Errorx(text, err, attrs...) - - _, werr := writer.Write(formatLog(prefix, text, err, attrs...)) - ctl.xcheck(werr, "write to ctl") + xctl.log.Errorx(text, err, attrs...) + xwriter.Write(formatLog(prefix, text, err, attrs...)) } // Log an error but don't mark backup as failed. @@ -87,10 +85,9 @@ func backupctl(ctx context.Context, ctl *ctl) { // If verbose is enabled, log to the cli command. Always log as info level. xvlog := func(text string, attrs ...slog.Attr) { - ctl.log.Info(text, attrs...) + xctl.log.Info(text, attrs...) if verbose { - _, werr := writer.Write(formatLog("", text, nil, attrs...)) - ctl.xcheck(werr, "write to ctl") + xwriter.Write(formatLog("", text, nil, attrs...)) } } @@ -164,12 +161,12 @@ func backupctl(ctx context.Context, ctl *ctl) { defer func() { if df != nil { err := df.Close() - ctl.log.Check(err, "closing file") + xctl.log.Check(err, "closing file") } }() defer func() { err := sf.Close() - ctl.log.Check(err, "closing file") + xctl.log.Check(err, "closing file") }() if _, err := io.Copy(df, sf); err != nil { return fmt.Errorf("copying config file %s to %s: %v", srcPath, destPath, err) @@ -213,7 +210,7 @@ func backupctl(ctx context.Context, ctl *ctl) { } defer func() { err := sf.Close() - ctl.log.Check(err, "closing source file") + xctl.log.Check(err, "closing source file") }() ensureDestDir(dstpath) @@ -225,7 +222,7 @@ func backupctl(ctx context.Context, ctl *ctl) { defer func() { if df != nil { err := df.Close() - ctl.log.Check(err, "closing destination file") + xctl.log.Check(err, "closing destination file") } }() if _, err := io.Copy(df, sf); err != nil { @@ -279,7 +276,7 @@ func backupctl(ctx context.Context, ctl *ctl) { defer func() { if df != nil { err := df.Close() - ctl.log.Check(err, "closing destination database file") + xctl.log.Check(err, "closing destination database file") } }() err = db.Read(ctx, func(tx *bstore.Tx) error { @@ -344,7 +341,7 @@ func backupctl(ctx context.Context, ctl *ctl) { } defer func() { err := sf.Close() - ctl.log.Check(err, "closing copied source file") + xctl.log.Check(err, "closing copied source file") }() df, err := os.OpenFile(dstpath, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0660) @@ -354,7 +351,7 @@ func backupctl(ctx context.Context, ctl *ctl) { defer func() { if df != nil { err := df.Close() - ctl.log.Check(err, "closing partial destination file") + xctl.log.Check(err, "closing partial destination file") } }() if _, err := io.Copy(df, sf); err != nil { @@ -371,7 +368,7 @@ func backupctl(ctx context.Context, ctl *ctl) { // Start making the backup. tmStart := time.Now() - ctl.log.Print("making backup", slog.String("destdir", dstDataDir)) + xctl.log.Print("making backup", slog.String("destdir", dstDataDir)) if err := os.MkdirAll(dstDataDir, 0770); err != nil { xerrx("creating destination data directory", err) @@ -405,7 +402,7 @@ func backupctl(ctx context.Context, ctl *ctl) { } dstdbpath := filepath.Join(dstDataDir, path) - opts := bstore.Options{MustExist: true, RegisterLogger: ctl.log.Logger} + opts := bstore.Options{MustExist: true, RegisterLogger: xctl.log.Logger} db, err := bstore.Open(ctx, dstdbpath, &opts, queue.DBTypes...) if err != nil { xerrx("open copied queue database", err, slog.String("dstpath", dstdbpath), slog.Duration("duration", time.Since(tmQueue))) @@ -415,7 +412,7 @@ func backupctl(ctx context.Context, ctl *ctl) { defer func() { if db != nil { err := db.Close() - ctl.log.Check(err, "closing new queue db") + xctl.log.Check(err, "closing new queue db") } }() @@ -495,7 +492,7 @@ func backupctl(ctx context.Context, ctl *ctl) { backupAccount := func(acc *store.Account) { defer func() { err := acc.Close() - ctl.log.Check(err, "closing account") + xctl.log.Check(err, "closing account") }() tmAccount := time.Now() @@ -507,7 +504,7 @@ func backupctl(ctx context.Context, ctl *ctl) { // todo: should document/check not taking a rlock on account. // Copy junkfilter files, if configured. - if jf, _, err := acc.OpenJunkFilter(ctx, ctl.log); err != nil { + if jf, _, err := acc.OpenJunkFilter(ctx, xctl.log); err != nil { if !errors.Is(err, store.ErrNoJunkFilter) { xerrx("opening junk filter for account (not backed up)", err) } @@ -518,11 +515,11 @@ func backupctl(ctx context.Context, ctl *ctl) { bloompath := filepath.Join("accounts", acc.Name, "junkfilter.bloom") backupFile(bloompath) err := jf.Close() - ctl.log.Check(err, "closing junkfilter") + xctl.log.Check(err, "closing junkfilter") } dstdbpath := filepath.Join(dstDataDir, dbpath) - opts := bstore.Options{MustExist: true, RegisterLogger: ctl.log.Logger} + opts := bstore.Options{MustExist: true, RegisterLogger: xctl.log.Logger} db, err := bstore.Open(ctx, dstdbpath, &opts, store.DBTypes...) if err != nil { xerrx("open copied account database", err, slog.String("dstpath", dstdbpath), slog.Duration("duration", time.Since(tmAccount))) @@ -532,7 +529,7 @@ func backupctl(ctx context.Context, ctl *ctl) { defer func() { if db != nil { err := db.Close() - ctl.log.Check(err, "close account database") + xctl.log.Check(err, "close account database") } }() @@ -635,7 +632,7 @@ func backupctl(ctx context.Context, ctl *ctl) { // account directories when handling "all other files" below. accounts := map[string]struct{}{} for _, accName := range mox.Conf.Accounts() { - acc, err := store.OpenAccount(ctl.log, accName, false) + acc, err := store.OpenAccount(xctl.log, accName, false) if err != nil { xerrx("opening account for copying (will try to copy as regular files later)", err, slog.String("account", accName)) continue @@ -691,11 +688,11 @@ func backupctl(ctx context.Context, ctl *ctl) { xvlog("backup finished", slog.Duration("duration", time.Since(tmStart))) - writer.xclose() + xwriter.xclose() if incomplete { - ctl.xwrite("errors were encountered during backup") + xctl.xwrite("errors were encountered during backup") } else { - ctl.xwriteok() + xctl.xwriteok() } } diff --git a/ctl.go b/ctl.go index ac23e1b..1f66441 100644 --- a/ctl.go +++ b/ctl.go @@ -124,10 +124,10 @@ func (c *ctl) xstreamto(dst io.Writer) { // Copy data from src to a stream to ctl. func (c *ctl) xstreamfrom(src io.Reader) { - w := c.writer() - _, err := io.Copy(w, src) + xw := c.writer() + _, err := io.Copy(xw, src) c.xcheck(err, "copying") - w.xclose() + xw.xclose() } // Writer returns an io.Writer for a data stream to ctl. @@ -171,6 +171,8 @@ type ctlwriter struct { log mlog.Log } +// Write implements io.Writer. Errors other than EOF are handled through behaviour +// for s.x, either a panic or log.Fatal. func (s *ctlwriter) Write(buf []byte) (int, error) { _, err := fmt.Fprintf(s.conn, "%d\n", len(buf)) s.xcheck(err, "write count") @@ -224,6 +226,8 @@ type ctlreader struct { log mlog.Log // If x is set, logging goes to log. } +// Read implements io.Reader. Errors other than EOF are handled through behaviour +// for s.x, either a panic or log.Fatal. func (s *ctlreader) Read(buf []byte) (N int, Err error) { if s.err != nil { return 0, s.err @@ -279,13 +283,13 @@ func servectl(ctx context.Context, cid int64, log mlog.Log, conn net.Conn, shutd log.Debug("ctl connection") var stop = struct{}{} // Sentinel value for panic and recover. - ctl := &ctl{conn: conn, x: stop, log: log} + xctl := &ctl{conn: conn, x: stop, log: log} defer func() { x := recover() if x == nil || x == stop { return } - log.Error("servectl panic", slog.Any("err", x), slog.String("cmd", ctl.cmd)) + log.Error("servectl panic", slog.Any("err", x), slog.String("cmd", xctl.cmd)) debug.PrintStack() metrics.PanicInc(metrics.Ctl) }() @@ -295,23 +299,23 @@ func servectl(ctx context.Context, cid int64, log mlog.Log, conn net.Conn, shutd log.Check(err, "close ctl connection") }() - ctl.xwrite("ctlv0") + xctl.xwrite("ctlv0") for { - servectlcmd(ctx, ctl, cid, shutdown) + servectlcmd(ctx, xctl, cid, shutdown) } } -func xparseJSON(ctl *ctl, s string, v any) { +func xparseJSON(xctl *ctl, s string, v any) { dec := json.NewDecoder(strings.NewReader(s)) dec.DisallowUnknownFields() err := dec.Decode(v) - ctl.xcheck(err, "parsing from ctl as json") + xctl.xcheck(err, "parsing from ctl as json") } -func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { - log := ctl.log - cmd := ctl.xread() - ctl.cmd = cmd +func servectlcmd(ctx context.Context, xctl *ctl, cid int64, shutdown func()) { + log := xctl.log + cmd := xctl.xread() + xctl.cmd = cmd log.Info("ctl command", slog.String("cmd", cmd)) switch cmd { case "stop": @@ -328,19 +332,19 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < "ok" */ - to := ctl.xread() + to := xctl.xread() a, _, addr, err := store.OpenEmail(log, to, false) - ctl.xcheck(err, "lookup destination address") + xctl.xcheck(err, "lookup destination address") msgFile, err := store.CreateMessageTemp(log, "ctl-deliver") - ctl.xcheck(err, "creating temporary message file") + xctl.xcheck(err, "creating temporary message file") defer store.CloseRemoveTempFile(log, msgFile, "deliver message") mw := message.NewWriter(msgFile) - ctl.xwriteok() + xctl.xwriteok() - ctl.xstreamto(mw) + xctl.xstreamto(mw) err = msgFile.Sync() - ctl.xcheck(err, "syncing message to storage") + xctl.xcheck(err, "syncing message to storage") m := store.Message{ Received: time.Now(), @@ -349,13 +353,13 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { a.WithWLock(func() { err := a.DeliverDestination(log, addr, &m, msgFile) - ctl.xcheck(err, "delivering message") + xctl.xcheck(err, "delivering message") log.Info("message delivered through ctl", slog.Any("to", to)) }) err = a.Close() - ctl.xcheck(err, "closing account") - ctl.xwriteok() + xctl.xcheck(err, "closing account") + xctl.xwriteok() case "setaccountpassword": /* protocol: @@ -365,11 +369,11 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < "ok" or error */ - account := ctl.xread() - pw := ctl.xread() + account := xctl.xread() + pw := xctl.xread() acc, err := store.OpenAccount(log, account, false) - ctl.xcheck(err, "open account") + xctl.xcheck(err, "open account") defer func() { if acc != nil { err := acc.Close() @@ -378,11 +382,11 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { }() err = acc.SetPassword(log, pw) - ctl.xcheck(err, "setting password") + xctl.xcheck(err, "setting password") err = acc.Close() - ctl.xcheck(err, "closing account") + xctl.xcheck(err, "closing account") acc = nil - ctl.xwriteok() + xctl.xwriteok() case "queueholdruleslist": /* protocol: @@ -391,9 +395,9 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < stream */ l, err := queue.HoldRuleList(ctx) - ctl.xcheck(err, "listing hold rules") - ctl.xwriteok() - xw := ctl.writer() + xctl.xcheck(err, "listing hold rules") + xctl.xwriteok() + xw := xctl.writer() fmt.Fprintln(xw, "hold rules:") for _, hr := range l { var elems []string @@ -427,17 +431,17 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < "ok" or error */ var hr queue.HoldRule - hr.Account = ctl.xread() - senderdomstr := ctl.xread() - rcptdomstr := ctl.xread() + hr.Account = xctl.xread() + senderdomstr := xctl.xread() + rcptdomstr := xctl.xread() var err error hr.SenderDomain, err = dns.ParseDomain(senderdomstr) - ctl.xcheck(err, "parsing sender domain") + xctl.xcheck(err, "parsing sender domain") hr.RecipientDomain, err = dns.ParseDomain(rcptdomstr) - ctl.xcheck(err, "parsing recipient domain") + xctl.xcheck(err, "parsing recipient domain") hr, err = queue.HoldRuleAdd(ctx, log, hr) - ctl.xcheck(err, "add hold rule") - ctl.xwriteok() + xctl.xcheck(err, "add hold rule") + xctl.xwriteok() case "queueholdrulesremove": /* protocol: @@ -445,12 +449,12 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { > id < "ok" or error */ - idstr := ctl.xread() + idstr := xctl.xread() id, err := strconv.ParseInt(idstr, 10, 64) - ctl.xcheck(err, "parsing id") + xctl.xcheck(err, "parsing id") err = queue.HoldRuleRemove(ctx, log, id) - ctl.xcheck(err, "remove hold rule") - ctl.xwriteok() + xctl.xcheck(err, "remove hold rule") + xctl.xwriteok() case "queuelist": /* protocol: @@ -460,17 +464,17 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < "ok" < stream */ - filterline := ctl.xread() - sortline := ctl.xread() + filterline := xctl.xread() + sortline := xctl.xread() var f queue.Filter - xparseJSON(ctl, filterline, &f) + xparseJSON(xctl, filterline, &f) var s queue.Sort - xparseJSON(ctl, sortline, &s) + xparseJSON(xctl, sortline, &s) qmsgs, err := queue.List(ctx, f, s) - ctl.xcheck(err, "listing queue") - ctl.xwriteok() + xctl.xcheck(err, "listing queue") + xctl.xwriteok() - xw := ctl.writer() + xw := xctl.writer() fmt.Fprintln(xw, "messages:") for _, qm := range qmsgs { var lastAttempt string @@ -493,14 +497,14 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < count */ - filterline := ctl.xread() - hold := ctl.xread() == "true" + filterline := xctl.xread() + hold := xctl.xread() == "true" var f queue.Filter - xparseJSON(ctl, filterline, &f) + xparseJSON(xctl, filterline, &f) count, err := queue.HoldSet(ctx, f, hold) - ctl.xcheck(err, "setting on hold status for messages") - ctl.xwriteok() - ctl.xwrite(fmt.Sprintf("%d", count)) + xctl.xcheck(err, "setting on hold status for messages") + xctl.xwriteok() + xctl.xwrite(fmt.Sprintf("%d", count)) case "queueschedule": /* protocol: @@ -512,22 +516,22 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < count */ - filterline := ctl.xread() - relnow := ctl.xread() - duration := ctl.xread() + filterline := xctl.xread() + relnow := xctl.xread() + duration := xctl.xread() var f queue.Filter - xparseJSON(ctl, filterline, &f) + xparseJSON(xctl, filterline, &f) d, err := time.ParseDuration(duration) - ctl.xcheck(err, "parsing duration for next delivery attempt") + xctl.xcheck(err, "parsing duration for next delivery attempt") var count int if relnow == "" { count, err = queue.NextAttemptAdd(ctx, f, d) } else { count, err = queue.NextAttemptSet(ctx, f, time.Now().Add(d)) } - ctl.xcheck(err, "setting next delivery attempts in queue") - ctl.xwriteok() - ctl.xwrite(fmt.Sprintf("%d", count)) + xctl.xcheck(err, "setting next delivery attempts in queue") + xctl.xwriteok() + xctl.xwrite(fmt.Sprintf("%d", count)) case "queuetransport": /* protocol: @@ -538,14 +542,14 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < count */ - filterline := ctl.xread() - transport := ctl.xread() + filterline := xctl.xread() + transport := xctl.xread() var f queue.Filter - xparseJSON(ctl, filterline, &f) + xparseJSON(xctl, filterline, &f) count, err := queue.TransportSet(ctx, f, transport) - ctl.xcheck(err, "adding to next delivery attempts in queue") - ctl.xwriteok() - ctl.xwrite(fmt.Sprintf("%d", count)) + xctl.xcheck(err, "adding to next delivery attempts in queue") + xctl.xwriteok() + xctl.xwrite(fmt.Sprintf("%d", count)) case "queuerequiretls": /* protocol: @@ -556,8 +560,8 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < count */ - filterline := ctl.xread() - reqtls := ctl.xread() + filterline := xctl.xread() + reqtls := xctl.xread() var req *bool switch reqtls { case "": @@ -568,14 +572,14 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { v := false req = &v default: - ctl.xcheck(fmt.Errorf("unknown value %q", reqtls), "parsing value") + xctl.xcheck(fmt.Errorf("unknown value %q", reqtls), "parsing value") } var f queue.Filter - xparseJSON(ctl, filterline, &f) + xparseJSON(xctl, filterline, &f) count, err := queue.RequireTLSSet(ctx, f, req) - ctl.xcheck(err, "setting tls requirements on messages in queue") - ctl.xwriteok() - ctl.xwrite(fmt.Sprintf("%d", count)) + xctl.xcheck(err, "setting tls requirements on messages in queue") + xctl.xwriteok() + xctl.xwrite(fmt.Sprintf("%d", count)) case "queuefail": /* protocol: @@ -585,13 +589,13 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < count */ - filterline := ctl.xread() + filterline := xctl.xread() var f queue.Filter - xparseJSON(ctl, filterline, &f) + xparseJSON(xctl, filterline, &f) count, err := queue.Fail(ctx, log, f) - ctl.xcheck(err, "marking messages from queue as failed") - ctl.xwriteok() - ctl.xwrite(fmt.Sprintf("%d", count)) + xctl.xcheck(err, "marking messages from queue as failed") + xctl.xwriteok() + xctl.xwrite(fmt.Sprintf("%d", count)) case "queuedrop": /* protocol: @@ -601,13 +605,13 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < count */ - filterline := ctl.xread() + filterline := xctl.xread() var f queue.Filter - xparseJSON(ctl, filterline, &f) + xparseJSON(xctl, filterline, &f) count, err := queue.Drop(ctx, log, f) - ctl.xcheck(err, "dropping messages from queue") - ctl.xwriteok() - ctl.xwrite(fmt.Sprintf("%d", count)) + xctl.xcheck(err, "dropping messages from queue") + xctl.xwriteok() + xctl.xwrite(fmt.Sprintf("%d", count)) case "queuedump": /* protocol: @@ -617,19 +621,19 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < stream */ - idstr := ctl.xread() + idstr := xctl.xread() id, err := strconv.ParseInt(idstr, 10, 64) if err != nil { - ctl.xcheck(err, "parsing id") + xctl.xcheck(err, "parsing id") } mr, err := queue.OpenMessage(ctx, id) - ctl.xcheck(err, "opening message") + xctl.xcheck(err, "opening message") defer func() { err := mr.Close() log.Check(err, "closing message from queue") }() - ctl.xwriteok() - ctl.xstreamfrom(mr) + xctl.xwriteok() + xctl.xstreamfrom(mr) case "queueretiredlist": /* protocol: @@ -639,17 +643,17 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < "ok" < stream */ - filterline := ctl.xread() - sortline := ctl.xread() + filterline := xctl.xread() + sortline := xctl.xread() var f queue.RetiredFilter - xparseJSON(ctl, filterline, &f) + xparseJSON(xctl, filterline, &f) var s queue.RetiredSort - xparseJSON(ctl, sortline, &s) + xparseJSON(xctl, sortline, &s) qmsgs, err := queue.RetiredList(ctx, f, s) - ctl.xcheck(err, "listing retired queue") - ctl.xwriteok() + xctl.xcheck(err, "listing retired queue") + xctl.xwriteok() - xw := ctl.writer() + xw := xctl.writer() fmt.Fprintln(xw, "retired messages:") for _, qm := range qmsgs { var lastAttempt string @@ -676,23 +680,23 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < "ok" < stream */ - idstr := ctl.xread() + idstr := xctl.xread() id, err := strconv.ParseInt(idstr, 10, 64) if err != nil { - ctl.xcheck(err, "parsing id") + xctl.xcheck(err, "parsing id") } l, err := queue.RetiredList(ctx, queue.RetiredFilter{IDs: []int64{id}}, queue.RetiredSort{}) - ctl.xcheck(err, "getting retired messages") + xctl.xcheck(err, "getting retired messages") if len(l) == 0 { - ctl.xcheck(errors.New("not found"), "getting retired message") + xctl.xcheck(errors.New("not found"), "getting retired message") } m := l[0] - ctl.xwriteok() - xw := ctl.writer() + xctl.xwriteok() + xw := xctl.writer() enc := json.NewEncoder(xw) enc.SetIndent("", "\t") err = enc.Encode(m) - ctl.xcheck(err, "encode retired message") + xctl.xcheck(err, "encode retired message") xw.xclose() case "queuehooklist": @@ -703,17 +707,17 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < "ok" < stream */ - filterline := ctl.xread() - sortline := ctl.xread() + filterline := xctl.xread() + sortline := xctl.xread() var f queue.HookFilter - xparseJSON(ctl, filterline, &f) + xparseJSON(xctl, filterline, &f) var s queue.HookSort - xparseJSON(ctl, sortline, &s) + xparseJSON(xctl, sortline, &s) hooks, err := queue.HookList(ctx, f, s) - ctl.xcheck(err, "listing webhooks") - ctl.xwriteok() + xctl.xcheck(err, "listing webhooks") + xctl.xwriteok() - xw := ctl.writer() + xw := xctl.writer() fmt.Fprintln(xw, "webhooks:") for _, h := range hooks { var lastAttempt string @@ -737,22 +741,22 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < count */ - filterline := ctl.xread() - relnow := ctl.xread() - duration := ctl.xread() + filterline := xctl.xread() + relnow := xctl.xread() + duration := xctl.xread() var f queue.HookFilter - xparseJSON(ctl, filterline, &f) + xparseJSON(xctl, filterline, &f) d, err := time.ParseDuration(duration) - ctl.xcheck(err, "parsing duration for next delivery attempt") + xctl.xcheck(err, "parsing duration for next delivery attempt") var count int if relnow == "" { count, err = queue.HookNextAttemptAdd(ctx, f, d) } else { count, err = queue.HookNextAttemptSet(ctx, f, time.Now().Add(d)) } - ctl.xcheck(err, "setting next delivery attempts in queue") - ctl.xwriteok() - ctl.xwrite(fmt.Sprintf("%d", count)) + xctl.xcheck(err, "setting next delivery attempts in queue") + xctl.xwriteok() + xctl.xwrite(fmt.Sprintf("%d", count)) case "queuehookcancel": /* protocol: @@ -762,13 +766,13 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < count */ - filterline := ctl.xread() + filterline := xctl.xread() var f queue.HookFilter - xparseJSON(ctl, filterline, &f) + xparseJSON(xctl, filterline, &f) count, err := queue.HookCancel(ctx, log, f) - ctl.xcheck(err, "canceling webhooks in queue") - ctl.xwriteok() - ctl.xwrite(fmt.Sprintf("%d", count)) + xctl.xcheck(err, "canceling webhooks in queue") + xctl.xwriteok() + xctl.xwrite(fmt.Sprintf("%d", count)) case "queuehookprint": /* protocol: @@ -777,23 +781,23 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < "ok" < stream */ - idstr := ctl.xread() + idstr := xctl.xread() id, err := strconv.ParseInt(idstr, 10, 64) if err != nil { - ctl.xcheck(err, "parsing id") + xctl.xcheck(err, "parsing id") } l, err := queue.HookList(ctx, queue.HookFilter{IDs: []int64{id}}, queue.HookSort{}) - ctl.xcheck(err, "getting webhooks") + xctl.xcheck(err, "getting webhooks") if len(l) == 0 { - ctl.xcheck(errors.New("not found"), "getting webhook") + xctl.xcheck(errors.New("not found"), "getting webhook") } h := l[0] - ctl.xwriteok() - xw := ctl.writer() + xctl.xwriteok() + xw := xctl.writer() enc := json.NewEncoder(xw) enc.SetIndent("", "\t") err = enc.Encode(h) - ctl.xcheck(err, "encode webhook") + xctl.xcheck(err, "encode webhook") xw.xclose() case "queuehookretiredlist": @@ -804,17 +808,17 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < "ok" < stream */ - filterline := ctl.xread() - sortline := ctl.xread() + filterline := xctl.xread() + sortline := xctl.xread() var f queue.HookRetiredFilter - xparseJSON(ctl, filterline, &f) + xparseJSON(xctl, filterline, &f) var s queue.HookRetiredSort - xparseJSON(ctl, sortline, &s) + xparseJSON(xctl, sortline, &s) l, err := queue.HookRetiredList(ctx, f, s) - ctl.xcheck(err, "listing retired webhooks") - ctl.xwriteok() + xctl.xcheck(err, "listing retired webhooks") + xctl.xwriteok() - xw := ctl.writer() + xw := xctl.writer() fmt.Fprintln(xw, "retired webhooks:") for _, h := range l { var lastAttempt string @@ -839,23 +843,23 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < "ok" < stream */ - idstr := ctl.xread() + idstr := xctl.xread() id, err := strconv.ParseInt(idstr, 10, 64) if err != nil { - ctl.xcheck(err, "parsing id") + xctl.xcheck(err, "parsing id") } l, err := queue.HookRetiredList(ctx, queue.HookRetiredFilter{IDs: []int64{id}}, queue.HookRetiredSort{}) - ctl.xcheck(err, "getting retired webhooks") + xctl.xcheck(err, "getting retired webhooks") if len(l) == 0 { - ctl.xcheck(errors.New("not found"), "getting retired webhook") + xctl.xcheck(errors.New("not found"), "getting retired webhook") } h := l[0] - ctl.xwriteok() - xw := ctl.writer() + xctl.xwriteok() + xw := xctl.writer() enc := json.NewEncoder(xw) enc.SetIndent("", "\t") err = enc.Encode(h) - ctl.xcheck(err, "encode retired webhook") + xctl.xcheck(err, "encode retired webhook") xw.xclose() case "queuesuppresslist": @@ -866,11 +870,11 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < stream */ - account := ctl.xread() + account := xctl.xread() l, err := queue.SuppressionList(ctx, account) - ctl.xcheck(err, "listing suppressions") - ctl.xwriteok() - xw := ctl.writer() + xctl.xcheck(err, "listing suppressions") + xctl.xwriteok() + xw := xctl.writer() fmt.Fprintln(xw, "suppressions (account, address, manual, time added, base adddress, reason):") for _, sup := range l { manual := "No" @@ -892,22 +896,22 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < "ok" or error */ - account := ctl.xread() - address := ctl.xread() + account := xctl.xread() + address := xctl.xread() _, ok := mox.Conf.Account(account) if !ok { - ctl.xcheck(errors.New("unknown account"), "looking up account") + xctl.xcheck(errors.New("unknown account"), "looking up account") } addr, err := smtp.ParseAddress(address) - ctl.xcheck(err, "parsing address") + xctl.xcheck(err, "parsing address") sup := webapi.Suppression{ Account: account, Manual: true, Reason: "added through mox cli", } err = queue.SuppressionAdd(ctx, addr.Path(), &sup) - ctl.xcheck(err, "adding suppression") - ctl.xwriteok() + xctl.xcheck(err, "adding suppression") + xctl.xwriteok() case "queuesuppressremove": /* protocol: @@ -917,13 +921,13 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < "ok" or error */ - account := ctl.xread() - address := ctl.xread() + account := xctl.xread() + address := xctl.xread() addr, err := smtp.ParseAddress(address) - ctl.xcheck(err, "parsing address") + xctl.xcheck(err, "parsing address") err = queue.SuppressionRemove(ctx, account, addr.Path()) - ctl.xcheck(err, "removing suppression") - ctl.xwriteok() + xctl.xcheck(err, "removing suppression") + xctl.xwriteok() case "queuesuppresslookup": /* protocol: @@ -934,20 +938,20 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < stream */ - account := ctl.xread() - address := ctl.xread() + account := xctl.xread() + address := xctl.xread() if account != "" { _, ok := mox.Conf.Account(account) if !ok { - ctl.xcheck(errors.New("unknown account"), "looking up account") + xctl.xcheck(errors.New("unknown account"), "looking up account") } } addr, err := smtp.ParseAddress(address) - ctl.xcheck(err, "parsing address") + xctl.xcheck(err, "parsing address") sup, err := queue.SuppressionLookup(ctx, account, addr.Path()) - ctl.xcheck(err, "looking up suppression") - ctl.xwriteok() - xw := ctl.writer() + xctl.xcheck(err, "looking up suppression") + xctl.xwriteok() + xw := xctl.writer() if sup == nil { fmt.Fprintln(xw, "not present") } else { @@ -961,7 +965,7 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { case "importmaildir", "importmbox": mbox := cmd == "importmbox" - importctl(ctx, ctl, mbox) + ximportctl(ctx, xctl, mbox) case "domainadd": /* protocol: @@ -973,23 +977,23 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < "ok" or error */ var disabled bool - switch s := ctl.xread(); s { + switch s := xctl.xread(); s { case "true": disabled = true case "false": disabled = false default: - ctl.xcheck(fmt.Errorf("invalid value %q", s), "parsing disabled boolean") + xctl.xcheck(fmt.Errorf("invalid value %q", s), "parsing disabled boolean") } - domain := ctl.xread() - account := ctl.xread() - localpart := ctl.xread() + domain := xctl.xread() + account := xctl.xread() + localpart := xctl.xread() d, err := dns.ParseDomain(domain) - ctl.xcheck(err, "parsing domain") + xctl.xcheck(err, "parsing domain") err = admin.DomainAdd(ctx, disabled, d, account, smtp.Localpart(localpart)) - ctl.xcheck(err, "adding domain") - ctl.xwriteok() + xctl.xcheck(err, "adding domain") + xctl.xwriteok() case "domainrm": /* protocol: @@ -997,12 +1001,12 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { > domain < "ok" or error */ - domain := ctl.xread() + domain := xctl.xread() d, err := dns.ParseDomain(domain) - ctl.xcheck(err, "parsing domain") + xctl.xcheck(err, "parsing domain") err = admin.DomainRemove(ctx, d) - ctl.xcheck(err, "removing domain") - ctl.xwriteok() + xctl.xcheck(err, "removing domain") + xctl.xwriteok() case "domaindisabled": /* protocol: @@ -1011,22 +1015,22 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { > domain < "ok" or error */ - domain := ctl.xread() + domain := xctl.xread() var disabled bool - switch s := ctl.xread(); s { + switch s := xctl.xread(); s { case "true": disabled = true case "false": disabled = false default: - ctl.xerror("bad boolean value") + xctl.xerror("bad boolean value") } err := admin.DomainSave(ctx, domain, func(d *config.Domain) error { d.Disabled = disabled return nil }) - ctl.xcheck(err, "saving domain") - ctl.xwriteok() + xctl.xcheck(err, "saving domain") + xctl.xwriteok() case "accountadd": /* protocol: @@ -1035,11 +1039,11 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { > address < "ok" or error */ - account := ctl.xread() - address := ctl.xread() + account := xctl.xread() + address := xctl.xread() err := admin.AccountAdd(ctx, account, address) - ctl.xcheck(err, "adding account") - ctl.xwriteok() + xctl.xcheck(err, "adding account") + xctl.xwriteok() case "accountrm": /* protocol: @@ -1047,10 +1051,10 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { > account < "ok" or error */ - account := ctl.xread() + account := xctl.xread() err := admin.AccountRemove(ctx, account) - ctl.xcheck(err, "removing account") - ctl.xwriteok() + xctl.xcheck(err, "removing account") + xctl.xwriteok() case "accountdisabled": /* protocol: @@ -1059,11 +1063,11 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { > message (if empty, then enabled) < "ok" or error */ - account := ctl.xread() - message := ctl.xread() + account := xctl.xread() + message := xctl.xread() acc, err := store.OpenAccount(log, account, false) - ctl.xcheck(err, "open account") + xctl.xcheck(err, "open account") defer func() { err := acc.Close() log.Check(err, "closing account") @@ -1072,12 +1076,12 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { err = admin.AccountSave(ctx, account, func(acc *config.Account) { acc.LoginDisabled = message }) - ctl.xcheck(err, "saving account") + xctl.xcheck(err, "saving account") - err = acc.SessionsClear(ctx, ctl.log) - ctl.xcheck(err, "clearing active web sessions") + err = acc.SessionsClear(ctx, xctl.log) + xctl.xcheck(err, "clearing active web sessions") - ctl.xwriteok() + xctl.xwriteok() case "accountenable": /* protocol: @@ -1085,12 +1089,12 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { > account < "ok" or error */ - account := ctl.xread() + account := xctl.xread() err := admin.AccountSave(ctx, account, func(acc *config.Account) { acc.LoginDisabled = "" }) - ctl.xcheck(err, "enabling account") - ctl.xwriteok() + xctl.xcheck(err, "enabling account") + xctl.xwriteok() case "tlspubkeylist": /* protocol: @@ -1099,11 +1103,11 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < "ok" or error < stream */ - accountOpt := ctl.xread() + accountOpt := xctl.xread() tlspubkeys, err := store.TLSPublicKeyList(ctx, accountOpt) - ctl.xcheck(err, "list tls public keys") - ctl.xwriteok() - xw := ctl.writer() + xctl.xcheck(err, "list tls public keys") + xctl.xwriteok() + xw := xctl.writer() fmt.Fprintf(xw, "# fingerprint, type, name, account, login address, no imap preauth (%d)\n", len(tlspubkeys)) for _, k := range tlspubkeys { fmt.Fprintf(xw, "%s\t%s\t%q\t%s\t%s\t%v\n", k.Fingerprint, k.Type, k.Name, k.Account, k.LoginAddress, k.NoIMAPPreauth) @@ -1122,16 +1126,16 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < noimappreauth (true/false) < stream (certder) */ - fp := ctl.xread() + fp := xctl.xread() tlspubkey, err := store.TLSPublicKeyGet(ctx, fp) - ctl.xcheck(err, "looking tls public key") - ctl.xwriteok() - ctl.xwrite(tlspubkey.Type) - ctl.xwrite(tlspubkey.Name) - ctl.xwrite(tlspubkey.Account) - ctl.xwrite(tlspubkey.LoginAddress) - ctl.xwrite(fmt.Sprintf("%v", tlspubkey.NoIMAPPreauth)) - ctl.xstreamfrom(bytes.NewReader(tlspubkey.CertDER)) + xctl.xcheck(err, "looking tls public key") + xctl.xwriteok() + xctl.xwrite(tlspubkey.Type) + xctl.xwrite(tlspubkey.Name) + xctl.xwrite(tlspubkey.Account) + xctl.xwrite(tlspubkey.LoginAddress) + xctl.xwrite(fmt.Sprintf("%v", tlspubkey.NoIMAPPreauth)) + xctl.xstreamfrom(bytes.NewReader(tlspubkey.CertDER)) case "tlspubkeyadd": /* protocol: @@ -1142,32 +1146,32 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { > stream (certder) < "ok" or error */ - loginAddress := ctl.xread() - name := ctl.xread() - noimappreauth := ctl.xread() + loginAddress := xctl.xread() + name := xctl.xread() + noimappreauth := xctl.xread() if noimappreauth != "true" && noimappreauth != "false" { - ctl.xcheck(fmt.Errorf("bad value %q", noimappreauth), "parsing noimappreauth") + xctl.xcheck(fmt.Errorf("bad value %q", noimappreauth), "parsing noimappreauth") } var b bytes.Buffer - ctl.xstreamto(&b) + xctl.xstreamto(&b) tlspubkey, err := store.ParseTLSPublicKeyCert(b.Bytes()) - ctl.xcheck(err, "parsing certificate") + xctl.xcheck(err, "parsing certificate") if name != "" { tlspubkey.Name = name } - acc, _, _, err := store.OpenEmail(ctl.log, loginAddress, false) - ctl.xcheck(err, "open account for address") + acc, _, _, err := store.OpenEmail(xctl.log, loginAddress, false) + xctl.xcheck(err, "open account for address") defer func() { err := acc.Close() - ctl.log.Check(err, "close account") + xctl.log.Check(err, "close account") }() tlspubkey.Account = acc.Name tlspubkey.LoginAddress = loginAddress tlspubkey.NoIMAPPreauth = noimappreauth == "true" err = store.TLSPublicKeyAdd(ctx, &tlspubkey) - ctl.xcheck(err, "adding tls public key") - ctl.xwriteok() + xctl.xcheck(err, "adding tls public key") + xctl.xwriteok() case "tlspubkeyrm": /* protocol: @@ -1175,10 +1179,10 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { > fingerprint < "ok" or error */ - fp := ctl.xread() + fp := xctl.xread() err := store.TLSPublicKeyRemove(ctx, fp) - ctl.xcheck(err, "removing tls public key") - ctl.xwriteok() + xctl.xcheck(err, "removing tls public key") + xctl.xwriteok() case "addressadd": /* protocol: @@ -1187,11 +1191,11 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { > account < "ok" or error */ - address := ctl.xread() - account := ctl.xread() + address := xctl.xread() + account := xctl.xread() err := admin.AddressAdd(ctx, address, account) - ctl.xcheck(err, "adding address") - ctl.xwriteok() + xctl.xcheck(err, "adding address") + xctl.xwriteok() case "addressrm": /* protocol: @@ -1199,10 +1203,10 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { > address < "ok" or error */ - address := ctl.xread() + address := xctl.xread() err := admin.AddressRemove(ctx, address) - ctl.xcheck(err, "removing address") - ctl.xwriteok() + xctl.xcheck(err, "removing address") + xctl.xwriteok() case "aliaslist": /* protocol: @@ -1211,21 +1215,21 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < "ok" or error < stream */ - domain := ctl.xread() + domain := xctl.xread() d, err := dns.ParseDomain(domain) - ctl.xcheck(err, "parsing domain") + xctl.xcheck(err, "parsing domain") dc, ok := mox.Conf.Domain(d) if !ok { - ctl.xcheck(errors.New("no such domain"), "listing aliases") + xctl.xcheck(errors.New("no such domain"), "listing aliases") } - ctl.xwriteok() - w := ctl.writer() + xctl.xwriteok() + xw := xctl.writer() for _, a := range dc.Aliases { lp, err := smtp.ParseLocalpart(a.LocalpartStr) - ctl.xcheck(err, "parsing alias localpart") - fmt.Fprintln(w, smtp.NewAddress(lp, a.Domain).Pack(true)) + xctl.xcheck(err, "parsing alias localpart") + fmt.Fprintln(xw, smtp.NewAddress(lp, a.Domain).Pack(true)) } - w.xclose() + xw.xclose() case "aliasprint": /* protocol: @@ -1234,23 +1238,23 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < "ok" or error < stream */ - address := ctl.xread() + address := xctl.xread() _, alias, ok := mox.Conf.AccountDestination(address) if !ok { - ctl.xcheck(errors.New("no such address"), "looking up alias") + xctl.xcheck(errors.New("no such address"), "looking up alias") } else if alias == nil { - ctl.xcheck(errors.New("address not an alias"), "looking up alias") + xctl.xcheck(errors.New("address not an alias"), "looking up alias") } - ctl.xwriteok() - w := ctl.writer() - fmt.Fprintf(w, "# postpublic %v\n", alias.PostPublic) - fmt.Fprintf(w, "# listmembers %v\n", alias.ListMembers) - fmt.Fprintf(w, "# allowmsgfrom %v\n", alias.AllowMsgFrom) - fmt.Fprintln(w, "# members:") + xctl.xwriteok() + xw := xctl.writer() + fmt.Fprintf(xw, "# postpublic %v\n", alias.PostPublic) + fmt.Fprintf(xw, "# listmembers %v\n", alias.ListMembers) + fmt.Fprintf(xw, "# allowmsgfrom %v\n", alias.AllowMsgFrom) + fmt.Fprintln(xw, "# members:") for _, a := range alias.Addresses { - fmt.Fprintln(w, a) + fmt.Fprintln(xw, a) } - w.xclose() + xw.xclose() case "aliasadd": /* protocol: @@ -1259,15 +1263,15 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { > json alias < "ok" or error */ - address := ctl.xread() - line := ctl.xread() + address := xctl.xread() + line := xctl.xread() addr, err := smtp.ParseAddress(address) - ctl.xcheck(err, "parsing address") + xctl.xcheck(err, "parsing address") var alias config.Alias - xparseJSON(ctl, line, &alias) + xparseJSON(xctl, line, &alias) err = admin.AliasAdd(ctx, addr, alias) - ctl.xcheck(err, "adding alias") - ctl.xwriteok() + xctl.xcheck(err, "adding alias") + xctl.xwriteok() case "aliasupdate": /* protocol: @@ -1278,12 +1282,12 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { > "true" or "false" for allowmsgfrom < "ok" or error */ - address := ctl.xread() - postpublic := ctl.xread() - listmembers := ctl.xread() - allowmsgfrom := ctl.xread() + address := xctl.xread() + postpublic := xctl.xread() + listmembers := xctl.xread() + allowmsgfrom := xctl.xread() addr, err := smtp.ParseAddress(address) - ctl.xcheck(err, "parsing address") + xctl.xcheck(err, "parsing address") err = admin.DomainSave(ctx, addr.Domain.Name(), func(d *config.Domain) error { a, ok := d.Aliases[addr.Localpart.String()] if !ok { @@ -1313,8 +1317,8 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { d.Aliases[addr.Localpart.String()] = a return nil }) - ctl.xcheck(err, "saving alias") - ctl.xwriteok() + xctl.xcheck(err, "saving alias") + xctl.xwriteok() case "aliasrm": /* protocol: @@ -1322,12 +1326,12 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { > alias < "ok" or error */ - address := ctl.xread() + address := xctl.xread() addr, err := smtp.ParseAddress(address) - ctl.xcheck(err, "parsing address") + xctl.xcheck(err, "parsing address") err = admin.AliasRemove(ctx, addr) - ctl.xcheck(err, "removing alias") - ctl.xwriteok() + xctl.xcheck(err, "removing alias") + xctl.xwriteok() case "aliasaddaddr": /* protocol: @@ -1336,15 +1340,15 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { > addresses as json < "ok" or error */ - address := ctl.xread() - line := ctl.xread() + address := xctl.xread() + line := xctl.xread() addr, err := smtp.ParseAddress(address) - ctl.xcheck(err, "parsing address") + xctl.xcheck(err, "parsing address") var addresses []string - xparseJSON(ctl, line, &addresses) + xparseJSON(xctl, line, &addresses) err = admin.AliasAddressesAdd(ctx, addr, addresses) - ctl.xcheck(err, "adding addresses to alias") - ctl.xwriteok() + xctl.xcheck(err, "adding addresses to alias") + xctl.xwriteok() case "aliasrmaddr": /* protocol: @@ -1353,15 +1357,15 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { > addresses as json < "ok" or error */ - address := ctl.xread() - line := ctl.xread() + address := xctl.xread() + line := xctl.xread() addr, err := smtp.ParseAddress(address) - ctl.xcheck(err, "parsing address") + xctl.xcheck(err, "parsing address") var addresses []string - xparseJSON(ctl, line, &addresses) + xparseJSON(xctl, line, &addresses) err = admin.AliasAddressesRemove(ctx, addr, addresses) - ctl.xcheck(err, "removing addresses to alias") - ctl.xwriteok() + xctl.xcheck(err, "removing addresses to alias") + xctl.xwriteok() case "loglevels": /* protocol: @@ -1369,7 +1373,7 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < "ok" < stream */ - ctl.xwriteok() + xctl.xwriteok() l := mox.Conf.LogLevels() keys := []string{} for k := range l { @@ -1384,7 +1388,7 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { } s += ks + ": " + mlog.LevelStrings[l[k]] + "\n" } - ctl.xstreamfrom(strings.NewReader(s)) + xctl.xstreamfrom(strings.NewReader(s)) case "setloglevels": /* protocol: @@ -1393,18 +1397,18 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { > level (if empty, log level for pkg will be unset) < "ok" or error */ - pkg := ctl.xread() - levelstr := ctl.xread() + pkg := xctl.xread() + levelstr := xctl.xread() if levelstr == "" { mox.Conf.LogLevelRemove(log, pkg) } else { level, ok := mlog.Levels[levelstr] if !ok { - ctl.xerror("bad level") + xctl.xerror("bad level") } mox.Conf.LogLevelSet(log, pkg, level) } - ctl.xwriteok() + xctl.xwriteok() case "retrain": /* protocol: @@ -1412,11 +1416,11 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { > account or empty < "ok" or error */ - account := ctl.xread() + account := xctl.xread() xretrain := func(name string) { acc, err := store.OpenAccount(log, name, false) - ctl.xcheck(err, "open account") + xctl.xcheck(err, "open account") defer func() { if acc != nil { err := acc.Close() @@ -1429,7 +1433,7 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { acc.WithWLock(func() { conf, _ := acc.Conf() if conf.JunkFilter == nil { - ctl.xcheck(store.ErrNoJunkFilter, "looking for junk filter") + xctl.xcheck(store.ErrNoJunkFilter, "looking for junk filter") } // Remove existing junk filter files. @@ -1443,7 +1447,7 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { // Open junk filter, this creates new files. jf, _, err := acc.OpenJunkFilter(ctx, log) - ctl.xcheck(err, "open new junk filter") + xctl.xcheck(err, "open new junk filter") defer func() { if jf == nil { return @@ -1475,13 +1479,13 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { return err }) }) - ctl.xcheck(err, "training messages") + xctl.xcheck(err, "training messages") log.Info("retrained messages", slog.Int("total", total), slog.Int("trained", trained)) // Close junk filter, marking success. err = jf.Close() jf = nil - ctl.xcheck(err, "closing junk filter") + xctl.xcheck(err, "closing junk filter") }) } @@ -1492,7 +1496,7 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { } else { xretrain(account) } - ctl.xwriteok() + xctl.xwriteok() case "recalculatemailboxcounts": /* protocol: @@ -1501,18 +1505,18 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < "ok" or error < stream */ - account := ctl.xread() + account := xctl.xread() acc, err := store.OpenAccount(log, account, false) - ctl.xcheck(err, "open account") + xctl.xcheck(err, "open account") defer func() { if acc != nil { err := acc.Close() log.Check(err, "closing account after recalculating mailbox counts") } }() - ctl.xwriteok() + xctl.xwriteok() - w := ctl.writer() + xw := xctl.writer() acc.WithWLock(func() { var changes []store.Change @@ -1526,8 +1530,7 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { totalSize += mc.Size if mc != mb.MailboxCounts { - _, err := fmt.Fprintf(w, "for %s setting new counts %s (was %s)\n", mb.Name, mc, mb.MailboxCounts) - ctl.xcheck(err, "write") + fmt.Fprintf(xw, "for %s setting new counts %s (was %s)\n", mb.Name, mc, mb.MailboxCounts) mb.HaveCounts = true mb.MailboxCounts = mc if err := tx.Update(&mb); err != nil { @@ -1546,8 +1549,7 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { return fmt.Errorf("get disk usage: %v", err) } if du.MessageSize != totalSize { - _, err := fmt.Fprintf(w, "setting new total message size %d (was %d)\n", totalSize, du.MessageSize) - ctl.xcheck(err, "write") + fmt.Fprintf(xw, "setting new total message size %d (was %d)\n", totalSize, du.MessageSize) du.MessageSize = totalSize if err := tx.Update(&du); err != nil { return fmt.Errorf("update disk usage: %v", err) @@ -1555,11 +1557,11 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { } return nil }) - ctl.xcheck(err, "write transaction for mailbox counts") + xctl.xcheck(err, "write transaction for mailbox counts") store.BroadcastChanges(acc, changes) }) - w.xclose() + xw.xclose() case "fixmsgsize": /* protocol: @@ -1569,16 +1571,16 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < stream */ - accountOpt := ctl.xread() - ctl.xwriteok() - w := ctl.writer() + accountOpt := xctl.xread() + xctl.xwriteok() + xw := xctl.writer() var foundProblem bool const batchSize = 10000 xfixmsgsize := func(accName string) { acc, err := store.OpenAccount(log, accName, false) - ctl.xcheck(err, "open account") + xctl.xcheck(err, "open account") defer func() { err := acc.Close() log.Check(err, "closing account after fixing message sizes") @@ -1608,11 +1610,9 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { if err != nil { mb := store.Mailbox{ID: m.MailboxID} if xerr := tx.Get(&mb); xerr != nil { - _, werr := fmt.Fprintf(w, "get mailbox id %d for message with file error: %v\n", mb.ID, xerr) - ctl.xcheck(werr, "write") + fmt.Fprintf(xw, "get mailbox id %d for message with file error: %v\n", mb.ID, xerr) } - _, werr := fmt.Fprintf(w, "checking file %s for message %d in mailbox %q (id %d): %v (continuing)\n", p, m.ID, mb.Name, mb.ID, err) - ctl.xcheck(werr, "write") + fmt.Fprintf(xw, "checking file %s for message %d in mailbox %q (id %d): %v (continuing)\n", p, m.ID, mb.Name, mb.ID, err) return nil } filesize := st.Size() @@ -1625,16 +1625,13 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { mb := store.Mailbox{ID: m.MailboxID} if err := tx.Get(&mb); err != nil { - _, werr := fmt.Fprintf(w, "get mailbox id %d for message with file size mismatch: %v\n", mb.ID, err) - ctl.xcheck(werr, "write") + fmt.Fprintf(xw, "get mailbox id %d for message with file size mismatch: %v\n", mb.ID, err) return nil } if mb.Expunged { - _, err := fmt.Fprintf(w, "message %d is in expunged mailbox %q (id %d) (continuing)\n", m.ID, mb.Name, mb.ID) - ctl.xcheck(err, "write") + fmt.Fprintf(xw, "message %d is in expunged mailbox %q (id %d) (continuing)\n", m.ID, mb.Name, mb.ID) } - _, err = fmt.Fprintf(w, "fixing message %d in mailbox %q (id %d) with incorrect size %d, should be %d (len msg prefix %d + on-disk file %s size %d)\n", m.ID, mb.Name, mb.ID, m.Size, correctSize, len(m.MsgPrefix), p, filesize) - ctl.xcheck(err, "write") + fmt.Fprintf(xw, "fixing message %d in mailbox %q (id %d) with incorrect size %d, should be %d (len msg prefix %d + on-disk file %s size %d)\n", m.ID, mb.Name, mb.ID, m.Size, correctSize, len(m.MsgPrefix), p, filesize) // We assume that the original message size was accounted as stored in the mailbox // total size. If this isn't correct, the user can always run @@ -1651,8 +1648,7 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { mr := acc.MessageReader(m) part, err := message.EnsurePart(log.Logger, false, mr, m.Size) if err != nil { - _, werr := fmt.Fprintf(w, "parsing message %d again: %v (continuing)\n", m.ID, err) - ctl.xcheck(werr, "write") + fmt.Fprintf(xw, "parsing message %d again: %v (continuing)\n", m.ID, err) } m.ParsedBuf, err = json.Marshal(part) if err != nil { @@ -1665,7 +1661,7 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { return nil }) }) - ctl.xcheck(err, "find and fix wrong message sizes") + xctl.xcheck(err, "find and fix wrong message sizes") var changes []store.Change for _, mb := range mailboxCounts { @@ -1677,8 +1673,7 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { break } } - _, err = fmt.Fprintf(w, "%d message size(s) fixed for account %s\n", total, accName) - ctl.xcheck(err, "write") + fmt.Fprintf(xw, "%d message size(s) fixed for account %s\n", total, accName) } if accountOpt != "" { @@ -1689,17 +1684,15 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { if i > 0 { line = "\n" } - _, err := fmt.Fprintf(w, "%sFixing message sizes in account %s...\n", line, accName) - ctl.xcheck(err, "write") + fmt.Fprintf(xw, "%sFixing message sizes in account %s...\n", line, accName) xfixmsgsize(accName) } } if foundProblem { - _, err := fmt.Fprintf(w, "\nProblems were found and fixed. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n") - ctl.xcheck(err, "write") + fmt.Fprintf(xw, "\nProblems were found and fixed. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n") } - w.xclose() + xw.xclose() case "reparse": /* protocol: @@ -1709,15 +1702,15 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < stream */ - accountOpt := ctl.xread() - ctl.xwriteok() - w := ctl.writer() + accountOpt := xctl.xread() + xctl.xwriteok() + xw := xctl.writer() const batchSize = 100 xreparseAccount := func(accName string) { acc, err := store.OpenAccount(log, accName, false) - ctl.xcheck(err, "open account") + xctl.xcheck(err, "open account") defer func() { err := acc.Close() log.Check(err, "closing account after reparsing messages") @@ -1739,8 +1732,7 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { mr := acc.MessageReader(m) p, err := message.EnsurePart(log.Logger, false, mr, m.Size) if err != nil { - _, err := fmt.Fprintf(w, "parsing message %d: %v (continuing)\n", m.ID, err) - ctl.xcheck(err, "write") + fmt.Fprintf(xw, "parsing message %d: %v (continuing)\n", m.ID, err) } m.ParsedBuf, err = json.Marshal(p) if err != nil { @@ -1755,13 +1747,12 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { }) }) - ctl.xcheck(err, "update messages with parsed mime structure") + xctl.xcheck(err, "update messages with parsed mime structure") if n < batchSize { break } } - _, err = fmt.Fprintf(w, "%d message(s) reparsed for account %s\n", total, accName) - ctl.xcheck(err, "write") + fmt.Fprintf(xw, "%d message(s) reparsed for account %s\n", total, accName) } if accountOpt != "" { @@ -1772,12 +1763,11 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { if i > 0 { line = "\n" } - _, err := fmt.Fprintf(w, "%sReparsing account %s...\n", line, accName) - ctl.xcheck(err, "write") + fmt.Fprintf(xw, "%sReparsing account %s...\n", line, accName) xreparseAccount(accName) } } - w.xclose() + xw.xclose() case "reassignthreads": /* protocol: @@ -1787,13 +1777,13 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < stream */ - accountOpt := ctl.xread() - ctl.xwriteok() - w := ctl.writer() + accountOpt := xctl.xread() + xctl.xwriteok() + xw := xctl.writer() xreassignThreads := func(accName string) { acc, err := store.OpenAccount(log, accName, false) - ctl.xcheck(err, "open account") + xctl.xcheck(err, "open account") defer func() { err := acc.Close() log.Check(err, "closing account after reassigning threads") @@ -1801,23 +1791,21 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { // We don't want to step on an existing upgrade process. err = acc.ThreadingWait(log) - ctl.xcheck(err, "waiting for threading upgrade to finish") + xctl.xcheck(err, "waiting for threading upgrade to finish") // todo: should we try to continue if the threading upgrade failed? only if there is a chance it will succeed this time... // todo: reassigning isn't atomic (in a single transaction), ideally it would be (bstore would need to be able to handle large updates). const batchSize = 50000 total, err := acc.ResetThreading(ctx, log, batchSize, true) - ctl.xcheck(err, "resetting threading fields") - _, err = fmt.Fprintf(w, "New thread base subject assigned to %d message(s), starting to reassign threads...\n", total) - ctl.xcheck(err, "write") + xctl.xcheck(err, "resetting threading fields") + fmt.Fprintf(xw, "New thread base subject assigned to %d message(s), starting to reassign threads...\n", total) // Assign threads again. Ideally we would do this in a single transaction, but // bstore/boltdb cannot handle so many pending changes, so we set a high batchsize. - err = acc.AssignThreads(ctx, log, nil, 0, 50000, w) - ctl.xcheck(err, "reassign threads") + err = acc.AssignThreads(ctx, log, nil, 0, 50000, xw) + xctl.xcheck(err, "reassign threads") - _, err = fmt.Fprintf(w, "Threads reassigned. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n") - ctl.xcheck(err, "write") + fmt.Fprintf(xw, "Threads reassigned. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n") } if accountOpt != "" { @@ -1828,15 +1816,14 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { if i > 0 { line = "\n" } - _, err := fmt.Fprintf(w, "%sReassigning threads for account %s...\n", line, accName) - ctl.xcheck(err, "write") + fmt.Fprintf(xw, "%sReassigning threads for account %s...\n", line, accName) xreassignThreads(accName) } } - w.xclose() + xw.xclose() case "backup": - backupctl(ctx, ctl) + xbackupctl(ctx, xctl) case "imapserve": /* protocol: @@ -1845,14 +1832,14 @@ func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) { < "ok or error" imap protocol */ - address := ctl.xread() - ctl.xwriteok() - imapserver.ServeConnPreauth("(imapserve)", cid, ctl.conn, address) - ctl.log.Debug("imap connection finished") + address := xctl.xread() + xctl.xwriteok() + imapserver.ServeConnPreauth("(imapserve)", cid, xctl.conn, address) + xctl.log.Debug("imap connection finished") default: log.Info("unrecognized command", slog.String("cmd", cmd)) - ctl.xwrite("unrecognized command") + xctl.xwrite("unrecognized command") return } } diff --git a/ctl_test.go b/ctl_test.go index bf5f1c4..f2a3feb 100644 --- a/ctl_test.go +++ b/ctl_test.go @@ -60,54 +60,54 @@ func TestCtl(t *testing.T) { var cid int64 - testctl := func(fn func(clientctl *ctl)) { + testctl := func(fn func(clientxctl *ctl)) { t.Helper() cconn, sconn := net.Pipe() - clientctl := ctl{conn: cconn, log: pkglog} - serverctl := ctl{conn: sconn, log: pkglog} + clientxctl := ctl{conn: cconn, log: pkglog} + serverxctl := ctl{conn: sconn, log: pkglog} done := make(chan struct{}) go func() { cid++ - servectlcmd(ctxbg, &serverctl, cid, func() {}) + servectlcmd(ctxbg, &serverxctl, cid, func() {}) close(done) }() - fn(&clientctl) + fn(&clientxctl) cconn.Close() <-done sconn.Close() } // "deliver" - testctl(func(ctl *ctl) { - ctlcmdDeliver(ctl, "mjl@mox.example") + testctl(func(xctl *ctl) { + ctlcmdDeliver(xctl, "mjl@mox.example") }) // "setaccountpassword" - testctl(func(ctl *ctl) { - ctlcmdSetaccountpassword(ctl, "mjl", "test4321") + testctl(func(xctl *ctl) { + ctlcmdSetaccountpassword(xctl, "mjl", "test4321") }) - testctl(func(ctl *ctl) { - ctlcmdQueueHoldrulesList(ctl) + testctl(func(xctl *ctl) { + ctlcmdQueueHoldrulesList(xctl) }) // All messages. - testctl(func(ctl *ctl) { - ctlcmdQueueHoldrulesAdd(ctl, "", "", "") + testctl(func(xctl *ctl) { + ctlcmdQueueHoldrulesAdd(xctl, "", "", "") }) - testctl(func(ctl *ctl) { - ctlcmdQueueHoldrulesAdd(ctl, "mjl", "", "") + testctl(func(xctl *ctl) { + ctlcmdQueueHoldrulesAdd(xctl, "mjl", "", "") }) - testctl(func(ctl *ctl) { - ctlcmdQueueHoldrulesAdd(ctl, "", "☺.mox.example", "") + testctl(func(xctl *ctl) { + ctlcmdQueueHoldrulesAdd(xctl, "", "☺.mox.example", "") }) - testctl(func(ctl *ctl) { - ctlcmdQueueHoldrulesAdd(ctl, "mox", "☺.mox.example", "example.com") + testctl(func(xctl *ctl) { + ctlcmdQueueHoldrulesAdd(xctl, "mox", "☺.mox.example", "example.com") }) - testctl(func(ctl *ctl) { - ctlcmdQueueHoldrulesRemove(ctl, 1) + testctl(func(xctl *ctl) { + ctlcmdQueueHoldrulesRemove(xctl, 1) }) // Queue a message to list/change/dump. @@ -127,252 +127,252 @@ func TestCtl(t *testing.T) { qmid := qml[0].ID // Has entries now. - testctl(func(ctl *ctl) { - ctlcmdQueueHoldrulesList(ctl) + testctl(func(xctl *ctl) { + ctlcmdQueueHoldrulesList(xctl) }) // "queuelist" - testctl(func(ctl *ctl) { - ctlcmdQueueList(ctl, queue.Filter{}, queue.Sort{}) + testctl(func(xctl *ctl) { + ctlcmdQueueList(xctl, queue.Filter{}, queue.Sort{}) }) // "queueholdset" - testctl(func(ctl *ctl) { - ctlcmdQueueHoldSet(ctl, queue.Filter{}, true) + testctl(func(xctl *ctl) { + ctlcmdQueueHoldSet(xctl, queue.Filter{}, true) }) - testctl(func(ctl *ctl) { - ctlcmdQueueHoldSet(ctl, queue.Filter{}, false) + testctl(func(xctl *ctl) { + ctlcmdQueueHoldSet(xctl, queue.Filter{}, false) }) // "queueschedule" - testctl(func(ctl *ctl) { - ctlcmdQueueSchedule(ctl, queue.Filter{}, true, time.Minute) + testctl(func(xctl *ctl) { + ctlcmdQueueSchedule(xctl, queue.Filter{}, true, time.Minute) }) // "queuetransport" - testctl(func(ctl *ctl) { - ctlcmdQueueTransport(ctl, queue.Filter{}, "socks") + testctl(func(xctl *ctl) { + ctlcmdQueueTransport(xctl, queue.Filter{}, "socks") }) // "queuerequiretls" - testctl(func(ctl *ctl) { - ctlcmdQueueRequireTLS(ctl, queue.Filter{}, nil) + testctl(func(xctl *ctl) { + ctlcmdQueueRequireTLS(xctl, queue.Filter{}, nil) }) // "queuedump" - testctl(func(ctl *ctl) { - ctlcmdQueueDump(ctl, fmt.Sprintf("%d", qmid)) + testctl(func(xctl *ctl) { + ctlcmdQueueDump(xctl, fmt.Sprintf("%d", qmid)) }) // "queuefail" - testctl(func(ctl *ctl) { - ctlcmdQueueFail(ctl, queue.Filter{}) + testctl(func(xctl *ctl) { + ctlcmdQueueFail(xctl, queue.Filter{}) }) // "queuedrop" - testctl(func(ctl *ctl) { - ctlcmdQueueDrop(ctl, queue.Filter{}) + testctl(func(xctl *ctl) { + ctlcmdQueueDrop(xctl, queue.Filter{}) }) // "queueholdruleslist" - testctl(func(ctl *ctl) { - ctlcmdQueueHoldrulesList(ctl) + testctl(func(xctl *ctl) { + ctlcmdQueueHoldrulesList(xctl) }) // "queueholdrulesadd" - testctl(func(ctl *ctl) { - ctlcmdQueueHoldrulesAdd(ctl, "mjl", "", "") + testctl(func(xctl *ctl) { + ctlcmdQueueHoldrulesAdd(xctl, "mjl", "", "") }) - testctl(func(ctl *ctl) { - ctlcmdQueueHoldrulesAdd(ctl, "mjl", "localhost", "") + testctl(func(xctl *ctl) { + ctlcmdQueueHoldrulesAdd(xctl, "mjl", "localhost", "") }) // "queueholdrulesremove" - testctl(func(ctl *ctl) { - ctlcmdQueueHoldrulesRemove(ctl, 2) + testctl(func(xctl *ctl) { + ctlcmdQueueHoldrulesRemove(xctl, 2) }) - testctl(func(ctl *ctl) { - ctlcmdQueueHoldrulesList(ctl) + testctl(func(xctl *ctl) { + ctlcmdQueueHoldrulesList(xctl) }) // "queuesuppresslist" - testctl(func(ctl *ctl) { - ctlcmdQueueSuppressList(ctl, "mjl") + testctl(func(xctl *ctl) { + ctlcmdQueueSuppressList(xctl, "mjl") }) // "queuesuppressadd" - testctl(func(ctl *ctl) { - ctlcmdQueueSuppressAdd(ctl, "mjl", "base@localhost") + testctl(func(xctl *ctl) { + ctlcmdQueueSuppressAdd(xctl, "mjl", "base@localhost") }) - testctl(func(ctl *ctl) { - ctlcmdQueueSuppressAdd(ctl, "mjl", "other@localhost") + testctl(func(xctl *ctl) { + ctlcmdQueueSuppressAdd(xctl, "mjl", "other@localhost") }) // "queuesuppresslookup" - testctl(func(ctl *ctl) { - ctlcmdQueueSuppressLookup(ctl, "mjl", "base@localhost") + testctl(func(xctl *ctl) { + ctlcmdQueueSuppressLookup(xctl, "mjl", "base@localhost") }) // "queuesuppressremove" - testctl(func(ctl *ctl) { - ctlcmdQueueSuppressRemove(ctl, "mjl", "base@localhost") + testctl(func(xctl *ctl) { + ctlcmdQueueSuppressRemove(xctl, "mjl", "base@localhost") }) - testctl(func(ctl *ctl) { - ctlcmdQueueSuppressList(ctl, "mjl") + testctl(func(xctl *ctl) { + ctlcmdQueueSuppressList(xctl, "mjl") }) // "queueretiredlist" - testctl(func(ctl *ctl) { - ctlcmdQueueRetiredList(ctl, queue.RetiredFilter{}, queue.RetiredSort{}) + testctl(func(xctl *ctl) { + ctlcmdQueueRetiredList(xctl, queue.RetiredFilter{}, queue.RetiredSort{}) }) // "queueretiredprint" - testctl(func(ctl *ctl) { - ctlcmdQueueRetiredPrint(ctl, "1") + testctl(func(xctl *ctl) { + ctlcmdQueueRetiredPrint(xctl, "1") }) // "queuehooklist" - testctl(func(ctl *ctl) { - ctlcmdQueueHookList(ctl, queue.HookFilter{}, queue.HookSort{}) + testctl(func(xctl *ctl) { + ctlcmdQueueHookList(xctl, queue.HookFilter{}, queue.HookSort{}) }) // "queuehookschedule" - testctl(func(ctl *ctl) { - ctlcmdQueueHookSchedule(ctl, queue.HookFilter{}, true, time.Minute) + testctl(func(xctl *ctl) { + ctlcmdQueueHookSchedule(xctl, queue.HookFilter{}, true, time.Minute) }) // "queuehookprint" - testctl(func(ctl *ctl) { - ctlcmdQueueHookPrint(ctl, "1") + testctl(func(xctl *ctl) { + ctlcmdQueueHookPrint(xctl, "1") }) // "queuehookcancel" - testctl(func(ctl *ctl) { - ctlcmdQueueHookCancel(ctl, queue.HookFilter{}) + testctl(func(xctl *ctl) { + ctlcmdQueueHookCancel(xctl, queue.HookFilter{}) }) // "queuehookretiredlist" - testctl(func(ctl *ctl) { - ctlcmdQueueHookRetiredList(ctl, queue.HookRetiredFilter{}, queue.HookRetiredSort{}) + testctl(func(xctl *ctl) { + ctlcmdQueueHookRetiredList(xctl, queue.HookRetiredFilter{}, queue.HookRetiredSort{}) }) // "queuehookretiredprint" - testctl(func(ctl *ctl) { - ctlcmdQueueHookRetiredPrint(ctl, "1") + testctl(func(xctl *ctl) { + ctlcmdQueueHookRetiredPrint(xctl, "1") }) // "importmbox" - testctl(func(ctl *ctl) { - ctlcmdImport(ctl, true, "mjl", "inbox", "testdata/importtest.mbox") + testctl(func(xctl *ctl) { + ctlcmdImport(xctl, true, "mjl", "inbox", "testdata/importtest.mbox") }) // "importmaildir" - testctl(func(ctl *ctl) { - ctlcmdImport(ctl, false, "mjl", "inbox", "testdata/importtest.maildir") + testctl(func(xctl *ctl) { + ctlcmdImport(xctl, false, "mjl", "inbox", "testdata/importtest.maildir") }) // "domainadd" - testctl(func(ctl *ctl) { - ctlcmdConfigDomainAdd(ctl, false, dns.Domain{ASCII: "mox2.example"}, "mjl", "") + testctl(func(xctl *ctl) { + ctlcmdConfigDomainAdd(xctl, false, dns.Domain{ASCII: "mox2.example"}, "mjl", "") }) // "accountadd" - testctl(func(ctl *ctl) { - ctlcmdConfigAccountAdd(ctl, "mjl2", "mjl2@mox2.example") + testctl(func(xctl *ctl) { + ctlcmdConfigAccountAdd(xctl, "mjl2", "mjl2@mox2.example") }) // "addressadd" - testctl(func(ctl *ctl) { - ctlcmdConfigAddressAdd(ctl, "mjl3@mox2.example", "mjl2") + testctl(func(xctl *ctl) { + ctlcmdConfigAddressAdd(xctl, "mjl3@mox2.example", "mjl2") }) // Add a message. - testctl(func(ctl *ctl) { - ctlcmdDeliver(ctl, "mjl3@mox2.example") + testctl(func(xctl *ctl) { + ctlcmdDeliver(xctl, "mjl3@mox2.example") }) // "retrain", retrain junk filter. - testctl(func(ctl *ctl) { - ctlcmdRetrain(ctl, "mjl2") + testctl(func(xctl *ctl) { + ctlcmdRetrain(xctl, "mjl2") }) // "addressrm" - testctl(func(ctl *ctl) { - ctlcmdConfigAddressRemove(ctl, "mjl3@mox2.example") + testctl(func(xctl *ctl) { + ctlcmdConfigAddressRemove(xctl, "mjl3@mox2.example") }) // "accountdisabled" - testctl(func(ctl *ctl) { - ctlcmdConfigAccountDisabled(ctl, "mjl2", "testing") + testctl(func(xctl *ctl) { + ctlcmdConfigAccountDisabled(xctl, "mjl2", "testing") }) - testctl(func(ctl *ctl) { - ctlcmdConfigAccountDisabled(ctl, "mjl2", "") + testctl(func(xctl *ctl) { + ctlcmdConfigAccountDisabled(xctl, "mjl2", "") }) // "accountrm" - testctl(func(ctl *ctl) { - ctlcmdConfigAccountRemove(ctl, "mjl2") + testctl(func(xctl *ctl) { + ctlcmdConfigAccountRemove(xctl, "mjl2") }) // "domaindisabled" - testctl(func(ctl *ctl) { - ctlcmdConfigDomainDisabled(ctl, dns.Domain{ASCII: "mox2.example"}, true) + testctl(func(xctl *ctl) { + ctlcmdConfigDomainDisabled(xctl, dns.Domain{ASCII: "mox2.example"}, true) }) - testctl(func(ctl *ctl) { - ctlcmdConfigDomainDisabled(ctl, dns.Domain{ASCII: "mox2.example"}, false) + testctl(func(xctl *ctl) { + ctlcmdConfigDomainDisabled(xctl, dns.Domain{ASCII: "mox2.example"}, false) }) // "domainrm" - testctl(func(ctl *ctl) { - ctlcmdConfigDomainRemove(ctl, dns.Domain{ASCII: "mox2.example"}) + testctl(func(xctl *ctl) { + ctlcmdConfigDomainRemove(xctl, dns.Domain{ASCII: "mox2.example"}) }) // "aliasadd" - testctl(func(ctl *ctl) { - ctlcmdConfigAliasAdd(ctl, "support@mox.example", config.Alias{Addresses: []string{"mjl@mox.example"}}) + testctl(func(xctl *ctl) { + ctlcmdConfigAliasAdd(xctl, "support@mox.example", config.Alias{Addresses: []string{"mjl@mox.example"}}) }) // "aliaslist" - testctl(func(ctl *ctl) { - ctlcmdConfigAliasList(ctl, "mox.example") + testctl(func(xctl *ctl) { + ctlcmdConfigAliasList(xctl, "mox.example") }) // "aliasprint" - testctl(func(ctl *ctl) { - ctlcmdConfigAliasPrint(ctl, "support@mox.example") + testctl(func(xctl *ctl) { + ctlcmdConfigAliasPrint(xctl, "support@mox.example") }) // "aliasupdate" - testctl(func(ctl *ctl) { - ctlcmdConfigAliasUpdate(ctl, "support@mox.example", "true", "true", "true") + testctl(func(xctl *ctl) { + ctlcmdConfigAliasUpdate(xctl, "support@mox.example", "true", "true", "true") }) // "aliasaddaddr" - testctl(func(ctl *ctl) { - ctlcmdConfigAliasAddaddr(ctl, "support@mox.example", []string{"mjl2@mox.example"}) + testctl(func(xctl *ctl) { + ctlcmdConfigAliasAddaddr(xctl, "support@mox.example", []string{"mjl2@mox.example"}) }) // "aliasrmaddr" - testctl(func(ctl *ctl) { - ctlcmdConfigAliasRmaddr(ctl, "support@mox.example", []string{"mjl2@mox.example"}) + testctl(func(xctl *ctl) { + ctlcmdConfigAliasRmaddr(xctl, "support@mox.example", []string{"mjl2@mox.example"}) }) // "aliasrm" - testctl(func(ctl *ctl) { - ctlcmdConfigAliasRemove(ctl, "support@mox.example") + testctl(func(xctl *ctl) { + ctlcmdConfigAliasRemove(xctl, "support@mox.example") }) // accounttlspubkeyadd certDER := fakeCert(t) - testctl(func(ctl *ctl) { - ctlcmdConfigTlspubkeyAdd(ctl, "mjl@mox.example", "testkey", false, certDER) + testctl(func(xctl *ctl) { + ctlcmdConfigTlspubkeyAdd(xctl, "mjl@mox.example", "testkey", false, certDER) }) // "accounttlspubkeylist" - testctl(func(ctl *ctl) { - ctlcmdConfigTlspubkeyList(ctl, "") + testctl(func(xctl *ctl) { + ctlcmdConfigTlspubkeyList(xctl, "") }) - testctl(func(ctl *ctl) { - ctlcmdConfigTlspubkeyList(ctl, "mjl") + testctl(func(xctl *ctl) { + ctlcmdConfigTlspubkeyList(xctl, "mjl") }) tpkl, err := store.TLSPublicKeyList(ctxbg, "") @@ -383,13 +383,13 @@ func TestCtl(t *testing.T) { fingerprint := tpkl[0].Fingerprint // "accounttlspubkeyget" - testctl(func(ctl *ctl) { - ctlcmdConfigTlspubkeyGet(ctl, fingerprint) + testctl(func(xctl *ctl) { + ctlcmdConfigTlspubkeyGet(xctl, fingerprint) }) // "accounttlspubkeyrm" - testctl(func(ctl *ctl) { - ctlcmdConfigTlspubkeyRemove(ctl, fingerprint) + testctl(func(xctl *ctl) { + ctlcmdConfigTlspubkeyRemove(xctl, fingerprint) }) tpkl, err = store.TLSPublicKeyList(ctxbg, "") @@ -399,39 +399,39 @@ func TestCtl(t *testing.T) { } // "loglevels" - testctl(func(ctl *ctl) { - ctlcmdLoglevels(ctl) + testctl(func(xctl *ctl) { + ctlcmdLoglevels(xctl) }) // "setloglevels" - testctl(func(ctl *ctl) { - ctlcmdSetLoglevels(ctl, "", "debug") + testctl(func(xctl *ctl) { + ctlcmdSetLoglevels(xctl, "", "debug") }) - testctl(func(ctl *ctl) { - ctlcmdSetLoglevels(ctl, "smtpserver", "debug") + testctl(func(xctl *ctl) { + ctlcmdSetLoglevels(xctl, "smtpserver", "debug") }) // Export data, import it again xcmdExport(true, false, []string{filepath.FromSlash("testdata/ctl/data/tmp/export/mbox/"), filepath.FromSlash("testdata/ctl/data/accounts/mjl")}, &cmd{log: pkglog}) xcmdExport(false, false, []string{filepath.FromSlash("testdata/ctl/data/tmp/export/maildir/"), filepath.FromSlash("testdata/ctl/data/accounts/mjl")}, &cmd{log: pkglog}) - testctl(func(ctl *ctl) { - ctlcmdImport(ctl, true, "mjl", "inbox", filepath.FromSlash("testdata/ctl/data/tmp/export/mbox/Inbox.mbox")) + testctl(func(xctl *ctl) { + ctlcmdImport(xctl, true, "mjl", "inbox", filepath.FromSlash("testdata/ctl/data/tmp/export/mbox/Inbox.mbox")) }) - testctl(func(ctl *ctl) { - ctlcmdImport(ctl, false, "mjl", "inbox", filepath.FromSlash("testdata/ctl/data/tmp/export/maildir/Inbox")) + testctl(func(xctl *ctl) { + ctlcmdImport(xctl, false, "mjl", "inbox", filepath.FromSlash("testdata/ctl/data/tmp/export/maildir/Inbox")) }) // "recalculatemailboxcounts" - testctl(func(ctl *ctl) { - ctlcmdRecalculateMailboxCounts(ctl, "mjl") + testctl(func(xctl *ctl) { + ctlcmdRecalculateMailboxCounts(xctl, "mjl") }) // "fixmsgsize" - testctl(func(ctl *ctl) { - ctlcmdFixmsgsize(ctl, "mjl") + testctl(func(xctl *ctl) { + ctlcmdFixmsgsize(xctl, "mjl") }) - testctl(func(ctl *ctl) { - acc, err := store.OpenAccount(ctl.log, "mjl", false) + testctl(func(xctl *ctl) { + acc, err := store.OpenAccount(xctl.log, "mjl", false) tcheck(t, err, "open account") defer func() { acc.Close() @@ -443,7 +443,7 @@ func TestCtl(t *testing.T) { deliver := func(m *store.Message) { t.Helper() m.Size = int64(len(content)) - msgf, err := store.CreateMessageTemp(ctl.log, "ctltest") + msgf, err := store.CreateMessageTemp(xctl.log, "ctltest") tcheck(t, err, "create temp file") defer os.Remove(msgf.Name()) defer msgf.Close() @@ -451,7 +451,7 @@ func TestCtl(t *testing.T) { tcheck(t, err, "write message file") acc.WithWLock(func() { - err = acc.DeliverMailbox(ctl.log, "Inbox", m, msgf) + err = acc.DeliverMailbox(xctl.log, "Inbox", m, msgf) tcheck(t, err, "deliver message") }) } @@ -471,7 +471,7 @@ func TestCtl(t *testing.T) { tcheck(t, err, "update mailbox size") // Fix up the size. - ctlcmdFixmsgsize(ctl, "") + ctlcmdFixmsgsize(xctl, "") err = acc.DB.Get(ctxbg, &msgBadSize) tcheck(t, err, "get message") @@ -481,19 +481,19 @@ func TestCtl(t *testing.T) { }) // "reparse" - testctl(func(ctl *ctl) { - ctlcmdReparse(ctl, "mjl") + testctl(func(xctl *ctl) { + ctlcmdReparse(xctl, "mjl") }) - testctl(func(ctl *ctl) { - ctlcmdReparse(ctl, "") + testctl(func(xctl *ctl) { + ctlcmdReparse(xctl, "") }) // "reassignthreads" - testctl(func(ctl *ctl) { - ctlcmdReassignthreads(ctl, "mjl") + testctl(func(xctl *ctl) { + ctlcmdReassignthreads(xctl, "mjl") }) - testctl(func(ctl *ctl) { - ctlcmdReassignthreads(ctl, "") + testctl(func(xctl *ctl) { + ctlcmdReassignthreads(xctl, "") }) // "backup", backup account. @@ -506,11 +506,11 @@ func TestCtl(t *testing.T) { err = tlsrptdb.Init() tcheck(t, err, "tlsrptdb init") defer tlsrptdb.Close() - testctl(func(ctl *ctl) { + testctl(func(xctl *ctl) { os.RemoveAll("testdata/ctl/data/tmp/backup") err := os.WriteFile("testdata/ctl/data/receivedid.key", make([]byte, 16), 0600) tcheck(t, err, "writing receivedid.key") - ctlcmdBackup(ctl, filepath.FromSlash("testdata/ctl/data/tmp/backup"), false) + ctlcmdBackup(xctl, filepath.FromSlash("testdata/ctl/data/tmp/backup"), false) }) // Verify the backup. @@ -521,7 +521,7 @@ func TestCtl(t *testing.T) { cmdVerifydata(&xcmd) // IMAP connection. - testctl(func(ctl *ctl) { + testctl(func(xctl *ctl) { a, b := net.Pipe() go func() { client, err := imapclient.New(mox.Cid(), a, true) @@ -530,7 +530,7 @@ func TestCtl(t *testing.T) { client.Logout() defer a.Close() }() - ctlcmdIMAPServe(ctl, "mjl@mox.example", b, b) + ctlcmdIMAPServe(xctl, "mjl@mox.example", b, b) }) } diff --git a/develop.txt b/develop.txt index 7227417..81afa17 100644 --- a/develop.txt +++ b/develop.txt @@ -47,6 +47,18 @@ instructions below. standard slog package for logging, not our mlog package. Packages not intended for reuse do use mlog as it is more convenient. Internally, we always use mlog.Log to do the logging, wrapping an slog.Logger. +- The code uses panic for error handling in quite a few places, including + smtpserver, imapserver and web API calls. Functions/methods, variables, struct + fields and types that begin with an "x" indicate they can panic on errors. Both + for i/o errors that are fatal for a connection, and also often for user-induced + errors, for example bad IMAP commands or invalid web API requests. These panics + are caught again at the top of a command or top of the connection. Write code + that is panic-safe, using defer to clean up and release resources. +- Try to check all errors, at the minimum using mlog.Log.Check() to log an error + at the appropriate level. Also when just closing a file. Log messages sometimes + unexpectedly point out latent issues. Only when there is no point in logging, + for example when previous writes to stderr failed, can error logging be skipped. + Test code is less strict about checking errors. # Reusable packages diff --git a/imapclient/client.go b/imapclient/client.go index 120ccd1..f8d9cbd 100644 --- a/imapclient/client.go +++ b/imapclient/client.go @@ -30,17 +30,19 @@ import ( // Conn is an IMAP connection to a server. type Conn struct { // Connection, may be original TCP or TLS connection. Reads go through c.br, and - // writes through c.bw. It wraps a tracing reading/writer and may wrap flate - // compression. - conn net.Conn - connBroken bool // If connection is broken, we won't flush (and write) again. - br *bufio.Reader - bw *bufio.Writer - compress bool // If compression is enabled, we must flush flateWriter and its target original bufio writer. - flateWriter *moxio.FlateWriter - flateBW *bufio.Writer - tr *moxio.TraceReader - tw *moxio.TraceWriter + // writes through c.xbw. The "x" for the writes indicate that failed writes cause + // an i/o panic, which is either turned into a returned error, or passed on (see + // boolean panic). The reader and writer wrap a tracing reading/writer and may wrap + // flate compression. + conn net.Conn + connBroken bool // If connection is broken, we won't flush (and write) again. + br *bufio.Reader + tr *moxio.TraceReader + xbw *bufio.Writer + compress bool // If compression is enabled, we must flush flateWriter and its target original bufio writer. + xflateWriter *moxio.FlateWriter + xflateBW *bufio.Writer + xtw *moxio.TraceWriter log mlog.Log panic bool @@ -86,8 +88,8 @@ func New(cid int64, conn net.Conn, xpanic bool) (client *Conn, rerr error) { c.br = bufio.NewReader(c.tr) // Writes are buffered and write to Conn, which may panic. - c.tw = moxio.NewTraceWriter(log, "CW: ", &c) - c.bw = bufio.NewWriter(c.tw) + c.xtw = moxio.NewTraceWriter(log, "CW: ", &c) + c.xbw = bufio.NewWriter(c.xtw) defer c.recover(&rerr) tag := c.xnonspace() @@ -171,14 +173,14 @@ func (c *Conn) xflush() { return } - err := c.bw.Flush() + err := c.xbw.Flush() c.xcheckf(err, "flush") // If compression is active, we need to flush the deflate stream. if c.compress { - err := c.flateWriter.Flush() + err := c.xflateWriter.Flush() c.xcheckf(err, "flush deflate") - err = c.flateBW.Flush() + err = c.xflateBW.Flush() c.xcheckf(err, "flush deflate buffer") } } @@ -186,11 +188,11 @@ func (c *Conn) xflush() { func (c *Conn) xtrace(level slog.Level) func() { c.xflush() c.tr.SetTrace(level) - c.tw.SetTrace(level) + c.xtw.SetTrace(level) return func() { c.xflush() c.tr.SetTrace(mlog.LevelTrace) - c.tw.SetTrace(mlog.LevelTrace) + c.xtw.SetTrace(mlog.LevelTrace) } } @@ -214,13 +216,13 @@ func (c *Conn) Close() (rerr error) { if c.conn == nil { return nil } - if !c.connBroken && c.flateWriter != nil { - err := c.flateWriter.Close() + if !c.connBroken && c.xflateWriter != nil { + err := c.xflateWriter.Close() c.xcheckf(err, "close deflate writer") - err = c.flateBW.Flush() + err = c.xflateBW.Flush() c.xcheckf(err, "flush deflate buffer") - c.flateWriter = nil - c.flateBW = nil + c.xflateWriter = nil + c.xflateBW = nil } err := c.conn.Close() c.xcheckf(err, "close connection") @@ -248,8 +250,7 @@ func (c *Conn) Commandf(tag string, format string, args ...any) (rerr error) { } c.LastTag = tag - _, err := fmt.Fprintf(c.bw, "%s %s\r\n", tag, fmt.Sprintf(format, args...)) - c.xcheckf(err, "write command") + fmt.Fprintf(c.xbw, "%s %s\r\n", tag, fmt.Sprintf(format, args...)) c.xflush() return } @@ -337,8 +338,7 @@ func (c *Conn) Writelinef(format string, args ...any) (rerr error) { defer c.recover(&rerr) s := fmt.Sprintf(format, args...) - _, err := fmt.Fprintf(c.bw, "%s\r\n", s) - c.xcheckf(err, "writeline") + fmt.Fprintf(c.xbw, "%s\r\n", s) c.xflush() return nil } @@ -348,8 +348,7 @@ func (c *Conn) Writelinef(format string, args ...any) (rerr error) { func (c *Conn) WriteSyncLiteral(s string) (untagged []Untagged, rerr error) { defer c.recover(&rerr) - _, err := fmt.Fprintf(c.bw, "{%d}\r\n", len(s)) - c.xcheckf(err, "write sync literal size") + fmt.Fprintf(c.xbw, "{%d}\r\n", len(s)) c.xflush() plus, err := c.br.Peek(1) @@ -358,7 +357,7 @@ func (c *Conn) WriteSyncLiteral(s string) (untagged []Untagged, rerr error) { _, err = c.Readline() c.xcheckf(err, "read continuation line") - _, err = c.bw.Write([]byte(s)) + _, err = c.xbw.Write([]byte(s)) c.xcheckf(err, "write literal data") c.xflush() return nil, nil diff --git a/imapclient/cmds.go b/imapclient/cmds.go index 4b2a88d..3e948dc 100644 --- a/imapclient/cmds.go +++ b/imapclient/cmds.go @@ -58,9 +58,9 @@ func (c *Conn) Login(username, password string) (untagged []Untagged, result Res defer c.recover(&rerr) c.LastTag = c.nextTag() - fmt.Fprintf(c.bw, "%s login %s ", c.LastTag, astring(username)) + fmt.Fprintf(c.xbw, "%s login %s ", c.LastTag, astring(username)) defer c.xtrace(mlog.LevelTraceauth)() - fmt.Fprintf(c.bw, "%s\r\n", astring(password)) + fmt.Fprintf(c.xbw, "%s\r\n", astring(password)) c.xtrace(mlog.LevelTrace) // Restore. return c.Response() } @@ -69,18 +69,19 @@ func (c *Conn) Login(username, password string) (untagged []Untagged, result Res func (c *Conn) AuthenticatePlain(username, password string) (untagged []Untagged, result Result, rerr error) { defer c.recover(&rerr) - c.Commandf("", "authenticate plain") + err := c.Commandf("", "authenticate plain") + c.xcheckf(err, "writing authenticate command") _, untagged, result, rerr = c.ReadContinuation() c.xcheckf(rerr, "reading continuation") if result.Status != "" { c.xerrorf("got result status %q, expected continuation", result.Status) } defer c.xtrace(mlog.LevelTraceauth)() - xw := base64.NewEncoder(base64.StdEncoding, c.bw) + xw := base64.NewEncoder(base64.StdEncoding, c.xbw) fmt.Fprintf(xw, "\u0000%s\u0000%s", username, password) xw.Close() c.xtrace(mlog.LevelTrace) // Restore. - fmt.Fprintf(c.bw, "\r\n") + fmt.Fprintf(c.xbw, "\r\n") c.xflush() return c.Response() } @@ -153,15 +154,15 @@ func (c *Conn) CompressDeflate() (untagged []Untagged, result Result, rerr error untagged, result, rerr = c.Transactf("compress deflate") c.xcheck(rerr) - c.flateBW = bufio.NewWriter(c) - fw0, err := flate.NewWriter(c.flateBW, flate.DefaultCompression) + c.xflateBW = bufio.NewWriter(c) + fw0, err := flate.NewWriter(c.xflateBW, flate.DefaultCompression) c.xcheckf(err, "deflate") // Cannot happen. fw := moxio.NewFlateWriter(fw0) c.compress = true - c.flateWriter = fw - c.tw = moxio.NewTraceWriter(mlog.New("imapclient", nil), "CW: ", fw) - c.bw = bufio.NewWriter(c.tw) + c.xflateWriter = fw + c.xtw = moxio.NewTraceWriter(mlog.New("imapclient", nil), "CW: ", fw) + c.xbw = bufio.NewWriter(c.xtw) rc := c.xprefixConn() fr := flate.NewReaderPartial(rc) @@ -303,8 +304,7 @@ func (c *Conn) Append(mailbox string, message Append, more ...Append) (untagged tag := c.nextTag() c.LastTag = tag - _, err := fmt.Fprintf(c.bw, "%s append %s", tag, astring(mailbox)) - c.xcheckf(err, "write command") + fmt.Fprintf(c.xbw, "%s append %s", tag, astring(mailbox)) msgs := append([]Append{message}, more...) for _, m := range msgs { @@ -316,14 +316,14 @@ func (c *Conn) Append(mailbox string, message Append, more ...Append) (untagged // todo: use literal8 if needed, with "UTF8()" if required. // todo: for larger messages, use a synchronizing literal. - fmt.Fprintf(c.bw, " (%s)%s {%d+}\r\n", strings.Join(m.Flags, " "), date, m.Size) + fmt.Fprintf(c.xbw, " (%s)%s {%d+}\r\n", strings.Join(m.Flags, " "), date, m.Size) defer c.xtrace(mlog.LevelTracedata)() - _, err := io.Copy(c.bw, m.Data) + _, err := io.Copy(c.xbw, m.Data) c.xcheckf(err, "write message data") c.xtrace(mlog.LevelTrace) // Restore } - fmt.Fprintf(c.bw, "\r\n") + fmt.Fprintf(c.xbw, "\r\n") c.xflush() return c.Response() } @@ -441,14 +441,15 @@ func (c *Conn) replace(cmd string, num string, mailbox string, msg Append) (unta } // todo: only use literal8 if needed, possibly with "UTF8()" // todo: encode mailbox - c.Commandf("", "%s %s %s (%s)%s ~{%d+}", cmd, num, astring(mailbox), strings.Join(msg.Flags, " "), date, msg.Size) + err := c.Commandf("", "%s %s %s (%s)%s ~{%d+}", cmd, num, astring(mailbox), strings.Join(msg.Flags, " "), date, msg.Size) + c.xcheckf(err, "writing replace command") defer c.xtrace(mlog.LevelTracedata)() - _, err := io.Copy(c.bw, msg.Data) + _, err = io.Copy(c.xbw, msg.Data) c.xcheckf(err, "write message data") c.xtrace(mlog.LevelTrace) - fmt.Fprintf(c.bw, "\r\n") + fmt.Fprintf(c.xbw, "\r\n") c.xflush() return c.Response() diff --git a/imapclient/parse.go b/imapclient/parse.go index ceeffe3..dce5abb 100644 --- a/imapclient/parse.go +++ b/imapclient/parse.go @@ -856,8 +856,7 @@ func (c *Conn) xliteral() []byte { c.xerrorf("refusing to read more than 1MB: %d", size) } if sync { - _, err := fmt.Fprintf(c.bw, "+ ok\r\n") - c.xcheckf(err, "write continuation") + fmt.Fprintf(c.xbw, "+ ok\r\n") c.xflush() } buf := make([]byte, int(size)) diff --git a/imapserver/fetch.go b/imapserver/fetch.go index 9019754..fc3233d 100644 --- a/imapserver/fetch.go +++ b/imapserver/fetch.go @@ -428,9 +428,9 @@ func (cmd *fetchCmd) process(atts []fetchAtt) { } // Write errors are turned into panics because we write through c. - fmt.Fprintf(cmd.conn.bw, "* %d FETCH ", cmd.conn.xsequence(cmd.uid)) - data.writeTo(cmd.conn, cmd.conn.bw) - cmd.conn.bw.Write([]byte("\r\n")) + fmt.Fprintf(cmd.conn.xbw, "* %d FETCH ", cmd.conn.xsequence(cmd.uid)) + data.writeTo(cmd.conn, cmd.conn.xbw) + cmd.conn.xbw.Write([]byte("\r\n")) } // result for one attribute. if processing fails, e.g. because data was requested diff --git a/imapserver/list.go b/imapserver/list.go index 2367beb..0d69c23 100644 --- a/imapserver/list.go +++ b/imapserver/list.go @@ -264,7 +264,7 @@ func (c *conn) cmdList(tag, cmd string, p *parser) { c.bwritelinef("%s", line) } for _, meta := range respMetadata { - meta.writeTo(c, c.bw) + meta.writeTo(c, c.xbw) c.bwritelinef("") } c.ok(tag, cmd) diff --git a/imapserver/metadata.go b/imapserver/metadata.go index e2527c3..5929ea8 100644 --- a/imapserver/metadata.go +++ b/imapserver/metadata.go @@ -155,18 +155,18 @@ func (c *conn) cmdGetmetadata(tag, cmd string, p *parser) { // Response syntax: ../rfc/5464:807 ../rfc/5464:778 // We can only send untagged responses when we have any matches. if len(annotations) > 0 { - fmt.Fprintf(c.bw, "* METADATA %s (", mailboxt(mailboxName).pack(c)) + fmt.Fprintf(c.xbw, "* METADATA %s (", mailboxt(mailboxName).pack(c)) for i, a := range annotations { if i > 0 { - fmt.Fprint(c.bw, " ") + fmt.Fprint(c.xbw, " ") } - astring(a.Key).writeTo(c, c.bw) - fmt.Fprint(c.bw, " ") + astring(a.Key).writeTo(c, c.xbw) + fmt.Fprint(c.xbw, " ") if a.IsString { - string0(string(a.Value)).writeTo(c, c.bw) + string0(string(a.Value)).writeTo(c, c.xbw) } else { v := readerSizeSyncliteral{bytes.NewReader(a.Value), int64(len(a.Value)), true} - v.writeTo(c, c.bw) + v.writeTo(c, c.xbw) } } c.bwritelinef(")") diff --git a/imapserver/pack.go b/imapserver/pack.go index f03b4f4..6244578 100644 --- a/imapserver/pack.go +++ b/imapserver/pack.go @@ -9,7 +9,7 @@ import ( type token interface { pack(c *conn) string - writeTo(c *conn, w io.Writer) + writeTo(c *conn, xw io.Writer) // Writes to xw panic on error. } type bare string @@ -18,8 +18,8 @@ func (t bare) pack(c *conn) string { return string(t) } -func (t bare) writeTo(c *conn, w io.Writer) { - w.Write([]byte(t.pack(c))) +func (t bare) writeTo(c *conn, xw io.Writer) { + xw.Write([]byte(t.pack(c))) } type niltoken struct{} @@ -30,8 +30,8 @@ func (t niltoken) pack(c *conn) string { return "NIL" } -func (t niltoken) writeTo(c *conn, w io.Writer) { - w.Write([]byte(t.pack(c))) +func (t niltoken) writeTo(c *conn, xw io.Writer) { + xw.Write([]byte(t.pack(c))) } func nilOrString(s string) token { @@ -60,8 +60,8 @@ func (t string0) pack(c *conn) string { return r } -func (t string0) writeTo(c *conn, w io.Writer) { - w.Write([]byte(t.pack(c))) +func (t string0) writeTo(c *conn, xw io.Writer) { + xw.Write([]byte(t.pack(c))) } type dquote string @@ -78,8 +78,8 @@ func (t dquote) pack(c *conn) string { return r } -func (t dquote) writeTo(c *conn, w io.Writer) { - w.Write([]byte(t.pack(c))) +func (t dquote) writeTo(c *conn, xw io.Writer) { + xw.Write([]byte(t.pack(c))) } type syncliteral string @@ -88,9 +88,9 @@ func (t syncliteral) pack(c *conn) string { return fmt.Sprintf("{%d}\r\n", len(t)) + string(t) } -func (t syncliteral) writeTo(c *conn, w io.Writer) { - fmt.Fprintf(w, "{%d}\r\n", len(t)) - w.Write([]byte(t)) +func (t syncliteral) writeTo(c *conn, xw io.Writer) { + fmt.Fprintf(xw, "{%d}\r\n", len(t)) + xw.Write([]byte(t)) } // data from reader with known size. @@ -112,14 +112,14 @@ func (t readerSizeSyncliteral) pack(c *conn) string { return fmt.Sprintf("%s{%d}\r\n", lit, t.size) + string(buf) } -func (t readerSizeSyncliteral) writeTo(c *conn, w io.Writer) { +func (t readerSizeSyncliteral) writeTo(c *conn, xw io.Writer) { var lit string if t.lit8 { lit = "~" } - fmt.Fprintf(w, "%s{%d}\r\n", lit, t.size) + fmt.Fprintf(xw, "%s{%d}\r\n", lit, t.size) defer c.xtrace(mlog.LevelTracedata)() - if _, err := io.Copy(w, io.LimitReader(t.r, t.size)); err != nil { + if _, err := io.Copy(xw, io.LimitReader(t.r, t.size)); err != nil { panic(err) } } @@ -137,17 +137,14 @@ func (t readerSyncliteral) pack(c *conn) string { return fmt.Sprintf("{%d}\r\n", len(buf)) + string(buf) } -func (t readerSyncliteral) writeTo(c *conn, w io.Writer) { +func (t readerSyncliteral) writeTo(c *conn, xw io.Writer) { buf, err := io.ReadAll(t.r) if err != nil { panic(err) } - fmt.Fprintf(w, "{%d}\r\n", len(buf)) + fmt.Fprintf(xw, "{%d}\r\n", len(buf)) defer c.xtrace(mlog.LevelTracedata)() - _, err = w.Write(buf) - if err != nil { - panic(err) - } + xw.Write(buf) } // list with tokens space-separated @@ -165,15 +162,15 @@ func (t listspace) pack(c *conn) string { return s } -func (t listspace) writeTo(c *conn, w io.Writer) { - fmt.Fprint(w, "(") +func (t listspace) writeTo(c *conn, xw io.Writer) { + fmt.Fprint(xw, "(") for i, e := range t { if i > 0 { - fmt.Fprint(w, " ") + fmt.Fprint(xw, " ") } - e.writeTo(c, w) + e.writeTo(c, xw) } - fmt.Fprint(w, ")") + fmt.Fprint(xw, ")") } // concatenate tokens space-separated @@ -190,12 +187,12 @@ func (t concatspace) pack(c *conn) string { return s } -func (t concatspace) writeTo(c *conn, w io.Writer) { +func (t concatspace) writeTo(c *conn, xw io.Writer) { for i, e := range t { if i > 0 { - fmt.Fprint(w, " ") + fmt.Fprint(xw, " ") } - e.writeTo(c, w) + e.writeTo(c, xw) } } @@ -210,9 +207,9 @@ func (t concat) pack(c *conn) string { return s } -func (t concat) writeTo(c *conn, w io.Writer) { +func (t concat) writeTo(c *conn, xw io.Writer) { for _, e := range t { - e.writeTo(c, w) + e.writeTo(c, xw) } } @@ -234,8 +231,8 @@ next: return string(t) } -func (t astring) writeTo(c *conn, w io.Writer) { - w.Write([]byte(t.pack(c))) +func (t astring) writeTo(c *conn, xw io.Writer) { + xw.Write([]byte(t.pack(c))) } // mailbox with utf7 encoding if connection requires it, or utf8 otherwise. @@ -249,8 +246,8 @@ func (t mailboxt) pack(c *conn) string { return astring(s).pack(c) } -func (t mailboxt) writeTo(c *conn, w io.Writer) { - w.Write([]byte(t.pack(c))) +func (t mailboxt) writeTo(c *conn, xw io.Writer) { + xw.Write([]byte(t.pack(c))) } type number uint32 @@ -259,6 +256,6 @@ func (t number) pack(c *conn) string { return fmt.Sprintf("%d", t) } -func (t number) writeTo(c *conn, w io.Writer) { - w.Write([]byte(t.pack(c))) +func (t number) writeTo(c *conn, xw io.Writer) { + xw.Write([]byte(t.pack(c))) } diff --git a/imapserver/server.go b/imapserver/server.go index 8df9264..e3028ba 100644 --- a/imapserver/server.go +++ b/imapserver/server.go @@ -191,14 +191,16 @@ type conn struct { tls bool // Whether TLS has been initialized. viaHTTPS bool // Whether this connection came in via HTTPS (using TLS ALPN). br *bufio.Reader // From remote, with TLS unwrapped in case of TLS, and possibly wrapping inflate. + tr *moxio.TraceReader // Kept to change trace level when reading/writing cmd/auth/data. line chan lineErr // If set, instead of reading from br, a line is read from this channel. For reading a line in IDLE while also waiting for mailbox/account updates. lastLine string // For detecting if syntax error is fatal, i.e. if this ends with a literal. Without crlf. - bw *bufio.Writer // To remote, with TLS added in case of TLS, and possibly wrapping deflate, see conn.flateWriter. - tr *moxio.TraceReader // Kept to change trace level when reading/writing cmd/auth/data. - tw *moxio.TraceWriter - slow bool // If set, reads are done with a 1 second sleep, and writes are done 1 byte at a time, to keep spammers busy. - lastlog time.Time // For printing time since previous log line. - baseTLSConfig *tls.Config // Base TLS config to use for handshake. + xbw *bufio.Writer // To remote, with TLS added in case of TLS, and possibly wrapping deflate, see conn.xflateWriter. Writes go through xtw to conn.Write, which panics on errors, hence the "x". + xtw *moxio.TraceWriter + xflateWriter *moxio.FlateWriter // For flushing output after flushing conn.xbw, and for closing. + xflateBW *bufio.Writer // Wraps raw connection writes, xflateWriter writes here, also needs flushing. + slow bool // If set, reads are done with a 1 second sleep, and writes are done 1 byte at a time, to keep spammers busy. + lastlog time.Time // For printing time since previous log line. + baseTLSConfig *tls.Config // Base TLS config to use for handshake. remoteIP net.IP noRequireSTARTTLS bool cmd string // Currently executing, for deciding to applyChanges and logging. @@ -208,8 +210,6 @@ type conn struct { log mlog.Log // Used for all synchronous logging on this connection, see logbg for logging in a separate goroutine. enabled map[capability]bool // All upper-case. compress bool // Whether compression is enabled, via compress command. - flateWriter *moxio.FlateWriter // For flushing output after flushing conn.bw, and for closing. - flateBW *bufio.Writer // Wraps raw connection writes, flateWriter writes here, also needs flushing. // Set by SEARCH with SAVE. Can be used by commands accepting a sequence-set with // value "$". When used, UIDs must be verified to still exist, because they may @@ -529,11 +529,11 @@ func (c *conn) Write(buf []byte) (int, error) { func (c *conn) xtrace(level slog.Level) func() { c.xflush() c.tr.SetTrace(level) - c.tw.SetTrace(level) + c.xtw.SetTrace(level) return func() { c.xflush() c.tr.SetTrace(mlog.LevelTrace) - c.tw.SetTrace(mlog.LevelTrace) + c.xtw.SetTrace(mlog.LevelTrace) } } @@ -641,7 +641,7 @@ func (c *conn) writelinef(format string, args ...any) { // Buffer line for write. func (c *conn) bwritelinef(format string, args ...any) { format += "\r\n" - fmt.Fprintf(c.bw, format, args...) + fmt.Fprintf(c.xbw, format, args...) } func (c *conn) xflush() { @@ -650,7 +650,7 @@ func (c *conn) xflush() { return } - err := c.bw.Flush() + err := c.xbw.Flush() xcheckf(err, "flush") // Should never happen, the Write caused by the Flush should panic on i/o error. // If compression is enabled, we need to flush its stream. @@ -658,11 +658,11 @@ func (c *conn) xflush() { // Note: Flush writes a sync message if there is nothing to flush. Ideally we // wouldn't send that, but we would have to keep track of whether data needs to be // flushed. - err := c.flateWriter.Flush() + err := c.xflateWriter.Flush() xcheckf(err, "flush deflate") // The flate writer writes to a bufio.Writer, we must also flush that. - err = c.flateBW.Flush() + err = c.xflateBW.Flush() xcheckf(err, "flush deflate writer") } } @@ -753,8 +753,8 @@ func serve(listenerName string, cid int64, tlsConfig *tls.Config, nc net.Conn, x c.tr = moxio.NewTraceReader(c.log, "C: ", c.conn) // todo: tracing should be done on whatever comes out of c.br. the remote connection write a command plus data, and bufio can read it in one read, causing a command parser that sets the tracing level to data to have no effect. we are now typically logging sent messages, when mail clients append to the Sent mailbox. c.br = bufio.NewReader(c.tr) - c.tw = moxio.NewTraceWriter(c.log, "S: ", c) - c.bw = bufio.NewWriter(c.tw) + c.xtw = moxio.NewTraceWriter(c.log, "S: ", c) + c.xbw = bufio.NewWriter(c.xtw) // Many IMAP connections use IDLE to wait for new incoming messages. We'll enable // keepalive to get a higher chance of the connection staying alive, or otherwise @@ -1144,9 +1144,9 @@ func (c *conn) command() { // If compression was enabled, we flush & close the deflate stream. if c.compress { // Note: Close and flush can Write and may panic with an i/o error. - if err := c.flateWriter.Close(); err != nil { + if err := c.xflateWriter.Close(); err != nil { c.log.Debugx("close deflate writer", err) - } else if err := c.flateBW.Flush(); err != nil { + } else if err := c.xflateBW.Flush(); err != nil { c.log.Debugx("flush deflate buffer", err) } } @@ -1870,15 +1870,15 @@ func (c *conn) cmdCompress(tag, cmd string, p *parser) { c.log.Debug("compression enabled") c.ok(tag, cmd) - c.flateBW = bufio.NewWriter(c) - fw0, err := flate.NewWriter(c.flateBW, flate.DefaultCompression) + c.xflateBW = bufio.NewWriter(c) + fw0, err := flate.NewWriter(c.xflateBW, flate.DefaultCompression) xcheckf(err, "deflate") // Cannot happen. - fw := moxio.NewFlateWriter(fw0) + xfw := moxio.NewFlateWriter(fw0) c.compress = true - c.flateWriter = fw - c.tw = moxio.NewTraceWriter(c.log, "S: ", c.flateWriter) - c.bw = bufio.NewWriter(c.tw) // The previous c.bw will not have buffered data. + c.xflateWriter = xfw + c.xtw = moxio.NewTraceWriter(c.log, "S: ", c.xflateWriter) + c.xbw = bufio.NewWriter(c.xtw) // The previous c.xbw will not have buffered data. rc := xprefixConn(c.conn, c.br) // c.br may contain buffered data. // We use the special partial reader. Some clients write commands and flush the diff --git a/import.go b/import.go index d767ab9..b0c55a1 100644 --- a/import.go +++ b/import.go @@ -131,22 +131,22 @@ func xcmdXImport(mbox bool, c *cmd) { ctlcmdImport(&clientctl, mbox, account, args[1], args[2]) } -func ctlcmdImport(ctl *ctl, mbox bool, account, mailbox, src string) { +func ctlcmdImport(xctl *ctl, mbox bool, account, mailbox, src string) { if mbox { - ctl.xwrite("importmbox") + xctl.xwrite("importmbox") } else { - ctl.xwrite("importmaildir") + xctl.xwrite("importmaildir") } - ctl.xwrite(account) + xctl.xwrite(account) if strings.EqualFold(mailbox, "Inbox") { mailbox = "Inbox" } - ctl.xwrite(mailbox) - ctl.xwrite(src) - ctl.xreadok() + xctl.xwrite(mailbox) + xctl.xwrite(src) + xctl.xreadok() fmt.Fprintln(os.Stderr, "importing...") for { - line := ctl.xread() + line := xctl.xread() if strings.HasPrefix(line, "progress ") { n := line[len("progress "):] fmt.Fprintf(os.Stderr, "%s...\n", n) @@ -157,11 +157,11 @@ func ctlcmdImport(ctl *ctl, mbox bool, account, mailbox, src string) { } break } - count := ctl.xread() + count := xctl.xread() fmt.Fprintf(os.Stderr, "%s imported\n", count) } -func importctl(ctx context.Context, ctl *ctl, mbox bool) { +func ximportctl(ctx context.Context, xctl *ctl, mbox bool) { /* protocol: > "importmaildir" or "importmbox" > account @@ -172,15 +172,15 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) { < "ok" when done, or error < count (of total imported messages, only if not error) */ - account := ctl.xread() - mailbox := ctl.xread() - src := ctl.xread() + account := xctl.xread() + mailbox := xctl.xread() + src := xctl.xread() kind := "maildir" if mbox { kind = "mbox" } - ctl.log.Info("importing messages", + xctl.log.Info("importing messages", slog.String("kind", kind), slog.String("account", account), slog.String("mailbox", mailbox), @@ -194,34 +194,34 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) { // Ensure normalized form. mailbox = norm.NFC.String(mailbox) mailbox, _, err = store.CheckMailboxName(mailbox, true) - ctl.xcheck(err, "checking mailbox name") + xctl.xcheck(err, "checking mailbox name") // Open account, creating a database file if it doesn't exist yet. It must be known // in the configuration file. - a, err := store.OpenAccount(ctl.log, account, false) - ctl.xcheck(err, "opening account") + a, err := store.OpenAccount(xctl.log, account, false) + xctl.xcheck(err, "opening account") defer func() { if a != nil { err := a.Close() - ctl.log.Check(err, "closing account after import") + xctl.log.Check(err, "closing account after import") } }() - err = a.ThreadingWait(ctl.log) - ctl.xcheck(err, "waiting for account thread upgrade") + err = a.ThreadingWait(xctl.log) + xctl.xcheck(err, "waiting for account thread upgrade") defer func() { if mboxf != nil { err := mboxf.Close() - ctl.log.Check(err, "closing mbox file after import") + xctl.log.Check(err, "closing mbox file after import") } if mdnewf != nil { err := mdnewf.Close() - ctl.log.Check(err, "closing maildir new after import") + xctl.log.Check(err, "closing maildir new after import") } if mdcurf != nil { err := mdcurf.Close() - ctl.log.Check(err, "closing maildir cur after import") + xctl.log.Check(err, "closing maildir cur after import") } }() @@ -233,14 +233,14 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) { // may be a different user who can access the files. if mbox { mboxf, err = os.Open(src) - ctl.xcheck(err, "open mbox file") - msgreader = store.NewMboxReader(ctl.log, store.CreateMessageTemp, src, mboxf) + xctl.xcheck(err, "open mbox file") + msgreader = store.NewMboxReader(xctl.log, store.CreateMessageTemp, src, mboxf) } else { mdnewf, err = os.Open(filepath.Join(src, "new")) - ctl.xcheck(err, "open subdir new of maildir") + xctl.xcheck(err, "open subdir new of maildir") mdcurf, err = os.Open(filepath.Join(src, "cur")) - ctl.xcheck(err, "open subdir cur of maildir") - msgreader = store.NewMaildirReader(ctl.log, store.CreateMessageTemp, mdnewf, mdcurf) + xctl.xcheck(err, "open subdir cur of maildir") + msgreader = store.NewMaildirReader(xctl.log, store.CreateMessageTemp, mdnewf, mdcurf) } // todo: one goroutine for reading messages, one for parsing the message, one adding to database, one for junk filter training. @@ -249,16 +249,16 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) { var changes []store.Change tx, err := a.DB.Begin(ctx, true) - ctl.xcheck(err, "begin transaction") + xctl.xcheck(err, "begin transaction") defer func() { if tx != nil { err := tx.Rollback() - ctl.log.Check(err, "rolling back transaction") + xctl.log.Check(err, "rolling back transaction") } }() // All preparations done. Good to go. - ctl.xwriteok() + xctl.xwriteok() // We will be delivering messages. If we fail halfway, we need to remove the created msg files. var newIDs []int64 @@ -268,22 +268,22 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) { return } - if x != ctl.x { - ctl.log.Error("import error", slog.String("panic", fmt.Sprintf("%v", x))) + if x != xctl.x { + xctl.log.Error("import error", slog.String("panic", fmt.Sprintf("%v", x))) debug.PrintStack() metrics.PanicInc(metrics.Import) } else { - ctl.log.Error("import error") + xctl.log.Error("import error") } for _, id := range newIDs { p := a.MessagePath(id) err := os.Remove(p) - ctl.log.Check(err, "closing message file after import error", slog.String("path", p)) + xctl.log.Check(err, "closing message file after import error", slog.String("path", p)) } newIDs = nil - ctl.xerror(fmt.Sprintf("import error: %v", x)) + xctl.xerror(fmt.Sprintf("import error: %v", x)) }() var modseq store.ModSeq // Assigned on first delivered messages, used for all messages. @@ -291,18 +291,18 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) { // Ensure mailbox exists. var mb store.Mailbox mb, changes, err = a.MailboxEnsure(tx, mailbox, true, store.SpecialUse{}, &modseq) - ctl.xcheck(err, "ensuring mailbox exists") + xctl.xcheck(err, "ensuring mailbox exists") nkeywords := len(mb.Keywords) - jf, _, err := a.OpenJunkFilter(ctx, ctl.log) + jf, _, err := a.OpenJunkFilter(ctx, xctl.log) if err != nil && !errors.Is(err, store.ErrNoJunkFilter) { - ctl.xcheck(err, "open junk filter") + xctl.xcheck(err, "open junk filter") } defer func() { if jf != nil { err = jf.CloseDiscard() - ctl.xcheck(err, "close junk filter") + xctl.xcheck(err, "close junk filter") } }() @@ -312,30 +312,30 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) { var addSize int64 du := store.DiskUsage{ID: 1} err = tx.Get(&du) - ctl.xcheck(err, "get disk usage") + xctl.xcheck(err, "get disk usage") msgDirs := map[string]struct{}{} process := func(m *store.Message, msgf *os.File, origPath string) { - defer store.CloseRemoveTempFile(ctl.log, msgf, "message to import") + defer store.CloseRemoveTempFile(xctl.log, msgf, "message to import") addSize += m.Size if maxSize > 0 && du.MessageSize+addSize > maxSize { - ctl.xcheck(fmt.Errorf("account over maximum total message size %d", maxSize), "checking quota") + xctl.xcheck(fmt.Errorf("account over maximum total message size %d", maxSize), "checking quota") } // Parse message and store parsed information for later fast retrieval. - p, err := message.EnsurePart(ctl.log.Logger, false, msgf, m.Size) + p, err := message.EnsurePart(xctl.log.Logger, false, msgf, m.Size) if err != nil { - ctl.log.Infox("parsing message, continuing", err, slog.String("path", origPath)) + xctl.log.Infox("parsing message, continuing", err, slog.String("path", origPath)) } m.ParsedBuf, err = json.Marshal(p) - ctl.xcheck(err, "marshal parsed message structure") + xctl.xcheck(err, "marshal parsed message structure") // Set fields needed for future threading. By doing it now, MessageAdd won't // have to parse the Part again. p.SetReaderAt(store.FileMsgReader(m.MsgPrefix, msgf)) - m.PrepareThreading(ctl.log, &p) + m.PrepareThreading(xctl.log, &p) if m.Received.IsZero() { if p.Envelope != nil && !p.Envelope.Date.IsZero() { @@ -348,10 +348,10 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) { m.JunkFlagsForMailbox(mb, conf) if jf != nil && m.NeedsTraining() { if words, err := jf.ParseMessage(p); err != nil { - ctl.log.Infox("parsing message for updating junk filter", err, slog.String("parse", ""), slog.String("path", origPath)) + xctl.log.Infox("parsing message for updating junk filter", err, slog.String("parse", ""), slog.String("path", origPath)) } else { err = jf.Train(ctx, !m.Junk, words) - ctl.xcheck(err, "training junk filter") + xctl.xcheck(err, "training junk filter") m.TrainedJunk = &m.Junk } } @@ -359,7 +359,7 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) { if modseq == 0 { var err error modseq, err = a.NextModSeq(tx) - ctl.xcheck(err, "assigning next modseq") + xctl.xcheck(err, "assigning next modseq") mb.ModSeq = modseq } @@ -376,8 +376,8 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) { SkipUpdateDiskUsage: true, // We do this once at the end. SkipCheckQuota: true, // We check before. } - err = a.MessageAdd(ctl.log, tx, &mb, m, msgf, opts) - ctl.xcheck(err, "delivering message") + err = a.MessageAdd(xctl.log, tx, &mb, m, msgf, opts) + xctl.xcheck(err, "delivering message") newIDs = append(newIDs, m.ID) changes = append(changes, m.ChangeAddUID()) @@ -385,7 +385,7 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) { n++ if n%1000 == 0 { - ctl.xwrite(fmt.Sprintf("progress %d", n)) + xctl.xwrite(fmt.Sprintf("progress %d", n)) } } @@ -394,15 +394,15 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) { if err == io.EOF { break } - ctl.xcheck(err, "reading next message") + xctl.xcheck(err, "reading next message") process(m, msgf, origPath) } // Match threads. if len(newIDs) > 0 { - err = a.AssignThreads(ctx, ctl.log, tx, newIDs[0], 0, io.Discard) - ctl.xcheck(err, "assigning messages to threads") + err = a.AssignThreads(ctx, xctl.log, tx, newIDs[0], 0, io.Discard) + xctl.xcheck(err, "assigning messages to threads") } changes = append(changes, mb.ChangeCounts()) @@ -411,35 +411,35 @@ func importctl(ctx context.Context, ctl *ctl, mbox bool) { } err = tx.Update(&mb) - ctl.xcheck(err, "updating message counts and keywords in mailbox") + xctl.xcheck(err, "updating message counts and keywords in mailbox") - err = a.AddMessageSize(ctl.log, tx, addSize) - ctl.xcheck(err, "updating total message size") + err = a.AddMessageSize(xctl.log, tx, addSize) + xctl.xcheck(err, "updating total message size") for msgDir := range msgDirs { - err := moxio.SyncDir(ctl.log, msgDir) - ctl.xcheck(err, "sync dir") + err := moxio.SyncDir(xctl.log, msgDir) + xctl.xcheck(err, "sync dir") } if jf != nil { err := jf.Close() - ctl.log.Check(err, "close junk filter") + xctl.log.Check(err, "close junk filter") jf = nil } err = tx.Commit() - ctl.xcheck(err, "commit") + xctl.xcheck(err, "commit") tx = nil - ctl.log.Info("delivered messages through import", slog.Int("count", len(newIDs))) + xctl.log.Info("delivered messages through import", slog.Int("count", len(newIDs))) newIDs = nil store.BroadcastChanges(a, changes) }) err = a.Close() - ctl.xcheck(err, "closing account") + xctl.xcheck(err, "closing account") a = nil - ctl.xwriteok() - ctl.xwrite(fmt.Sprintf("%d", n)) + xctl.xwriteok() + xctl.xwrite(fmt.Sprintf("%d", n)) } diff --git a/main.go b/main.go index 3b5dfca..25e1a0f 100644 --- a/main.go +++ b/main.go @@ -1661,11 +1661,11 @@ new mail deliveries. } mustLoadConfig() - ctl := xctl() - ctl.xwrite("stop") + xctl := xctl() + xctl.xwrite("stop") // Read will hang until remote has shut down. buf := make([]byte, 128) - n, err := ctl.conn.Read(buf) + n, err := xctl.conn.Read(buf) if err == nil { log.Fatalf("expected eof after graceful shutdown, got data %q", buf[:n]) } else if err != io.EOF { diff --git a/message/compose.go b/message/compose.go index fafab10..8bfb955 100644 --- a/message/compose.go +++ b/message/compose.go @@ -44,20 +44,20 @@ func NewComposer(w io.Writer, maxSize int64, smtputf8 bool) *Composer { // Write implements io.Writer, but calls panic (that is handled higher up) on // i/o errors. -func (c *Composer) Write(buf []byte) (int, error) { - if c.maxSize > 0 && c.Size+int64(len(buf)) > c.maxSize { - c.Checkf(ErrMessageSize, "writing message") +func (xc *Composer) Write(buf []byte) (int, error) { + if xc.maxSize > 0 && xc.Size+int64(len(buf)) > xc.maxSize { + xc.Checkf(ErrMessageSize, "writing message") } - n, err := c.bw.Write(buf) + n, err := xc.bw.Write(buf) if n > 0 { - c.Size += int64(n) + xc.Size += int64(n) } - c.Checkf(err, "write") + xc.Checkf(err, "write") return n, nil } // Checkf checks err, panicing with sentinel error value. -func (c *Composer) Checkf(err error, format string, args ...any) { +func (xc *Composer) Checkf(err error, format string, args ...any) { if err != nil { // We expose the original error too, needed at least for ErrMessageSize. panic(fmt.Errorf("%w: %w: %v", ErrCompose, err, fmt.Sprintf(format, args...))) @@ -65,14 +65,14 @@ func (c *Composer) Checkf(err error, format string, args ...any) { } // Flush writes any buffered output. -func (c *Composer) Flush() { - err := c.bw.Flush() - c.Checkf(err, "flush") +func (xc *Composer) Flush() { + err := xc.bw.Flush() + xc.Checkf(err, "flush") } // Header writes a message header. -func (c *Composer) Header(k, v string) { - fmt.Fprintf(c, "%s: %s\r\n", k, v) +func (xc *Composer) Header(k, v string) { + fmt.Fprintf(xc, "%s: %s\r\n", k, v) } // NameAddress holds both an address display name, and an SMTP path address. @@ -82,7 +82,7 @@ type NameAddress struct { } // HeaderAddrs writes a message header with addresses. -func (c *Composer) HeaderAddrs(k string, l []NameAddress) { +func (xc *Composer) HeaderAddrs(k string, l []NameAddress) { if len(l) == 0 { return } @@ -93,7 +93,7 @@ func (c *Composer) HeaderAddrs(k string, l []NameAddress) { v += "," linelen++ } - addr := mail.Address{Name: a.DisplayName, Address: a.Address.Pack(c.SMTPUTF8)} + addr := mail.Address{Name: a.DisplayName, Address: a.Address.Pack(xc.SMTPUTF8)} s := addr.String() if v != "" && linelen+1+len(s) > 77 { v += "\r\n\t" @@ -105,16 +105,16 @@ func (c *Composer) HeaderAddrs(k string, l []NameAddress) { v += s linelen += len(s) } - fmt.Fprintf(c, "%s: %s\r\n", k, v) + fmt.Fprintf(xc, "%s: %s\r\n", k, v) } // Subject writes a subject message header. -func (c *Composer) Subject(subject string) { +func (xc *Composer) Subject(subject string) { var subjectValue string subjectLineLen := len("Subject: ") subjectWord := false for i, word := range strings.Split(subject, " ") { - if !c.SMTPUTF8 && !isASCII(word) { + if !xc.SMTPUTF8 && !isASCII(word) { word = mime.QEncoding.Encode("utf-8", word) } if i > 0 { @@ -129,19 +129,19 @@ func (c *Composer) Subject(subject string) { subjectLineLen += len(word) subjectWord = true } - c.Header("Subject", subjectValue) + xc.Header("Subject", subjectValue) } // Line writes an empty line. -func (c *Composer) Line() { - _, _ = c.Write([]byte("\r\n")) +func (xc *Composer) Line() { + _, _ = xc.Write([]byte("\r\n")) } // TextPart prepares a text part to be added. Text should contain lines terminated // with newlines (lf), which are replaced with crlf. The returned text may be // quotedprintable, if needed. The returned ct and cte headers are for use with // Content-Type and Content-Transfer-Encoding headers. -func (c *Composer) TextPart(subtype, text string) (textBody []byte, ct, cte string) { +func (xc *Composer) TextPart(subtype, text string) (textBody []byte, ct, cte string) { if !strings.HasSuffix(text, "\n") { text += "\n" } @@ -153,10 +153,10 @@ func (c *Composer) TextPart(subtype, text string) (textBody []byte, ct, cte stri if NeedsQuotedPrintable(text) { var sb strings.Builder _, err := io.Copy(quotedprintable.NewWriter(&sb), strings.NewReader(text)) - c.Checkf(err, "converting text to quoted printable") + xc.Checkf(err, "converting text to quoted printable") text = sb.String() cte = "quoted-printable" - } else if c.Has8bit || charset == "utf-8" { + } else if xc.Has8bit || charset == "utf-8" { cte = "8bit" } else { cte = "7bit" diff --git a/sendmail.go b/sendmail.go index 45b1a26..5706778 100644 --- a/sendmail.go +++ b/sendmail.go @@ -313,7 +313,7 @@ binary should be setgid that group: homedir, err := os.UserHomeDir() xcheckf(err, "finding homedir for storing message after failed delivery") maildir := filepath.Join(homedir, "moxsubmit.failures") - os.Mkdir(maildir, 0700) + os.Mkdir(maildir, 0700) // Exists is no problem, failure is found during create. f, err := os.CreateTemp(maildir, "newmsg.") xcheckf(err, "creating temp file for storing message after failed delivery") // note: not removing the partial file if writing/closing below fails. diff --git a/smtpserver/server.go b/smtpserver/server.go index ef76de7..6d2eb7e 100644 --- a/smtpserver/server.go +++ b/smtpserver/server.go @@ -323,14 +323,16 @@ type conn struct { origConn net.Conn conn net.Conn - tls bool - extRequireTLS bool // Whether to announce and allow the REQUIRETLS extension. - viaHTTPS bool // Whether the connection came in via the HTTPS port (using TLS ALPN). - resolver dns.Resolver - r *bufio.Reader - w *bufio.Writer - tr *moxio.TraceReader // Kept for changing trace level during cmd/auth/data. - tw *moxio.TraceWriter + tls bool + extRequireTLS bool // Whether to announce and allow the REQUIRETLS extension. + viaHTTPS bool // Whether the connection came in via the HTTPS port (using TLS ALPN). + resolver dns.Resolver + // The "x" in the readers and writes indicate Read and Write errors use panic to + // propagate the error. + xbr *bufio.Reader + xbw *bufio.Writer + xtr *moxio.TraceReader // Kept for changing trace level during cmd/auth/data. + xtw *moxio.TraceWriter slow bool // If set, reads are done with a 1 second sleep, and writes are done 1 byte at a time, to keep spammers busy. lastlog time.Time // Used for printing the delta time since the previous logging for this connection. submission bool // ../rfc/6409:19 applies @@ -692,12 +694,12 @@ func (c *conn) xcheckAuth() { func (c *conn) xtrace(level slog.Level) func() { c.xflush() - c.tr.SetTrace(level) - c.tw.SetTrace(level) + c.xtr.SetTrace(level) + c.xtw.SetTrace(level) return func() { c.xflush() - c.tr.SetTrace(mlog.LevelTrace) - c.tw.SetTrace(mlog.LevelTrace) + c.xtr.SetTrace(mlog.LevelTrace) + c.xtw.SetTrace(mlog.LevelTrace) } } @@ -780,7 +782,7 @@ func (c *conn) Read(buf []byte) (int, error) { var bufpool = moxio.NewBufpool(8, 2*1024) func (c *conn) readline() string { - line, err := bufpool.Readline(c.log, c.r) + line, err := bufpool.Readline(c.log, c.xbr) if err != nil && errors.Is(err, moxio.ErrLineTooLong) { c.writecodeline(smtp.C500BadSyntax, smtp.SeProto5Other0, "line too long, smtp max is 512, we reached 2048", nil) panic(fmt.Errorf("%s (%w)", err, errIO)) @@ -834,12 +836,12 @@ func (c *conn) bwritecodeline(code int, secode string, msg string, err error) { // Buffered-write a formatted response line to connection. func (c *conn) bwritelinef(format string, args ...any) { msg := fmt.Sprintf(format, args...) - fmt.Fprint(c.w, msg+"\r\n") + fmt.Fprint(c.xbw, msg+"\r\n") } // Flush pending buffered writes to connection. func (c *conn) xflush() { - c.w.Flush() // Errors will have caused a panic in Write. + c.xbw.Flush() // Errors will have caused a panic in Write. } // Write (with flush) a response line with codes and message. err is not written, used for logging and can be nil. @@ -919,10 +921,10 @@ func serve(listenerName string, cid int64, hostname dns.Domain, tlsConfig *tls.C } return l }) - c.tr = moxio.NewTraceReader(c.log, "RC: ", c) - c.r = bufio.NewReader(c.tr) - c.tw = moxio.NewTraceWriter(c.log, "LS: ", c) - c.w = bufio.NewWriter(c.tw) + c.xtr = moxio.NewTraceReader(c.log, "RC: ", c) + c.xbr = bufio.NewReader(c.xtr) + c.xtw = moxio.NewTraceWriter(c.log, "LS: ", c) + c.xbw = bufio.NewWriter(c.xtw) metricConnection.WithLabelValues(c.kind()).Inc() c.log.Info("new connection", @@ -1007,9 +1009,9 @@ func serve(listenerName string, cid int64, hostname dns.Domain, tlsConfig *tls.C // If another command is present, don't flush our buffered response yet. Holding // off will cause us to respond with a single packet. - n := c.r.Buffered() + n := c.xbr.Buffered() if n > 0 { - buf, err := c.r.Peek(n) + buf, err := c.xbr.Peek(n) if err == nil && bytes.IndexByte(buf, '\n') >= 0 { continue } @@ -1246,9 +1248,9 @@ func (c *conn) cmdStarttls(p *parser) { // but make sure any bytes already read and in the buffer are used for the TLS // handshake. conn := c.conn - if n := c.r.Buffered(); n > 0 { + if n := c.xbr.Buffered(); n > 0 { conn = &moxio.PrefixConn{ - PrefixReader: io.LimitReader(c.r, int64(n)), + PrefixReader: io.LimitReader(c.xbr, int64(n)), Conn: conn, } } @@ -2121,7 +2123,7 @@ func (c *conn) cmdData(p *parser) { } defer store.CloseRemoveTempFile(c.log, dataFile, "smtpserver delivered message") msgWriter := message.NewWriter(dataFile) - dr := smtp.NewDataReader(c.r) + dr := smtp.NewDataReader(c.xbr) n, err := io.Copy(&limitWriter{maxSize: c.maxMessageSize, w: msgWriter}, dr) c.xtrace(mlog.LevelTrace) // Restore. if err != nil { diff --git a/store/threads.go b/store/threads.go index a377f13..98a37f8 100644 --- a/store/threads.go +++ b/store/threads.go @@ -231,7 +231,7 @@ func (a *Account) ResetThreading(ctx context.Context, log mlog.Log, batchSize in // Does not set Seen flag for muted threads. // // Progress is written to progressWriter, every 100k messages. -func (a *Account) AssignThreads(ctx context.Context, log mlog.Log, txOpt *bstore.Tx, startMessageID int64, batchSize int, progressWriter io.Writer) error { +func (a *Account) AssignThreads(ctx context.Context, log mlog.Log, txOpt *bstore.Tx, startMessageID int64, batchSize int, xprogressWriter io.Writer) error { // We use a more basic version of the thread-matching algorithm describe in: // ../rfc/5256:443 // The algorithm assumes you'll select messages, then group into threads. We normally do @@ -240,6 +240,9 @@ func (a *Account) AssignThreads(ctx context.Context, log mlog.Log, txOpt *bstore // soon as we process them. We can handle large number of messages, but not very // quickly because we make lots of database queries. + // xprogressWriter can call panic on write errors, when assigning threads through a + // ctl command. + type childMsg struct { ID int64 // This message will be fetched and updated with the threading fields once the parent is resolved. MessageID string // Of child message. Once child is resolved, its own children can be resolved too. @@ -533,18 +536,18 @@ func (a *Account) AssignThreads(ctx context.Context, log mlog.Log, txOpt *bstore nassigned += n if nassigned%100000 == 0 { log.Debug("assigning threads, progress", slog.Int("count", nassigned), slog.Int("unresolved", len(pending))) - if _, err := fmt.Fprintf(progressWriter, "assigning threads, progress: %d messages\n", nassigned); err != nil { + if _, err := fmt.Fprintf(xprogressWriter, "assigning threads, progress: %d messages\n", nassigned); err != nil { return fmt.Errorf("writing progress: %v", err) } } } - if _, err := fmt.Fprintf(progressWriter, "assigning threads, done: %d messages\n", nassigned); err != nil { + if _, err := fmt.Fprintf(xprogressWriter, "assigning threads, done: %d messages\n", nassigned); err != nil { return fmt.Errorf("writing progress: %v", err) } log.Debug("assigning threads, mostly done, finishing with resolving of cyclic messages", slog.Int("count", nassigned), slog.Int("unresolved", len(pending))) - if _, err := fmt.Fprintf(progressWriter, "assigning threads, resolving %d cyclic pending message-ids\n", len(pending)); err != nil { + if _, err := fmt.Fprintf(xprogressWriter, "assigning threads, resolving %d cyclic pending message-ids\n", len(pending)); err != nil { return fmt.Errorf("writing progress: %v", err) }