1package queue
2
3import (
4 "bufio"
5 "context"
6 "crypto/ed25519"
7 cryptorand "crypto/rand"
8 "crypto/sha256"
9 "crypto/tls"
10 "crypto/x509"
11 "encoding/json"
12 "fmt"
13 "io"
14 "math/big"
15 "net"
16 "os"
17 "path/filepath"
18 "reflect"
19 "slices"
20 "strings"
21 "sync"
22 "testing"
23 "time"
24
25 "github.com/mjl-/adns"
26 "github.com/mjl-/bstore"
27
28 "github.com/mjl-/mox/dns"
29 "github.com/mjl-/mox/mlog"
30 "github.com/mjl-/mox/mox-"
31 "github.com/mjl-/mox/mtastsdb"
32 "github.com/mjl-/mox/smtp"
33 "github.com/mjl-/mox/smtpclient"
34 "github.com/mjl-/mox/store"
35 "github.com/mjl-/mox/tlsrpt"
36 "github.com/mjl-/mox/tlsrptdb"
37 "github.com/mjl-/mox/webhook"
38)
39
40var ctxbg = context.Background()
41var pkglog = mlog.New("queue", nil)
42
43func tcheck(t *testing.T, err error, msg string) {
44 if err != nil {
45 t.Helper()
46 t.Fatalf("%s: %s", msg, err)
47 }
48}
49
50func tcompare(t *testing.T, got, exp any) {
51 t.Helper()
52 if !reflect.DeepEqual(got, exp) {
53 t.Fatalf("got:\n%#v\nexpected:\n%#v", got, exp)
54 }
55}
56
57func setup(t *testing.T) (*store.Account, func()) {
58 // Prepare config so email can be delivered to mjl@mox.example.
59 os.RemoveAll("../testdata/queue/data")
60 log := mlog.New("queue", nil)
61 mox.Context = ctxbg
62 mox.ConfigStaticPath = filepath.FromSlash("../testdata/queue/mox.conf")
63 mox.MustLoadConfig(true, false)
64 mox.Shutdown, mox.ShutdownCancel = context.WithCancel(ctxbg)
65 err := Init()
66 tcheck(t, err, "queue init")
67 err = mtastsdb.Init(false)
68 tcheck(t, err, "mtastsdb init")
69 err = tlsrptdb.Init()
70 tcheck(t, err, "tlsrptdb init")
71 switchStop := store.Switchboard()
72
73 acc, err := store.OpenAccount(log, "mjl", false)
74 tcheck(t, err, "open account")
75 err = acc.SetPassword(log, "testtest")
76 tcheck(t, err, "set password")
77
78 return acc, func() {
79 acc.Close()
80 acc.WaitClosed()
81 mox.ShutdownCancel()
82 mox.Shutdown, mox.ShutdownCancel = context.WithCancel(ctxbg)
83 Shutdown()
84 err := mtastsdb.Close()
85 tcheck(t, err, "mtastsdb close")
86 err = tlsrptdb.Close()
87 tcheck(t, err, "tlsrptdb close")
88 switchStop()
89 }
90}
91
92var testmsg = strings.ReplaceAll(`From: <mjl@mox.example>
93To: <mjl@mox.example>
94Subject: test
95
96test email
97`, "\n", "\r\n")
98
99func prepareFile(t *testing.T) *os.File {
100 t.Helper()
101 msgFile, err := store.CreateMessageTemp(pkglog, "queue")
102 tcheck(t, err, "create temp message for delivery to queue")
103 _, err = msgFile.Write([]byte(testmsg))
104 tcheck(t, err, "write message file")
105 return msgFile
106}
107
108func TestQueue(t *testing.T) {
109 acc, cleanup := setup(t)
110 defer cleanup()
111
112 idfilter := func(msgID int64) Filter {
113 return Filter{IDs: []int64{msgID}}
114 }
115
116 kick := func(expn int, id int64) {
117 t.Helper()
118 n, err := NextAttemptSet(ctxbg, idfilter(id), time.Now())
119 tcheck(t, err, "kick queue")
120 if n != expn {
121 t.Fatalf("kick changed %d messages, expected %d", n, expn)
122 }
123 }
124
125 msgs, err := List(ctxbg, Filter{}, Sort{})
126 tcheck(t, err, "listing messages in queue")
127 if len(msgs) != 0 {
128 t.Fatalf("got %d messages in queue, expected 0", len(msgs))
129 }
130
131 path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
132 mf := prepareFile(t)
133 defer os.Remove(mf.Name())
134 defer mf.Close()
135
136 var qm Msg
137
138 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
139 err = Add(ctxbg, pkglog, "mjl", mf, qm)
140 tcheck(t, err, "add message to queue for delivery")
141
142 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
143 err = Add(ctxbg, pkglog, "mjl", mf, qm)
144 tcheck(t, err, "add message to queue for delivery")
145
146 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
147 err = Add(ctxbg, pkglog, "mjl", mf, qm)
148 tcheck(t, err, "add message to queue for delivery")
149
150 msgs, err = List(ctxbg, Filter{}, Sort{})
151 tcheck(t, err, "listing queue")
152 if len(msgs) != 3 {
153 t.Fatalf("got msgs %v, expected 1", msgs)
154 }
155
156 yes := true
157 n, err := RequireTLSSet(ctxbg, Filter{IDs: []int64{msgs[2].ID}}, &yes)
158 tcheck(t, err, "requiretlsset")
159 tcompare(t, n, 1)
160
161 msg := msgs[0]
162 if msg.Attempts != 0 {
163 t.Fatalf("msg attempts %d, expected 0", msg.Attempts)
164 }
165 n, err = Drop(ctxbg, pkglog, Filter{IDs: []int64{msgs[1].ID}})
166 tcheck(t, err, "drop")
167 if n != 1 {
168 t.Fatalf("dropped %d, expected 1", n)
169 }
170 if _, err := os.Stat(msgs[1].MessagePath()); err == nil || !os.IsNotExist(err) {
171 t.Fatalf("dropped message not removed from file system")
172 }
173
174 // Fail a message, check the account has a message afterwards, the DSN.
175 n, err = bstore.QueryDB[store.Message](ctxbg, acc.DB).Count()
176 tcheck(t, err, "count messages in account")
177 tcompare(t, n, 0)
178 n, err = Fail(ctxbg, pkglog, Filter{IDs: []int64{msgs[2].ID}})
179 tcheck(t, err, "fail")
180 if n != 1 {
181 t.Fatalf("failed %d, expected 1", n)
182 }
183 n, err = bstore.QueryDB[store.Message](ctxbg, acc.DB).Count()
184 tcheck(t, err, "count messages in account")
185 tcompare(t, n, 1)
186
187 // Check filter through various List calls. Other code uses the same filtering function.
188 filter := func(f Filter, expn int) {
189 t.Helper()
190 l, err := List(ctxbg, f, Sort{})
191 tcheck(t, err, "list messages")
192 tcompare(t, len(l), expn)
193 }
194 filter(Filter{}, 1)
195 filter(Filter{Account: "mjl"}, 1)
196 filter(Filter{Account: "bogus"}, 0)
197 filter(Filter{IDs: []int64{msgs[0].ID}}, 1)
198 filter(Filter{IDs: []int64{msgs[2].ID}}, 0) // Removed.
199 filter(Filter{IDs: []int64{msgs[2].ID + 1}}, 0) // Never existed.
200 filter(Filter{From: "mjl@"}, 1)
201 filter(Filter{From: "bogus@"}, 0)
202 filter(Filter{To: "mjl@"}, 1)
203 filter(Filter{To: "bogus@"}, 0)
204 filter(Filter{Hold: &yes}, 0)
205 no := false
206 filter(Filter{Hold: &no}, 1)
207 filter(Filter{Submitted: "<now"}, 1)
208 filter(Filter{Submitted: ">now"}, 0)
209 filter(Filter{NextAttempt: "<1m"}, 1)
210 filter(Filter{NextAttempt: ">1m"}, 0)
211 var empty string
212 bogus := "bogus"
213 filter(Filter{Transport: &empty}, 1)
214 filter(Filter{Transport: &bogus}, 0)
215
216 next := nextWork(ctxbg, pkglog, nil)
217 if next > 0 {
218 t.Fatalf("nextWork in %s, should be now", next)
219 }
220 busy := map[string]struct{}{"mox.example": {}}
221 if x := nextWork(ctxbg, pkglog, busy); x != 24*time.Hour {
222 t.Fatalf("nextWork in %s for busy domain, should be in 24 hours", x)
223 }
224 if nn := launchWork(pkglog, nil, busy); nn != 0 {
225 t.Fatalf("launchWork launched %d deliveries, expected 0", nn)
226 }
227
228 mailDomain := dns.Domain{ASCII: "mox.example"}
229 mailHost := dns.Domain{ASCII: "mail.mox.example"}
230 resolver := dns.MockResolver{
231 A: map[string][]string{
232 "mail.mox.example.": {"127.0.0.1"},
233 "submission.example.": {"127.0.0.1"},
234 },
235 MX: map[string][]*net.MX{
236 "mox.example.": {{Host: "mail.mox.example", Pref: 10}},
237 "other.example.": {{Host: "mail.mox.example", Pref: 10}},
238 },
239 }
240
241 // Try a failing delivery attempt.
242 var ndial int
243 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
244 ndial++
245 return nil, fmt.Errorf("failure from test")
246 }
247 defer func() {
248 smtpclient.DialHook = nil
249 }()
250
251 n = launchWork(pkglog, resolver, map[string]struct{}{})
252 tcompare(t, n, 1)
253
254 // Wait until we see the dial and the failed attempt.
255 timer := time.NewTimer(time.Second)
256 defer timer.Stop()
257 select {
258 case <-deliveryResults:
259 tcompare(t, ndial, 1)
260 m, err := bstore.QueryDB[Msg](ctxbg, DB).Get()
261 tcheck(t, err, "get")
262 tcompare(t, m.Attempts, 1)
263 case <-timer.C:
264 t.Fatalf("no delivery within 1s")
265 }
266
267 // OpenMessage.
268 _, err = OpenMessage(ctxbg, msg.ID+1)
269 if err != bstore.ErrAbsent {
270 t.Fatalf("OpenMessage, got %v, expected ErrAbsent", err)
271 }
272 reader, err := OpenMessage(ctxbg, msg.ID)
273 tcheck(t, err, "open message")
274 defer reader.Close()
275 msgbuf, err := io.ReadAll(reader)
276 tcheck(t, err, "read message")
277 if string(msgbuf) != testmsg {
278 t.Fatalf("message mismatch, got %q, expected %q", string(msgbuf), testmsg)
279 }
280
281 // Reduce by more than first attempt interval of 7.5 minutes.
282 n, err = NextAttemptAdd(ctxbg, idfilter(msg.ID+1), -10*time.Minute)
283 tcheck(t, err, "kick")
284 if n != 0 {
285 t.Fatalf("kick %d, expected 0", n)
286 }
287 n, err = NextAttemptAdd(ctxbg, idfilter(msg.ID), -10*time.Minute)
288 tcheck(t, err, "kick")
289 if n != 1 {
290 t.Fatalf("kicked %d, expected 1", n)
291 }
292
293 nfakeSMTPServer := func(server net.Conn, rcpts, ntx int, onercpt bool, extensions []string) {
294 // We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
295 // cyclic dependencies.
296 fmt.Fprintf(server, "220 mail.mox.example\r\n")
297 br := bufio.NewReader(server)
298
299 readline := func(cmd string) {
300 line, err := br.ReadString('\n')
301 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
302 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
303 }
304 }
305 writeline := func(s string) {
306 fmt.Fprintf(server, "%s\r\n", s)
307 }
308
309 readline("ehlo")
310 writeline("250-mail.mox.example")
311 for _, ext := range extensions {
312 writeline("250-" + ext)
313 }
314 writeline("250 pipelining")
315 for range ntx {
316 readline("mail")
317 writeline("250 ok")
318 for i := range rcpts {
319 readline("rcpt")
320 if onercpt && i > 0 {
321 writeline("552 ok")
322 } else {
323 writeline("250 ok")
324 }
325 }
326 readline("data")
327 writeline("354 continue")
328 reader := smtp.NewDataReader(br)
329 io.Copy(io.Discard, reader)
330 writeline("250 ok")
331 }
332 readline("quit")
333 writeline("221 ok")
334 }
335 fakeSMTPServer := func(server net.Conn) {
336 nfakeSMTPServer(server, 1, 1, false, nil)
337 }
338 fakeSMTPServer2Rcpts := func(server net.Conn) {
339 nfakeSMTPServer(server, 2, 1, false, nil)
340 }
341 fakeSMTPServerLimitRcpt1 := func(server net.Conn) {
342 nfakeSMTPServer(server, 1, 2, false, []string{"LIMITS RCPTMAX=1"})
343 }
344 // Server that returns an error after first recipient. We expect another
345 // transaction to deliver the second message.
346 fakeSMTPServerRcpt1 := func(server net.Conn) {
347 // We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
348 // cyclic dependencies.
349 fmt.Fprintf(server, "220 mail.mox.example\r\n")
350 br := bufio.NewReader(server)
351
352 readline := func(cmd string) {
353 line, err := br.ReadString('\n')
354 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
355 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
356 }
357 }
358 writeline := func(s string) {
359 fmt.Fprintf(server, "%s\r\n", s)
360 }
361
362 readline("ehlo")
363 writeline("250-mail.mox.example")
364 writeline("250 pipelining")
365
366 readline("mail")
367 writeline("250 ok")
368 readline("rcpt")
369 writeline("250 ok")
370 readline("rcpt")
371 writeline("552 ok")
372 readline("data")
373 writeline("354 continue")
374 reader := smtp.NewDataReader(br)
375 io.Copy(io.Discard, reader)
376 writeline("250 ok")
377
378 readline("mail")
379 writeline("250 ok")
380 readline("rcpt")
381 writeline("250 ok")
382 readline("data")
383 writeline("354 continue")
384 reader = smtp.NewDataReader(br)
385 io.Copy(io.Discard, reader)
386 writeline("250 ok")
387
388 readline("quit")
389 writeline("221 ok")
390 }
391
392 moxCert := fakeCert(t, "mail.mox.example", false)
393 goodTLSConfig := tls.Config{Certificates: []tls.Certificate{moxCert}}
394 makeFakeSMTPSTARTTLSServer := func(tlsConfig *tls.Config, nstarttls int, requiretls bool) func(server net.Conn) {
395 attempt := 0
396 return func(server net.Conn) {
397 attempt++
398
399 // We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
400 // cyclic dependencies.
401 fmt.Fprintf(server, "220 mail.mox.example\r\n")
402 br := bufio.NewReader(server)
403
404 readline := func(cmd string) {
405 line, err := br.ReadString('\n')
406 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
407 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
408 }
409 }
410 writeline := func(s string) {
411 fmt.Fprintf(server, "%s\r\n", s)
412 }
413
414 readline("ehlo")
415 writeline("250-mail.mox.example")
416 writeline("250 starttls")
417 if nstarttls == 0 || attempt <= nstarttls {
418 readline("starttls")
419 writeline("220 ok")
420 tlsConn := tls.Server(server, tlsConfig)
421 err := tlsConn.Handshake()
422 if err != nil {
423 return
424 }
425 server = tlsConn
426 br = bufio.NewReader(server)
427
428 readline("ehlo")
429 if requiretls {
430 writeline("250-mail.mox.example")
431 writeline("250 requiretls")
432 } else {
433 writeline("250 mail.mox.example")
434 }
435 }
436 readline("mail")
437 writeline("250 ok")
438 readline("rcpt")
439 writeline("250 ok")
440 readline("data")
441 writeline("354 continue")
442 reader := smtp.NewDataReader(br)
443 io.Copy(io.Discard, reader)
444 writeline("250 ok")
445 readline("quit")
446 writeline("221 ok")
447 }
448 }
449
450 fakeSMTPSTARTTLSServer := makeFakeSMTPSTARTTLSServer(&goodTLSConfig, 0, true)
451 makeBadFakeSMTPSTARTTLSServer := func(requiretls bool) func(server net.Conn) {
452 return makeFakeSMTPSTARTTLSServer(&tls.Config{MaxVersion: tls.VersionTLS10, Certificates: []tls.Certificate{moxCert}}, 1, requiretls)
453 }
454
455 nfakeSubmitServer := func(server net.Conn, nrcpt int) {
456 // We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
457 // cyclic dependencies.
458 fmt.Fprintf(server, "220 mail.mox.example\r\n")
459 br := bufio.NewReader(server)
460 br.ReadString('\n') // Should be EHLO.
461 fmt.Fprintf(server, "250-localhost\r\n")
462 fmt.Fprintf(server, "250 AUTH PLAIN\r\n")
463 br.ReadString('\n') // Should be AUTH PLAIN
464 fmt.Fprintf(server, "235 2.7.0 auth ok\r\n")
465 br.ReadString('\n') // Should be MAIL FROM.
466 fmt.Fprintf(server, "250 ok\r\n")
467 for range nrcpt {
468 br.ReadString('\n') // Should be RCPT TO.
469 fmt.Fprintf(server, "250 ok\r\n")
470 }
471 br.ReadString('\n') // Should be DATA.
472 fmt.Fprintf(server, "354 continue\r\n")
473 reader := smtp.NewDataReader(br)
474 io.Copy(io.Discard, reader)
475 fmt.Fprintf(server, "250 ok\r\n")
476 br.ReadString('\n') // Should be QUIT.
477 fmt.Fprintf(server, "221 ok\r\n")
478 }
479 fakeSubmitServer := func(server net.Conn) {
480 nfakeSubmitServer(server, 1)
481 }
482 fakeSubmitServer2Rcpts := func(server net.Conn) {
483 nfakeSubmitServer(server, 2)
484 }
485
486 testQueue := func(expectDSN bool, fakeServer func(conn net.Conn), nresults int) (wasNetDialer bool) {
487 t.Helper()
488
489 var pipes []net.Conn
490 defer func() {
491 for _, conn := range pipes {
492 conn.Close()
493 }
494 }()
495
496 var connMu sync.Mutex
497 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
498 connMu.Lock()
499 defer connMu.Unlock()
500
501 // Setting up a pipe. We'll start a fake smtp server on the server-side. And return the
502 // client-side to the invocation dial, for the attempted delivery from the queue.
503 server, client := net.Pipe()
504 pipes = append(pipes, server, client)
505 go fakeServer(server)
506
507 _, wasNetDialer = dialer.(*net.Dialer)
508
509 return client, nil
510 }
511 defer func() {
512 smtpclient.DialHook = nil
513 }()
514
515 inbox, err := bstore.QueryDB[store.Mailbox](ctxbg, acc.DB).FilterNonzero(store.Mailbox{Name: "Inbox"}).Get()
516 tcheck(t, err, "get inbox")
517
518 inboxCount, err := bstore.QueryDB[store.Message](ctxbg, acc.DB).FilterNonzero(store.Message{MailboxID: inbox.ID}).Count()
519 tcheck(t, err, "querying messages in inbox")
520
521 launchWork(pkglog, resolver, map[string]struct{}{})
522
523 // Wait for all results.
524 timer.Reset(time.Second)
525 for range nresults {
526 select {
527 case <-deliveryResults:
528 case <-timer.C:
529 t.Fatalf("no dial within 1s")
530 }
531 }
532
533 // Check that queue is now empty.
534 xmsgs, err := List(ctxbg, Filter{}, Sort{})
535 tcheck(t, err, "list queue")
536 tcompare(t, len(xmsgs), 0)
537
538 // And that we possibly got a DSN delivered.
539 ninbox, err := bstore.QueryDB[store.Message](ctxbg, acc.DB).FilterNonzero(store.Message{MailboxID: inbox.ID}).Count()
540 tcheck(t, err, "querying messages in inbox")
541 if expectDSN && ninbox != inboxCount+1 {
542 t.Fatalf("got %d messages in inbox, previously %d, expected 1 additional for dsn", ninbox, inboxCount)
543 } else if !expectDSN && ninbox != inboxCount {
544 t.Fatalf("got %d messages in inbox, previously %d, expected no additional messages", ninbox, inboxCount)
545 }
546
547 return wasNetDialer
548 }
549 testDeliver := func(fakeServer func(conn net.Conn)) bool {
550 t.Helper()
551 return testQueue(false, fakeServer, 1)
552 }
553 testDeliverN := func(fakeServer func(conn net.Conn), nresults int) bool {
554 t.Helper()
555 return testQueue(false, fakeServer, nresults)
556 }
557 testDSN := func(fakeServer func(conn net.Conn)) bool {
558 t.Helper()
559 return testQueue(true, fakeServer, 1)
560 }
561
562 // Test direct delivery.
563 wasNetDialer := testDeliver(fakeSMTPServer)
564 if !wasNetDialer {
565 t.Fatalf("expected net.Dialer as dialer")
566 }
567
568 // Single delivery to two recipients at same domain, expecting single connection
569 // and single transaction.
570 qm0 := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
571 qml := []Msg{qm0, qm0} // Same NextAttempt.
572 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
573 tcheck(t, err, "add messages to queue for delivery")
574 testDeliver(fakeSMTPServer2Rcpts)
575
576 // Single enqueue to two recipients at different domain, expecting two connections.
577 otheraddr, _ := smtp.ParseAddress("mjl@other.example")
578 otherpath := otheraddr.Path()
579 t0 := time.Now()
580 qml = []Msg{
581 MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, t0, "test"),
582 MakeMsg(path, otherpath, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, t0, "test"),
583 }
584 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
585 tcheck(t, err, "add messages to queue for delivery")
586 conns := ConnectionCounter()
587 testDeliverN(fakeSMTPServer, 2)
588 nconns := ConnectionCounter()
589 if nconns != conns+2 {
590 t.Errorf("saw %d connections, expected 2", nconns-conns)
591 }
592
593 // Single enqueue with two recipients at same domain, but with smtp server that has
594 // LIMITS RCPTMAX=1, so we expect a single connection with two transactions.
595 qml = []Msg{qm0, qm0}
596 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
597 tcheck(t, err, "add messages to queue for delivery")
598 testDeliver(fakeSMTPServerLimitRcpt1)
599
600 // Single enqueue with two recipients at same domain, but smtp server sends 552 for
601 // 2nd recipient, so we expect a single connection with two transactions.
602 qml = []Msg{qm0, qm0}
603 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
604 tcheck(t, err, "add messages to queue for delivery")
605 testDeliver(fakeSMTPServerRcpt1)
606
607 // Add a message to be delivered with submit because of its route.
608 topath := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "submit.example"}}}
609 qm = MakeMsg(path, topath, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
610 err = Add(ctxbg, pkglog, "mjl", mf, qm)
611 tcheck(t, err, "add message to queue for delivery")
612 wasNetDialer = testDeliver(fakeSubmitServer)
613 if !wasNetDialer {
614 t.Fatalf("expected net.Dialer as dialer")
615 }
616
617 // Two messages for submission.
618 qml = []Msg{qm, qm}
619 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
620 tcheck(t, err, "add messages to queue for delivery")
621 wasNetDialer = testDeliver(fakeSubmitServer2Rcpts)
622 if !wasNetDialer {
623 t.Fatalf("expected net.Dialer as dialer")
624 }
625
626 // Add a message to be delivered with submit because of explicitly configured transport, that uses TLS.
627 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")}
628 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
629 tcheck(t, err, "add message to queue for delivery")
630 transportSubmitTLS := "submittls"
631 n, err = TransportSet(ctxbg, Filter{IDs: []int64{qml[0].ID}}, transportSubmitTLS)
632 tcheck(t, err, "set transport")
633 if n != 1 {
634 t.Fatalf("TransportSet changed %d messages, expected 1", n)
635 }
636 // Make fake cert, and make it trusted.
637 cert := fakeCert(t, "submission.example", false)
638 mox.Conf.Static.TLS.CertPool = x509.NewCertPool()
639 mox.Conf.Static.TLS.CertPool.AddCert(cert.Leaf)
640 tlsConfig := tls.Config{
641 Certificates: []tls.Certificate{cert},
642 }
643 wasNetDialer = testDeliver(func(conn net.Conn) {
644 conn = tls.Server(conn, &tlsConfig)
645 fakeSubmitServer(conn)
646 })
647 if !wasNetDialer {
648 t.Fatalf("expected net.Dialer as dialer")
649 }
650
651 // Various failure reasons.
652 fdNotTrusted := tlsrpt.FailureDetails{
653 ResultType: tlsrpt.ResultCertificateNotTrusted,
654 SendingMTAIP: "", // Missing due to pipe.
655 ReceivingMXHostname: "mail.mox.example",
656 ReceivingMXHelo: "mail.mox.example",
657 ReceivingIP: "", // Missing due to pipe.
658 FailedSessionCount: 1,
659 FailureReasonCode: "",
660 }
661 fdTLSAUnusable := tlsrpt.FailureDetails{
662 ResultType: tlsrpt.ResultTLSAInvalid,
663 ReceivingMXHostname: "mail.mox.example",
664 FailedSessionCount: 0,
665 FailureReasonCode: "all-unusable-records+ignored",
666 }
667 fdBadProtocol := tlsrpt.FailureDetails{
668 ResultType: tlsrpt.ResultValidationFailure,
669 ReceivingMXHostname: "mail.mox.example",
670 ReceivingMXHelo: "mail.mox.example",
671 FailedSessionCount: 1,
672 FailureReasonCode: "tls-remote-alert-70-protocol-version-not-supported",
673 }
674
675 // Add a message to be delivered with socks.
676 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<socks@localhost>", nil, nil, time.Now(), "test")}
677 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
678 tcheck(t, err, "add message to queue for delivery")
679 n, err = TransportSet(ctxbg, idfilter(qml[0].ID), "socks")
680 tcheck(t, err, "TransportSet")
681 if n != 1 {
682 t.Fatalf("TransportSet changed %d messages, expected 1", n)
683 }
684 kick(1, qml[0].ID)
685 wasNetDialer = testDeliver(fakeSMTPServer)
686 if wasNetDialer {
687 t.Fatalf("expected non-net.Dialer as dialer") // SOCKS5 dialer is a private type, we cannot check for it.
688 }
689
690 // Add message to be delivered with opportunistic TLS verification.
691 clearTLSResults(t)
692 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<opportunistictls@localhost>", nil, nil, time.Now(), "test")}
693 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
694 tcheck(t, err, "add message to queue for delivery")
695 kick(1, qml[0].ID)
696 testDeliver(fakeSMTPSTARTTLSServer)
697 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdNotTrusted)))
698 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailHost)))
699
700 // Test fallback to plain text with TLS handshake fails.
701 clearTLSResults(t)
702 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<badtls@localhost>", nil, nil, time.Now(), "test")}
703 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
704 tcheck(t, err, "add message to queue for delivery")
705 kick(1, qml[0].ID)
706 testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
707 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdBadProtocol)))
708 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailHost, fdBadProtocol)))
709
710 // Add message to be delivered with DANE verification.
711 clearTLSResults(t)
712 resolver.AllAuthentic = true
713 resolver.TLSA = map[string][]adns.TLSA{
714 "_25._tcp.mail.mox.example.": {
715 {Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeFull, CertAssoc: moxCert.Leaf.RawSubjectPublicKeyInfo},
716 },
717 }
718 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<dane@localhost>", nil, nil, time.Now(), "test")}
719 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
720 tcheck(t, err, "add message to queue for delivery")
721 kick(1, qml[0].ID)
722 testDeliver(fakeSMTPSTARTTLSServer)
723 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdNotTrusted)))
724 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(1, 0, tlsrpt.Result{Policy: tlsrpt.TLSAPolicy(resolver.TLSA["_25._tcp.mail.mox.example."], mailHost), FailureDetails: []tlsrpt.FailureDetails{}}))
725
726 // We should know starttls/requiretls by now.
727 rdt := store.RecipientDomainTLS{Domain: "mox.example"}
728 err = acc.DB.Get(ctxbg, &rdt)
729 tcheck(t, err, "get recipientdomaintls")
730 tcompare(t, rdt.STARTTLS, true)
731 tcompare(t, rdt.RequireTLS, true)
732
733 // Add message to be delivered with verified TLS and REQUIRETLS.
734 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<opportunistictls@localhost>", nil, &yes, time.Now(), "test")}
735 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
736 tcheck(t, err, "add message to queue for delivery")
737 kick(1, qml[0].ID)
738 testDeliver(fakeSMTPSTARTTLSServer)
739
740 // Check that message is delivered with all unusable DANE records.
741 clearTLSResults(t)
742 resolver.TLSA = map[string][]adns.TLSA{
743 "_25._tcp.mail.mox.example.": {
744 {},
745 },
746 }
747 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<daneunusable@localhost>", nil, nil, time.Now(), "test")}
748 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
749 tcheck(t, err, "add message to queue for delivery")
750 kick(1, qml[0].ID)
751 testDeliver(fakeSMTPSTARTTLSServer)
752 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdNotTrusted)))
753 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(1, 0, tlsrpt.Result{Policy: tlsrpt.TLSAPolicy([]adns.TLSA{}, mailHost), FailureDetails: []tlsrpt.FailureDetails{fdTLSAUnusable}}))
754
755 // Check that message is delivered with insecure TLSA records. They should be
756 // ignored and regular STARTTLS tried.
757 clearTLSResults(t)
758 resolver.Inauthentic = []string{"tlsa _25._tcp.mail.mox.example."}
759 resolver.TLSA = map[string][]adns.TLSA{
760 "_25._tcp.mail.mox.example.": {
761 {Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeFull, CertAssoc: make([]byte, sha256.Size)},
762 },
763 }
764 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<daneinsecure@localhost>", nil, nil, time.Now(), "test")}
765 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
766 tcheck(t, err, "add message to queue for delivery")
767 kick(1, qml[0].ID)
768 testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
769 resolver.Inauthentic = nil
770 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdBadProtocol)))
771 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailHost, fdBadProtocol)))
772
773 // STARTTLS failed, so not known supported.
774 rdt = store.RecipientDomainTLS{Domain: "mox.example"}
775 err = acc.DB.Get(ctxbg, &rdt)
776 tcheck(t, err, "get recipientdomaintls")
777 tcompare(t, rdt.STARTTLS, false)
778 tcompare(t, rdt.RequireTLS, false)
779
780 // Check that message is delivered with TLS-Required: No and non-matching DANE record.
781 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednostarttls@localhost>", nil, &no, time.Now(), "test")}
782 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
783 tcheck(t, err, "add message to queue for delivery")
784 kick(1, qml[0].ID)
785 testDeliver(fakeSMTPSTARTTLSServer)
786
787 // Check that message is delivered with TLS-Required: No and bad TLS, falling back to plain text.
788 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednoplaintext@localhost>", nil, &no, time.Now(), "test")}
789 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
790 tcheck(t, err, "add message to queue for delivery")
791 kick(1, qml[0].ID)
792 testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
793
794 // Add message with requiretls that fails immediately due to no REQUIRETLS support in all servers.
795 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequiredunsupported@localhost>", nil, &yes, time.Now(), "test")}
796 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
797 tcheck(t, err, "add message to queue for delivery")
798 kick(1, qml[0].ID)
799 testDSN(makeBadFakeSMTPSTARTTLSServer(false))
800
801 // Restore pre-DANE behaviour.
802 resolver.AllAuthentic = false
803 resolver.TLSA = nil
804
805 // Add message with requiretls that fails immediately due to no verification policy for recipient domain.
806 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednopolicy@localhost>", nil, &yes, time.Now(), "test")}
807 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
808 tcheck(t, err, "add message to queue for delivery")
809 kick(1, qml[0].ID)
810 // Based on DNS lookups, there won't be any dialing or SMTP connection.
811 testDSN(func(conn net.Conn) {})
812
813 // Add another message that we'll fail to deliver entirely.
814 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
815 err = Add(ctxbg, pkglog, "mjl", mf, qm)
816 tcheck(t, err, "add message to queue for delivery")
817
818 msgs, err = List(ctxbg, Filter{}, Sort{})
819 tcheck(t, err, "list queue")
820 if len(msgs) != 1 {
821 t.Fatalf("queue has %d messages, expected 1", len(msgs))
822 }
823 msg = msgs[0]
824
825 prepServer := func(fn func(c net.Conn)) (net.Conn, func()) {
826 server, client := net.Pipe()
827 go func() {
828 fn(server)
829 server.Close()
830 }()
831 return client, func() {
832 server.Close()
833 client.Close()
834 }
835 }
836
837 conn2, cleanup2 := prepServer(func(conn net.Conn) { fmt.Fprintf(conn, "220 mail.mox.example\r\n") })
838 conn3, cleanup3 := prepServer(func(conn net.Conn) { fmt.Fprintf(conn, "451 mail.mox.example\r\n") })
839 conn4, cleanup4 := prepServer(fakeSMTPSTARTTLSServer)
840 defer func() {
841 cleanup2()
842 cleanup3()
843 cleanup4()
844 }()
845
846 seq := 0
847 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
848 seq++
849 switch seq {
850 default:
851 return nil, fmt.Errorf("connect error from test")
852 case 2:
853 return conn2, nil
854 case 3:
855 return conn3, nil
856 case 4:
857 return conn4, nil
858 }
859 }
860 defer func() {
861 smtpclient.DialHook = nil
862 }()
863
864 comm := store.RegisterComm(acc)
865 defer comm.Unregister()
866
867 for i := 1; i < 8; i++ {
868 if i == 4 {
869 resolver.AllAuthentic = true
870 resolver.TLSA = map[string][]adns.TLSA{
871 "_25._tcp.mail.mox.example.": {
872 // Non-matching zero CertAssoc, should cause failure.
873 {Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeSHA256, CertAssoc: make([]byte, sha256.Size)},
874 },
875 }
876 } else {
877 resolver.AllAuthentic = false
878 resolver.TLSA = nil
879 }
880 go deliver(pkglog, resolver, msg)
881 <-deliveryResults
882 err = DB.Get(ctxbg, &msg)
883 tcheck(t, err, "get msg")
884 if msg.Attempts != i {
885 t.Fatalf("got attempt %d, expected %d", msg.Attempts, i)
886 }
887 if msg.Attempts == 5 {
888 timer.Reset(time.Second)
889 changes := make(chan struct{}, 1)
890 go func() {
891 comm.Get()
892 changes <- struct{}{}
893 }()
894 select {
895 case <-changes:
896 case <-timer.C:
897 t.Fatalf("no dsn in 1s")
898 }
899 }
900 }
901
902 // Trigger final failure.
903 go deliver(pkglog, resolver, msg)
904 <-deliveryResults
905 err = DB.Get(ctxbg, &msg)
906 if err != bstore.ErrAbsent {
907 t.Fatalf("attempt to fetch delivered and removed message from queue, got err %v, expected ErrAbsent", err)
908 }
909
910 timer.Reset(time.Second)
911 changes := make(chan struct{}, 1)
912 go func() {
913 comm.Get()
914 changes <- struct{}{}
915 }()
916 select {
917 case <-changes:
918 case <-timer.C:
919 t.Fatalf("no dsn in 1s")
920 }
921
922 // We shouldn't have any more work to do.
923 msgs, err = List(ctxbg, Filter{}, Sort{})
924 tcheck(t, err, "list messages at end of test")
925 tcompare(t, len(msgs), 0)
926}
927
928func addCounts(success, failure int64, result tlsrpt.Result) tlsrpt.Result {
929 result.Summary.TotalSuccessfulSessionCount += success
930 result.Summary.TotalFailureSessionCount += failure
931 return result
932}
933
934func clearTLSResults(t *testing.T) {
935 _, err := bstore.QueryDB[tlsrptdb.TLSResult](ctxbg, tlsrptdb.ResultDB).Delete()
936 tcheck(t, err, "delete tls results")
937}
938
939func checkTLSResults(t *testing.T, policyDomain, expRecipientDomain string, expIsHost bool, expResults ...tlsrpt.Result) {
940 t.Helper()
941 q := bstore.QueryDB[tlsrptdb.TLSResult](ctxbg, tlsrptdb.ResultDB)
942 q.FilterNonzero(tlsrptdb.TLSResult{PolicyDomain: policyDomain})
943 result, err := q.Get()
944 tcheck(t, err, "get tls result")
945 tcompare(t, result.RecipientDomain, expRecipientDomain)
946 tcompare(t, result.IsHost, expIsHost)
947
948 // Before comparing, compensate for go1.20 vs go1.21 difference.
949 for i, r := range result.Results {
950 for j, fd := range r.FailureDetails {
951 if fd.FailureReasonCode == "tls-remote-alert-70" {
952 result.Results[i].FailureDetails[j].FailureReasonCode = "tls-remote-alert-70-protocol-version-not-supported"
953 }
954 }
955 }
956 tcompare(t, result.Results, expResults)
957}
958
959// Test delivered/permfailed/suppressed/canceled/dropped messages are stored in the
960// retired list if configured, with a proper result, that webhooks are scheduled,
961// and that cleaning up works.
962func TestRetiredHooks(t *testing.T) {
963 _, cleanup := setup(t)
964 defer cleanup()
965
966 addr, err := smtp.ParseAddress("mjl@mox.example")
967 tcheck(t, err, "parse address")
968 path := addr.Path()
969
970 mf := prepareFile(t)
971 defer os.Remove(mf.Name())
972 defer mf.Close()
973
974 resolver := dns.MockResolver{
975 A: map[string][]string{"mox.example.": {"127.0.0.1"}},
976 MX: map[string][]*net.MX{"mox.example.": {{Host: "mox.example", Pref: 10}}},
977 }
978
979 testAction := func(account string, action func(), expResult *MsgResult, expEvent string, expSuppressing bool) {
980 t.Helper()
981
982 _, err := bstore.QueryDB[MsgRetired](ctxbg, DB).Delete()
983 tcheck(t, err, "clearing retired messages")
984 _, err = bstore.QueryDB[Hook](ctxbg, DB).Delete()
985 tcheck(t, err, "clearing hooks")
986
987 qm := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
988 qm.Extra = map[string]string{"a": "123"}
989 err = Add(ctxbg, pkglog, account, mf, qm)
990 tcheck(t, err, "add to queue")
991
992 action()
993
994 // Should be no messages left in queue.
995 msgs, err := List(ctxbg, Filter{}, Sort{})
996 tcheck(t, err, "list messages")
997 tcompare(t, len(msgs), 0)
998
999 retireds, err := RetiredList(ctxbg, RetiredFilter{}, RetiredSort{})
1000 tcheck(t, err, "list retired messages")
1001 hooks, err := HookList(ctxbg, HookFilter{}, HookSort{})
1002 tcheck(t, err, "list hooks")
1003 if expResult == nil {
1004 tcompare(t, len(retireds), 0)
1005 tcompare(t, len(hooks), 0)
1006 } else {
1007 tcompare(t, len(retireds), 1)
1008 mr := retireds[0]
1009 tcompare(t, len(mr.Results) > 0, true)
1010 lr := mr.LastResult()
1011 lr.Start = time.Time{}
1012 lr.Duration = 0
1013 tcompare(t, lr.Error == "", expResult.Error == "")
1014 lr.Error = expResult.Error
1015 tcompare(t, lr, *expResult)
1016
1017 // Compare added webhook.
1018 tcompare(t, len(hooks), 1)
1019 h := hooks[0]
1020 var out webhook.Outgoing
1021 dec := json.NewDecoder(strings.NewReader(h.Payload))
1022 dec.DisallowUnknownFields()
1023 err := dec.Decode(&out)
1024 tcheck(t, err, "unmarshal outgoing webhook payload")
1025 tcompare(t, out.Error == "", expResult.Error == "")
1026 out.WebhookQueued = time.Time{}
1027 out.Error = ""
1028 var ecode string
1029 if expResult.Secode != "" {
1030 ecode = fmt.Sprintf("%d.%s", expResult.Code/100, expResult.Secode)
1031 }
1032 var code int // Only set for errors.
1033 if expResult.Code != 250 {
1034 code = expResult.Code
1035 }
1036 expOut := webhook.Outgoing{
1037 Event: webhook.OutgoingEvent(expEvent),
1038 Suppressing: expSuppressing,
1039 QueueMsgID: mr.ID,
1040 FromID: mr.FromID,
1041 MessageID: mr.MessageID,
1042 Subject: mr.Subject,
1043 SMTPCode: code,
1044 SMTPEnhancedCode: ecode,
1045 Extra: mr.Extra,
1046 }
1047 tcompare(t, out, expOut)
1048 h.ID = 0
1049 h.Payload = ""
1050 h.Submitted = time.Time{}
1051 h.NextAttempt = time.Time{}
1052 exph := Hook{0, mr.ID, "", mr.MessageID, mr.Subject, mr.Extra, mr.SenderAccount, "http://localhost:1234/outgoing", "Basic dXNlcm5hbWU6cGFzc3dvcmQ=", false, expEvent, "", time.Time{}, 0, time.Time{}, nil}
1053 tcompare(t, h, exph)
1054 }
1055 }
1056
1057 makeLaunchAction := func(handler func(conn net.Conn)) func() {
1058 return func() {
1059 server, client := net.Pipe()
1060 defer server.Close()
1061
1062 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
1063 go handler(server)
1064 return client, nil
1065 }
1066 defer func() {
1067 smtpclient.DialHook = nil
1068 }()
1069
1070 // Trigger delivery attempt.
1071 n := launchWork(pkglog, resolver, map[string]struct{}{})
1072 tcompare(t, n, 1)
1073
1074 // Wait until delivery has finished.
1075 tm := time.NewTimer(5 * time.Second)
1076 defer tm.Stop()
1077 select {
1078 case <-tm.C:
1079 t.Fatalf("delivery didn't happen within 5s")
1080 case <-deliveryResults:
1081 }
1082 }
1083 }
1084
1085 smtpAccept := func(conn net.Conn) {
1086 br := bufio.NewReader(conn)
1087 readline := func(cmd string) {
1088 line, err := br.ReadString('\n')
1089 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
1090 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
1091 }
1092 }
1093 writeline := func(s string) {
1094 fmt.Fprintf(conn, "%s\r\n", s)
1095 }
1096
1097 writeline("220 mail.mox.example")
1098 readline("ehlo")
1099 writeline("250 mail.mox.example")
1100
1101 readline("mail")
1102 writeline("250 ok")
1103 readline("rcpt")
1104 writeline("250 ok")
1105 readline("data")
1106 writeline("354 continue")
1107 reader := smtp.NewDataReader(br)
1108 io.Copy(io.Discard, reader)
1109 writeline("250 ok")
1110 readline("quit")
1111 writeline("250 ok")
1112 }
1113 smtpReject := func(code int) func(conn net.Conn) {
1114 return func(conn net.Conn) {
1115 br := bufio.NewReader(conn)
1116 readline := func(cmd string) {
1117 line, err := br.ReadString('\n')
1118 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
1119 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
1120 }
1121 }
1122 writeline := func(s string) {
1123 fmt.Fprintf(conn, "%s\r\n", s)
1124 }
1125
1126 writeline("220 mail.mox.example")
1127 readline("ehlo")
1128 writeline("250-mail.mox.example")
1129 writeline("250 enhancedstatuscodes")
1130
1131 readline("mail")
1132 writeline(fmt.Sprintf("%d 5.1.0 nok", code))
1133 readline("quit")
1134 writeline("250 ok")
1135 }
1136 }
1137
1138 testAction("mjl", makeLaunchAction(smtpAccept), nil, "", false)
1139 testAction("retired", makeLaunchAction(smtpAccept), &MsgResult{Code: 250, Success: true}, string(webhook.EventDelivered), false)
1140 // 554 is generic, doesn't immediately cause suppression.
1141 testAction("mjl", makeLaunchAction(smtpReject(554)), nil, "", false)
1142 testAction("retired", makeLaunchAction(smtpReject(554)), &MsgResult{Code: 554, Secode: "1.0", Error: "nonempty"}, string(webhook.EventFailed), false)
1143 // 550 causes immediate suppression, check for it in webhook.
1144 testAction("mjl", makeLaunchAction(smtpReject(550)), nil, "", true)
1145 testAction("retired", makeLaunchAction(smtpReject(550)), &MsgResult{Code: 550, Secode: "1.0", Error: "nonempty"}, string(webhook.EventFailed), true)
1146 // Try to deliver to suppressed addresses.
1147 launch := func() {
1148 n := launchWork(pkglog, resolver, map[string]struct{}{})
1149 tcompare(t, n, 1)
1150 <-deliveryResults
1151 }
1152 testAction("mjl", launch, nil, "", false)
1153 testAction("retired", launch, &MsgResult{Error: "nonempty"}, string(webhook.EventSuppressed), false)
1154
1155 queueFail := func() {
1156 n, err := Fail(ctxbg, pkglog, Filter{})
1157 tcheck(t, err, "cancel delivery with failure dsn")
1158 tcompare(t, n, 1)
1159 }
1160 queueDrop := func() {
1161 n, err := Drop(ctxbg, pkglog, Filter{})
1162 tcheck(t, err, "cancel delivery without failure dsn")
1163 tcompare(t, n, 1)
1164 }
1165 testAction("mjl", queueFail, nil, "", false)
1166 testAction("retired", queueFail, &MsgResult{Error: "nonempty"}, string(webhook.EventFailed), false)
1167 testAction("mjl", queueDrop, nil, "", false)
1168 testAction("retired", queueDrop, &MsgResult{Error: "nonempty"}, string(webhook.EventCanceled), false)
1169
1170 retireds, err := RetiredList(ctxbg, RetiredFilter{}, RetiredSort{})
1171 tcheck(t, err, "list retired messages")
1172 tcompare(t, len(retireds), 1)
1173
1174 cleanupMsgRetiredSingle(pkglog)
1175 retireds, err = RetiredList(ctxbg, RetiredFilter{}, RetiredSort{})
1176 tcheck(t, err, "list retired messages")
1177 tcompare(t, len(retireds), 0)
1178}
1179
1180// test Start and that it attempts to deliver.
1181func TestQueueStart(t *testing.T) {
1182 // Override dial function. We'll make connecting fail and check the attempt.
1183 resolver := dns.MockResolver{
1184 A: map[string][]string{"mox.example.": {"127.0.0.1"}},
1185 MX: map[string][]*net.MX{"mox.example.": {{Host: "mox.example", Pref: 10}}},
1186 }
1187 dialed := make(chan struct{}, 1)
1188 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
1189 dialed <- struct{}{}
1190 return nil, fmt.Errorf("failure from test")
1191 }
1192 defer func() {
1193 smtpclient.DialHook = nil
1194 }()
1195
1196 _, cleanup := setup(t)
1197 defer cleanup()
1198
1199 done := make(chan struct{})
1200 defer func() {
1201 mox.ShutdownCancel()
1202 // Wait for message and hooks deliverers and cleaners.
1203 <-done
1204 <-done
1205 <-done
1206 <-done
1207 mox.Shutdown, mox.ShutdownCancel = context.WithCancel(ctxbg)
1208 }()
1209 Shutdown() // DB was opened already. Start will open it again. Just close it before.
1210 err := Start(resolver, done)
1211 tcheck(t, err, "queue start")
1212
1213 checkDialed := func(need bool) {
1214 t.Helper()
1215 d := time.Second / 10
1216 if need {
1217 d = time.Second
1218 }
1219 timer := time.NewTimer(d)
1220 defer timer.Stop()
1221 select {
1222 case <-dialed:
1223 if !need {
1224 t.Fatalf("unexpected dial attempt")
1225 }
1226 case <-timer.C:
1227 if need {
1228 t.Fatalf("expected to see a dial attempt")
1229 }
1230 }
1231 }
1232
1233 // HoldRule to mark mark all messages sent by mjl on hold, including existing
1234 // messages.
1235 hr0, err := HoldRuleAdd(ctxbg, pkglog, HoldRule{Account: "mjl"})
1236 tcheck(t, err, "add hold rule")
1237
1238 // All zero HoldRule holds all deliveries, and marks all on hold.
1239 hr1, err := HoldRuleAdd(ctxbg, pkglog, HoldRule{})
1240 tcheck(t, err, "add hold rule")
1241
1242 hrl, err := HoldRuleList(ctxbg)
1243 tcheck(t, err, "listing hold rules")
1244 tcompare(t, hrl, []HoldRule{hr0, hr1})
1245
1246 path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
1247 mf := prepareFile(t)
1248 defer os.Remove(mf.Name())
1249 defer mf.Close()
1250 qm := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
1251 err = Add(ctxbg, pkglog, "mjl", mf, qm)
1252 tcheck(t, err, "add message to queue for delivery")
1253 checkDialed(false) // No delivery attempt yet.
1254
1255 n, err := Count(ctxbg)
1256 tcheck(t, err, "count messages in queue")
1257 tcompare(t, n, 1)
1258
1259 // Take message off hold.
1260 n, err = HoldSet(ctxbg, Filter{}, false)
1261 tcheck(t, err, "taking message off hold")
1262 tcompare(t, n, 1)
1263 checkDialed(true)
1264
1265 // Remove hold rules.
1266 err = HoldRuleRemove(ctxbg, pkglog, hr1.ID)
1267 tcheck(t, err, "removing hold rule")
1268 err = HoldRuleRemove(ctxbg, pkglog, hr0.ID)
1269 tcheck(t, err, "removing hold rule")
1270 // Check it is gone.
1271 hrl, err = HoldRuleList(ctxbg)
1272 tcheck(t, err, "listing hold rules")
1273 tcompare(t, len(hrl), 0)
1274
1275 // Don't change message nextattempt time, but kick queue. Message should not be delivered.
1276 msgqueueKick()
1277 checkDialed(false)
1278
1279 // Set new next attempt, should see another attempt.
1280 n, err = NextAttemptSet(ctxbg, Filter{From: "@mox.example"}, time.Now())
1281 tcheck(t, err, "kick queue")
1282 if n != 1 {
1283 t.Fatalf("kick changed %d messages, expected 1", n)
1284 }
1285 checkDialed(true)
1286
1287 // Submit another, should be delivered immediately without HoldRule.
1288 path = smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
1289 mf = prepareFile(t)
1290 defer os.Remove(mf.Name())
1291 defer mf.Close()
1292 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
1293 err = Add(ctxbg, pkglog, "mjl", mf, qm)
1294 tcheck(t, err, "add message to queue for delivery")
1295 checkDialed(true) // Immediate.
1296}
1297
1298func TestListFilterSort(t *testing.T) {
1299 _, cleanup := setup(t)
1300 defer cleanup()
1301
1302 // insert Msgs. insert RetiredMsgs based on that. call list with filters and sort. filter to select a single. filter to paginate one by one, and in reverse.
1303
1304 path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
1305 mf := prepareFile(t)
1306 defer os.Remove(mf.Name())
1307 defer mf.Close()
1308
1309 now := time.Now().Round(0)
1310 qm := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, now, "test")
1311 qm.Queued = now
1312 qm1 := qm
1313 qm1.Queued = now.Add(-time.Second)
1314 qm1.NextAttempt = now.Add(time.Minute)
1315 qml := []Msg{qm, qm, qm, qm, qm, qm1}
1316 err := Add(ctxbg, pkglog, "mjl", mf, qml...)
1317 tcheck(t, err, "add messages to queue")
1318 qm1 = qml[len(qml)-1]
1319
1320 qmlrev := slices.Clone(qml)
1321 slices.Reverse(qmlrev)
1322
1323 // Ascending by nextattempt,id.
1324 l, err := List(ctxbg, Filter{}, Sort{Asc: true})
1325 tcheck(t, err, "list messages")
1326 tcompare(t, l, qml)
1327
1328 // Descending by nextattempt,id.
1329 l, err = List(ctxbg, Filter{}, Sort{})
1330 tcheck(t, err, "list messages")
1331 tcompare(t, l, qmlrev)
1332
1333 // Descending by queued,id.
1334 l, err = List(ctxbg, Filter{}, Sort{Field: "Queued"})
1335 tcheck(t, err, "list messages")
1336 ql := append(slices.Clone(qmlrev[1:]), qml[5])
1337 tcompare(t, l, ql)
1338
1339 // Filter by all fields to get a single.
1340 no := false
1341 allfilters := Filter{
1342 Max: 2,
1343 IDs: []int64{qm1.ID},
1344 Account: "mjl",
1345 From: path.XString(true),
1346 To: path.XString(true),
1347 Hold: &no,
1348 Submitted: "<1s",
1349 NextAttempt: ">1s",
1350 }
1351 l, err = List(ctxbg, allfilters, Sort{})
1352 tcheck(t, err, "list single")
1353 tcompare(t, l, []Msg{qm1})
1354
1355 // Paginated NextAttmpt asc.
1356 var lastID int64
1357 var last any
1358 l = nil
1359 for {
1360 nl, err := List(ctxbg, Filter{Max: 1}, Sort{Asc: true, LastID: lastID, Last: last})
1361 tcheck(t, err, "list paginated")
1362 l = append(l, nl...)
1363 if len(nl) == 0 {
1364 break
1365 }
1366 tcompare(t, len(nl), 1)
1367 lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
1368 }
1369 tcompare(t, l, qml)
1370
1371 // Paginated NextAttempt desc.
1372 l = nil
1373 lastID = 0
1374 last = ""
1375 for {
1376 nl, err := List(ctxbg, Filter{Max: 1}, Sort{LastID: lastID, Last: last})
1377 tcheck(t, err, "list paginated")
1378 l = append(l, nl...)
1379 if len(nl) == 0 {
1380 break
1381 }
1382 tcompare(t, len(nl), 1)
1383 lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
1384 }
1385 tcompare(t, l, qmlrev)
1386
1387 // Paginated Queued desc.
1388 l = nil
1389 lastID = 0
1390 last = ""
1391 for {
1392 nl, err := List(ctxbg, Filter{Max: 1}, Sort{Field: "Queued", LastID: lastID, Last: last})
1393 tcheck(t, err, "list paginated")
1394 l = append(l, nl...)
1395 if len(nl) == 0 {
1396 break
1397 }
1398 tcompare(t, len(nl), 1)
1399 lastID, last = nl[0].ID, nl[0].Queued.Format(time.RFC3339Nano)
1400 }
1401 tcompare(t, l, ql)
1402
1403 // Paginated Queued asc.
1404 l = nil
1405 lastID = 0
1406 last = ""
1407 for {
1408 nl, err := List(ctxbg, Filter{Max: 1}, Sort{Field: "Queued", Asc: true, LastID: lastID, Last: last})
1409 tcheck(t, err, "list paginated")
1410 l = append(l, nl...)
1411 if len(nl) == 0 {
1412 break
1413 }
1414 tcompare(t, len(nl), 1)
1415 lastID, last = nl[0].ID, nl[0].Queued.Format(time.RFC3339Nano)
1416 }
1417 qlrev := slices.Clone(ql)
1418 slices.Reverse(qlrev)
1419 tcompare(t, l, qlrev)
1420
1421 // Retire messages and do similar but more basic tests. The code is similar.
1422 var mrl []MsgRetired
1423 err = DB.Write(ctxbg, func(tx *bstore.Tx) error {
1424 for _, m := range qml {
1425 mr := m.Retired(false, m.NextAttempt, time.Now().Add(time.Minute).Round(0))
1426 err := tx.Insert(&mr)
1427 tcheck(t, err, "inserting retired message")
1428 mrl = append(mrl, mr)
1429 }
1430 return nil
1431 })
1432 tcheck(t, err, "adding retired messages")
1433
1434 // Paginated LastActivity desc.
1435 var lr []MsgRetired
1436 lastID = 0
1437 last = ""
1438 for {
1439 nl, err := RetiredList(ctxbg, RetiredFilter{Max: 1}, RetiredSort{LastID: lastID, Last: last})
1440 tcheck(t, err, "list paginated")
1441 lr = append(lr, nl...)
1442 if len(nl) == 0 {
1443 break
1444 }
1445 tcompare(t, len(nl), 1)
1446 lastID, last = nl[0].ID, nl[0].LastActivity.Format(time.RFC3339Nano)
1447 }
1448 mrlrev := slices.Clone(mrl)
1449 slices.Reverse(mrlrev)
1450 tcompare(t, lr, mrlrev)
1451
1452 // Filter by all fields to get a single.
1453 allretiredfilters := RetiredFilter{
1454 Max: 2,
1455 IDs: []int64{mrlrev[0].ID},
1456 Account: "mjl",
1457 From: path.XString(true),
1458 To: path.XString(true),
1459 Submitted: "<1s",
1460 LastActivity: ">1s",
1461 }
1462 lr, err = RetiredList(ctxbg, allretiredfilters, RetiredSort{})
1463 tcheck(t, err, "list single")
1464 tcompare(t, lr, []MsgRetired{mrlrev[0]})
1465}
1466
1467// Just a cert that appears valid.
1468func fakeCert(t *testing.T, name string, expired bool) tls.Certificate {
1469 notAfter := time.Now()
1470 if expired {
1471 notAfter = notAfter.Add(-time.Hour)
1472 } else {
1473 notAfter = notAfter.Add(time.Hour)
1474 }
1475
1476 privKey := ed25519.NewKeyFromSeed(make([]byte, ed25519.SeedSize)) // Fake key, don't use this for real!
1477 template := &x509.Certificate{
1478 SerialNumber: big.NewInt(1), // Required field...
1479 DNSNames: []string{name},
1480 NotBefore: time.Now().Add(-time.Hour),
1481 NotAfter: notAfter,
1482 }
1483 localCertBuf, err := x509.CreateCertificate(cryptorand.Reader, template, template, privKey.Public(), privKey)
1484 if err != nil {
1485 t.Fatalf("making certificate: %s", err)
1486 }
1487 cert, err := x509.ParseCertificate(localCertBuf)
1488 if err != nil {
1489 t.Fatalf("parsing generated certificate: %s", err)
1490 }
1491 c := tls.Certificate{
1492 Certificate: [][]byte{localCertBuf},
1493 PrivateKey: privKey,
1494 Leaf: cert,
1495 }
1496 return c
1497}
1498