1package queue
2
3import (
4 "bytes"
5 "encoding/json"
6 "fmt"
7 "net/http"
8 "net/http/httptest"
9 "slices"
10 "strings"
11 "testing"
12 "time"
13
14 "github.com/mjl-/bstore"
15
16 "github.com/mjl-/mox/dsn"
17 "github.com/mjl-/mox/message"
18 "github.com/mjl-/mox/smtp"
19 "github.com/mjl-/mox/store"
20 "github.com/mjl-/mox/webhook"
21)
22
23// Test webhooks for incoming message that is not related to outgoing deliveries.
24func TestHookIncoming(t *testing.T) {
25 acc, cleanup := setup(t)
26 defer cleanup()
27
28 accret, err := store.OpenAccount(pkglog, "retired")
29 tcheck(t, err, "open account for retired")
30 defer func() {
31 accret.Close()
32 accret.CheckClosed()
33 }()
34
35 testIncoming := func(a *store.Account, expIn bool) {
36 t.Helper()
37
38 _, err := bstore.QueryDB[Hook](ctxbg, DB).Delete()
39 tcheck(t, err, "clean up hooks")
40
41 mr := bytes.NewReader([]byte(testmsg))
42 now := time.Now().Round(0)
43 m := store.Message{
44 ID: 123,
45 RemoteIP: "::1",
46 MailFrom: "sender@remote.example",
47 MailFromLocalpart: "sender",
48 MailFromDomain: "remote.example",
49 RcptToLocalpart: "rcpt",
50 RcptToDomain: "mox.example",
51 MsgFromLocalpart: "mjl",
52 MsgFromDomain: "mox.example",
53 MsgFromOrgDomain: "mox.example",
54 EHLOValidated: true,
55 MailFromValidated: true,
56 MsgFromValidated: true,
57 EHLOValidation: store.ValidationPass,
58 MailFromValidation: store.ValidationPass,
59 MsgFromValidation: store.ValidationDMARC,
60 DKIMDomains: []string{"remote.example"},
61 Received: now,
62 Size: int64(len(testmsg)),
63 }
64 part, err := message.EnsurePart(pkglog.Logger, true, mr, int64(len(testmsg)))
65 tcheck(t, err, "parsing message")
66
67 err = Incoming(ctxbg, pkglog, a, "<random@localhost>", m, part, "Inbox")
68 tcheck(t, err, "pass incoming message")
69
70 hl, err := bstore.QueryDB[Hook](ctxbg, DB).List()
71 tcheck(t, err, "list hooks")
72 if !expIn {
73 tcompare(t, len(hl), 0)
74 return
75 }
76 tcompare(t, len(hl), 1)
77 h := hl[0]
78 tcompare(t, h.IsIncoming, true)
79 var in webhook.Incoming
80 dec := json.NewDecoder(strings.NewReader(h.Payload))
81 err = dec.Decode(&in)
82 tcheck(t, err, "decode incoming webhook")
83 in.Meta.Received = in.Meta.Received.Local() // For TZ UTC.
84
85 expIncoming := webhook.Incoming{
86 From: []webhook.NameAddress{{Address: "mjl@mox.example"}},
87 To: []webhook.NameAddress{{Address: "mjl@mox.example"}},
88 CC: []webhook.NameAddress{},
89 BCC: []webhook.NameAddress{},
90 ReplyTo: []webhook.NameAddress{},
91 References: []string{},
92 Subject: "test",
93 Text: "test email\n",
94
95 Structure: webhook.PartStructure(&part),
96 Meta: webhook.IncomingMeta{
97 MsgID: m.ID,
98 MailFrom: m.MailFrom,
99 MailFromValidated: m.MailFromValidated,
100 MsgFromValidated: m.MsgFromValidated,
101 RcptTo: "rcpt@mox.example",
102 DKIMVerifiedDomains: []string{"remote.example"},
103 RemoteIP: "::1",
104 Received: m.Received,
105 MailboxName: "Inbox",
106 Automated: false,
107 },
108 }
109 tcompare(t, in, expIncoming)
110 }
111
112 testIncoming(acc, false)
113 testIncoming(accret, true)
114}
115
116// Test with fromid and various DSNs, and delivery.
117func TestFromIDIncomingDelivery(t *testing.T) {
118 acc, cleanup := setup(t)
119 defer cleanup()
120
121 accret, err := store.OpenAccount(pkglog, "retired")
122 tcheck(t, err, "open account for retired")
123 defer func() {
124 accret.Close()
125 accret.CheckClosed()
126 }()
127
128 // Account that only gets webhook calls, but no retired webhooks.
129 acchook, err := store.OpenAccount(pkglog, "hook")
130 tcheck(t, err, "open account for hook")
131 defer func() {
132 acchook.Close()
133 acchook.CheckClosed()
134 }()
135
136 addr, err := smtp.ParseAddress("mjl@mox.example")
137 tcheck(t, err, "parse address")
138 path := addr.Path()
139
140 now := time.Now().Round(0)
141 m := store.Message{
142 ID: 123,
143 RemoteIP: "::1",
144 MailFrom: "sender@remote.example",
145 MailFromLocalpart: "sender",
146 MailFromDomain: "remote.example",
147 RcptToLocalpart: "rcpt",
148 RcptToDomain: "mox.example",
149 MsgFromLocalpart: "mjl",
150 MsgFromDomain: "mox.example",
151 MsgFromOrgDomain: "mox.example",
152 EHLOValidated: true,
153 MailFromValidated: true,
154 MsgFromValidated: true,
155 EHLOValidation: store.ValidationPass,
156 MailFromValidation: store.ValidationPass,
157 MsgFromValidation: store.ValidationDMARC,
158 DKIMDomains: []string{"remote.example"},
159 Received: now,
160 DSN: true,
161 }
162
163 testIncoming := func(a *store.Account, rawmsg []byte, retiredFromID string, expIn bool, expOut *webhook.Outgoing) {
164 t.Helper()
165
166 _, err := bstore.QueryDB[Hook](ctxbg, DB).Delete()
167 tcheck(t, err, "clean up hooks")
168 _, err = bstore.QueryDB[MsgRetired](ctxbg, DB).Delete()
169 tcheck(t, err, "clean up retired messages")
170
171 qmr := MsgRetired{
172 SenderAccount: a.Name,
173 SenderLocalpart: "sender",
174 SenderDomainStr: "remote.example",
175 RecipientLocalpart: "rcpt",
176 RecipientDomain: path.IPDomain,
177 RecipientDomainStr: "mox.example",
178 RecipientAddress: "rcpt@mox.example",
179 Success: true,
180 KeepUntil: now.Add(time.Minute),
181 }
182 m.RcptToLocalpart = "mjl"
183 qmr.FromID = retiredFromID
184 m.Size = int64(len(rawmsg))
185 m.RcptToLocalpart += smtp.Localpart("+unique")
186
187 err = DB.Insert(ctxbg, &qmr)
188 tcheck(t, err, "insert retired message to match")
189
190 if expOut != nil {
191 expOut.QueueMsgID = qmr.ID
192 }
193
194 mr := bytes.NewReader(rawmsg)
195 part, err := message.EnsurePart(pkglog.Logger, true, mr, int64(len(rawmsg)))
196 tcheck(t, err, "parsing message")
197
198 err = Incoming(ctxbg, pkglog, a, "<random@localhost>", m, part, "Inbox")
199 tcheck(t, err, "pass incoming message")
200
201 hl, err := bstore.QueryDB[Hook](ctxbg, DB).List()
202 tcheck(t, err, "list hooks")
203 if !expIn && expOut == nil {
204 tcompare(t, len(hl), 0)
205 return
206 }
207 tcompare(t, len(hl), 1)
208 h := hl[0]
209 tcompare(t, h.IsIncoming, expIn)
210 if expIn {
211 return
212 }
213 var out webhook.Outgoing
214 dec := json.NewDecoder(strings.NewReader(h.Payload))
215 err = dec.Decode(&out)
216 tcheck(t, err, "decode outgoing webhook")
217
218 out.WebhookQueued = time.Time{}
219 tcompare(t, &out, expOut)
220 }
221
222 dsncompose := func(m *dsn.Message) []byte {
223 buf, err := m.Compose(pkglog, false)
224 tcheck(t, err, "compose dsn")
225 return buf
226 }
227 makedsn := func(action dsn.Action) *dsn.Message {
228 return &dsn.Message{
229 From: path,
230 To: path,
231 TextBody: "explanation",
232 MessageID: "<dsnmsgid@localhost>",
233 ReportingMTA: "localhost",
234 Recipients: []dsn.Recipient{
235 {
236 FinalRecipient: path,
237 Action: action,
238 Status: "5.0.0.",
239 DiagnosticCodeSMTP: "554 5.0.0 error",
240 },
241 },
242 }
243 }
244
245 msgfailed := dsncompose(makedsn(dsn.Failed))
246
247 // No FromID to match against, so we get a webhook for a new incoming message.
248 testIncoming(acc, msgfailed, "", false, nil)
249 testIncoming(accret, msgfailed, "mismatch", true, nil)
250
251 // DSN with multiple recipients are treated as unrecognized dsns.
252 multidsn := makedsn(dsn.Delivered)
253 multidsn.Recipients = append(multidsn.Recipients, multidsn.Recipients[0])
254 msgmultidsn := dsncompose(multidsn)
255 testIncoming(acc, msgmultidsn, "unique", false, nil)
256 testIncoming(accret, msgmultidsn, "unique", false, &webhook.Outgoing{
257 Event: webhook.EventUnrecognized,
258 DSN: true,
259 FromID: "unique",
260 })
261
262 msgdelayed := dsncompose(makedsn(dsn.Delayed))
263 testIncoming(acc, msgdelayed, "unique", false, nil)
264 testIncoming(accret, msgdelayed, "unique", false, &webhook.Outgoing{
265 Event: webhook.EventDelayed,
266 DSN: true,
267 FromID: "unique",
268 SMTPCode: 554,
269 SMTPEnhancedCode: "5.0.0",
270 })
271
272 msgrelayed := dsncompose(makedsn(dsn.Relayed))
273 testIncoming(acc, msgrelayed, "unique", false, nil)
274 testIncoming(accret, msgrelayed, "unique", false, &webhook.Outgoing{
275 Event: webhook.EventRelayed,
276 DSN: true,
277 FromID: "unique",
278 SMTPCode: 554,
279 SMTPEnhancedCode: "5.0.0",
280 })
281
282 msgunrecognized := dsncompose(makedsn(dsn.Action("bogus")))
283 testIncoming(acc, msgunrecognized, "unique", false, nil)
284 testIncoming(accret, msgunrecognized, "unique", false, &webhook.Outgoing{
285 Event: webhook.EventUnrecognized,
286 DSN: true,
287 FromID: "unique",
288 })
289
290 // Not a DSN but to fromid address also causes "unrecognized".
291 msgunrecognized2 := []byte(testmsg)
292 testIncoming(acc, msgunrecognized2, "unique", false, nil)
293 testIncoming(accret, msgunrecognized2, "unique", false, &webhook.Outgoing{
294 Event: webhook.EventUnrecognized,
295 DSN: false,
296 FromID: "unique",
297 })
298
299 msgdelivered := dsncompose(makedsn(dsn.Delivered))
300 testIncoming(acc, msgdelivered, "unique", false, nil)
301 testIncoming(accret, msgdelivered, "unique", false, &webhook.Outgoing{
302 Event: webhook.EventDelivered,
303 DSN: true,
304 FromID: "unique",
305 // This is what DSN claims.
306 SMTPCode: 554,
307 SMTPEnhancedCode: "5.0.0",
308 })
309
310 testIncoming(acc, msgfailed, "unique", false, nil)
311 testIncoming(accret, msgfailed, "unique", false, &webhook.Outgoing{
312 Event: webhook.EventFailed,
313 DSN: true,
314 FromID: "unique",
315 SMTPCode: 554,
316 SMTPEnhancedCode: "5.0.0",
317 })
318
319 // We still have a webhook in the queue from the test above.
320 // Try to get the hook delivered. We'll try various error handling cases and superseding.
321
322 qsize, err := HookQueueSize(ctxbg)
323 tcheck(t, err, "hook queue size")
324 tcompare(t, qsize, 1)
325
326 var handler http.HandlerFunc
327 handleError := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
328 w.WriteHeader(http.StatusInternalServerError)
329 fmt.Fprintln(w, "server error")
330 })
331 handleOK := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
332 if r.Header.Get("Authorization") != "Basic dXNlcm5hbWU6cGFzc3dvcmQ=" {
333 http.Error(w, "unauthorized", http.StatusUnauthorized)
334 return
335 }
336 if r.Header.Get("X-Mox-Webhook-ID") == "" {
337 http.Error(w, "missing header x-mox-webhook-id", http.StatusBadRequest)
338 return
339 }
340 if r.Header.Get("X-Mox-Webhook-Attempt") == "" {
341 http.Error(w, "missing header x-mox-webhook-attempt", http.StatusBadRequest)
342 return
343 }
344 fmt.Fprintln(w, "ok")
345 })
346 hs := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
347 handler.ServeHTTP(w, r)
348 }))
349 defer hs.Close()
350
351 h, err := bstore.QueryDB[Hook](ctxbg, DB).Get()
352 tcheck(t, err, "get hook from queue")
353
354 next := hookNextWork(ctxbg, pkglog, map[string]struct{}{"https://other.example/": {}})
355 if next > 0 {
356 t.Fatalf("next scheduled work should be immediate, is %v", next)
357 }
358
359 // Respond with an error and see a retry is scheduled.
360 h.URL = hs.URL
361 // Update hook URL in database, so we can call hookLaunchWork. We'll call
362 // hookDeliver for later attempts.
363 err = DB.Update(ctxbg, &h)
364 tcheck(t, err, "update hook url")
365 handler = handleError
366 hookLaunchWork(pkglog, map[string]struct{}{"https://other.example/": {}})
367 <-hookDeliveryResults
368 err = DB.Get(ctxbg, &h)
369 tcheck(t, err, "get hook after failed delivery attempt")
370 tcompare(t, h.Attempts, 1)
371 tcompare(t, len(h.Results), 1)
372 tcompare(t, h.LastResult().Success, false)
373 tcompare(t, h.LastResult().Code, http.StatusInternalServerError)
374 tcompare(t, h.LastResult().Response, "server error\n")
375
376 next = hookNextWork(ctxbg, pkglog, map[string]struct{}{})
377 if next <= 0 {
378 t.Fatalf("next scheduled work is immediate, shoud be in the future")
379 }
380
381 n, err := HookNextAttemptSet(ctxbg, HookFilter{}, time.Now().Add(time.Minute))
382 tcheck(t, err, "schedule hook to now")
383 tcompare(t, n, 1)
384 n, err = HookNextAttemptAdd(ctxbg, HookFilter{}, -time.Minute)
385 tcheck(t, err, "schedule hook to now")
386 tcompare(t, n, 1)
387 next = hookNextWork(ctxbg, pkglog, map[string]struct{}{})
388 if next > 0 {
389 t.Fatalf("next scheduled work should be immediate, is %v", next)
390 }
391
392 handler = handleOK
393 hookDeliver(pkglog, h)
394 <-hookDeliveryResults
395 err = DB.Get(ctxbg, &h)
396 tcompare(t, err, bstore.ErrAbsent)
397 hr := HookRetired{ID: h.ID}
398 err = DB.Get(ctxbg, &hr)
399 tcheck(t, err, "get retired hook after delivery")
400 tcompare(t, hr.Attempts, 2)
401 tcompare(t, len(hr.Results), 2)
402 tcompare(t, hr.LastResult().Success, true)
403 tcompare(t, hr.LastResult().Code, http.StatusOK)
404 tcompare(t, hr.LastResult().Response, "ok\n")
405
406 // Check that cleaning up retired webhooks works.
407 cleanupHookRetiredSingle(pkglog)
408 hrl, err := bstore.QueryDB[HookRetired](ctxbg, DB).List()
409 tcheck(t, err, "listing retired hooks")
410 tcompare(t, len(hrl), 0)
411
412 // Helper to get a representative webhook added to the queue.
413 addHook := func(a *store.Account) {
414 testIncoming(a, msgfailed, "unique", false, &webhook.Outgoing{
415 Event: webhook.EventFailed,
416 DSN: true,
417 FromID: "unique",
418 SMTPCode: 554,
419 SMTPEnhancedCode: "5.0.0",
420 })
421 }
422
423 // Keep attempting and failing delivery until we give up.
424 addHook(accret)
425 h, err = bstore.QueryDB[Hook](ctxbg, DB).Get()
426 tcheck(t, err, "get added hook")
427 h.URL = hs.URL
428 handler = handleError
429 for i := 0; i < len(hookIntervals); i++ {
430 hookDeliver(pkglog, h)
431 <-hookDeliveryResults
432 err := DB.Get(ctxbg, &h)
433 tcheck(t, err, "get hook")
434 tcompare(t, h.Attempts, i+1)
435 }
436 // Final attempt.
437 hookDeliver(pkglog, h)
438 <-hookDeliveryResults
439 err = DB.Get(ctxbg, &h)
440 tcompare(t, err, bstore.ErrAbsent)
441 hr = HookRetired{ID: h.ID}
442 err = DB.Get(ctxbg, &hr)
443 tcheck(t, err, "get retired hook after failure")
444 tcompare(t, hr.Attempts, len(hookIntervals)+1)
445 tcompare(t, len(hr.Results), len(hookIntervals)+1)
446 tcompare(t, hr.LastResult().Success, false)
447 tcompare(t, hr.LastResult().Code, http.StatusInternalServerError)
448 tcompare(t, hr.LastResult().Response, "server error\n")
449
450 // Check account "hook" doesn't get retired webhooks.
451 addHook(acchook)
452 h, err = bstore.QueryDB[Hook](ctxbg, DB).Get()
453 tcheck(t, err, "get added hook")
454 handler = handleOK
455 h.URL = hs.URL
456 hookDeliver(pkglog, h)
457 <-hookDeliveryResults
458 err = DB.Get(ctxbg, &h)
459 tcompare(t, err, bstore.ErrAbsent)
460 hr = HookRetired{ID: h.ID}
461 err = DB.Get(ctxbg, &hr)
462 tcompare(t, err, bstore.ErrAbsent)
463
464 // HookCancel
465 addHook(accret)
466 h, err = bstore.QueryDB[Hook](ctxbg, DB).Get()
467 tcheck(t, err, "get added hook")
468 n, err = HookCancel(ctxbg, pkglog, HookFilter{})
469 tcheck(t, err, "canceling hook")
470 tcompare(t, n, 1)
471 l, err := HookList(ctxbg, HookFilter{}, HookSort{})
472 tcheck(t, err, "list hook")
473 tcompare(t, len(l), 0)
474
475 // Superseding: When a webhook is scheduled for a message that already has a
476 // pending webhook, the previous webhook should be removed/retired.
477 _, err = bstore.QueryDB[HookRetired](ctxbg, DB).Delete()
478 tcheck(t, err, "clean up retired webhooks")
479 _, err = bstore.QueryDB[MsgRetired](ctxbg, DB).Delete()
480 tcheck(t, err, "clean up retired messages")
481 qmr := MsgRetired{
482 SenderAccount: accret.Name,
483 SenderLocalpart: "sender",
484 SenderDomainStr: "remote.example",
485 RecipientLocalpart: "rcpt",
486 RecipientDomain: path.IPDomain,
487 RecipientDomainStr: "mox.example",
488 RecipientAddress: "rcpt@mox.example",
489 Success: true,
490 KeepUntil: now.Add(time.Minute),
491 FromID: "unique",
492 }
493 err = DB.Insert(ctxbg, &qmr)
494 tcheck(t, err, "insert retired message to match")
495 m.RcptToLocalpart = "mjl"
496 m.Size = int64(len(msgdelayed))
497 m.RcptToLocalpart += smtp.Localpart("+unique")
498
499 mr := bytes.NewReader(msgdelayed)
500 part, err := message.EnsurePart(pkglog.Logger, true, mr, int64(len(msgdelayed)))
501 tcheck(t, err, "parsing message")
502
503 // Cause first webhook.
504 err = Incoming(ctxbg, pkglog, accret, "<random@localhost>", m, part, "Inbox")
505 tcheck(t, err, "pass incoming message")
506 h, err = bstore.QueryDB[Hook](ctxbg, DB).Get()
507 tcheck(t, err, "get hook")
508
509 // Cause second webhook for same message. First should now be retired and marked as superseded.
510 err = Incoming(ctxbg, pkglog, accret, "<random@localhost>", m, part, "Inbox")
511 tcheck(t, err, "pass incoming message again")
512 h2, err := bstore.QueryDB[Hook](ctxbg, DB).Get()
513 tcheck(t, err, "get hook")
514 hr, err = bstore.QueryDB[HookRetired](ctxbg, DB).Get()
515 tcheck(t, err, "get retired hook")
516 tcompare(t, h.ID, hr.ID)
517 tcompare(t, hr.SupersededByID, h2.ID)
518 tcompare(t, h2.ID > h.ID, true)
519}
520
521func TestHookListFilterSort(t *testing.T) {
522 _, cleanup := setup(t)
523 defer cleanup()
524
525 now := time.Now().Round(0)
526 h := Hook{0, 0, "fromid", "messageid", "subj", nil, "mjl", "http://localhost", "", false, "delivered", "", now, 0, now, []HookResult{}}
527 h1 := h
528 h1.Submitted = now.Add(-time.Second)
529 h1.NextAttempt = now.Add(time.Minute)
530 hl := []Hook{h, h, h, h, h, h1}
531 err := DB.Write(ctxbg, func(tx *bstore.Tx) error {
532 for i := range hl {
533 err := hookInsert(tx, &hl[i], now, time.Minute)
534 tcheck(t, err, "insert hook")
535 }
536 return nil
537 })
538 tcheck(t, err, "inserting hooks")
539 h1 = hl[len(hl)-1]
540
541 hlrev := slices.Clone(hl)
542 slices.Reverse(hlrev)
543
544 // Ascending by nextattempt,id.
545 l, err := HookList(ctxbg, HookFilter{}, HookSort{Asc: true})
546 tcheck(t, err, "list")
547 tcompare(t, l, hl)
548
549 // Descending by nextattempt,id.
550 l, err = HookList(ctxbg, HookFilter{}, HookSort{})
551 tcheck(t, err, "list")
552 tcompare(t, l, hlrev)
553
554 // Descending by submitted,id.
555 l, err = HookList(ctxbg, HookFilter{}, HookSort{Field: "Submitted"})
556 tcheck(t, err, "list")
557 ll := append(append([]Hook{}, hlrev[1:]...), hl[5])
558 tcompare(t, l, ll)
559
560 // Filter by all fields to get a single.
561 allfilters := HookFilter{
562 Max: 2,
563 IDs: []int64{h1.ID},
564 Account: "mjl",
565 Submitted: "<1s",
566 NextAttempt: ">1s",
567 Event: "delivered",
568 }
569 l, err = HookList(ctxbg, allfilters, HookSort{})
570 tcheck(t, err, "list single")
571 tcompare(t, l, []Hook{h1})
572
573 // Paginated NextAttmpt asc.
574 var lastID int64
575 var last any
576 l = nil
577 for {
578 nl, err := HookList(ctxbg, HookFilter{Max: 1}, HookSort{Asc: true, LastID: lastID, Last: last})
579 tcheck(t, err, "list paginated")
580 l = append(l, nl...)
581 if len(nl) == 0 {
582 break
583 }
584 tcompare(t, len(nl), 1)
585 lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
586 }
587 tcompare(t, l, hl)
588
589 // Paginated NextAttempt desc.
590 l = nil
591 lastID = 0
592 last = ""
593 for {
594 nl, err := HookList(ctxbg, HookFilter{Max: 1}, HookSort{LastID: lastID, Last: last})
595 tcheck(t, err, "list paginated")
596 l = append(l, nl...)
597 if len(nl) == 0 {
598 break
599 }
600 tcompare(t, len(nl), 1)
601 lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
602 }
603 tcompare(t, l, hlrev)
604
605 // Paginated Submitted desc.
606 l = nil
607 lastID = 0
608 last = ""
609 for {
610 nl, err := HookList(ctxbg, HookFilter{Max: 1}, HookSort{Field: "Submitted", LastID: lastID, Last: last})
611 tcheck(t, err, "list paginated")
612 l = append(l, nl...)
613 if len(nl) == 0 {
614 break
615 }
616 tcompare(t, len(nl), 1)
617 lastID, last = nl[0].ID, nl[0].Submitted.Format(time.RFC3339Nano)
618 }
619 tcompare(t, l, ll)
620
621 // Paginated Submitted asc.
622 l = nil
623 lastID = 0
624 last = ""
625 for {
626 nl, err := HookList(ctxbg, HookFilter{Max: 1}, HookSort{Field: "Submitted", Asc: true, LastID: lastID, Last: last})
627 tcheck(t, err, "list paginated")
628 l = append(l, nl...)
629 if len(nl) == 0 {
630 break
631 }
632 tcompare(t, len(nl), 1)
633 lastID, last = nl[0].ID, nl[0].Submitted.Format(time.RFC3339Nano)
634 }
635 llrev := slices.Clone(ll)
636 slices.Reverse(llrev)
637 tcompare(t, l, llrev)
638
639 // Retire messages and do similar but more basic tests. The code is similar.
640 var hrl []HookRetired
641 err = DB.Write(ctxbg, func(tx *bstore.Tx) error {
642 for _, h := range hl {
643 hr := h.Retired(false, h.NextAttempt, time.Now().Add(time.Minute).Round(0))
644 err := tx.Insert(&hr)
645 tcheck(t, err, "inserting retired")
646 hrl = append(hrl, hr)
647 }
648 return nil
649 })
650 tcheck(t, err, "adding retired")
651
652 // Paginated LastActivity desc.
653 var lr []HookRetired
654 lastID = 0
655 last = ""
656 l = nil
657 for {
658 nl, err := HookRetiredList(ctxbg, HookRetiredFilter{Max: 1}, HookRetiredSort{LastID: lastID, Last: last})
659 tcheck(t, err, "list paginated")
660 lr = append(lr, nl...)
661 if len(nl) == 0 {
662 break
663 }
664 tcompare(t, len(nl), 1)
665 lastID, last = nl[0].ID, nl[0].LastActivity.Format(time.RFC3339Nano)
666 }
667 hrlrev := slices.Clone(hrl)
668 slices.Reverse(hrlrev)
669 tcompare(t, lr, hrlrev)
670
671 // Filter by all fields to get a single.
672 allretiredfilters := HookRetiredFilter{
673 Max: 2,
674 IDs: []int64{hrlrev[0].ID},
675 Account: "mjl",
676 Submitted: "<1s",
677 LastActivity: ">1s",
678 Event: "delivered",
679 }
680 lr, err = HookRetiredList(ctxbg, allretiredfilters, HookRetiredSort{})
681 tcheck(t, err, "list single")
682 tcompare(t, lr, []HookRetired{hrlrev[0]})
683}
684