mirror of
https://github.com/mjl-/mox.git
synced 2025-07-10 05:54:38 +03:00
mox!
This commit is contained in:
535
queue/queue_test.go
Normal file
535
queue/queue_test.go
Normal file
@ -0,0 +1,535 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/mjl-/bstore"
|
||||
|
||||
"github.com/mjl-/mox/dns"
|
||||
"github.com/mjl-/mox/mox-"
|
||||
"github.com/mjl-/mox/smtp"
|
||||
"github.com/mjl-/mox/store"
|
||||
)
|
||||
|
||||
func tcheck(t *testing.T, err error, msg string) {
|
||||
if err != nil {
|
||||
t.Helper()
|
||||
t.Fatalf("%s: %s", msg, err)
|
||||
}
|
||||
}
|
||||
|
||||
func setup(t *testing.T) (*store.Account, func()) {
|
||||
// Prepare config so email can be delivered to mjl@mox.example.
|
||||
os.RemoveAll("../testdata/queue/data")
|
||||
mox.Context = context.Background()
|
||||
mox.ConfigStaticPath = "../testdata/queue/mox.conf"
|
||||
mox.MustLoadConfig()
|
||||
acc, err := store.OpenAccount("mjl")
|
||||
tcheck(t, err, "open account")
|
||||
err = acc.SetPassword("testtest")
|
||||
tcheck(t, err, "set password")
|
||||
switchDone := store.Switchboard()
|
||||
mox.Shutdown = make(chan struct{})
|
||||
return acc, func() {
|
||||
acc.Close()
|
||||
close(mox.Shutdown)
|
||||
mox.Shutdown = make(chan struct{})
|
||||
Shutdown()
|
||||
close(switchDone)
|
||||
}
|
||||
}
|
||||
|
||||
var testmsg = strings.ReplaceAll(`From: <mjl@mox.example>
|
||||
To: <mjl@mox.example>
|
||||
Subject: test
|
||||
|
||||
test email
|
||||
`, "\n", "\r\n")
|
||||
|
||||
func prepareFile(t *testing.T) *os.File {
|
||||
t.Helper()
|
||||
msgFile, err := store.CreateMessageTemp("queue")
|
||||
tcheck(t, err, "create temp message for delivery to queue")
|
||||
_, err = msgFile.Write([]byte(testmsg))
|
||||
tcheck(t, err, "write message file")
|
||||
return msgFile
|
||||
}
|
||||
|
||||
func TestQueue(t *testing.T) {
|
||||
acc, cleanup := setup(t)
|
||||
defer cleanup()
|
||||
err := Init()
|
||||
tcheck(t, err, "queue init")
|
||||
|
||||
msgs, err := List()
|
||||
tcheck(t, err, "listing messages in queue")
|
||||
if len(msgs) != 0 {
|
||||
t.Fatalf("got %d messages in queue, expected 0", len(msgs))
|
||||
}
|
||||
|
||||
path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
|
||||
err = Add(xlog, "mjl", path, path, false, false, int64(len(testmsg)), nil, prepareFile(t), nil, true)
|
||||
tcheck(t, err, "add message to queue for delivery")
|
||||
|
||||
mf2 := prepareFile(t)
|
||||
err = Add(xlog, "mjl", path, path, false, false, int64(len(testmsg)), nil, mf2, nil, false)
|
||||
tcheck(t, err, "add message to queue for delivery")
|
||||
os.Remove(mf2.Name())
|
||||
|
||||
msgs, err = List()
|
||||
tcheck(t, err, "listing queue")
|
||||
if len(msgs) != 2 {
|
||||
t.Fatalf("got msgs %v, expected 1", msgs)
|
||||
}
|
||||
msg := msgs[0]
|
||||
if msg.Attempts != 0 {
|
||||
t.Fatalf("msg attempts %d, expected 0", msg.Attempts)
|
||||
}
|
||||
n, err := Drop(msgs[1].ID, "", "")
|
||||
tcheck(t, err, "drop")
|
||||
if n != 1 {
|
||||
t.Fatalf("dropped %d, expected 1", n)
|
||||
}
|
||||
|
||||
next := nextWork(nil)
|
||||
if next > 0 {
|
||||
t.Fatalf("nextWork in %s, should be now", next)
|
||||
}
|
||||
busy := map[string]struct{}{"mox.example": {}}
|
||||
if x := nextWork(busy); x != 24*time.Hour {
|
||||
t.Fatalf("nextWork in %s for busy domain, should be in 24 hours", x)
|
||||
}
|
||||
if nn := launchWork(nil, busy); nn != 0 {
|
||||
t.Fatalf("launchWork launched %d deliveries, expected 0", nn)
|
||||
}
|
||||
|
||||
// Override dial function. We'll make connecting fail for now.
|
||||
resolver := dns.MockResolver{
|
||||
A: map[string][]string{"mox.example.": {"127.0.0.1"}},
|
||||
MX: map[string][]*net.MX{"mox.example.": {{Host: "mox.example", Pref: 10}}},
|
||||
}
|
||||
dialed := make(chan struct{}, 1)
|
||||
dial = func(ctx context.Context, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
|
||||
dialed <- struct{}{}
|
||||
return nil, fmt.Errorf("failure from test")
|
||||
}
|
||||
|
||||
launchWork(resolver, map[string]struct{}{})
|
||||
|
||||
// Wait until we see the dial and the failed attempt.
|
||||
timer := time.NewTimer(time.Second)
|
||||
defer timer.Stop()
|
||||
select {
|
||||
case <-dialed:
|
||||
i := 0
|
||||
for {
|
||||
m, err := bstore.QueryDB[Msg](queueDB).Get()
|
||||
tcheck(t, err, "get")
|
||||
if m.Attempts == 1 {
|
||||
break
|
||||
}
|
||||
i++
|
||||
if i == 10 {
|
||||
t.Fatalf("message in queue not updated")
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
case <-timer.C:
|
||||
t.Fatalf("no dial within 1s")
|
||||
}
|
||||
<-deliveryResult // Deliver sends here.
|
||||
|
||||
_, err = OpenMessage(msg.ID + 1)
|
||||
if err != bstore.ErrAbsent {
|
||||
t.Fatalf("OpenMessage, got %v, expected ErrAbsent", err)
|
||||
}
|
||||
reader, err := OpenMessage(msg.ID)
|
||||
tcheck(t, err, "open message")
|
||||
defer reader.Close()
|
||||
msgbuf, err := io.ReadAll(reader)
|
||||
tcheck(t, err, "read message")
|
||||
if string(msgbuf) != testmsg {
|
||||
t.Fatalf("message mismatch, got %q, expected %q", string(msgbuf), testmsg)
|
||||
}
|
||||
|
||||
n, err = Kick(msg.ID+1, "", "")
|
||||
tcheck(t, err, "kick")
|
||||
if n != 0 {
|
||||
t.Fatalf("kick %d, expected 0", n)
|
||||
}
|
||||
n, err = Kick(msg.ID, "", "")
|
||||
tcheck(t, err, "kick")
|
||||
if n != 1 {
|
||||
t.Fatalf("kicked %d, expected 1", n)
|
||||
}
|
||||
|
||||
// Setting up a pipe. We'll start a fake smtp server on the server-side. And return the
|
||||
// client-side to the invocation dial, for the attempted delivery from the queue.
|
||||
// The delivery should succeed.
|
||||
server, client := net.Pipe()
|
||||
defer server.Close()
|
||||
defer client.Close()
|
||||
|
||||
smtpdone := make(chan struct{})
|
||||
go func() {
|
||||
// We do a minimal fake smtp server. We cannot import smtpserver.Serve due to cyclic dependencies.
|
||||
fmt.Fprintf(server, "220 mox.example\r\n")
|
||||
br := bufio.NewReader(server)
|
||||
br.ReadString('\n') // Should be EHLO.
|
||||
fmt.Fprintf(server, "250 ok\r\n")
|
||||
br.ReadString('\n') // Should be MAIL FROM.
|
||||
fmt.Fprintf(server, "250 ok\r\n")
|
||||
br.ReadString('\n') // Should be RCPT TO.
|
||||
fmt.Fprintf(server, "250 ok\r\n")
|
||||
br.ReadString('\n') // Should be DATA.
|
||||
fmt.Fprintf(server, "354 continue\r\n")
|
||||
reader := smtp.NewDataReader(br)
|
||||
io.Copy(io.Discard, reader)
|
||||
fmt.Fprintf(server, "250 ok\r\n")
|
||||
br.ReadString('\n') // Should be QUIT.
|
||||
fmt.Fprintf(server, "221 ok\r\n")
|
||||
|
||||
smtpdone <- struct{}{}
|
||||
}()
|
||||
|
||||
dial = func(ctx context.Context, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
|
||||
dialed <- struct{}{}
|
||||
return client, nil
|
||||
}
|
||||
launchWork(resolver, map[string]struct{}{})
|
||||
|
||||
timer.Reset(time.Second)
|
||||
select {
|
||||
case <-dialed:
|
||||
select {
|
||||
case <-smtpdone:
|
||||
i := 0
|
||||
for {
|
||||
xmsgs, err := List()
|
||||
tcheck(t, err, "list queue")
|
||||
if len(xmsgs) == 0 {
|
||||
break
|
||||
}
|
||||
i++
|
||||
if i == 10 {
|
||||
t.Fatalf("%d messages in queue, expected 0", len(xmsgs))
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
case <-timer.C:
|
||||
t.Fatalf("no deliver within 1s")
|
||||
}
|
||||
case <-timer.C:
|
||||
t.Fatalf("no dial within 1s")
|
||||
}
|
||||
<-deliveryResult // Deliver sends here.
|
||||
|
||||
// Add another message that we'll fail to deliver entirely.
|
||||
err = Add(xlog, "mjl", path, path, false, false, int64(len(testmsg)), nil, prepareFile(t), nil, true)
|
||||
tcheck(t, err, "add message to queue for delivery")
|
||||
|
||||
msgs, err = List()
|
||||
tcheck(t, err, "list queue")
|
||||
if len(msgs) != 1 {
|
||||
t.Fatalf("queue has %d messages, expected 1", len(msgs))
|
||||
}
|
||||
msg = msgs[0]
|
||||
|
||||
prepServer := func(code string) (net.Conn, func()) {
|
||||
server, client := net.Pipe()
|
||||
go func() {
|
||||
fmt.Fprintf(server, "%s mox.example\r\n", code)
|
||||
server.Close()
|
||||
}()
|
||||
return client, func() {
|
||||
server.Close()
|
||||
client.Close()
|
||||
}
|
||||
}
|
||||
|
||||
conn2, cleanup2 := prepServer("220")
|
||||
conn3, cleanup3 := prepServer("451")
|
||||
defer func() {
|
||||
cleanup2()
|
||||
cleanup3()
|
||||
}()
|
||||
|
||||
seq := 0
|
||||
dial = func(ctx context.Context, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
|
||||
seq++
|
||||
switch seq {
|
||||
default:
|
||||
return nil, fmt.Errorf("connect error from test")
|
||||
case 2:
|
||||
return conn2, nil
|
||||
case 3:
|
||||
return conn3, nil
|
||||
}
|
||||
}
|
||||
|
||||
comm := store.RegisterComm(acc)
|
||||
defer comm.Unregister()
|
||||
|
||||
for i := 1; i < 8; i++ {
|
||||
go func() { <-deliveryResult }() // Deliver sends here.
|
||||
deliver(resolver, msg)
|
||||
err = queueDB.Get(&msg)
|
||||
tcheck(t, err, "get msg")
|
||||
if msg.Attempts != i {
|
||||
t.Fatalf("got attempt %d, expected %d", msg.Attempts, i)
|
||||
}
|
||||
if msg.Attempts == 5 {
|
||||
timer.Reset(time.Second)
|
||||
changes := make(chan struct{}, 1)
|
||||
go func() {
|
||||
comm.Get()
|
||||
changes <- struct{}{}
|
||||
}()
|
||||
select {
|
||||
case <-changes:
|
||||
case <-timer.C:
|
||||
t.Fatalf("no dsn in 1s")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Trigger final failure.
|
||||
go func() { <-deliveryResult }() // Deliver sends here.
|
||||
deliver(resolver, msg)
|
||||
err = queueDB.Get(&msg)
|
||||
if err != bstore.ErrAbsent {
|
||||
t.Fatalf("attempt to fetch delivered and removed message from queue, got err %v, expected ErrAbsent", err)
|
||||
}
|
||||
|
||||
timer.Reset(time.Second)
|
||||
changes := make(chan struct{}, 1)
|
||||
go func() {
|
||||
comm.Get()
|
||||
changes <- struct{}{}
|
||||
}()
|
||||
select {
|
||||
case <-changes:
|
||||
case <-timer.C:
|
||||
t.Fatalf("no dsn in 1s")
|
||||
}
|
||||
}
|
||||
|
||||
// test Start and that it attempts to deliver.
|
||||
func TestQueueStart(t *testing.T) {
|
||||
// Override dial function. We'll make connecting fail and check the attempt.
|
||||
resolver := dns.MockResolver{
|
||||
A: map[string][]string{"mox.example.": {"127.0.0.1"}},
|
||||
MX: map[string][]*net.MX{"mox.example.": {{Host: "mox.example", Pref: 10}}},
|
||||
}
|
||||
dialed := make(chan struct{}, 1)
|
||||
dial = func(ctx context.Context, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
|
||||
dialed <- struct{}{}
|
||||
return nil, fmt.Errorf("failure from test")
|
||||
}
|
||||
|
||||
_, cleanup := setup(t)
|
||||
defer cleanup()
|
||||
done := make(chan struct{}, 1)
|
||||
defer func() {
|
||||
close(mox.Shutdown)
|
||||
<-done
|
||||
mox.Shutdown = make(chan struct{})
|
||||
}()
|
||||
err := Start(resolver, done)
|
||||
tcheck(t, err, "queue start")
|
||||
|
||||
checkDialed := func(need bool) {
|
||||
t.Helper()
|
||||
d := time.Second / 10
|
||||
if need {
|
||||
d = time.Second
|
||||
}
|
||||
timer := time.NewTimer(d)
|
||||
defer timer.Stop()
|
||||
select {
|
||||
case <-dialed:
|
||||
if !need {
|
||||
t.Fatalf("unexpected dial attempt")
|
||||
}
|
||||
case <-timer.C:
|
||||
if need {
|
||||
t.Fatalf("expected to see a dial attempt")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
|
||||
err = Add(xlog, "mjl", path, path, false, false, int64(len(testmsg)), nil, prepareFile(t), nil, true)
|
||||
tcheck(t, err, "add message to queue for delivery")
|
||||
checkDialed(true)
|
||||
|
||||
// Don't change message nextattempt time, but kick queue. Message should not be delivered.
|
||||
queuekick()
|
||||
checkDialed(false)
|
||||
|
||||
// Kick for real, should see another attempt.
|
||||
n, err := Kick(0, "mox.example", "")
|
||||
tcheck(t, err, "kick queue")
|
||||
if n != 1 {
|
||||
t.Fatalf("kick changed %d messages, expected 1", n)
|
||||
}
|
||||
checkDialed(true)
|
||||
time.Sleep(100 * time.Millisecond) // Racy... we won't get notified when work is done...
|
||||
}
|
||||
|
||||
func TestWriteFile(t *testing.T) {
|
||||
name := "../testdata/queue.test"
|
||||
os.Remove(name)
|
||||
defer os.Remove(name)
|
||||
err := writeFile(name, strings.NewReader("test"))
|
||||
if err != nil {
|
||||
t.Fatalf("writeFile, unexpected error %v", err)
|
||||
}
|
||||
buf, err := os.ReadFile(name)
|
||||
if err != nil || string(buf) != "test" {
|
||||
t.Fatalf("writeFile, read file, got err %v, data %q", err, buf)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGatherHosts(t *testing.T) {
|
||||
mox.Context = context.Background()
|
||||
|
||||
// Test basic MX lookup case, but also following CNAME, detecting CNAME loops and
|
||||
// having a CNAME limit, connecting directly to a host, and domain that does not
|
||||
// exist or has temporary error.
|
||||
|
||||
resolver := dns.MockResolver{
|
||||
MX: map[string][]*net.MX{
|
||||
"basic.example.": {{Host: "mail.basic.example.", Pref: 10}},
|
||||
"multimx.example.": {{Host: "mail1.multimx.example.", Pref: 10}, {Host: "mail2.multimx.example.", Pref: 10}},
|
||||
"nullmx.example.": {{Host: ".", Pref: 10}},
|
||||
"temperror-mx.example.": {{Host: "absent.example.", Pref: 10}},
|
||||
},
|
||||
A: map[string][]string{
|
||||
"mail.basic.example": {"10.0.0.1"},
|
||||
"justhost.example.": {"10.0.0.1"}, // No MX record for domain, only an A record.
|
||||
"temperror-a.example.": {"10.0.0.1"},
|
||||
},
|
||||
AAAA: map[string][]string{
|
||||
"justhost6.example.": {"2001:db8::1"}, // No MX record for domain, only an AAAA record.
|
||||
},
|
||||
CNAME: map[string]string{
|
||||
"cname.example.": "basic.example.",
|
||||
"cnameloop.example.": "cnameloop2.example.",
|
||||
"cnameloop2.example.": "cnameloop.example.",
|
||||
"danglingcname.example.": "absent.example.", // Points to missing name.
|
||||
"temperror-cname.example.": "absent.example.",
|
||||
},
|
||||
Fail: map[dns.Mockreq]struct{}{
|
||||
{Type: "mx", Name: "temperror-mx.example."}: {},
|
||||
{Type: "host", Name: "temperror-a.example."}: {},
|
||||
{Type: "cname", Name: "temperror-cname.example."}: {},
|
||||
},
|
||||
}
|
||||
for i := 0; i <= 16; i++ {
|
||||
s := fmt.Sprintf("cnamelimit%d.example.", i)
|
||||
next := fmt.Sprintf("cnamelimit%d.example.", i+1)
|
||||
resolver.CNAME[s] = next
|
||||
}
|
||||
|
||||
test := func(ipd dns.IPDomain, expHosts []dns.IPDomain, expDomain dns.Domain, expPerm bool, expErr error) {
|
||||
t.Helper()
|
||||
|
||||
m := Msg{RecipientDomain: ipd}
|
||||
hosts, ed, perm, err := gatherHosts(resolver, m, 1, xlog)
|
||||
if (err == nil) != (expErr == nil) || err != nil && !errors.Is(err, expErr) {
|
||||
// todo: could also check the individual errors? code currently does not have structured errors.
|
||||
t.Fatalf("gather hosts: %v", err)
|
||||
}
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(hosts, expHosts) || ed != expDomain || perm != expPerm {
|
||||
t.Fatalf("got hosts %#v, effectiveDomain %#v, permanent %#v, expected %#v %#v %#v", hosts, ed, perm, expHosts, expDomain, expPerm)
|
||||
}
|
||||
}
|
||||
|
||||
domain := func(s string) dns.Domain {
|
||||
d, err := dns.ParseDomain(s)
|
||||
if err != nil {
|
||||
t.Fatalf("parse domain: %v", err)
|
||||
}
|
||||
return d
|
||||
}
|
||||
ipdomain := func(s string) dns.IPDomain {
|
||||
ip := net.ParseIP(s)
|
||||
if ip != nil {
|
||||
return dns.IPDomain{IP: ip}
|
||||
}
|
||||
d, err := dns.ParseDomain(s)
|
||||
if err != nil {
|
||||
t.Fatalf("parse domain %q: %v", s, err)
|
||||
}
|
||||
return dns.IPDomain{Domain: d}
|
||||
}
|
||||
|
||||
ipdomains := func(s ...string) (l []dns.IPDomain) {
|
||||
for _, e := range s {
|
||||
l = append(l, ipdomain(e))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
var zerodom dns.Domain
|
||||
|
||||
test(ipdomain("10.0.0.1"), ipdomains("10.0.0.1"), zerodom, false, nil)
|
||||
test(ipdomain("basic.example"), ipdomains("mail.basic.example"), domain("basic.example"), false, nil) // Basic with simple MX.
|
||||
test(ipdomain("multimx.example"), ipdomains("mail1.multimx.example", "mail2.multimx.example"), domain("multimx.example"), false, nil) // Basic with simple MX.
|
||||
test(ipdomain("justhost.example"), ipdomains("justhost.example"), domain("justhost.example"), false, nil) // Only an A record.
|
||||
test(ipdomain("justhost6.example"), ipdomains("justhost6.example"), domain("justhost6.example"), false, nil) // Only an AAAA record.
|
||||
test(ipdomain("cname.example"), ipdomains("mail.basic.example"), domain("basic.example"), false, nil) // Follow CNAME.
|
||||
test(ipdomain("cnamelimit1.example"), nil, zerodom, true, errCNAMELimit)
|
||||
test(ipdomain("cnameloop.example"), nil, zerodom, true, errCNAMELoop)
|
||||
test(ipdomain("absent.example"), nil, zerodom, true, errNoRecord)
|
||||
test(ipdomain("danglingcname.example"), nil, zerodom, true, errNoRecord)
|
||||
test(ipdomain("nullmx.example"), nil, zerodom, true, errNoMail)
|
||||
test(ipdomain("temperror-mx.example"), nil, zerodom, false, errDNS)
|
||||
test(ipdomain("temperror-cname.example"), nil, zerodom, false, errDNS)
|
||||
test(ipdomain("temperror-a.example"), nil, zerodom, false, errDNS)
|
||||
}
|
||||
|
||||
func TestDialHost(t *testing.T) {
|
||||
// We mostly want to test that dialing a second time switches to the other address family.
|
||||
|
||||
resolver := dns.MockResolver{
|
||||
A: map[string][]string{
|
||||
"dualstack.example.": {"10.0.0.1"},
|
||||
},
|
||||
AAAA: map[string][]string{
|
||||
"dualstack.example.": {"2001:db8::1"},
|
||||
},
|
||||
}
|
||||
|
||||
dial = func(ctx context.Context, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
|
||||
return nil, nil // No error, nil connection isn't used.
|
||||
}
|
||||
|
||||
ipdomain := func(s string) dns.IPDomain {
|
||||
return dns.IPDomain{Domain: dns.Domain{ASCII: s}}
|
||||
}
|
||||
|
||||
m := Msg{DialedIPs: map[string][]net.IP{}}
|
||||
_, ip, dualstack, err := dialHost(context.Background(), xlog, resolver, ipdomain("dualstack.example"), &m)
|
||||
if err != nil || ip.String() != "10.0.0.1" || !dualstack {
|
||||
t.Fatalf("expected err nil, address 10.0.0.1, dualstack true, got %v %v %v", err, ip, dualstack)
|
||||
}
|
||||
_, ip, dualstack, err = dialHost(context.Background(), xlog, resolver, ipdomain("dualstack.example"), &m)
|
||||
if err != nil || ip.String() != "2001:db8::1" || !dualstack {
|
||||
t.Fatalf("expected err nil, address 2001:db8::1, dualstack true, got %v %v %v", err, ip, dualstack)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user