fix data race in code for logging login attempts

logging of login attempts happens in the background, because we don't want to
block regular operation with disk since for such logging. however, when a line
is logged, we evaluate some attributes of a connection, notably the username.
but about when we do authentication, we change the username on a connection. so
we were reading and writing at the same time. this is now fixed by evaluating
the attributes before we pass off the logger to the goroutine.

found by the go race detector.
This commit is contained in:
Mechiel Lukkien 2025-02-19 15:23:19 +01:00
parent de6262b90a
commit cbe5bb235c
No known key found for this signature in database
8 changed files with 89 additions and 48 deletions

View File

@ -353,7 +353,6 @@ func TestAuthenticateTLSClientCert(t *testing.T) {
os.RemoveAll("../testdata/imap/data") os.RemoveAll("../testdata/imap/data")
err = store.Init(ctxbg) err = store.Init(ctxbg)
tcheck(t, err, "store init") tcheck(t, err, "store init")
mox.Context = ctxbg
mox.ConfigStaticPath = filepath.FromSlash("../testdata/imap/mox.conf") mox.ConfigStaticPath = filepath.FromSlash("../testdata/imap/mox.conf")
mox.MustLoadConfig(true, false) mox.MustLoadConfig(true, false)
switchStop := store.Switchboard() switchStop := store.Switchboard()

View File

@ -60,7 +60,6 @@ func FuzzServer(f *testing.F) {
} }
log := mlog.New("imapserver", nil) log := mlog.New("imapserver", nil)
mox.Context = ctxbg
mox.ConfigStaticPath = filepath.FromSlash("../testdata/imapserverfuzz/mox.conf") mox.ConfigStaticPath = filepath.FromSlash("../testdata/imapserverfuzz/mox.conf")
mox.MustLoadConfig(true, false) mox.MustLoadConfig(true, false)
dataDir := mox.ConfigDirPath(mox.Conf.Static.DataDir) dataDir := mox.ConfigDirPath(mox.Conf.Static.DataDir)

View File

