7 cryptorand "crypto/rand"
25 "github.com/mjl-/adns"
26 "github.com/mjl-/bstore"
28 "github.com/mjl-/mox/dns"
29 "github.com/mjl-/mox/mlog"
30 "github.com/mjl-/mox/mox-"
31 "github.com/mjl-/mox/mtastsdb"
32 "github.com/mjl-/mox/smtp"
33 "github.com/mjl-/mox/smtpclient"
34 "github.com/mjl-/mox/store"
35 "github.com/mjl-/mox/tlsrpt"
36 "github.com/mjl-/mox/tlsrptdb"
37 "github.com/mjl-/mox/webhook"
40var ctxbg = context.Background()
41var pkglog = mlog.New("queue", nil)
43func tcheck(t *testing.T, err error, msg string) {
46 t.Fatalf("%s: %s", msg, err)
50func tcompare(t *testing.T, got, exp any) {
52 if !reflect.DeepEqual(got, exp) {
53 t.Fatalf("got:\n%#v\nexpected:\n%#v", got, exp)
57func setup(t *testing.T) (*store.Account, func()) {
58 // Prepare config so email can be delivered to mjl@mox.example.
59 os.RemoveAll("../testdata/queue/data")
60 log := mlog.New("queue", nil)
62 mox.ConfigStaticPath = filepath.FromSlash("../testdata/queue/mox.conf")
63 mox.MustLoadConfig(true, false)
65 tcheck(t, err, "queue init")
66 err = mtastsdb.Init(false)
67 tcheck(t, err, "mtastsdb init")
69 tcheck(t, err, "tlsrptdb init")
70 acc, err := store.OpenAccount(log, "mjl")
71 tcheck(t, err, "open account")
72 err = acc.SetPassword(log, "testtest")
73 tcheck(t, err, "set password")
74 switchStop := store.Switchboard()
75 mox.Shutdown, mox.ShutdownCancel = context.WithCancel(ctxbg)
80 mox.Shutdown, mox.ShutdownCancel = context.WithCancel(ctxbg)
82 err := mtastsdb.Close()
83 tcheck(t, err, "mtastsdb close")
84 err = tlsrptdb.Close()
85 tcheck(t, err, "tlsrptdb close")
90var testmsg = strings.ReplaceAll(`From: <mjl@mox.example>
97func prepareFile(t *testing.T) *os.File {
99 msgFile, err := store.CreateMessageTemp(pkglog, "queue")
100 tcheck(t, err, "create temp message for delivery to queue")
101 _, err = msgFile.Write([]byte(testmsg))
102 tcheck(t, err, "write message file")
106func TestQueue(t *testing.T) {
107 acc, cleanup := setup(t)
110 idfilter := func(msgID int64) Filter {
111 return Filter{IDs: []int64{msgID}}
114 kick := func(expn int, id int64) {
116 n, err := NextAttemptSet(ctxbg, idfilter(id), time.Now())
117 tcheck(t, err, "kick queue")
119 t.Fatalf("kick changed %d messages, expected %d", n, expn)
123 msgs, err := List(ctxbg, Filter{}, Sort{})
124 tcheck(t, err, "listing messages in queue")
126 t.Fatalf("got %d messages in queue, expected 0", len(msgs))
129 path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
131 defer os.Remove(mf.Name())
136 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
137 err = Add(ctxbg, pkglog, "mjl", mf, qm)
138 tcheck(t, err, "add message to queue for delivery")
140 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
141 err = Add(ctxbg, pkglog, "mjl", mf, qm)
142 tcheck(t, err, "add message to queue for delivery")
144 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
145 err = Add(ctxbg, pkglog, "mjl", mf, qm)
146 tcheck(t, err, "add message to queue for delivery")
148 msgs, err = List(ctxbg, Filter{}, Sort{})
149 tcheck(t, err, "listing queue")
151 t.Fatalf("got msgs %v, expected 1", msgs)
155 n, err := RequireTLSSet(ctxbg, Filter{IDs: []int64{msgs[2].ID}}, &yes)
156 tcheck(t, err, "requiretlsset")
160 if msg.Attempts != 0 {
161 t.Fatalf("msg attempts %d, expected 0", msg.Attempts)
163 n, err = Drop(ctxbg, pkglog, Filter{IDs: []int64{msgs[1].ID}})
164 tcheck(t, err, "drop")
166 t.Fatalf("dropped %d, expected 1", n)
168 if _, err := os.Stat(msgs[1].MessagePath()); err == nil || !os.IsNotExist(err) {
169 t.Fatalf("dropped message not removed from file system")
172 // Fail a message, check the account has a message afterwards, the DSN.
173 n, err = bstore.QueryDB[store.Message](ctxbg, acc.DB).Count()
174 tcheck(t, err, "count messages in account")
176 n, err = Fail(ctxbg, pkglog, Filter{IDs: []int64{msgs[2].ID}})
177 tcheck(t, err, "fail")
179 t.Fatalf("failed %d, expected 1", n)
181 n, err = bstore.QueryDB[store.Message](ctxbg, acc.DB).Count()
182 tcheck(t, err, "count messages in account")
185 // Check filter through various List calls. Other code uses the same filtering function.
186 filter := func(f Filter, expn int) {
188 l, err := List(ctxbg, f, Sort{})
189 tcheck(t, err, "list messages")
190 tcompare(t, len(l), expn)
193 filter(Filter{Account: "mjl"}, 1)
194 filter(Filter{Account: "bogus"}, 0)
195 filter(Filter{IDs: []int64{msgs[0].ID}}, 1)
196 filter(Filter{IDs: []int64{msgs[2].ID}}, 0) // Removed.
197 filter(Filter{IDs: []int64{msgs[2].ID + 1}}, 0) // Never existed.
198 filter(Filter{From: "mjl@"}, 1)
199 filter(Filter{From: "bogus@"}, 0)
200 filter(Filter{To: "mjl@"}, 1)
201 filter(Filter{To: "bogus@"}, 0)
202 filter(Filter{Hold: &yes}, 0)
204 filter(Filter{Hold: &no}, 1)
205 filter(Filter{Submitted: "<now"}, 1)
206 filter(Filter{Submitted: ">now"}, 0)
207 filter(Filter{NextAttempt: "<1m"}, 1)
208 filter(Filter{NextAttempt: ">1m"}, 0)
211 filter(Filter{Transport: &empty}, 1)
212 filter(Filter{Transport: &bogus}, 0)
214 next := nextWork(ctxbg, pkglog, nil)
216 t.Fatalf("nextWork in %s, should be now", next)
218 busy := map[string]struct{}{"mox.example": {}}
219 if x := nextWork(ctxbg, pkglog, busy); x != 24*time.Hour {
220 t.Fatalf("nextWork in %s for busy domain, should be in 24 hours", x)
222 if nn := launchWork(pkglog, nil, busy); nn != 0 {
223 t.Fatalf("launchWork launched %d deliveries, expected 0", nn)
226 mailDomain := dns.Domain{ASCII: "mox.example"}
227 mailHost := dns.Domain{ASCII: "mail.mox.example"}
228 resolver := dns.MockResolver{
229 A: map[string][]string{
230 "mail.mox.example.": {"127.0.0.1"},
231 "submission.example.": {"127.0.0.1"},
233 MX: map[string][]*net.MX{
234 "mox.example.": {{Host: "mail.mox.example", Pref: 10}},
235 "other.example.": {{Host: "mail.mox.example", Pref: 10}},
239 // Try a failing delivery attempt.
241 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
243 return nil, fmt.Errorf("failure from test")
246 smtpclient.DialHook = nil
249 n = launchWork(pkglog, resolver, map[string]struct{}{})
252 // Wait until we see the dial and the failed attempt.
253 timer := time.NewTimer(time.Second)
256 case <-deliveryResults:
257 tcompare(t, ndial, 1)
258 m, err := bstore.QueryDB[Msg](ctxbg, DB).Get()
259 tcheck(t, err, "get")
260 tcompare(t, m.Attempts, 1)
262 t.Fatalf("no delivery within 1s")
266 _, err = OpenMessage(ctxbg, msg.ID+1)
267 if err != bstore.ErrAbsent {
268 t.Fatalf("OpenMessage, got %v, expected ErrAbsent", err)
270 reader, err := OpenMessage(ctxbg, msg.ID)
271 tcheck(t, err, "open message")
273 msgbuf, err := io.ReadAll(reader)
274 tcheck(t, err, "read message")
275 if string(msgbuf) != testmsg {
276 t.Fatalf("message mismatch, got %q, expected %q", string(msgbuf), testmsg)
279 // Reduce by more than first attempt interval of 7.5 minutes.
280 n, err = NextAttemptAdd(ctxbg, idfilter(msg.ID+1), -10*time.Minute)
281 tcheck(t, err, "kick")
283 t.Fatalf("kick %d, expected 0", n)
285 n, err = NextAttemptAdd(ctxbg, idfilter(msg.ID), -10*time.Minute)
286 tcheck(t, err, "kick")
288 t.Fatalf("kicked %d, expected 1", n)
291 nfakeSMTPServer := func(server net.Conn, rcpts, ntx int, onercpt bool, extensions []string) {
292 // We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
293 // cyclic dependencies.
294 fmt.Fprintf(server, "220 mail.mox.example\r\n")
295 br := bufio.NewReader(server)
297 readline := func(cmd string) {
298 line, err := br.ReadString('\n')
299 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
300 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
303 writeline := func(s string) {
304 fmt.Fprintf(server, "%s\r\n", s)
308 writeline("250-mail.mox.example")
309 for _, ext := range extensions {
310 writeline("250-" + ext)
312 writeline("250 pipelining")
313 for tx := 0; tx < ntx; tx++ {
316 for i := 0; i < rcpts; i++ {
318 if onercpt && i > 0 {
325 writeline("354 continue")
326 reader := smtp.NewDataReader(br)
327 io.Copy(io.Discard, reader)
333 fakeSMTPServer := func(server net.Conn) {
334 nfakeSMTPServer(server, 1, 1, false, nil)
336 fakeSMTPServer2Rcpts := func(server net.Conn) {
337 nfakeSMTPServer(server, 2, 1, false, nil)
339 fakeSMTPServerLimitRcpt1 := func(server net.Conn) {
340 nfakeSMTPServer(server, 1, 2, false, []string{"LIMITS RCPTMAX=1"})
342 // Server that returns an error after first recipient. We expect another
343 // transaction to deliver the second message.
344 fakeSMTPServerRcpt1 := func(server net.Conn) {
345 // We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
346 // cyclic dependencies.
347 fmt.Fprintf(server, "220 mail.mox.example\r\n")
348 br := bufio.NewReader(server)
350 readline := func(cmd string) {
351 line, err := br.ReadString('\n')
352 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
353 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
356 writeline := func(s string) {
357 fmt.Fprintf(server, "%s\r\n", s)
361 writeline("250-mail.mox.example")
362 writeline("250 pipelining")
371 writeline("354 continue")
372 reader := smtp.NewDataReader(br)
373 io.Copy(io.Discard, reader)
381 writeline("354 continue")
382 reader = smtp.NewDataReader(br)
383 io.Copy(io.Discard, reader)
390 moxCert := fakeCert(t, "mail.mox.example", false)
391 goodTLSConfig := tls.Config{Certificates: []tls.Certificate{moxCert}}
392 makeFakeSMTPSTARTTLSServer := func(tlsConfig *tls.Config, nstarttls int, requiretls bool) func(server net.Conn) {
394 return func(server net.Conn) {
397 // We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
398 // cyclic dependencies.
399 fmt.Fprintf(server, "220 mail.mox.example\r\n")
400 br := bufio.NewReader(server)
402 readline := func(cmd string) {
403 line, err := br.ReadString('\n')
404 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
405 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
408 writeline := func(s string) {
409 fmt.Fprintf(server, "%s\r\n", s)
413 writeline("250-mail.mox.example")
414 writeline("250 starttls")
415 if nstarttls == 0 || attempt <= nstarttls {
418 tlsConn := tls.Server(server, tlsConfig)
419 err := tlsConn.Handshake()
424 br = bufio.NewReader(server)
428 writeline("250-mail.mox.example")
429 writeline("250 requiretls")
431 writeline("250 mail.mox.example")
439 writeline("354 continue")
440 reader := smtp.NewDataReader(br)
441 io.Copy(io.Discard, reader)
448 fakeSMTPSTARTTLSServer := makeFakeSMTPSTARTTLSServer(&goodTLSConfig, 0, true)
449 makeBadFakeSMTPSTARTTLSServer := func(requiretls bool) func(server net.Conn) {
450 return makeFakeSMTPSTARTTLSServer(&tls.Config{MaxVersion: tls.VersionTLS10, Certificates: []tls.Certificate{moxCert}}, 1, requiretls)
453 nfakeSubmitServer := func(server net.Conn, nrcpt int) {
454 // We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
455 // cyclic dependencies.
456 fmt.Fprintf(server, "220 mail.mox.example\r\n")
457 br := bufio.NewReader(server)
458 br.ReadString('\n') // Should be EHLO.
459 fmt.Fprintf(server, "250-localhost\r\n")
460 fmt.Fprintf(server, "250 AUTH PLAIN\r\n")
461 br.ReadString('\n') // Should be AUTH PLAIN
462 fmt.Fprintf(server, "235 2.7.0 auth ok\r\n")
463 br.ReadString('\n') // Should be MAIL FROM.
464 fmt.Fprintf(server, "250 ok\r\n")
465 for i := 0; i < nrcpt; i++ {
466 br.ReadString('\n') // Should be RCPT TO.
467 fmt.Fprintf(server, "250 ok\r\n")
469 br.ReadString('\n') // Should be DATA.
470 fmt.Fprintf(server, "354 continue\r\n")
471 reader := smtp.NewDataReader(br)
472 io.Copy(io.Discard, reader)
473 fmt.Fprintf(server, "250 ok\r\n")
474 br.ReadString('\n') // Should be QUIT.
475 fmt.Fprintf(server, "221 ok\r\n")
477 fakeSubmitServer := func(server net.Conn) {
478 nfakeSubmitServer(server, 1)
480 fakeSubmitServer2Rcpts := func(server net.Conn) {
481 nfakeSubmitServer(server, 2)
484 testQueue := func(expectDSN bool, fakeServer func(conn net.Conn), nresults int) (wasNetDialer bool) {
489 for _, conn := range pipes {
494 var connMu sync.Mutex
495 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
497 defer connMu.Unlock()
499 // Setting up a pipe. We'll start a fake smtp server on the server-side. And return the
500 // client-side to the invocation dial, for the attempted delivery from the queue.
501 server, client := net.Pipe()
502 pipes = append(pipes, server, client)
503 go fakeServer(server)
505 _, wasNetDialer = dialer.(*net.Dialer)
510 smtpclient.DialHook = nil
513 inbox, err := bstore.QueryDB[store.Mailbox](ctxbg, acc.DB).FilterNonzero(store.Mailbox{Name: "Inbox"}).Get()
514 tcheck(t, err, "get inbox")
516 inboxCount, err := bstore.QueryDB[store.Message](ctxbg, acc.DB).FilterNonzero(store.Message{MailboxID: inbox.ID}).Count()
517 tcheck(t, err, "querying messages in inbox")
519 launchWork(pkglog, resolver, map[string]struct{}{})
521 // Wait for all results.
522 timer.Reset(time.Second)
523 for i := 0; i < nresults; i++ {
525 case <-deliveryResults:
527 t.Fatalf("no dial within 1s")
531 // Check that queue is now empty.
532 xmsgs, err := List(ctxbg, Filter{}, Sort{})
533 tcheck(t, err, "list queue")
534 tcompare(t, len(xmsgs), 0)
536 // And that we possibly got a DSN delivered.
537 ninbox, err := bstore.QueryDB[store.Message](ctxbg, acc.DB).FilterNonzero(store.Message{MailboxID: inbox.ID}).Count()
538 tcheck(t, err, "querying messages in inbox")
539 if expectDSN && ninbox != inboxCount+1 {
540 t.Fatalf("got %d messages in inbox, previously %d, expected 1 additional for dsn", ninbox, inboxCount)
541 } else if !expectDSN && ninbox != inboxCount {
542 t.Fatalf("got %d messages in inbox, previously %d, expected no additional messages", ninbox, inboxCount)
547 testDeliver := func(fakeServer func(conn net.Conn)) bool {
549 return testQueue(false, fakeServer, 1)
551 testDeliverN := func(fakeServer func(conn net.Conn), nresults int) bool {
553 return testQueue(false, fakeServer, nresults)
555 testDSN := func(fakeServer func(conn net.Conn)) bool {
557 return testQueue(true, fakeServer, 1)
560 // Test direct delivery.
561 wasNetDialer := testDeliver(fakeSMTPServer)
563 t.Fatalf("expected net.Dialer as dialer")
566 // Single delivery to two recipients at same domain, expecting single connection
567 // and single transaction.
568 qm0 := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
569 qml := []Msg{qm0, qm0} // Same NextAttempt.
570 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
571 tcheck(t, err, "add messages to queue for delivery")
572 testDeliver(fakeSMTPServer2Rcpts)
574 // Single enqueue to two recipients at different domain, expecting two connections.
575 otheraddr, _ := smtp.ParseAddress("mjl@other.example")
576 otherpath := otheraddr.Path()
579 MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, t0, "test"),
580 MakeMsg(path, otherpath, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, t0, "test"),
582 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
583 tcheck(t, err, "add messages to queue for delivery")
584 conns := ConnectionCounter()
585 testDeliverN(fakeSMTPServer, 2)
586 nconns := ConnectionCounter()
587 if nconns != conns+2 {
588 t.Errorf("saw %d connections, expected 2", nconns-conns)
591 // Single enqueue with two recipients at same domain, but with smtp server that has
592 // LIMITS RCPTMAX=1, so we expect a single connection with two transactions.
593 qml = []Msg{qm0, qm0}
594 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
595 tcheck(t, err, "add messages to queue for delivery")
596 testDeliver(fakeSMTPServerLimitRcpt1)
598 // Single enqueue with two recipients at same domain, but smtp server sends 552 for
599 // 2nd recipient, so we expect a single connection with two transactions.
600 qml = []Msg{qm0, qm0}
601 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
602 tcheck(t, err, "add messages to queue for delivery")
603 testDeliver(fakeSMTPServerRcpt1)
605 // Add a message to be delivered with submit because of its route.
606 topath := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "submit.example"}}}
607 qm = MakeMsg(path, topath, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
608 err = Add(ctxbg, pkglog, "mjl", mf, qm)
609 tcheck(t, err, "add message to queue for delivery")
610 wasNetDialer = testDeliver(fakeSubmitServer)
612 t.Fatalf("expected net.Dialer as dialer")
615 // Two messages for submission.
617 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
618 tcheck(t, err, "add messages to queue for delivery")
619 wasNetDialer = testDeliver(fakeSubmitServer2Rcpts)
621 t.Fatalf("expected net.Dialer as dialer")
624 // Add a message to be delivered with submit because of explicitly configured transport, that uses TLS.
625 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")}
626 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
627 tcheck(t, err, "add message to queue for delivery")
628 transportSubmitTLS := "submittls"
629 n, err = TransportSet(ctxbg, Filter{IDs: []int64{qml[0].ID}}, transportSubmitTLS)
630 tcheck(t, err, "set transport")
632 t.Fatalf("TransportSet changed %d messages, expected 1", n)
634 // Make fake cert, and make it trusted.
635 cert := fakeCert(t, "submission.example", false)
636 mox.Conf.Static.TLS.CertPool = x509.NewCertPool()
637 mox.Conf.Static.TLS.CertPool.AddCert(cert.Leaf)
638 tlsConfig := tls.Config{
639 Certificates: []tls.Certificate{cert},
641 wasNetDialer = testDeliver(func(conn net.Conn) {
642 conn = tls.Server(conn, &tlsConfig)
643 fakeSubmitServer(conn)
646 t.Fatalf("expected net.Dialer as dialer")
649 // Various failure reasons.
650 fdNotTrusted := tlsrpt.FailureDetails{
651 ResultType: tlsrpt.ResultCertificateNotTrusted,
652 SendingMTAIP: "", // Missing due to pipe.
653 ReceivingMXHostname: "mail.mox.example",
654 ReceivingMXHelo: "mail.mox.example",
655 ReceivingIP: "", // Missing due to pipe.
656 FailedSessionCount: 1,
657 FailureReasonCode: "",
659 fdTLSAUnusable := tlsrpt.FailureDetails{
660 ResultType: tlsrpt.ResultTLSAInvalid,
661 ReceivingMXHostname: "mail.mox.example",
662 FailedSessionCount: 0,
663 FailureReasonCode: "all-unusable-records+ignored",
665 fdBadProtocol := tlsrpt.FailureDetails{
666 ResultType: tlsrpt.ResultValidationFailure,
667 ReceivingMXHostname: "mail.mox.example",
668 ReceivingMXHelo: "mail.mox.example",
669 FailedSessionCount: 1,
670 FailureReasonCode: "tls-remote-alert-70-protocol-version-not-supported",
673 // Add a message to be delivered with socks.
674 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<socks@localhost>", nil, nil, time.Now(), "test")}
675 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
676 tcheck(t, err, "add message to queue for delivery")
677 n, err = TransportSet(ctxbg, idfilter(qml[0].ID), "socks")
678 tcheck(t, err, "TransportSet")
680 t.Fatalf("TransportSet changed %d messages, expected 1", n)
683 wasNetDialer = testDeliver(fakeSMTPServer)
685 t.Fatalf("expected non-net.Dialer as dialer") // SOCKS5 dialer is a private type, we cannot check for it.
688 // Add message to be delivered with opportunistic TLS verification.
690 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<opportunistictls@localhost>", nil, nil, time.Now(), "test")}
691 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
692 tcheck(t, err, "add message to queue for delivery")
694 testDeliver(fakeSMTPSTARTTLSServer)
695 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdNotTrusted)))
696 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailHost)))
698 // Test fallback to plain text with TLS handshake fails.
700 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<badtls@localhost>", nil, nil, time.Now(), "test")}
701 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
702 tcheck(t, err, "add message to queue for delivery")
704 testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
705 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdBadProtocol)))
706 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailHost, fdBadProtocol)))
708 // Add message to be delivered with DANE verification.
710 resolver.AllAuthentic = true
711 resolver.TLSA = map[string][]adns.TLSA{
712 "_25._tcp.mail.mox.example.": {
713 {Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeFull, CertAssoc: moxCert.Leaf.RawSubjectPublicKeyInfo},
716 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<dane@localhost>", nil, nil, time.Now(), "test")}
717 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
718 tcheck(t, err, "add message to queue for delivery")
720 testDeliver(fakeSMTPSTARTTLSServer)
721 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdNotTrusted)))
722 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(1, 0, tlsrpt.Result{Policy: tlsrpt.TLSAPolicy(resolver.TLSA["_25._tcp.mail.mox.example."], mailHost), FailureDetails: []tlsrpt.FailureDetails{}}))
724 // We should know starttls/requiretls by now.
725 rdt := store.RecipientDomainTLS{Domain: "mox.example"}
726 err = acc.DB.Get(ctxbg, &rdt)
727 tcheck(t, err, "get recipientdomaintls")
728 tcompare(t, rdt.STARTTLS, true)
729 tcompare(t, rdt.RequireTLS, true)
731 // Add message to be delivered with verified TLS and REQUIRETLS.
732 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<opportunistictls@localhost>", nil, &yes, time.Now(), "test")}
733 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
734 tcheck(t, err, "add message to queue for delivery")
736 testDeliver(fakeSMTPSTARTTLSServer)
738 // Check that message is delivered with all unusable DANE records.
740 resolver.TLSA = map[string][]adns.TLSA{
741 "_25._tcp.mail.mox.example.": {
745 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<daneunusable@localhost>", nil, nil, time.Now(), "test")}
746 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
747 tcheck(t, err, "add message to queue for delivery")
749 testDeliver(fakeSMTPSTARTTLSServer)
750 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdNotTrusted)))
751 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(1, 0, tlsrpt.Result{Policy: tlsrpt.TLSAPolicy([]adns.TLSA{}, mailHost), FailureDetails: []tlsrpt.FailureDetails{fdTLSAUnusable}}))
753 // Check that message is delivered with insecure TLSA records. They should be
754 // ignored and regular STARTTLS tried.
756 resolver.Inauthentic = []string{"tlsa _25._tcp.mail.mox.example."}
757 resolver.TLSA = map[string][]adns.TLSA{
758 "_25._tcp.mail.mox.example.": {
759 {Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeFull, CertAssoc: make([]byte, sha256.Size)},
762 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<daneinsecure@localhost>", nil, nil, time.Now(), "test")}
763 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
764 tcheck(t, err, "add message to queue for delivery")
766 testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
767 resolver.Inauthentic = nil
768 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdBadProtocol)))
769 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailHost, fdBadProtocol)))
771 // STARTTLS failed, so not known supported.
772 rdt = store.RecipientDomainTLS{Domain: "mox.example"}
773 err = acc.DB.Get(ctxbg, &rdt)
774 tcheck(t, err, "get recipientdomaintls")
775 tcompare(t, rdt.STARTTLS, false)
776 tcompare(t, rdt.RequireTLS, false)
778 // Check that message is delivered with TLS-Required: No and non-matching DANE record.
779 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednostarttls@localhost>", nil, &no, time.Now(), "test")}
780 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
781 tcheck(t, err, "add message to queue for delivery")
783 testDeliver(fakeSMTPSTARTTLSServer)
785 // Check that message is delivered with TLS-Required: No and bad TLS, falling back to plain text.
786 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednoplaintext@localhost>", nil, &no, time.Now(), "test")}
787 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
788 tcheck(t, err, "add message to queue for delivery")
790 testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
792 // Add message with requiretls that fails immediately due to no REQUIRETLS support in all servers.
793 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequiredunsupported@localhost>", nil, &yes, time.Now(), "test")}
794 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
795 tcheck(t, err, "add message to queue for delivery")
797 testDSN(makeBadFakeSMTPSTARTTLSServer(false))
799 // Restore pre-DANE behaviour.
800 resolver.AllAuthentic = false
803 // Add message with requiretls that fails immediately due to no verification policy for recipient domain.
804 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednopolicy@localhost>", nil, &yes, time.Now(), "test")}
805 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
806 tcheck(t, err, "add message to queue for delivery")
808 // Based on DNS lookups, there won't be any dialing or SMTP connection.
809 testDSN(func(conn net.Conn) {})
811 // Add another message that we'll fail to deliver entirely.
812 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
813 err = Add(ctxbg, pkglog, "mjl", mf, qm)
814 tcheck(t, err, "add message to queue for delivery")
816 msgs, err = List(ctxbg, Filter{}, Sort{})
817 tcheck(t, err, "list queue")
819 t.Fatalf("queue has %d messages, expected 1", len(msgs))
823 prepServer := func(fn func(c net.Conn)) (net.Conn, func()) {
824 server, client := net.Pipe()
829 return client, func() {
835 conn2, cleanup2 := prepServer(func(conn net.Conn) { fmt.Fprintf(conn, "220 mail.mox.example\r\n") })
836 conn3, cleanup3 := prepServer(func(conn net.Conn) { fmt.Fprintf(conn, "451 mail.mox.example\r\n") })
837 conn4, cleanup4 := prepServer(fakeSMTPSTARTTLSServer)
845 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
849 return nil, fmt.Errorf("connect error from test")
859 smtpclient.DialHook = nil
862 comm := store.RegisterComm(acc)
863 defer comm.Unregister()
865 for i := 1; i < 8; i++ {
867 resolver.AllAuthentic = true
868 resolver.TLSA = map[string][]adns.TLSA{
869 "_25._tcp.mail.mox.example.": {
870 // Non-matching zero CertAssoc, should cause failure.
871 {Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeSHA256, CertAssoc: make([]byte, sha256.Size)},
875 resolver.AllAuthentic = false
878 go deliver(pkglog, resolver, msg)
880 err = DB.Get(ctxbg, &msg)
881 tcheck(t, err, "get msg")
882 if msg.Attempts != i {
883 t.Fatalf("got attempt %d, expected %d", msg.Attempts, i)
885 if msg.Attempts == 5 {
886 timer.Reset(time.Second)
887 changes := make(chan struct{}, 1)
890 changes <- struct{}{}
895 t.Fatalf("no dsn in 1s")
900 // Trigger final failure.
901 go deliver(pkglog, resolver, msg)
903 err = DB.Get(ctxbg, &msg)
904 if err != bstore.ErrAbsent {
905 t.Fatalf("attempt to fetch delivered and removed message from queue, got err %v, expected ErrAbsent", err)
908 timer.Reset(time.Second)
909 changes := make(chan struct{}, 1)
912 changes <- struct{}{}
917 t.Fatalf("no dsn in 1s")
920 // We shouldn't have any more work to do.
921 msgs, err = List(ctxbg, Filter{}, Sort{})
922 tcheck(t, err, "list messages at end of test")
923 tcompare(t, len(msgs), 0)
926func addCounts(success, failure int64, result tlsrpt.Result) tlsrpt.Result {
927 result.Summary.TotalSuccessfulSessionCount += success
928 result.Summary.TotalFailureSessionCount += failure
932func clearTLSResults(t *testing.T) {
933 _, err := bstore.QueryDB[tlsrptdb.TLSResult](ctxbg, tlsrptdb.ResultDB).Delete()
934 tcheck(t, err, "delete tls results")
937func checkTLSResults(t *testing.T, policyDomain, expRecipientDomain string, expIsHost bool, expResults ...tlsrpt.Result) {
939 q := bstore.QueryDB[tlsrptdb.TLSResult](ctxbg, tlsrptdb.ResultDB)
940 q.FilterNonzero(tlsrptdb.TLSResult{PolicyDomain: policyDomain})
941 result, err := q.Get()
942 tcheck(t, err, "get tls result")
943 tcompare(t, result.RecipientDomain, expRecipientDomain)
944 tcompare(t, result.IsHost, expIsHost)
946 // Before comparing, compensate for go1.20 vs go1.21 difference.
947 for i, r := range result.Results {
948 for j, fd := range r.FailureDetails {
949 if fd.FailureReasonCode == "tls-remote-alert-70" {
950 result.Results[i].FailureDetails[j].FailureReasonCode = "tls-remote-alert-70-protocol-version-not-supported"
954 tcompare(t, result.Results, expResults)
957// Test delivered/permfailed/suppressed/canceled/dropped messages are stored in the
958// retired list if configured, with a proper result, that webhooks are scheduled,
959// and that cleaning up works.
960func TestRetiredHooks(t *testing.T) {
961 _, cleanup := setup(t)
964 addr, err := smtp.ParseAddress("mjl@mox.example")
965 tcheck(t, err, "parse address")
969 defer os.Remove(mf.Name())
972 resolver := dns.MockResolver{
973 A: map[string][]string{"mox.example.": {"127.0.0.1"}},
974 MX: map[string][]*net.MX{"mox.example.": {{Host: "mox.example", Pref: 10}}},
977 testAction := func(account string, action func(), expResult *MsgResult, expEvent string, expSuppressing bool) {
980 _, err := bstore.QueryDB[MsgRetired](ctxbg, DB).Delete()
981 tcheck(t, err, "clearing retired messages")
982 _, err = bstore.QueryDB[Hook](ctxbg, DB).Delete()
983 tcheck(t, err, "clearing hooks")
985 qm := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
986 qm.Extra = map[string]string{"a": "123"}
987 err = Add(ctxbg, pkglog, account, mf, qm)
988 tcheck(t, err, "add to queue")
992 // Should be no messages left in queue.
993 msgs, err := List(ctxbg, Filter{}, Sort{})
994 tcheck(t, err, "list messages")
995 tcompare(t, len(msgs), 0)
997 retireds, err := RetiredList(ctxbg, RetiredFilter{}, RetiredSort{})
998 tcheck(t, err, "list retired messages")
999 hooks, err := HookList(ctxbg, HookFilter{}, HookSort{})
1000 tcheck(t, err, "list hooks")
1001 if expResult == nil {
1002 tcompare(t, len(retireds), 0)
1003 tcompare(t, len(hooks), 0)
1005 tcompare(t, len(retireds), 1)
1007 tcompare(t, len(mr.Results) > 0, true)
1008 lr := mr.LastResult()
1009 lr.Start = time.Time{}
1011 tcompare(t, lr.Error == "", expResult.Error == "")
1012 lr.Error = expResult.Error
1013 tcompare(t, lr, *expResult)
1015 // Compare added webhook.
1016 tcompare(t, len(hooks), 1)
1018 var out webhook.Outgoing
1019 dec := json.NewDecoder(strings.NewReader(h.Payload))
1020 dec.DisallowUnknownFields()
1021 err := dec.Decode(&out)
1022 tcheck(t, err, "unmarshal outgoing webhook payload")
1023 tcompare(t, out.Error == "", expResult.Error == "")
1024 out.WebhookQueued = time.Time{}
1027 if expResult.Secode != "" {
1028 ecode = fmt.Sprintf("%d.%s", expResult.Code/100, expResult.Secode)
1030 var code int // Only set for errors.
1031 if expResult.Code != 250 {
1032 code = expResult.Code
1034 expOut := webhook.Outgoing{
1035 Event: webhook.OutgoingEvent(expEvent),
1036 Suppressing: expSuppressing,
1039 MessageID: mr.MessageID,
1040 Subject: mr.Subject,
1042 SMTPEnhancedCode: ecode,
1045 tcompare(t, out, expOut)
1048 h.Submitted = time.Time{}
1049 h.NextAttempt = time.Time{}
1050 exph := Hook{0, mr.ID, "", mr.MessageID, mr.Subject, mr.Extra, mr.SenderAccount, "http://localhost:1234/outgoing", "Basic dXNlcm5hbWU6cGFzc3dvcmQ=", false, expEvent, "", time.Time{}, 0, time.Time{}, nil}
1051 tcompare(t, h, exph)
1055 makeLaunchAction := func(handler func(conn net.Conn)) func() {
1057 server, client := net.Pipe()
1058 defer server.Close()
1060 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
1065 smtpclient.DialHook = nil
1068 // Trigger delivery attempt.
1069 n := launchWork(pkglog, resolver, map[string]struct{}{})
1072 // Wait until delivery has finished.
1073 tm := time.NewTimer(5 * time.Second)
1077 t.Fatalf("delivery didn't happen within 5s")
1078 case <-deliveryResults:
1083 smtpAccept := func(conn net.Conn) {
1084 br := bufio.NewReader(conn)
1085 readline := func(cmd string) {
1086 line, err := br.ReadString('\n')
1087 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
1088 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
1091 writeline := func(s string) {
1092 fmt.Fprintf(conn, "%s\r\n", s)
1095 writeline("220 mail.mox.example")
1097 writeline("250 mail.mox.example")
1104 writeline("354 continue")
1105 reader := smtp.NewDataReader(br)
1106 io.Copy(io.Discard, reader)
1111 smtpReject := func(code int) func(conn net.Conn) {
1112 return func(conn net.Conn) {
1113 br := bufio.NewReader(conn)
1114 readline := func(cmd string) {
1115 line, err := br.ReadString('\n')
1116 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
1117 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
1120 writeline := func(s string) {
1121 fmt.Fprintf(conn, "%s\r\n", s)
1124 writeline("220 mail.mox.example")
1126 writeline("250-mail.mox.example")
1127 writeline("250 enhancedstatuscodes")
1130 writeline(fmt.Sprintf("%d 5.1.0 nok", code))
1136 testAction("mjl", makeLaunchAction(smtpAccept), nil, "", false)
1137 testAction("retired", makeLaunchAction(smtpAccept), &MsgResult{Code: 250, Success: true}, string(webhook.EventDelivered), false)
1138 // 554 is generic, doesn't immediately cause suppression.
1139 testAction("mjl", makeLaunchAction(smtpReject(554)), nil, "", false)
1140 testAction("retired", makeLaunchAction(smtpReject(554)), &MsgResult{Code: 554, Secode: "1.0", Error: "nonempty"}, string(webhook.EventFailed), false)
1141 // 550 causes immediate suppression, check for it in webhook.
1142 testAction("mjl", makeLaunchAction(smtpReject(550)), nil, "", true)
1143 testAction("retired", makeLaunchAction(smtpReject(550)), &MsgResult{Code: 550, Secode: "1.0", Error: "nonempty"}, string(webhook.EventFailed), true)
1144 // Try to deliver to suppressed addresses.
1146 n := launchWork(pkglog, resolver, map[string]struct{}{})
1150 testAction("mjl", launch, nil, "", false)
1151 testAction("retired", launch, &MsgResult{Error: "nonempty"}, string(webhook.EventSuppressed), false)
1153 queueFail := func() {
1154 n, err := Fail(ctxbg, pkglog, Filter{})
1155 tcheck(t, err, "cancel delivery with failure dsn")
1158 queueDrop := func() {
1159 n, err := Drop(ctxbg, pkglog, Filter{})
1160 tcheck(t, err, "cancel delivery without failure dsn")
1163 testAction("mjl", queueFail, nil, "", false)
1164 testAction("retired", queueFail, &MsgResult{Error: "nonempty"}, string(webhook.EventFailed), false)
1165 testAction("mjl", queueDrop, nil, "", false)
1166 testAction("retired", queueDrop, &MsgResult{Error: "nonempty"}, string(webhook.EventCanceled), false)
1168 retireds, err := RetiredList(ctxbg, RetiredFilter{}, RetiredSort{})
1169 tcheck(t, err, "list retired messages")
1170 tcompare(t, len(retireds), 1)
1172 cleanupMsgRetiredSingle(pkglog)
1173 retireds, err = RetiredList(ctxbg, RetiredFilter{}, RetiredSort{})
1174 tcheck(t, err, "list retired messages")
1175 tcompare(t, len(retireds), 0)
1178// test Start and that it attempts to deliver.
1179func TestQueueStart(t *testing.T) {
1180 // Override dial function. We'll make connecting fail and check the attempt.
1181 resolver := dns.MockResolver{
1182 A: map[string][]string{"mox.example.": {"127.0.0.1"}},
1183 MX: map[string][]*net.MX{"mox.example.": {{Host: "mox.example", Pref: 10}}},
1185 dialed := make(chan struct{}, 1)
1186 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
1187 dialed <- struct{}{}
1188 return nil, fmt.Errorf("failure from test")
1191 smtpclient.DialHook = nil
1194 _, cleanup := setup(t)
1197 done := make(chan struct{})
1199 mox.ShutdownCancel()
1200 // Wait for message and hooks deliverers and cleaners.
1205 mox.Shutdown, mox.ShutdownCancel = context.WithCancel(ctxbg)
1207 Shutdown() // DB was opened already. Start will open it again. Just close it before.
1208 err := Start(resolver, done)
1209 tcheck(t, err, "queue start")
1211 checkDialed := func(need bool) {
1213 d := time.Second / 10
1217 timer := time.NewTimer(d)
1222 t.Fatalf("unexpected dial attempt")
1226 t.Fatalf("expected to see a dial attempt")
1231 // HoldRule to mark mark all messages sent by mjl on hold, including existing
1233 hr0, err := HoldRuleAdd(ctxbg, pkglog, HoldRule{Account: "mjl"})
1234 tcheck(t, err, "add hold rule")
1236 // All zero HoldRule holds all deliveries, and marks all on hold.
1237 hr1, err := HoldRuleAdd(ctxbg, pkglog, HoldRule{})
1238 tcheck(t, err, "add hold rule")
1240 hrl, err := HoldRuleList(ctxbg)
1241 tcheck(t, err, "listing hold rules")
1242 tcompare(t, hrl, []HoldRule{hr0, hr1})
1244 path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
1245 mf := prepareFile(t)
1246 defer os.Remove(mf.Name())
1248 qm := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
1249 err = Add(ctxbg, pkglog, "mjl", mf, qm)
1250 tcheck(t, err, "add message to queue for delivery")
1251 checkDialed(false) // No delivery attempt yet.
1253 n, err := Count(ctxbg)
1254 tcheck(t, err, "count messages in queue")
1257 // Take message off hold.
1258 n, err = HoldSet(ctxbg, Filter{}, false)
1259 tcheck(t, err, "taking message off hold")
1263 // Remove hold rules.
1264 err = HoldRuleRemove(ctxbg, pkglog, hr1.ID)
1265 tcheck(t, err, "removing hold rule")
1266 err = HoldRuleRemove(ctxbg, pkglog, hr0.ID)
1267 tcheck(t, err, "removing hold rule")
1268 // Check it is gone.
1269 hrl, err = HoldRuleList(ctxbg)
1270 tcheck(t, err, "listing hold rules")
1271 tcompare(t, len(hrl), 0)
1273 // Don't change message nextattempt time, but kick queue. Message should not be delivered.
1277 // Set new next attempt, should see another attempt.
1278 n, err = NextAttemptSet(ctxbg, Filter{From: "@mox.example"}, time.Now())
1279 tcheck(t, err, "kick queue")
1281 t.Fatalf("kick changed %d messages, expected 1", n)
1285 // Submit another, should be delivered immediately without HoldRule.
1286 path = smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
1288 defer os.Remove(mf.Name())
1290 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now(), "test")
1291 err = Add(ctxbg, pkglog, "mjl", mf, qm)
1292 tcheck(t, err, "add message to queue for delivery")
1293 checkDialed(true) // Immediate.
1296func TestListFilterSort(t *testing.T) {
1297 _, cleanup := setup(t)
1300 // insert Msgs. insert RetiredMsgs based on that. call list with filters and sort. filter to select a single. filter to paginate one by one, and in reverse.
1302 path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
1303 mf := prepareFile(t)
1304 defer os.Remove(mf.Name())
1307 now := time.Now().Round(0)
1308 qm := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, now, "test")
1311 qm1.Queued = now.Add(-time.Second)
1312 qm1.NextAttempt = now.Add(time.Minute)
1313 qml := []Msg{qm, qm, qm, qm, qm, qm1}
1314 err := Add(ctxbg, pkglog, "mjl", mf, qml...)
1315 tcheck(t, err, "add messages to queue")
1316 qm1 = qml[len(qml)-1]
1318 qmlrev := slices.Clone(qml)
1319 slices.Reverse(qmlrev)
1321 // Ascending by nextattempt,id.
1322 l, err := List(ctxbg, Filter{}, Sort{Asc: true})
1323 tcheck(t, err, "list messages")
1326 // Descending by nextattempt,id.
1327 l, err = List(ctxbg, Filter{}, Sort{})
1328 tcheck(t, err, "list messages")
1329 tcompare(t, l, qmlrev)
1331 // Descending by queued,id.
1332 l, err = List(ctxbg, Filter{}, Sort{Field: "Queued"})
1333 tcheck(t, err, "list messages")
1334 ql := append(append([]Msg{}, qmlrev[1:]...), qml[5])
1337 // Filter by all fields to get a single.
1339 allfilters := Filter{
1341 IDs: []int64{qm1.ID},
1343 From: path.XString(true),
1344 To: path.XString(true),
1349 l, err = List(ctxbg, allfilters, Sort{})
1350 tcheck(t, err, "list single")
1351 tcompare(t, l, []Msg{qm1})
1353 // Paginated NextAttmpt asc.
1358 nl, err := List(ctxbg, Filter{Max: 1}, Sort{Asc: true, LastID: lastID, Last: last})
1359 tcheck(t, err, "list paginated")
1360 l = append(l, nl...)
1364 tcompare(t, len(nl), 1)
1365 lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
1369 // Paginated NextAttempt desc.
1374 nl, err := List(ctxbg, Filter{Max: 1}, Sort{LastID: lastID, Last: last})
1375 tcheck(t, err, "list paginated")
1376 l = append(l, nl...)
1380 tcompare(t, len(nl), 1)
1381 lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
1383 tcompare(t, l, qmlrev)
1385 // Paginated Queued desc.
1390 nl, err := List(ctxbg, Filter{Max: 1}, Sort{Field: "Queued", LastID: lastID, Last: last})
1391 tcheck(t, err, "list paginated")
1392 l = append(l, nl...)
1396 tcompare(t, len(nl), 1)
1397 lastID, last = nl[0].ID, nl[0].Queued.Format(time.RFC3339Nano)
1401 // Paginated Queued asc.
1406 nl, err := List(ctxbg, Filter{Max: 1}, Sort{Field: "Queued", Asc: true, LastID: lastID, Last: last})
1407 tcheck(t, err, "list paginated")
1408 l = append(l, nl...)
1412 tcompare(t, len(nl), 1)
1413 lastID, last = nl[0].ID, nl[0].Queued.Format(time.RFC3339Nano)
1415 qlrev := slices.Clone(ql)
1416 slices.Reverse(qlrev)
1417 tcompare(t, l, qlrev)
1419 // Retire messages and do similar but more basic tests. The code is similar.
1420 var mrl []MsgRetired
1421 err = DB.Write(ctxbg, func(tx *bstore.Tx) error {
1422 for _, m := range qml {
1423 mr := m.Retired(false, m.NextAttempt, time.Now().Add(time.Minute).Round(0))
1424 err := tx.Insert(&mr)
1425 tcheck(t, err, "inserting retired message")
1426 mrl = append(mrl, mr)
1430 tcheck(t, err, "adding retired messages")
1432 // Paginated LastActivity desc.
1438 nl, err := RetiredList(ctxbg, RetiredFilter{Max: 1}, RetiredSort{LastID: lastID, Last: last})
1439 tcheck(t, err, "list paginated")
1440 lr = append(lr, nl...)
1444 tcompare(t, len(nl), 1)
1445 lastID, last = nl[0].ID, nl[0].LastActivity.Format(time.RFC3339Nano)
1447 mrlrev := slices.Clone(mrl)
1448 slices.Reverse(mrlrev)
1449 tcompare(t, lr, mrlrev)
1451 // Filter by all fields to get a single.
1452 allretiredfilters := RetiredFilter{
1454 IDs: []int64{mrlrev[0].ID},
1456 From: path.XString(true),
1457 To: path.XString(true),
1459 LastActivity: ">1s",
1461 lr, err = RetiredList(ctxbg, allretiredfilters, RetiredSort{})
1462 tcheck(t, err, "list single")
1463 tcompare(t, lr, []MsgRetired{mrlrev[0]})
1466// Just a cert that appears valid.
1467func fakeCert(t *testing.T, name string, expired bool) tls.Certificate {
1468 notAfter := time.Now()
1470 notAfter = notAfter.Add(-time.Hour)
1472 notAfter = notAfter.Add(time.Hour)
1475 privKey := ed25519.NewKeyFromSeed(make([]byte, ed25519.SeedSize)) // Fake key, don't use this for real!
1476 template := &x509.Certificate{
1477 SerialNumber: big.NewInt(1), // Required field...
1478 DNSNames: []string{name},
1479 NotBefore: time.Now().Add(-time.Hour),
1482 localCertBuf, err := x509.CreateCertificate(cryptorand.Reader, template, template, privKey.Public(), privKey)
1484 t.Fatalf("making certificate: %s", err)
1486 cert, err := x509.ParseCertificate(localCertBuf)
1488 t.Fatalf("parsing generated certificate: %s", err)
1490 c := tls.Certificate{
1491 Certificate: [][]byte{localCertBuf},
1492 PrivateKey: privKey,