1package queue
2
3import (
4 "bytes"
5 "encoding/json"
6 "fmt"
7 "net/http"
8 "net/http/httptest"
9 "slices"
10 "strings"
11 "testing"
12 "time"
13
14 "github.com/mjl-/bstore"
15
16 "github.com/mjl-/mox/dsn"
17 "github.com/mjl-/mox/message"
18 "github.com/mjl-/mox/smtp"
19 "github.com/mjl-/mox/store"
20 "github.com/mjl-/mox/webhook"
21)
22
23// Test webhooks for incoming message that is not related to outgoing deliveries.
24func TestHookIncoming(t *testing.T) {
25 acc, cleanup := setup(t)
26 defer cleanup()
27
28 accret, err := store.OpenAccount(pkglog, "retired")
29 tcheck(t, err, "open account for retired")
30 defer func() {
31 accret.Close()
32 accret.CheckClosed()
33 }()
34
35 testIncoming := func(a *store.Account, expIn bool) {
36 t.Helper()
37
38 _, err := bstore.QueryDB[Hook](ctxbg, DB).Delete()
39 tcheck(t, err, "clean up hooks")
40
41 mr := bytes.NewReader([]byte(testmsg))
42 now := time.Now().Round(0)
43 m := store.Message{
44 ID: 123,
45 RemoteIP: "::1",
46 MailFrom: "sender@remote.example",
47 MailFromLocalpart: "sender",
48 MailFromDomain: "remote.example",
49 RcptToLocalpart: "rcpt",
50 RcptToDomain: "mox.example",
51 MsgFromLocalpart: "mjl",
52 MsgFromDomain: "mox.example",
53 MsgFromOrgDomain: "mox.example",
54 EHLOValidated: true,
55 MailFromValidated: true,
56 MsgFromValidated: true,
57 EHLOValidation: store.ValidationPass,
58 MailFromValidation: store.ValidationPass,
59 MsgFromValidation: store.ValidationDMARC,
60 DKIMDomains: []string{"remote.example"},
61 Received: now,
62 Size: int64(len(testmsg)),
63 }
64 part, err := message.EnsurePart(pkglog.Logger, true, mr, int64(len(testmsg)))
65 tcheck(t, err, "parsing message")
66
67 err = Incoming(ctxbg, pkglog, a, "<random@localhost>", m, part, "Inbox")
68 tcheck(t, err, "pass incoming message")
69
70 hl, err := bstore.QueryDB[Hook](ctxbg, DB).List()
71 tcheck(t, err, "list hooks")
72 if !expIn {
73 tcompare(t, len(hl), 0)
74 return
75 }
76 tcompare(t, len(hl), 1)
77 h := hl[0]
78 tcompare(t, h.IsIncoming, true)
79 var in webhook.Incoming
80 dec := json.NewDecoder(strings.NewReader(h.Payload))
81 err = dec.Decode(&in)
82 tcheck(t, err, "decode incoming webhook")
83 in.Meta.Received = in.Meta.Received.Local() // For TZ UTC.
84
85 structure, err := PartStructure(pkglog, &part)
86 tcheck(t, err, "part structure")
87
88 expIncoming := webhook.Incoming{
89 From: []webhook.NameAddress{{Address: "mjl@mox.example"}},
90 To: []webhook.NameAddress{{Address: "mjl@mox.example"}},
91 CC: []webhook.NameAddress{},
92 BCC: []webhook.NameAddress{},
93 ReplyTo: []webhook.NameAddress{},
94 References: []string{},
95 Subject: "test",
96 Text: "test email\n",
97
98 Structure: structure,
99 Meta: webhook.IncomingMeta{
100 MsgID: m.ID,
101 MailFrom: m.MailFrom,
102 MailFromValidated: m.MailFromValidated,
103 MsgFromValidated: m.MsgFromValidated,
104 RcptTo: "rcpt@mox.example",
105 DKIMVerifiedDomains: []string{"remote.example"},
106 RemoteIP: "::1",
107 Received: m.Received,
108 MailboxName: "Inbox",
109 Automated: false,
110 },
111 }
112 tcompare(t, in, expIncoming)
113 }
114
115 testIncoming(acc, false)
116 testIncoming(accret, true)
117}
118
119// Test with fromid and various DSNs, and delivery.
120func TestFromIDIncomingDelivery(t *testing.T) {
121 acc, cleanup := setup(t)
122 defer cleanup()
123
124 accret, err := store.OpenAccount(pkglog, "retired")
125 tcheck(t, err, "open account for retired")
126 defer func() {
127 accret.Close()
128 accret.CheckClosed()
129 }()
130
131 // Account that only gets webhook calls, but no retired webhooks.
132 acchook, err := store.OpenAccount(pkglog, "hook")
133 tcheck(t, err, "open account for hook")
134 defer func() {
135 acchook.Close()
136 acchook.CheckClosed()
137 }()
138
139 addr, err := smtp.ParseAddress("mjl@mox.example")
140 tcheck(t, err, "parse address")
141 path := addr.Path()
142
143 now := time.Now().Round(0)
144 m := store.Message{
145 ID: 123,
146 RemoteIP: "::1",
147 MailFrom: "sender@remote.example",
148 MailFromLocalpart: "sender",
149 MailFromDomain: "remote.example",
150 RcptToLocalpart: "rcpt",
151 RcptToDomain: "mox.example",
152 MsgFromLocalpart: "mjl",
153 MsgFromDomain: "mox.example",
154 MsgFromOrgDomain: "mox.example",
155 EHLOValidated: true,
156 MailFromValidated: true,
157 MsgFromValidated: true,
158 EHLOValidation: store.ValidationPass,
159 MailFromValidation: store.ValidationPass,
160 MsgFromValidation: store.ValidationDMARC,
161 DKIMDomains: []string{"remote.example"},
162 Received: now,
163 DSN: true,
164 }
165
166 testIncoming := func(a *store.Account, rawmsg []byte, retiredFromID string, expIn bool, expOut *webhook.Outgoing) {
167 t.Helper()
168
169 _, err := bstore.QueryDB[Hook](ctxbg, DB).Delete()
170 tcheck(t, err, "clean up hooks")
171 _, err = bstore.QueryDB[MsgRetired](ctxbg, DB).Delete()
172 tcheck(t, err, "clean up retired messages")
173
174 qmr := MsgRetired{
175 SenderAccount: a.Name,
176 SenderLocalpart: "sender",
177 SenderDomainStr: "remote.example",
178 RecipientLocalpart: "rcpt",
179 RecipientDomain: path.IPDomain,
180 RecipientDomainStr: "mox.example",
181 RecipientAddress: "rcpt@mox.example",
182 Success: true,
183 KeepUntil: now.Add(time.Minute),
184 }
185 m.RcptToLocalpart = "mjl"
186 qmr.FromID = retiredFromID
187 m.Size = int64(len(rawmsg))
188 m.RcptToLocalpart += smtp.Localpart("+unique")
189
190 err = DB.Insert(ctxbg, &qmr)
191 tcheck(t, err, "insert retired message to match")
192
193 if expOut != nil {
194 expOut.QueueMsgID = qmr.ID
195 }
196
197 mr := bytes.NewReader(rawmsg)
198 part, err := message.EnsurePart(pkglog.Logger, true, mr, int64(len(rawmsg)))
199 tcheck(t, err, "parsing message")
200
201 err = Incoming(ctxbg, pkglog, a, "<random@localhost>", m, part, "Inbox")
202 tcheck(t, err, "pass incoming message")
203
204 hl, err := bstore.QueryDB[Hook](ctxbg, DB).List()
205 tcheck(t, err, "list hooks")
206 if !expIn && expOut == nil {
207 tcompare(t, len(hl), 0)
208 return
209 }
210 tcompare(t, len(hl), 1)
211 h := hl[0]
212 tcompare(t, h.IsIncoming, expIn)
213 if expIn {
214 return
215 }
216 var out webhook.Outgoing
217 dec := json.NewDecoder(strings.NewReader(h.Payload))
218 err = dec.Decode(&out)
219 tcheck(t, err, "decode outgoing webhook")
220
221 out.WebhookQueued = time.Time{}
222 tcompare(t, &out, expOut)
223 }
224
225 dsncompose := func(m *dsn.Message) []byte {
226 buf, err := m.Compose(pkglog, false)
227 tcheck(t, err, "compose dsn")
228 return buf
229 }
230 makedsn := func(action dsn.Action) *dsn.Message {
231 return &dsn.Message{
232 From: path,
233 To: path,
234 TextBody: "explanation",
235 MessageID: "<dsnmsgid@localhost>",
236 ReportingMTA: "localhost",
237 Recipients: []dsn.Recipient{
238 {
239 FinalRecipient: path,
240 Action: action,
241 Status: "5.0.0.",
242 DiagnosticCodeSMTP: "554 5.0.0 error",
243 },
244 },
245 }
246 }
247
248 msgfailed := dsncompose(makedsn(dsn.Failed))
249
250 // No FromID to match against, so we get a webhook for a new incoming message.
251 testIncoming(acc, msgfailed, "", false, nil)
252 testIncoming(accret, msgfailed, "mismatch", true, nil)
253
254 // DSN with multiple recipients are treated as unrecognized dsns.
255 multidsn := makedsn(dsn.Delivered)
256 multidsn.Recipients = append(multidsn.Recipients, multidsn.Recipients[0])
257 msgmultidsn := dsncompose(multidsn)
258 testIncoming(acc, msgmultidsn, "unique", false, nil)
259 testIncoming(accret, msgmultidsn, "unique", false, &webhook.Outgoing{
260 Event: webhook.EventUnrecognized,
261 DSN: true,
262 FromID: "unique",
263 })
264
265 msgdelayed := dsncompose(makedsn(dsn.Delayed))
266 testIncoming(acc, msgdelayed, "unique", false, nil)
267 testIncoming(accret, msgdelayed, "unique", false, &webhook.Outgoing{
268 Event: webhook.EventDelayed,
269 DSN: true,
270 FromID: "unique",
271 SMTPCode: 554,
272 SMTPEnhancedCode: "5.0.0",
273 })
274
275 msgrelayed := dsncompose(makedsn(dsn.Relayed))
276 testIncoming(acc, msgrelayed, "unique", false, nil)
277 testIncoming(accret, msgrelayed, "unique", false, &webhook.Outgoing{
278 Event: webhook.EventRelayed,
279 DSN: true,
280 FromID: "unique",
281 SMTPCode: 554,
282 SMTPEnhancedCode: "5.0.0",
283 })
284
285 msgunrecognized := dsncompose(makedsn(dsn.Action("bogus")))
286 testIncoming(acc, msgunrecognized, "unique", false, nil)
287 testIncoming(accret, msgunrecognized, "unique", false, &webhook.Outgoing{
288 Event: webhook.EventUnrecognized,
289 DSN: true,
290 FromID: "unique",
291 })
292
293 // Not a DSN but to fromid address also causes "unrecognized".
294 msgunrecognized2 := []byte(testmsg)
295 testIncoming(acc, msgunrecognized2, "unique", false, nil)
296 testIncoming(accret, msgunrecognized2, "unique", false, &webhook.Outgoing{
297 Event: webhook.EventUnrecognized,
298 DSN: false,
299 FromID: "unique",
300 })
301
302 msgdelivered := dsncompose(makedsn(dsn.Delivered))
303 testIncoming(acc, msgdelivered, "unique", false, nil)
304 testIncoming(accret, msgdelivered, "unique", false, &webhook.Outgoing{
305 Event: webhook.EventDelivered,
306 DSN: true,
307 FromID: "unique",
308 // This is what DSN claims.
309 SMTPCode: 554,
310 SMTPEnhancedCode: "5.0.0",
311 })
312
313 testIncoming(acc, msgfailed, "unique", false, nil)
314 testIncoming(accret, msgfailed, "unique", false, &webhook.Outgoing{
315 Event: webhook.EventFailed,
316 DSN: true,
317 FromID: "unique",
318 SMTPCode: 554,
319 SMTPEnhancedCode: "5.0.0",
320 })
321
322 // We still have a webhook in the queue from the test above.
323 // Try to get the hook delivered. We'll try various error handling cases and superseding.
324
325 qsize, err := HookQueueSize(ctxbg)
326 tcheck(t, err, "hook queue size")
327 tcompare(t, qsize, 1)
328
329 var handler http.HandlerFunc
330 handleError := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
331 w.WriteHeader(http.StatusInternalServerError)
332 fmt.Fprintln(w, "server error")
333 })
334 handleOK := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
335 if r.Header.Get("Authorization") != "Basic dXNlcm5hbWU6cGFzc3dvcmQ=" {
336 http.Error(w, "unauthorized", http.StatusUnauthorized)
337 return
338 }
339 if r.Header.Get("X-Mox-Webhook-ID") == "" {
340 http.Error(w, "missing header x-mox-webhook-id", http.StatusBadRequest)
341 return
342 }
343 if r.Header.Get("X-Mox-Webhook-Attempt") == "" {
344 http.Error(w, "missing header x-mox-webhook-attempt", http.StatusBadRequest)
345 return
346 }
347 fmt.Fprintln(w, "ok")
348 })
349 hs := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
350 handler.ServeHTTP(w, r)
351 }))
352 defer hs.Close()
353
354 h, err := bstore.QueryDB[Hook](ctxbg, DB).Get()
355 tcheck(t, err, "get hook from queue")
356
357 next := hookNextWork(ctxbg, pkglog, map[string]struct{}{"https://other.example/": {}})
358 if next > 0 {
359 t.Fatalf("next scheduled work should be immediate, is %v", next)
360 }
361
362 // Respond with an error and see a retry is scheduled.
363 h.URL = hs.URL
364 // Update hook URL in database, so we can call hookLaunchWork. We'll call
365 // hookDeliver for later attempts.
366 err = DB.Update(ctxbg, &h)
367 tcheck(t, err, "update hook url")
368 handler = handleError
369 hookLaunchWork(pkglog, map[string]struct{}{"https://other.example/": {}})
370 <-hookDeliveryResults
371 err = DB.Get(ctxbg, &h)
372 tcheck(t, err, "get hook after failed delivery attempt")
373 tcompare(t, h.Attempts, 1)
374 tcompare(t, len(h.Results), 1)
375 tcompare(t, h.LastResult().Success, false)
376 tcompare(t, h.LastResult().Code, http.StatusInternalServerError)
377 tcompare(t, h.LastResult().Response, "server error\n")
378
379 next = hookNextWork(ctxbg, pkglog, map[string]struct{}{})
380 if next <= 0 {
381 t.Fatalf("next scheduled work is immediate, shoud be in the future")
382 }
383
384 n, err := HookNextAttemptSet(ctxbg, HookFilter{}, time.Now().Add(time.Minute))
385 tcheck(t, err, "schedule hook to now")
386 tcompare(t, n, 1)
387 n, err = HookNextAttemptAdd(ctxbg, HookFilter{}, -time.Minute)
388 tcheck(t, err, "schedule hook to now")
389 tcompare(t, n, 1)
390 next = hookNextWork(ctxbg, pkglog, map[string]struct{}{})
391 if next > 0 {
392 t.Fatalf("next scheduled work should be immediate, is %v", next)
393 }
394
395 handler = handleOK
396 hookDeliver(pkglog, h)
397 <-hookDeliveryResults
398 err = DB.Get(ctxbg, &h)
399 tcompare(t, err, bstore.ErrAbsent)
400 hr := HookRetired{ID: h.ID}
401 err = DB.Get(ctxbg, &hr)
402 tcheck(t, err, "get retired hook after delivery")
403 tcompare(t, hr.Attempts, 2)
404 tcompare(t, len(hr.Results), 2)
405 tcompare(t, hr.LastResult().Success, true)
406 tcompare(t, hr.LastResult().Code, http.StatusOK)
407 tcompare(t, hr.LastResult().Response, "ok\n")
408
409 // Check that cleaning up retired webhooks works.
410 cleanupHookRetiredSingle(pkglog)
411 hrl, err := bstore.QueryDB[HookRetired](ctxbg, DB).List()
412 tcheck(t, err, "listing retired hooks")
413 tcompare(t, len(hrl), 0)
414
415 // Helper to get a representative webhook added to the queue.
416 addHook := func(a *store.Account) {
417 testIncoming(a, msgfailed, "unique", false, &webhook.Outgoing{
418 Event: webhook.EventFailed,
419 DSN: true,
420 FromID: "unique",
421 SMTPCode: 554,
422 SMTPEnhancedCode: "5.0.0",
423 })
424 }
425
426 // Keep attempting and failing delivery until we give up.
427 addHook(accret)
428 h, err = bstore.QueryDB[Hook](ctxbg, DB).Get()
429 tcheck(t, err, "get added hook")
430 h.URL = hs.URL
431 handler = handleError
432 for i := 0; i < len(hookIntervals); i++ {
433 hookDeliver(pkglog, h)
434 <-hookDeliveryResults
435 err := DB.Get(ctxbg, &h)
436 tcheck(t, err, "get hook")
437 tcompare(t, h.Attempts, i+1)
438 }
439 // Final attempt.
440 hookDeliver(pkglog, h)
441 <-hookDeliveryResults
442 err = DB.Get(ctxbg, &h)
443 tcompare(t, err, bstore.ErrAbsent)
444 hr = HookRetired{ID: h.ID}
445 err = DB.Get(ctxbg, &hr)
446 tcheck(t, err, "get retired hook after failure")
447 tcompare(t, hr.Attempts, len(hookIntervals)+1)
448 tcompare(t, len(hr.Results), len(hookIntervals)+1)
449 tcompare(t, hr.LastResult().Success, false)
450 tcompare(t, hr.LastResult().Code, http.StatusInternalServerError)
451 tcompare(t, hr.LastResult().Response, "server error\n")
452
453 // Check account "hook" doesn't get retired webhooks.
454 addHook(acchook)
455 h, err = bstore.QueryDB[Hook](ctxbg, DB).Get()
456 tcheck(t, err, "get added hook")
457 handler = handleOK
458 h.URL = hs.URL
459 hookDeliver(pkglog, h)
460 <-hookDeliveryResults
461 err = DB.Get(ctxbg, &h)
462 tcompare(t, err, bstore.ErrAbsent)
463 hr = HookRetired{ID: h.ID}
464 err = DB.Get(ctxbg, &hr)
465 tcompare(t, err, bstore.ErrAbsent)
466
467 // HookCancel
468 addHook(accret)
469 h, err = bstore.QueryDB[Hook](ctxbg, DB).Get()
470 tcheck(t, err, "get added hook")
471 n, err = HookCancel(ctxbg, pkglog, HookFilter{})
472 tcheck(t, err, "canceling hook")
473 tcompare(t, n, 1)
474 l, err := HookList(ctxbg, HookFilter{}, HookSort{})
475 tcheck(t, err, "list hook")
476 tcompare(t, len(l), 0)
477
478 // Superseding: When a webhook is scheduled for a message that already has a
479 // pending webhook, the previous webhook should be removed/retired.
480 _, err = bstore.QueryDB[HookRetired](ctxbg, DB).Delete()
481 tcheck(t, err, "clean up retired webhooks")
482 _, err = bstore.QueryDB[MsgRetired](ctxbg, DB).Delete()
483 tcheck(t, err, "clean up retired messages")
484 qmr := MsgRetired{
485 SenderAccount: accret.Name,
486 SenderLocalpart: "sender",
487 SenderDomainStr: "remote.example",
488 RecipientLocalpart: "rcpt",
489 RecipientDomain: path.IPDomain,
490 RecipientDomainStr: "mox.example",
491 RecipientAddress: "rcpt@mox.example",
492 Success: true,
493 KeepUntil: now.Add(time.Minute),
494 FromID: "unique",
495 }
496 err = DB.Insert(ctxbg, &qmr)
497 tcheck(t, err, "insert retired message to match")
498 m.RcptToLocalpart = "mjl"
499 m.Size = int64(len(msgdelayed))
500 m.RcptToLocalpart += smtp.Localpart("+unique")
501
502 mr := bytes.NewReader(msgdelayed)
503 part, err := message.EnsurePart(pkglog.Logger, true, mr, int64(len(msgdelayed)))
504 tcheck(t, err, "parsing message")
505
506 // Cause first webhook.
507 err = Incoming(ctxbg, pkglog, accret, "<random@localhost>", m, part, "Inbox")
508 tcheck(t, err, "pass incoming message")
509 h, err = bstore.QueryDB[Hook](ctxbg, DB).Get()
510 tcheck(t, err, "get hook")
511
512 // Cause second webhook for same message. First should now be retired and marked as superseded.
513 err = Incoming(ctxbg, pkglog, accret, "<random@localhost>", m, part, "Inbox")
514 tcheck(t, err, "pass incoming message again")
515 h2, err := bstore.QueryDB[Hook](ctxbg, DB).Get()
516 tcheck(t, err, "get hook")
517 hr, err = bstore.QueryDB[HookRetired](ctxbg, DB).Get()
518 tcheck(t, err, "get retired hook")
519 tcompare(t, h.ID, hr.ID)
520 tcompare(t, hr.SupersededByID, h2.ID)
521 tcompare(t, h2.ID > h.ID, true)
522}
523
524func TestHookListFilterSort(t *testing.T) {
525 _, cleanup := setup(t)
526 defer cleanup()
527
528 now := time.Now().Round(0)
529 h := Hook{0, 0, "fromid", "messageid", "subj", nil, "mjl", "http://localhost", "", false, "delivered", "", now, 0, now, []HookResult{}}
530 h1 := h
531 h1.Submitted = now.Add(-time.Second)
532 h1.NextAttempt = now.Add(time.Minute)
533 hl := []Hook{h, h, h, h, h, h1}
534 err := DB.Write(ctxbg, func(tx *bstore.Tx) error {
535 for i := range hl {
536 err := hookInsert(tx, &hl[i], now, time.Minute)
537 tcheck(t, err, "insert hook")
538 }
539 return nil
540 })
541 tcheck(t, err, "inserting hooks")
542 h1 = hl[len(hl)-1]
543
544 hlrev := slices.Clone(hl)
545 slices.Reverse(hlrev)
546
547 // Ascending by nextattempt,id.
548 l, err := HookList(ctxbg, HookFilter{}, HookSort{Asc: true})
549 tcheck(t, err, "list")
550 tcompare(t, l, hl)
551
552 // Descending by nextattempt,id.
553 l, err = HookList(ctxbg, HookFilter{}, HookSort{})
554 tcheck(t, err, "list")
555 tcompare(t, l, hlrev)
556
557 // Descending by submitted,id.
558 l, err = HookList(ctxbg, HookFilter{}, HookSort{Field: "Submitted"})
559 tcheck(t, err, "list")
560 ll := append(append([]Hook{}, hlrev[1:]...), hl[5])
561 tcompare(t, l, ll)
562
563 // Filter by all fields to get a single.
564 allfilters := HookFilter{
565 Max: 2,
566 IDs: []int64{h1.ID},
567 Account: "mjl",
568 Submitted: "<1s",
569 NextAttempt: ">1s",
570 Event: "delivered",
571 }
572 l, err = HookList(ctxbg, allfilters, HookSort{})
573 tcheck(t, err, "list single")
574 tcompare(t, l, []Hook{h1})
575
576 // Paginated NextAttmpt asc.
577 var lastID int64
578 var last any
579 l = nil
580 for {
581 nl, err := HookList(ctxbg, HookFilter{Max: 1}, HookSort{Asc: true, LastID: lastID, Last: last})
582 tcheck(t, err, "list paginated")
583 l = append(l, nl...)
584 if len(nl) == 0 {
585 break
586 }
587 tcompare(t, len(nl), 1)
588 lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
589 }
590 tcompare(t, l, hl)
591
592 // Paginated NextAttempt desc.
593 l = nil
594 lastID = 0
595 last = ""
596 for {
597 nl, err := HookList(ctxbg, HookFilter{Max: 1}, HookSort{LastID: lastID, Last: last})
598 tcheck(t, err, "list paginated")
599 l = append(l, nl...)
600 if len(nl) == 0 {
601 break
602 }
603 tcompare(t, len(nl), 1)
604 lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
605 }
606 tcompare(t, l, hlrev)
607
608 // Paginated Submitted desc.
609 l = nil
610 lastID = 0
611 last = ""
612 for {
613 nl, err := HookList(ctxbg, HookFilter{Max: 1}, HookSort{Field: "Submitted", LastID: lastID, Last: last})
614 tcheck(t, err, "list paginated")
615 l = append(l, nl...)
616 if len(nl) == 0 {
617 break
618 }
619 tcompare(t, len(nl), 1)
620 lastID, last = nl[0].ID, nl[0].Submitted.Format(time.RFC3339Nano)
621 }
622 tcompare(t, l, ll)
623
624 // Paginated Submitted asc.
625 l = nil
626 lastID = 0
627 last = ""
628 for {
629 nl, err := HookList(ctxbg, HookFilter{Max: 1}, HookSort{Field: "Submitted", Asc: true, LastID: lastID, Last: last})
630 tcheck(t, err, "list paginated")
631 l = append(l, nl...)
632 if len(nl) == 0 {
633 break
634 }
635 tcompare(t, len(nl), 1)
636 lastID, last = nl[0].ID, nl[0].Submitted.Format(time.RFC3339Nano)
637 }
638 llrev := slices.Clone(ll)
639 slices.Reverse(llrev)
640 tcompare(t, l, llrev)
641
642 // Retire messages and do similar but more basic tests. The code is similar.
643 var hrl []HookRetired
644 err = DB.Write(ctxbg, func(tx *bstore.Tx) error {
645 for _, h := range hl {
646 hr := h.Retired(false, h.NextAttempt, time.Now().Add(time.Minute).Round(0))
647 err := tx.Insert(&hr)
648 tcheck(t, err, "inserting retired")
649 hrl = append(hrl, hr)
650 }
651 return nil
652 })
653 tcheck(t, err, "adding retired")
654
655 // Paginated LastActivity desc.
656 var lr []HookRetired
657 lastID = 0
658 last = ""
659 l = nil
660 for {
661 nl, err := HookRetiredList(ctxbg, HookRetiredFilter{Max: 1}, HookRetiredSort{LastID: lastID, Last: last})
662 tcheck(t, err, "list paginated")
663 lr = append(lr, nl...)
664 if len(nl) == 0 {
665 break
666 }
667 tcompare(t, len(nl), 1)
668 lastID, last = nl[0].ID, nl[0].LastActivity.Format(time.RFC3339Nano)
669 }
670 hrlrev := slices.Clone(hrl)
671 slices.Reverse(hrlrev)
672 tcompare(t, lr, hrlrev)
673
674 // Filter by all fields to get a single.
675 allretiredfilters := HookRetiredFilter{
676 Max: 2,
677 IDs: []int64{hrlrev[0].ID},
678 Account: "mjl",
679 Submitted: "<1s",
680 LastActivity: ">1s",
681 Event: "delivered",
682 }
683 lr, err = HookRetiredList(ctxbg, allretiredfilters, HookRetiredSort{})
684 tcheck(t, err, "list single")
685 tcompare(t, lr, []HookRetired{hrlrev[0]})
686}
687