14 "github.com/mjl-/bstore"
16 "github.com/mjl-/mox/dsn"
17 "github.com/mjl-/mox/message"
18 "github.com/mjl-/mox/smtp"
19 "github.com/mjl-/mox/store"
20 "github.com/mjl-/mox/webhook"
23// Test webhooks for incoming message that is not related to outgoing deliveries.
24func TestHookIncoming(t *testing.T) {
25 acc, cleanup := setup(t)
28 accret, err := store.OpenAccount(pkglog, "retired")
29 tcheck(t, err, "open account for retired")
35 testIncoming := func(a *store.Account, expIn bool) {
38 _, err := bstore.QueryDB[Hook](ctxbg, DB).Delete()
39 tcheck(t, err, "clean up hooks")
41 mr := bytes.NewReader([]byte(testmsg))
42 now := time.Now().Round(0)
46 MailFrom: "sender@remote.example",
47 MailFromLocalpart: "sender",
48 MailFromDomain: "remote.example",
49 RcptToLocalpart: "rcpt",
50 RcptToDomain: "mox.example",
51 MsgFromLocalpart: "mjl",
52 MsgFromDomain: "mox.example",
53 MsgFromOrgDomain: "mox.example",
55 MailFromValidated: true,
56 MsgFromValidated: true,
57 EHLOValidation: store.ValidationPass,
58 MailFromValidation: store.ValidationPass,
59 MsgFromValidation: store.ValidationDMARC,
60 DKIMDomains: []string{"remote.example"},
62 Size: int64(len(testmsg)),
64 part, err := message.EnsurePart(pkglog.Logger, true, mr, int64(len(testmsg)))
65 tcheck(t, err, "parsing message")
67 err = Incoming(ctxbg, pkglog, a, "<random@localhost>", m, part, "Inbox")
68 tcheck(t, err, "pass incoming message")
70 hl, err := bstore.QueryDB[Hook](ctxbg, DB).List()
71 tcheck(t, err, "list hooks")
73 tcompare(t, len(hl), 0)
76 tcompare(t, len(hl), 1)
78 tcompare(t, h.IsIncoming, true)
79 var in webhook.Incoming
80 dec := json.NewDecoder(strings.NewReader(h.Payload))
82 tcheck(t, err, "decode incoming webhook")
83 in.Meta.Received = in.Meta.Received.Local() // For TZ UTC.
85 structure, err := PartStructure(pkglog, &part)
86 tcheck(t, err, "part structure")
88 expIncoming := webhook.Incoming{
89 From: []webhook.NameAddress{{Address: "mjl@mox.example"}},
90 To: []webhook.NameAddress{{Address: "mjl@mox.example"}},
91 CC: []webhook.NameAddress{},
92 BCC: []webhook.NameAddress{},
93 ReplyTo: []webhook.NameAddress{},
94 References: []string{},
99 Meta: webhook.IncomingMeta{
101 MailFrom: m.MailFrom,
102 MailFromValidated: m.MailFromValidated,
103 MsgFromValidated: m.MsgFromValidated,
104 RcptTo: "rcpt@mox.example",
105 DKIMVerifiedDomains: []string{"remote.example"},
107 Received: m.Received,
108 MailboxName: "Inbox",
112 tcompare(t, in, expIncoming)
115 testIncoming(acc, false)
116 testIncoming(accret, true)
119// Test with fromid and various DSNs, and delivery.
120func TestFromIDIncomingDelivery(t *testing.T) {
121 acc, cleanup := setup(t)
124 accret, err := store.OpenAccount(pkglog, "retired")
125 tcheck(t, err, "open account for retired")
131 // Account that only gets webhook calls, but no retired webhooks.
132 acchook, err := store.OpenAccount(pkglog, "hook")
133 tcheck(t, err, "open account for hook")
136 acchook.CheckClosed()
139 addr, err := smtp.ParseAddress("mjl@mox.example")
140 tcheck(t, err, "parse address")
143 now := time.Now().Round(0)
147 MailFrom: "sender@remote.example",
148 MailFromLocalpart: "sender",
149 MailFromDomain: "remote.example",
150 RcptToLocalpart: "rcpt",
151 RcptToDomain: "mox.example",
152 MsgFromLocalpart: "mjl",
153 MsgFromDomain: "mox.example",
154 MsgFromOrgDomain: "mox.example",
156 MailFromValidated: true,
157 MsgFromValidated: true,
158 EHLOValidation: store.ValidationPass,
159 MailFromValidation: store.ValidationPass,
160 MsgFromValidation: store.ValidationDMARC,
161 DKIMDomains: []string{"remote.example"},
166 testIncoming := func(a *store.Account, rawmsg []byte, retiredFromID string, expIn bool, expOut *webhook.Outgoing) {
169 _, err := bstore.QueryDB[Hook](ctxbg, DB).Delete()
170 tcheck(t, err, "clean up hooks")
171 _, err = bstore.QueryDB[MsgRetired](ctxbg, DB).Delete()
172 tcheck(t, err, "clean up retired messages")
175 SenderAccount: a.Name,
176 SenderLocalpart: "sender",
177 SenderDomainStr: "remote.example",
178 RecipientLocalpart: "rcpt",
179 RecipientDomain: path.IPDomain,
180 RecipientDomainStr: "mox.example",
181 RecipientAddress: "rcpt@mox.example",
183 KeepUntil: now.Add(time.Minute),
185 m.RcptToLocalpart = "mjl"
186 qmr.FromID = retiredFromID
187 m.Size = int64(len(rawmsg))
188 m.RcptToLocalpart += smtp.Localpart("+unique")
190 err = DB.Insert(ctxbg, &qmr)
191 tcheck(t, err, "insert retired message to match")
194 expOut.QueueMsgID = qmr.ID
197 mr := bytes.NewReader(rawmsg)
198 part, err := message.EnsurePart(pkglog.Logger, true, mr, int64(len(rawmsg)))
199 tcheck(t, err, "parsing message")
201 err = Incoming(ctxbg, pkglog, a, "<random@localhost>", m, part, "Inbox")
202 tcheck(t, err, "pass incoming message")
204 hl, err := bstore.QueryDB[Hook](ctxbg, DB).List()
205 tcheck(t, err, "list hooks")
206 if !expIn && expOut == nil {
207 tcompare(t, len(hl), 0)
210 tcompare(t, len(hl), 1)
212 tcompare(t, h.IsIncoming, expIn)
216 var out webhook.Outgoing
217 dec := json.NewDecoder(strings.NewReader(h.Payload))
218 err = dec.Decode(&out)
219 tcheck(t, err, "decode outgoing webhook")
221 out.WebhookQueued = time.Time{}
222 tcompare(t, &out, expOut)
225 dsncompose := func(m *dsn.Message) []byte {
226 buf, err := m.Compose(pkglog, false)
227 tcheck(t, err, "compose dsn")
230 makedsn := func(action dsn.Action) *dsn.Message {
234 TextBody: "explanation",
235 MessageID: "<dsnmsgid@localhost>",
236 ReportingMTA: "localhost",
237 Recipients: []dsn.Recipient{
239 FinalRecipient: path,
242 DiagnosticCodeSMTP: "554 5.0.0 error",
248 msgfailed := dsncompose(makedsn(dsn.Failed))
250 // No FromID to match against, so we get a webhook for a new incoming message.
251 testIncoming(acc, msgfailed, "", false, nil)
252 testIncoming(accret, msgfailed, "mismatch", true, nil)
254 // DSN with multiple recipients are treated as unrecognized dsns.
255 multidsn := makedsn(dsn.Delivered)
256 multidsn.Recipients = append(multidsn.Recipients, multidsn.Recipients[0])
257 msgmultidsn := dsncompose(multidsn)
258 testIncoming(acc, msgmultidsn, "unique", false, nil)
259 testIncoming(accret, msgmultidsn, "unique", false, &webhook.Outgoing{
260 Event: webhook.EventUnrecognized,
265 msgdelayed := dsncompose(makedsn(dsn.Delayed))
266 testIncoming(acc, msgdelayed, "unique", false, nil)
267 testIncoming(accret, msgdelayed, "unique", false, &webhook.Outgoing{
268 Event: webhook.EventDelayed,
272 SMTPEnhancedCode: "5.0.0",
275 msgrelayed := dsncompose(makedsn(dsn.Relayed))
276 testIncoming(acc, msgrelayed, "unique", false, nil)
277 testIncoming(accret, msgrelayed, "unique", false, &webhook.Outgoing{
278 Event: webhook.EventRelayed,
282 SMTPEnhancedCode: "5.0.0",
285 msgunrecognized := dsncompose(makedsn(dsn.Action("bogus")))
286 testIncoming(acc, msgunrecognized, "unique", false, nil)
287 testIncoming(accret, msgunrecognized, "unique", false, &webhook.Outgoing{
288 Event: webhook.EventUnrecognized,
293 // Not a DSN but to fromid address also causes "unrecognized".
294 msgunrecognized2 := []byte(testmsg)
295 testIncoming(acc, msgunrecognized2, "unique", false, nil)
296 testIncoming(accret, msgunrecognized2, "unique", false, &webhook.Outgoing{
297 Event: webhook.EventUnrecognized,
302 msgdelivered := dsncompose(makedsn(dsn.Delivered))
303 testIncoming(acc, msgdelivered, "unique", false, nil)
304 testIncoming(accret, msgdelivered, "unique", false, &webhook.Outgoing{
305 Event: webhook.EventDelivered,
308 // This is what DSN claims.
310 SMTPEnhancedCode: "5.0.0",
313 testIncoming(acc, msgfailed, "unique", false, nil)
314 testIncoming(accret, msgfailed, "unique", false, &webhook.Outgoing{
315 Event: webhook.EventFailed,
319 SMTPEnhancedCode: "5.0.0",
322 // We still have a webhook in the queue from the test above.
323 // Try to get the hook delivered. We'll try various error handling cases and superseding.
325 qsize, err := HookQueueSize(ctxbg)
326 tcheck(t, err, "hook queue size")
327 tcompare(t, qsize, 1)
329 var handler http.HandlerFunc
330 handleError := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
331 w.WriteHeader(http.StatusInternalServerError)
332 fmt.Fprintln(w, "server error")
334 handleOK := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
335 if r.Header.Get("Authorization") != "Basic dXNlcm5hbWU6cGFzc3dvcmQ=" {
336 http.Error(w, "unauthorized", http.StatusUnauthorized)
339 if r.Header.Get("X-Mox-Webhook-ID") == "" {
340 http.Error(w, "missing header x-mox-webhook-id", http.StatusBadRequest)
343 if r.Header.Get("X-Mox-Webhook-Attempt") == "" {
344 http.Error(w, "missing header x-mox-webhook-attempt", http.StatusBadRequest)
347 fmt.Fprintln(w, "ok")
349 hs := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
350 handler.ServeHTTP(w, r)
354 h, err := bstore.QueryDB[Hook](ctxbg, DB).Get()
355 tcheck(t, err, "get hook from queue")
357 next := hookNextWork(ctxbg, pkglog, map[string]struct{}{"https://other.example/": {}})
359 t.Fatalf("next scheduled work should be immediate, is %v", next)
362 // Respond with an error and see a retry is scheduled.
364 // Update hook URL in database, so we can call hookLaunchWork. We'll call
365 // hookDeliver for later attempts.
366 err = DB.Update(ctxbg, &h)
367 tcheck(t, err, "update hook url")
368 handler = handleError
369 hookLaunchWork(pkglog, map[string]struct{}{"https://other.example/": {}})
370 <-hookDeliveryResults
371 err = DB.Get(ctxbg, &h)
372 tcheck(t, err, "get hook after failed delivery attempt")
373 tcompare(t, h.Attempts, 1)
374 tcompare(t, len(h.Results), 1)
375 tcompare(t, h.LastResult().Success, false)
376 tcompare(t, h.LastResult().Code, http.StatusInternalServerError)
377 tcompare(t, h.LastResult().Response, "server error\n")
379 next = hookNextWork(ctxbg, pkglog, map[string]struct{}{})
381 t.Fatalf("next scheduled work is immediate, shoud be in the future")
384 n, err := HookNextAttemptSet(ctxbg, HookFilter{}, time.Now().Add(time.Minute))
385 tcheck(t, err, "schedule hook to now")
387 n, err = HookNextAttemptAdd(ctxbg, HookFilter{}, -time.Minute)
388 tcheck(t, err, "schedule hook to now")
390 next = hookNextWork(ctxbg, pkglog, map[string]struct{}{})
392 t.Fatalf("next scheduled work should be immediate, is %v", next)
396 hookDeliver(pkglog, h)
397 <-hookDeliveryResults
398 err = DB.Get(ctxbg, &h)
399 tcompare(t, err, bstore.ErrAbsent)
400 hr := HookRetired{ID: h.ID}
401 err = DB.Get(ctxbg, &hr)
402 tcheck(t, err, "get retired hook after delivery")
403 tcompare(t, hr.Attempts, 2)
404 tcompare(t, len(hr.Results), 2)
405 tcompare(t, hr.LastResult().Success, true)
406 tcompare(t, hr.LastResult().Code, http.StatusOK)
407 tcompare(t, hr.LastResult().Response, "ok\n")
409 // Check that cleaning up retired webhooks works.
410 cleanupHookRetiredSingle(pkglog)
411 hrl, err := bstore.QueryDB[HookRetired](ctxbg, DB).List()
412 tcheck(t, err, "listing retired hooks")
413 tcompare(t, len(hrl), 0)
415 // Helper to get a representative webhook added to the queue.
416 addHook := func(a *store.Account) {
417 testIncoming(a, msgfailed, "unique", false, &webhook.Outgoing{
418 Event: webhook.EventFailed,
422 SMTPEnhancedCode: "5.0.0",
426 // Keep attempting and failing delivery until we give up.
428 h, err = bstore.QueryDB[Hook](ctxbg, DB).Get()
429 tcheck(t, err, "get added hook")
431 handler = handleError
432 for i := 0; i < len(hookIntervals); i++ {
433 hookDeliver(pkglog, h)
434 <-hookDeliveryResults
435 err := DB.Get(ctxbg, &h)
436 tcheck(t, err, "get hook")
437 tcompare(t, h.Attempts, i+1)
440 hookDeliver(pkglog, h)
441 <-hookDeliveryResults
442 err = DB.Get(ctxbg, &h)
443 tcompare(t, err, bstore.ErrAbsent)
444 hr = HookRetired{ID: h.ID}
445 err = DB.Get(ctxbg, &hr)
446 tcheck(t, err, "get retired hook after failure")
447 tcompare(t, hr.Attempts, len(hookIntervals)+1)
448 tcompare(t, len(hr.Results), len(hookIntervals)+1)
449 tcompare(t, hr.LastResult().Success, false)
450 tcompare(t, hr.LastResult().Code, http.StatusInternalServerError)
451 tcompare(t, hr.LastResult().Response, "server error\n")
453 // Check account "hook" doesn't get retired webhooks.
455 h, err = bstore.QueryDB[Hook](ctxbg, DB).Get()
456 tcheck(t, err, "get added hook")
459 hookDeliver(pkglog, h)
460 <-hookDeliveryResults
461 err = DB.Get(ctxbg, &h)
462 tcompare(t, err, bstore.ErrAbsent)
463 hr = HookRetired{ID: h.ID}
464 err = DB.Get(ctxbg, &hr)
465 tcompare(t, err, bstore.ErrAbsent)
469 h, err = bstore.QueryDB[Hook](ctxbg, DB).Get()
470 tcheck(t, err, "get added hook")
471 n, err = HookCancel(ctxbg, pkglog, HookFilter{})
472 tcheck(t, err, "canceling hook")
474 l, err := HookList(ctxbg, HookFilter{}, HookSort{})
475 tcheck(t, err, "list hook")
476 tcompare(t, len(l), 0)
478 // Superseding: When a webhook is scheduled for a message that already has a
479 // pending webhook, the previous webhook should be removed/retired.
480 _, err = bstore.QueryDB[HookRetired](ctxbg, DB).Delete()
481 tcheck(t, err, "clean up retired webhooks")
482 _, err = bstore.QueryDB[MsgRetired](ctxbg, DB).Delete()
483 tcheck(t, err, "clean up retired messages")
485 SenderAccount: accret.Name,
486 SenderLocalpart: "sender",
487 SenderDomainStr: "remote.example",
488 RecipientLocalpart: "rcpt",
489 RecipientDomain: path.IPDomain,
490 RecipientDomainStr: "mox.example",
491 RecipientAddress: "rcpt@mox.example",
493 KeepUntil: now.Add(time.Minute),
496 err = DB.Insert(ctxbg, &qmr)
497 tcheck(t, err, "insert retired message to match")
498 m.RcptToLocalpart = "mjl"
499 m.Size = int64(len(msgdelayed))
500 m.RcptToLocalpart += smtp.Localpart("+unique")
502 mr := bytes.NewReader(msgdelayed)
503 part, err := message.EnsurePart(pkglog.Logger, true, mr, int64(len(msgdelayed)))
504 tcheck(t, err, "parsing message")
506 // Cause first webhook.
507 err = Incoming(ctxbg, pkglog, accret, "<random@localhost>", m, part, "Inbox")
508 tcheck(t, err, "pass incoming message")
509 h, err = bstore.QueryDB[Hook](ctxbg, DB).Get()
510 tcheck(t, err, "get hook")
512 // Cause second webhook for same message. First should now be retired and marked as superseded.
513 err = Incoming(ctxbg, pkglog, accret, "<random@localhost>", m, part, "Inbox")
514 tcheck(t, err, "pass incoming message again")
515 h2, err := bstore.QueryDB[Hook](ctxbg, DB).Get()
516 tcheck(t, err, "get hook")
517 hr, err = bstore.QueryDB[HookRetired](ctxbg, DB).Get()
518 tcheck(t, err, "get retired hook")
519 tcompare(t, h.ID, hr.ID)
520 tcompare(t, hr.SupersededByID, h2.ID)
521 tcompare(t, h2.ID > h.ID, true)
524func TestHookListFilterSort(t *testing.T) {
525 _, cleanup := setup(t)
528 now := time.Now().Round(0)
529 h := Hook{0, 0, "fromid", "messageid", "subj", nil, "mjl", "http://localhost", "", false, "delivered", "", now, 0, now, []HookResult{}}
531 h1.Submitted = now.Add(-time.Second)
532 h1.NextAttempt = now.Add(time.Minute)
533 hl := []Hook{h, h, h, h, h, h1}
534 err := DB.Write(ctxbg, func(tx *bstore.Tx) error {
536 err := hookInsert(tx, &hl[i], now, time.Minute)
537 tcheck(t, err, "insert hook")
541 tcheck(t, err, "inserting hooks")
544 hlrev := slices.Clone(hl)
545 slices.Reverse(hlrev)
547 // Ascending by nextattempt,id.
548 l, err := HookList(ctxbg, HookFilter{}, HookSort{Asc: true})
549 tcheck(t, err, "list")
552 // Descending by nextattempt,id.
553 l, err = HookList(ctxbg, HookFilter{}, HookSort{})
554 tcheck(t, err, "list")
555 tcompare(t, l, hlrev)
557 // Descending by submitted,id.
558 l, err = HookList(ctxbg, HookFilter{}, HookSort{Field: "Submitted"})
559 tcheck(t, err, "list")
560 ll := append(append([]Hook{}, hlrev[1:]...), hl[5])
563 // Filter by all fields to get a single.
564 allfilters := HookFilter{
572 l, err = HookList(ctxbg, allfilters, HookSort{})
573 tcheck(t, err, "list single")
574 tcompare(t, l, []Hook{h1})
576 // Paginated NextAttmpt asc.
581 nl, err := HookList(ctxbg, HookFilter{Max: 1}, HookSort{Asc: true, LastID: lastID, Last: last})
582 tcheck(t, err, "list paginated")
587 tcompare(t, len(nl), 1)
588 lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
592 // Paginated NextAttempt desc.
597 nl, err := HookList(ctxbg, HookFilter{Max: 1}, HookSort{LastID: lastID, Last: last})
598 tcheck(t, err, "list paginated")
603 tcompare(t, len(nl), 1)
604 lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
606 tcompare(t, l, hlrev)
608 // Paginated Submitted desc.
613 nl, err := HookList(ctxbg, HookFilter{Max: 1}, HookSort{Field: "Submitted", LastID: lastID, Last: last})
614 tcheck(t, err, "list paginated")
619 tcompare(t, len(nl), 1)
620 lastID, last = nl[0].ID, nl[0].Submitted.Format(time.RFC3339Nano)
624 // Paginated Submitted asc.
629 nl, err := HookList(ctxbg, HookFilter{Max: 1}, HookSort{Field: "Submitted", Asc: true, LastID: lastID, Last: last})
630 tcheck(t, err, "list paginated")
635 tcompare(t, len(nl), 1)
636 lastID, last = nl[0].ID, nl[0].Submitted.Format(time.RFC3339Nano)
638 llrev := slices.Clone(ll)
639 slices.Reverse(llrev)
640 tcompare(t, l, llrev)
642 // Retire messages and do similar but more basic tests. The code is similar.
643 var hrl []HookRetired
644 err = DB.Write(ctxbg, func(tx *bstore.Tx) error {
645 for _, h := range hl {
646 hr := h.Retired(false, h.NextAttempt, time.Now().Add(time.Minute).Round(0))
647 err := tx.Insert(&hr)
648 tcheck(t, err, "inserting retired")
649 hrl = append(hrl, hr)
653 tcheck(t, err, "adding retired")
655 // Paginated LastActivity desc.
661 nl, err := HookRetiredList(ctxbg, HookRetiredFilter{Max: 1}, HookRetiredSort{LastID: lastID, Last: last})
662 tcheck(t, err, "list paginated")
663 lr = append(lr, nl...)
667 tcompare(t, len(nl), 1)
668 lastID, last = nl[0].ID, nl[0].LastActivity.Format(time.RFC3339Nano)
670 hrlrev := slices.Clone(hrl)
671 slices.Reverse(hrlrev)
672 tcompare(t, lr, hrlrev)
674 // Filter by all fields to get a single.
675 allretiredfilters := HookRetiredFilter{
677 IDs: []int64{hrlrev[0].ID},
683 lr, err = HookRetiredList(ctxbg, allretiredfilters, HookRetiredSort{})
684 tcheck(t, err, "list single")
685 tcompare(t, lr, []HookRetired{hrlrev[0]})