diff --git a/imapclient/client.go b/imapclient/client.go index 8a28e1f..b5a5513 100644 --- a/imapclient/client.go +++ b/imapclient/client.go @@ -22,8 +22,6 @@ import ( "reflect" "strings" - "github.com/mjl-/flate" - "github.com/mjl-/mox/mlog" "github.com/mjl-/mox/moxio" ) @@ -34,10 +32,11 @@ type Conn struct { // writes through c.bw. It wraps a tracing reading/writer and may wrap flate // compression. conn net.Conn + connBroken bool // If connection is broken, we won't flush (and write) again. br *bufio.Reader bw *bufio.Writer compress bool // If compression is enabled, we must flush flateWriter and its target original bufio writer. - flateWriter *flate.Writer + flateWriter *moxio.FlateWriter flateBW *bufio.Writer log mlog.Log @@ -146,11 +145,19 @@ func (c *Conn) Write(buf []byte) (n int, rerr error) { defer c.recover(&rerr) n, rerr = c.conn.Write(buf) + if rerr != nil { + c.connBroken = true + } c.xcheckf(rerr, "write") return n, nil } func (c *Conn) xflush() { + // Not writing any more when connection is broken. + if c.connBroken { + return + } + err := c.bw.Flush() c.xcheckf(err, "flush") @@ -173,7 +180,7 @@ func (c *Conn) Close() (rerr error) { if c.conn == nil { return nil } - if c.flateWriter != nil { + if !c.connBroken && c.flateWriter != nil { err := c.flateWriter.Close() c.xcheckf(err, "close deflate writer") err = c.flateBW.Flush() diff --git a/imapclient/cmds.go b/imapclient/cmds.go index 86f6bfa..bd472ec 100644 --- a/imapclient/cmds.go +++ b/imapclient/cmds.go @@ -140,8 +140,9 @@ func (c *Conn) CompressDeflate() (untagged []Untagged, result Result, rerr error c.xcheck(rerr) c.flateBW = bufio.NewWriter(c) - fw, err := flate.NewWriter(c.flateBW, flate.DefaultCompression) + fw0, err := flate.NewWriter(c.flateBW, flate.DefaultCompression) c.xcheckf(err, "deflate") // Cannot happen. 
+ fw := moxio.NewFlateWriter(fw0) c.compress = true c.flateWriter = fw diff --git a/imapserver/compress_test.go b/imapserver/compress_test.go index 55b362a..8f4c716 100644 --- a/imapserver/compress_test.go +++ b/imapserver/compress_test.go @@ -2,7 +2,11 @@ package imapserver import ( "crypto/tls" + "encoding/base64" + "io" + mathrand "math/rand/v2" "testing" + "time" ) func TestCompress(t *testing.T) { @@ -37,3 +41,42 @@ func TestCompressStartTLS(t *testing.T) { tc.transactf("ok", "noop") tc.transactf("ok", "fetch 1 body.peek[1]") } + +func TestCompressBreak(t *testing.T) { + // Close the client connection when the server is writing. That causes writes in + // the server to fail (panic), jumping out of the flate writer and leaving its + // state inconsistent. We must not call into the flate writer again because due to + // its broken internal state it may cause array out of bounds accesses. + + tc := start(t) + defer tc.close() + + msg := exampleMsg + // Add random data (so it is not compressible). Don't know why, but only + // reproducible with large writes. As if setting socket buffers had no effect. + buf := make([]byte, 64*1024) + _, err := io.ReadFull(mathrand.NewChaCha8([32]byte{}), buf) + tcheck(t, err, "read random") + text := base64.StdEncoding.EncodeToString(buf) + for len(text) > 0 { + n := min(78, len(text)) + msg += text[:n] + "\r\n" + text = text[n:] + } + + tc.client.Login("mjl@mox.example", password0) + tc.client.CompressDeflate() + tc.client.Select("inbox") + tc.transactf("ok", "append inbox (\\seen) {%d+}\r\n%s", len(msg), msg) + tc.transactf("ok", "noop") + + // Write request. Close connection instead of reading data. Write will panic, + // coming through flate writer leaving its state inconsistent. Server must not try + // to Flush/Write again on flate writer or it may panic. + tc.client.Writelinef("x fetch 1 body.peek[1]") + + // Close client connection and prevent cleanup from closing the client again. 
+ time.Sleep(time.Second / 10) + tc.client = nil + tc.conn.Close() // Simulate client disappearing. +} diff --git a/imapserver/replace.go b/imapserver/replace.go index 29f999f..98738f6 100644 --- a/imapserver/replace.go +++ b/imapserver/replace.go @@ -211,10 +211,10 @@ func (c *conn) cmdxReplace(isUID bool, tag, cmd string, p *parser) { c.xtrace(mlog.LevelTrace) // Restore. if err != nil { // Cannot use xcheckf due to %w handling of errIO. - xserverErrorf("reading literal message: %s (%w)", err, errIO) + c.xbrokenf("reading literal message: %s (%w)", err, errIO) } if msize != size { - xserverErrorf("read %d bytes for message, expected %d (%w)", msize, size, errIO) + c.xbrokenf("read %d bytes for message, expected %d (%w)", msize, size, errIO) } // Finish reading the command. diff --git a/imapserver/server.go b/imapserver/server.go index 9b42a0a..a1695e2 100644 --- a/imapserver/server.go +++ b/imapserver/server.go @@ -56,6 +56,7 @@ import ( "sort" "strings" "sync" + "sync/atomic" "time" "golang.org/x/exp/maps" @@ -102,6 +103,8 @@ var ( ) ) +var unhandledPanics atomic.Int64 // For tests. + var limiterConnectionrate, limiterConnections *ratelimit.Limiter func init() { @@ -177,6 +180,7 @@ type conn struct { cid int64 state state conn net.Conn + connBroken bool // Once broken, we won't flush any more data. tls bool // Whether TLS has been initialized. viaHTTPS bool // Whether this connection came in via HTTPS (using TLS ALPN). br *bufio.Reader // From remote, with TLS unwrapped in case of TLS, and possibly wrapping inflate. @@ -197,7 +201,7 @@ type conn struct { log mlog.Log // Used for all synchronous logging on this connection, see logbg for logging in a separate goroutine. enabled map[capability]bool // All upper-case. compress bool // Whether compression is enabled, via compress command. - flateWriter *flate.Writer // For flushing output after flushing conn.bw, and for closing. 
+ flateWriter *moxio.FlateWriter // For flushing output after flushing conn.bw, and for closing. flateBW *bufio.Writer // Wraps raw connection writes, flateWriter writes here, also needs flushing. // Set by SEARCH with SAVE. Can be used by commands accepting a sequence-set with @@ -343,6 +347,11 @@ func (c *conn) xsanity(err error, format string, args ...any) { c.log.Errorx(fmt.Sprintf(format, args...), err) } +func (c *conn) xbrokenf(format string, args ...any) { + c.connBroken = true + panic(fmt.Errorf(format, args...)) +} + type msgseq uint32 // Listen initializes all imap listeners for the configuration, and stores them for Serve to start them. @@ -499,7 +508,7 @@ func (c *conn) Write(buf []byte) (int, error) { nn, err := c.conn.Write(buf[:chunk]) if err != nil { - panic(fmt.Errorf("write: %s (%w)", err, errIO)) + c.xbrokenf("write: %s (%w)", err, errIO) } n += nn buf = buf[chunk:] @@ -542,6 +551,7 @@ func (c *conn) readline0() (string, error) { if err != nil && errors.Is(err, moxio.ErrLineTooLong) { return "", fmt.Errorf("%s (%w)", err, errProtocol) } else if err != nil { + c.connBroken = true return "", fmt.Errorf("%s (%w)", err, errIO) } return line, nil @@ -576,7 +586,7 @@ func (c *conn) readline(readCmd bool) string { c.writelinef("* BYE inactive") } if !errors.Is(err, errIO) && !errors.Is(err, errProtocol) { - err = fmt.Errorf("%s (%w)", err, errIO) + c.xbrokenf("%s (%w)", err, errIO) } panic(err) } @@ -628,6 +638,11 @@ func (c *conn) bwritelinef(format string, args ...any) { } func (c *conn) xflush() { + // If the connection is already broken, we're not going to write more. + if c.connBroken { + return + } + err := c.bw.Flush() xcheckf(err, "flush") // Should never happen, the Write caused by the Flush should panic on i/o error. @@ -668,8 +683,7 @@ func (c *conn) xreadliteral(size int64, sync bool) []byte { _, err := io.ReadFull(c.br, buf) if err != nil { - // Cannot use xcheckf due to %w handling of errIO. 
- panic(fmt.Errorf("reading literal: %s (%w)", err, errIO)) + c.xbrokenf("reading literal: %s (%w)", err, errIO) } } return buf @@ -780,6 +794,7 @@ func serve(listenerName string, cid int64, tlsConfig *tls.Config, nc net.Conn, x c.log.Error("unhandled panic", slog.Any("err", x)) debug.PrintStack() metrics.PanicInc(metrics.Imapserver) + unhandledPanics.Add(1) // For tests. } }() @@ -1067,7 +1082,7 @@ func (c *conn) xtlsHandshakeAndAuthenticate(conn net.Conn) { defer cancel() c.log.Debug("starting tls server handshake") if err := tlsConn.HandshakeContext(ctx); err != nil { - panic(fmt.Errorf("tls handshake: %s (%w)", err, errIO)) + c.xbrokenf("tls handshake: %s (%w)", err, errIO) } cancel() @@ -1076,8 +1091,8 @@ func (c *conn) xtlsHandshakeAndAuthenticate(conn net.Conn) { // Verify client after session resumption. err := c.tlsClientAuthVerifyPeerCertParsed(cs.PeerCertificates[0]) if err != nil { - c.bwritelinef("* BYE [ALERT] Error verifying client certificate after TLS session resumption: %s", err) - panic(fmt.Errorf("tls verify client certificate after resumption: %s (%w)", err, errIO)) + c.writelinef("* BYE [ALERT] Error verifying client certificate after TLS session resumption: %s", err) + c.xbrokenf("tls verify client certificate after resumption: %s (%w)", err, errIO) } } @@ -1162,7 +1177,7 @@ func (c *conn) command() { // stop processing because there is a good chance whatever they sent has multiple // lines. c.writelinef("* BYE please try again speaking imap") - panic(errIO) + c.xbrokenf("not speaking imap (%w)", errIO) } c.log.Debugx("imap command syntax error", sxerr.err, logFields...) 
c.log.Info("imap syntax error", slog.String("lastline", c.lastLine)) @@ -1215,7 +1230,7 @@ func (c *conn) command() { case <-mox.Shutdown.Done(): // ../rfc/9051:5375 c.writelinef("* BYE shutting down") - panic(errIO) + c.xbrokenf("shutting down (%w)", errIO) default: } @@ -1851,8 +1866,9 @@ func (c *conn) cmdCompress(tag, cmd string, p *parser) { c.ok(tag, cmd) c.flateBW = bufio.NewWriter(c) - fw, err := flate.NewWriter(c.flateBW, flate.DefaultCompression) + fw0, err := flate.NewWriter(c.flateBW, flate.DefaultCompression) xcheckf(err, "deflate") // Cannot happen. + fw := moxio.NewFlateWriter(fw0) c.compress = true c.flateWriter = fw @@ -3452,10 +3468,10 @@ func (c *conn) cmdAppend(tag, cmd string, p *parser) { c.xtrace(mlog.LevelTrace) // Restore. if err != nil { // Cannot use xcheckf due to %w handling of errIO. - panic(fmt.Errorf("reading literal message: %s (%w)", err, errIO)) + c.xbrokenf("reading literal message: %s (%w)", err, errIO) } if msize != size { - xserverErrorf("read %d bytes for message, expected %d (%w)", msize, size, errIO) + c.xbrokenf("read %d bytes for message, expected %d (%w)", msize, size, errIO) } totalSize += msize @@ -3610,7 +3626,7 @@ wait: case <-mox.Shutdown.Done(): // ../rfc/9051:5375 c.writelinef("* BYE shutting down") - panic(errIO) + c.xbrokenf("shutting down (%w)", errIO) } } @@ -3621,7 +3637,7 @@ wait: if strings.ToUpper(line) != "DONE" { // We just close the connection because our protocols are out of sync. 
- panic(fmt.Errorf("%w: in IDLE, expected DONE", errIO)) + c.xbrokenf("%w: in IDLE, expected DONE", errIO) } c.ok(tag, cmd) diff --git a/imapserver/server_test.go b/imapserver/server_test.go index 62d1539..7f81e01 100644 --- a/imapserver/server_test.go +++ b/imapserver/server_test.go @@ -309,6 +309,12 @@ func (tc *testconn) waitDone() { } func (tc *testconn) close() { + defer func() { + if unhandledPanics.Swap(0) > 0 { + tc.t.Fatalf("unhandled panic in server") + } + }() + if tc.account == nil { // Already closed, we are not strict about closing multiple times. return @@ -317,7 +323,9 @@ func (tc *testconn) close() { tc.check(err, "close account") // no account.CheckClosed(), the tests open accounts multiple times. tc.account = nil - tc.client.Close() + if tc.client != nil { + tc.client.Close() + } tc.serverConn.Close() tc.waitDone() if tc.switchStop != nil { @@ -381,9 +389,9 @@ func (c namedConn) RemoteAddr() net.Addr { func startArgsMore(t *testing.T, first, immediateTLS bool, serverConfig, clientConfig *tls.Config, allowLoginWithoutTLS, noCloseSwitchboard, setPassword bool, accname string, afterInit func() error) *testconn { limitersInit() // Reset rate limiters. - mox.ConfigStaticPath = filepath.FromSlash("../testdata/imap/mox.conf") - mox.MustLoadConfig(true, false) if first { + mox.ConfigStaticPath = filepath.FromSlash("../testdata/imap/mox.conf") + mox.MustLoadConfig(true, false) store.Close() // May not be open, we ignore error. os.RemoveAll("../testdata/imap/data") err := store.Init(ctxbg) @@ -418,7 +426,15 @@ func startArgsMore(t *testing.T, first, immediateTLS bool, serverConfig, clientC tcheck(t, err, "fileconn") err = f.Close() tcheck(t, err, "close file for conn") - return namedConn{fc} + + // Small read/write buffers, for detecting closed/broken connections quickly. 
+ uc := fc.(*net.UnixConn) + err = uc.SetReadBuffer(512) + tcheck(t, err, "set read buffer") + err = uc.SetWriteBuffer(512) + tcheck(t, err, "set write buffer") + + return namedConn{uc} } serverConn := xfdconn(fds[0], "server") clientConn := xfdconn(fds[1], "client") diff --git a/moxio/flatewriter.go b/moxio/flatewriter.go new file mode 100644 index 0000000..27fe9ac --- /dev/null +++ b/moxio/flatewriter.go @@ -0,0 +1,54 @@ +package moxio + +import ( + "github.com/mjl-/flate" +) + +// FlateWriter wraps a flate.Writer and ensures no Write/Flush/Close calls are made +// again on the underlying flate writer when a panic came out of the flate writer +// (e.g. raised by the destination writer of the flate writer). After a panic +// "through" a flate.Writer, its state is inconsistent and further calls could +// panic with out of bounds slice accesses. +type FlateWriter struct { + w *flate.Writer + panic any +} + +// NewFlateWriter returns a FlateWriter that forwards Write/Flush/Close calls to w. +func NewFlateWriter(w *flate.Writer) *FlateWriter { + return &FlateWriter{w, nil} +} + +// checkBroken panics with the stored value if an earlier call panicked. Otherwise it +// returns a function to defer, which stores and re-raises a panic from the current call. +func (w *FlateWriter) checkBroken() func() { + if w.panic != nil { + panic(w.panic) + } + return func() { + x := recover() + if x == nil { + return + } + w.panic = x + panic(x) + } +} + +// Write writes data to the wrapped flate writer, or re-raises the panic of an earlier call. +func (w *FlateWriter) Write(data []byte) (int, error) { + defer w.checkBroken()() + return w.w.Write(data) +} + +// Flush flushes the wrapped flate writer, or re-raises the panic of an earlier call. +func (w *FlateWriter) Flush() error { + defer w.checkBroken()() + return w.w.Flush() +} + +// Close closes the wrapped flate writer, or re-raises the panic of an earlier call. +func (w *FlateWriter) Close() error { + defer w.checkBroken()() + return w.w.Close() +}