14 "github.com/mjl-/bstore"
16 "github.com/mjl-/mox/dsn"
17 "github.com/mjl-/mox/message"
18 "github.com/mjl-/mox/smtp"
19 "github.com/mjl-/mox/store"
20 "github.com/mjl-/mox/webhook"
23// Test webhooks for an incoming message that is not related to outgoing deliveries.
// The same test message is passed to Incoming for two accounts: the account
// returned by setup (called with expIn false) and the "retired" account (called
// with expIn true).
24func TestHookIncoming(t *testing.T) {
25 acc, cleanup := setup(t)
28 accret, err := store.OpenAccount(pkglog, "retired")
29 tcheck(t, err, "open account for retired")
	// testIncoming passes testmsg to Incoming for account a and then inspects the
	// hook queue. expIn presumably stands for "expect a webhook for the incoming
	// message" - the branch on it is not visible in this view, TODO confirm.
35 testIncoming := func(a *store.Account, expIn bool) {
	// Start from an empty hook queue, so the length checks below only see hooks
	// queued by this call.
38 _, err := bstore.QueryDB[Hook](ctxbg, DB).Delete()
39 tcheck(t, err, "clean up hooks")
41 mr := bytes.NewReader([]byte(testmsg))
	// Round(0) strips the monotonic clock reading from the time; presumably so it
	// compares equal to the time decoded from the hook payload.
42 now := time.Now().Round(0)
46 MailFrom: "sender@remote.example",
47 MailFromLocalpart: "sender",
48 MailFromDomain: "remote.example",
49 RcptToLocalpart: "rcpt",
50 RcptToDomain: "mox.example",
51 MsgFromLocalpart: "mjl",
52 MsgFromDomain: "mox.example",
53 MsgFromOrgDomain: "mox.example",
55 MailFromValidated: true,
56 MsgFromValidated: true,
57 EHLOValidation: store.ValidationPass,
58 MailFromValidation: store.ValidationPass,
59 MsgFromValidation: store.ValidationDMARC,
60 DKIMDomains: []string{"remote.example"},
62 Size: int64(len(testmsg)),
64 part, err := message.EnsurePart(pkglog.Logger, true, mr, int64(len(testmsg)))
65 tcheck(t, err, "parsing message")
67 err = Incoming(ctxbg, pkglog, a, "<random@localhost>", m, part, "Inbox")
68 tcheck(t, err, "pass incoming message")
70 hl, err := bstore.QueryDB[Hook](ctxbg, DB).List()
71 tcheck(t, err, "list hooks")
	// NOTE(review): the condition around the next two checks is not visible here;
	// presumably zero hooks are expected when expIn is false and exactly one otherwise.
73 tcompare(t, len(hl), 0)
76 tcompare(t, len(hl), 1)
78 tcompare(t, h.IsIncoming, true)
	// Decode the JSON payload of the queued hook, to compare it in full below.
79 var in webhook.Incoming
80 dec := json.NewDecoder(strings.NewReader(h.Payload))
82 tcheck(t, err, "decode incoming webhook")
83 in.Meta.Received = in.Meta.Received.Local() // For TZ UTC.
85 expIncoming := webhook.Incoming{
86 From: []webhook.NameAddress{{Address: "mjl@mox.example"}},
87 To: []webhook.NameAddress{{Address: "mjl@mox.example"}},
88 CC: []webhook.NameAddress{},
89 BCC: []webhook.NameAddress{},
90 ReplyTo: []webhook.NameAddress{},
91 References: []string{},
95 Structure: webhook.PartStructure(&part),
96 Meta: webhook.IncomingMeta{
99 MailFromValidated: m.MailFromValidated,
100 MsgFromValidated: m.MsgFromValidated,
101 RcptTo: "rcpt@mox.example",
102 DKIMVerifiedDomains: []string{"remote.example"},
104 Received: m.Received,
105 MailboxName: "Inbox",
109 tcompare(t, in, expIncoming)
	// First the account from setup with expIn false, then the "retired" account
	// with expIn true.
112 testIncoming(acc, false)
113 testIncoming(accret, true)
116// Test with fromid and various DSNs, and delivery.
// After the DSN cases, the hook left in the queue is delivered to a local test
// HTTP server, covering a failed attempt, a successful attempt, giving up after
// repeated failures, canceling, and superseding of a pending hook.
117func TestFromIDIncomingDelivery(t *testing.T) {
118 acc, cleanup := setup(t)
121 accret, err := store.OpenAccount(pkglog, "retired")
122 tcheck(t, err, "open account for retired")
128 // Account that only gets webhook calls, but no retired webhooks.
129 acchook, err := store.OpenAccount(pkglog, "hook")
130 tcheck(t, err, "open account for hook")
133 acchook.CheckClosed()
136 addr, err := smtp.ParseAddress("mjl@mox.example")
137 tcheck(t, err, "parse address")
140 now := time.Now().Round(0)
144 MailFrom: "sender@remote.example",
145 MailFromLocalpart: "sender",
146 MailFromDomain: "remote.example",
147 RcptToLocalpart: "rcpt",
148 RcptToDomain: "mox.example",
149 MsgFromLocalpart: "mjl",
150 MsgFromDomain: "mox.example",
151 MsgFromOrgDomain: "mox.example",
153 MailFromValidated: true,
154 MsgFromValidated: true,
155 EHLOValidation: store.ValidationPass,
156 MailFromValidation: store.ValidationPass,
157 MsgFromValidation: store.ValidationDMARC,
158 DKIMDomains: []string{"remote.example"},
	// testIncoming clears the queued hooks and retired messages, inserts a retired
	// message (qmr) with sender account a.Name and FromID retiredFromID, passes
	// rawmsg to Incoming for account a and checks the resulting hook queue. If
	// neither expIn nor expOut is set, no hook is expected; otherwise exactly one.
	// NOTE(review): several lines of this closure are missing from this view, so
	// the conditions around individual statements are not described here.
163 testIncoming := func(a *store.Account, rawmsg []byte, retiredFromID string, expIn bool, expOut *webhook.Outgoing) {
166 _, err := bstore.QueryDB[Hook](ctxbg, DB).Delete()
167 tcheck(t, err, "clean up hooks")
168 _, err = bstore.QueryDB[MsgRetired](ctxbg, DB).Delete()
169 tcheck(t, err, "clean up retired messages")
172 SenderAccount: a.Name,
173 SenderLocalpart: "sender",
174 SenderDomainStr: "remote.example",
175 RecipientLocalpart: "rcpt",
176 RecipientDomain: path.IPDomain,
177 RecipientDomainStr: "mox.example",
178 RecipientAddress: "rcpt@mox.example",
180 KeepUntil: now.Add(time.Minute),
	// The recipient localpart of the incoming message becomes "mjl+unique", while
	// callers pass "unique" as retiredFromID for the matching cases.
182 m.RcptToLocalpart = "mjl"
183 qmr.FromID = retiredFromID
184 m.Size = int64(len(rawmsg))
185 m.RcptToLocalpart += smtp.Localpart("+unique")
187 err = DB.Insert(ctxbg, &qmr)
188 tcheck(t, err, "insert retired message to match")
	// The ID of the retired message is only known after the insert, so it is filled
	// into the expected outgoing webhook here.
191 expOut.QueueMsgID = qmr.ID
194 mr := bytes.NewReader(rawmsg)
195 part, err := message.EnsurePart(pkglog.Logger, true, mr, int64(len(rawmsg)))
196 tcheck(t, err, "parsing message")
198 err = Incoming(ctxbg, pkglog, a, "<random@localhost>", m, part, "Inbox")
199 tcheck(t, err, "pass incoming message")
201 hl, err := bstore.QueryDB[Hook](ctxbg, DB).List()
202 tcheck(t, err, "list hooks")
203 if !expIn && expOut == nil {
204 tcompare(t, len(hl), 0)
207 tcompare(t, len(hl), 1)
209 tcompare(t, h.IsIncoming, expIn)
213 var out webhook.Outgoing
214 dec := json.NewDecoder(strings.NewReader(h.Payload))
215 err = dec.Decode(&out)
216 tcheck(t, err, "decode outgoing webhook")
	// WebhookQueued is zeroed before the comparison; presumably because it is set
	// when the hook is queued and cannot be predicted by the test.
218 out.WebhookQueued = time.Time{}
219 tcompare(t, &out, expOut)
	// dsncompose turns a DSN into the raw message bytes passed to testIncoming.
222 dsncompose := func(m *dsn.Message) []byte {
223 buf, err := m.Compose(pkglog, false)
224 tcheck(t, err, "compose dsn")
	// makedsn builds a DSN for the given action; the recipient list is extended by
	// the multi-recipient case below.
227 makedsn := func(action dsn.Action) *dsn.Message {
231 TextBody: "explanation",
232 MessageID: "<dsnmsgid@localhost>",
233 ReportingMTA: "localhost",
234 Recipients: []dsn.Recipient{
236 FinalRecipient: path,
239 DiagnosticCodeSMTP: "554 5.0.0 error",
245 msgfailed := dsncompose(makedsn(dsn.Failed))
247 // No FromID to match against, so we get a webhook for a new incoming message.
248 testIncoming(acc, msgfailed, "", false, nil)
249 testIncoming(accret, msgfailed, "mismatch", true, nil)
251 // DSNs with multiple recipients are treated as unrecognized DSNs.
252 multidsn := makedsn(dsn.Delivered)
253 multidsn.Recipients = append(multidsn.Recipients, multidsn.Recipients[0])
254 msgmultidsn := dsncompose(multidsn)
255 testIncoming(acc, msgmultidsn, "unique", false, nil)
256 testIncoming(accret, msgmultidsn, "unique", false, &webhook.Outgoing{
257 Event: webhook.EventUnrecognized,
262 msgdelayed := dsncompose(makedsn(dsn.Delayed))
263 testIncoming(acc, msgdelayed, "unique", false, nil)
264 testIncoming(accret, msgdelayed, "unique", false, &webhook.Outgoing{
265 Event: webhook.EventDelayed,
269 SMTPEnhancedCode: "5.0.0",
272 msgrelayed := dsncompose(makedsn(dsn.Relayed))
273 testIncoming(acc, msgrelayed, "unique", false, nil)
274 testIncoming(accret, msgrelayed, "unique", false, &webhook.Outgoing{
275 Event: webhook.EventRelayed,
279 SMTPEnhancedCode: "5.0.0",
282 msgunrecognized := dsncompose(makedsn(dsn.Action("bogus")))
283 testIncoming(acc, msgunrecognized, "unique", false, nil)
284 testIncoming(accret, msgunrecognized, "unique", false, &webhook.Outgoing{
285 Event: webhook.EventUnrecognized,
290 // Not a DSN but to fromid address also causes "unrecognized".
291 msgunrecognized2 := []byte(testmsg)
292 testIncoming(acc, msgunrecognized2, "unique", false, nil)
293 testIncoming(accret, msgunrecognized2, "unique", false, &webhook.Outgoing{
294 Event: webhook.EventUnrecognized,
299 msgdelivered := dsncompose(makedsn(dsn.Delivered))
300 testIncoming(acc, msgdelivered, "unique", false, nil)
301 testIncoming(accret, msgdelivered, "unique", false, &webhook.Outgoing{
302 Event: webhook.EventDelivered,
305 // This is what DSN claims.
307 SMTPEnhancedCode: "5.0.0",
310 testIncoming(acc, msgfailed, "unique", false, nil)
311 testIncoming(accret, msgfailed, "unique", false, &webhook.Outgoing{
312 Event: webhook.EventFailed,
316 SMTPEnhancedCode: "5.0.0",
319 // We still have a webhook in the queue from the test above.
320 // Try to get the hook delivered. We'll try various error handling cases and superseding.
322 qsize, err := HookQueueSize(ctxbg)
323 tcheck(t, err, "hook queue size")
324 tcompare(t, qsize, 1)
	// The test HTTP server forwards every request to handler, which the test
	// reassigns to control how the webhook endpoint responds.
326 var handler http.HandlerFunc
	// handleError always answers with a 500 status and the body "server error\n".
327 handleError := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
328 w.WriteHeader(http.StatusInternalServerError)
329 fmt.Fprintln(w, "server error")
	// handleOK checks the Authorization header (the base64 value decodes to
	// "username:password") and the presence of the X-Mox-Webhook-ID and
	// X-Mox-Webhook-Attempt headers before answering "ok\n".
331 handleOK := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
332 if r.Header.Get("Authorization") != "Basic dXNlcm5hbWU6cGFzc3dvcmQ=" {
333 http.Error(w, "unauthorized", http.StatusUnauthorized)
336 if r.Header.Get("X-Mox-Webhook-ID") == "" {
337 http.Error(w, "missing header x-mox-webhook-id", http.StatusBadRequest)
340 if r.Header.Get("X-Mox-Webhook-Attempt") == "" {
341 http.Error(w, "missing header x-mox-webhook-attempt", http.StatusBadRequest)
344 fmt.Fprintln(w, "ok")
346 hs := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
347 handler.ServeHTTP(w, r)
351 h, err := bstore.QueryDB[Hook](ctxbg, DB).Get()
352 tcheck(t, err, "get hook from queue")
354 next := hookNextWork(ctxbg, pkglog, map[string]struct{}{"https://other.example/": {}})
356 t.Fatalf("next scheduled work should be immediate, is %v", next)
359 // Respond with an error and see a retry is scheduled.
361 // Update hook URL in database, so we can call hookLaunchWork. We'll call
362 // hookDeliver for later attempts.
363 err = DB.Update(ctxbg, &h)
364 tcheck(t, err, "update hook url")
365 handler = handleError
366 hookLaunchWork(pkglog, map[string]struct{}{"https://other.example/": {}})
	// Wait for the delivery attempt to finish before reading the hook back.
367 <-hookDeliveryResults
368 err = DB.Get(ctxbg, &h)
369 tcheck(t, err, "get hook after failed delivery attempt")
370 tcompare(t, h.Attempts, 1)
371 tcompare(t, len(h.Results), 1)
372 tcompare(t, h.LastResult().Success, false)
373 tcompare(t, h.LastResult().Code, http.StatusInternalServerError)
374 tcompare(t, h.LastResult().Response, "server error\n")
376 next = hookNextWork(ctxbg, pkglog, map[string]struct{}{})
	// NOTE(review): the message below contains the typo "shoud"; it is a runtime
	// string and therefore left untouched in this documentation-only change.
378 t.Fatalf("next scheduled work is immediate, shoud be in the future")
	// Set the next attempt to a minute from now and then subtract a minute again:
	// this exercises both HookNextAttemptSet and HookNextAttemptAdd and leaves the
	// hook due for immediate delivery.
381 n, err := HookNextAttemptSet(ctxbg, HookFilter{}, time.Now().Add(time.Minute))
382 tcheck(t, err, "schedule hook to now")
384 n, err = HookNextAttemptAdd(ctxbg, HookFilter{}, -time.Minute)
385 tcheck(t, err, "schedule hook to now")
387 next = hookNextWork(ctxbg, pkglog, map[string]struct{}{})
389 t.Fatalf("next scheduled work should be immediate, is %v", next)
	// Second attempt: afterwards the hook is absent from the queue and present as
	// a retired hook with a successful last result.
393 hookDeliver(pkglog, h)
394 <-hookDeliveryResults
395 err = DB.Get(ctxbg, &h)
396 tcompare(t, err, bstore.ErrAbsent)
397 hr := HookRetired{ID: h.ID}
398 err = DB.Get(ctxbg, &hr)
399 tcheck(t, err, "get retired hook after delivery")
400 tcompare(t, hr.Attempts, 2)
401 tcompare(t, len(hr.Results), 2)
402 tcompare(t, hr.LastResult().Success, true)
403 tcompare(t, hr.LastResult().Code, http.StatusOK)
404 tcompare(t, hr.LastResult().Response, "ok\n")
406 // Check that cleaning up retired webhooks works.
407 cleanupHookRetiredSingle(pkglog)
408 hrl, err := bstore.QueryDB[HookRetired](ctxbg, DB).List()
409 tcheck(t, err, "listing retired hooks")
410 tcompare(t, len(hrl), 0)
412 // Helper to get a representative webhook added to the queue.
413 addHook := func(a *store.Account) {
414 testIncoming(a, msgfailed, "unique", false, &webhook.Outgoing{
415 Event: webhook.EventFailed,
419 SMTPEnhancedCode: "5.0.0",
423 // Keep attempting and failing delivery until we give up.
425 h, err = bstore.QueryDB[Hook](ctxbg, DB).Get()
426 tcheck(t, err, "get added hook")
428 handler = handleError
	// One failing attempt per entry in hookIntervals; the hook stays in the queue
	// and its attempt counter increases each time.
429 for i := 0; i < len(hookIntervals); i++ {
430 hookDeliver(pkglog, h)
431 <-hookDeliveryResults
432 err := DB.Get(ctxbg, &h)
433 tcheck(t, err, "get hook")
434 tcompare(t, h.Attempts, i+1)
	// One more failing attempt: the hook is now gone from the queue and retired
	// with len(hookIntervals)+1 attempts and results.
437 hookDeliver(pkglog, h)
438 <-hookDeliveryResults
439 err = DB.Get(ctxbg, &h)
440 tcompare(t, err, bstore.ErrAbsent)
441 hr = HookRetired{ID: h.ID}
442 err = DB.Get(ctxbg, &hr)
443 tcheck(t, err, "get retired hook after failure")
444 tcompare(t, hr.Attempts, len(hookIntervals)+1)
445 tcompare(t, len(hr.Results), len(hookIntervals)+1)
446 tcompare(t, hr.LastResult().Success, false)
447 tcompare(t, hr.LastResult().Code, http.StatusInternalServerError)
448 tcompare(t, hr.LastResult().Response, "server error\n")
450 // Check account "hook" doesn't get retired webhooks.
452 h, err = bstore.QueryDB[Hook](ctxbg, DB).Get()
453 tcheck(t, err, "get added hook")
456 hookDeliver(pkglog, h)
457 <-hookDeliveryResults
458 err = DB.Get(ctxbg, &h)
459 tcompare(t, err, bstore.ErrAbsent)
460 hr = HookRetired{ID: h.ID}
461 err = DB.Get(ctxbg, &hr)
462 tcompare(t, err, bstore.ErrAbsent)
	// Canceling with an empty filter must leave the hook list empty.
466 h, err = bstore.QueryDB[Hook](ctxbg, DB).Get()
467 tcheck(t, err, "get added hook")
468 n, err = HookCancel(ctxbg, pkglog, HookFilter{})
469 tcheck(t, err, "canceling hook")
471 l, err := HookList(ctxbg, HookFilter{}, HookSort{})
472 tcheck(t, err, "list hook")
473 tcompare(t, len(l), 0)
475 // Superseding: When a webhook is scheduled for a message that already has a
476 // pending webhook, the previous webhook should be removed/retired.
477 _, err = bstore.QueryDB[HookRetired](ctxbg, DB).Delete()
478 tcheck(t, err, "clean up retired webhooks")
479 _, err = bstore.QueryDB[MsgRetired](ctxbg, DB).Delete()
480 tcheck(t, err, "clean up retired messages")
482 SenderAccount: accret.Name,
483 SenderLocalpart: "sender",
484 SenderDomainStr: "remote.example",
485 RecipientLocalpart: "rcpt",
486 RecipientDomain: path.IPDomain,
487 RecipientDomainStr: "mox.example",
488 RecipientAddress: "rcpt@mox.example",
490 KeepUntil: now.Add(time.Minute),
493 err = DB.Insert(ctxbg, &qmr)
494 tcheck(t, err, "insert retired message to match")
495 m.RcptToLocalpart = "mjl"
496 m.Size = int64(len(msgdelayed))
497 m.RcptToLocalpart += smtp.Localpart("+unique")
499 mr := bytes.NewReader(msgdelayed)
500 part, err := message.EnsurePart(pkglog.Logger, true, mr, int64(len(msgdelayed)))
501 tcheck(t, err, "parsing message")
503 // Cause first webhook.
504 err = Incoming(ctxbg, pkglog, accret, "<random@localhost>", m, part, "Inbox")
505 tcheck(t, err, "pass incoming message")
506 h, err = bstore.QueryDB[Hook](ctxbg, DB).Get()
507 tcheck(t, err, "get hook")
509 // Cause second webhook for same message. First should now be retired and marked as superseded.
510 err = Incoming(ctxbg, pkglog, accret, "<random@localhost>", m, part, "Inbox")
511 tcheck(t, err, "pass incoming message again")
512 h2, err := bstore.QueryDB[Hook](ctxbg, DB).Get()
513 tcheck(t, err, "get hook")
514 hr, err = bstore.QueryDB[HookRetired](ctxbg, DB).Get()
515 tcheck(t, err, "get retired hook")
	// The retired hook is the first one, it points at the second hook, and the
	// second hook has the higher ID.
516 tcompare(t, h.ID, hr.ID)
517 tcompare(t, hr.SupersededByID, h2.ID)
518 tcompare(t, h2.ID > h.ID, true)
// Test listing of queued and retired hooks: sorting by next attempt and by
// submitted time, ascending and descending, filtering, and paginating one hook
// at a time through HookList and HookRetiredList.
521func TestHookListFilterSort(t *testing.T) {
522 _, cleanup := setup(t)
525 now := time.Now().Round(0)
	// h is the template hook. h1 gets an earlier Submitted and a later NextAttempt,
	// so it sorts apart from the five plain copies of h.
526 h := Hook{0, 0, "fromid", "messageid", "subj", nil, "mjl", "http://localhost", "", false, "delivered", "", now, 0, now, []HookResult{}}
528 h1.Submitted = now.Add(-time.Second)
529 h1.NextAttempt = now.Add(time.Minute)
530 hl := []Hook{h, h, h, h, h, h1}
531 err := DB.Write(ctxbg, func(tx *bstore.Tx) error {
533 err := hookInsert(tx, &hl[i], now, time.Minute)
534 tcheck(t, err, "insert hook")
538 tcheck(t, err, "inserting hooks")
	// hlrev is the insertion order reversed, used as the expected descending order.
541 hlrev := slices.Clone(hl)
542 slices.Reverse(hlrev)
544 // Ascending by nextattempt,id.
545 l, err := HookList(ctxbg, HookFilter{}, HookSort{Asc: true})
546 tcheck(t, err, "list")
549 // Descending by nextattempt,id.
550 l, err = HookList(ctxbg, HookFilter{}, HookSort{})
551 tcheck(t, err, "list")
552 tcompare(t, l, hlrev)
554 // Descending by submitted,id.
555 l, err = HookList(ctxbg, HookFilter{}, HookSort{Field: "Submitted"})
556 tcheck(t, err, "list")
	// ll is hlrev with its first element (h1, the last one inserted) moved to the
	// end: h1 has the earliest Submitted time.
557 ll := append(append([]Hook{}, hlrev[1:]...), hl[5])
560 // Filter by all fields to get a single.
561 allfilters := HookFilter{
569 l, err = HookList(ctxbg, allfilters, HookSort{})
570 tcheck(t, err, "list single")
571 tcompare(t, l, []Hook{h1})
573 // Paginated NextAttempt asc.
	// Each page holds one hook (Max: 1); the ID and sort field value of the last
	// hook seen are passed back in as LastID and Last for the next page.
578 nl, err := HookList(ctxbg, HookFilter{Max: 1}, HookSort{Asc: true, LastID: lastID, Last: last})
579 tcheck(t, err, "list paginated")
584 tcompare(t, len(nl), 1)
585 lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
589 // Paginated NextAttempt desc.
594 nl, err := HookList(ctxbg, HookFilter{Max: 1}, HookSort{LastID: lastID, Last: last})
595 tcheck(t, err, "list paginated")
600 tcompare(t, len(nl), 1)
601 lastID, last = nl[0].ID, nl[0].NextAttempt.Format(time.RFC3339Nano)
603 tcompare(t, l, hlrev)
605 // Paginated Submitted desc.
610 nl, err := HookList(ctxbg, HookFilter{Max: 1}, HookSort{Field: "Submitted", LastID: lastID, Last: last})
611 tcheck(t, err, "list paginated")
616 tcompare(t, len(nl), 1)
617 lastID, last = nl[0].ID, nl[0].Submitted.Format(time.RFC3339Nano)
621 // Paginated Submitted asc.
626 nl, err := HookList(ctxbg, HookFilter{Max: 1}, HookSort{Field: "Submitted", Asc: true, LastID: lastID, Last: last})
627 tcheck(t, err, "list paginated")
632 tcompare(t, len(nl), 1)
633 lastID, last = nl[0].ID, nl[0].Submitted.Format(time.RFC3339Nano)
635 llrev := slices.Clone(ll)
636 slices.Reverse(llrev)
637 tcompare(t, l, llrev)
639 // Retire messages and do similar but more basic tests. The code is similar.
640 var hrl []HookRetired
641 err = DB.Write(ctxbg, func(tx *bstore.Tx) error {
642 for _, h := range hl {
	// Build a retired hook from each queued hook and insert it directly; hrl
	// keeps them in insertion order for the comparisons below.
643 hr := h.Retired(false, h.NextAttempt, time.Now().Add(time.Minute).Round(0))
644 err := tx.Insert(&hr)
645 tcheck(t, err, "inserting retired")
646 hrl = append(hrl, hr)
650 tcheck(t, err, "adding retired")
652 // Paginated LastActivity desc.
658 nl, err := HookRetiredList(ctxbg, HookRetiredFilter{Max: 1}, HookRetiredSort{LastID: lastID, Last: last})
659 tcheck(t, err, "list paginated")
660 lr = append(lr, nl...)
664 tcompare(t, len(nl), 1)
665 lastID, last = nl[0].ID, nl[0].LastActivity.Format(time.RFC3339Nano)
	// The pages collected in lr must equal the inserted retired hooks in reverse order.
667 hrlrev := slices.Clone(hrl)
668 slices.Reverse(hrlrev)
669 tcompare(t, lr, hrlrev)
671 // Filter by all fields to get a single.
672 allretiredfilters := HookRetiredFilter{
674 IDs: []int64{hrlrev[0].ID},
680 lr, err = HookRetiredList(ctxbg, allretiredfilters, HookRetiredSort{})
681 tcheck(t, err, "list single")
682 tcompare(t, lr, []HookRetired{hrlrev[0]})