mirror of
https://github.com/mjl-/mox.git
synced 2025-07-12 18:24:35 +03:00
implement IMAP extension COMPRESS=DEFLATE, rfc 4978
to compress the entire IMAP connection. tested with thunderbird, meli, k9, ios mail. the initial implementation had interoperability issues with some of these clients: if they write the deflate stream and flush in "partial mode", the go stdlib flate reader does not return any data (until there is an explicit zero-length "sync flush" block, or until the history/sliding window is full), blocking progress, resulting in clients closing the seemingly stuck connection after considering the connection timed out. this includes a coy of the flate package with a new reader that returns partially flushed blocks earlier. this also adds imap trace logging to imapclient.Conn, which was useful for debugging.
This commit is contained in:
@ -21,12 +21,26 @@ import (
|
||||
"net"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"github.com/mjl-/flate"
|
||||
|
||||
"github.com/mjl-/mox/mlog"
|
||||
"github.com/mjl-/mox/moxio"
|
||||
)
|
||||
|
||||
// Conn is an IMAP connection to a server.
|
||||
type Conn struct {
|
||||
conn net.Conn
|
||||
r *bufio.Reader
|
||||
// Connection, may be original TCP or TLS connection. Reads go through c.br, and
|
||||
// writes through c.bw. It wraps a tracing reading/writer and may wrap flate
|
||||
// compression.
|
||||
conn net.Conn
|
||||
br *bufio.Reader
|
||||
bw *bufio.Writer
|
||||
compress bool // If compression is enabled, we must flush flateWriter and its target original bufio writer.
|
||||
flateWriter *flate.Writer
|
||||
flateBW *bufio.Writer
|
||||
|
||||
log mlog.Log
|
||||
panic bool
|
||||
tagGen int
|
||||
record bool // If true, bytes read are added to recordBuf. recorded() resets.
|
||||
@ -57,14 +71,18 @@ func (e Error) Unwrap() error {
|
||||
// The initial untagged greeting response is read and must be "OK" or
|
||||
// "PREAUTH". If preauth, the connection is already in authenticated state,
|
||||
// typically through TLS client certificate. This is indicated in Conn.Preauth.
|
||||
func New(conn net.Conn, xpanic bool) (client *Conn, rerr error) {
|
||||
func New(cid int64, conn net.Conn, xpanic bool) (client *Conn, rerr error) {
|
||||
log := mlog.New("imapclient", nil).WithCid(cid)
|
||||
c := Conn{
|
||||
conn: conn,
|
||||
r: bufio.NewReader(conn),
|
||||
br: bufio.NewReader(moxio.NewTraceReader(log, "CR: ", conn)),
|
||||
log: log,
|
||||
panic: xpanic,
|
||||
CapAvailable: map[Capability]struct{}{},
|
||||
CapEnabled: map[Capability]struct{}{},
|
||||
}
|
||||
// Writes are buffered and write to Conn, which may panic.
|
||||
c.bw = bufio.NewWriter(moxio.NewTraceWriter(log, "CW: ", &c))
|
||||
|
||||
defer c.recover(&rerr)
|
||||
tag := c.xnonspace()
|
||||
@ -122,6 +140,53 @@ func (c *Conn) xcheck(err error) {
|
||||
}
|
||||
}
|
||||
|
||||
// Write writes directly to the connection. Write errors do take the connection's
|
||||
// panic mode into account, i.e. Write can panic.
|
||||
func (c *Conn) Write(buf []byte) (n int, rerr error) {
|
||||
defer c.recover(&rerr)
|
||||
|
||||
n, rerr = c.conn.Write(buf)
|
||||
c.xcheckf(rerr, "write")
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (c *Conn) xflush() {
|
||||
err := c.bw.Flush()
|
||||
c.xcheckf(err, "flush")
|
||||
|
||||
// If compression is active, we need to flush the deflate stream.
|
||||
if c.compress {
|
||||
err := c.flateWriter.Flush()
|
||||
c.xcheckf(err, "flush deflate")
|
||||
err = c.flateBW.Flush()
|
||||
c.xcheckf(err, "flush deflate buffer")
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes the connection, flushing and closing any compression and TLS layer.
|
||||
//
|
||||
// You may want to call Logout first. Closing a connection with a mailbox with
|
||||
// deleted messages not yet expunged will not expunge those messages.
|
||||
func (c *Conn) Close() (rerr error) {
|
||||
defer c.recover(&rerr)
|
||||
|
||||
if c.conn == nil {
|
||||
return nil
|
||||
}
|
||||
if c.flateWriter != nil {
|
||||
err := c.flateWriter.Close()
|
||||
c.xcheckf(err, "close deflate writer")
|
||||
err = c.flateBW.Flush()
|
||||
c.xcheckf(err, "flush deflate buffer")
|
||||
c.flateWriter = nil
|
||||
c.flateBW = nil
|
||||
}
|
||||
err := c.conn.Close()
|
||||
c.xcheckf(err, "close connection")
|
||||
c.conn = nil
|
||||
return
|
||||
}
|
||||
|
||||
// TLSConnectionState returns the TLS connection state if the connection uses TLS.
|
||||
func (c *Conn) TLSConnectionState() *tls.ConnectionState {
|
||||
if conn, ok := c.conn.(*tls.Conn); ok {
|
||||
@ -141,8 +206,9 @@ func (c *Conn) Commandf(tag string, format string, args ...any) (rerr error) {
|
||||
}
|
||||
c.LastTag = tag
|
||||
|
||||
_, err := fmt.Fprintf(c.conn, "%s %s\r\n", tag, fmt.Sprintf(format, args...))
|
||||
_, err := fmt.Fprintf(c.bw, "%s %s\r\n", tag, fmt.Sprintf(format, args...))
|
||||
c.xcheckf(err, "write command")
|
||||
c.xflush()
|
||||
return
|
||||
}
|
||||
|
||||
@ -196,7 +262,7 @@ func (c *Conn) ReadUntagged() (untagged Untagged, rerr error) {
|
||||
func (c *Conn) Readline() (line string, rerr error) {
|
||||
defer c.recover(&rerr)
|
||||
|
||||
line, err := c.r.ReadString('\n')
|
||||
line, err := c.br.ReadString('\n')
|
||||
c.xcheckf(err, "read line")
|
||||
return line, nil
|
||||
}
|
||||
@ -225,37 +291,30 @@ func (c *Conn) Writelinef(format string, args ...any) (rerr error) {
|
||||
defer c.recover(&rerr)
|
||||
|
||||
s := fmt.Sprintf(format, args...)
|
||||
_, err := fmt.Fprintf(c.conn, "%s\r\n", s)
|
||||
_, err := fmt.Fprintf(c.bw, "%s\r\n", s)
|
||||
c.xcheckf(err, "writeline")
|
||||
c.xflush()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write writes directly to the connection. Write errors do take the connections
|
||||
// panic mode into account, i.e. Write can panic.
|
||||
func (c *Conn) Write(buf []byte) (n int, rerr error) {
|
||||
defer c.recover(&rerr)
|
||||
|
||||
n, rerr = c.conn.Write(buf)
|
||||
c.xcheckf(rerr, "write")
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// WriteSyncLiteral first writes the synchronous literal size, then read the
|
||||
// WriteSyncLiteral first writes the synchronous literal size, then reads the
|
||||
// continuation "+" and finally writes the data.
|
||||
func (c *Conn) WriteSyncLiteral(s string) (untagged []Untagged, rerr error) {
|
||||
defer c.recover(&rerr)
|
||||
|
||||
_, err := fmt.Fprintf(c.conn, "{%d}\r\n", len(s))
|
||||
_, err := fmt.Fprintf(c.bw, "{%d}\r\n", len(s))
|
||||
c.xcheckf(err, "write sync literal size")
|
||||
c.xflush()
|
||||
|
||||
plus, err := c.r.Peek(1)
|
||||
plus, err := c.br.Peek(1)
|
||||
c.xcheckf(err, "read continuation")
|
||||
if plus[0] == '+' {
|
||||
_, err = c.Readline()
|
||||
c.xcheckf(err, "read continuation line")
|
||||
|
||||
_, err = c.conn.Write([]byte(s))
|
||||
_, err = c.bw.Write([]byte(s))
|
||||
c.xcheckf(err, "write literal data")
|
||||
c.xflush()
|
||||
return nil, nil
|
||||
}
|
||||
untagged, result, err := c.Response()
|
||||
@ -301,15 +360,3 @@ func (c *Conn) xgetUntagged(l []Untagged, dst any) {
|
||||
}
|
||||
dstv.Elem().Set(gotv)
|
||||
}
|
||||
|
||||
// Close closes the connection without writing anything to the server.
|
||||
// You may want to call Logout. Closing a connection with a mailbox with deleted
|
||||
// message not yet expunged will not expunge those messages.
|
||||
func (c *Conn) Close() error {
|
||||
var err error
|
||||
if c.conn != nil {
|
||||
err = c.conn.Close()
|
||||
c.conn = nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -9,6 +9,10 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/mjl-/flate"
|
||||
|
||||
"github.com/mjl-/mox/mlog"
|
||||
"github.com/mjl-/mox/moxio"
|
||||
"github.com/mjl-/mox/scram"
|
||||
)
|
||||
|
||||
@ -39,11 +43,14 @@ func (c *Conn) Starttls(config *tls.Config) (untagged []Untagged, result Result,
|
||||
defer c.recover(&rerr)
|
||||
untagged, result, rerr = c.Transactf("starttls")
|
||||
c.xcheckf(rerr, "starttls command")
|
||||
conn := tls.Client(c.conn, config)
|
||||
err := conn.Handshake()
|
||||
|
||||
conn := c.xprefixConn()
|
||||
tlsConn := tls.Client(conn, config)
|
||||
err := tlsConn.Handshake()
|
||||
c.xcheckf(err, "tls handshake")
|
||||
c.conn = conn
|
||||
c.r = bufio.NewReader(conn)
|
||||
c.conn = tlsConn
|
||||
c.br = bufio.NewReader(moxio.NewTraceReader(c.log, "CR: ", tlsConn))
|
||||
c.bw = bufio.NewWriter(moxio.NewTraceWriter(c.log, "CW: ", c))
|
||||
return untagged, result, nil
|
||||
}
|
||||
|
||||
@ -116,6 +123,38 @@ func (c *Conn) AuthenticateSCRAM(method string, h func() hash.Hash, username, pa
|
||||
return c.ResponseOK()
|
||||
}
|
||||
|
||||
// CompressDeflate enables compression with deflate on the connection.
|
||||
//
|
||||
// Only possible when server has announced the COMPRESS=DEFLATE capability.
|
||||
//
|
||||
// State: Authenticated or selected.
|
||||
func (c *Conn) CompressDeflate() (untagged []Untagged, result Result, rerr error) {
|
||||
defer c.recover(&rerr)
|
||||
|
||||
if _, ok := c.CapAvailable[CapCompressDeflate]; !ok {
|
||||
c.xerrorf("server does not implement capability %s", CapCompressDeflate)
|
||||
}
|
||||
|
||||
untagged, result, rerr = c.Transactf("compress deflate")
|
||||
c.xcheck(rerr)
|
||||
|
||||
c.flateBW = bufio.NewWriter(c)
|
||||
fw, err := flate.NewWriter(c.flateBW, flate.DefaultCompression)
|
||||
c.xcheckf(err, "deflate") // Cannot happen.
|
||||
|
||||
c.compress = true
|
||||
c.flateWriter = fw
|
||||
tw := moxio.NewTraceWriter(mlog.New("imapclient", nil), "CW: ", fw)
|
||||
c.bw = bufio.NewWriter(tw)
|
||||
|
||||
rc := c.xprefixConn()
|
||||
fr := flate.NewReaderPartial(rc)
|
||||
tr := moxio.NewTraceReader(mlog.New("imapclient", nil), "CR: ", fr)
|
||||
c.br = bufio.NewReader(tr)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Enable enables capabilities for use with the connection, verifying the server has indeed enabled them.
|
||||
func (c *Conn) Enable(capabilities ...string) (untagged []Untagged, result Result, rerr error) {
|
||||
defer c.recover(&rerr)
|
||||
|
@ -23,7 +23,7 @@ func (c *Conn) recordAdd(buf []byte) {
|
||||
|
||||
func (c *Conn) xtake(s string) {
|
||||
buf := make([]byte, len(s))
|
||||
_, err := io.ReadFull(c.r, buf)
|
||||
_, err := io.ReadFull(c.br, buf)
|
||||
c.xcheckf(err, "taking %q", s)
|
||||
if !strings.EqualFold(string(buf), s) {
|
||||
c.xerrorf("got %q, expected %q", buf, s)
|
||||
@ -32,7 +32,7 @@ func (c *Conn) xtake(s string) {
|
||||
}
|
||||
|
||||
func (c *Conn) readbyte() (byte, error) {
|
||||
b, err := c.r.ReadByte()
|
||||
b, err := c.br.ReadByte()
|
||||
if err == nil {
|
||||
c.recordAdd([]byte{b})
|
||||
}
|
||||
@ -43,12 +43,12 @@ func (c *Conn) unreadbyte() {
|
||||
if c.record {
|
||||
c.recordBuf = c.recordBuf[:len(c.recordBuf)-1]
|
||||
}
|
||||
err := c.r.UnreadByte()
|
||||
err := c.br.UnreadByte()
|
||||
c.xcheckf(err, "unread byte")
|
||||
}
|
||||
|
||||
func (c *Conn) readrune() (rune, error) {
|
||||
x, _, err := c.r.ReadRune()
|
||||
x, _, err := c.br.ReadRune()
|
||||
if err == nil {
|
||||
c.recordAdd([]byte(string(x)))
|
||||
}
|
||||
@ -126,7 +126,8 @@ func (c *Conn) xrespText() RespText {
|
||||
var knownCodes = stringMap(
|
||||
// Without parameters.
|
||||
"ALERT", "PARSE", "READ-ONLY", "READ-WRITE", "TRYCREATE", "UIDNOTSTICKY", "UNAVAILABLE", "AUTHENTICATIONFAILED", "AUTHORIZATIONFAILED", "EXPIRED", "PRIVACYREQUIRED", "CONTACTADMIN", "NOPERM", "INUSE", "EXPUNGEISSUED", "CORRUPTION", "SERVERBUG", "CLIENTBUG", "CANNOT", "LIMIT", "OVERQUOTA", "ALREADYEXISTS", "NONEXISTENT", "NOTSAVED", "HASCHILDREN", "CLOSED", "UNKNOWN-CTE",
|
||||
"OVERQUOTA", // ../rfc/9208:472
|
||||
"OVERQUOTA", // ../rfc/9208:472
|
||||
"COMPRESSIONACTIVE", // ../rfc/4978:143
|
||||
// With parameters.
|
||||
"BADCHARSET", "CAPABILITY", "PERMANENTFLAGS", "UIDNEXT", "UIDVALIDITY", "UNSEEN", "APPENDUID", "COPYUID",
|
||||
"HIGHESTMODSEQ", "MODIFIED",
|
||||
@ -810,7 +811,7 @@ func (c *Conn) xatom() string {
|
||||
b, err := c.readbyte()
|
||||
c.xcheckf(err, "read byte for atom")
|
||||
if b <= ' ' || strings.IndexByte("(){%*\"\\]", b) >= 0 {
|
||||
c.r.UnreadByte()
|
||||
c.br.UnreadByte()
|
||||
if s == "" {
|
||||
c.xerrorf("expected atom")
|
||||
}
|
||||
@ -850,11 +851,12 @@ func (c *Conn) xliteral() []byte {
|
||||
c.xerrorf("refusing to read more than 1MB: %d", size)
|
||||
}
|
||||
if sync {
|
||||
_, err := fmt.Fprintf(c.conn, "+ ok\r\n")
|
||||
_, err := fmt.Fprintf(c.bw, "+ ok\r\n")
|
||||
c.xcheckf(err, "write continuation")
|
||||
c.xflush()
|
||||
}
|
||||
buf := make([]byte, int(size))
|
||||
_, err := io.ReadFull(c.r, buf)
|
||||
_, err := io.ReadFull(c.br, buf)
|
||||
c.xcheckf(err, "reading data for literal")
|
||||
return buf
|
||||
}
|
||||
|
44
imapclient/prefixconn.go
Normal file
44
imapclient/prefixconn.go
Normal file
@ -0,0 +1,44 @@
|
||||
package imapclient
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net"
|
||||
)
|
||||
|
||||
// prefixConn is a net.Conn with a buffer from which the first reads are satisfied.
|
||||
// used for STARTTLS where already did a buffered read of initial TLS data.
|
||||
type prefixConn struct {
|
||||
prefix []byte
|
||||
net.Conn
|
||||
}
|
||||
|
||||
func (c *prefixConn) Read(buf []byte) (int, error) {
|
||||
if len(c.prefix) > 0 {
|
||||
n := len(buf)
|
||||
if n > len(c.prefix) {
|
||||
n = len(c.prefix)
|
||||
}
|
||||
copy(buf[:n], c.prefix[:n])
|
||||
c.prefix = c.prefix[n:]
|
||||
if len(c.prefix) == 0 {
|
||||
c.prefix = nil
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
return c.Conn.Read(buf)
|
||||
}
|
||||
|
||||
// xprefixConn checks if there are any buffered unconsumed reads. If not, it
|
||||
// returns c.conn. Otherwise, it returns a *prefixConn from which the buffered data
|
||||
// can be read followed by data from c.conn.
|
||||
func (c *Conn) xprefixConn() net.Conn {
|
||||
n := c.br.Buffered()
|
||||
if n == 0 {
|
||||
return c.conn
|
||||
}
|
||||
|
||||
buf := make([]byte, n)
|
||||
_, err := io.ReadFull(c.br, buf)
|
||||
c.xcheckf(err, "get buffered data")
|
||||
return &prefixConn{buf, c.conn}
|
||||
}
|
@ -36,6 +36,7 @@ const (
|
||||
CapMetadataServer Capability = "METADATA-SERVER" // ../rfc/5464:124
|
||||
CapSaveDate Capability = "SAVEDATE" // ../rfc/8514
|
||||
CapCreateSpecialUse Capability = "CREATE-SPECIAL-USE" // ../rfc/6154:296
|
||||
CapCompressDeflate Capability = "COMPRESS=DEFLATE" // ../rfc/4978:65
|
||||
)
|
||||
|
||||
// Status is the tagged final result of a command.
|
||||
@ -381,7 +382,7 @@ func (ns NumSet) String() string {
|
||||
}
|
||||
|
||||
func ParseNumSet(s string) (ns NumSet, rerr error) {
|
||||
c := Conn{r: bufio.NewReader(strings.NewReader(s))}
|
||||
c := Conn{br: bufio.NewReader(strings.NewReader(s))}
|
||||
defer c.recover(&rerr)
|
||||
ns = c.xsequenceSet()
|
||||
return
|
||||
|
Reference in New Issue
Block a user