1package queue
2
3import (
4 "bufio"
5 "context"
6 "crypto/ed25519"
7 cryptorand "crypto/rand"
8 "crypto/sha256"
9 "crypto/tls"
10 "crypto/x509"
11 "encoding/json"
12 "fmt"
13 "io"
14 "math/big"
15 "net"
16 "os"
17 "path/filepath"
18 "reflect"
19 "slices"
20 "strings"
21 "sync"
22 "testing"
23 "time"
24
25 "github.com/mjl-/adns"
26 "github.com/mjl-/bstore"
27
28 "github.com/mjl-/mox/dns"
29 "github.com/mjl-/mox/mlog"
30 "github.com/mjl-/mox/mox-"
31 "github.com/mjl-/mox/smtp"
32 "github.com/mjl-/mox/smtpclient"
33 "github.com/mjl-/mox/store"
34 "github.com/mjl-/mox/tlsrpt"
35 "github.com/mjl-/mox/tlsrptdb"
36 "github.com/mjl-/mox/webhook"
37)
38
39var ctxbg = context.Background()
40var pkglog = mlog.New("queue", nil)
41
// tcheck aborts the test with msg and the error text when err is non-nil. It
// does nothing for a nil error.
func tcheck(t *testing.T, err error, msg string) {
	if err == nil {
		return
	}
	// Mark as helper so the failure is reported at the caller's line.
	t.Helper()
	t.Fatalf("%s: %s", msg, err)
}
48
// tcompare aborts the test when got and exp are not deeply equal according to
// reflect.DeepEqual, printing both values in Go syntax.
func tcompare(t *testing.T, got, exp any) {
	t.Helper()
	if reflect.DeepEqual(got, exp) {
		return
	}
	t.Fatalf("got:\n%#v\nexpected:\n%#v", got, exp)
}
55
56func setup(t *testing.T) (*store.Account, func()) {
57 // Prepare config so email can be delivered to mjl@mox.example.
58 os.RemoveAll("../testdata/queue/data")
59 log := mlog.New("queue", nil)
60 mox.Context = ctxbg
61 mox.ConfigStaticPath = filepath.FromSlash("../testdata/queue/mox.conf")
62 mox.MustLoadConfig(true, false)
63 acc, err := store.OpenAccount(log, "mjl")
64 tcheck(t, err, "open account")
65 err = acc.SetPassword(log, "testtest")
66 tcheck(t, err, "set password")
67 switchStop := store.Switchboard()
68 mox.Shutdown, mox.ShutdownCancel = context.WithCancel(ctxbg)
69 return acc, func() {
70 acc.Close()
71 acc.CheckClosed()
72 mox.ShutdownCancel()
73 mox.Shutdown, mox.ShutdownCancel = context.WithCancel(ctxbg)
74 Shutdown()
75 switchStop()
76 }
77}
78
// testmsg is the message used throughout the tests: three header lines, an
// empty line and a one-line body, every line terminated by CRLF.
var testmsg = strings.Join([]string{
	"From: <mjl@mox.example>",
	"To: <mjl@mox.example>",
	"Subject: test",
	"",
	"test email",
	"", // Results in the CRLF after the last line.
}, "\r\n")
85
86func prepareFile(t *testing.T) *os.File {
87 t.Helper()
88 msgFile, err := store.CreateMessageTemp(pkglog, "queue")
89 tcheck(t, err, "create temp message for delivery to queue")
90 _, err = msgFile.Write([]byte(testmsg))
91 tcheck(t, err, "write message file")
92 return msgFile
93}
94
95func TestQueue(t *testing.T) {
96 acc, cleanup := setup(t)
97 defer cleanup()
98 err := Init()
99 tcheck(t, err, "queue init")
100
101 idfilter := func(msgID int64) Filter {
102 return Filter{IDs: []int64{msgID}}
103 }
104
105 kick := func(expn int, id int64) {
106 t.Helper()
107 n, err := NextAttemptSet(ctxbg, idfilter(id), time.Now())
108 tcheck(t, err, "kick queue")
109 if n != expn {
110 t.Fatalf("kick changed %d messages, expected %d", n, expn)
111 }
112 }
113
114 msgs, err := List(ctxbg, Filter{}, Sort{})
115 tcheck(t, err, "listing messages in queue")
116 if len(msgs) != 0 {
117 t.Fatalf("got %d messages in queue, expected 0", len(msgs))
118 }
119
120 path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
121 mf := prepareFile(t)
122 defer os.Remove(mf.Name())
123 defer mf.Close()
124
125 var qm Msg
126
127 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
128 err = Add(ctxbg, pkglog, "mjl", mf, qm)
129 tcheck(t, err, "add message to queue for delivery")
130
131 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
132 err = Add(ctxbg, pkglog, "mjl", mf, qm)
133 tcheck(t, err, "add message to queue for delivery")
134
135 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
136 err = Add(ctxbg, pkglog, "mjl", mf, qm)
137 tcheck(t, err, "add message to queue for delivery")
138
139 msgs, err = List(ctxbg, Filter{}, Sort{})
140 tcheck(t, err, "listing queue")
141 if len(msgs) != 3 {
142 t.Fatalf("got msgs %v, expected 1", msgs)
143 }
144
145 yes := true
146 n, err := RequireTLSSet(ctxbg, Filter{IDs: []int64{msgs[2].ID}}, &yes)
147 tcheck(t, err, "requiretlsset")
148 tcompare(t, n, 1)
149
150 msg := msgs[0]
151 if msg.Attempts != 0 {
152 t.Fatalf("msg attempts %d, expected 0", msg.Attempts)
153 }
154 n, err = Drop(ctxbg, pkglog, Filter{IDs: []int64{msgs[1].ID}})
155 tcheck(t, err, "drop")
156 if n != 1 {
157 t.Fatalf("dropped %d, expected 1", n)
158 }
159 if _, err := os.Stat(msgs[1].MessagePath()); err == nil || !os.IsNotExist(err) {
160 t.Fatalf("dropped message not removed from file system")
161 }
162
163 // Fail a message, check the account has a message afterwards, the DSN.
164 n, err = bstore.QueryDB[store.Message](ctxbg, acc.DB).Count()
165 tcheck(t, err, "count messages in account")
166 tcompare(t, n, 0)
167 n, err = Fail(ctxbg, pkglog, Filter{IDs: []int64{msgs[2].ID}})
168 tcheck(t, err, "fail")
169 if n != 1 {
170 t.Fatalf("failed %d, expected 1", n)
171 }
172 n, err = bstore.QueryDB[store.Message](ctxbg, acc.DB).Count()
173 tcheck(t, err, "count messages in account")
174 tcompare(t, n, 1)
175
176 // Check filter through various List calls. Other code uses the same filtering function.
177 filter := func(f Filter, expn int) {
178 t.Helper()
179 l, err := List(ctxbg, f, Sort{})
180 tcheck(t, err, "list messages")
181 tcompare(t, len(l), expn)
182 }
183 filter(Filter{}, 1)
184 filter(Filter{Account: "mjl"}, 1)
185 filter(Filter{Account: "bogus"}, 0)
186 filter(Filter{IDs: []int64{msgs[0].ID}}, 1)
187 filter(Filter{IDs: []int64{msgs[2].ID}}, 0) // Removed.
188 filter(Filter{IDs: []int64{msgs[2].ID + 1}}, 0) // Never existed.
189 filter(Filter{From: "mjl@"}, 1)
190 filter(Filter{From: "bogus@"}, 0)
191 filter(Filter{To: "mjl@"}, 1)
192 filter(Filter{To: "bogus@"}, 0)
193 filter(Filter{Hold: &yes}, 0)
194 no := false
195 filter(Filter{Hold: &no}, 1)
196 filter(Filter{Submitted: "<now"}, 1)
197 filter(Filter{Submitted: ">now"}, 0)
198 filter(Filter{NextAttempt: "<1m"}, 1)
199 filter(Filter{NextAttempt: ">1m"}, 0)
200 var empty string
201 bogus := "bogus"
202 filter(Filter{Transport: &empty}, 1)
203 filter(Filter{Transport: &bogus}, 0)
204
205 next := nextWork(ctxbg, pkglog, nil)
206 if next > 0 {
207 t.Fatalf("nextWork in %s, should be now", next)
208 }
209 busy := map[string]struct{}{"mox.example": {}}
210 if x := nextWork(ctxbg, pkglog, busy); x != 24*time.Hour {
211 t.Fatalf("nextWork in %s for busy domain, should be in 24 hours", x)
212 }
213 if nn := launchWork(pkglog, nil, busy); nn != 0 {
214 t.Fatalf("launchWork launched %d deliveries, expected 0", nn)
215 }
216
217 mailDomain := dns.Domain{ASCII: "mox.example"}
218 mailHost := dns.Domain{ASCII: "mail.mox.example"}
219 resolver := dns.MockResolver{
220 A: map[string][]string{
221 "mail.mox.example.": {"127.0.0.1"},
222 "submission.example.": {"127.0.0.1"},
223 },
224 MX: map[string][]*net.MX{
225 "mox.example.": {{Host: "mail.mox.example", Pref: 10}},
226 "other.example.": {{Host: "mail.mox.example", Pref: 10}},
227 },
228 }
229
230 // Try a failing delivery attempt.
231 var ndial int
232 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
233 ndial++
234 return nil, fmt.Errorf("failure from test")
235 }
236 defer func() {
237 smtpclient.DialHook = nil
238 }()
239
240 n = launchWork(pkglog, resolver, map[string]struct{}{})
241 tcompare(t, n, 1)
242
243 // Wait until we see the dial and the failed attempt.
244 timer := time.NewTimer(time.Second)
245 defer timer.Stop()
246 select {
247 case <-deliveryResults:
248 tcompare(t, ndial, 1)
249 m, err := bstore.QueryDB[Msg](ctxbg, DB).Get()
250 tcheck(t, err, "get")
251 tcompare(t, m.Attempts, 1)
252 case <-timer.C:
253 t.Fatalf("no delivery within 1s")
254 }
255
256 // OpenMessage.
257 _, err = OpenMessage(ctxbg, msg.ID+1)
258 if err != bstore.ErrAbsent {
259 t.Fatalf("OpenMessage, got %v, expected ErrAbsent", err)
260 }
261 reader, err := OpenMessage(ctxbg, msg.ID)
262 tcheck(t, err, "open message")
263 defer reader.Close()
264 msgbuf, err := io.ReadAll(reader)
265 tcheck(t, err, "read message")
266 if string(msgbuf) != testmsg {
267 t.Fatalf("message mismatch, got %q, expected %q", string(msgbuf), testmsg)
268 }
269
270 // Reduce by more than first attempt interval of 7.5 minutes.
271 n, err = NextAttemptAdd(ctxbg, idfilter(msg.ID+1), -10*time.Minute)
272 tcheck(t, err, "kick")
273 if n != 0 {
274 t.Fatalf("kick %d, expected 0", n)
275 }
276 n, err = NextAttemptAdd(ctxbg, idfilter(msg.ID), -10*time.Minute)
277 tcheck(t, err, "kick")
278 if n != 1 {
279 t.Fatalf("kicked %d, expected 1", n)
280 }
281
282 nfakeSMTPServer := func(server net.Conn, rcpts, ntx int, onercpt bool, extensions []string) {
283 // We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
284 // cyclic dependencies.
285 fmt.Fprintf(server, "220 mail.mox.example\r\n")
286 br := bufio.NewReader(server)
287
288 readline := func(cmd string) {
289 line, err := br.ReadString('\n')
290 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
291 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
292 }
293 }
294 writeline := func(s string) {
295 fmt.Fprintf(server, "%s\r\n", s)
296 }
297
298 readline("ehlo")
299 writeline("250-mail.mox.example")
300 for _, ext := range extensions {
301 writeline("250-" + ext)
302 }
303 writeline("250 pipelining")
304 for tx := 0; tx < ntx; tx++ {
305 readline("mail")
306 writeline("250 ok")
307 for i := 0; i < rcpts; i++ {
308 readline("rcpt")
309 if onercpt && i > 0 {
310 writeline("552 ok")
311 } else {
312 writeline("250 ok")
313 }
314 }
315 readline("data")
316 writeline("354 continue")
317 reader := smtp.NewDataReader(br)
318 io.Copy(io.Discard, reader)
319 writeline("250 ok")
320 }
321 readline("quit")
322 writeline("221 ok")
323 }
324 fakeSMTPServer := func(server net.Conn) {
325 nfakeSMTPServer(server, 1, 1, false, nil)
326 }
327 fakeSMTPServer2Rcpts := func(server net.Conn) {
328 nfakeSMTPServer(server, 2, 1, false, nil)
329 }
330 fakeSMTPServerLimitRcpt1 := func(server net.Conn) {
331 nfakeSMTPServer(server, 1, 2, false, []string{"LIMITS RCPTMAX=1"})
332 }
333 // Server that returns an error after first recipient. We expect another
334 // transaction to deliver the second message.
335 fakeSMTPServerRcpt1 := func(server net.Conn) {
336 // We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
337 // cyclic dependencies.
338 fmt.Fprintf(server, "220 mail.mox.example\r\n")
339 br := bufio.NewReader(server)
340
341 readline := func(cmd string) {
342 line, err := br.ReadString('\n')
343 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
344 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
345 }
346 }
347 writeline := func(s string) {
348 fmt.Fprintf(server, "%s\r\n", s)
349 }
350
351 readline("ehlo")
352 writeline("250-mail.mox.example")
353 writeline("250 pipelining")
354
355 readline("mail")
356 writeline("250 ok")
357 readline("rcpt")
358 writeline("250 ok")
359 readline("rcpt")
360 writeline("552 ok")
361 readline("data")
362 writeline("354 continue")
363 reader := smtp.NewDataReader(br)
364 io.Copy(io.Discard, reader)
365 writeline("250 ok")
366
367 readline("mail")
368 writeline("250 ok")
369 readline("rcpt")
370 writeline("250 ok")
371 readline("data")
372 writeline("354 continue")
373 reader = smtp.NewDataReader(br)
374 io.Copy(io.Discard, reader)
375 writeline("250 ok")
376
377 readline("quit")
378 writeline("221 ok")
379 }
380
381 moxCert := fakeCert(t, "mail.mox.example", false)
382 goodTLSConfig := tls.Config{Certificates: []tls.Certificate{moxCert}}
383 makeFakeSMTPSTARTTLSServer := func(tlsConfig *tls.Config, nstarttls int, requiretls bool) func(server net.Conn) {
384 attempt := 0
385 return func(server net.Conn) {
386 attempt++
387
388 // We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
389 // cyclic dependencies.
390 fmt.Fprintf(server, "220 mail.mox.example\r\n")
391 br := bufio.NewReader(server)
392
393 readline := func(cmd string) {
394 line, err := br.ReadString('\n')
395 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
396 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
397 }
398 }
399 writeline := func(s string) {
400 fmt.Fprintf(server, "%s\r\n", s)
401 }
402
403 readline("ehlo")
404 writeline("250-mail.mox.example")
405 writeline("250 starttls")
406 if nstarttls == 0 || attempt <= nstarttls {
407 readline("starttls")
408 writeline("220 ok")
409 tlsConn := tls.Server(server, tlsConfig)
410 err := tlsConn.Handshake()
411 if err != nil {
412 return
413 }
414 server = tlsConn
415 br = bufio.NewReader(server)
416
417 readline("ehlo")
418 if requiretls {
419 writeline("250-mail.mox.example")
420 writeline("250 requiretls")
421 } else {
422 writeline("250 mail.mox.example")
423 }
424 }
425 readline("mail")
426 writeline("250 ok")
427 readline("rcpt")
428 writeline("250 ok")
429 readline("data")
430 writeline("354 continue")
431 reader := smtp.NewDataReader(br)
432 io.Copy(io.Discard, reader)
433 writeline("250 ok")
434 readline("quit")
435 writeline("221 ok")
436 }
437 }
438
439 fakeSMTPSTARTTLSServer := makeFakeSMTPSTARTTLSServer(&goodTLSConfig, 0, true)
440 makeBadFakeSMTPSTARTTLSServer := func(requiretls bool) func(server net.Conn) {
441 return makeFakeSMTPSTARTTLSServer(&tls.Config{MaxVersion: tls.VersionTLS10, Certificates: []tls.Certificate{moxCert}}, 1, requiretls)
442 }
443
444 nfakeSubmitServer := func(server net.Conn, nrcpt int) {
445 // We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
446 // cyclic dependencies.
447 fmt.Fprintf(server, "220 mail.mox.example\r\n")
448 br := bufio.NewReader(server)
449 br.ReadString('\n') // Should be EHLO.
450 fmt.Fprintf(server, "250-localhost\r\n")
451 fmt.Fprintf(server, "250 AUTH PLAIN\r\n")
452 br.ReadString('\n') // Should be AUTH PLAIN
453 fmt.Fprintf(server, "235 2.7.0 auth ok\r\n")
454 br.ReadString('\n') // Should be MAIL FROM.
455 fmt.Fprintf(server, "250 ok\r\n")
456 for i := 0; i < nrcpt; i++ {
457 br.ReadString('\n') // Should be RCPT TO.
458 fmt.Fprintf(server, "250 ok\r\n")
459 }
460 br.ReadString('\n') // Should be DATA.
461 fmt.Fprintf(server, "354 continue\r\n")
462 reader := smtp.NewDataReader(br)
463 io.Copy(io.Discard, reader)
464 fmt.Fprintf(server, "250 ok\r\n")
465 br.ReadString('\n') // Should be QUIT.
466 fmt.Fprintf(server, "221 ok\r\n")
467 }
468 fakeSubmitServer := func(server net.Conn) {
469 nfakeSubmitServer(server, 1)
470 }
471 fakeSubmitServer2Rcpts := func(server net.Conn) {
472 nfakeSubmitServer(server, 2)
473 }
474
475 testQueue := func(expectDSN bool, fakeServer func(conn net.Conn), nresults int) (wasNetDialer bool) {
476 t.Helper()
477
478 var pipes []net.Conn
479 defer func() {
480 for _, conn := range pipes {
481 conn.Close()
482 }
483 }()
484
485 var connMu sync.Mutex
486 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
487 connMu.Lock()
488 defer connMu.Unlock()
489
490 // Setting up a pipe. We'll start a fake smtp server on the server-side. And return the
491 // client-side to the invocation dial, for the attempted delivery from the queue.
492 server, client := net.Pipe()
493 pipes = append(pipes, server, client)
494 go fakeServer(server)
495
496 _, wasNetDialer = dialer.(*net.Dialer)
497
498 return client, nil
499 }
500 defer func() {
501 smtpclient.DialHook = nil
502 }()
503
504 inbox, err := bstore.QueryDB[store.Mailbox](ctxbg, acc.DB).FilterNonzero(store.Mailbox{Name: "Inbox"}).Get()
505 tcheck(t, err, "get inbox")
506
507 inboxCount, err := bstore.QueryDB[store.Message](ctxbg, acc.DB).FilterNonzero(store.Message{MailboxID: inbox.ID}).Count()
508 tcheck(t, err, "querying messages in inbox")
509
510 launchWork(pkglog, resolver, map[string]struct{}{})
511
512 // Wait for all results.
513 timer.Reset(time.Second)
514 for i := 0; i < nresults; i++ {
515 select {
516 case <-deliveryResults:
517 case <-timer.C:
518 t.Fatalf("no dial within 1s")
519 }
520 }
521
522 // Check that queue is now empty.
523 xmsgs, err := List(ctxbg, Filter{}, Sort{})
524 tcheck(t, err, "list queue")
525 tcompare(t, len(xmsgs), 0)
526
527 // And that we possibly got a DSN delivered.
528 ninbox, err := bstore.QueryDB[store.Message](ctxbg, acc.DB).FilterNonzero(store.Message{MailboxID: inbox.ID}).Count()
529 tcheck(t, err, "querying messages in inbox")
530 if expectDSN && ninbox != inboxCount+1 {
531 t.Fatalf("got %d messages in inbox, previously %d, expected 1 additional for dsn", ninbox, inboxCount)
532 } else if !expectDSN && ninbox != inboxCount {
533 t.Fatalf("got %d messages in inbox, previously %d, expected no additional messages", ninbox, inboxCount)
534 }
535
536 return wasNetDialer
537 }
538 testDeliver := func(fakeServer func(conn net.Conn)) bool {
539 t.Helper()
540 return testQueue(false, fakeServer, 1)
541 }
542 testDeliverN := func(fakeServer func(conn net.Conn), nresults int) bool {
543 t.Helper()
544 return testQueue(false, fakeServer, nresults)
545 }
546 testDSN := func(fakeServer func(conn net.Conn)) bool {
547 t.Helper()
548 return testQueue(true, fakeServer, 1)
549 }
550
551 // Test direct delivery.
552 wasNetDialer := testDeliver(fakeSMTPServer)
553 if !wasNetDialer {
554 t.Fatalf("expected net.Dialer as dialer")
555 }
556
557 // Single delivery to two recipients at same domain, expecting single connection
558 // and single transaction.
559 qm0 := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
560 qml := []Msg{qm0, qm0} // Same NextAttempt.
561 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
562 tcheck(t, err, "add messages to queue for delivery")
563 testDeliver(fakeSMTPServer2Rcpts)
564
565 // Single enqueue to two recipients at different domain, expecting two connections.
566 otheraddr, _ := smtp.ParseAddress("mjl@other.example")
567 otherpath := otheraddr.Path()
568 t0 := time.Now()
569 qml = []Msg{
570 MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, t0, "test"),
571 MakeMsg(path, otherpath, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, t0, "test"),
572 }
573 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
574 tcheck(t, err, "add messages to queue for delivery")
575 conns := ConnectionCounter()
576 testDeliverN(fakeSMTPServer, 2)
577 nconns := ConnectionCounter()
578 if nconns != conns+2 {
579 t.Errorf("saw %d connections, expected 2", nconns-conns)
580 }
581
582 // Single enqueue with two recipients at same domain, but with smtp server that has
583 // LIMITS RCPTMAX=1, so we expect a single connection with two transactions.
584 qml = []Msg{qm0, qm0}
585 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
586 tcheck(t, err, "add messages to queue for delivery")
587 testDeliver(fakeSMTPServerLimitRcpt1)
588
589 // Single enqueue with two recipients at same domain, but smtp server sends 552 for
590 // 2nd recipient, so we expect a single connection with two transactions.
591 qml = []Msg{qm0, qm0}
592 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
593 tcheck(t, err, "add messages to queue for delivery")
594 testDeliver(fakeSMTPServerRcpt1)
595
596 // Add a message to be delivered with submit because of its route.
597 topath := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "submit.example"}}}
598 qm = MakeMsg(path, topath, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
599 err = Add(ctxbg, pkglog, "mjl", mf, qm)
600 tcheck(t, err, "add message to queue for delivery")
601 wasNetDialer = testDeliver(fakeSubmitServer)
602 if !wasNetDialer {
603 t.Fatalf("expected net.Dialer as dialer")
604 }
605
606 // Two messages for submission.
607 qml = []Msg{qm, qm}
608 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
609 tcheck(t, err, "add messages to queue for delivery")
610 wasNetDialer = testDeliver(fakeSubmitServer2Rcpts)
611 if !wasNetDialer {
612 t.Fatalf("expected net.Dialer as dialer")
613 }
614
615 // Add a message to be delivered with submit because of explicitly configured transport, that uses TLS.
616 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")}
617 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
618 tcheck(t, err, "add message to queue for delivery")
619 transportSubmitTLS := "submittls"
620 n, err = TransportSet(ctxbg, Filter{IDs: []int64{qml[0].ID}}, transportSubmitTLS)
621 tcheck(t, err, "set transport")
622 if n != 1 {
623 t.Fatalf("TransportSet changed %d messages, expected 1", n)
624 }
625 // Make fake cert, and make it trusted.
626 cert := fakeCert(t, "submission.example", false)
627 mox.Conf.Static.TLS.CertPool = x509.NewCertPool()
628 mox.Conf.Static.TLS.CertPool.AddCert(cert.Leaf)
629 tlsConfig := tls.Config{
630 Certificates: []tls.Certificate{cert},
631 }
632 wasNetDialer = testDeliver(func(conn net.Conn) {
633 conn = tls.Server(conn, &tlsConfig)
634 fakeSubmitServer(conn)
635 })
636 if !wasNetDialer {
637 t.Fatalf("expected net.Dialer as dialer")
638 }
639
640 // Various failure reasons.
641 fdNotTrusted := tlsrpt.FailureDetails{
642 ResultType: tlsrpt.ResultCertificateNotTrusted,
643 SendingMTAIP: "", // Missing due to pipe.
644 ReceivingMXHostname: "mail.mox.example",
645 ReceivingMXHelo: "mail.mox.example",
646 ReceivingIP: "", // Missing due to pipe.
647 FailedSessionCount: 1,
648 FailureReasonCode: "",
649 }
650 fdTLSAUnusable := tlsrpt.FailureDetails{
651 ResultType: tlsrpt.ResultTLSAInvalid,
652 ReceivingMXHostname: "mail.mox.example",
653 FailedSessionCount: 0,
654 FailureReasonCode: "all-unusable-records+ignored",
655 }
656 fdBadProtocol := tlsrpt.FailureDetails{
657 ResultType: tlsrpt.ResultValidationFailure,
658 ReceivingMXHostname: "mail.mox.example",
659 ReceivingMXHelo: "mail.mox.example",
660 FailedSessionCount: 1,
661 FailureReasonCode: "tls-remote-alert-70-protocol-version-not-supported",
662 }
663
664 // Add a message to be delivered with socks.
665 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<socks@localhost>", nil, nil, time.Now(), "test")}
666 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
667 tcheck(t, err, "add message to queue for delivery")
668 n, err = TransportSet(ctxbg, idfilter(qml[0].ID), "socks")
669 tcheck(t, err, "TransportSet")
670 if n != 1 {
671 t.Fatalf("TransportSet changed %d messages, expected 1", n)
672 }
673 kick(1, qml[0].ID)
674 wasNetDialer = testDeliver(fakeSMTPServer)
675 if wasNetDialer {
676 t.Fatalf("expected non-net.Dialer as dialer") // SOCKS5 dialer is a private type, we cannot check for it.
677 }
678
679 // Add message to be delivered with opportunistic TLS verification.
680 clearTLSResults(t)
681 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<opportunistictls@localhost>", nil, nil, time.Now(), "test")}
682 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
683 tcheck(t, err, "add message to queue for delivery")
684 kick(1, qml[0].ID)
685 testDeliver(fakeSMTPSTARTTLSServer)
686 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdNotTrusted)))
687 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailHost)))
688
689 // Test fallback to plain text with TLS handshake fails.
690 clearTLSResults(t)
691 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<badtls@localhost>", nil, nil, time.Now(), "test")}
692 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
693 tcheck(t, err, "add message to queue for delivery")
694 kick(1, qml[0].ID)
695 testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
696 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdBadProtocol)))
697 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailHost, fdBadProtocol)))
698
699 // Add message to be delivered with DANE verification.
700 clearTLSResults(t)
701 resolver.AllAuthentic = true
702 resolver.TLSA = map[string][]adns.TLSA{
703 "_25._tcp.mail.mox.example.": {
704 {Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeFull, CertAssoc: moxCert.Leaf.RawSubjectPublicKeyInfo},
705 },
706 }
707 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<dane@localhost>", nil, nil, time.Now(), "test")}
708 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
709 tcheck(t, err, "add message to queue for delivery")
710 kick(1, qml[0].ID)
711 testDeliver(fakeSMTPSTARTTLSServer)
712 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdNotTrusted)))
713 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(1, 0, tlsrpt.Result{Policy: tlsrpt.TLSAPolicy(resolver.TLSA["_25._tcp.mail.mox.example."], mailHost), FailureDetails: []tlsrpt.FailureDetails{}}))
714
715 // We should know starttls/requiretls by now.
716 rdt := store.RecipientDomainTLS{Domain: "mox.example"}
717 err = acc.DB.Get(ctxbg, &rdt)
718 tcheck(t, err, "get recipientdomaintls")
719 tcompare(t, rdt.STARTTLS, true)
720 tcompare(t, rdt.RequireTLS, true)
721
722 // Add message to be delivered with verified TLS and REQUIRETLS.
723 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<opportunistictls@localhost>", nil, &yes, time.Now(), "test")}
724 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
725 tcheck(t, err, "add message to queue for delivery")
726 kick(1, qml[0].ID)
727 testDeliver(fakeSMTPSTARTTLSServer)
728
729 // Check that message is delivered with all unusable DANE records.
730 clearTLSResults(t)
731 resolver.TLSA = map[string][]adns.TLSA{
732 "_25._tcp.mail.mox.example.": {
733 {},
734 },
735 }
736 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<daneunusable@localhost>", nil, nil, time.Now(), "test")}
737 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
738 tcheck(t, err, "add message to queue for delivery")
739 kick(1, qml[0].ID)
740 testDeliver(fakeSMTPSTARTTLSServer)
741 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdNotTrusted)))
742 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(1, 0, tlsrpt.Result{Policy: tlsrpt.TLSAPolicy([]adns.TLSA{}, mailHost), FailureDetails: []tlsrpt.FailureDetails{fdTLSAUnusable}}))
743
744 // Check that message is delivered with insecure TLSA records. They should be
745 // ignored and regular STARTTLS tried.
746 clearTLSResults(t)
747 resolver.Inauthentic = []string{"tlsa _25._tcp.mail.mox.example."}
748 resolver.TLSA = map[string][]adns.TLSA{
749 "_25._tcp.mail.mox.example.": {
750 {Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeFull, CertAssoc: make([]byte, sha256.Size)},
751 },
752 }
753 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<daneinsecure@localhost>", nil, nil, time.Now(), "test")}
754 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
755 tcheck(t, err, "add message to queue for delivery")
756 kick(1, qml[0].ID)
757 testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
758 resolver.Inauthentic = nil
759 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdBadProtocol)))
760 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailHost, fdBadProtocol)))
761
762 // STARTTLS failed, so not known supported.
763 rdt = store.RecipientDomainTLS{Domain: "mox.example"}
764 err = acc.DB.Get(ctxbg, &rdt)
765 tcheck(t, err, "get recipientdomaintls")
766 tcompare(t, rdt.STARTTLS, false)
767 tcompare(t, rdt.RequireTLS, false)
768
769 // Check that message is delivered with TLS-Required: No and non-matching DANE record.
770 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednostarttls@localhost>", nil, &no, time.Now(), "test")}
771 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
772 tcheck(t, err, "add message to queue for delivery")
773 kick(1, qml[0].ID)
774 testDeliver(fakeSMTPSTARTTLSServer)
775
776 // Check that message is delivered with TLS-Required: No and bad TLS, falling back to plain text.
777 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednoplaintext@localhost>", nil, &no, time.Now(), "test")}
778 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
779 tcheck(t, err, "add message to queue for delivery")
780 kick(1, qml[0].ID)
781 testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
782
783 // Add message with requiretls that fails immediately due to no REQUIRETLS support in all servers.
784 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequiredunsupported@localhost>", nil, &yes, time.Now(), "test")}
785 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
786 tcheck(t, err, "add message to queue for delivery")
787 kick(1, qml[0].ID)
788 testDSN(makeBadFakeSMTPSTARTTLSServer(false))
789
790 // Restore pre-DANE behaviour.
791 resolver.AllAuthentic = false
792 resolver.TLSA = nil
793
794 // Add message with requiretls that fails immediately due to no verification policy for recipient domain.
795 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednopolicy@localhost>", nil, &yes, time.Now(), "test")}
796 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
797 tcheck(t, err, "add message to queue for delivery")
798 kick(1, qml[0].ID)
799 // Based on DNS lookups, there won't be any dialing or SMTP connection.
800 testDSN(func(conn net.Conn) {})
801
802 // Add another message that we'll fail to deliver entirely.
803 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
804 err = Add(ctxbg, pkglog, "mjl", mf, qm)
805 tcheck(t, err, "add message to queue for delivery")
806
807 msgs, err = List(ctxbg, Filter{}, Sort{})
808 tcheck(t, err, "list queue")
809 if len(msgs) != 1 {
810 t.Fatalf("queue has %d messages, expected 1", len(msgs))
811 }
812 msg = msgs[0]
813
814 prepServer := func(fn func(c net.Conn)) (net.Conn, func()) {
815 server, client := net.Pipe()
816 go func() {
817 fn(server)
818 server.Close()
819 }()
820 return client, func() {
821 server.Close()
822 client.Close()
823 }
824 }
825
826 conn2, cleanup2 := prepServer(func(conn net.Conn) { fmt.Fprintf(conn, "220 mail.mox.example\r\n") })
827 conn3, cleanup3 := prepServer(func(conn net.Conn) { fmt.Fprintf(conn, "451 mail.mox.example\r\n") })
828 conn4, cleanup4 := prepServer(fakeSMTPSTARTTLSServer)
829 defer func() {
830 cleanup2()
831 cleanup3()
832 cleanup4()
833 }()
834
835 seq := 0
836 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
837 seq++
838 switch seq {
839 default:
840 return nil, fmt.Errorf("connect error from test")
841 case 2:
842 return conn2, nil
843 case 3:
844 return conn3, nil
845 case 4:
846 return conn4, nil
847 }
848 }
849 defer func() {
850 smtpclient.DialHook = nil
851 }()
852
853 comm := store.RegisterComm(acc)
854 defer comm.Unregister()
855
856 for i := 1; i < 8; i++ {
857 if i == 4 {
858 resolver.AllAuthentic = true
859 resolver.TLSA = map[string][]adns.TLSA{
860 "_25._tcp.mail.mox.example.": {
861 // Non-matching zero CertAssoc, should cause failure.
862 {Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeSHA256, CertAssoc: make([]byte, sha256.Size)},
863 },
864 }
865 } else {
866 resolver.AllAuthentic = false
867 resolver.TLSA = nil
868 }
869 go deliver(pkglog, resolver, msg)
870 <-deliveryResults
871 err = DB.Get(ctxbg, &msg)
872 tcheck(t, err, "get msg")
873 if msg.Attempts != i {
874 t.Fatalf("got attempt %d, expected %d", msg.Attempts, i)
875 }
876 if msg.Attempts == 5 {
877 timer.Reset(time.Second)
878 changes := make(chan struct{}, 1)
879 go func() {
880 comm.Get()
881 changes <- struct{}{}
882 }()
883 select {
884 case <-changes:
885 case <-timer.C:
886 t.Fatalf("no dsn in 1s")
887 }
888 }
889 }
890
891 // Trigger final failure.
892 go deliver(pkglog, resolver, msg)
893 <-deliveryResults
894 err = DB.Get(ctxbg, &msg)
895 if err != bstore.ErrAbsent {
896 t.Fatalf("attempt to fetch delivered and removed message from queue, got err %v, expected ErrAbsent", err)
897 }
898
899 timer.Reset(time.Second)
900 changes := make(chan struct{}, 1)
901 go func() {
902 comm.Get()
903 changes <- struct{}{}
904 }()
905 select {
906 case <-changes:
907 case <-timer.C:
908 t.Fatalf("no dsn in 1s")
909 }
910
911 // We shouldn't have any more work to do.
912 msgs, err = List(ctxbg, Filter{}, Sort{})
913 tcheck(t, err, "list messages at end of test")
914 tcompare(t, len(msgs), 0)
915}
916
917func addCounts(success, failure int64, result tlsrpt.Result) tlsrpt.Result {
918 result.Summary.TotalSuccessfulSessionCount += success
919 result.Summary.TotalFailureSessionCount += failure
920 return result
921}
922
923func clearTLSResults(t *testing.T) {
924 _, err := bstore.QueryDB[tlsrptdb.TLSResult](ctxbg, tlsrptdb.ResultDB).Delete()
925 tcheck(t, err, "delete tls results")
926}
927
928func checkTLSResults(t *testing.T, policyDomain, expRecipientDomain string, expIsHost bool, expResults ...tlsrpt.Result) {
929 t.Helper()
930 q := bstore.QueryDB[tlsrptdb.TLSResult](ctxbg, tlsrptdb.ResultDB)
931 q.FilterNonzero(tlsrptdb.TLSResult{PolicyDomain: policyDomain})
932 result, err := q.Get()
933 tcheck(t, err, "get tls result")
934 tcompare(t, result.RecipientDomain, expRecipientDomain)
935 tcompare(t, result.IsHost, expIsHost)
936
937 // Before comparing, compensate for go1.20 vs go1.21 difference.
938 for i, r := range result.Results {
939 for j, fd := range r.FailureDetails {
940 if fd.FailureReasonCode == "tls-remote-alert-70" {
941 result.Results[i].FailureDetails[j].FailureReasonCode = "tls-remote-alert-70-protocol-version-not-supported"
942 }
943 }
944 }
945 tcompare(t, result.Results, expResults)
946}
947
// Test delivered/permfailed/suppressed/canceled/dropped messages are stored in the
// retired list if configured, with a proper result, that webhooks are scheduled,
// and that cleaning up works.
//
// Each scenario is run twice through testAction: once for account "mjl", where
// no retired message and no hook are expected afterwards, and once for account
// "retired", where exactly one retired message and one hook are expected.
func TestRetiredHooks(t *testing.T) {
	_, cleanup := setup(t)
	defer cleanup()
	err := Init()
	tcheck(t, err, "queue init")

	addr, err := smtp.ParseAddress("mjl@mox.example")
	tcheck(t, err, "parse address")
	path := addr.Path()

	mf := prepareFile(t)
	defer os.Remove(mf.Name())
	defer mf.Close()

	resolver := dns.MockResolver{
		A:  map[string][]string{"mox.example.": {"127.0.0.1"}},
		MX: map[string][]*net.MX{"mox.example.": {{Host: "mox.example", Pref: 10}}},
	}

	// testAction queues one message for account, runs action (which must get the
	// message out of the queue), and then verifies the retired list and hooks.
	// With a nil expResult no retired message or hook may exist. Otherwise the
	// last result of the single retired message must match expResult, and the
	// single hook must carry a webhook payload with event expEvent and the
	// Suppressing flag equal to expSuppressing.
	testAction := func(account string, action func(), expResult *MsgResult, expEvent string, expSuppressing bool) {
		t.Helper()

		// Start from a clean slate: remove retired messages and hooks from earlier actions.
		_, err := bstore.QueryDB[MsgRetired](ctxbg, DB).Delete()
		tcheck(t, err, "clearing retired messages")
		_, err = bstore.QueryDB[Hook](ctxbg, DB).Delete()
		tcheck(t, err, "clearing hooks")

		// Queue a single message with path as both sender and recipient.
		qm := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
		qm.Extra = map[string]string{"a": "123"}
		err = Add(ctxbg, pkglog, account, mf, qm)
		tcheck(t, err, "add to queue")

		action()

		// Should be no messages left in queue.
		msgs, err := List(ctxbg, Filter{}, Sort{})
		tcheck(t, err, "list messages")
		tcompare(t, len(msgs), 0)

		retireds, err := RetiredList(ctxbg, RetiredFilter{}, RetiredSort{})
		tcheck(t, err, "list retired messages")
		hooks, err := HookList(ctxbg, HookFilter{}, HookSort{})
		tcheck(t, err, "list hooks")
		if expResult == nil {
			tcompare(t, len(retireds), 0)
			tcompare(t, len(hooks), 0)
		} else {
			tcompare(t, len(retireds), 1)
			mr := retireds[0]
			tcompare(t, len(mr.Results) > 0, true)
			lr := mr.LastResult()
			// Clear the timing fields before the struct comparison.
			lr.Start = time.Time{}
			lr.Duration = 0
			// Only the presence of an error is compared, not its text. The text is then
			// overwritten with the expected value so the full comparison can succeed.
			tcompare(t, lr.Error == "", expResult.Error == "")
			lr.Error = expResult.Error
			tcompare(t, lr, *expResult)

			// Compare added webhook.
			tcompare(t, len(hooks), 1)
			h := hooks[0]
			var out webhook.Outgoing
			dec := json.NewDecoder(strings.NewReader(h.Payload))
			// Fail on payload fields that webhook.Outgoing does not declare.
			dec.DisallowUnknownFields()
			err := dec.Decode(&out)
			tcheck(t, err, "unmarshal outgoing webhook payload")
			// As above: compare presence of an error only, then clear fields that are not
			// set in the expected value below.
			tcompare(t, out.Error == "", expResult.Error == "")
			out.WebhookQueued = time.Time{}
			out.Error = ""
			// Expected enhanced code is the first digit of the SMTP code followed by the
			// secode, e.g. code 554 with secode "1.0" gives "5.1.0". Empty without secode.
			var ecode string
			if expResult.Secode != "" {
				ecode = fmt.Sprintf("%d.%s", expResult.Code/100, expResult.Secode)
			}
			expOut := webhook.Outgoing{
				Event:            webhook.OutgoingEvent(expEvent),
				Suppressing:      expSuppressing,
				QueueMsgID:       mr.ID,
				FromID:           mr.FromID,
				MessageID:        mr.MessageID,
				Subject:          mr.Subject,
				SMTPCode:         expResult.Code,
				SMTPEnhancedCode: ecode,
				Extra:            mr.Extra,
			}
			tcompare(t, out, expOut)
			// Zero the hook fields that are not set in the expected value below.
			h.ID = 0
			h.Payload = ""
			h.Submitted = time.Time{}
			h.NextAttempt = time.Time{}
			// Unkeyed literal: the values must stay in the order of the Hook struct fields.
			exph := Hook{0, mr.ID, "", mr.MessageID, mr.Subject, mr.Extra, mr.SenderAccount, "http://localhost:1234/outgoing", "Basic dXNlcm5hbWU6cGFzc3dvcmQ=", false, expEvent, "", time.Time{}, 0, time.Time{}, nil}
			tcompare(t, h, exph)
		}
	}

	// makeLaunchAction returns an action that launches queue work with the SMTP
	// dial replaced by an in-memory pipe: the client end is returned from the dial
	// hook and handler is run on the server end in a new goroutine. The action
	// expects exactly one delivery to be launched and waits up to 5 seconds for
	// its result.
	makeLaunchAction := func(handler func(conn net.Conn)) func() {
		return func() {
			server, client := net.Pipe()
			defer server.Close()

			smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
				go handler(server)
				return client, nil
			}
			defer func() {
				smtpclient.DialHook = nil
			}()

			// Trigger delivery attempt.
			n := launchWork(pkglog, resolver, map[string]struct{}{})
			tcompare(t, n, 1)

			// Wait until delivery has finished.
			tm := time.NewTimer(5 * time.Second)
			defer tm.Stop()
			select {
			case <-tm.C:
				t.Fatalf("delivery didn't happen within 5s")
			case <-deliveryResults:
			}
		}
	}

	// smtpAccept is a scripted SMTP server side that answers positively to the
	// whole transaction: EHLO, MAIL, RCPT, DATA (message is read and discarded)
	// and QUIT.
	smtpAccept := func(conn net.Conn) {
		br := bufio.NewReader(conn)
		// readline reads one line and panics if it does not start with cmd, compared
		// in lower case. A read error is ignored.
		readline := func(cmd string) {
			line, err := br.ReadString('\n')
			if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
				panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
			}
		}
		// writeline writes s followed by CRLF. Write errors are ignored.
		writeline := func(s string) {
			fmt.Fprintf(conn, "%s\r\n", s)
		}

		writeline("220 mail.mox.example")
		readline("ehlo")
		writeline("250 mail.mox.example")

		readline("mail")
		writeline("250 ok")
		readline("rcpt")
		writeline("250 ok")
		readline("data")
		writeline("354 continue")
		reader := smtp.NewDataReader(br)
		io.Copy(io.Discard, reader)
		writeline("250 ok")
		readline("quit")
		writeline("250 ok")
	}
	// smtpReject returns a scripted SMTP server side that announces
	// "enhancedstatuscodes" in its EHLO response and answers the MAIL command
	// with the given code and enhanced status "5.1.0".
	smtpReject := func(code int) func(conn net.Conn) {
		return func(conn net.Conn) {
			br := bufio.NewReader(conn)
			// Same helpers as in smtpAccept: panic on an unexpected command, ignore I/O errors.
			readline := func(cmd string) {
				line, err := br.ReadString('\n')
				if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
					panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
				}
			}
			writeline := func(s string) {
				fmt.Fprintf(conn, "%s\r\n", s)
			}

			writeline("220 mail.mox.example")
			readline("ehlo")
			writeline("250-mail.mox.example")
			writeline("250 enhancedstatuscodes")

			readline("mail")
			writeline(fmt.Sprintf("%d 5.1.0 nok", code))
			readline("quit")
			writeline("250 ok")
		}
	}

	testAction("mjl", makeLaunchAction(smtpAccept), nil, "", false)
	testAction("retired", makeLaunchAction(smtpAccept), &MsgResult{}, string(webhook.EventDelivered), false)
	// 554 is generic, doesn't immediately cause suppression.
	testAction("mjl", makeLaunchAction(smtpReject(554)), nil, "", false)
	testAction("retired", makeLaunchAction(smtpReject(554)), &MsgResult{Code: 554, Secode: "1.0", Error: "nonempty"}, string(webhook.EventFailed), false)
	// 550 causes immediate suppression, check for it in webhook.
	testAction("mjl", makeLaunchAction(smtpReject(550)), nil, "", true)
	testAction("retired", makeLaunchAction(smtpReject(550)), &MsgResult{Code: 550, Secode: "1.0", Error: "nonempty"}, string(webhook.EventFailed), true)
	// Try to deliver to suppressed addresses.
	// launch installs no dial hook and waits for the delivery result without a timeout.
	launch := func() {
		n := launchWork(pkglog, resolver, map[string]struct{}{})
		tcompare(t, n, 1)
		<-deliveryResults
	}
	testAction("mjl", launch, nil, "", false)
	testAction("retired", launch, &MsgResult{Error: "nonempty"}, string(webhook.EventSuppressed), false)

	// Remove the queued message through the queue API instead of a delivery
	// attempt: Fail and Drop with an empty filter must each affect exactly one
	// message.
	queueFail := func() {
		n, err := Fail(ctxbg, pkglog, Filter{})
		tcheck(t, err, "cancel delivery with failure dsn")
		tcompare(t, n, 1)
	}
	queueDrop := func() {
		n, err := Drop(ctxbg, pkglog, Filter{})
		tcheck(t, err, "cancel delivery without failure dsn")
		tcompare(t, n, 1)
	}
	testAction("mjl", queueFail, nil, "", false)
	testAction("retired", queueFail, &MsgResult{Error: "nonempty"}, string(webhook.EventFailed), false)
	testAction("mjl", queueDrop, nil, "", false)
	testAction("retired", queueDrop, &MsgResult{Error: "nonempty"}, string(webhook.EventCanceled), false)

	// The last action left one retired message; a single cleanup run must remove it.
	retireds, err := RetiredList(ctxbg, RetiredFilter{}, RetiredSort{})
	tcheck(t, err, "list retired messages")
	tcompare(t, len(retireds), 1)

	cleanupMsgRetiredSingle(pkglog)
	retireds, err = RetiredList(ctxbg, RetiredFilter{}, RetiredSort{})
	tcheck(t, err, "list retired messages")
	tcompare(t, len(retireds), 0)
}
1166
// TestQueueStart tests Start and that it attempts to deliver. It also checks
// that hold rules and the next-attempt time decide whether a dial attempt is
// made.
func TestQueueStart(t *testing.T) {
	// Override dial function. We'll make connecting fail and check the attempt.
	resolver := dns.MockResolver{
		A:  map[string][]string{"mox.example.": {"127.0.0.1"}},
		MX: map[string][]*net.MX{"mox.example.": {{Host: "mox.example", Pref: 10}}},
	}
	// Every dial attempt is signaled on dialed (buffer of one) before the dial fails.
	dialed := make(chan struct{}, 1)
	smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
		dialed <- struct{}{}
		return nil, fmt.Errorf("failure from test")
	}
	defer func() {
		smtpclient.DialHook = nil
	}()

	_, cleanup := setup(t)
	defer cleanup()

	done := make(chan struct{})
	defer func() {
		mox.ShutdownCancel()
		// Wait for message and hooks deliverers and cleaners.
		<-done
		<-done
		<-done
		<-done
		// Install a fresh shutdown context again after canceling the previous one.
		mox.Shutdown, mox.ShutdownCancel = context.WithCancel(ctxbg)
	}()
	err := Start(resolver, done)
	tcheck(t, err, "queue start")

	// checkDialed verifies whether a dial attempt is seen. With need set, a dial
	// must be signaled within one second. Without need, no dial may be signaled
	// within 100ms.
	checkDialed := func(need bool) {
		t.Helper()
		d := time.Second / 10
		if need {
			d = time.Second
		}
		timer := time.NewTimer(d)
		defer timer.Stop()
		select {
		case <-dialed:
			if !need {
				t.Fatalf("unexpected dial attempt")
			}
		case <-timer.C:
			if need {
				t.Fatalf("expected to see a dial attempt")
			}
		}
	}

	// HoldRule to mark all messages sent by mjl on hold, including existing
	// messages.
	hr0, err := HoldRuleAdd(ctxbg, pkglog, HoldRule{Account: "mjl"})
	tcheck(t, err, "add hold rule")

	// All zero HoldRule holds all deliveries, and marks all on hold.
	hr1, err := HoldRuleAdd(ctxbg, pkglog, HoldRule{})
	tcheck(t, err, "add hold rule")

	hrl, err := HoldRuleList(ctxbg)
	tcheck(t, err, "listing hold rules")
	tcompare(t, hrl, []HoldRule{hr0, hr1})

	path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
	mf := prepareFile(t)
	defer os.Remove(mf.Name())
	defer mf.Close()
	qm := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
	err = Add(ctxbg, pkglog, "mjl", mf, qm)
	tcheck(t, err, "add message to queue for delivery")
	checkDialed(false) // No delivery attempt yet.

	n, err := Count(ctxbg)
	tcheck(t, err, "count messages in queue")
	tcompare(t, n, 1)

	// Take message off hold.
	n, err = HoldSet(ctxbg, Filter{}, false)
	tcheck(t, err, "taking message off hold")
	tcompare(t, n, 1)
	checkDialed(true)

	// Remove hold rules.
	err = HoldRuleRemove(ctxbg, pkglog, hr1.ID)
	tcheck(t, err, "removing hold rule")
	err = HoldRuleRemove(ctxbg, pkglog, hr0.ID)
	tcheck(t, err, "removing hold rule")
	// Check they are gone.
	hrl, err = HoldRuleList(ctxbg)
	tcheck(t, err, "listing hold rules")
	tcompare(t, len(hrl), 0)

	// Don't change message nextattempt time, but kick queue. Message should not be delivered.
	msgqueueKick()
	checkDialed(false)

	// Set new next attempt, should see another attempt.
	n, err = NextAttemptSet(ctxbg, Filter{From: "@mox.example"}, time.Now())
	tcheck(t, err, "kick queue")
	if n != 1 {
		t.Fatalf("kick changed %d messages, expected 1", n)
	}
	checkDialed(true)

	// Submit another, should be delivered immediately without HoldRule.
	path = smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
	mf = prepareFile(t)
	defer os.Remove(mf.Name())
	defer mf.Close()
	qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
	err = Add(ctxbg, pkglog, "mjl", mf, qm)
	tcheck(t, err, "add message to queue for delivery")
	checkDialed(true) // Immediate.
}
1283
// TestListFilterSort tests listing of queued and retired messages: sorting in
// both directions, filtering on all fields at once, and paginating one message
// at a time.
func TestListFilterSort(t *testing.T) {
	_, cleanup := setup(t)
	defer cleanup()
	err := Init()
	tcheck(t, err, "queue init")

	// insert Msgs. insert RetiredMsgs based on that. call list with filters and sort. filter to select a single. filter to paginate one by one, and in reverse.

	path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
	mf := prepareFile(t)
	defer os.Remove(mf.Name())
	defer mf.Close()

	// Five identical messages plus qm1, which is queued one second earlier and
	// has its next attempt one minute later than the others.
	now := time.Now().Round(0)
	qm := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, now, "test")
	qm.Queued = now
	qm1 := qm
	qm1.Queued = now.Add(-time.Second)
	qm1.NextAttempt = now.Add(time.Minute)
	qml := []Msg{qm, qm, qm, qm, qm, qm1}
	err = Add(ctxbg, pkglog, "mjl", mf, qml...)
	tcheck(t, err, "add messages to queue")
	// Re-read qm1 from the slice passed to Add. Presumably Add updates the
	// elements (e.g. assigns IDs), since qm1.ID is used in a filter below.
	qm1 = qml[len(qml)-1]

	qmlrev := slices.Clone(qml)
	slices.Reverse(qmlrev)

	// Ascending by nextattempt,id.
	l, err := List(ctxbg, Filter{}, Sort{Asc: true})
	tcheck(t, err, "list messages")
	tcompare(t, l, qml)

	// Descending by nextattempt,id.
	l, err = List(ctxbg, Filter{}, Sort{})
	tcheck(t, err, "list messages")
	tcompare(t, l, qmlrev)

	// Descending by queued,id.
	// Expected order: the five identical messages in reverse insertion order,
	// followed by qm1 (qml[5]), which has the earliest Queued time.
	l, err = List(ctxbg, Filter{}, Sort{Field: "Queued"})
	tcheck(t, err, "list messages")
	ql := append(append([]Msg{}, qmlrev[1:]...), qml[5])
	tcompare(t, l, ql)

	// Filter by all fields to get a single.
	no := false
	allfilters := Filter{
		Max:         2,
		IDs:         []int64{qm1.ID},
		Account:     "mjl",
		From:        path.XString(true),
		To:          path.XString(true),
		Hold:        &no,
		Submitted:   "<1s",
		NextAttempt: ">1s",
	}
	l, err = List(ctxbg, allfilters, Sort{})
	tcheck(t, err, "list single")
	tcompare(t, l, []Msg{qm1})

	// Paginated NextAttempt asc.
	// Each request asks for at most one message and passes the ID and sort field
	// value (formatted as RFC3339Nano) of the previously returned message. The
	// loop ends when an empty page is returned.
	var lastID int64
	var last any
	l = nil
	for {
		nl, err := List(ctxbg, Filter{Max: 1}, Sort{Asc: true, LastID: lastID, Last: last})
		tcheck(t, err, "list paginated")
		l = append(l, nl...)
		if len(nl) == 0 {
			break
		}
		tcompare(t, len(nl), 1)
		lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
	}
	tcompare(t, l, qml)

	// Paginated NextAttempt desc.
	l = nil
	lastID = 0
	last = ""
	for {
		nl, err := List(ctxbg, Filter{Max: 1}, Sort{LastID: lastID, Last: last})
		tcheck(t, err, "list paginated")
		l = append(l, nl...)
		if len(nl) == 0 {
			break
		}
		tcompare(t, len(nl), 1)
		lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
	}
	tcompare(t, l, qmlrev)

	// Paginated Queued desc.
	l = nil
	lastID = 0
	last = ""
	for {
		nl, err := List(ctxbg, Filter{Max: 1}, Sort{Field: "Queued", LastID: lastID, Last: last})
		tcheck(t, err, "list paginated")
		l = append(l, nl...)
		if len(nl) == 0 {
			break
		}
		tcompare(t, len(nl), 1)
		lastID, last = nl[0].ID, nl[0].Queued.Format(time.RFC3339Nano)
	}
	tcompare(t, l, ql)

	// Paginated Queued asc.
	l = nil
	lastID = 0
	last = ""
	for {
		nl, err := List(ctxbg, Filter{Max: 1}, Sort{Field: "Queued", Asc: true, LastID: lastID, Last: last})
		tcheck(t, err, "list paginated")
		l = append(l, nl...)
		if len(nl) == 0 {
			break
		}
		tcompare(t, len(nl), 1)
		lastID, last = nl[0].ID, nl[0].Queued.Format(time.RFC3339Nano)
	}
	qlrev := slices.Clone(ql)
	slices.Reverse(qlrev)
	tcompare(t, l, qlrev)

	// Retire messages and do similar but more basic tests. The code is similar.
	// One retired message is inserted per queued message, in the order of qml.
	var mrl []MsgRetired
	err = DB.Write(ctxbg, func(tx *bstore.Tx) error {
		for _, m := range qml {
			mr := m.Retired(false, m.NextAttempt, time.Now().Add(time.Minute).Round(0))
			err := tx.Insert(&mr)
			tcheck(t, err, "inserting retired message")
			mrl = append(mrl, mr)
		}
		return nil
	})
	tcheck(t, err, "adding retired messages")

	// Paginated LastActivity desc.
	var lr []MsgRetired
	lastID = 0
	last = ""
	// NOTE(review): l is not read again in this function, so this reset has no effect.
	l = nil
	for {
		nl, err := RetiredList(ctxbg, RetiredFilter{Max: 1}, RetiredSort{LastID: lastID, Last: last})
		tcheck(t, err, "list paginated")
		lr = append(lr, nl...)
		if len(nl) == 0 {
			break
		}
		tcompare(t, len(nl), 1)
		lastID, last = nl[0].ID, nl[0].LastActivity.Format(time.RFC3339Nano)
	}
	// The descending listing is expected to be the reverse of insertion order.
	mrlrev := slices.Clone(mrl)
	slices.Reverse(mrlrev)
	tcompare(t, lr, mrlrev)

	// Filter by all fields to get a single.
	allretiredfilters := RetiredFilter{
		Max:          2,
		IDs:          []int64{mrlrev[0].ID},
		Account:      "mjl",
		From:         path.XString(true),
		To:           path.XString(true),
		Submitted:    "<1s",
		LastActivity: ">1s",
	}
	lr, err = RetiredList(ctxbg, allretiredfilters, RetiredSort{})
	tcheck(t, err, "list single")
	tcompare(t, lr, []MsgRetired{mrlrev[0]})
}
1455
// fakeCert returns a self-signed certificate for DNS name "name", just a cert
// that appears valid. The validity period starts one hour in the past. It ends
// one hour in the future, or one hour in the past if expired is set. The key
// is derived from an all-zero seed, so the certificate is only suitable for
// tests. The returned tls.Certificate has its Leaf set to the parsed
// certificate. Failure to create or parse the certificate fails the test.
func fakeCert(t *testing.T, name string, expired bool) tls.Certificate {
	// Mark as helper, like the other test helpers in this file, so failures are
	// attributed to the calling test.
	t.Helper()

	// Derive both ends of the validity period from a single timestamp.
	now := time.Now()
	notAfter := now.Add(time.Hour)
	if expired {
		notAfter = now.Add(-time.Hour)
	}

	privKey := ed25519.NewKeyFromSeed(make([]byte, ed25519.SeedSize)) // Fake key, don't use this for real!
	template := &x509.Certificate{
		SerialNumber: big.NewInt(1), // Required field...
		DNSNames:     []string{name},
		NotBefore:    now.Add(-time.Hour),
		NotAfter:     notAfter,
	}
	// Self-signed: the template is also used as the parent certificate.
	localCertBuf, err := x509.CreateCertificate(cryptorand.Reader, template, template, privKey.Public(), privKey)
	if err != nil {
		t.Fatalf("making certificate: %s", err)
	}
	cert, err := x509.ParseCertificate(localCertBuf)
	if err != nil {
		t.Fatalf("parsing generated certificate: %s", err)
	}
	return tls.Certificate{
		Certificate: [][]byte{localCertBuf},
		PrivateKey:  privKey,
		Leaf:        cert,
	}
}
1487