7 cryptorand "crypto/rand"
25 "github.com/mjl-/adns"
26 "github.com/mjl-/bstore"
28 "github.com/mjl-/mox/dns"
29 "github.com/mjl-/mox/mlog"
30 "github.com/mjl-/mox/mox-"
31 "github.com/mjl-/mox/mtastsdb"
32 "github.com/mjl-/mox/smtp"
33 "github.com/mjl-/mox/smtpclient"
34 "github.com/mjl-/mox/store"
35 "github.com/mjl-/mox/tlsrpt"
36 "github.com/mjl-/mox/tlsrptdb"
37 "github.com/mjl-/mox/webhook"
40var ctxbg = context.Background()
41var pkglog = mlog.New("queue", nil)
// tcheck aborts the test with msg and the error text when err is non-nil.
// A nil err is a no-op, so it can follow every fallible call directly.
func tcheck(t *testing.T, err error, msg string) {
	// Report the caller's line rather than this helper's.
	t.Helper()
	if err != nil {
		t.Fatalf("%s: %s", msg, err)
	}
}
// tcompare aborts the test when got and exp are not deeply equal according
// to reflect.DeepEqual, printing both values in Go syntax.
func tcompare(t *testing.T, got, exp any) {
	// Report the caller's line rather than this helper's.
	t.Helper()
	if !reflect.DeepEqual(got, exp) {
		t.Fatalf("got:\n%#v\nexpected:\n%#v", got, exp)
	}
}
// setup prepares the environment for a queue test and returns the opened
// account plus a func() (callers name it cleanup).
//
// Visible steps: remove ../testdata/queue/data, load the static config from
// ../testdata/queue/mox.conf, install a fresh mox.Shutdown context, initialize
// the databases (mtastsdb is visible; the other init calls are only visible
// through their tcheck messages), start the store switchboard, open account
// "mjl" and set its password.
//
// NOTE(review): this listing omits several lines of the function, including
// the declaration of err and the start of the returned function.
57func setup(t *testing.T) (*store.Account, func()) {
58 // Prepare config so email can be delivered to mjl@mox.example.
59 os.RemoveAll("../testdata/queue/data")
60 log := mlog.New("queue", nil)
62 mox.ConfigStaticPath = filepath.FromSlash("../testdata/queue/mox.conf")
63 mox.MustLoadConfig(true, false)
64 mox.Shutdown, mox.ShutdownCancel = context.WithCancel(ctxbg)
66 tcheck(t, err, "queue init")
67 err = mtastsdb.Init(false)
68 tcheck(t, err, "mtastsdb init")
70 tcheck(t, err, "tlsrptdb init")
71 switchStop := store.Switchboard()
73 acc, err := store.OpenAccount(log, "mjl", false)
74 tcheck(t, err, "open account")
75 err = acc.SetPassword(log, "testtest")
76 tcheck(t, err, "set password")
// From here on the statements tear things down again; presumably they are
// the body of the returned cleanup function — confirm against the full file.
// A new Shutdown context is installed so a later setup starts clean.
82 mox.Shutdown, mox.ShutdownCancel = context.WithCancel(ctxbg)
84 err := mtastsdb.Close()
85 tcheck(t, err, "mtastsdb close")
86 err = tlsrptdb.Close()
87 tcheck(t, err, "tlsrptdb close")
// testmsg is the message text written to the temporary message file and used
// for size and content comparisons in the tests. It is built by
// strings.ReplaceAll over a raw string literal that starts with a From header.
// NOTE(review): the rest of the literal and the replacement arguments are not
// visible in this listing.
92var testmsg = strings.ReplaceAll(`From: <mjl@mox.example>
99func prepareFile(t *testing.T) *os.File {
101 msgFile, err := store.CreateMessageTemp(pkglog, "queue")
102 tcheck(t, err, "create temp message for delivery to queue")
103 _, err = msgFile.Write([]byte(testmsg))
104 tcheck(t, err, "write message file")
// TestQueue exercises the queue against fake servers reached over net.Pipe
// through smtpclient.DialHook. In order, the visible code: adds, lists,
// filters, drops and fails messages; checks nextWork/launchWork scheduling;
// makes a failing delivery attempt; delivers over plain SMTP, submission,
// a "socks" transport, STARTTLS, DANE and REQUIRETLS variants while checking
// recorded TLS results; and finally repeats failing attempts until the
// message is gone from the queue.
//
// NOTE(review): this listing omits many lines of the function (conditions,
// closing braces, some declarations), so the comments below describe only
// what the visible statements establish.
108func TestQueue(t *testing.T) {
109 acc, cleanup := setup(t)
// idfilter builds a Filter that selects exactly one queued message by ID.
112 idfilter := func(msgID int64) Filter {
113 return Filter{IDs: []int64{msgID}}
// kick sets the next attempt of message id to the current time and compares
// the number of changed messages with expn.
116 kick := func(expn int, id int64) {
118 n, err := NextAttemptSet(ctxbg, idfilter(id), time.Now())
119 tcheck(t, err, "kick queue")
121 t.Fatalf("kick changed %d messages, expected %d", n, expn)
125 msgs, err := List(ctxbg, Filter{}, Sort{})
126 tcheck(t, err, "listing messages in queue")
128 t.Fatalf("got %d messages in queue, expected 0", len(msgs))
131 path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
133 defer os.Remove(mf.Name())
// The same message is added three times; the later Drop and Fail calls
// address msgs[1] and msgs[2], and the filter checks use msgs[0].
138 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
139 err = Add(ctxbg, pkglog, "mjl", mf, qm)
140 tcheck(t, err, "add message to queue for delivery")
142 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
143 err = Add(ctxbg, pkglog, "mjl", mf, qm)
144 tcheck(t, err, "add message to queue for delivery")
146 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
147 err = Add(ctxbg, pkglog, "mjl", mf, qm)
148 tcheck(t, err, "add message to queue for delivery")
150 msgs, err = List(ctxbg, Filter{}, Sort{})
151 tcheck(t, err, "listing queue")
153 t.Fatalf("got msgs %v, expected 1", msgs)
157 n, err := RequireTLSSet(ctxbg, Filter{IDs: []int64{msgs[2].ID}}, &yes)
158 tcheck(t, err, "requiretlsset")
162 if msg.Attempts != 0 {
163 t.Fatalf("msg attempts %d, expected 0", msg.Attempts)
165 n, err = Drop(ctxbg, pkglog, Filter{IDs: []int64{msgs[1].ID}})
166 tcheck(t, err, "drop")
168 t.Fatalf("dropped %d, expected 1", n)
// A dropped message must also be gone from disk: os.Stat has to fail with a
// not-exist error.
170 if _, err := os.Stat(msgs[1].MessagePath()); err == nil || !os.IsNotExist(err) {
171 t.Fatalf("dropped message not removed from file system")
174 // Fail a message, check the account has a message afterwards, the DSN.
175 n, err = bstore.QueryDB[store.Message](ctxbg, acc.DB).Count()
176 tcheck(t, err, "count messages in account")
178 n, err = Fail(ctxbg, pkglog, Filter{IDs: []int64{msgs[2].ID}})
179 tcheck(t, err, "fail")
181 t.Fatalf("failed %d, expected 1", n)
183 n, err = bstore.QueryDB[store.Message](ctxbg, acc.DB).Count()
184 tcheck(t, err, "count messages in account")
187 // Check filter through various List calls. Other code uses the same filtering function.
// filter lists the queue with f and expects exactly expn matching messages.
188 filter := func(f Filter, expn int) {
190 l, err := List(ctxbg, f, Sort{})
191 tcheck(t, err, "list messages")
192 tcompare(t, len(l), expn)
195 filter(Filter{Account: "mjl"}, 1)
196 filter(Filter{Account: "bogus"}, 0)
197 filter(Filter{IDs: []int64{msgs[0].ID}}, 1)
198 filter(Filter{IDs: []int64{msgs[2].ID}}, 0) // Removed.
199 filter(Filter{IDs: []int64{msgs[2].ID + 1}}, 0) // Never existed.
200 filter(Filter{From: "mjl@"}, 1)
201 filter(Filter{From: "bogus@"}, 0)
202 filter(Filter{To: "mjl@"}, 1)
203 filter(Filter{To: "bogus@"}, 0)
204 filter(Filter{Hold: &yes}, 0)
206 filter(Filter{Hold: &no}, 1)
207 filter(Filter{Submitted: "<now"}, 1)
208 filter(Filter{Submitted: ">now"}, 0)
209 filter(Filter{NextAttempt: "<1m"}, 1)
210 filter(Filter{NextAttempt: ">1m"}, 0)
213 filter(Filter{Transport: &empty}, 1)
214 filter(Filter{Transport: &bogus}, 0)
216 next := nextWork(ctxbg, pkglog, nil)
218 t.Fatalf("nextWork in %s, should be now", next)
// With the only recipient domain marked busy, nextWork is expected to return
// 24 hours and launchWork to start no deliveries.
220 busy := map[string]struct{}{"mox.example": {}}
221 if x := nextWork(ctxbg, pkglog, busy); x != 24*time.Hour {
222 t.Fatalf("nextWork in %s for busy domain, should be in 24 hours", x)
224 if nn := launchWork(pkglog, nil, busy); nn != 0 {
225 t.Fatalf("launchWork launched %d deliveries, expected 0", nn)
228 mailDomain := dns.Domain{ASCII: "mox.example"}
229 mailHost := dns.Domain{ASCII: "mail.mox.example"}
230 resolver := dns.MockResolver{
231 A: map[string][]string{
232 "mail.mox.example.": {"127.0.0.1"},
233 "submission.example.": {"127.0.0.1"},
235 MX: map[string][]*net.MX{
236 "mox.example.": {{Host: "mail.mox.example", Pref: 10}},
237 "other.example.": {{Host: "mail.mox.example", Pref: 10}},
241 // Try a failing delivery attempt.
243 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
245 return nil, fmt.Errorf("failure from test")
248 smtpclient.DialHook = nil
251 n = launchWork(pkglog, resolver, map[string]struct{}{})
254 // Wait until we see the dial and the failed attempt.
255 timer := time.NewTimer(time.Second)
258 case <-deliveryResults:
259 tcompare(t, ndial, 1)
260 m, err := bstore.QueryDB[Msg](ctxbg, DB).Get()
261 tcheck(t, err, "get")
262 tcompare(t, m.Attempts, 1)
264 t.Fatalf("no delivery within 1s")
// OpenMessage must return bstore.ErrAbsent for an unknown ID and the exact
// testmsg content for the queued message.
268 _, err = OpenMessage(ctxbg, msg.ID+1)
269 if err != bstore.ErrAbsent {
270 t.Fatalf("OpenMessage, got %v, expected ErrAbsent", err)
272 reader, err := OpenMessage(ctxbg, msg.ID)
273 tcheck(t, err, "open message")
275 msgbuf, err := io.ReadAll(reader)
276 tcheck(t, err, "read message")
277 if string(msgbuf) != testmsg {
278 t.Fatalf("message mismatch, got %q, expected %q", string(msgbuf), testmsg)
281 // Reduce by more than first attempt interval of 7.5 minutes.
282 n, err = NextAttemptAdd(ctxbg, idfilter(msg.ID+1), -10*time.Minute)
283 tcheck(t, err, "kick")
285 t.Fatalf("kick %d, expected 0", n)
287 n, err = NextAttemptAdd(ctxbg, idfilter(msg.ID), -10*time.Minute)
288 tcheck(t, err, "kick")
290 t.Fatalf("kicked %d, expected 1", n)
// nfakeSMTPServer plays the server side of an SMTP session on the given
// connection. Each entry of extensions is announced as an extra "250-" line.
// NOTE(review): the exact use of rcpts, ntx and onercpt is only partly
// visible in this listing.
293 nfakeSMTPServer := func(server net.Conn, rcpts, ntx int, onercpt bool, extensions []string) {
294 // We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
295 // cyclic dependencies.
296 fmt.Fprintf(server, "220 mail.mox.example\r\n")
297 br := bufio.NewReader(server)
// readline reads one line and panics when it does not start with cmd
// (compared after lowercasing). A read error is not treated as a mismatch.
299 readline := func(cmd string) {
300 line, err := br.ReadString('\n')
301 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
302 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
// writeline sends s followed by CRLF.
305 writeline := func(s string) {
306 fmt.Fprintf(server, "%s\r\n", s)
310 writeline("250-mail.mox.example")
311 for _, ext := range extensions {
312 writeline("250-" + ext)
314 writeline("250 pipelining")
318 for i := range rcpts {
320 if onercpt && i > 0 {
327 writeline("354 continue")
328 reader := smtp.NewDataReader(br)
329 io.Copy(io.Discard, reader)
335 fakeSMTPServer := func(server net.Conn) {
336 nfakeSMTPServer(server, 1, 1, false, nil)
338 fakeSMTPServer2Rcpts := func(server net.Conn) {
339 nfakeSMTPServer(server, 2, 1, false, nil)
341 fakeSMTPServerLimitRcpt1 := func(server net.Conn) {
342 nfakeSMTPServer(server, 1, 2, false, []string{"LIMITS RCPTMAX=1"})
344 // Server that returns an error after first recipient. We expect another
345 // transaction to deliver the second message.
346 fakeSMTPServerRcpt1 := func(server net.Conn) {
347 // We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
348 // cyclic dependencies.
349 fmt.Fprintf(server, "220 mail.mox.example\r\n")
350 br := bufio.NewReader(server)
352 readline := func(cmd string) {
353 line, err := br.ReadString('\n')
354 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
355 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
358 writeline := func(s string) {
359 fmt.Fprintf(server, "%s\r\n", s)
363 writeline("250-mail.mox.example")
364 writeline("250 pipelining")
373 writeline("354 continue")
374 reader := smtp.NewDataReader(br)
375 io.Copy(io.Discard, reader)
383 writeline("354 continue")
384 reader = smtp.NewDataReader(br)
385 io.Copy(io.Discard, reader)
392 moxCert := fakeCert(t, "mail.mox.example", false)
393 goodTLSConfig := tls.Config{Certificates: []tls.Certificate{moxCert}}
// makeFakeSMTPSTARTTLSServer returns a fake server that announces starttls
// and performs a TLS server handshake with tlsConfig on the connection.
// NOTE(review): the handshake is guarded by nstarttls and an attempt counter
// whose bookkeeping is not visible here; requiretls presumably selects
// whether "requiretls" is announced after the handshake — confirm.
394 makeFakeSMTPSTARTTLSServer := func(tlsConfig *tls.Config, nstarttls int, requiretls bool) func(server net.Conn) {
396 return func(server net.Conn) {
399 // We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
400 // cyclic dependencies.
401 fmt.Fprintf(server, "220 mail.mox.example\r\n")
402 br := bufio.NewReader(server)
404 readline := func(cmd string) {
405 line, err := br.ReadString('\n')
406 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
407 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
410 writeline := func(s string) {
411 fmt.Fprintf(server, "%s\r\n", s)
415 writeline("250-mail.mox.example")
416 writeline("250 starttls")
417 if nstarttls == 0 || attempt <= nstarttls {
420 tlsConn := tls.Server(server, tlsConfig)
421 err := tlsConn.Handshake()
426 br = bufio.NewReader(server)
430 writeline("250-mail.mox.example")
431 writeline("250 requiretls")
433 writeline("250 mail.mox.example")
441 writeline("354 continue")
442 reader := smtp.NewDataReader(br)
443 io.Copy(io.Discard, reader)
450 fakeSMTPSTARTTLSServer := makeFakeSMTPSTARTTLSServer(&goodTLSConfig, 0, true)
// The "bad" variant caps the server at TLS 1.0; the tests using it expect a
// protocol-version failure (see fdBadProtocol below).
451 makeBadFakeSMTPSTARTTLSServer := func(requiretls bool) func(server net.Conn) {
452 return makeFakeSMTPSTARTTLSServer(&tls.Config{MaxVersion: tls.VersionTLS10, Certificates: []tls.Certificate{moxCert}}, 1, requiretls)
// nfakeSubmitServer plays a submission server. It reads client lines without
// validating them and answers with fixed responses, including AUTH PLAIN.
455 nfakeSubmitServer := func(server net.Conn, nrcpt int) {
456 // We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
457 // cyclic dependencies.
458 fmt.Fprintf(server, "220 mail.mox.example\r\n")
459 br := bufio.NewReader(server)
460 br.ReadString('\n') // Should be EHLO.
461 fmt.Fprintf(server, "250-localhost\r\n")
462 fmt.Fprintf(server, "250 AUTH PLAIN\r\n")
463 br.ReadString('\n') // Should be AUTH PLAIN
464 fmt.Fprintf(server, "235 2.7.0 auth ok\r\n")
465 br.ReadString('\n') // Should be MAIL FROM.
466 fmt.Fprintf(server, "250 ok\r\n")
468 br.ReadString('\n') // Should be RCPT TO.
469 fmt.Fprintf(server, "250 ok\r\n")
471 br.ReadString('\n') // Should be DATA.
472 fmt.Fprintf(server, "354 continue\r\n")
473 reader := smtp.NewDataReader(br)
474 io.Copy(io.Discard, reader)
475 fmt.Fprintf(server, "250 ok\r\n")
476 br.ReadString('\n') // Should be QUIT.
477 fmt.Fprintf(server, "221 ok\r\n")
479 fakeSubmitServer := func(server net.Conn) {
480 nfakeSubmitServer(server, 1)
482 fakeSubmitServer2Rcpts := func(server net.Conn) {
483 nfakeSubmitServer(server, 2)
// testQueue installs a DialHook that creates a net.Pipe per dial and runs
// fakeServer on the server end, launches work and waits for delivery results
// (each wait is bounded by a one-second timer). It then checks that the
// queue is empty and that the inbox gained exactly one message when
// expectDSN is set, or none otherwise. It reports whether the dialer passed
// to the hook was a *net.Dialer.
486 testQueue := func(expectDSN bool, fakeServer func(conn net.Conn), nresults int) (wasNetDialer bool) {
491 for _, conn := range pipes {
496 var connMu sync.Mutex
497 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
499 defer connMu.Unlock()
501 // Setting up a pipe. We'll start a fake smtp server on the server-side. And return the
502 // client-side to the invocation dial, for the attempted delivery from the queue.
503 server, client := net.Pipe()
504 pipes = append(pipes, server, client)
505 go fakeServer(server)
507 _, wasNetDialer = dialer.(*net.Dialer)
512 smtpclient.DialHook = nil
515 inbox, err := bstore.QueryDB[store.Mailbox](ctxbg, acc.DB).FilterNonzero(store.Mailbox{Name: "Inbox"}).Get()
516 tcheck(t, err, "get inbox")
518 inboxCount, err := bstore.QueryDB[store.Message](ctxbg, acc.DB).FilterNonzero(store.Message{MailboxID: inbox.ID}).Count()
519 tcheck(t, err, "querying messages in inbox")
521 launchWork(pkglog, resolver, map[string]struct{}{})
523 // Wait for all results.
524 timer.Reset(time.Second)
527 case <-deliveryResults:
529 t.Fatalf("no dial within 1s")
533 // Check that queue is now empty.
534 xmsgs, err := List(ctxbg, Filter{}, Sort{})
535 tcheck(t, err, "list queue")
536 tcompare(t, len(xmsgs), 0)
538 // And that we possibly got a DSN delivered.
539 ninbox, err := bstore.QueryDB[store.Message](ctxbg, acc.DB).FilterNonzero(store.Message{MailboxID: inbox.ID}).Count()
540 tcheck(t, err, "querying messages in inbox")
541 if expectDSN && ninbox != inboxCount+1 {
542 t.Fatalf("got %d messages in inbox, previously %d, expected 1 additional for dsn", ninbox, inboxCount)
543 } else if !expectDSN && ninbox != inboxCount {
544 t.Fatalf("got %d messages in inbox, previously %d, expected no additional messages", ninbox, inboxCount)
// Thin wrappers around testQueue: testDeliver expects one result and no DSN,
// testDeliverN expects nresults results and no DSN, testDSN expects one
// result and a DSN.
549 testDeliver := func(fakeServer func(conn net.Conn)) bool {
551 return testQueue(false, fakeServer, 1)
553 testDeliverN := func(fakeServer func(conn net.Conn), nresults int) bool {
555 return testQueue(false, fakeServer, nresults)
557 testDSN := func(fakeServer func(conn net.Conn)) bool {
559 return testQueue(true, fakeServer, 1)
562 // Test direct delivery.
563 wasNetDialer := testDeliver(fakeSMTPServer)
565 t.Fatalf("expected net.Dialer as dialer")
568 // Single delivery to two recipients at same domain, expecting single connection
569 // and single transaction.
570 qm0 := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
571 qml := []Msg{qm0, qm0} // Same NextAttempt.
572 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
573 tcheck(t, err, "add messages to queue for delivery")
574 testDeliver(fakeSMTPServer2Rcpts)
576 // Single enqueue to two recipients at different domain, expecting two connections.
577 otheraddr, _ := smtp.ParseAddress("mjl@other.example")
578 otherpath := otheraddr.Path()
581 MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, t0, "test"),
582 MakeMsg(path, otherpath, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, t0, "test"),
584 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
585 tcheck(t, err, "add messages to queue for delivery")
586 conns := ConnectionCounter()
587 testDeliverN(fakeSMTPServer, 2)
588 nconns := ConnectionCounter()
589 if nconns != conns+2 {
590 t.Errorf("saw %d connections, expected 2", nconns-conns)
593 // Single enqueue with two recipients at same domain, but with smtp server that has
594 // LIMITS RCPTMAX=1, so we expect a single connection with two transactions.
595 qml = []Msg{qm0, qm0}
596 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
597 tcheck(t, err, "add messages to queue for delivery")
598 testDeliver(fakeSMTPServerLimitRcpt1)
600 // Single enqueue with two recipients at same domain, but smtp server sends 552 for
601 // 2nd recipient, so we expect a single connection with two transactions.
602 qml = []Msg{qm0, qm0}
603 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
604 tcheck(t, err, "add messages to queue for delivery")
605 testDeliver(fakeSMTPServerRcpt1)
607 // Add a message to be delivered with submit because of its route.
608 topath := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "submit.example"}}}
609 qm = MakeMsg(path, topath, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
610 err = Add(ctxbg, pkglog, "mjl", mf, qm)
611 tcheck(t, err, "add message to queue for delivery")
612 wasNetDialer = testDeliver(fakeSubmitServer)
614 t.Fatalf("expected net.Dialer as dialer")
617 // Two messages for submission.
619 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
620 tcheck(t, err, "add messages to queue for delivery")
621 wasNetDialer = testDeliver(fakeSubmitServer2Rcpts)
623 t.Fatalf("expected net.Dialer as dialer")
626 // Add a message to be delivered with submit because of explicitly configured transport, that uses TLS.
627 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")}
628 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
629 tcheck(t, err, "add message to queue for delivery")
630 transportSubmitTLS := "submittls"
631 n, err = TransportSet(ctxbg, Filter{IDs: []int64{qml[0].ID}}, transportSubmitTLS)
632 tcheck(t, err, "set transport")
634 t.Fatalf("TransportSet changed %d messages, expected 1", n)
636 // Make fake cert, and make it trusted.
637 cert := fakeCert(t, "submission.example", false)
638 mox.Conf.Static.TLS.CertPool = x509.NewCertPool()
639 mox.Conf.Static.TLS.CertPool.AddCert(cert.Leaf)
640 tlsConfig := tls.Config{
641 Certificates: []tls.Certificate{cert},
643 wasNetDialer = testDeliver(func(conn net.Conn) {
644 conn = tls.Server(conn, &tlsConfig)
645 fakeSubmitServer(conn)
648 t.Fatalf("expected net.Dialer as dialer")
651 // Various failure reasons.
652 fdNotTrusted := tlsrpt.FailureDetails{
653 ResultType: tlsrpt.ResultCertificateNotTrusted,
654 SendingMTAIP: "", // Missing due to pipe.
655 ReceivingMXHostname: "mail.mox.example",
656 ReceivingMXHelo: "mail.mox.example",
657 ReceivingIP: "", // Missing due to pipe.
658 FailedSessionCount: 1,
659 FailureReasonCode: "",
661 fdTLSAUnusable := tlsrpt.FailureDetails{
662 ResultType: tlsrpt.ResultTLSAInvalid,
663 ReceivingMXHostname: "mail.mox.example",
664 FailedSessionCount: 0,
665 FailureReasonCode: "all-unusable-records+ignored",
667 fdBadProtocol := tlsrpt.FailureDetails{
668 ResultType: tlsrpt.ResultValidationFailure,
669 ReceivingMXHostname: "mail.mox.example",
670 ReceivingMXHelo: "mail.mox.example",
671 FailedSessionCount: 1,
672 FailureReasonCode: "tls-remote-alert-70-protocol-version-not-supported",
675 // Add a message to be delivered with socks.
676 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<socks@localhost>", nil, nil, time.Now(), "test")}
677 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
678 tcheck(t, err, "add message to queue for delivery")
679 n, err = TransportSet(ctxbg, idfilter(qml[0].ID), "socks")
680 tcheck(t, err, "TransportSet")
682 t.Fatalf("TransportSet changed %d messages, expected 1", n)
685 wasNetDialer = testDeliver(fakeSMTPServer)
687 t.Fatalf("expected non-net.Dialer as dialer") // SOCKS5 dialer is a private type, we cannot check for it.
690 // Add message to be delivered with opportunistic TLS verification.
692 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<opportunistictls@localhost>", nil, nil, time.Now(), "test")}
693 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
694 tcheck(t, err, "add message to queue for delivery")
696 testDeliver(fakeSMTPSTARTTLSServer)
697 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdNotTrusted)))
698 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailHost)))
700 // Test fallback to plain text with TLS handshake fails.
702 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<badtls@localhost>", nil, nil, time.Now(), "test")}
703 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
704 tcheck(t, err, "add message to queue for delivery")
706 testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
707 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdBadProtocol)))
708 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailHost, fdBadProtocol)))
710 // Add message to be delivered with DANE verification.
712 resolver.AllAuthentic = true
713 resolver.TLSA = map[string][]adns.TLSA{
714 "_25._tcp.mail.mox.example.": {
715 {Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeFull, CertAssoc: moxCert.Leaf.RawSubjectPublicKeyInfo},
718 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<dane@localhost>", nil, nil, time.Now(), "test")}
719 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
720 tcheck(t, err, "add message to queue for delivery")
722 testDeliver(fakeSMTPSTARTTLSServer)
723 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdNotTrusted)))
724 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(1, 0, tlsrpt.Result{Policy: tlsrpt.TLSAPolicy(resolver.TLSA["_25._tcp.mail.mox.example."], mailHost), FailureDetails: []tlsrpt.FailureDetails{}}))
726 // We should know starttls/requiretls by now.
727 rdt := store.RecipientDomainTLS{Domain: "mox.example"}
728 err = acc.DB.Get(ctxbg, &rdt)
729 tcheck(t, err, "get recipientdomaintls")
730 tcompare(t, rdt.STARTTLS, true)
731 tcompare(t, rdt.RequireTLS, true)
733 // Add message to be delivered with verified TLS and REQUIRETLS.
734 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<opportunistictls@localhost>", nil, &yes, time.Now(), "test")}
735 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
736 tcheck(t, err, "add message to queue for delivery")
738 testDeliver(fakeSMTPSTARTTLSServer)
740 // Check that message is delivered with all unusable DANE records.
742 resolver.TLSA = map[string][]adns.TLSA{
743 "_25._tcp.mail.mox.example.": {
747 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<daneunusable@localhost>", nil, nil, time.Now(), "test")}
748 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
749 tcheck(t, err, "add message to queue for delivery")
751 testDeliver(fakeSMTPSTARTTLSServer)
752 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdNotTrusted)))
753 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(1, 0, tlsrpt.Result{Policy: tlsrpt.TLSAPolicy([]adns.TLSA{}, mailHost), FailureDetails: []tlsrpt.FailureDetails{fdTLSAUnusable}}))
755 // Check that message is delivered with insecure TLSA records. They should be
756 // ignored and regular STARTTLS tried.
758 resolver.Inauthentic = []string{"tlsa _25._tcp.mail.mox.example."}
759 resolver.TLSA = map[string][]adns.TLSA{
760 "_25._tcp.mail.mox.example.": {
761 {Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeFull, CertAssoc: make([]byte, sha256.Size)},
764 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<daneinsecure@localhost>", nil, nil, time.Now(), "test")}
765 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
766 tcheck(t, err, "add message to queue for delivery")
768 testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
769 resolver.Inauthentic = nil
770 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdBadProtocol)))
771 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailHost, fdBadProtocol)))
773 // STARTTLS failed, so not known supported.
774 rdt = store.RecipientDomainTLS{Domain: "mox.example"}
775 err = acc.DB.Get(ctxbg, &rdt)
776 tcheck(t, err, "get recipientdomaintls")
777 tcompare(t, rdt.STARTTLS, false)
778 tcompare(t, rdt.RequireTLS, false)
780 // Check that message is delivered with TLS-Required: No and non-matching DANE record.
781 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednostarttls@localhost>", nil, &no, time.Now(), "test")}
782 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
783 tcheck(t, err, "add message to queue for delivery")
785 testDeliver(fakeSMTPSTARTTLSServer)
787 // Check that message is delivered with TLS-Required: No and bad TLS, falling back to plain text.
788 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednoplaintext@localhost>", nil, &no, time.Now(), "test")}
789 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
790 tcheck(t, err, "add message to queue for delivery")
792 testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
794 // Add message with requiretls that fails immediately due to no REQUIRETLS support in all servers.
795 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequiredunsupported@localhost>", nil, &yes, time.Now(), "test")}
796 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
797 tcheck(t, err, "add message to queue for delivery")
799 testDSN(makeBadFakeSMTPSTARTTLSServer(false))
801 // Restore pre-DANE behaviour.
802 resolver.AllAuthentic = false
805 // Add message with requiretls that fails immediately due to no verification policy for recipient domain.
806 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednopolicy@localhost>", nil, &yes, time.Now(), "test")}
807 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
808 tcheck(t, err, "add message to queue for delivery")
810 // Based on DNS lookups, there won't be any dialing or SMTP connection.
811 testDSN(func(conn net.Conn) {})
813 // Add another message that we'll fail to deliver entirely.
814 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
815 err = Add(ctxbg, pkglog, "mjl", mf, qm)
816 tcheck(t, err, "add message to queue for delivery")
818 msgs, err = List(ctxbg, Filter{}, Sort{})
819 tcheck(t, err, "list queue")
821 t.Fatalf("queue has %d messages, expected 1", len(msgs))
// prepServer creates a pipe and returns its client side together with a
// func(). NOTE(review): how fn is started on the server side and what the
// returned function does are not visible in this listing.
825 prepServer := func(fn func(c net.Conn)) (net.Conn, func()) {
826 server, client := net.Pipe()
831 return client, func() {
837 conn2, cleanup2 := prepServer(func(conn net.Conn) { fmt.Fprintf(conn, "220 mail.mox.example\r\n") })
838 conn3, cleanup3 := prepServer(func(conn net.Conn) { fmt.Fprintf(conn, "451 mail.mox.example\r\n") })
839 conn4, cleanup4 := prepServer(fakeSMTPSTARTTLSServer)
847 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
851 return nil, fmt.Errorf("connect error from test")
861 smtpclient.DialHook = nil
864 comm := store.RegisterComm(acc)
865 defer comm.Unregister()
// Repeated delivery attempts: after each one the stored attempt counter must
// equal the loop index. At five attempts the test additionally waits up to
// one second for a DSN.
867 for i := 1; i < 8; i++ {
869 resolver.AllAuthentic = true
870 resolver.TLSA = map[string][]adns.TLSA{
871 "_25._tcp.mail.mox.example.": {
872 // Non-matching zero CertAssoc, should cause failure.
873 {Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeSHA256, CertAssoc: make([]byte, sha256.Size)},
877 resolver.AllAuthentic = false
880 go deliver(pkglog, resolver, msg)
882 err = DB.Get(ctxbg, &msg)
883 tcheck(t, err, "get msg")
884 if msg.Attempts != i {
885 t.Fatalf("got attempt %d, expected %d", msg.Attempts, i)
887 if msg.Attempts == 5 {
888 timer.Reset(time.Second)
889 changes := make(chan struct{}, 1)
892 changes <- struct{}{}
897 t.Fatalf("no dsn in 1s")
902 // Trigger final failure.
903 go deliver(pkglog, resolver, msg)
905 err = DB.Get(ctxbg, &msg)
906 if err != bstore.ErrAbsent {
907 t.Fatalf("attempt to fetch delivered and removed message from queue, got err %v, expected ErrAbsent", err)
910 timer.Reset(time.Second)
911 changes := make(chan struct{}, 1)
914 changes <- struct{}{}
919 t.Fatalf("no dsn in 1s")
922 // We shouldn't have any more work to do.
923 msgs, err = List(ctxbg, Filter{}, Sort{})
924 tcheck(t, err, "list messages at end of test")
925 tcompare(t, len(msgs), 0)
928func addCounts(success, failure int64, result tlsrpt.Result) tlsrpt.Result {
929 result.Summary.TotalSuccessfulSessionCount += success
930 result.Summary.TotalFailureSessionCount += failure
934func clearTLSResults(t *testing.T) {
935 _, err := bstore.QueryDB[tlsrptdb.TLSResult](ctxbg, tlsrptdb.ResultDB).Delete()
936 tcheck(t, err, "delete tls results")
939func checkTLSResults(t *testing.T, policyDomain, expRecipientDomain string, expIsHost bool, expResults ...tlsrpt.Result) {
941 q := bstore.QueryDB[tlsrptdb.TLSResult](ctxbg, tlsrptdb.ResultDB)
942 q.FilterNonzero(tlsrptdb.TLSResult{PolicyDomain: policyDomain})
943 result, err := q.Get()
944 tcheck(t, err, "get tls result")
945 tcompare(t, result.RecipientDomain, expRecipientDomain)
946 tcompare(t, result.IsHost, expIsHost)
948 // Before comparing, compensate for go1.20 vs go1.21 difference.
949 for i, r := range result.Results {
950 for j, fd := range r.FailureDetails {
951 if fd.FailureReasonCode == "tls-remote-alert-70" {
952 result.Results[i].FailureDetails[j].FailureReasonCode = "tls-remote-alert-70-protocol-version-not-supported"
956 tcompare(t, result.Results, expResults)
959// Test delivered/permfailed/suppressed/canceled/dropped messages are stored in the
960// retired list if configured, with a proper result, that webhooks are scheduled,
961// and that cleaning up works.
962func TestRetiredHooks(t *testing.T) {
963 _, cleanup := setup(t)
966 addr, err := smtp.ParseAddress("mjl@mox.example")
967 tcheck(t, err, "parse address")
971 defer os.Remove(mf.Name())
974 resolver := dns.MockResolver{
975 A: map[string][]string{"mox.example.": {"127.0.0.1"}},
976 MX: map[string][]*net.MX{"mox.example.": {{Host: "mox.example", Pref: 10}}},
979 testAction := func(account string, action func(), expResult *MsgResult, expEvent string, expSuppressing bool) {
982 _, err := bstore.QueryDB[MsgRetired](ctxbg, DB).Delete()
983 tcheck(t, err, "clearing retired messages")
984 _, err = bstore.QueryDB[Hook](ctxbg, DB).Delete()
985 tcheck(t, err, "clearing hooks")
987 qm := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
988 qm.Extra = map[string]string{"a": "123"}
989 err = Add(ctxbg, pkglog, account, mf, qm)
990 tcheck(t, err, "add to queue")
994 // Should be no messages left in queue.
995 msgs, err := List(ctxbg, Filter{}, Sort{})
996 tcheck(t, err, "list messages")
997 tcompare(t, len(msgs), 0)
999 retireds, err := RetiredList(ctxbg, RetiredFilter{}, RetiredSort{})
1000 tcheck(t, err, "list retired messages")
1001 hooks, err := HookList(ctxbg, HookFilter{}, HookSort{})
1002 tcheck(t, err, "list hooks")
1003 if expResult == nil {
1004 tcompare(t, len(retireds), 0)
1005 tcompare(t, len(hooks), 0)
1007 tcompare(t, len(retireds), 1)
1009 tcompare(t, len(mr.Results) > 0, true)
1010 lr := mr.LastResult()
1011 lr.Start = time.Time{}
1013 tcompare(t, lr.Error == "", expResult.Error == "")
1014 lr.Error = expResult.Error
1015 tcompare(t, lr, *expResult)
1017 // Compare added webhook.
1018 tcompare(t, len(hooks), 1)
1020 var out webhook.Outgoing
1021 dec := json.NewDecoder(strings.NewReader(h.Payload))
1022 dec.DisallowUnknownFields()
1023 err := dec.Decode(&out)
1024 tcheck(t, err, "unmarshal outgoing webhook payload")
1025 tcompare(t, out.Error == "", expResult.Error == "")
1026 out.WebhookQueued = time.Time{}
1029 if expResult.Secode != "" {
1030 ecode = fmt.Sprintf("%d.%s", expResult.Code/100, expResult.Secode)
1032 var code int // Only set for errors.
1033 if expResult.Code != 250 {
1034 code = expResult.Code
1036 expOut := webhook.Outgoing{
1037 Event: webhook.OutgoingEvent(expEvent),
1038 Suppressing: expSuppressing,
1041 MessageID: mr.MessageID,
1042 Subject: mr.Subject,
1044 SMTPEnhancedCode: ecode,
1047 tcompare(t, out, expOut)
1050 h.Submitted = time.Time{}
1051 h.NextAttempt = time.Time{}
1052 exph := Hook{0, mr.ID, "", mr.MessageID, mr.Subject, mr.Extra, mr.SenderAccount, "http://localhost:1234/outgoing", "Basic dXNlcm5hbWU6cGFzc3dvcmQ=", false, expEvent, "", time.Time{}, 0, time.Time{}, nil}
1053 tcompare(t, h, exph)
1057 makeLaunchAction := func(handler func(conn net.Conn)) func() {
1059 server, client := net.Pipe()
1060 defer server.Close()
1062 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
1067 smtpclient.DialHook = nil
1070 // Trigger delivery attempt.
1071 n := launchWork(pkglog, resolver, map[string]struct{}{})
1074 // Wait until delivery has finished.
1075 tm := time.NewTimer(5 * time.Second)
1079 t.Fatalf("delivery didn't happen within 5s")
1080 case <-deliveryResults:
1085 smtpAccept := func(conn net.Conn) {
1086 br := bufio.NewReader(conn)
1087 readline := func(cmd string) {
1088 line, err := br.ReadString('\n')
1089 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
1090 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
1093 writeline := func(s string) {
1094 fmt.Fprintf(conn, "%s\r\n", s)
1097 writeline("220 mail.mox.example")
1099 writeline("250 mail.mox.example")
1106 writeline("354 continue")
1107 reader := smtp.NewDataReader(br)
1108 io.Copy(io.Discard, reader)
1113 smtpReject := func(code int) func(conn net.Conn) {
1114 return func(conn net.Conn) {
1115 br := bufio.NewReader(conn)
1116 readline := func(cmd string) {
1117 line, err := br.ReadString('\n')
1118 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
1119 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
1122 writeline := func(s string) {
1123 fmt.Fprintf(conn, "%s\r\n", s)
1126 writeline("220 mail.mox.example")
1128 writeline("250-mail.mox.example")
1129 writeline("250 enhancedstatuscodes")
1132 writeline(fmt.Sprintf("%d 5.1.0 nok", code))
1138 testAction("mjl", makeLaunchAction(smtpAccept), nil, "", false)
1139 testAction("retired", makeLaunchAction(smtpAccept), &MsgResult{Code: 250, Success: true}, string(webhook.EventDelivered), false)
1140 // 554 is generic, doesn't immediately cause suppression.
1141 testAction("mjl", makeLaunchAction(smtpReject(554)), nil, "", false)
1142 testAction("retired", makeLaunchAction(smtpReject(554)), &MsgResult{Code: 554, Secode: "1.0", Error: "nonempty"}, string(webhook.EventFailed), false)
1143 // 550 causes immediate suppression, check for it in webhook.
1144 testAction("mjl", makeLaunchAction(smtpReject(550)), nil, "", true)
1145 testAction("retired", makeLaunchAction(smtpReject(550)), &MsgResult{Code: 550, Secode: "1.0", Error: "nonempty"}, string(webhook.EventFailed), true)
1146 // Try to deliver to suppressed addresses.
1148 n := launchWork(pkglog, resolver, map[string]struct{}{})
1152 testAction("mjl", launch, nil, "", false)
1153 testAction("retired", launch, &MsgResult{Error: "nonempty"}, string(webhook.EventSuppressed), false)
1155 queueFail := func() {
1156 n, err := Fail(ctxbg, pkglog, Filter{})
1157 tcheck(t, err, "cancel delivery with failure dsn")
1160 queueDrop := func() {
1161 n, err := Drop(ctxbg, pkglog, Filter{})
1162 tcheck(t, err, "cancel delivery without failure dsn")
1165 testAction("mjl", queueFail, nil, "", false)
1166 testAction("retired", queueFail, &MsgResult{Error: "nonempty"}, string(webhook.EventFailed), false)
1167 testAction("mjl", queueDrop, nil, "", false)
1168 testAction("retired", queueDrop, &MsgResult{Error: "nonempty"}, string(webhook.EventCanceled), false)
1170 retireds, err := RetiredList(ctxbg, RetiredFilter{}, RetiredSort{})
1171 tcheck(t, err, "list retired messages")
1172 tcompare(t, len(retireds), 1)
1174 cleanupMsgRetiredSingle(pkglog)
1175 retireds, err = RetiredList(ctxbg, RetiredFilter{}, RetiredSort{})
1176 tcheck(t, err, "list retired messages")
1177 tcompare(t, len(retireds), 0)
1180// TestQueueStart tests Start and that it attempts to deliver. It also adds
// and removes hold rules, takes messages off hold, and sets a new next
// attempt time, checking for dial attempts along the way.
1181func TestQueueStart(t *testing.T) {
1182 // Override dial function. We'll make connecting fail and check the attempt.
 // The mock resolver gives mox.example. an A record for 127.0.0.1 and an MX
 // record that points at mox.example.
1183 resolver := dns.MockResolver{
1184 A: map[string][]string{"mox.example.": {"127.0.0.1"}},
1185 MX: map[string][]*net.MX{"mox.example.": {{Host: "mox.example", Pref: 10}}},
 // Channel with capacity 1 on which the dial hook signals each dial attempt.
1187 dialed := make(chan struct{}, 1)
 // The hook records the attempt on dialed and always returns an error, so no
 // real connection is made.
1188 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
1189 dialed <- struct{}{}
1190 return nil, fmt.Errorf("failure from test")
 // Clear the dial hook again.
1193 smtpclient.DialHook = nil
1196 _, cleanup := setup(t)
 // Passed to Start below.
1199 done := make(chan struct{})
 // Cancel the shutdown context.
1201 mox.ShutdownCancel()
1202 // Wait for message and hooks deliverers and cleaners.
 // Install a fresh shutdown context.
1207 mox.Shutdown, mox.ShutdownCancel = context.WithCancel(ctxbg)
1209 Shutdown() // DB was opened already. Start will open it again. Just close it before.
1210 err := Start(resolver, done)
1211 tcheck(t, err, "queue start")
 // checkDialed fails the test with "unexpected dial attempt" or "expected to
 // see a dial attempt"; presumably need says whether a dial is expected.
1213 checkDialed := func(need bool) {
 // Base wait of 100ms for the timer below.
1215 d := time.Second / 10
1219 timer := time.NewTimer(d)
1224 t.Fatalf("unexpected dial attempt")
1228 t.Fatalf("expected to see a dial attempt")
1233 // HoldRule to mark all messages sent by mjl on hold, including existing
1235 hr0, err := HoldRuleAdd(ctxbg, pkglog, HoldRule{Account: "mjl"})
1236 tcheck(t, err, "add hold rule")
1238 // All zero HoldRule holds all deliveries, and marks all on hold.
1239 hr1, err := HoldRuleAdd(ctxbg, pkglog, HoldRule{})
1240 tcheck(t, err, "add hold rule")
 // Listing must return both rules, in the order they were added.
1242 hrl, err := HoldRuleList(ctxbg)
1243 tcheck(t, err, "listing hold rules")
1244 tcompare(t, hrl, []HoldRule{hr0, hr1})
 // Queue a message from and to mjl@mox.example while the hold rules are active.
1246 path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
1247 mf := prepareFile(t)
1248 defer os.Remove(mf.Name())
1250 qm := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
1251 err = Add(ctxbg, pkglog, "mjl", mf, qm)
1252 tcheck(t, err, "add message to queue for delivery")
1253 checkDialed(false) // No delivery attempt yet.
1255 n, err := Count(ctxbg)
1256 tcheck(t, err, "count messages in queue")
1259 // Take message off hold.
 // An empty Filter is passed, so no message is excluded by the filter.
1260 n, err = HoldSet(ctxbg, Filter{}, false)
1261 tcheck(t, err, "taking message off hold")
1265 // Remove hold rules.
1266 err = HoldRuleRemove(ctxbg, pkglog, hr1.ID)
1267 tcheck(t, err, "removing hold rule")
1268 err = HoldRuleRemove(ctxbg, pkglog, hr0.ID)
1269 tcheck(t, err, "removing hold rule")
1270 // Check it is gone.
1271 hrl, err = HoldRuleList(ctxbg)
1272 tcheck(t, err, "listing hold rules")
1273 tcompare(t, len(hrl), 0)
1275 // Don't change message nextattempt time, but kick queue. Message should not be delivered.
1279 // Set new next attempt, should see another attempt.
 // The filter selects on From "@mox.example"; exactly one message must change.
1280 n, err = NextAttemptSet(ctxbg, Filter{From: "@mox.example"}, time.Now())
1281 tcheck(t, err, "kick queue")
1283 t.Fatalf("kick changed %d messages, expected 1", n)
1287 // Submit another, should be delivered immediately without HoldRule.
1288 path = smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
1290 defer os.Remove(mf.Name())
1292 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
1293 err = Add(ctxbg, pkglog, "mjl", mf, qm)
1294 tcheck(t, err, "add message to queue for delivery")
1295 checkDialed(true) // Immediate.
// TestListFilterSort tests filtering, sorting and pagination of List and
// RetiredList.
1298func TestListFilterSort(t *testing.T) {
1299 _, cleanup := setup(t)
1302 // Insert Msgs. Insert RetiredMsgs based on that. Call list with filters and sort. Filter to select a single. Filter to paginate one by one, and in reverse.
1304 path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
1305 mf := prepareFile(t)
1306 defer os.Remove(mf.Name())
 // Round(0) strips the monotonic clock reading; presumably so stored times
 // compare equal under tcompare (reflect.DeepEqual).
1309 now := time.Now().Round(0)
1310 qm := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, now, "test")
 // qm1 gets a Queued time one second before now and a NextAttempt one minute
 // after now.
1313 qm1.Queued = now.Add(-time.Second)
1314 qm1.NextAttempt = now.Add(time.Minute)
 // Five copies of qm followed by qm1.
1315 qml := []Msg{qm, qm, qm, qm, qm, qm1}
1316 err := Add(ctxbg, pkglog, "mjl", mf, qml...)
1317 tcheck(t, err, "add messages to queue")
 // Take qm1 from the slice again; presumably Add updated the elements (e.g.
 // assigned IDs) — verify against Add.
1318 qm1 = qml[len(qml)-1]
 // qmlrev is qml in reverse order, so qm1 comes first.
1320 qmlrev := slices.Clone(qml)
1321 slices.Reverse(qmlrev)
1323 // Ascending by nextattempt,id.
1324 l, err := List(ctxbg, Filter{}, Sort{Asc: true})
1325 tcheck(t, err, "list messages")
1328 // Descending by nextattempt,id.
1329 l, err = List(ctxbg, Filter{}, Sort{})
1330 tcheck(t, err, "list messages")
1331 tcompare(t, l, qmlrev)
1333 // Descending by queued,id.
1334 l, err = List(ctxbg, Filter{}, Sort{Field: "Queued"})
1335 tcheck(t, err, "list messages")
 // Expected order: qmlrev without its first element (qm1), then qm1 (qml[5])
 // last.
1336 ql := append(slices.Clone(qmlrev[1:]), qml[5])
1339 // Filter by all fields to get a single.
1341 allfilters := Filter{
1343 IDs: []int64{qm1.ID},
1345 From: path.XString(true),
1346 To: path.XString(true),
1351 l, err = List(ctxbg, allfilters, Sort{})
1352 tcheck(t, err, "list single")
1353 tcompare(t, l, []Msg{qm1})
1355 // Paginated NextAttempt asc.
 // Each request asks for one message (Max: 1) and passes the previous page's
 // ID and formatted sort value as LastID and Last.
1360 nl, err := List(ctxbg, Filter{Max: 1}, Sort{Asc: true, LastID: lastID, Last: last})
1361 tcheck(t, err, "list paginated")
1362 l = append(l, nl...)
1366 tcompare(t, len(nl), 1)
1367 lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
1371 // Paginated NextAttempt desc.
1376 nl, err := List(ctxbg, Filter{Max: 1}, Sort{LastID: lastID, Last: last})
1377 tcheck(t, err, "list paginated")
1378 l = append(l, nl...)
1382 tcompare(t, len(nl), 1)
1383 lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
 // The collected pages must match the reversed insertion order.
1385 tcompare(t, l, qmlrev)
1387 // Paginated Queued desc.
 // Last now carries the formatted Queued time instead of NextAttempt.
1392 nl, err := List(ctxbg, Filter{Max: 1}, Sort{Field: "Queued", LastID: lastID, Last: last})
1393 tcheck(t, err, "list paginated")
1394 l = append(l, nl...)
1398 tcompare(t, len(nl), 1)
1399 lastID, last = nl[0].ID, nl[0].Queued.Format(time.RFC3339Nano)
1403 // Paginated Queued asc.
1408 nl, err := List(ctxbg, Filter{Max: 1}, Sort{Field: "Queued", Asc: true, LastID: lastID, Last: last})
1409 tcheck(t, err, "list paginated")
1410 l = append(l, nl...)
1414 tcompare(t, len(nl), 1)
1415 lastID, last = nl[0].ID, nl[0].Queued.Format(time.RFC3339Nano)
 // The ascending result must be the reverse of ql.
1417 qlrev := slices.Clone(ql)
1418 slices.Reverse(qlrev)
1419 tcompare(t, l, qlrev)
1421 // Retire messages and do similar but more basic tests. The code is similar.
 // mrl collects the inserted retired messages in insertion order.
1422 var mrl []MsgRetired
1423 err = DB.Write(ctxbg, func(tx *bstore.Tx) error {
1424 for _, m := range qml {
1425 mr := m.Retired(false, m.NextAttempt, time.Now().Add(time.Minute).Round(0))
1426 err := tx.Insert(&mr)
1427 tcheck(t, err, "inserting retired message")
1428 mrl = append(mrl, mr)
1432 tcheck(t, err, "adding retired messages")
1434 // Paginated LastActivity desc.
 // Same one-per-page scheme as above, with LastActivity as the sort value.
1439 nl, err := RetiredList(ctxbg, RetiredFilter{Max: 1}, RetiredSort{LastID: lastID, Last: last})
1440 tcheck(t, err, "list paginated")
1441 lr = append(lr, nl...)
1445 tcompare(t, len(nl), 1)
1446 lastID, last = nl[0].ID, nl[0].LastActivity.Format(time.RFC3339Nano)
 // The collected pages must match the reversed insertion order.
1448 mrlrev := slices.Clone(mrl)
1449 slices.Reverse(mrlrev)
1450 tcompare(t, lr, mrlrev)
1452 // Filter by all fields to get a single.
1453 allretiredfilters := RetiredFilter{
1455 IDs: []int64{mrlrev[0].ID},
1457 From: path.XString(true),
1458 To: path.XString(true),
1460 LastActivity: ">1s",
1462 lr, err = RetiredList(ctxbg, allretiredfilters, RetiredSort{})
1463 tcheck(t, err, "list single")
1464 tcompare(t, lr, []MsgRetired{mrlrev[0]})
1467// Just a cert that appears valid.
1468func fakeCert(t *testing.T, name string, expired bool) tls.Certificate {
1469 notAfter := time.Now()
1471 notAfter = notAfter.Add(-time.Hour)
1473 notAfter = notAfter.Add(time.Hour)
1476 privKey := ed25519.NewKeyFromSeed(make([]byte, ed25519.SeedSize)) // Fake key, don't use this for real!
1477 template := &x509.Certificate{
1478 SerialNumber: big.NewInt(1), // Required field...
1479 DNSNames: []string{name},
1480 NotBefore: time.Now().Add(-time.Hour),
1483 localCertBuf, err := x509.CreateCertificate(cryptorand.Reader, template, template, privKey.Public(), privKey)
1485 t.Fatalf("making certificate: %s", err)
1487 cert, err := x509.ParseCertificate(localCertBuf)
1489 t.Fatalf("parsing generated certificate: %s", err)
1491 c := tls.Certificate{
1492 Certificate: [][]byte{localCertBuf},
1493 PrivateKey: privKey,