14 "github.com/mjl-/bstore"
16 "github.com/mjl-/mox/dsn"
17 "github.com/mjl-/mox/message"
18 "github.com/mjl-/mox/smtp"
19 "github.com/mjl-/mox/store"
20 "github.com/mjl-/mox/webhook"
23// Test webhooks for incoming message that is not related to outgoing deliveries.
24func TestHookIncoming(t *testing.T) {
25 acc, cleanup := setup(t)
28 tcheck(t, err, "queue init")
30 accret, err := store.OpenAccount(pkglog, "retired")
31 tcheck(t, err, "open account for retired")
37 testIncoming := func(a *store.Account, expIn bool) {
40 _, err := bstore.QueryDB[Hook](ctxbg, DB).Delete()
41 tcheck(t, err, "clean up hooks")
43 mr := bytes.NewReader([]byte(testmsg))
44 now := time.Now().Round(0)
48 MailFrom: "sender@remote.example",
49 MailFromLocalpart: "sender",
50 MailFromDomain: "remote.example",
51 RcptToLocalpart: "rcpt",
52 RcptToDomain: "mox.example",
53 MsgFromLocalpart: "mjl",
54 MsgFromDomain: "mox.example",
55 MsgFromOrgDomain: "mox.example",
57 MailFromValidated: true,
58 MsgFromValidated: true,
59 EHLOValidation: store.ValidationPass,
60 MailFromValidation: store.ValidationPass,
61 MsgFromValidation: store.ValidationDMARC,
62 DKIMDomains: []string{"remote.example"},
64 Size: int64(len(testmsg)),
66 part, err := message.EnsurePart(pkglog.Logger, true, mr, int64(len(testmsg)))
67 tcheck(t, err, "parsing message")
69 err = Incoming(ctxbg, pkglog, a, "<random@localhost>", m, part, "Inbox")
70 tcheck(t, err, "pass incoming message")
72 hl, err := bstore.QueryDB[Hook](ctxbg, DB).List()
73 tcheck(t, err, "list hooks")
75 tcompare(t, len(hl), 0)
78 tcompare(t, len(hl), 1)
80 tcompare(t, h.IsIncoming, true)
81 var in webhook.Incoming
82 dec := json.NewDecoder(strings.NewReader(h.Payload))
84 tcheck(t, err, "decode incoming webhook")
85 in.Meta.Received = in.Meta.Received.Local() // For TZ UTC.
87 expIncoming := webhook.Incoming{
88 From: []webhook.NameAddress{{Address: "mjl@mox.example"}},
89 To: []webhook.NameAddress{{Address: "mjl@mox.example"}},
90 CC: []webhook.NameAddress{},
91 BCC: []webhook.NameAddress{},
92 ReplyTo: []webhook.NameAddress{},
93 References: []string{},
97 Structure: webhook.PartStructure(&part),
98 Meta: webhook.IncomingMeta{
100 MailFrom: m.MailFrom,
101 MailFromValidated: m.MailFromValidated,
102 MsgFromValidated: m.MsgFromValidated,
103 RcptTo: "rcpt@mox.example",
104 DKIMVerifiedDomains: []string{"remote.example"},
106 Received: m.Received,
107 MailboxName: "Inbox",
111 tcompare(t, in, expIncoming)
114 testIncoming(acc, false)
115 testIncoming(accret, true)
118// Test with fromid and various DSNs, and delivery.
119func TestFromIDIncomingDelivery(t *testing.T) {
120 acc, cleanup := setup(t)
123 tcheck(t, err, "queue init")
125 accret, err := store.OpenAccount(pkglog, "retired")
126 tcheck(t, err, "open account for retired")
132 // Account that only gets webhook calls, but no retired webhooks.
133 acchook, err := store.OpenAccount(pkglog, "hook")
134 tcheck(t, err, "open account for hook")
137 acchook.CheckClosed()
140 addr, err := smtp.ParseAddress("mjl@mox.example")
141 tcheck(t, err, "parse address")
144 now := time.Now().Round(0)
148 MailFrom: "sender@remote.example",
149 MailFromLocalpart: "sender",
150 MailFromDomain: "remote.example",
151 RcptToLocalpart: "rcpt",
152 RcptToDomain: "mox.example",
153 MsgFromLocalpart: "mjl",
154 MsgFromDomain: "mox.example",
155 MsgFromOrgDomain: "mox.example",
157 MailFromValidated: true,
158 MsgFromValidated: true,
159 EHLOValidation: store.ValidationPass,
160 MailFromValidation: store.ValidationPass,
161 MsgFromValidation: store.ValidationDMARC,
162 DKIMDomains: []string{"remote.example"},
167 testIncoming := func(a *store.Account, rawmsg []byte, retiredFromID string, expIn bool, expOut *webhook.Outgoing) {
170 _, err := bstore.QueryDB[Hook](ctxbg, DB).Delete()
171 tcheck(t, err, "clean up hooks")
172 _, err = bstore.QueryDB[MsgRetired](ctxbg, DB).Delete()
173 tcheck(t, err, "clean up retired messages")
176 SenderAccount: a.Name,
177 SenderLocalpart: "sender",
178 SenderDomainStr: "remote.example",
179 RecipientLocalpart: "rcpt",
180 RecipientDomain: path.IPDomain,
181 RecipientDomainStr: "mox.example",
182 RecipientAddress: "rcpt@mox.example",
184 KeepUntil: now.Add(time.Minute),
186 m.RcptToLocalpart = "mjl"
187 qmr.FromID = retiredFromID
188 m.Size = int64(len(rawmsg))
189 m.RcptToLocalpart += smtp.Localpart("+unique")
191 err = DB.Insert(ctxbg, &qmr)
192 tcheck(t, err, "insert retired message to match")
195 expOut.QueueMsgID = qmr.ID
198 mr := bytes.NewReader(rawmsg)
199 part, err := message.EnsurePart(pkglog.Logger, true, mr, int64(len(rawmsg)))
200 tcheck(t, err, "parsing message")
202 err = Incoming(ctxbg, pkglog, a, "<random@localhost>", m, part, "Inbox")
203 tcheck(t, err, "pass incoming message")
205 hl, err := bstore.QueryDB[Hook](ctxbg, DB).List()
206 tcheck(t, err, "list hooks")
207 if !expIn && expOut == nil {
208 tcompare(t, len(hl), 0)
211 tcompare(t, len(hl), 1)
213 tcompare(t, h.IsIncoming, expIn)
217 var out webhook.Outgoing
218 dec := json.NewDecoder(strings.NewReader(h.Payload))
219 err = dec.Decode(&out)
220 tcheck(t, err, "decode outgoing webhook")
222 out.WebhookQueued = time.Time{}
223 tcompare(t, &out, expOut)
226 dsncompose := func(m *dsn.Message) []byte {
227 buf, err := m.Compose(pkglog, false)
228 tcheck(t, err, "compose dsn")
231 makedsn := func(action dsn.Action) *dsn.Message {
235 TextBody: "explanation",
236 MessageID: "<dsnmsgid@localhost>",
237 ReportingMTA: "localhost",
238 Recipients: []dsn.Recipient{
240 FinalRecipient: path,
243 DiagnosticCodeSMTP: "554 5.0.0 error",
249 msgfailed := dsncompose(makedsn(dsn.Failed))
251 // No FromID to match against, so we get a webhook for a new incoming message.
252 testIncoming(acc, msgfailed, "", false, nil)
253 testIncoming(accret, msgfailed, "mismatch", true, nil)
255 // DSN with multiple recipients are treated as unrecognized dsns.
256 multidsn := makedsn(dsn.Delivered)
257 multidsn.Recipients = append(multidsn.Recipients, multidsn.Recipients[0])
258 msgmultidsn := dsncompose(multidsn)
259 testIncoming(acc, msgmultidsn, "unique", false, nil)
260 testIncoming(accret, msgmultidsn, "unique", false, &webhook.Outgoing{
261 Event: webhook.EventUnrecognized,
266 msgdelayed := dsncompose(makedsn(dsn.Delayed))
267 testIncoming(acc, msgdelayed, "unique", false, nil)
268 testIncoming(accret, msgdelayed, "unique", false, &webhook.Outgoing{
269 Event: webhook.EventDelayed,
273 SMTPEnhancedCode: "5.0.0",
276 msgrelayed := dsncompose(makedsn(dsn.Relayed))
277 testIncoming(acc, msgrelayed, "unique", false, nil)
278 testIncoming(accret, msgrelayed, "unique", false, &webhook.Outgoing{
279 Event: webhook.EventRelayed,
283 SMTPEnhancedCode: "5.0.0",
286 msgunrecognized := dsncompose(makedsn(dsn.Action("bogus")))
287 testIncoming(acc, msgunrecognized, "unique", false, nil)
288 testIncoming(accret, msgunrecognized, "unique", false, &webhook.Outgoing{
289 Event: webhook.EventUnrecognized,
294 // Not a DSN but to fromid address also causes "unrecognized".
295 msgunrecognized2 := []byte(testmsg)
296 testIncoming(acc, msgunrecognized2, "unique", false, nil)
297 testIncoming(accret, msgunrecognized2, "unique", false, &webhook.Outgoing{
298 Event: webhook.EventUnrecognized,
303 msgdelivered := dsncompose(makedsn(dsn.Delivered))
304 testIncoming(acc, msgdelivered, "unique", false, nil)
305 testIncoming(accret, msgdelivered, "unique", false, &webhook.Outgoing{
306 Event: webhook.EventDelivered,
309 // This is what DSN claims.
311 SMTPEnhancedCode: "5.0.0",
314 testIncoming(acc, msgfailed, "unique", false, nil)
315 testIncoming(accret, msgfailed, "unique", false, &webhook.Outgoing{
316 Event: webhook.EventFailed,
320 SMTPEnhancedCode: "5.0.0",
323 // We still have a webhook in the queue from the test above.
324 // Try to get the hook delivered. We'll try various error handling cases and superseding.
326 qsize, err := HookQueueSize(ctxbg)
327 tcheck(t, err, "hook queue size")
328 tcompare(t, qsize, 1)
330 var handler http.HandlerFunc
331 handleError := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
332 w.WriteHeader(http.StatusInternalServerError)
333 fmt.Fprintln(w, "server error")
335 handleOK := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
336 if r.Header.Get("Authorization") != "Basic dXNlcm5hbWU6cGFzc3dvcmQ=" {
337 http.Error(w, "unauthorized", http.StatusUnauthorized)
340 if r.Header.Get("X-Mox-Webhook-ID") == "" {
341 http.Error(w, "missing header x-mox-webhook-id", http.StatusBadRequest)
344 if r.Header.Get("X-Mox-Webhook-Attempt") == "" {
345 http.Error(w, "missing header x-mox-webhook-attempt", http.StatusBadRequest)
348 fmt.Fprintln(w, "ok")
350 hs := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
351 handler.ServeHTTP(w, r)
355 h, err := bstore.QueryDB[Hook](ctxbg, DB).Get()
356 tcheck(t, err, "get hook from queue")
358 next := hookNextWork(ctxbg, pkglog, map[string]struct{}{"https://other.example/": {}})
360 t.Fatalf("next scheduled work should be immediate, is %v", next)
363 // Respond with an error and see a retry is scheduled.
365 // Update hook URL in database, so we can call hookLaunchWork. We'll call
366 // hookDeliver for later attempts.
367 err = DB.Update(ctxbg, &h)
368 tcheck(t, err, "update hook url")
369 handler = handleError
370 hookLaunchWork(pkglog, map[string]struct{}{"https://other.example/": {}})
371 <-hookDeliveryResults
372 err = DB.Get(ctxbg, &h)
373 tcheck(t, err, "get hook after failed delivery attempt")
374 tcompare(t, h.Attempts, 1)
375 tcompare(t, len(h.Results), 1)
376 tcompare(t, h.LastResult().Success, false)
377 tcompare(t, h.LastResult().Code, http.StatusInternalServerError)
378 tcompare(t, h.LastResult().Response, "server error\n")
380 next = hookNextWork(ctxbg, pkglog, map[string]struct{}{})
382 t.Fatalf("next scheduled work is immediate, shoud be in the future")
385 n, err := HookNextAttemptSet(ctxbg, HookFilter{}, time.Now().Add(time.Minute))
386 tcheck(t, err, "schedule hook to now")
388 n, err = HookNextAttemptAdd(ctxbg, HookFilter{}, -time.Minute)
389 tcheck(t, err, "schedule hook to now")
391 next = hookNextWork(ctxbg, pkglog, map[string]struct{}{})
393 t.Fatalf("next scheduled work should be immediate, is %v", next)
397 hookDeliver(pkglog, h)
398 <-hookDeliveryResults
399 err = DB.Get(ctxbg, &h)
400 tcompare(t, err, bstore.ErrAbsent)
401 hr := HookRetired{ID: h.ID}
402 err = DB.Get(ctxbg, &hr)
403 tcheck(t, err, "get retired hook after delivery")
404 tcompare(t, hr.Attempts, 2)
405 tcompare(t, len(hr.Results), 2)
406 tcompare(t, hr.LastResult().Success, true)
407 tcompare(t, hr.LastResult().Code, http.StatusOK)
408 tcompare(t, hr.LastResult().Response, "ok\n")
410 // Check that cleaning up retired webhooks works.
411 cleanupHookRetiredSingle(pkglog)
412 hrl, err := bstore.QueryDB[HookRetired](ctxbg, DB).List()
413 tcheck(t, err, "listing retired hooks")
414 tcompare(t, len(hrl), 0)
416 // Helper to get a representative webhook added to the queue.
417 addHook := func(a *store.Account) {
418 testIncoming(a, msgfailed, "unique", false, &webhook.Outgoing{
419 Event: webhook.EventFailed,
423 SMTPEnhancedCode: "5.0.0",
427 // Keep attempting and failing delivery until we give up.
429 h, err = bstore.QueryDB[Hook](ctxbg, DB).Get()
430 tcheck(t, err, "get added hook")
432 handler = handleError
433 for i := 0; i < len(hookIntervals); i++ {
434 hookDeliver(pkglog, h)
435 <-hookDeliveryResults
436 err := DB.Get(ctxbg, &h)
437 tcheck(t, err, "get hook")
438 tcompare(t, h.Attempts, i+1)
441 hookDeliver(pkglog, h)
442 <-hookDeliveryResults
443 err = DB.Get(ctxbg, &h)
444 tcompare(t, err, bstore.ErrAbsent)
445 hr = HookRetired{ID: h.ID}
446 err = DB.Get(ctxbg, &hr)
447 tcheck(t, err, "get retired hook after failure")
448 tcompare(t, hr.Attempts, len(hookIntervals)+1)
449 tcompare(t, len(hr.Results), len(hookIntervals)+1)
450 tcompare(t, hr.LastResult().Success, false)
451 tcompare(t, hr.LastResult().Code, http.StatusInternalServerError)
452 tcompare(t, hr.LastResult().Response, "server error\n")
454 // Check account "hook" doesn't get retired webhooks.
456 h, err = bstore.QueryDB[Hook](ctxbg, DB).Get()
457 tcheck(t, err, "get added hook")
460 hookDeliver(pkglog, h)
461 <-hookDeliveryResults
462 err = DB.Get(ctxbg, &h)
463 tcompare(t, err, bstore.ErrAbsent)
464 hr = HookRetired{ID: h.ID}
465 err = DB.Get(ctxbg, &hr)
466 tcompare(t, err, bstore.ErrAbsent)
470 h, err = bstore.QueryDB[Hook](ctxbg, DB).Get()
471 tcheck(t, err, "get added hook")
472 n, err = HookCancel(ctxbg, pkglog, HookFilter{})
473 tcheck(t, err, "canceling hook")
475 l, err := HookList(ctxbg, HookFilter{}, HookSort{})
476 tcheck(t, err, "list hook")
477 tcompare(t, len(l), 0)
479 // Superseding: When a webhook is scheduled for a message that already has a
480 // pending webhook, the previous webhook should be removed/retired.
481 _, err = bstore.QueryDB[HookRetired](ctxbg, DB).Delete()
482 tcheck(t, err, "clean up retired webhooks")
483 _, err = bstore.QueryDB[MsgRetired](ctxbg, DB).Delete()
484 tcheck(t, err, "clean up retired messages")
486 SenderAccount: accret.Name,
487 SenderLocalpart: "sender",
488 SenderDomainStr: "remote.example",
489 RecipientLocalpart: "rcpt",
490 RecipientDomain: path.IPDomain,
491 RecipientDomainStr: "mox.example",
492 RecipientAddress: "rcpt@mox.example",
494 KeepUntil: now.Add(time.Minute),
497 err = DB.Insert(ctxbg, &qmr)
498 tcheck(t, err, "insert retired message to match")
499 m.RcptToLocalpart = "mjl"
500 m.Size = int64(len(msgdelayed))
501 m.RcptToLocalpart += smtp.Localpart("+unique")
503 mr := bytes.NewReader(msgdelayed)
504 part, err := message.EnsurePart(pkglog.Logger, true, mr, int64(len(msgdelayed)))
505 tcheck(t, err, "parsing message")
507 // Cause first webhook.
508 err = Incoming(ctxbg, pkglog, accret, "<random@localhost>", m, part, "Inbox")
509 tcheck(t, err, "pass incoming message")
510 h, err = bstore.QueryDB[Hook](ctxbg, DB).Get()
511 tcheck(t, err, "get hook")
513 // Cause second webhook for same message. First should now be retired and marked as superseded.
514 err = Incoming(ctxbg, pkglog, accret, "<random@localhost>", m, part, "Inbox")
515 tcheck(t, err, "pass incoming message again")
516 h2, err := bstore.QueryDB[Hook](ctxbg, DB).Get()
517 tcheck(t, err, "get hook")
518 hr, err = bstore.QueryDB[HookRetired](ctxbg, DB).Get()
519 tcheck(t, err, "get retired hook")
520 tcompare(t, h.ID, hr.ID)
521 tcompare(t, hr.SupersededByID, h2.ID)
522 tcompare(t, h2.ID > h.ID, true)
525func TestHookListFilterSort(t *testing.T) {
526 _, cleanup := setup(t)
529 tcheck(t, err, "queue init")
531 now := time.Now().Round(0)
532 h := Hook{0, 0, "fromid", "messageid", "subj", nil, "mjl", "http://localhost", "", false, "delivered", "", now, 0, now, []HookResult{}}
534 h1.Submitted = now.Add(-time.Second)
535 h1.NextAttempt = now.Add(time.Minute)
536 hl := []Hook{h, h, h, h, h, h1}
537 err = DB.Write(ctxbg, func(tx *bstore.Tx) error {
539 err := hookInsert(tx, &hl[i], now, time.Minute)
540 tcheck(t, err, "insert hook")
544 tcheck(t, err, "inserting hooks")
547 hlrev := slices.Clone(hl)
548 slices.Reverse(hlrev)
550 // Ascending by nextattempt,id.
551 l, err := HookList(ctxbg, HookFilter{}, HookSort{Asc: true})
552 tcheck(t, err, "list")
555 // Descending by nextattempt,id.
556 l, err = HookList(ctxbg, HookFilter{}, HookSort{})
557 tcheck(t, err, "list")
558 tcompare(t, l, hlrev)
560 // Descending by submitted,id.
561 l, err = HookList(ctxbg, HookFilter{}, HookSort{Field: "Submitted"})
562 tcheck(t, err, "list")
563 ll := append(append([]Hook{}, hlrev[1:]...), hl[5])
566 // Filter by all fields to get a single.
567 allfilters := HookFilter{
575 l, err = HookList(ctxbg, allfilters, HookSort{})
576 tcheck(t, err, "list single")
577 tcompare(t, l, []Hook{h1})
579 // Paginated NextAttmpt asc.
584 nl, err := HookList(ctxbg, HookFilter{Max: 1}, HookSort{Asc: true, LastID: lastID, Last: last})
585 tcheck(t, err, "list paginated")
590 tcompare(t, len(nl), 1)
591 lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
595 // Paginated NextAttempt desc.
600 nl, err := HookList(ctxbg, HookFilter{Max: 1}, HookSort{LastID: lastID, Last: last})
601 tcheck(t, err, "list paginated")
606 tcompare(t, len(nl), 1)
607 lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
609 tcompare(t, l, hlrev)
611 // Paginated Submitted desc.
616 nl, err := HookList(ctxbg, HookFilter{Max: 1}, HookSort{Field: "Submitted", LastID: lastID, Last: last})
617 tcheck(t, err, "list paginated")
622 tcompare(t, len(nl), 1)
623 lastID, last = nl[0].ID, nl[0].Submitted.Format(time.RFC3339Nano)
627 // Paginated Submitted asc.
632 nl, err := HookList(ctxbg, HookFilter{Max: 1}, HookSort{Field: "Submitted", Asc: true, LastID: lastID, Last: last})
633 tcheck(t, err, "list paginated")
638 tcompare(t, len(nl), 1)
639 lastID, last = nl[0].ID, nl[0].Submitted.Format(time.RFC3339Nano)
641 llrev := slices.Clone(ll)
642 slices.Reverse(llrev)
643 tcompare(t, l, llrev)
645 // Retire messages and do similar but more basic tests. The code is similar.
646 var hrl []HookRetired
647 err = DB.Write(ctxbg, func(tx *bstore.Tx) error {
648 for _, h := range hl {
649 hr := h.Retired(false, h.NextAttempt, time.Now().Add(time.Minute).Round(0))
650 err := tx.Insert(&hr)
651 tcheck(t, err, "inserting retired")
652 hrl = append(hrl, hr)
656 tcheck(t, err, "adding retired")
658 // Paginated LastActivity desc.
664 nl, err := HookRetiredList(ctxbg, HookRetiredFilter{Max: 1}, HookRetiredSort{LastID: lastID, Last: last})
665 tcheck(t, err, "list paginated")
666 lr = append(lr, nl...)
670 tcompare(t, len(nl), 1)
671 lastID, last = nl[0].ID, nl[0].LastActivity.Format(time.RFC3339Nano)
673 hrlrev := slices.Clone(hrl)
674 slices.Reverse(hrlrev)
675 tcompare(t, lr, hrlrev)
677 // Filter by all fields to get a single.
678 allretiredfilters := HookRetiredFilter{
680 IDs: []int64{hrlrev[0].ID},
686 lr, err = HookRetiredList(ctxbg, allretiredfilters, HookRetiredSort{})
687 tcheck(t, err, "list single")
688 tcompare(t, lr, []HookRetired{hrlrev[0]})