1package queue
2
3import (
4 "bytes"
5 "encoding/json"
6 "fmt"
7 "net/http"
8 "net/http/httptest"
9 "slices"
10 "strings"
11 "testing"
12 "time"
13
14 "github.com/mjl-/bstore"
15
16 "github.com/mjl-/mox/dsn"
17 "github.com/mjl-/mox/message"
18 "github.com/mjl-/mox/smtp"
19 "github.com/mjl-/mox/store"
20 "github.com/mjl-/mox/webhook"
21)
22
23// Test webhooks for incoming message that is not related to outgoing deliveries.
24func TestHookIncoming(t *testing.T) {
25 acc, cleanup := setup(t)
26 defer cleanup()
27 err := Init()
28 tcheck(t, err, "queue init")
29
30 accret, err := store.OpenAccount(pkglog, "retired")
31 tcheck(t, err, "open account for retired")
32 defer func() {
33 accret.Close()
34 accret.CheckClosed()
35 }()
36
37 testIncoming := func(a *store.Account, expIn bool) {
38 t.Helper()
39
40 _, err := bstore.QueryDB[Hook](ctxbg, DB).Delete()
41 tcheck(t, err, "clean up hooks")
42
43 mr := bytes.NewReader([]byte(testmsg))
44 now := time.Now().Round(0)
45 m := store.Message{
46 ID: 123,
47 RemoteIP: "::1",
48 MailFrom: "sender@remote.example",
49 MailFromLocalpart: "sender",
50 MailFromDomain: "remote.example",
51 RcptToLocalpart: "rcpt",
52 RcptToDomain: "mox.example",
53 MsgFromLocalpart: "mjl",
54 MsgFromDomain: "mox.example",
55 MsgFromOrgDomain: "mox.example",
56 EHLOValidated: true,
57 MailFromValidated: true,
58 MsgFromValidated: true,
59 EHLOValidation: store.ValidationPass,
60 MailFromValidation: store.ValidationPass,
61 MsgFromValidation: store.ValidationDMARC,
62 DKIMDomains: []string{"remote.example"},
63 Received: now,
64 Size: int64(len(testmsg)),
65 }
66 part, err := message.EnsurePart(pkglog.Logger, true, mr, int64(len(testmsg)))
67 tcheck(t, err, "parsing message")
68
69 err = Incoming(ctxbg, pkglog, a, "<random@localhost>", m, part, "Inbox")
70 tcheck(t, err, "pass incoming message")
71
72 hl, err := bstore.QueryDB[Hook](ctxbg, DB).List()
73 tcheck(t, err, "list hooks")
74 if !expIn {
75 tcompare(t, len(hl), 0)
76 return
77 }
78 tcompare(t, len(hl), 1)
79 h := hl[0]
80 tcompare(t, h.IsIncoming, true)
81 var in webhook.Incoming
82 dec := json.NewDecoder(strings.NewReader(h.Payload))
83 err = dec.Decode(&in)
84 tcheck(t, err, "decode incoming webhook")
85 in.Meta.Received = in.Meta.Received.Local() // For TZ UTC.
86
87 expIncoming := webhook.Incoming{
88 From: []webhook.NameAddress{{Address: "mjl@mox.example"}},
89 To: []webhook.NameAddress{{Address: "mjl@mox.example"}},
90 CC: []webhook.NameAddress{},
91 BCC: []webhook.NameAddress{},
92 ReplyTo: []webhook.NameAddress{},
93 References: []string{},
94 Subject: "test",
95 Text: "test email\n",
96
97 Structure: webhook.PartStructure(&part),
98 Meta: webhook.IncomingMeta{
99 MsgID: m.ID,
100 MailFrom: m.MailFrom,
101 MailFromValidated: m.MailFromValidated,
102 MsgFromValidated: m.MsgFromValidated,
103 RcptTo: "rcpt@mox.example",
104 DKIMVerifiedDomains: []string{"remote.example"},
105 RemoteIP: "::1",
106 Received: m.Received,
107 MailboxName: "Inbox",
108 Automated: false,
109 },
110 }
111 tcompare(t, in, expIncoming)
112 }
113
114 testIncoming(acc, false)
115 testIncoming(accret, true)
116}
117
118// Test with fromid and various DSNs, and delivery.
119func TestFromIDIncomingDelivery(t *testing.T) {
120 acc, cleanup := setup(t)
121 defer cleanup()
122 err := Init()
123 tcheck(t, err, "queue init")
124
125 accret, err := store.OpenAccount(pkglog, "retired")
126 tcheck(t, err, "open account for retired")
127 defer func() {
128 accret.Close()
129 accret.CheckClosed()
130 }()
131
132 // Account that only gets webhook calls, but no retired webhooks.
133 acchook, err := store.OpenAccount(pkglog, "hook")
134 tcheck(t, err, "open account for hook")
135 defer func() {
136 acchook.Close()
137 acchook.CheckClosed()
138 }()
139
140 addr, err := smtp.ParseAddress("mjl@mox.example")
141 tcheck(t, err, "parse address")
142 path := addr.Path()
143
144 now := time.Now().Round(0)
145 m := store.Message{
146 ID: 123,
147 RemoteIP: "::1",
148 MailFrom: "sender@remote.example",
149 MailFromLocalpart: "sender",
150 MailFromDomain: "remote.example",
151 RcptToLocalpart: "rcpt",
152 RcptToDomain: "mox.example",
153 MsgFromLocalpart: "mjl",
154 MsgFromDomain: "mox.example",
155 MsgFromOrgDomain: "mox.example",
156 EHLOValidated: true,
157 MailFromValidated: true,
158 MsgFromValidated: true,
159 EHLOValidation: store.ValidationPass,
160 MailFromValidation: store.ValidationPass,
161 MsgFromValidation: store.ValidationDMARC,
162 DKIMDomains: []string{"remote.example"},
163 Received: now,
164 DSN: true,
165 }
166
167 testIncoming := func(a *store.Account, rawmsg []byte, retiredFromID string, expIn bool, expOut *webhook.Outgoing) {
168 t.Helper()
169
170 _, err := bstore.QueryDB[Hook](ctxbg, DB).Delete()
171 tcheck(t, err, "clean up hooks")
172 _, err = bstore.QueryDB[MsgRetired](ctxbg, DB).Delete()
173 tcheck(t, err, "clean up retired messages")
174
175 qmr := MsgRetired{
176 SenderAccount: a.Name,
177 SenderLocalpart: "sender",
178 SenderDomainStr: "remote.example",
179 RecipientLocalpart: "rcpt",
180 RecipientDomain: path.IPDomain,
181 RecipientDomainStr: "mox.example",
182 RecipientAddress: "rcpt@mox.example",
183 Success: true,
184 KeepUntil: now.Add(time.Minute),
185 }
186 m.RcptToLocalpart = "mjl"
187 qmr.FromID = retiredFromID
188 m.Size = int64(len(rawmsg))
189 m.RcptToLocalpart += smtp.Localpart("+unique")
190
191 err = DB.Insert(ctxbg, &qmr)
192 tcheck(t, err, "insert retired message to match")
193
194 if expOut != nil {
195 expOut.QueueMsgID = qmr.ID
196 }
197
198 mr := bytes.NewReader(rawmsg)
199 part, err := message.EnsurePart(pkglog.Logger, true, mr, int64(len(rawmsg)))
200 tcheck(t, err, "parsing message")
201
202 err = Incoming(ctxbg, pkglog, a, "<random@localhost>", m, part, "Inbox")
203 tcheck(t, err, "pass incoming message")
204
205 hl, err := bstore.QueryDB[Hook](ctxbg, DB).List()
206 tcheck(t, err, "list hooks")
207 if !expIn && expOut == nil {
208 tcompare(t, len(hl), 0)
209 return
210 }
211 tcompare(t, len(hl), 1)
212 h := hl[0]
213 tcompare(t, h.IsIncoming, expIn)
214 if expIn {
215 return
216 }
217 var out webhook.Outgoing
218 dec := json.NewDecoder(strings.NewReader(h.Payload))
219 err = dec.Decode(&out)
220 tcheck(t, err, "decode outgoing webhook")
221
222 out.WebhookQueued = time.Time{}
223 tcompare(t, &out, expOut)
224 }
225
226 dsncompose := func(m *dsn.Message) []byte {
227 buf, err := m.Compose(pkglog, false)
228 tcheck(t, err, "compose dsn")
229 return buf
230 }
231 makedsn := func(action dsn.Action) *dsn.Message {
232 return &dsn.Message{
233 From: path,
234 To: path,
235 TextBody: "explanation",
236 MessageID: "<dsnmsgid@localhost>",
237 ReportingMTA: "localhost",
238 Recipients: []dsn.Recipient{
239 {
240 FinalRecipient: path,
241 Action: action,
242 Status: "5.0.0.",
243 DiagnosticCodeSMTP: "554 5.0.0 error",
244 },
245 },
246 }
247 }
248
249 msgfailed := dsncompose(makedsn(dsn.Failed))
250
251 // No FromID to match against, so we get a webhook for a new incoming message.
252 testIncoming(acc, msgfailed, "", false, nil)
253 testIncoming(accret, msgfailed, "mismatch", true, nil)
254
255 // DSN with multiple recipients are treated as unrecognized dsns.
256 multidsn := makedsn(dsn.Delivered)
257 multidsn.Recipients = append(multidsn.Recipients, multidsn.Recipients[0])
258 msgmultidsn := dsncompose(multidsn)
259 testIncoming(acc, msgmultidsn, "unique", false, nil)
260 testIncoming(accret, msgmultidsn, "unique", false, &webhook.Outgoing{
261 Event: webhook.EventUnrecognized,
262 DSN: true,
263 FromID: "unique",
264 })
265
266 msgdelayed := dsncompose(makedsn(dsn.Delayed))
267 testIncoming(acc, msgdelayed, "unique", false, nil)
268 testIncoming(accret, msgdelayed, "unique", false, &webhook.Outgoing{
269 Event: webhook.EventDelayed,
270 DSN: true,
271 FromID: "unique",
272 SMTPCode: 554,
273 SMTPEnhancedCode: "5.0.0",
274 })
275
276 msgrelayed := dsncompose(makedsn(dsn.Relayed))
277 testIncoming(acc, msgrelayed, "unique", false, nil)
278 testIncoming(accret, msgrelayed, "unique", false, &webhook.Outgoing{
279 Event: webhook.EventRelayed,
280 DSN: true,
281 FromID: "unique",
282 SMTPCode: 554,
283 SMTPEnhancedCode: "5.0.0",
284 })
285
286 msgunrecognized := dsncompose(makedsn(dsn.Action("bogus")))
287 testIncoming(acc, msgunrecognized, "unique", false, nil)
288 testIncoming(accret, msgunrecognized, "unique", false, &webhook.Outgoing{
289 Event: webhook.EventUnrecognized,
290 DSN: true,
291 FromID: "unique",
292 })
293
294 // Not a DSN but to fromid address also causes "unrecognized".
295 msgunrecognized2 := []byte(testmsg)
296 testIncoming(acc, msgunrecognized2, "unique", false, nil)
297 testIncoming(accret, msgunrecognized2, "unique", false, &webhook.Outgoing{
298 Event: webhook.EventUnrecognized,
299 DSN: false,
300 FromID: "unique",
301 })
302
303 msgdelivered := dsncompose(makedsn(dsn.Delivered))
304 testIncoming(acc, msgdelivered, "unique", false, nil)
305 testIncoming(accret, msgdelivered, "unique", false, &webhook.Outgoing{
306 Event: webhook.EventDelivered,
307 DSN: true,
308 FromID: "unique",
309 // This is what DSN claims.
310 SMTPCode: 554,
311 SMTPEnhancedCode: "5.0.0",
312 })
313
314 testIncoming(acc, msgfailed, "unique", false, nil)
315 testIncoming(accret, msgfailed, "unique", false, &webhook.Outgoing{
316 Event: webhook.EventFailed,
317 DSN: true,
318 FromID: "unique",
319 SMTPCode: 554,
320 SMTPEnhancedCode: "5.0.0",
321 })
322
323 // We still have a webhook in the queue from the test above.
324 // Try to get the hook delivered. We'll try various error handling cases and superseding.
325
326 qsize, err := HookQueueSize(ctxbg)
327 tcheck(t, err, "hook queue size")
328 tcompare(t, qsize, 1)
329
330 var handler http.HandlerFunc
331 handleError := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
332 w.WriteHeader(http.StatusInternalServerError)
333 fmt.Fprintln(w, "server error")
334 })
335 handleOK := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
336 if r.Header.Get("Authorization") != "Basic dXNlcm5hbWU6cGFzc3dvcmQ=" {
337 http.Error(w, "unauthorized", http.StatusUnauthorized)
338 return
339 }
340 if r.Header.Get("X-Mox-Webhook-ID") == "" {
341 http.Error(w, "missing header x-mox-webhook-id", http.StatusBadRequest)
342 return
343 }
344 if r.Header.Get("X-Mox-Webhook-Attempt") == "" {
345 http.Error(w, "missing header x-mox-webhook-attempt", http.StatusBadRequest)
346 return
347 }
348 fmt.Fprintln(w, "ok")
349 })
350 hs := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
351 handler.ServeHTTP(w, r)
352 }))
353 defer hs.Close()
354
355 h, err := bstore.QueryDB[Hook](ctxbg, DB).Get()
356 tcheck(t, err, "get hook from queue")
357
358 next := hookNextWork(ctxbg, pkglog, map[string]struct{}{"https://other.example/": {}})
359 if next > 0 {
360 t.Fatalf("next scheduled work should be immediate, is %v", next)
361 }
362
363 // Respond with an error and see a retry is scheduled.
364 h.URL = hs.URL
365 // Update hook URL in database, so we can call hookLaunchWork. We'll call
366 // hookDeliver for later attempts.
367 err = DB.Update(ctxbg, &h)
368 tcheck(t, err, "update hook url")
369 handler = handleError
370 hookLaunchWork(pkglog, map[string]struct{}{"https://other.example/": {}})
371 <-hookDeliveryResults
372 err = DB.Get(ctxbg, &h)
373 tcheck(t, err, "get hook after failed delivery attempt")
374 tcompare(t, h.Attempts, 1)
375 tcompare(t, len(h.Results), 1)
376 tcompare(t, h.LastResult().Success, false)
377 tcompare(t, h.LastResult().Code, http.StatusInternalServerError)
378 tcompare(t, h.LastResult().Response, "server error\n")
379
380 next = hookNextWork(ctxbg, pkglog, map[string]struct{}{})
381 if next <= 0 {
382 t.Fatalf("next scheduled work is immediate, shoud be in the future")
383 }
384
385 n, err := HookNextAttemptSet(ctxbg, HookFilter{}, time.Now().Add(time.Minute))
386 tcheck(t, err, "schedule hook to now")
387 tcompare(t, n, 1)
388 n, err = HookNextAttemptAdd(ctxbg, HookFilter{}, -time.Minute)
389 tcheck(t, err, "schedule hook to now")
390 tcompare(t, n, 1)
391 next = hookNextWork(ctxbg, pkglog, map[string]struct{}{})
392 if next > 0 {
393 t.Fatalf("next scheduled work should be immediate, is %v", next)
394 }
395
396 handler = handleOK
397 hookDeliver(pkglog, h)
398 <-hookDeliveryResults
399 err = DB.Get(ctxbg, &h)
400 tcompare(t, err, bstore.ErrAbsent)
401 hr := HookRetired{ID: h.ID}
402 err = DB.Get(ctxbg, &hr)
403 tcheck(t, err, "get retired hook after delivery")
404 tcompare(t, hr.Attempts, 2)
405 tcompare(t, len(hr.Results), 2)
406 tcompare(t, hr.LastResult().Success, true)
407 tcompare(t, hr.LastResult().Code, http.StatusOK)
408 tcompare(t, hr.LastResult().Response, "ok\n")
409
410 // Check that cleaning up retired webhooks works.
411 cleanupHookRetiredSingle(pkglog)
412 hrl, err := bstore.QueryDB[HookRetired](ctxbg, DB).List()
413 tcheck(t, err, "listing retired hooks")
414 tcompare(t, len(hrl), 0)
415
416 // Helper to get a representative webhook added to the queue.
417 addHook := func(a *store.Account) {
418 testIncoming(a, msgfailed, "unique", false, &webhook.Outgoing{
419 Event: webhook.EventFailed,
420 DSN: true,
421 FromID: "unique",
422 SMTPCode: 554,
423 SMTPEnhancedCode: "5.0.0",
424 })
425 }
426
427 // Keep attempting and failing delivery until we give up.
428 addHook(accret)
429 h, err = bstore.QueryDB[Hook](ctxbg, DB).Get()
430 tcheck(t, err, "get added hook")
431 h.URL = hs.URL
432 handler = handleError
433 for i := 0; i < len(hookIntervals); i++ {
434 hookDeliver(pkglog, h)
435 <-hookDeliveryResults
436 err := DB.Get(ctxbg, &h)
437 tcheck(t, err, "get hook")
438 tcompare(t, h.Attempts, i+1)
439 }
440 // Final attempt.
441 hookDeliver(pkglog, h)
442 <-hookDeliveryResults
443 err = DB.Get(ctxbg, &h)
444 tcompare(t, err, bstore.ErrAbsent)
445 hr = HookRetired{ID: h.ID}
446 err = DB.Get(ctxbg, &hr)
447 tcheck(t, err, "get retired hook after failure")
448 tcompare(t, hr.Attempts, len(hookIntervals)+1)
449 tcompare(t, len(hr.Results), len(hookIntervals)+1)
450 tcompare(t, hr.LastResult().Success, false)
451 tcompare(t, hr.LastResult().Code, http.StatusInternalServerError)
452 tcompare(t, hr.LastResult().Response, "server error\n")
453
454 // Check account "hook" doesn't get retired webhooks.
455 addHook(acchook)
456 h, err = bstore.QueryDB[Hook](ctxbg, DB).Get()
457 tcheck(t, err, "get added hook")
458 handler = handleOK
459 h.URL = hs.URL
460 hookDeliver(pkglog, h)
461 <-hookDeliveryResults
462 err = DB.Get(ctxbg, &h)
463 tcompare(t, err, bstore.ErrAbsent)
464 hr = HookRetired{ID: h.ID}
465 err = DB.Get(ctxbg, &hr)
466 tcompare(t, err, bstore.ErrAbsent)
467
468 // HookCancel
469 addHook(accret)
470 h, err = bstore.QueryDB[Hook](ctxbg, DB).Get()
471 tcheck(t, err, "get added hook")
472 n, err = HookCancel(ctxbg, pkglog, HookFilter{})
473 tcheck(t, err, "canceling hook")
474 tcompare(t, n, 1)
475 l, err := HookList(ctxbg, HookFilter{}, HookSort{})
476 tcheck(t, err, "list hook")
477 tcompare(t, len(l), 0)
478
479 // Superseding: When a webhook is scheduled for a message that already has a
480 // pending webhook, the previous webhook should be removed/retired.
481 _, err = bstore.QueryDB[HookRetired](ctxbg, DB).Delete()
482 tcheck(t, err, "clean up retired webhooks")
483 _, err = bstore.QueryDB[MsgRetired](ctxbg, DB).Delete()
484 tcheck(t, err, "clean up retired messages")
485 qmr := MsgRetired{
486 SenderAccount: accret.Name,
487 SenderLocalpart: "sender",
488 SenderDomainStr: "remote.example",
489 RecipientLocalpart: "rcpt",
490 RecipientDomain: path.IPDomain,
491 RecipientDomainStr: "mox.example",
492 RecipientAddress: "rcpt@mox.example",
493 Success: true,
494 KeepUntil: now.Add(time.Minute),
495 FromID: "unique",
496 }
497 err = DB.Insert(ctxbg, &qmr)
498 tcheck(t, err, "insert retired message to match")
499 m.RcptToLocalpart = "mjl"
500 m.Size = int64(len(msgdelayed))
501 m.RcptToLocalpart += smtp.Localpart("+unique")
502
503 mr := bytes.NewReader(msgdelayed)
504 part, err := message.EnsurePart(pkglog.Logger, true, mr, int64(len(msgdelayed)))
505 tcheck(t, err, "parsing message")
506
507 // Cause first webhook.
508 err = Incoming(ctxbg, pkglog, accret, "<random@localhost>", m, part, "Inbox")
509 tcheck(t, err, "pass incoming message")
510 h, err = bstore.QueryDB[Hook](ctxbg, DB).Get()
511 tcheck(t, err, "get hook")
512
513 // Cause second webhook for same message. First should now be retired and marked as superseded.
514 err = Incoming(ctxbg, pkglog, accret, "<random@localhost>", m, part, "Inbox")
515 tcheck(t, err, "pass incoming message again")
516 h2, err := bstore.QueryDB[Hook](ctxbg, DB).Get()
517 tcheck(t, err, "get hook")
518 hr, err = bstore.QueryDB[HookRetired](ctxbg, DB).Get()
519 tcheck(t, err, "get retired hook")
520 tcompare(t, h.ID, hr.ID)
521 tcompare(t, hr.SupersededByID, h2.ID)
522 tcompare(t, h2.ID > h.ID, true)
523}
524
525func TestHookListFilterSort(t *testing.T) {
526 _, cleanup := setup(t)
527 defer cleanup()
528 err := Init()
529 tcheck(t, err, "queue init")
530
531 now := time.Now().Round(0)
532 h := Hook{0, 0, "fromid", "messageid", "subj", nil, "mjl", "http://localhost", "", false, "delivered", "", now, 0, now, []HookResult{}}
533 h1 := h
534 h1.Submitted = now.Add(-time.Second)
535 h1.NextAttempt = now.Add(time.Minute)
536 hl := []Hook{h, h, h, h, h, h1}
537 err = DB.Write(ctxbg, func(tx *bstore.Tx) error {
538 for i := range hl {
539 err := hookInsert(tx, &hl[i], now, time.Minute)
540 tcheck(t, err, "insert hook")
541 }
542 return nil
543 })
544 tcheck(t, err, "inserting hooks")
545 h1 = hl[len(hl)-1]
546
547 hlrev := slices.Clone(hl)
548 slices.Reverse(hlrev)
549
550 // Ascending by nextattempt,id.
551 l, err := HookList(ctxbg, HookFilter{}, HookSort{Asc: true})
552 tcheck(t, err, "list")
553 tcompare(t, l, hl)
554
555 // Descending by nextattempt,id.
556 l, err = HookList(ctxbg, HookFilter{}, HookSort{})
557 tcheck(t, err, "list")
558 tcompare(t, l, hlrev)
559
560 // Descending by submitted,id.
561 l, err = HookList(ctxbg, HookFilter{}, HookSort{Field: "Submitted"})
562 tcheck(t, err, "list")
563 ll := append(append([]Hook{}, hlrev[1:]...), hl[5])
564 tcompare(t, l, ll)
565
566 // Filter by all fields to get a single.
567 allfilters := HookFilter{
568 Max: 2,
569 IDs: []int64{h1.ID},
570 Account: "mjl",
571 Submitted: "<1s",
572 NextAttempt: ">1s",
573 Event: "delivered",
574 }
575 l, err = HookList(ctxbg, allfilters, HookSort{})
576 tcheck(t, err, "list single")
577 tcompare(t, l, []Hook{h1})
578
579 // Paginated NextAttmpt asc.
580 var lastID int64
581 var last any
582 l = nil
583 for {
584 nl, err := HookList(ctxbg, HookFilter{Max: 1}, HookSort{Asc: true, LastID: lastID, Last: last})
585 tcheck(t, err, "list paginated")
586 l = append(l, nl...)
587 if len(nl) == 0 {
588 break
589 }
590 tcompare(t, len(nl), 1)
591 lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
592 }
593 tcompare(t, l, hl)
594
595 // Paginated NextAttempt desc.
596 l = nil
597 lastID = 0
598 last = ""
599 for {
600 nl, err := HookList(ctxbg, HookFilter{Max: 1}, HookSort{LastID: lastID, Last: last})
601 tcheck(t, err, "list paginated")
602 l = append(l, nl...)
603 if len(nl) == 0 {
604 break
605 }
606 tcompare(t, len(nl), 1)
607 lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
608 }
609 tcompare(t, l, hlrev)
610
611 // Paginated Submitted desc.
612 l = nil
613 lastID = 0
614 last = ""
615 for {
616 nl, err := HookList(ctxbg, HookFilter{Max: 1}, HookSort{Field: "Submitted", LastID: lastID, Last: last})
617 tcheck(t, err, "list paginated")
618 l = append(l, nl...)
619 if len(nl) == 0 {
620 break
621 }
622 tcompare(t, len(nl), 1)
623 lastID, last = nl[0].ID, nl[0].Submitted.Format(time.RFC3339Nano)
624 }
625 tcompare(t, l, ll)
626
627 // Paginated Submitted asc.
628 l = nil
629 lastID = 0
630 last = ""
631 for {
632 nl, err := HookList(ctxbg, HookFilter{Max: 1}, HookSort{Field: "Submitted", Asc: true, LastID: lastID, Last: last})
633 tcheck(t, err, "list paginated")
634 l = append(l, nl...)
635 if len(nl) == 0 {
636 break
637 }
638 tcompare(t, len(nl), 1)
639 lastID, last = nl[0].ID, nl[0].Submitted.Format(time.RFC3339Nano)
640 }
641 llrev := slices.Clone(ll)
642 slices.Reverse(llrev)
643 tcompare(t, l, llrev)
644
645 // Retire messages and do similar but more basic tests. The code is similar.
646 var hrl []HookRetired
647 err = DB.Write(ctxbg, func(tx *bstore.Tx) error {
648 for _, h := range hl {
649 hr := h.Retired(false, h.NextAttempt, time.Now().Add(time.Minute).Round(0))
650 err := tx.Insert(&hr)
651 tcheck(t, err, "inserting retired")
652 hrl = append(hrl, hr)
653 }
654 return nil
655 })
656 tcheck(t, err, "adding retired")
657
658 // Paginated LastActivity desc.
659 var lr []HookRetired
660 lastID = 0
661 last = ""
662 l = nil
663 for {
664 nl, err := HookRetiredList(ctxbg, HookRetiredFilter{Max: 1}, HookRetiredSort{LastID: lastID, Last: last})
665 tcheck(t, err, "list paginated")
666 lr = append(lr, nl...)
667 if len(nl) == 0 {
668 break
669 }
670 tcompare(t, len(nl), 1)
671 lastID, last = nl[0].ID, nl[0].LastActivity.Format(time.RFC3339Nano)
672 }
673 hrlrev := slices.Clone(hrl)
674 slices.Reverse(hrlrev)
675 tcompare(t, lr, hrlrev)
676
677 // Filter by all fields to get a single.
678 allretiredfilters := HookRetiredFilter{
679 Max: 2,
680 IDs: []int64{hrlrev[0].ID},
681 Account: "mjl",
682 Submitted: "<1s",
683 LastActivity: ">1s",
684 Event: "delivered",
685 }
686 lr, err = HookRetiredList(ctxbg, allretiredfilters, HookRetiredSort{})
687 tcheck(t, err, "list single")
688 tcompare(t, lr, []HookRetired{hrlrev[0]})
689}
690