diff --git a/imapserver/authenticate_test.go b/imapserver/authenticate_test.go index 6011bf7..2cb26f3 100644 --- a/imapserver/authenticate_test.go +++ b/imapserver/authenticate_test.go @@ -353,7 +353,6 @@ func TestAuthenticateTLSClientCert(t *testing.T) { os.RemoveAll("../testdata/imap/data") err = store.Init(ctxbg) tcheck(t, err, "store init") - mox.Context = ctxbg mox.ConfigStaticPath = filepath.FromSlash("../testdata/imap/mox.conf") mox.MustLoadConfig(true, false) switchStop := store.Switchboard() diff --git a/imapserver/fuzz_test.go b/imapserver/fuzz_test.go index 3a3f573..81e4d17 100644 --- a/imapserver/fuzz_test.go +++ b/imapserver/fuzz_test.go @@ -60,7 +60,6 @@ func FuzzServer(f *testing.F) { } log := mlog.New("imapserver", nil) - mox.Context = ctxbg mox.ConfigStaticPath = filepath.FromSlash("../testdata/imapserverfuzz/mox.conf") mox.MustLoadConfig(true, false) dataDir := mox.ConfigDirPath(mox.Conf.Static.DataDir) diff --git a/imapserver/server.go b/imapserver/server.go index 24190c4..cf3162b 100644 --- a/imapserver/server.go +++ b/imapserver/server.go @@ -186,8 +186,8 @@ type conn struct { cmd string // Currently executing, for deciding to applyChanges and logging. cmdMetric string // Currently executing, for metrics. cmdStart time.Time - ncmds int // Number of commands processed. Used to abort connection when first incoming command is unknown/invalid. - log mlog.Log + ncmds int // Number of commands processed. Used to abort connection when first incoming command is unknown/invalid. + log mlog.Log // Used for all synchronous logging on this connection, see logbg for logging in a separate goroutine. enabled map[capability]bool // All upper-case. // Set by SEARCH with SAVE. Can be used by commands accepting a sequence-set with @@ -415,6 +415,18 @@ func Serve() { servers = nil } +// Logbg returns a logger for logging in the background (in a goroutine), eg for +// logging LoginAttempts. The regular c.log has a handler that evaluates fields on +// the connection at time of logging, which may happen at the same time as +// modifications to those fields. +func (c *conn) logbg() mlog.Log { + log := mlog.New("imapserver", nil).WithCid(c.cid) + if c.username != "" { + log = log.With(slog.String("username", c.username)) + } + return log +} + // returns whether this connection accepts utf-8 in strings. func (c *conn) utf8strings() bool { return c.enabled[capIMAP4rev2] || c.enabled[capUTF8Accept] @@ -692,6 +704,7 @@ func serve(listenerName string, cid int64, tlsConfig *tls.Config, nc net.Conn, x cmdStart: time.Now(), } var logmutex sync.Mutex + // Also see (and possibly update) c.logbg, for logging in a goroutine. c.log = mlog.New("imapserver", nil).WithFunc(func() []slog.Attr { logmutex.Lock() defer logmutex.Unlock() @@ -818,7 +831,7 @@ func serve(listenerName string, cid int64, tlsConfig *tls.Config, nc net.Conn, x // Ensure any pending loginAttempt is written before we stop. defer func() { if c.loginAttempt != nil { - store.LoginAttemptAdd(context.Background(), c.log, *c.loginAttempt) + store.LoginAttemptAdd(context.Background(), c.logbg(), *c.loginAttempt) c.loginAttempt = nil } }() @@ -834,7 +847,7 @@ func serve(listenerName string, cid int64, tlsConfig *tls.Config, nc net.Conn, x if storeLoginAttempt { storeLoginAttempt = false if c.loginAttempt != nil { - store.LoginAttemptAdd(context.Background(), c.log, *c.loginAttempt) + store.LoginAttemptAdd(context.Background(), c.logbg(), *c.loginAttempt) c.loginAttempt = nil } } else if c.loginAttempt != nil { @@ -853,7 +866,7 @@ func isClosed(err error) bool { // filling in the results and other details. func (c *conn) newLoginAttempt(useTLS bool, authMech string) { if c.loginAttempt != nil { - store.LoginAttemptAdd(context.Background(), c.log, *c.loginAttempt) + store.LoginAttemptAdd(context.Background(), c.logbg(), *c.loginAttempt) c.loginAttempt = nil } @@ -941,6 +954,7 @@ func (c *conn) tlsClientAuthVerifyPeerCertParsed(cert *x509.Certificate) error { conn := c.conn.(*tls.Conn) la := *c.loginAttempt c.loginAttempt = nil + logbg := c.logbg() // Evaluate attributes now, can't do it in goroutine. go func() { defer func() { // In case of panic don't take the whole program down. @@ -954,7 +968,7 @@ func (c *conn) tlsClientAuthVerifyPeerCertParsed(cert *x509.Certificate) error { state := conn.ConnectionState() la.TLS = store.LoginAttemptTLS(&state) - store.LoginAttemptAdd(context.Background(), c.log, la) + store.LoginAttemptAdd(context.Background(), logbg, la) }() if la.Result == store.AuthSuccess { @@ -1773,7 +1787,7 @@ func (c *conn) cmdID(tag, cmd string, p *parser) { // prepared the LoginAttempt and write it now. if c.loginAttempt != nil { c.loginAttempt.UserAgent = strings.Join(values, " ") - store.LoginAttemptAdd(context.Background(), c.log, *c.loginAttempt) + store.LoginAttemptAdd(context.Background(), c.logbg(), *c.loginAttempt) c.loginAttempt = nil } diff --git a/imapserver/server_test.go b/imapserver/server_test.go index 66aa8b9..12af27f 100644 --- a/imapserver/server_test.go +++ b/imapserver/server_test.go @@ -32,6 +32,8 @@ func init() { // Don't slow down tests. badClientDelay = 0 authFailDelay = 0 + + mox.Context = ctxbg } func tocrlf(s string) string { @@ -350,7 +352,6 @@ func startArgs(t *testing.T, first, immediateTLS bool, allowLoginWithoutTLS, set func startArgsMore(t *testing.T, first, immediateTLS bool, serverConfig, clientConfig *tls.Config, allowLoginWithoutTLS, noCloseSwitchboard, setPassword bool, accname string, afterInit func() error) *testconn { limitersInit() // Reset rate limiters. - mox.Context = ctxbg mox.ConfigStaticPath = filepath.FromSlash("../testdata/imap/mox.conf") mox.MustLoadConfig(true, false) if first { diff --git a/smtpserver/server.go b/smtpserver/server.go index 8d3eded..122a935 100644 --- a/smtpserver/server.go +++ b/smtpserver/server.go @@ -338,7 +338,7 @@ type conn struct { localIP net.IP remoteIP net.IP hostname dns.Domain - log mlog.Log + log mlog.Log // Used for all synchronous logging on this connection, see logbg for logging in a separate goroutine. maxMessageSize int64 requireTLSForAuth bool requireTLSForDelivery bool // If set, delivery is only allowed with TLS (STARTTLS), except if delivery is to a TLS reporting address. @@ -401,6 +401,18 @@ func isClosed(err error) bool { return errors.Is(err, errIO) || moxio.IsClosed(err) } +// Logbg returns a logger for logging in the background (in a goroutine), eg for +// logging LoginAttempts. The regular c.log has a handler that evaluates fields on +// the connection at time of logging, which may happen at the same time as +// modifications to those fields. +func (c *conn) logbg() mlog.Log { + log := mlog.New("smtpserver", nil).WithCid(c.cid) + if c.username != "" { + log = log.With(slog.String("username", c.username)) + } + return log +} + // loginAttempt initializes a store.LoginAttempt, for adding to the store after // filling in the results and other details. func (c *conn) loginAttempt(useTLS bool, authMech string) store.LoginAttempt { @@ -483,6 +495,7 @@ func (c *conn) tlsClientAuthVerifyPeerCertParsed(cert *x509.Certificate) error { // Get TLS connection state in goroutine because we are called while performing the // TLS handshake, which already has the tls connection locked. conn := c.conn.(*tls.Conn) + logbg := c.logbg() // Evaluate attributes now, can't do it in goroutine. go func() { defer func() { // In case of panic don't take the whole program down. @@ -496,7 +509,7 @@ func (c *conn) tlsClientAuthVerifyPeerCertParsed(cert *x509.Certificate) error { state := conn.ConnectionState() la.TLS = store.LoginAttemptTLS(&state) - store.LoginAttemptAdd(context.Background(), c.log, la) + store.LoginAttemptAdd(context.Background(), logbg, la) }() if la.Result == store.AuthSuccess { @@ -896,6 +909,7 @@ func serve(listenerName string, cid int64, hostname dns.Domain, tlsConfig *tls.C firstTimeSenderDelay: firstTimeSenderDelay, } var logmutex sync.Mutex + // Also see (and possibly update) c.logbg, for logging in a goroutine. c.log = mlog.New("smtpserver", nil).WithFunc(func() []slog.Attr { logmutex.Lock() defer logmutex.Unlock() @@ -1294,7 +1308,7 @@ func (c *conn) cmdAuth(p *parser) { la := c.loginAttempt(true, "") defer func() { - store.LoginAttemptAdd(context.Background(), c.log, la) + store.LoginAttemptAdd(context.Background(), c.logbg(), la) if la.Result == store.AuthSuccess { mox.LimiterFailedAuth.Reset(c.remoteIP, time.Now()) } else if !missingDerivedSecrets { diff --git a/store/init.go b/store/init.go index 343288a..8d55912 100644 --- a/store/init.go +++ b/store/init.go @@ -21,7 +21,7 @@ import ( var AuthDB *bstore.DB var AuthDBTypes = []any{TLSPublicKey{}, LoginAttempt{}, LoginAttemptState{}} -// Init opens auth.db. +// Init opens auth.db and starts the login writer. func Init(ctx context.Context) error { if AuthDB != nil { return fmt.Errorf("already initialized") @@ -36,7 +36,7 @@ func Init(ctx context.Context) error { return err } - startLoginAttemptWriter(ctx) + startLoginAttemptWriter() go func() { defer func() { @@ -67,12 +67,18 @@ func Init(ctx context.Context) error { return nil } -// Close closes auth.db. +// Close closes auth.db and stops the login writer. func Close() error { if AuthDB == nil { return fmt.Errorf("not open") } + + stopc := make(chan struct{}) + writeLoginAttemptStop <- stopc + <-stopc + err := AuthDB.Close() AuthDB = nil + return err } diff --git a/store/loginattempt.go b/store/loginattempt.go index 0c81f1f..0cda44c 100644 --- a/store/loginattempt.go +++ b/store/loginattempt.go @@ -14,7 +14,6 @@ import ( "github.com/mjl-/mox/metrics" "github.com/mjl-/mox/mlog" - "github.com/mjl-/mox/mox-" ) var loginAttemptsMaxPerAccount = 10 * 1000 // Lower during tests. @@ -101,16 +100,35 @@ const ( ) var writeLoginAttempt chan LoginAttempt -var writeLoginAttemptStopped chan struct{} // For synchronizing with tests. +var writeLoginAttemptStop chan chan struct{} -func startLoginAttemptWriter(ctx context.Context) { +func startLoginAttemptWriter() { writeLoginAttempt = make(chan LoginAttempt, 100) - writeLoginAttemptStopped = make(chan struct{}, 1) + writeLoginAttemptStop = make(chan chan struct{}) + + process := func(la *LoginAttempt) { + var l []LoginAttempt + if la != nil { + l = []LoginAttempt{*la} + } + // Gather all that we can write now. + All: + for { + select { + case xla := <-writeLoginAttempt: + l = append(l, xla) + default: + break All + } + } + + if len(l) > 0 { + loginAttemptWrite(l...) + } + } go func() { defer func() { - writeLoginAttemptStopped <- struct{}{} - x := recover() if x == nil { return @@ -121,26 +139,15 @@ func startLoginAttemptWriter(ctx context.Context) { metrics.PanicInc(metrics.Store) }() - done := ctx.Done() for { select { - case <-done: + case stopc := <-writeLoginAttemptStop: + process(nil) + stopc <- struct{}{} return case la := <-writeLoginAttempt: - l := []LoginAttempt{la} - // Gather all that we can write now. - All: - for { - select { - case la = <-writeLoginAttempt: - l = append(l, la) - default: - break All - } - } - - loginAttemptWrite(l...) + process(&la) } } }() @@ -157,14 +164,8 @@ func LoginAttemptAdd(ctx context.Context, log mlog.Log, a LoginAttempt) { metrics.AuthenticationInc(a.Protocol, a.AuthMech, string(a.Result)) a.log = log - select { - case <-mox.Context.Done(): - // During shutdown, don't return before writing. - loginAttemptWrite(a) - default: - // Send login attempt to writer. Only blocks if there are lots of login attempts. - writeLoginAttempt <- a - } + // Send login attempt to writer. Only blocks if there are lots of login attempts. + writeLoginAttempt <- a } func loginAttemptWrite(l ...LoginAttempt) { diff --git a/store/loginattempt_test.go b/store/loginattempt_test.go index da25d61..26c8a5a 100644 --- a/store/loginattempt_test.go +++ b/store/loginattempt_test.go @@ -17,12 +17,19 @@ func TestLoginAttempt(t *testing.T) { mox.MustLoadConfig(true, false) xctx, xcancel := context.WithCancel(ctxbg) + defer xcancel() // Stop clearing of LoginAttempts. err := Init(xctx) tcheck(t, err, "store init") - // Stop the background LoginAttempt writer for synchronous tests. - xcancel() - <-writeLoginAttemptStopped + stopc := make(chan struct{}) + writeLoginAttemptStop <- stopc + <-stopc defer func() { + // Ensure Close() below finishes + go func() { + c := <-writeLoginAttemptStop + c <- struct{}{} + }() + err := Close() tcheck(t, err, "store close") }()