@ -187,7 +187,7 @@ type conn struct {
cmdMetric string // Currently executing, for metrics. cmdMetric string // Currently executing, for metrics.
cmdStart time.Time cmdStart time.Time
ncmds int // Number of commands processed. Used to abort connection when first incoming command is unknown/invalid. ncmds int // Number of commands processed. Used to abort connection when first incoming command is unknown/invalid.
log mlog.Log log mlog.Log // Used for all synchronous logging on this connection, see logbg for logging in a separate goroutine.
enabled map[capability]bool // All upper-case. enabled map[capability]bool // All upper-case.
// Set by SEARCH with SAVE. Can be used by commands accepting a sequence-set with // Set by SEARCH with SAVE. Can be used by commands accepting a sequence-set with
@ -415,6 +415,18 @@ func Serve() {
servers = nil servers = nil
} }
// Logbg returns a logger for logging in the background (in a goroutine), eg for
// logging LoginAttempts. The regular c.log has a handler that evaluates fields on
// the connection at time of logging, which may happen at the same time as
// modifications to those fields.
func (c *conn) logbg() mlog.Log {
log := mlog.New("imapserver", nil).WithCid(c.cid)
if c.username != "" {
log = log.With(slog.String("username", c.username))
}
return log
}
// returns whether this connection accepts utf-8 in strings. // returns whether this connection accepts utf-8 in strings.
func (c *conn) utf8strings() bool { func (c *conn) utf8strings() bool {
return c.enabled[capIMAP4rev2] || c.enabled[capUTF8Accept] return c.enabled[capIMAP4rev2] || c.enabled[capUTF8Accept]
@ -692,6 +704,7 @@ func serve(listenerName string, cid int64, tlsConfig *tls.Config, nc net.Conn, x
cmdStart: time.Now(), cmdStart: time.Now(),
} }
var logmutex sync.Mutex var logmutex sync.Mutex
// Also see (and possibly update) c.logbg, for logging in a goroutine.
c.log = mlog.New("imapserver", nil).WithFunc(func() []slog.Attr { c.log = mlog.New("imapserver", nil).WithFunc(func() []slog.Attr {
logmutex.Lock() logmutex.Lock()
defer logmutex.Unlock() defer logmutex.Unlock()
@ -818,7 +831,7 @@ func serve(listenerName string, cid int64, tlsConfig *tls.Config, nc net.Conn, x
// Ensure any pending loginAttempt is written before we stop. // Ensure any pending loginAttempt is written before we stop.
defer func() { defer func() {
if c.loginAttempt != nil { if c.loginAttempt != nil {
store.LoginAttemptAdd(context.Background(), c.log, *c.loginAttempt) store.LoginAttemptAdd(context.Background(), c.logbg(), *c.loginAttempt)
c.loginAttempt = nil c.loginAttempt = nil
} }
}() }()
@ -834,7 +847,7 @@ func serve(listenerName string, cid int64, tlsConfig *tls.Config, nc net.Conn, x
if storeLoginAttempt { if storeLoginAttempt {
storeLoginAttempt = false storeLoginAttempt = false
if c.loginAttempt != nil { if c.loginAttempt != nil {
store.LoginAttemptAdd(context.Background(), c.log, *c.loginAttempt) store.LoginAttemptAdd(context.Background(), c.logbg(), *c.loginAttempt)
c.loginAttempt = nil c.loginAttempt = nil
} }
} else if c.loginAttempt != nil { } else if c.loginAttempt != nil {
@ -853,7 +866,7 @@ func isClosed(err error) bool {
// filling in the results and other details. // filling in the results and other details.
func (c *conn) newLoginAttempt(useTLS bool, authMech string) { func (c *conn) newLoginAttempt(useTLS bool, authMech string) {
if c.loginAttempt != nil { if c.loginAttempt != nil {
store.LoginAttemptAdd(context.Background(), c.log, *c.loginAttempt) store.LoginAttemptAdd(context.Background(), c.logbg(), *c.loginAttempt)
c.loginAttempt = nil c.loginAttempt = nil
} }
@ -941,6 +954,7 @@ func (c *conn) tlsClientAuthVerifyPeerCertParsed(cert *x509.Certificate) error {
conn := c.conn.(*tls.Conn) conn := c.conn.(*tls.Conn)
la := *c.loginAttempt la := *c.loginAttempt
c.loginAttempt = nil c.loginAttempt = nil
logbg := c.logbg() // Evaluate attributes now, can't do it in goroutine.
go func() { go func() {
defer func() { defer func() {
// In case of panic don't take the whole program down. // In case of panic don't take the whole program down.
@ -954,7 +968,7 @@ func (c *conn) tlsClientAuthVerifyPeerCertParsed(cert *x509.Certificate) error {
state := conn.ConnectionState() state := conn.ConnectionState()
la.TLS = store.LoginAttemptTLS(&state) la.TLS = store.LoginAttemptTLS(&state)
store.LoginAttemptAdd(context.Background(), c.log, la) store.LoginAttemptAdd(context.Background(), logbg, la)
}() }()
if la.Result == store.AuthSuccess { if la.Result == store.AuthSuccess {
@ -1773,7 +1787,7 @@ func (c *conn) cmdID(tag, cmd string, p *parser) {
// prepared the LoginAttempt and write it now. // prepared the LoginAttempt and write it now.
if c.loginAttempt != nil { if c.loginAttempt != nil {
c.loginAttempt.UserAgent = strings.Join(values, " ") c.loginAttempt.UserAgent = strings.Join(values, " ")
store.LoginAttemptAdd(context.Background(), c.log, *c.loginAttempt) store.LoginAttemptAdd(context.Background(), c.logbg(), *c.loginAttempt)
c.loginAttempt = nil c.loginAttempt = nil
} }

View File

@ -32,6 +32,8 @@ func init() {
// Don't slow down tests. // Don't slow down tests.
badClientDelay = 0 badClientDelay = 0
authFailDelay = 0 authFailDelay = 0
mox.Context = ctxbg
} }
func tocrlf(s string) string { func tocrlf(s string) string {
@ -350,7 +352,6 @@ func startArgs(t *testing.T, first, immediateTLS bool, allowLoginWithoutTLS, set
func startArgsMore(t *testing.T, first, immediateTLS bool, serverConfig, clientConfig *tls.Config, allowLoginWithoutTLS, noCloseSwitchboard, setPassword bool, accname string, afterInit func() error) *testconn { func startArgsMore(t *testing.T, first, immediateTLS bool, serverConfig, clientConfig *tls.Config, allowLoginWithoutTLS, noCloseSwitchboard, setPassword bool, accname string, afterInit func() error) *testconn {
limitersInit() // Reset rate limiters. limitersInit() // Reset rate limiters.
mox.Context = ctxbg
mox.ConfigStaticPath = filepath.FromSlash("../testdata/imap/mox.conf") mox.ConfigStaticPath = filepath.FromSlash("../testdata/imap/mox.conf")
mox.MustLoadConfig(true, false) mox.MustLoadConfig(true, false)
if first { if first {

View File

@ -338,7 +338,7 @@ type conn struct {
localIP net.IP localIP net.IP
remoteIP net.IP remoteIP net.IP
hostname dns.Domain hostname dns.Domain
log mlog.Log log mlog.Log // Used for all synchronous logging on this connection, see logbg for logging in a separate goroutine.
maxMessageSize int64 maxMessageSize int64
requireTLSForAuth bool requireTLSForAuth bool
requireTLSForDelivery bool // If set, delivery is only allowed with TLS (STARTTLS), except if delivery is to a TLS reporting address. requireTLSForDelivery bool // If set, delivery is only allowed with TLS (STARTTLS), except if delivery is to a TLS reporting address.
@ -401,6 +401,18 @@ func isClosed(err error) bool {
return errors.Is(err, errIO) || moxio.IsClosed(err) return errors.Is(err, errIO) || moxio.IsClosed(err)
} }
// Logbg returns a logger for logging in the background (in a goroutine), eg for
// logging LoginAttempts. The regular c.log has a handler that evaluates fields on
// the connection at time of logging, which may happen at the same time as
// modifications to those fields.
func (c *conn) logbg() mlog.Log {
log := mlog.New("smtpserver", nil).WithCid(c.cid)
if c.username != "" {
log = log.With(slog.String("username", c.username))
}
return log
}
// loginAttempt initializes a store.LoginAttempt, for adding to the store after // loginAttempt initializes a store.LoginAttempt, for adding to the store after
// filling in the results and other details. // filling in the results and other details.
func (c *conn) loginAttempt(useTLS bool, authMech string) store.LoginAttempt { func (c *conn) loginAttempt(useTLS bool, authMech string) store.LoginAttempt {
@ -483,6 +495,7 @@ func (c *conn) tlsClientAuthVerifyPeerCertParsed(cert *x509.Certificate) error {
// Get TLS connection state in goroutine because we are called while performing the // Get TLS connection state in goroutine because we are called while performing the
// TLS handshake, which already has the tls connection locked. // TLS handshake, which already has the tls connection locked.
conn := c.conn.(*tls.Conn) conn := c.conn.(*tls.Conn)
logbg := c.logbg() // Evaluate attributes now, can't do it in goroutine.
go func() { go func() {
defer func() { defer func() {
// In case of panic don't take the whole program down. // In case of panic don't take the whole program down.
@ -496,7 +509,7 @@ func (c *conn) tlsClientAuthVerifyPeerCertParsed(cert *x509.Certificate) error {
state := conn.ConnectionState() state := conn.ConnectionState()
la.TLS = store.LoginAttemptTLS(&state) la.TLS = store.LoginAttemptTLS(&state)
store.LoginAttemptAdd(context.Background(), c.log, la) store.LoginAttemptAdd(context.Background(), logbg, la)
}() }()
if la.Result == store.AuthSuccess { if la.Result == store.AuthSuccess {
@ -896,6 +909,7 @@ func serve(listenerName string, cid int64, hostname dns.Domain, tlsConfig *tls.C
firstTimeSenderDelay: firstTimeSenderDelay, firstTimeSenderDelay: firstTimeSenderDelay,
} }
var logmutex sync.Mutex var logmutex sync.Mutex
// Also see (and possibly update) c.logbg, for logging in a goroutine.
c.log = mlog.New("smtpserver", nil).WithFunc(func() []slog.Attr { c.log = mlog.New("smtpserver", nil).WithFunc(func() []slog.Attr {
logmutex.Lock() logmutex.Lock()
defer logmutex.Unlock() defer logmutex.Unlock()
@ -1294,7 +1308,7 @@ func (c *conn) cmdAuth(p *parser) {
la := c.loginAttempt(true, "") la := c.loginAttempt(true, "")
defer func() { defer func() {
store.LoginAttemptAdd(context.Background(), c.log, la) store.LoginAttemptAdd(context.Background(), c.logbg(), la)
if la.Result == store.AuthSuccess { if la.Result == store.AuthSuccess {
mox.LimiterFailedAuth.Reset(c.remoteIP, time.Now()) mox.LimiterFailedAuth.Reset(c.remoteIP, time.Now())
} else if !missingDerivedSecrets { } else if !missingDerivedSecrets {

View File

@ -21,7 +21,7 @@ import (
var AuthDB *bstore.DB var AuthDB *bstore.DB
var AuthDBTypes = []any{TLSPublicKey{}, LoginAttempt{}, LoginAttemptState{}} var AuthDBTypes = []any{TLSPublicKey{}, LoginAttempt{}, LoginAttemptState{}}
// Init opens auth.db. // Init opens auth.db and starts the login writer.
func Init(ctx context.Context) error { func Init(ctx context.Context) error {
if AuthDB != nil { if AuthDB != nil {
return fmt.Errorf("already initialized") return fmt.Errorf("already initialized")
@ -36,7 +36,7 @@ func Init(ctx context.Context) error {
return err return err
} }
startLoginAttemptWriter(ctx) startLoginAttemptWriter()
go func() { go func() {
defer func() { defer func() {
@ -67,12 +67,18 @@ func Init(ctx context.Context) error {
return nil return nil
} }
// Close closes auth.db. // Close closes auth.db and stops the login writer.
func Close() error { func Close() error {
if AuthDB == nil { if AuthDB == nil {
return fmt.Errorf("not open") return fmt.Errorf("not open")
} }
stopc := make(chan struct{})
writeLoginAttemptStop <- stopc
<-stopc
err := AuthDB.Close() err := AuthDB.Close()
AuthDB = nil AuthDB = nil
return err return err
} }

View File

@ -14,7 +14,6 @@ import (
"github.com/mjl-/mox/metrics" "github.com/mjl-/mox/metrics"
"github.com/mjl-/mox/mlog" "github.com/mjl-/mox/mlog"
"github.com/mjl-/mox/mox-"
) )
var loginAttemptsMaxPerAccount = 10 * 1000 // Lower during tests. var loginAttemptsMaxPerAccount = 10 * 1000 // Lower during tests.
@ -101,16 +100,35 @@ const (
) )
var writeLoginAttempt chan LoginAttempt var writeLoginAttempt chan LoginAttempt
var writeLoginAttemptStopped chan struct{} // For synchronizing with tests. var writeLoginAttemptStop chan chan struct{}
func startLoginAttemptWriter(ctx context.Context) { func startLoginAttemptWriter() {
writeLoginAttempt = make(chan LoginAttempt, 100) writeLoginAttempt = make(chan LoginAttempt, 100)
writeLoginAttemptStopped = make(chan struct{}, 1) writeLoginAttemptStop = make(chan chan struct{})
process := func(la *LoginAttempt) {
var l []LoginAttempt
if la != nil {
l = []LoginAttempt{*la}
}
// Gather all that we can write now.
All:
for {
select {
case xla := <-writeLoginAttempt:
l = append(l, xla)
default:
break All
}
}
if len(l) > 0 {
loginAttemptWrite(l...)
}
}
go func() { go func() {
defer func() { defer func() {
writeLoginAttemptStopped <- struct{}{}
x := recover() x := recover()
if x == nil { if x == nil {
return return
@ -121,26 +139,15 @@ func startLoginAttemptWriter(ctx context.Context) {
metrics.PanicInc(metrics.Store) metrics.PanicInc(metrics.Store)
}() }()
done := ctx.Done()
for { for {
select { select {
case <-done: case stopc := <-writeLoginAttemptStop:
process(nil)
stopc <- struct{}{}
return return
case la := <-writeLoginAttempt: case la := <-writeLoginAttempt:
l := []LoginAttempt{la} process(&la)
// Gather all that we can write now.
All:
for {
select {
case la = <-writeLoginAttempt:
l = append(l, la)
default:
break All
}
}
loginAttemptWrite(l...)
} }
} }
}() }()
@ -157,15 +164,9 @@ func LoginAttemptAdd(ctx context.Context, log mlog.Log, a LoginAttempt) {
metrics.AuthenticationInc(a.Protocol, a.AuthMech, string(a.Result)) metrics.AuthenticationInc(a.Protocol, a.AuthMech, string(a.Result))
a.log = log a.log = log
select {
case <-mox.Context.Done():
// During shutdown, don't return before writing.
loginAttemptWrite(a)
default:
// Send login attempt to writer. Only blocks if there are lots of login attempts. // Send login attempt to writer. Only blocks if there are lots of login attempts.
writeLoginAttempt <- a writeLoginAttempt <- a
} }
}
func loginAttemptWrite(l ...LoginAttempt) { func loginAttemptWrite(l ...LoginAttempt) {
// Log on the way out, for "count" fetched from database. // Log on the way out, for "count" fetched from database.

View File

@ -17,12 +17,19 @@ func TestLoginAttempt(t *testing.T) {
mox.MustLoadConfig(true, false) mox.MustLoadConfig(true, false)
xctx, xcancel := context.WithCancel(ctxbg) xctx, xcancel := context.WithCancel(ctxbg)
defer xcancel() // Stop clearing of LoginAttempts.
err := Init(xctx) err := Init(xctx)
tcheck(t, err, "store init") tcheck(t, err, "store init")
// Stop the background LoginAttempt writer for synchronous tests. stopc := make(chan struct{})
xcancel() writeLoginAttemptStop <- stopc
<-writeLoginAttemptStopped <-stopc
defer func() { defer func() {
// Ensure Close() below finishes
go func() {
c := <-writeLoginAttemptStop
c <- struct{}{}
}()
err := Close() err := Close()
tcheck(t, err, "store close") tcheck(t, err, "store close")
}() }()