1package queue
2
3import (
4 "bufio"
5 "context"
6 "crypto/ed25519"
7 cryptorand "crypto/rand"
8 "crypto/sha256"
9 "crypto/tls"
10 "crypto/x509"
11 "encoding/json"
12 "fmt"
13 "io"
14 "math/big"
15 "net"
16 "os"
17 "path/filepath"
18 "reflect"
19 "slices"
20 "strings"
21 "sync"
22 "testing"
23 "time"
24
25 "github.com/mjl-/adns"
26 "github.com/mjl-/bstore"
27
28 "github.com/mjl-/mox/dns"
29 "github.com/mjl-/mox/mlog"
30 "github.com/mjl-/mox/mox-"
31 "github.com/mjl-/mox/mtastsdb"
32 "github.com/mjl-/mox/smtp"
33 "github.com/mjl-/mox/smtpclient"
34 "github.com/mjl-/mox/store"
35 "github.com/mjl-/mox/tlsrpt"
36 "github.com/mjl-/mox/tlsrptdb"
37 "github.com/mjl-/mox/webhook"
38)
39
40var ctxbg = context.Background()
41var pkglog = mlog.New("queue", nil)
42
43func tcheck(t *testing.T, err error, msg string) {
44 if err != nil {
45 t.Helper()
46 t.Fatalf("%s: %s", msg, err)
47 }
48}
49
50func tcompare(t *testing.T, got, exp any) {
51 t.Helper()
52 if !reflect.DeepEqual(got, exp) {
53 t.Fatalf("got:\n%#v\nexpected:\n%#v", got, exp)
54 }
55}
56
57func setup(t *testing.T) (*store.Account, func()) {
58 // Prepare config so email can be delivered to mjl@mox.example.
59 os.RemoveAll("../testdata/queue/data")
60 log := mlog.New("queue", nil)
61 mox.Context = ctxbg
62 mox.ConfigStaticPath = filepath.FromSlash("../testdata/queue/mox.conf")
63 mox.MustLoadConfig(true, false)
64 err := Init()
65 tcheck(t, err, "queue init")
66 err = mtastsdb.Init(false)
67 tcheck(t, err, "mtastsdb init")
68 err = tlsrptdb.Init()
69 tcheck(t, err, "tlsrptdb init")
70 acc, err := store.OpenAccount(log, "mjl")
71 tcheck(t, err, "open account")
72 err = acc.SetPassword(log, "testtest")
73 tcheck(t, err, "set password")
74 switchStop := store.Switchboard()
75 mox.Shutdown, mox.ShutdownCancel = context.WithCancel(ctxbg)
76 return acc, func() {
77 acc.Close()
78 acc.CheckClosed()
79 mox.ShutdownCancel()
80 mox.Shutdown, mox.ShutdownCancel = context.WithCancel(ctxbg)
81 Shutdown()
82 err := mtastsdb.Close()
83 tcheck(t, err, "mtastsdb close")
84 err = tlsrptdb.Close()
85 tcheck(t, err, "tlsrptdb close")
86 switchStop()
87 }
88}
89
90var testmsg = strings.ReplaceAll(`From: <mjl@mox.example>
91To: <mjl@mox.example>
92Subject: test
93
94test email
95`, "\n", "\r\n")
96
97func prepareFile(t *testing.T) *os.File {
98 t.Helper()
99 msgFile, err := store.CreateMessageTemp(pkglog, "queue")
100 tcheck(t, err, "create temp message for delivery to queue")
101 _, err = msgFile.Write([]byte(testmsg))
102 tcheck(t, err, "write message file")
103 return msgFile
104}
105
106func TestQueue(t *testing.T) {
107 acc, cleanup := setup(t)
108 defer cleanup()
109
110 idfilter := func(msgID int64) Filter {
111 return Filter{IDs: []int64{msgID}}
112 }
113
114 kick := func(expn int, id int64) {
115 t.Helper()
116 n, err := NextAttemptSet(ctxbg, idfilter(id), time.Now())
117 tcheck(t, err, "kick queue")
118 if n != expn {
119 t.Fatalf("kick changed %d messages, expected %d", n, expn)
120 }
121 }
122
123 msgs, err := List(ctxbg, Filter{}, Sort{})
124 tcheck(t, err, "listing messages in queue")
125 if len(msgs) != 0 {
126 t.Fatalf("got %d messages in queue, expected 0", len(msgs))
127 }
128
129 path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
130 mf := prepareFile(t)
131 defer os.Remove(mf.Name())
132 defer mf.Close()
133
134 var qm Msg
135
136 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
137 err = Add(ctxbg, pkglog, "mjl", mf, qm)
138 tcheck(t, err, "add message to queue for delivery")
139
140 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
141 err = Add(ctxbg, pkglog, "mjl", mf, qm)
142 tcheck(t, err, "add message to queue for delivery")
143
144 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
145 err = Add(ctxbg, pkglog, "mjl", mf, qm)
146 tcheck(t, err, "add message to queue for delivery")
147
148 msgs, err = List(ctxbg, Filter{}, Sort{})
149 tcheck(t, err, "listing queue")
150 if len(msgs) != 3 {
151 t.Fatalf("got msgs %v, expected 1", msgs)
152 }
153
154 yes := true
155 n, err := RequireTLSSet(ctxbg, Filter{IDs: []int64{msgs[2].ID}}, &yes)
156 tcheck(t, err, "requiretlsset")
157 tcompare(t, n, 1)
158
159 msg := msgs[0]
160 if msg.Attempts != 0 {
161 t.Fatalf("msg attempts %d, expected 0", msg.Attempts)
162 }
163 n, err = Drop(ctxbg, pkglog, Filter{IDs: []int64{msgs[1].ID}})
164 tcheck(t, err, "drop")
165 if n != 1 {
166 t.Fatalf("dropped %d, expected 1", n)
167 }
168 if _, err := os.Stat(msgs[1].MessagePath()); err == nil || !os.IsNotExist(err) {
169 t.Fatalf("dropped message not removed from file system")
170 }
171
172 // Fail a message, check the account has a message afterwards, the DSN.
173 n, err = bstore.QueryDB[store.Message](ctxbg, acc.DB).Count()
174 tcheck(t, err, "count messages in account")
175 tcompare(t, n, 0)
176 n, err = Fail(ctxbg, pkglog, Filter{IDs: []int64{msgs[2].ID}})
177 tcheck(t, err, "fail")
178 if n != 1 {
179 t.Fatalf("failed %d, expected 1", n)
180 }
181 n, err = bstore.QueryDB[store.Message](ctxbg, acc.DB).Count()
182 tcheck(t, err, "count messages in account")
183 tcompare(t, n, 1)
184
185 // Check filter through various List calls. Other code uses the same filtering function.
186 filter := func(f Filter, expn int) {
187 t.Helper()
188 l, err := List(ctxbg, f, Sort{})
189 tcheck(t, err, "list messages")
190 tcompare(t, len(l), expn)
191 }
192 filter(Filter{}, 1)
193 filter(Filter{Account: "mjl"}, 1)
194 filter(Filter{Account: "bogus"}, 0)
195 filter(Filter{IDs: []int64{msgs[0].ID}}, 1)
196 filter(Filter{IDs: []int64{msgs[2].ID}}, 0) // Removed.
197 filter(Filter{IDs: []int64{msgs[2].ID + 1}}, 0) // Never existed.
198 filter(Filter{From: "mjl@"}, 1)
199 filter(Filter{From: "bogus@"}, 0)
200 filter(Filter{To: "mjl@"}, 1)
201 filter(Filter{To: "bogus@"}, 0)
202 filter(Filter{Hold: &yes}, 0)
203 no := false
204 filter(Filter{Hold: &no}, 1)
205 filter(Filter{Submitted: "<now"}, 1)
206 filter(Filter{Submitted: ">now"}, 0)
207 filter(Filter{NextAttempt: "<1m"}, 1)
208 filter(Filter{NextAttempt: ">1m"}, 0)
209 var empty string
210 bogus := "bogus"
211 filter(Filter{Transport: &empty}, 1)
212 filter(Filter{Transport: &bogus}, 0)
213
214 next := nextWork(ctxbg, pkglog, nil)
215 if next > 0 {
216 t.Fatalf("nextWork in %s, should be now", next)
217 }
218 busy := map[string]struct{}{"mox.example": {}}
219 if x := nextWork(ctxbg, pkglog, busy); x != 24*time.Hour {
220 t.Fatalf("nextWork in %s for busy domain, should be in 24 hours", x)
221 }
222 if nn := launchWork(pkglog, nil, busy); nn != 0 {
223 t.Fatalf("launchWork launched %d deliveries, expected 0", nn)
224 }
225
226 mailDomain := dns.Domain{ASCII: "mox.example"}
227 mailHost := dns.Domain{ASCII: "mail.mox.example"}
228 resolver := dns.MockResolver{
229 A: map[string][]string{
230 "mail.mox.example.": {"127.0.0.1"},
231 "submission.example.": {"127.0.0.1"},
232 },
233 MX: map[string][]*net.MX{
234 "mox.example.": {{Host: "mail.mox.example", Pref: 10}},
235 "other.example.": {{Host: "mail.mox.example", Pref: 10}},
236 },
237 }
238
239 // Try a failing delivery attempt.
240 var ndial int
241 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
242 ndial++
243 return nil, fmt.Errorf("failure from test")
244 }
245 defer func() {
246 smtpclient.DialHook = nil
247 }()
248
249 n = launchWork(pkglog, resolver, map[string]struct{}{})
250 tcompare(t, n, 1)
251
252 // Wait until we see the dial and the failed attempt.
253 timer := time.NewTimer(time.Second)
254 defer timer.Stop()
255 select {
256 case <-deliveryResults:
257 tcompare(t, ndial, 1)
258 m, err := bstore.QueryDB[Msg](ctxbg, DB).Get()
259 tcheck(t, err, "get")
260 tcompare(t, m.Attempts, 1)
261 case <-timer.C:
262 t.Fatalf("no delivery within 1s")
263 }
264
265 // OpenMessage.
266 _, err = OpenMessage(ctxbg, msg.ID+1)
267 if err != bstore.ErrAbsent {
268 t.Fatalf("OpenMessage, got %v, expected ErrAbsent", err)
269 }
270 reader, err := OpenMessage(ctxbg, msg.ID)
271 tcheck(t, err, "open message")
272 defer reader.Close()
273 msgbuf, err := io.ReadAll(reader)
274 tcheck(t, err, "read message")
275 if string(msgbuf) != testmsg {
276 t.Fatalf("message mismatch, got %q, expected %q", string(msgbuf), testmsg)
277 }
278
279 // Reduce by more than first attempt interval of 7.5 minutes.
280 n, err = NextAttemptAdd(ctxbg, idfilter(msg.ID+1), -10*time.Minute)
281 tcheck(t, err, "kick")
282 if n != 0 {
283 t.Fatalf("kick %d, expected 0", n)
284 }
285 n, err = NextAttemptAdd(ctxbg, idfilter(msg.ID), -10*time.Minute)
286 tcheck(t, err, "kick")
287 if n != 1 {
288 t.Fatalf("kicked %d, expected 1", n)
289 }
290
291 nfakeSMTPServer := func(server net.Conn, rcpts, ntx int, onercpt bool, extensions []string) {
292 // We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
293 // cyclic dependencies.
294 fmt.Fprintf(server, "220 mail.mox.example\r\n")
295 br := bufio.NewReader(server)
296
297 readline := func(cmd string) {
298 line, err := br.ReadString('\n')
299 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
300 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
301 }
302 }
303 writeline := func(s string) {
304 fmt.Fprintf(server, "%s\r\n", s)
305 }
306
307 readline("ehlo")
308 writeline("250-mail.mox.example")
309 for _, ext := range extensions {
310 writeline("250-" + ext)
311 }
312 writeline("250 pipelining")
313 for tx := 0; tx < ntx; tx++ {
314 readline("mail")
315 writeline("250 ok")
316 for i := 0; i < rcpts; i++ {
317 readline("rcpt")
318 if onercpt && i > 0 {
319 writeline("552 ok")
320 } else {
321 writeline("250 ok")
322 }
323 }
324 readline("data")
325 writeline("354 continue")
326 reader := smtp.NewDataReader(br)
327 io.Copy(io.Discard, reader)
328 writeline("250 ok")
329 }
330 readline("quit")
331 writeline("221 ok")
332 }
333 fakeSMTPServer := func(server net.Conn) {
334 nfakeSMTPServer(server, 1, 1, false, nil)
335 }
336 fakeSMTPServer2Rcpts := func(server net.Conn) {
337 nfakeSMTPServer(server, 2, 1, false, nil)
338 }
339 fakeSMTPServerLimitRcpt1 := func(server net.Conn) {
340 nfakeSMTPServer(server, 1, 2, false, []string{"LIMITS RCPTMAX=1"})
341 }
342 // Server that returns an error after first recipient. We expect another
343 // transaction to deliver the second message.
344 fakeSMTPServerRcpt1 := func(server net.Conn) {
345 // We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
346 // cyclic dependencies.
347 fmt.Fprintf(server, "220 mail.mox.example\r\n")
348 br := bufio.NewReader(server)
349
350 readline := func(cmd string) {
351 line, err := br.ReadString('\n')
352 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
353 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
354 }
355 }
356 writeline := func(s string) {
357 fmt.Fprintf(server, "%s\r\n", s)
358 }
359
360 readline("ehlo")
361 writeline("250-mail.mox.example")
362 writeline("250 pipelining")
363
364 readline("mail")
365 writeline("250 ok")
366 readline("rcpt")
367 writeline("250 ok")
368 readline("rcpt")
369 writeline("552 ok")
370 readline("data")
371 writeline("354 continue")
372 reader := smtp.NewDataReader(br)
373 io.Copy(io.Discard, reader)
374 writeline("250 ok")
375
376 readline("mail")
377 writeline("250 ok")
378 readline("rcpt")
379 writeline("250 ok")
380 readline("data")
381 writeline("354 continue")
382 reader = smtp.NewDataReader(br)
383 io.Copy(io.Discard, reader)
384 writeline("250 ok")
385
386 readline("quit")
387 writeline("221 ok")
388 }
389
390 moxCert := fakeCert(t, "mail.mox.example", false)
391 goodTLSConfig := tls.Config{Certificates: []tls.Certificate{moxCert}}
392 makeFakeSMTPSTARTTLSServer := func(tlsConfig *tls.Config, nstarttls int, requiretls bool) func(server net.Conn) {
393 attempt := 0
394 return func(server net.Conn) {
395 attempt++
396
397 // We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
398 // cyclic dependencies.
399 fmt.Fprintf(server, "220 mail.mox.example\r\n")
400 br := bufio.NewReader(server)
401
402 readline := func(cmd string) {
403 line, err := br.ReadString('\n')
404 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
405 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
406 }
407 }
408 writeline := func(s string) {
409 fmt.Fprintf(server, "%s\r\n", s)
410 }
411
412 readline("ehlo")
413 writeline("250-mail.mox.example")
414 writeline("250 starttls")
415 if nstarttls == 0 || attempt <= nstarttls {
416 readline("starttls")
417 writeline("220 ok")
418 tlsConn := tls.Server(server, tlsConfig)
419 err := tlsConn.Handshake()
420 if err != nil {
421 return
422 }
423 server = tlsConn
424 br = bufio.NewReader(server)
425
426 readline("ehlo")
427 if requiretls {
428 writeline("250-mail.mox.example")
429 writeline("250 requiretls")
430 } else {
431 writeline("250 mail.mox.example")
432 }
433 }
434 readline("mail")
435 writeline("250 ok")
436 readline("rcpt")
437 writeline("250 ok")
438 readline("data")
439 writeline("354 continue")
440 reader := smtp.NewDataReader(br)
441 io.Copy(io.Discard, reader)
442 writeline("250 ok")
443 readline("quit")
444 writeline("221 ok")
445 }
446 }
447
448 fakeSMTPSTARTTLSServer := makeFakeSMTPSTARTTLSServer(&goodTLSConfig, 0, true)
449 makeBadFakeSMTPSTARTTLSServer := func(requiretls bool) func(server net.Conn) {
450 return makeFakeSMTPSTARTTLSServer(&tls.Config{MaxVersion: tls.VersionTLS10, Certificates: []tls.Certificate{moxCert}}, 1, requiretls)
451 }
452
453 nfakeSubmitServer := func(server net.Conn, nrcpt int) {
454 // We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
455 // cyclic dependencies.
456 fmt.Fprintf(server, "220 mail.mox.example\r\n")
457 br := bufio.NewReader(server)
458 br.ReadString('\n') // Should be EHLO.
459 fmt.Fprintf(server, "250-localhost\r\n")
460 fmt.Fprintf(server, "250 AUTH PLAIN\r\n")
461 br.ReadString('\n') // Should be AUTH PLAIN
462 fmt.Fprintf(server, "235 2.7.0 auth ok\r\n")
463 br.ReadString('\n') // Should be MAIL FROM.
464 fmt.Fprintf(server, "250 ok\r\n")
465 for i := 0; i < nrcpt; i++ {
466 br.ReadString('\n') // Should be RCPT TO.
467 fmt.Fprintf(server, "250 ok\r\n")
468 }
469 br.ReadString('\n') // Should be DATA.
470 fmt.Fprintf(server, "354 continue\r\n")
471 reader := smtp.NewDataReader(br)
472 io.Copy(io.Discard, reader)
473 fmt.Fprintf(server, "250 ok\r\n")
474 br.ReadString('\n') // Should be QUIT.
475 fmt.Fprintf(server, "221 ok\r\n")
476 }
477 fakeSubmitServer := func(server net.Conn) {
478 nfakeSubmitServer(server, 1)
479 }
480 fakeSubmitServer2Rcpts := func(server net.Conn) {
481 nfakeSubmitServer(server, 2)
482 }
483
484 testQueue := func(expectDSN bool, fakeServer func(conn net.Conn), nresults int) (wasNetDialer bool) {
485 t.Helper()
486
487 var pipes []net.Conn
488 defer func() {
489 for _, conn := range pipes {
490 conn.Close()
491 }
492 }()
493
494 var connMu sync.Mutex
495 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
496 connMu.Lock()
497 defer connMu.Unlock()
498
499 // Setting up a pipe. We'll start a fake smtp server on the server-side. And return the
500 // client-side to the invocation dial, for the attempted delivery from the queue.
501 server, client := net.Pipe()
502 pipes = append(pipes, server, client)
503 go fakeServer(server)
504
505 _, wasNetDialer = dialer.(*net.Dialer)
506
507 return client, nil
508 }
509 defer func() {
510 smtpclient.DialHook = nil
511 }()
512
513 inbox, err := bstore.QueryDB[store.Mailbox](ctxbg, acc.DB).FilterNonzero(store.Mailbox{Name: "Inbox"}).Get()
514 tcheck(t, err, "get inbox")
515
516 inboxCount, err := bstore.QueryDB[store.Message](ctxbg, acc.DB).FilterNonzero(store.Message{MailboxID: inbox.ID}).Count()
517 tcheck(t, err, "querying messages in inbox")
518
519 launchWork(pkglog, resolver, map[string]struct{}{})
520
521 // Wait for all results.
522 timer.Reset(time.Second)
523 for i := 0; i < nresults; i++ {
524 select {
525 case <-deliveryResults:
526 case <-timer.C:
527 t.Fatalf("no dial within 1s")
528 }
529 }
530
531 // Check that queue is now empty.
532 xmsgs, err := List(ctxbg, Filter{}, Sort{})
533 tcheck(t, err, "list queue")
534 tcompare(t, len(xmsgs), 0)
535
536 // And that we possibly got a DSN delivered.
537 ninbox, err := bstore.QueryDB[store.Message](ctxbg, acc.DB).FilterNonzero(store.Message{MailboxID: inbox.ID}).Count()
538 tcheck(t, err, "querying messages in inbox")
539 if expectDSN && ninbox != inboxCount+1 {
540 t.Fatalf("got %d messages in inbox, previously %d, expected 1 additional for dsn", ninbox, inboxCount)
541 } else if !expectDSN && ninbox != inboxCount {
542 t.Fatalf("got %d messages in inbox, previously %d, expected no additional messages", ninbox, inboxCount)
543 }
544
545 return wasNetDialer
546 }
547 testDeliver := func(fakeServer func(conn net.Conn)) bool {
548 t.Helper()
549 return testQueue(false, fakeServer, 1)
550 }
551 testDeliverN := func(fakeServer func(conn net.Conn), nresults int) bool {
552 t.Helper()
553 return testQueue(false, fakeServer, nresults)
554 }
555 testDSN := func(fakeServer func(conn net.Conn)) bool {
556 t.Helper()
557 return testQueue(true, fakeServer, 1)
558 }
559
560 // Test direct delivery.
561 wasNetDialer := testDeliver(fakeSMTPServer)
562 if !wasNetDialer {
563 t.Fatalf("expected net.Dialer as dialer")
564 }
565
566 // Single delivery to two recipients at same domain, expecting single connection
567 // and single transaction.
568 qm0 := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
569 qml := []Msg{qm0, qm0} // Same NextAttempt.
570 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
571 tcheck(t, err, "add messages to queue for delivery")
572 testDeliver(fakeSMTPServer2Rcpts)
573
574 // Single enqueue to two recipients at different domain, expecting two connections.
575 otheraddr, _ := smtp.ParseAddress("mjl@other.example")
576 otherpath := otheraddr.Path()
577 t0 := time.Now()
578 qml = []Msg{
579 MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, t0, "test"),
580 MakeMsg(path, otherpath, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, t0, "test"),
581 }
582 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
583 tcheck(t, err, "add messages to queue for delivery")
584 conns := ConnectionCounter()
585 testDeliverN(fakeSMTPServer, 2)
586 nconns := ConnectionCounter()
587 if nconns != conns+2 {
588 t.Errorf("saw %d connections, expected 2", nconns-conns)
589 }
590
591 // Single enqueue with two recipients at same domain, but with smtp server that has
592 // LIMITS RCPTMAX=1, so we expect a single connection with two transactions.
593 qml = []Msg{qm0, qm0}
594 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
595 tcheck(t, err, "add messages to queue for delivery")
596 testDeliver(fakeSMTPServerLimitRcpt1)
597
598 // Single enqueue with two recipients at same domain, but smtp server sends 552 for
599 // 2nd recipient, so we expect a single connection with two transactions.
600 qml = []Msg{qm0, qm0}
601 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
602 tcheck(t, err, "add messages to queue for delivery")
603 testDeliver(fakeSMTPServerRcpt1)
604
605 // Add a message to be delivered with submit because of its route.
606 topath := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "submit.example"}}}
607 qm = MakeMsg(path, topath, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
608 err = Add(ctxbg, pkglog, "mjl", mf, qm)
609 tcheck(t, err, "add message to queue for delivery")
610 wasNetDialer = testDeliver(fakeSubmitServer)
611 if !wasNetDialer {
612 t.Fatalf("expected net.Dialer as dialer")
613 }
614
615 // Two messages for submission.
616 qml = []Msg{qm, qm}
617 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
618 tcheck(t, err, "add messages to queue for delivery")
619 wasNetDialer = testDeliver(fakeSubmitServer2Rcpts)
620 if !wasNetDialer {
621 t.Fatalf("expected net.Dialer as dialer")
622 }
623
624 // Add a message to be delivered with submit because of explicitly configured transport, that uses TLS.
625 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")}
626 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
627 tcheck(t, err, "add message to queue for delivery")
628 transportSubmitTLS := "submittls"
629 n, err = TransportSet(ctxbg, Filter{IDs: []int64{qml[0].ID}}, transportSubmitTLS)
630 tcheck(t, err, "set transport")
631 if n != 1 {
632 t.Fatalf("TransportSet changed %d messages, expected 1", n)
633 }
634 // Make fake cert, and make it trusted.
635 cert := fakeCert(t, "submission.example", false)
636 mox.Conf.Static.TLS.CertPool = x509.NewCertPool()
637 mox.Conf.Static.TLS.CertPool.AddCert(cert.Leaf)
638 tlsConfig := tls.Config{
639 Certificates: []tls.Certificate{cert},
640 }
641 wasNetDialer = testDeliver(func(conn net.Conn) {
642 conn = tls.Server(conn, &tlsConfig)
643 fakeSubmitServer(conn)
644 })
645 if !wasNetDialer {
646 t.Fatalf("expected net.Dialer as dialer")
647 }
648
649 // Various failure reasons.
650 fdNotTrusted := tlsrpt.FailureDetails{
651 ResultType: tlsrpt.ResultCertificateNotTrusted,
652 SendingMTAIP: "", // Missing due to pipe.
653 ReceivingMXHostname: "mail.mox.example",
654 ReceivingMXHelo: "mail.mox.example",
655 ReceivingIP: "", // Missing due to pipe.
656 FailedSessionCount: 1,
657 FailureReasonCode: "",
658 }
659 fdTLSAUnusable := tlsrpt.FailureDetails{
660 ResultType: tlsrpt.ResultTLSAInvalid,
661 ReceivingMXHostname: "mail.mox.example",
662 FailedSessionCount: 0,
663 FailureReasonCode: "all-unusable-records+ignored",
664 }
665 fdBadProtocol := tlsrpt.FailureDetails{
666 ResultType: tlsrpt.ResultValidationFailure,
667 ReceivingMXHostname: "mail.mox.example",
668 ReceivingMXHelo: "mail.mox.example",
669 FailedSessionCount: 1,
670 FailureReasonCode: "tls-remote-alert-70-protocol-version-not-supported",
671 }
672
673 // Add a message to be delivered with socks.
674 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<socks@localhost>", nil, nil, time.Now(), "test")}
675 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
676 tcheck(t, err, "add message to queue for delivery")
677 n, err = TransportSet(ctxbg, idfilter(qml[0].ID), "socks")
678 tcheck(t, err, "TransportSet")
679 if n != 1 {
680 t.Fatalf("TransportSet changed %d messages, expected 1", n)
681 }
682 kick(1, qml[0].ID)
683 wasNetDialer = testDeliver(fakeSMTPServer)
684 if wasNetDialer {
685 t.Fatalf("expected non-net.Dialer as dialer") // SOCKS5 dialer is a private type, we cannot check for it.
686 }
687
688 // Add message to be delivered with opportunistic TLS verification.
689 clearTLSResults(t)
690 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<opportunistictls@localhost>", nil, nil, time.Now(), "test")}
691 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
692 tcheck(t, err, "add message to queue for delivery")
693 kick(1, qml[0].ID)
694 testDeliver(fakeSMTPSTARTTLSServer)
695 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdNotTrusted)))
696 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailHost)))
697
698 // Test fallback to plain text with TLS handshake fails.
699 clearTLSResults(t)
700 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<badtls@localhost>", nil, nil, time.Now(), "test")}
701 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
702 tcheck(t, err, "add message to queue for delivery")
703 kick(1, qml[0].ID)
704 testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
705 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdBadProtocol)))
706 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailHost, fdBadProtocol)))
707
708 // Add message to be delivered with DANE verification.
709 clearTLSResults(t)
710 resolver.AllAuthentic = true
711 resolver.TLSA = map[string][]adns.TLSA{
712 "_25._tcp.mail.mox.example.": {
713 {Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeFull, CertAssoc: moxCert.Leaf.RawSubjectPublicKeyInfo},
714 },
715 }
716 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<dane@localhost>", nil, nil, time.Now(), "test")}
717 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
718 tcheck(t, err, "add message to queue for delivery")
719 kick(1, qml[0].ID)
720 testDeliver(fakeSMTPSTARTTLSServer)
721 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdNotTrusted)))
722 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(1, 0, tlsrpt.Result{Policy: tlsrpt.TLSAPolicy(resolver.TLSA["_25._tcp.mail.mox.example."], mailHost), FailureDetails: []tlsrpt.FailureDetails{}}))
723
724 // We should know starttls/requiretls by now.
725 rdt := store.RecipientDomainTLS{Domain: "mox.example"}
726 err = acc.DB.Get(ctxbg, &rdt)
727 tcheck(t, err, "get recipientdomaintls")
728 tcompare(t, rdt.STARTTLS, true)
729 tcompare(t, rdt.RequireTLS, true)
730
731 // Add message to be delivered with verified TLS and REQUIRETLS.
732 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<opportunistictls@localhost>", nil, &yes, time.Now(), "test")}
733 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
734 tcheck(t, err, "add message to queue for delivery")
735 kick(1, qml[0].ID)
736 testDeliver(fakeSMTPSTARTTLSServer)
737
738 // Check that message is delivered with all unusable DANE records.
739 clearTLSResults(t)
740 resolver.TLSA = map[string][]adns.TLSA{
741 "_25._tcp.mail.mox.example.": {
742 {},
743 },
744 }
745 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<daneunusable@localhost>", nil, nil, time.Now(), "test")}
746 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
747 tcheck(t, err, "add message to queue for delivery")
748 kick(1, qml[0].ID)
749 testDeliver(fakeSMTPSTARTTLSServer)
750 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdNotTrusted)))
751 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(1, 0, tlsrpt.Result{Policy: tlsrpt.TLSAPolicy([]adns.TLSA{}, mailHost), FailureDetails: []tlsrpt.FailureDetails{fdTLSAUnusable}}))
752
753 // Check that message is delivered with insecure TLSA records. They should be
754 // ignored and regular STARTTLS tried.
755 clearTLSResults(t)
756 resolver.Inauthentic = []string{"tlsa _25._tcp.mail.mox.example."}
757 resolver.TLSA = map[string][]adns.TLSA{
758 "_25._tcp.mail.mox.example.": {
759 {Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeFull, CertAssoc: make([]byte, sha256.Size)},
760 },
761 }
762 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<daneinsecure@localhost>", nil, nil, time.Now(), "test")}
763 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
764 tcheck(t, err, "add message to queue for delivery")
765 kick(1, qml[0].ID)
766 testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
767 resolver.Inauthentic = nil
768 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdBadProtocol)))
769 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailHost, fdBadProtocol)))
770
771 // STARTTLS failed, so not known supported.
772 rdt = store.RecipientDomainTLS{Domain: "mox.example"}
773 err = acc.DB.Get(ctxbg, &rdt)
774 tcheck(t, err, "get recipientdomaintls")
775 tcompare(t, rdt.STARTTLS, false)
776 tcompare(t, rdt.RequireTLS, false)
777
778 // Check that message is delivered with TLS-Required: No and non-matching DANE record.
779 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednostarttls@localhost>", nil, &no, time.Now(), "test")}
780 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
781 tcheck(t, err, "add message to queue for delivery")
782 kick(1, qml[0].ID)
783 testDeliver(fakeSMTPSTARTTLSServer)
784
785 // Check that message is delivered with TLS-Required: No and bad TLS, falling back to plain text.
786 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednoplaintext@localhost>", nil, &no, time.Now(), "test")}
787 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
788 tcheck(t, err, "add message to queue for delivery")
789 kick(1, qml[0].ID)
790 testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
791
792 // Add message with requiretls that fails immediately due to no REQUIRETLS support in all servers.
793 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequiredunsupported@localhost>", nil, &yes, time.Now(), "test")}
794 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
795 tcheck(t, err, "add message to queue for delivery")
796 kick(1, qml[0].ID)
797 testDSN(makeBadFakeSMTPSTARTTLSServer(false))
798
799 // Restore pre-DANE behaviour.
800 resolver.AllAuthentic = false
801 resolver.TLSA = nil
802
803 // Add message with requiretls that fails immediately due to no verification policy for recipient domain.
804 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednopolicy@localhost>", nil, &yes, time.Now(), "test")}
805 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
806 tcheck(t, err, "add message to queue for delivery")
807 kick(1, qml[0].ID)
808 // Based on DNS lookups, there won't be any dialing or SMTP connection.
809 testDSN(func(conn net.Conn) {})
810
811 // Add another message that we'll fail to deliver entirely.
812 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
813 err = Add(ctxbg, pkglog, "mjl", mf, qm)
814 tcheck(t, err, "add message to queue for delivery")
815
816 msgs, err = List(ctxbg, Filter{}, Sort{})
817 tcheck(t, err, "list queue")
818 if len(msgs) != 1 {
819 t.Fatalf("queue has %d messages, expected 1", len(msgs))
820 }
821 msg = msgs[0]
822
823 prepServer := func(fn func(c net.Conn)) (net.Conn, func()) {
824 server, client := net.Pipe()
825 go func() {
826 fn(server)
827 server.Close()
828 }()
829 return client, func() {
830 server.Close()
831 client.Close()
832 }
833 }
834
835 conn2, cleanup2 := prepServer(func(conn net.Conn) { fmt.Fprintf(conn, "220 mail.mox.example\r\n") })
836 conn3, cleanup3 := prepServer(func(conn net.Conn) { fmt.Fprintf(conn, "451 mail.mox.example\r\n") })
837 conn4, cleanup4 := prepServer(fakeSMTPSTARTTLSServer)
838 defer func() {
839 cleanup2()
840 cleanup3()
841 cleanup4()
842 }()
843
844 seq := 0
845 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
846 seq++
847 switch seq {
848 default:
849 return nil, fmt.Errorf("connect error from test")
850 case 2:
851 return conn2, nil
852 case 3:
853 return conn3, nil
854 case 4:
855 return conn4, nil
856 }
857 }
858 defer func() {
859 smtpclient.DialHook = nil
860 }()
861
862 comm := store.RegisterComm(acc)
863 defer comm.Unregister()
864
865 for i := 1; i < 8; i++ {
866 if i == 4 {
867 resolver.AllAuthentic = true
868 resolver.TLSA = map[string][]adns.TLSA{
869 "_25._tcp.mail.mox.example.": {
870 // Non-matching zero CertAssoc, should cause failure.
871 {Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeSHA256, CertAssoc: make([]byte, sha256.Size)},
872 },
873 }
874 } else {
875 resolver.AllAuthentic = false
876 resolver.TLSA = nil
877 }
878 go deliver(pkglog, resolver, msg)
879 <-deliveryResults
880 err = DB.Get(ctxbg, &msg)
881 tcheck(t, err, "get msg")
882 if msg.Attempts != i {
883 t.Fatalf("got attempt %d, expected %d", msg.Attempts, i)
884 }
885 if msg.Attempts == 5 {
886 timer.Reset(time.Second)
887 changes := make(chan struct{}, 1)
888 go func() {
889 comm.Get()
890 changes <- struct{}{}
891 }()
892 select {
893 case <-changes:
894 case <-timer.C:
895 t.Fatalf("no dsn in 1s")
896 }
897 }
898 }
899
900 // Trigger final failure.
901 go deliver(pkglog, resolver, msg)
902 <-deliveryResults
903 err = DB.Get(ctxbg, &msg)
904 if err != bstore.ErrAbsent {
905 t.Fatalf("attempt to fetch delivered and removed message from queue, got err %v, expected ErrAbsent", err)
906 }
907
908 timer.Reset(time.Second)
909 changes := make(chan struct{}, 1)
910 go func() {
911 comm.Get()
912 changes <- struct{}{}
913 }()
914 select {
915 case <-changes:
916 case <-timer.C:
917 t.Fatalf("no dsn in 1s")
918 }
919
920 // We shouldn't have any more work to do.
921 msgs, err = List(ctxbg, Filter{}, Sort{})
922 tcheck(t, err, "list messages at end of test")
923 tcompare(t, len(msgs), 0)
924}
925
926func addCounts(success, failure int64, result tlsrpt.Result) tlsrpt.Result {
927 result.Summary.TotalSuccessfulSessionCount += success
928 result.Summary.TotalFailureSessionCount += failure
929 return result
930}
931
932func clearTLSResults(t *testing.T) {
933 _, err := bstore.QueryDB[tlsrptdb.TLSResult](ctxbg, tlsrptdb.ResultDB).Delete()
934 tcheck(t, err, "delete tls results")
935}
936
937func checkTLSResults(t *testing.T, policyDomain, expRecipientDomain string, expIsHost bool, expResults ...tlsrpt.Result) {
938 t.Helper()
939 q := bstore.QueryDB[tlsrptdb.TLSResult](ctxbg, tlsrptdb.ResultDB)
940 q.FilterNonzero(tlsrptdb.TLSResult{PolicyDomain: policyDomain})
941 result, err := q.Get()
942 tcheck(t, err, "get tls result")
943 tcompare(t, result.RecipientDomain, expRecipientDomain)
944 tcompare(t, result.IsHost, expIsHost)
945
946 // Before comparing, compensate for go1.20 vs go1.21 difference.
947 for i, r := range result.Results {
948 for j, fd := range r.FailureDetails {
949 if fd.FailureReasonCode == "tls-remote-alert-70" {
950 result.Results[i].FailureDetails[j].FailureReasonCode = "tls-remote-alert-70-protocol-version-not-supported"
951 }
952 }
953 }
954 tcompare(t, result.Results, expResults)
955}
956
957// Test delivered/permfailed/suppressed/canceled/dropped messages are stored in the
958// retired list if configured, with a proper result, that webhooks are scheduled,
959// and that cleaning up works.
960func TestRetiredHooks(t *testing.T) {
961 _, cleanup := setup(t)
962 defer cleanup()
963
964 addr, err := smtp.ParseAddress("mjl@mox.example")
965 tcheck(t, err, "parse address")
966 path := addr.Path()
967
968 mf := prepareFile(t)
969 defer os.Remove(mf.Name())
970 defer mf.Close()
971
972 resolver := dns.MockResolver{
973 A: map[string][]string{"mox.example.": {"127.0.0.1"}},
974 MX: map[string][]*net.MX{"mox.example.": {{Host: "mox.example", Pref: 10}}},
975 }
976
977 testAction := func(account string, action func(), expResult *MsgResult, expEvent string, expSuppressing bool) {
978 t.Helper()
979
980 _, err := bstore.QueryDB[MsgRetired](ctxbg, DB).Delete()
981 tcheck(t, err, "clearing retired messages")
982 _, err = bstore.QueryDB[Hook](ctxbg, DB).Delete()
983 tcheck(t, err, "clearing hooks")
984
985 qm := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
986 qm.Extra = map[string]string{"a": "123"}
987 err = Add(ctxbg, pkglog, account, mf, qm)
988 tcheck(t, err, "add to queue")
989
990 action()
991
992 // Should be no messages left in queue.
993 msgs, err := List(ctxbg, Filter{}, Sort{})
994 tcheck(t, err, "list messages")
995 tcompare(t, len(msgs), 0)
996
997 retireds, err := RetiredList(ctxbg, RetiredFilter{}, RetiredSort{})
998 tcheck(t, err, "list retired messages")
999 hooks, err := HookList(ctxbg, HookFilter{}, HookSort{})
1000 tcheck(t, err, "list hooks")
1001 if expResult == nil {
1002 tcompare(t, len(retireds), 0)
1003 tcompare(t, len(hooks), 0)
1004 } else {
1005 tcompare(t, len(retireds), 1)
1006 mr := retireds[0]
1007 tcompare(t, len(mr.Results) > 0, true)
1008 lr := mr.LastResult()
1009 lr.Start = time.Time{}
1010 lr.Duration = 0
1011 tcompare(t, lr.Error == "", expResult.Error == "")
1012 lr.Error = expResult.Error
1013 tcompare(t, lr, *expResult)
1014
1015 // Compare added webhook.
1016 tcompare(t, len(hooks), 1)
1017 h := hooks[0]
1018 var out webhook.Outgoing
1019 dec := json.NewDecoder(strings.NewReader(h.Payload))
1020 dec.DisallowUnknownFields()
1021 err := dec.Decode(&out)
1022 tcheck(t, err, "unmarshal outgoing webhook payload")
1023 tcompare(t, out.Error == "", expResult.Error == "")
1024 out.WebhookQueued = time.Time{}
1025 out.Error = ""
1026 var ecode string
1027 if expResult.Secode != "" {
1028 ecode = fmt.Sprintf("%d.%s", expResult.Code/100, expResult.Secode)
1029 }
1030 var code int // Only set for errors.
1031 if expResult.Code != 250 {
1032 code = expResult.Code
1033 }
1034 expOut := webhook.Outgoing{
1035 Event: webhook.OutgoingEvent(expEvent),
1036 Suppressing: expSuppressing,
1037 QueueMsgID: mr.ID,
1038 FromID: mr.FromID,
1039 MessageID: mr.MessageID,
1040 Subject: mr.Subject,
1041 SMTPCode: code,
1042 SMTPEnhancedCode: ecode,
1043 Extra: mr.Extra,
1044 }
1045 tcompare(t, out, expOut)
1046 h.ID = 0
1047 h.Payload = ""
1048 h.Submitted = time.Time{}
1049 h.NextAttempt = time.Time{}
1050 exph := Hook{0, mr.ID, "", mr.MessageID, mr.Subject, mr.Extra, mr.SenderAccount, "http://localhost:1234/outgoing", "Basic dXNlcm5hbWU6cGFzc3dvcmQ=", false, expEvent, "", time.Time{}, 0, time.Time{}, nil}
1051 tcompare(t, h, exph)
1052 }
1053 }
1054
1055 makeLaunchAction := func(handler func(conn net.Conn)) func() {
1056 return func() {
1057 server, client := net.Pipe()
1058 defer server.Close()
1059
1060 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
1061 go handler(server)
1062 return client, nil
1063 }
1064 defer func() {
1065 smtpclient.DialHook = nil
1066 }()
1067
1068 // Trigger delivery attempt.
1069 n := launchWork(pkglog, resolver, map[string]struct{}{})
1070 tcompare(t, n, 1)
1071
1072 // Wait until delivery has finished.
1073 tm := time.NewTimer(5 * time.Second)
1074 defer tm.Stop()
1075 select {
1076 case <-tm.C:
1077 t.Fatalf("delivery didn't happen within 5s")
1078 case <-deliveryResults:
1079 }
1080 }
1081 }
1082
1083 smtpAccept := func(conn net.Conn) {
1084 br := bufio.NewReader(conn)
1085 readline := func(cmd string) {
1086 line, err := br.ReadString('\n')
1087 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
1088 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
1089 }
1090 }
1091 writeline := func(s string) {
1092 fmt.Fprintf(conn, "%s\r\n", s)
1093 }
1094
1095 writeline("220 mail.mox.example")
1096 readline("ehlo")
1097 writeline("250 mail.mox.example")
1098
1099 readline("mail")
1100 writeline("250 ok")
1101 readline("rcpt")
1102 writeline("250 ok")
1103 readline("data")
1104 writeline("354 continue")
1105 reader := smtp.NewDataReader(br)
1106 io.Copy(io.Discard, reader)
1107 writeline("250 ok")
1108 readline("quit")
1109 writeline("250 ok")
1110 }
1111 smtpReject := func(code int) func(conn net.Conn) {
1112 return func(conn net.Conn) {
1113 br := bufio.NewReader(conn)
1114 readline := func(cmd string) {
1115 line, err := br.ReadString('\n')
1116 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
1117 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
1118 }
1119 }
1120 writeline := func(s string) {
1121 fmt.Fprintf(conn, "%s\r\n", s)
1122 }
1123
1124 writeline("220 mail.mox.example")
1125 readline("ehlo")
1126 writeline("250-mail.mox.example")
1127 writeline("250 enhancedstatuscodes")
1128
1129 readline("mail")
1130 writeline(fmt.Sprintf("%d 5.1.0 nok", code))
1131 readline("quit")
1132 writeline("250 ok")
1133 }
1134 }
1135
1136 testAction("mjl", makeLaunchAction(smtpAccept), nil, "", false)
1137 testAction("retired", makeLaunchAction(smtpAccept), &MsgResult{Code: 250, Success: true}, string(webhook.EventDelivered), false)
1138 // 554 is generic, doesn't immediately cause suppression.
1139 testAction("mjl", makeLaunchAction(smtpReject(554)), nil, "", false)
1140 testAction("retired", makeLaunchAction(smtpReject(554)), &MsgResult{Code: 554, Secode: "1.0", Error: "nonempty"}, string(webhook.EventFailed), false)
1141 // 550 causes immediate suppression, check for it in webhook.
1142 testAction("mjl", makeLaunchAction(smtpReject(550)), nil, "", true)
1143 testAction("retired", makeLaunchAction(smtpReject(550)), &MsgResult{Code: 550, Secode: "1.0", Error: "nonempty"}, string(webhook.EventFailed), true)
1144 // Try to deliver to suppressed addresses.
1145 launch := func() {
1146 n := launchWork(pkglog, resolver, map[string]struct{}{})
1147 tcompare(t, n, 1)
1148 <-deliveryResults
1149 }
1150 testAction("mjl", launch, nil, "", false)
1151 testAction("retired", launch, &MsgResult{Error: "nonempty"}, string(webhook.EventSuppressed), false)
1152
1153 queueFail := func() {
1154 n, err := Fail(ctxbg, pkglog, Filter{})
1155 tcheck(t, err, "cancel delivery with failure dsn")
1156 tcompare(t, n, 1)
1157 }
1158 queueDrop := func() {
1159 n, err := Drop(ctxbg, pkglog, Filter{})
1160 tcheck(t, err, "cancel delivery without failure dsn")
1161 tcompare(t, n, 1)
1162 }
1163 testAction("mjl", queueFail, nil, "", false)
1164 testAction("retired", queueFail, &MsgResult{Error: "nonempty"}, string(webhook.EventFailed), false)
1165 testAction("mjl", queueDrop, nil, "", false)
1166 testAction("retired", queueDrop, &MsgResult{Error: "nonempty"}, string(webhook.EventCanceled), false)
1167
1168 retireds, err := RetiredList(ctxbg, RetiredFilter{}, RetiredSort{})
1169 tcheck(t, err, "list retired messages")
1170 tcompare(t, len(retireds), 1)
1171
1172 cleanupMsgRetiredSingle(pkglog)
1173 retireds, err = RetiredList(ctxbg, RetiredFilter{}, RetiredSort{})
1174 tcheck(t, err, "list retired messages")
1175 tcompare(t, len(retireds), 0)
1176}
1177
1178// test Start and that it attempts to deliver.
1179func TestQueueStart(t *testing.T) {
1180 // Override dial function. We'll make connecting fail and check the attempt.
1181 resolver := dns.MockResolver{
1182 A: map[string][]string{"mox.example.": {"127.0.0.1"}},
1183 MX: map[string][]*net.MX{"mox.example.": {{Host: "mox.example", Pref: 10}}},
1184 }
1185 dialed := make(chan struct{}, 1)
1186 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
1187 dialed <- struct{}{}
1188 return nil, fmt.Errorf("failure from test")
1189 }
1190 defer func() {
1191 smtpclient.DialHook = nil
1192 }()
1193
1194 _, cleanup := setup(t)
1195 defer cleanup()
1196
1197 done := make(chan struct{})
1198 defer func() {
1199 mox.ShutdownCancel()
1200 // Wait for message and hooks deliverers and cleaners.
1201 <-done
1202 <-done
1203 <-done
1204 <-done
1205 mox.Shutdown, mox.ShutdownCancel = context.WithCancel(ctxbg)
1206 }()
1207 Shutdown() // DB was opened already. Start will open it again. Just close it before.
1208 err := Start(resolver, done)
1209 tcheck(t, err, "queue start")
1210
1211 checkDialed := func(need bool) {
1212 t.Helper()
1213 d := time.Second / 10
1214 if need {
1215 d = time.Second
1216 }
1217 timer := time.NewTimer(d)
1218 defer timer.Stop()
1219 select {
1220 case <-dialed:
1221 if !need {
1222 t.Fatalf("unexpected dial attempt")
1223 }
1224 case <-timer.C:
1225 if need {
1226 t.Fatalf("expected to see a dial attempt")
1227 }
1228 }
1229 }
1230
1231 // HoldRule to mark mark all messages sent by mjl on hold, including existing
1232 // messages.
1233 hr0, err := HoldRuleAdd(ctxbg, pkglog, HoldRule{Account: "mjl"})
1234 tcheck(t, err, "add hold rule")
1235
1236 // All zero HoldRule holds all deliveries, and marks all on hold.
1237 hr1, err := HoldRuleAdd(ctxbg, pkglog, HoldRule{})
1238 tcheck(t, err, "add hold rule")
1239
1240 hrl, err := HoldRuleList(ctxbg)
1241 tcheck(t, err, "listing hold rules")
1242 tcompare(t, hrl, []HoldRule{hr0, hr1})
1243
1244 path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
1245 mf := prepareFile(t)
1246 defer os.Remove(mf.Name())
1247 defer mf.Close()
1248 qm := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
1249 err = Add(ctxbg, pkglog, "mjl", mf, qm)
1250 tcheck(t, err, "add message to queue for delivery")
1251 checkDialed(false) // No delivery attempt yet.
1252
1253 n, err := Count(ctxbg)
1254 tcheck(t, err, "count messages in queue")
1255 tcompare(t, n, 1)
1256
1257 // Take message off hold.
1258 n, err = HoldSet(ctxbg, Filter{}, false)
1259 tcheck(t, err, "taking message off hold")
1260 tcompare(t, n, 1)
1261 checkDialed(true)
1262
1263 // Remove hold rules.
1264 err = HoldRuleRemove(ctxbg, pkglog, hr1.ID)
1265 tcheck(t, err, "removing hold rule")
1266 err = HoldRuleRemove(ctxbg, pkglog, hr0.ID)
1267 tcheck(t, err, "removing hold rule")
1268 // Check it is gone.
1269 hrl, err = HoldRuleList(ctxbg)
1270 tcheck(t, err, "listing hold rules")
1271 tcompare(t, len(hrl), 0)
1272
1273 // Don't change message nextattempt time, but kick queue. Message should not be delivered.
1274 msgqueueKick()
1275 checkDialed(false)
1276
1277 // Set new next attempt, should see another attempt.
1278 n, err = NextAttemptSet(ctxbg, Filter{From: "@mox.example"}, time.Now())
1279 tcheck(t, err, "kick queue")
1280 if n != 1 {
1281 t.Fatalf("kick changed %d messages, expected 1", n)
1282 }
1283 checkDialed(true)
1284
1285 // Submit another, should be delivered immediately without HoldRule.
1286 path = smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
1287 mf = prepareFile(t)
1288 defer os.Remove(mf.Name())
1289 defer mf.Close()
1290 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
1291 err = Add(ctxbg, pkglog, "mjl", mf, qm)
1292 tcheck(t, err, "add message to queue for delivery")
1293 checkDialed(true) // Immediate.
1294}
1295
1296func TestListFilterSort(t *testing.T) {
1297 _, cleanup := setup(t)
1298 defer cleanup()
1299
1300 // insert Msgs. insert RetiredMsgs based on that. call list with filters and sort. filter to select a single. filter to paginate one by one, and in reverse.
1301
1302 path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
1303 mf := prepareFile(t)
1304 defer os.Remove(mf.Name())
1305 defer mf.Close()
1306
1307 now := time.Now().Round(0)
1308 qm := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, now, "test")
1309 qm.Queued = now
1310 qm1 := qm
1311 qm1.Queued = now.Add(-time.Second)
1312 qm1.NextAttempt = now.Add(time.Minute)
1313 qml := []Msg{qm, qm, qm, qm, qm, qm1}
1314 err := Add(ctxbg, pkglog, "mjl", mf, qml...)
1315 tcheck(t, err, "add messages to queue")
1316 qm1 = qml[len(qml)-1]
1317
1318 qmlrev := slices.Clone(qml)
1319 slices.Reverse(qmlrev)
1320
1321 // Ascending by nextattempt,id.
1322 l, err := List(ctxbg, Filter{}, Sort{Asc: true})
1323 tcheck(t, err, "list messages")
1324 tcompare(t, l, qml)
1325
1326 // Descending by nextattempt,id.
1327 l, err = List(ctxbg, Filter{}, Sort{})
1328 tcheck(t, err, "list messages")
1329 tcompare(t, l, qmlrev)
1330
1331 // Descending by queued,id.
1332 l, err = List(ctxbg, Filter{}, Sort{Field: "Queued"})
1333 tcheck(t, err, "list messages")
1334 ql := append(append([]Msg{}, qmlrev[1:]...), qml[5])
1335 tcompare(t, l, ql)
1336
1337 // Filter by all fields to get a single.
1338 no := false
1339 allfilters := Filter{
1340 Max: 2,
1341 IDs: []int64{qm1.ID},
1342 Account: "mjl",
1343 From: path.XString(true),
1344 To: path.XString(true),
1345 Hold: &no,
1346 Submitted: "<1s",
1347 NextAttempt: ">1s",
1348 }
1349 l, err = List(ctxbg, allfilters, Sort{})
1350 tcheck(t, err, "list single")
1351 tcompare(t, l, []Msg{qm1})
1352
1353 // Paginated NextAttmpt asc.
1354 var lastID int64
1355 var last any
1356 l = nil
1357 for {
1358 nl, err := List(ctxbg, Filter{Max: 1}, Sort{Asc: true, LastID: lastID, Last: last})
1359 tcheck(t, err, "list paginated")
1360 l = append(l, nl...)
1361 if len(nl) == 0 {
1362 break
1363 }
1364 tcompare(t, len(nl), 1)
1365 lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
1366 }
1367 tcompare(t, l, qml)
1368
1369 // Paginated NextAttempt desc.
1370 l = nil
1371 lastID = 0
1372 last = ""
1373 for {
1374 nl, err := List(ctxbg, Filter{Max: 1}, Sort{LastID: lastID, Last: last})
1375 tcheck(t, err, "list paginated")
1376 l = append(l, nl...)
1377 if len(nl) == 0 {
1378 break
1379 }
1380 tcompare(t, len(nl), 1)
1381 lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
1382 }
1383 tcompare(t, l, qmlrev)
1384
1385 // Paginated Queued desc.
1386 l = nil
1387 lastID = 0
1388 last = ""
1389 for {
1390 nl, err := List(ctxbg, Filter{Max: 1}, Sort{Field: "Queued", LastID: lastID, Last: last})
1391 tcheck(t, err, "list paginated")
1392 l = append(l, nl...)
1393 if len(nl) == 0 {
1394 break
1395 }
1396 tcompare(t, len(nl), 1)
1397 lastID, last = nl[0].ID, nl[0].Queued.Format(time.RFC3339Nano)
1398 }
1399 tcompare(t, l, ql)
1400
1401 // Paginated Queued asc.
1402 l = nil
1403 lastID = 0
1404 last = ""
1405 for {
1406 nl, err := List(ctxbg, Filter{Max: 1}, Sort{Field: "Queued", Asc: true, LastID: lastID, Last: last})
1407 tcheck(t, err, "list paginated")
1408 l = append(l, nl...)
1409 if len(nl) == 0 {
1410 break
1411 }
1412 tcompare(t, len(nl), 1)
1413 lastID, last = nl[0].ID, nl[0].Queued.Format(time.RFC3339Nano)
1414 }
1415 qlrev := slices.Clone(ql)
1416 slices.Reverse(qlrev)
1417 tcompare(t, l, qlrev)
1418
1419 // Retire messages and do similar but more basic tests. The code is similar.
1420 var mrl []MsgRetired
1421 err = DB.Write(ctxbg, func(tx *bstore.Tx) error {
1422 for _, m := range qml {
1423 mr := m.Retired(false, m.NextAttempt, time.Now().Add(time.Minute).Round(0))
1424 err := tx.Insert(&mr)
1425 tcheck(t, err, "inserting retired message")
1426 mrl = append(mrl, mr)
1427 }
1428 return nil
1429 })
1430 tcheck(t, err, "adding retired messages")
1431
1432 // Paginated LastActivity desc.
1433 var lr []MsgRetired
1434 lastID = 0
1435 last = ""
1436 l = nil
1437 for {
1438 nl, err := RetiredList(ctxbg, RetiredFilter{Max: 1}, RetiredSort{LastID: lastID, Last: last})
1439 tcheck(t, err, "list paginated")
1440 lr = append(lr, nl...)
1441 if len(nl) == 0 {
1442 break
1443 }
1444 tcompare(t, len(nl), 1)
1445 lastID, last = nl[0].ID, nl[0].LastActivity.Format(time.RFC3339Nano)
1446 }
1447 mrlrev := slices.Clone(mrl)
1448 slices.Reverse(mrlrev)
1449 tcompare(t, lr, mrlrev)
1450
1451 // Filter by all fields to get a single.
1452 allretiredfilters := RetiredFilter{
1453 Max: 2,
1454 IDs: []int64{mrlrev[0].ID},
1455 Account: "mjl",
1456 From: path.XString(true),
1457 To: path.XString(true),
1458 Submitted: "<1s",
1459 LastActivity: ">1s",
1460 }
1461 lr, err = RetiredList(ctxbg, allretiredfilters, RetiredSort{})
1462 tcheck(t, err, "list single")
1463 tcompare(t, lr, []MsgRetired{mrlrev[0]})
1464}
1465
1466// Just a cert that appears valid.
1467func fakeCert(t *testing.T, name string, expired bool) tls.Certificate {
1468 notAfter := time.Now()
1469 if expired {
1470 notAfter = notAfter.Add(-time.Hour)
1471 } else {
1472 notAfter = notAfter.Add(time.Hour)
1473 }
1474
1475 privKey := ed25519.NewKeyFromSeed(make([]byte, ed25519.SeedSize)) // Fake key, don't use this for real!
1476 template := &x509.Certificate{
1477 SerialNumber: big.NewInt(1), // Required field...
1478 DNSNames: []string{name},
1479 NotBefore: time.Now().Add(-time.Hour),
1480 NotAfter: notAfter,
1481 }
1482 localCertBuf, err := x509.CreateCertificate(cryptorand.Reader, template, template, privKey.Public(), privKey)
1483 if err != nil {
1484 t.Fatalf("making certificate: %s", err)
1485 }
1486 cert, err := x509.ParseCertificate(localCertBuf)
1487 if err != nil {
1488 t.Fatalf("parsing generated certificate: %s", err)
1489 }
1490 c := tls.Certificate{
1491 Certificate: [][]byte{localCertBuf},
1492 PrivateKey: privKey,
1493 Leaf: cert,
1494 }
1495 return c
1496}
1497