1package queue
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "log/slog"
9 "net/http"
10 "net/textproto"
11 "runtime/debug"
12 "slices"
13 "strconv"
14 "strings"
15 "time"
16
17 "github.com/prometheus/client_golang/prometheus"
18 "github.com/prometheus/client_golang/prometheus/promauto"
19
20 "github.com/mjl-/bstore"
21
22 "github.com/mjl-/mox/dns"
23 "github.com/mjl-/mox/dsn"
24 "github.com/mjl-/mox/message"
25 "github.com/mjl-/mox/metrics"
26 "github.com/mjl-/mox/mlog"
27 "github.com/mjl-/mox/mox-"
28 "github.com/mjl-/mox/moxvar"
29 "github.com/mjl-/mox/smtp"
30 "github.com/mjl-/mox/store"
31 "github.com/mjl-/mox/webhook"
32 "github.com/mjl-/mox/webops"
33)
34
35var (
36 metricHookRequest = promauto.NewHistogram(
37 prometheus.HistogramOpts{
38 Name: "mox_webhook_request_duration_seconds",
39 Help: "HTTP webhook call duration.",
40 Buckets: []float64{0.01, 0.05, 0.1, 0.5, 1, 5, 10, 20, 30},
41 },
42 )
43 metricHookResult = promauto.NewCounterVec(
44 prometheus.CounterOpts{
45 Name: "mox_webhook_results_total",
46 Help: "HTTP webhook call results.",
47 },
48 []string{"code"}, // Known http status codes (e.g. "404"), or "<major>xx" for unknown http status codes, or "error".
49 )
50)
51
52// Hook is a webhook call about a delivery. We'll try delivering with backoff until we succeed or fail.
53type Hook struct {
54 ID int64
55 QueueMsgID int64 `bstore:"index"` // Original queue Msg/MsgRetired ID. Zero for hooks for incoming messages.
56 FromID string // As generated by us and returned in webapi call. Can be empty, for incoming messages to our base address.
57 MessageID string // Of outgoing or incoming messages. Includes <>.
58 Subject string // Subject of original outgoing message, or of incoming message.
59 Extra map[string]string // From submitted message.
60
61 Account string `bstore:"nonzero"`
62 URL string `bstore:"nonzero"` // Taken from config when webhook is scheduled.
63 Authorization string // Optional value for authorization header to include in HTTP request.
64 IsIncoming bool
65 OutgoingEvent string // Empty string if not outgoing.
66 Payload string // JSON data to be submitted.
67
68 Submitted time.Time `bstore:"default now,index"`
69 Attempts int
70 NextAttempt time.Time `bstore:"nonzero,index"` // Index for fast scheduling.
71 Results []HookResult
72}
73
// HookResult describes the outcome of one attempt to deliver a webhook.
type HookResult struct {
	Start    time.Time
	Duration time.Duration
	URL      string
	Success  bool

	// eg 200, 404, 500. 2xx implies success.
	Code int

	Error string

	// Max 512 bytes of HTTP response body.
	Response string
}
84
85// for logging queueing or starting delivery of a hook.
86func (h Hook) attrs() []slog.Attr {
87 event := string(h.OutgoingEvent)
88 if h.IsIncoming {
89 event = "incoming"
90 }
91 return []slog.Attr{
92 slog.Int64("webhookid", h.ID),
93 slog.Int("attempts", h.Attempts),
94 slog.Int64("msgid", h.QueueMsgID),
95 slog.String("account", h.Account),
96 slog.String("url", h.URL),
97 slog.String("fromid", h.FromID),
98 slog.String("messageid", h.MessageID),
99 slog.String("event", event),
100 slog.Time("nextattempt", h.NextAttempt),
101 }
102}
103
104// LastResult returns the last result entry, or an empty result.
105func (h Hook) LastResult() HookResult {
106 if len(h.Results) == 0 {
107 return HookResult{}
108 }
109 return h.Results[len(h.Results)-1]
110}
111
112// Retired returns a HookRetired for a Hook, for insertion into the database.
113func (h Hook) Retired(success bool, lastActivity, keepUntil time.Time) HookRetired {
114 return HookRetired{
115 ID: h.ID,
116 QueueMsgID: h.QueueMsgID,
117 FromID: h.FromID,
118 MessageID: h.MessageID,
119 Subject: h.Subject,
120 Extra: h.Extra,
121 Account: h.Account,
122 URL: h.URL,
123 Authorization: h.Authorization != "",
124 IsIncoming: h.IsIncoming,
125 OutgoingEvent: h.OutgoingEvent,
126 Payload: h.Payload,
127 Submitted: h.Submitted,
128 Attempts: h.Attempts,
129 Results: h.Results,
130 Success: success,
131 LastActivity: lastActivity,
132 KeepUntil: keepUntil,
133 }
134}
135
136// HookRetired is a Hook that was delivered/failed/canceled and kept according
137// to the configuration.
138type HookRetired struct {
139 ID int64 // Same as original Hook.ID.
140 QueueMsgID int64 // Original queue Msg or MsgRetired ID. Zero for hooks for incoming messages.
141 FromID string // As generated by us and returned in webapi call. Can be empty, for incoming messages to our base address.
142 MessageID string // Of outgoing or incoming messages. Includes <>.
143 Subject string // Subject of original outgoing message, or of incoming message.
144 Extra map[string]string // From submitted message.
145
146 Account string `bstore:"nonzero,index Account+LastActivity"`
147 URL string `bstore:"nonzero"` // Taken from config at start of each attempt.
148 Authorization bool // Whether request had authorization without keeping it around.
149 IsIncoming bool
150 OutgoingEvent string
151 Payload string // JSON data submitted.
152
153 Submitted time.Time
154 SupersededByID int64 // If not 0, a Hook.ID that superseded this one and Done will be true.
155 Attempts int
156 Results []HookResult
157
158 Success bool
159 LastActivity time.Time `bstore:"index"`
160 KeepUntil time.Time `bstore:"index"`
161}
162
163// LastResult returns the last result entry, or an empty result.
164func (h HookRetired) LastResult() HookResult {
165 if len(h.Results) == 0 {
166 return HookResult{}
167 }
168 return h.Results[len(h.Results)-1]
169}
170
171func cleanupHookRetired(done chan struct{}) {
172 log := mlog.New("queue", nil)
173
174 defer func() {
175 x := recover()
176 if x != nil {
177 log.Error("unhandled panic while cleaning up retired webhooks", slog.Any("x", x))
178 debug.PrintStack()
179 metrics.PanicInc(metrics.Queue)
180 }
181 }()
182
183 timer := time.NewTimer(4 * time.Second)
184 for {
185 select {
186 case <-mox.Shutdown.Done():
187 done <- struct{}{}
188 return
189 case <-timer.C:
190 }
191
192 cleanupHookRetiredSingle(log)
193 timer.Reset(time.Hour)
194 }
195}
196
197func cleanupHookRetiredSingle(log mlog.Log) {
198 n, err := bstore.QueryDB[HookRetired](mox.Shutdown, DB).FilterLess("KeepUntil", time.Now()).Delete()
199 log.Check(err, "removing old retired webhooks")
200 if n > 0 {
201 log.Debug("cleaned up retired webhooks", slog.Int("count", n))
202 }
203}
204
205func hookRetiredKeep(account string) time.Duration {
206 keep := 24 * 7 * time.Hour
207 accConf, ok := mox.Conf.Account(account)
208 if ok {
209 keep = accConf.KeepRetiredWebhookPeriod
210 }
211 return keep
212}
213
// HookFilter selects hooks to list or operate on. Used by admin web interface
// and cli.
//
// Only non-empty/non-zero values are applied to the filter. Leaving all fields
// empty/zero matches all hooks.
type HookFilter struct {
	Max     int
	IDs     []int64
	Account string

	// Whether submitted before/after a time relative to now. ">$duration" or
	// "<$duration", also with "now" for duration.
	Submitted string

	// ">$duration" or "<$duration", also with "now" for duration.
	NextAttempt string

	// Including "incoming".
	Event string
}
227
228func (f HookFilter) apply(q *bstore.Query[Hook]) error {
229 if len(f.IDs) > 0 {
230 q.FilterIDs(f.IDs)
231 }
232 applyTime := func(field string, s string) error {
233 orig := s
234 var less bool
235 if strings.HasPrefix(s, "<") {
236 less = true
237 } else if !strings.HasPrefix(s, ">") {
238 return fmt.Errorf(`must start with "<" for less or ">" for greater than a duration ago`)
239 }
240 s = strings.TrimSpace(s[1:])
241 var t time.Time
242 if s == "now" {
243 t = time.Now()
244 } else if d, err := time.ParseDuration(s); err != nil {
245 return fmt.Errorf("parsing duration %q: %v", orig, err)
246 } else {
247 t = time.Now().Add(d)
248 }
249 if less {
250 q.FilterLess(field, t)
251 } else {
252 q.FilterGreater(field, t)
253 }
254 return nil
255 }
256 if f.Submitted != "" {
257 if err := applyTime("Submitted", f.Submitted); err != nil {
258 return fmt.Errorf("applying filter for submitted: %v", err)
259 }
260 }
261 if f.NextAttempt != "" {
262 if err := applyTime("NextAttempt", f.NextAttempt); err != nil {
263 return fmt.Errorf("applying filter for next attempt: %v", err)
264 }
265 }
266 if f.Account != "" {
267 q.FilterNonzero(Hook{Account: f.Account})
268 }
269 if f.Event != "" {
270 if f.Event == "incoming" {
271 q.FilterNonzero(Hook{IsIncoming: true})
272 } else {
273 q.FilterNonzero(Hook{OutgoingEvent: f.Event})
274 }
275 }
276 if f.Max != 0 {
277 q.Limit(f.Max)
278 }
279 return nil
280}
281
// HookSort specifies the order in which hooks are returned, and optionally the
// last object of a previous page to continue after.
type HookSort struct {
	// "Submitted" or "NextAttempt"/"".
	Field string

	// If > 0, we return objects beyond this, less/greater depending on Asc.
	LastID int64

	// Value of Field for last object. Must be set iff LastID is set.
	Last any

	// Ascending, or descending.
	Asc bool
}
288
289func (s HookSort) apply(q *bstore.Query[Hook]) error {
290 switch s.Field {
291 case "", "NextAttempt":
292 s.Field = "NextAttempt"
293 case "Submitted":
294 s.Field = "Submitted"
295 default:
296 return fmt.Errorf("unknown sort order field %q", s.Field)
297 }
298
299 if s.LastID > 0 {
300 ls, ok := s.Last.(string)
301 if !ok {
302 return fmt.Errorf("last should be string with time, not %T %q", s.Last, s.Last)
303 }
304 last, err := time.Parse(time.RFC3339Nano, ls)
305 if err != nil {
306 last, err = time.Parse(time.RFC3339, ls)
307 }
308 if err != nil {
309 return fmt.Errorf("parsing last %q as time: %v", s.Last, err)
310 }
311 q.FilterNotEqual("ID", s.LastID)
312 var fieldEqual func(h Hook) bool
313 if s.Field == "NextAttempt" {
314 fieldEqual = func(h Hook) bool { return h.NextAttempt.Equal(last) }
315 } else {
316 fieldEqual = func(h Hook) bool { return h.Submitted.Equal(last) }
317 }
318 if s.Asc {
319 q.FilterGreaterEqual(s.Field, last)
320 q.FilterFn(func(h Hook) bool {
321 return !fieldEqual(h) || h.ID > s.LastID
322 })
323 } else {
324 q.FilterLessEqual(s.Field, last)
325 q.FilterFn(func(h Hook) bool {
326 return !fieldEqual(h) || h.ID < s.LastID
327 })
328 }
329 }
330 if s.Asc {
331 q.SortAsc(s.Field, "ID")
332 } else {
333 q.SortDesc(s.Field, "ID")
334 }
335 return nil
336}
337
338// HookQueueSize returns the number of webhooks in the queue.
339func HookQueueSize(ctx context.Context) (int, error) {
340 return bstore.QueryDB[Hook](ctx, DB).Count()
341}
342
343// HookList returns webhooks according to filter and sort.
344func HookList(ctx context.Context, filter HookFilter, sort HookSort) ([]Hook, error) {
345 q := bstore.QueryDB[Hook](ctx, DB)
346 if err := filter.apply(q); err != nil {
347 return nil, err
348 }
349 if err := sort.apply(q); err != nil {
350 return nil, err
351 }
352 return q.List()
353}
354
// HookRetiredFilter selects retired hooks to list or operate on. Used by admin
// web interface and cli.
//
// Only non-empty/non-zero values are applied to the filter. Leaving all fields
// empty/zero matches all hooks.
type HookRetiredFilter struct {
	Max     int
	IDs     []int64
	Account string

	// Whether submitted before/after a time relative to now. ">$duration" or
	// "<$duration", also with "now" for duration.
	Submitted string

	// ">$duration" or "<$duration", also with "now" for duration.
	LastActivity string

	// Including "incoming".
	Event string
}
368
369func (f HookRetiredFilter) apply(q *bstore.Query[HookRetired]) error {
370 if len(f.IDs) > 0 {
371 q.FilterIDs(f.IDs)
372 }
373 applyTime := func(field string, s string) error {
374 orig := s
375 var less bool
376 if strings.HasPrefix(s, "<") {
377 less = true
378 } else if !strings.HasPrefix(s, ">") {
379 return fmt.Errorf(`must start with "<" for before or ">" for after a duration`)
380 }
381 s = strings.TrimSpace(s[1:])
382 var t time.Time
383 if s == "now" {
384 t = time.Now()
385 } else if d, err := time.ParseDuration(s); err != nil {
386 return fmt.Errorf("parsing duration %q: %v", orig, err)
387 } else {
388 t = time.Now().Add(d)
389 }
390 if less {
391 q.FilterLess(field, t)
392 } else {
393 q.FilterGreater(field, t)
394 }
395 return nil
396 }
397 if f.Submitted != "" {
398 if err := applyTime("Submitted", f.Submitted); err != nil {
399 return fmt.Errorf("applying filter for submitted: %v", err)
400 }
401 }
402 if f.LastActivity != "" {
403 if err := applyTime("LastActivity", f.LastActivity); err != nil {
404 return fmt.Errorf("applying filter for last activity: %v", err)
405 }
406 }
407 if f.Account != "" {
408 q.FilterNonzero(HookRetired{Account: f.Account})
409 }
410 if f.Event != "" {
411 if f.Event == "incoming" {
412 q.FilterNonzero(HookRetired{IsIncoming: true})
413 } else {
414 q.FilterNonzero(HookRetired{OutgoingEvent: f.Event})
415 }
416 }
417 if f.Max != 0 {
418 q.Limit(f.Max)
419 }
420 return nil
421}
422
// HookRetiredSort specifies the order in which retired hooks are returned, and
// optionally the last object of a previous page to continue after.
type HookRetiredSort struct {
	// "Submitted" or "LastActivity"/"".
	Field string

	// If > 0, we return objects beyond this, less/greater depending on Asc.
	LastID int64

	// Value of Field for last object. Must be set iff LastID is set.
	Last any

	// Ascending, or descending.
	Asc bool
}
429
430func (s HookRetiredSort) apply(q *bstore.Query[HookRetired]) error {
431 switch s.Field {
432 case "", "LastActivity":
433 s.Field = "LastActivity"
434 case "Submitted":
435 s.Field = "Submitted"
436 default:
437 return fmt.Errorf("unknown sort order field %q", s.Field)
438 }
439
440 if s.LastID > 0 {
441 ls, ok := s.Last.(string)
442 if !ok {
443 return fmt.Errorf("last should be string with time, not %T %q", s.Last, s.Last)
444 }
445 last, err := time.Parse(time.RFC3339Nano, ls)
446 if err != nil {
447 last, err = time.Parse(time.RFC3339, ls)
448 }
449 if err != nil {
450 return fmt.Errorf("parsing last %q as time: %v", s.Last, err)
451 }
452 q.FilterNotEqual("ID", s.LastID)
453 var fieldEqual func(hr HookRetired) bool
454 if s.Field == "LastActivity" {
455 fieldEqual = func(hr HookRetired) bool { return hr.LastActivity.Equal(last) }
456 } else {
457 fieldEqual = func(hr HookRetired) bool { return hr.Submitted.Equal(last) }
458 }
459 if s.Asc {
460 q.FilterGreaterEqual(s.Field, last)
461 q.FilterFn(func(hr HookRetired) bool {
462 return !fieldEqual(hr) || hr.ID > s.LastID
463 })
464 } else {
465 q.FilterLessEqual(s.Field, last)
466 q.FilterFn(func(hr HookRetired) bool {
467 return !fieldEqual(hr) || hr.ID < s.LastID
468 })
469 }
470 }
471 if s.Asc {
472 q.SortAsc(s.Field, "ID")
473 } else {
474 q.SortDesc(s.Field, "ID")
475 }
476 return nil
477}
478
479// HookRetiredList returns retired webhooks according to filter and sort.
480func HookRetiredList(ctx context.Context, filter HookRetiredFilter, sort HookRetiredSort) ([]HookRetired, error) {
481 q := bstore.QueryDB[HookRetired](ctx, DB)
482 if err := filter.apply(q); err != nil {
483 return nil, err
484 }
485 if err := sort.apply(q); err != nil {
486 return nil, err
487 }
488 return q.List()
489}
490
491// HookNextAttemptAdd adds a duration to the NextAttempt for all matching messages, and
492// kicks the queue.
493func HookNextAttemptAdd(ctx context.Context, filter HookFilter, d time.Duration) (affected int, err error) {
494 err = DB.Write(ctx, func(tx *bstore.Tx) error {
495 q := bstore.QueryTx[Hook](tx)
496 if err := filter.apply(q); err != nil {
497 return err
498 }
499 hooks, err := q.List()
500 if err != nil {
501 return fmt.Errorf("listing matching hooks: %v", err)
502 }
503 for _, h := range hooks {
504 h.NextAttempt = h.NextAttempt.Add(d)
505 if err := tx.Update(&h); err != nil {
506 return err
507 }
508 }
509 affected = len(hooks)
510 return nil
511 })
512 if err != nil {
513 return 0, err
514 }
515 hookqueueKick()
516 return affected, nil
517}
518
519// HookNextAttemptSet sets NextAttempt for all matching messages to a new absolute
520// time and kicks the queue.
521func HookNextAttemptSet(ctx context.Context, filter HookFilter, t time.Time) (affected int, err error) {
522 q := bstore.QueryDB[Hook](ctx, DB)
523 if err := filter.apply(q); err != nil {
524 return 0, err
525 }
526 n, err := q.UpdateNonzero(Hook{NextAttempt: t})
527 if err != nil {
528 return 0, fmt.Errorf("selecting and updating hooks in queue: %v", err)
529 }
530 hookqueueKick()
531 return n, nil
532}
533
534// HookCancel prevents more delivery attempts of the hook, moving it to the
535// retired list if configured.
536func HookCancel(ctx context.Context, log mlog.Log, filter HookFilter) (affected int, err error) {
537 var hooks []Hook
538 err = DB.Write(ctx, func(tx *bstore.Tx) error {
539 q := bstore.QueryTx[Hook](tx)
540 if err := filter.apply(q); err != nil {
541 return err
542 }
543 q.Gather(&hooks)
544 n, err := q.Delete()
545 if err != nil {
546 return fmt.Errorf("selecting and deleting hooks from queue: %v", err)
547 }
548
549 if len(hooks) == 0 {
550 return nil
551 }
552
553 now := time.Now()
554 for _, h := range hooks {
555 keep := hookRetiredKeep(h.Account)
556 if keep > 0 {
557 hr := h.Retired(false, now, now.Add(keep))
558 hr.Results = append(hr.Results, HookResult{Start: now, Error: "canceled by admin"})
559 if err := tx.Insert(&hr); err != nil {
560 return fmt.Errorf("inserting retired hook: %v", err)
561 }
562 }
563 }
564
565 affected = n
566 return nil
567 })
568 if err != nil {
569 return 0, err
570 }
571 for _, h := range hooks {
572 log.Info("canceled hook", h.attrs()...)
573 }
574 hookqueueKick()
575 return affected, nil
576}
577
578func hookCompose(m Msg, url, authz string, event webhook.OutgoingEvent, suppressing bool, code int, secodeOpt string) (Hook, error) {
579 now := time.Now()
580
581 var lastError string
582 if len(m.Results) > 0 {
583 lastError = m.Results[len(m.Results)-1].Error
584 }
585 var ecode string
586 if secodeOpt != "" {
587 ecode = fmt.Sprintf("%d.%s", code/100, secodeOpt)
588 }
589 data := webhook.Outgoing{
590 Event: event,
591 Suppressing: suppressing,
592 QueueMsgID: m.ID,
593 FromID: m.FromID,
594 MessageID: m.MessageID,
595 Subject: m.Subject,
596 WebhookQueued: now,
597 Error: lastError,
598 SMTPCode: code,
599 SMTPEnhancedCode: ecode,
600 Extra: m.Extra,
601 }
602 if data.Extra == nil {
603 data.Extra = map[string]string{}
604 }
605 payload, err := json.Marshal(data)
606 if err != nil {
607 return Hook{}, fmt.Errorf("marshal webhook payload: %v", err)
608 }
609
610 h := Hook{
611 QueueMsgID: m.ID,
612 FromID: m.FromID,
613 MessageID: m.MessageID,
614 Subject: m.Subject,
615 Extra: m.Extra,
616 Account: m.SenderAccount,
617 URL: url,
618 Authorization: authz,
619 IsIncoming: false,
620 OutgoingEvent: string(event),
621 Payload: string(payload),
622 Submitted: now,
623 NextAttempt: now,
624 }
625 return h, nil
626}
627
628// Incoming processes a message delivered over SMTP for webhooks. If the message is
629// a DSN, a webhook for outgoing deliveries may be scheduled (if configured).
630// Otherwise, a webhook for incoming deliveries may be scheduled.
631func Incoming(ctx context.Context, log mlog.Log, acc *store.Account, messageID string, m store.Message, part message.Part, mailboxName string) error {
632 now := time.Now()
633 var data any
634
635 log = log.With(
636 slog.Int64("msgid", m.ID),
637 slog.String("messageid", messageID),
638 slog.String("mailbox", mailboxName),
639 )
640
641 // todo future: if there is no fromid in our rcpt address, but this is a 3-part dsn with headers that includes message-id, try matching based on that.
642 // todo future: once we implement the SMTP DSN extension, use ENVID when sending (if destination implements it), and start looking for Original-Envelope-ID in the DSN.
643
644 // If this is a DSN for a message we sent, don't deliver a hook for incoming
645 // message, but an outgoing status webhook.
646 var fromID string
647 dom, err := dns.ParseDomain(m.RcptToDomain)
648 if err != nil {
649 log.Debugx("parsing recipient domain in incoming message", err)
650 } else {
651 domconf, _ := mox.Conf.Domain(dom)
652 if domconf.LocalpartCatchallSeparator != "" {
653 t := strings.SplitN(string(m.RcptToLocalpart), domconf.LocalpartCatchallSeparator, 2)
654 if len(t) == 2 {
655 fromID = t[1]
656 }
657 }
658 }
659 var outgoingEvent webhook.OutgoingEvent
660 var queueMsgID int64
661 var subject string
662 if fromID != "" {
663 err := DB.Write(ctx, func(tx *bstore.Tx) (rerr error) {
664 mr, err := bstore.QueryTx[MsgRetired](tx).FilterNonzero(MsgRetired{FromID: fromID}).Get()
665 if err == bstore.ErrAbsent {
666 log.Debug("no original message found for fromid", slog.String("fromid", fromID))
667 return nil
668 } else if err != nil {
669 return fmt.Errorf("looking up original message for fromid: %v", err)
670 }
671
672 queueMsgID = mr.ID
673 subject = mr.Subject
674
675 log = log.With(slog.String("fromid", fromID))
676 log.Debug("processing incoming message about previous delivery for webhooks")
677
678 // We'll record this message in the results.
679 mr.LastActivity = now
680 mr.Results = append(mr.Results, MsgResult{Start: now, Error: "incoming message"})
681 result := &mr.Results[len(mr.Results)-1] // Updated below.
682
683 outgoingEvent = webhook.EventUnrecognized
684 var suppressedMsgIDs []int64
685 var isDSN bool
686 var code int
687 var secode string
688 defer func() {
689 if rerr == nil {
690 var ecode string
691 if secode != "" {
692 ecode = fmt.Sprintf("%d.%s", code/100, secode)
693 }
694 data = webhook.Outgoing{
695 Event: outgoingEvent,
696 DSN: isDSN,
697 Suppressing: len(suppressedMsgIDs) > 0,
698 QueueMsgID: mr.ID,
699 FromID: fromID,
700 MessageID: mr.MessageID,
701 Subject: mr.Subject,
702 WebhookQueued: now,
703 SMTPCode: code,
704 SMTPEnhancedCode: ecode,
705 Extra: mr.Extra,
706 }
707
708 if err := tx.Update(&mr); err != nil {
709 rerr = fmt.Errorf("updating retired message after processing: %v", err)
710 return
711 }
712 }
713 }()
714
715 if !(part.MediaType == "MULTIPART" && part.MediaSubType == "REPORT" && len(part.Parts) >= 2 && part.Parts[1].MediaType == "MESSAGE" && (part.Parts[1].MediaSubType == "DELIVERY-STATUS" || part.Parts[1].MediaSubType == "GLOBAL-DELIVERY-STATUS")) {
716 // Some kind of delivery-related event, but we don't recognize it.
717 result.Error = "incoming message not a dsn"
718 return nil
719 }
720 isDSN = true
721 dsnutf8 := part.Parts[1].MediaSubType == "GLOBAL-DELIVERY-STATUS"
722 dsnmsg, err := dsn.Decode(part.Parts[1].ReaderUTF8OrBinary(), dsnutf8)
723 if err != nil {
724 log.Infox("parsing dsn message for webhook", err)
725 result.Error = fmt.Sprintf("parsing incoming dsn: %v", err)
726 return nil
727 } else if len(dsnmsg.Recipients) != 1 {
728 log.Info("dsn message for webhook does not have exactly one dsn recipient", slog.Int("nrecipients", len(dsnmsg.Recipients)))
729 result.Error = fmt.Sprintf("incoming dsn has %d recipients, expecting 1", len(dsnmsg.Recipients))
730 return nil
731 }
732
733 dsnrcpt := dsnmsg.Recipients[0]
734
735 if dsnrcpt.DiagnosticCodeSMTP != "" {
736 code, secode = parseSMTPCodes(dsnrcpt.DiagnosticCodeSMTP)
737 }
738 if code == 0 && dsnrcpt.Status != "" {
739 if strings.HasPrefix(dsnrcpt.Status, "4.") {
740 code = 400
741 secode = dsnrcpt.Status[2:]
742 } else if strings.HasPrefix(dsnrcpt.Status, "5.") {
743 code = 500
744 secode = dsnrcpt.Status[2:]
745 }
746 }
747 result.Code = code
748 result.Secode = secode
749 log.Debug("incoming dsn message", slog.String("action", string(dsnrcpt.Action)), slog.Int("dsncode", code), slog.String("dsnsecode", secode))
750
751 switch s := dsnrcpt.Action; s {
752 case dsn.Failed:
753 outgoingEvent = webhook.EventFailed
754
755 if code != 0 {
756 sc := suppressionCheck{
757 MsgID: mr.ID,
758 Account: acc.Name,
759 Recipient: mr.Recipient(),
760 Code: code,
761 Secode: secode,
762 Source: "DSN",
763 }
764 suppressedMsgIDs, err = suppressionProcess(log, tx, sc)
765 if err != nil {
766 return fmt.Errorf("processing dsn for suppression list: %v", err)
767 }
768 } else {
769 log.Debug("no code/secode in dsn for failed delivery", slog.Int64("msgid", mr.ID))
770 }
771
772 case dsn.Delayed, dsn.Delivered, dsn.Relayed, dsn.Expanded:
773 outgoingEvent = webhook.OutgoingEvent(string(s))
774 result.Success = s != dsn.Delayed
775
776 default:
777 log.Info("unrecognized dsn action", slog.String("action", string(dsnrcpt.Action)))
778 }
779 return nil
780 })
781 if err != nil {
782 return fmt.Errorf("processing message based on fromid: %v", err)
783 }
784 }
785
786 accConf, _ := acc.Conf()
787
788 var hookURL, authz string
789 var isIncoming bool
790 if data == nil {
791 if accConf.IncomingWebhook == nil {
792 return nil
793 }
794 hookURL = accConf.IncomingWebhook.URL
795 authz = accConf.IncomingWebhook.Authorization
796
797 log.Debug("composing webhook for incoming message")
798
799 isIncoming = true
800 var rcptTo string
801 if m.RcptToDomain != "" {
802 rcptTo = m.RcptToLocalpart.String() + "@" + m.RcptToDomain
803 }
804 in := webhook.Incoming{
805 Structure: webhook.PartStructure(&part),
806 Meta: webhook.IncomingMeta{
807 MsgID: m.ID,
808 MailFrom: m.MailFrom,
809 MailFromValidated: m.MailFromValidated,
810 MsgFromValidated: m.MsgFromValidated,
811 RcptTo: rcptTo,
812 DKIMVerifiedDomains: m.DKIMDomains,
813 RemoteIP: m.RemoteIP,
814 Received: m.Received,
815 MailboxName: mailboxName,
816 },
817 }
818 if in.Meta.DKIMVerifiedDomains == nil {
819 in.Meta.DKIMVerifiedDomains = []string{}
820 }
821 if env := part.Envelope; env != nil {
822 subject = env.Subject
823 in.From = addresses(env.From)
824 in.To = addresses(env.To)
825 in.CC = addresses(env.CC)
826 in.BCC = addresses(env.BCC)
827 in.ReplyTo = addresses(env.ReplyTo)
828 in.Subject = env.Subject
829 in.MessageID = env.MessageID
830 in.InReplyTo = env.InReplyTo
831 if !env.Date.IsZero() {
832 in.Date = &env.Date
833 }
834 }
835 // todo: ideally, we would have this information available in parsed Part, not require parsing headers here.
836 h, err := part.Header()
837 if err != nil {
838 log.Debugx("parsing headers of incoming message", err, slog.Int64("msgid", m.ID))
839 } else {
840 refs, err := message.ReferencedIDs(h.Values("References"), nil)
841 if err != nil {
842 log.Debugx("parsing references header", err, slog.Int64("msgid", m.ID))
843 }
844 for i, r := range refs {
845 refs[i] = "<" + r + ">"
846 }
847 if refs == nil {
848 refs = []string{}
849 }
850 in.References = refs
851
852 // Check if message is automated. Empty SMTP MAIL FROM indicates this was some kind
853 // of service message. Several headers indicate out-of-office replies, messages
854 // from mailing or marketing lists. And the content-type can indicate a report
855 // (e.g. DSN/MDN).
856 in.Meta.Automated = m.MailFrom == "" || isAutomated(h) || part.MediaType == "MULTIPART" && part.MediaSubType == "REPORT"
857 }
858
859 text, html, _, err := webops.ReadableParts(part, 1*1024*1024)
860 if err != nil {
861 log.Debugx("looking for text and html content in message", err)
862 }
863 in.Text = strings.ReplaceAll(text, "\r\n", "\n")
864 in.HTML = strings.ReplaceAll(html, "\r\n", "\n")
865
866 data = in
867 } else if accConf.OutgoingWebhook == nil {
868 return nil
869 } else if len(accConf.OutgoingWebhook.Events) == 0 || slices.Contains(accConf.OutgoingWebhook.Events, string(outgoingEvent)) {
870 hookURL = accConf.OutgoingWebhook.URL
871 authz = accConf.OutgoingWebhook.Authorization
872 } else {
873 log.Debug("not sending webhook, account not subscribed for event", slog.String("event", string(outgoingEvent)))
874 return nil
875 }
876
877 payload, err := json.Marshal(data)
878 if err != nil {
879 return fmt.Errorf("marshal webhook payload: %v", err)
880 }
881
882 h := Hook{
883 QueueMsgID: queueMsgID,
884 FromID: fromID,
885 MessageID: messageID,
886 Subject: subject,
887 Account: acc.Name,
888 URL: hookURL,
889 Authorization: authz,
890 IsIncoming: isIncoming,
891 OutgoingEvent: string(outgoingEvent),
892 Payload: string(payload),
893 Submitted: now,
894 NextAttempt: now,
895 }
896 err = DB.Write(ctx, func(tx *bstore.Tx) error {
897 if err := hookInsert(tx, &h, now, accConf.KeepRetiredWebhookPeriod); err != nil {
898 return fmt.Errorf("queueing webhook for incoming message: %v", err)
899 }
900 return nil
901 })
902 if err != nil {
903 return fmt.Errorf("inserting webhook in database: %v", err)
904 }
905 log.Debug("queued webhook for incoming message", h.attrs()...)
906 hookqueueKick()
907 return nil
908}
909
// isAutomated returns whether the headers indicate an automated message: any
// non-empty mailing list header or Precedence header, or an Auto-Submitted
// header with a value other than "no" (case-insensitive, ignoring surrounding
// space).
func isAutomated(h textproto.MIMEHeader) bool {
	if auto := strings.TrimSpace(h.Get("Auto-Submitted")); auto != "" && !strings.EqualFold(auto, "no") {
		return true
	}

	// Presence of any of these headers with a value is enough.
	indicators := []string{"List-Id", "List-Unsubscribe", "List-Unsubscribe-Post", "Precedence"}
	return slices.ContainsFunc(indicators, func(key string) bool {
		return h.Get(key) != ""
	})
}
922
// parseSMTPCodes parses the SMTP code and optional enhanced code from an SMTP
// response line, e.g. "550 5.1.1 user unknown".
//
// The line must start with a 3-digit code below 600 followed by a space,
// otherwise 0 and an empty string are returned. The returned secode is the
// enhanced code without its class, e.g. "1.1" for "5.1.1", and only set if the
// second word starts with "4." or "5.".
func parseSMTPCodes(line string) (code int, secode string) {
	t := strings.SplitN(line, " ", 3)
	if len(t) <= 1 || len(t[0]) != 3 {
		return 0, ""
	}
	// The parsed value must be checked against the upper bound, not the named
	// result "code", which is still zero at this point.
	v, err := strconv.ParseUint(t[0], 10, 64)
	if err != nil || v >= 600 {
		return 0, ""
	}
	if strings.HasPrefix(t[1], "4.") || strings.HasPrefix(t[1], "5.") {
		secode = t[1][2:]
	}
	return int(v), secode
}
937
938// Insert hook into database, but first retire any existing pending hook for
939// QueueMsgID if it is > 0.
940func hookInsert(tx *bstore.Tx, h *Hook, now time.Time, accountKeepPeriod time.Duration) error {
941 if err := tx.Insert(h); err != nil {
942 return fmt.Errorf("insert webhook: %v", err)
943 }
944 if h.QueueMsgID == 0 {
945 return nil
946 }
947
948 // Find existing queued hook for previously msgid from queue. Can be at most one.
949 oh, err := bstore.QueryTx[Hook](tx).FilterNonzero(Hook{QueueMsgID: h.QueueMsgID}).FilterNotEqual("ID", h.ID).Get()
950 if err == bstore.ErrAbsent {
951 return nil
952 } else if err != nil {
953 return fmt.Errorf("get existing webhook before inserting new hook for same queuemsgid %d", h.QueueMsgID)
954 }
955
956 // Retire this queued hook.
957 // This hook may be in the process of being delivered. When delivered, we'll try to
958 // move it from Hook to HookRetired. But that will fail since Hook is already
959 // retired. We detect that situation and update the retired hook with the new
960 // (final) result.
961 if accountKeepPeriod > 0 {
962 hr := oh.Retired(false, now, now.Add(accountKeepPeriod))
963 hr.SupersededByID = h.ID
964 if err := tx.Insert(&hr); err != nil {
965 return fmt.Errorf("inserting superseded webhook as retired hook: %v", err)
966 }
967 }
968 if err := tx.Delete(&oh); err != nil {
969 return fmt.Errorf("deleting superseded webhook: %v", err)
970 }
971 return nil
972}
973
974func addresses(al []message.Address) []webhook.NameAddress {
975 l := make([]webhook.NameAddress, len(al))
976 for i, a := range al {
977 addr := a.User + "@" + a.Host
978 pa, err := smtp.ParseAddress(addr)
979 if err == nil {
980 addr = pa.Pack(true)
981 }
982 l[i] = webhook.NameAddress{
983 Name: a.Name,
984 Address: addr,
985 }
986 }
987 return l
988}
989
// hookqueue signals the hook queue that there may be work to do. It has a
// buffer of one element, so a single pending signal can be kept.
var hookqueue = make(chan struct{}, 1)

// hookDeliveryResults receives the URL of a hook when its delivery attempt is
// done.
var hookDeliveryResults = make(chan string, 1)

// hookqueueKick signals the hook queue to look for work, without blocking. If
// a signal is already pending, nothing is added.
func hookqueueKick() {
	select {
	case hookqueue <- struct{}{}:
		// Signal is now pending.
	default:
		// A signal was already pending, that is enough.
	}
}
1001
1002func startHookQueue(done chan struct{}) {
1003 log := mlog.New("queue", nil)
1004 busyHookURLs := map[string]struct{}{}
1005 timer := time.NewTimer(0)
1006 for {
1007 select {
1008 case <-mox.Shutdown.Done():
1009 for len(busyHookURLs) > 0 {
1010 url := <-hookDeliveryResults
1011 delete(busyHookURLs, url)
1012 }
1013 done <- struct{}{}
1014 return
1015 case <-hookqueue:
1016 case <-timer.C:
1017 case url := <-hookDeliveryResults:
1018 delete(busyHookURLs, url)
1019 }
1020
1021 if len(busyHookURLs) >= maxConcurrentHookDeliveries {
1022 continue
1023 }
1024
1025 hookLaunchWork(log, busyHookURLs)
1026 timer.Reset(hookNextWork(mox.Shutdown, log, busyHookURLs))
1027 }
1028}
1029
1030func hookNextWork(ctx context.Context, log mlog.Log, busyURLs map[string]struct{}) time.Duration {
1031 q := bstore.QueryDB[Hook](ctx, DB)
1032 if len(busyURLs) > 0 {
1033 var urls []any
1034 for u := range busyURLs {
1035 urls = append(urls, u)
1036 }
1037 q.FilterNotEqual("URL", urls...)
1038 }
1039 q.SortAsc("NextAttempt")
1040 q.Limit(1)
1041 h, err := q.Get()
1042 if err == bstore.ErrAbsent {
1043 return 24 * time.Hour
1044 } else if err != nil {
1045 log.Errorx("finding time for next webhook delivery attempt", err)
1046 return 1 * time.Minute
1047 }
1048 return time.Until(h.NextAttempt)
1049}
1050
1051func hookLaunchWork(log mlog.Log, busyURLs map[string]struct{}) int {
1052 q := bstore.QueryDB[Hook](mox.Shutdown, DB)
1053 q.FilterLessEqual("NextAttempt", time.Now())
1054 q.SortAsc("NextAttempt")
1055 q.Limit(maxConcurrentHookDeliveries)
1056 if len(busyURLs) > 0 {
1057 var urls []any
1058 for u := range busyURLs {
1059 urls = append(urls, u)
1060 }
1061 q.FilterNotEqual("URL", urls...)
1062 }
1063 var hooks []Hook
1064 seen := map[string]bool{}
1065 err := q.ForEach(func(h Hook) error {
1066 u := h.URL
1067 if _, ok := busyURLs[u]; !ok && !seen[u] {
1068 seen[u] = true
1069 hooks = append(hooks, h)
1070 }
1071 return nil
1072 })
1073 if err != nil {
1074 log.Errorx("querying for work in webhook queue", err)
1075 mox.Sleep(mox.Shutdown, 1*time.Second)
1076 return -1
1077 }
1078
1079 for _, h := range hooks {
1080 busyURLs[h.URL] = struct{}{}
1081 go hookDeliver(log, h)
1082 }
1083 return len(hooks)
1084}
1085
// hookIntervals holds the backoff durations between webhook delivery attempts.
// It is filled in by init.
var hookIntervals []time.Duration

func init() {
	const (
		minute = time.Minute
		hour   = time.Hour
	)
	hookIntervals = []time.Duration{
		minute,
		2 * minute,
		4 * minute,
		15 * minute / 2, // 7.5 minutes.
		15 * minute,
		30 * minute,
		1 * hour,
		2 * hour,
		4 * hour,
		8 * hour,
		16 * hour,
	}
}
1093
1094func hookDeliver(log mlog.Log, h Hook) {
1095 ctx := mox.Shutdown
1096
1097 qlog := log.WithCid(mox.Cid())
1098 qlog.Debug("attempting to deliver webhook", h.attrs()...)
1099 qlog = qlog.With(slog.Int64("webhookid", h.ID))
1100
1101 defer func() {
1102 hookDeliveryResults <- h.URL
1103
1104 x := recover()
1105 if x != nil {
1106 qlog.Error("webhook deliver panic", slog.Any("panic", x))
1107 debug.PrintStack()
1108 metrics.PanicInc(metrics.Queue)
1109 }
1110 }()
1111
1112 // todo: should we get a new webhook url from the config before attempting? would intervene with our "urls busy" approach. may not be worth it.
1113
1114 // Set Attempts & NextAttempt early. In case of failures while processing, at least
1115 // we won't try again immediately. We do backoff at intervals:
1116 var backoff time.Duration
1117 if h.Attempts < len(hookIntervals) {
1118 backoff = hookIntervals[h.Attempts]
1119 } else {
1120 backoff = hookIntervals[len(hookIntervals)-1] * time.Duration(2)
1121 }
1122 backoff += time.Duration(jitter.Intn(200)-100) * backoff / 10000
1123 h.Attempts++
1124 now := time.Now()
1125 h.NextAttempt = now.Add(backoff)
1126 h.Results = append(h.Results, HookResult{Start: now, URL: h.URL, Error: resultErrorDelivering})
1127 result := &h.Results[len(h.Results)-1]
1128 if err := DB.Update(mox.Shutdown, &h); err != nil {
1129 qlog.Errorx("storing webhook delivery attempt", err)
1130 return
1131 }
1132
1133 hctx, cancel := context.WithTimeout(ctx, 60*time.Second)
1134 defer cancel()
1135 t0 := time.Now()
1136 code, response, err := HookPost(hctx, qlog, h.ID, h.Attempts, h.URL, h.Authorization, h.Payload)
1137 result.Duration = time.Since(t0)
1138 result.Success = err == nil
1139 result.Code = code
1140 result.Error = ""
1141 result.Response = response
1142 if err != nil {
1143 result.Error = err.Error()
1144 }
1145 if err != nil && h.Attempts <= len(hookIntervals) {
1146 // We'll try again later, so only update existing record.
1147 qlog.Debugx("webhook delivery failed, will try again later", err)
1148 xerr := DB.Write(context.Background(), func(tx *bstore.Tx) error {
1149 if err := tx.Update(&h); err == bstore.ErrAbsent {
1150 return updateRetiredHook(tx, h, result)
1151 } else if err != nil {
1152 return fmt.Errorf("update webhook after retryable failure: %v", err)
1153 }
1154 return nil
1155 })
1156 qlog.Check(xerr, "updating failed webhook delivery attempt in database", slog.String("deliveryerr", err.Error()))
1157 return
1158 }
1159
1160 qlog.Debugx("webhook delivery completed", err, slog.Bool("success", result.Success))
1161
1162 // Move Hook to HookRetired.
1163 err = DB.Write(context.Background(), func(tx *bstore.Tx) error {
1164 if err := tx.Delete(&h); err == bstore.ErrAbsent {
1165 return updateRetiredHook(tx, h, result)
1166 } else if err != nil {
1167 return fmt.Errorf("removing webhook from database: %v", err)
1168 }
1169 keep := hookRetiredKeep(h.Account)
1170 if keep > 0 {
1171 hr := h.Retired(result.Success, t0, t0.Add(keep))
1172 if err := tx.Insert(&hr); err != nil {
1173 return fmt.Errorf("inserting retired webhook in database: %v", err)
1174 }
1175 }
1176 return nil
1177 })
1178 qlog.Check(err, "moving delivered webhook from to retired hooks")
1179}
1180
1181func updateRetiredHook(tx *bstore.Tx, h Hook, result *HookResult) error {
1182 // Hook is gone. It may have been superseded and moved to HookRetired while we were
1183 // delivering it. If so, add the result to the retired hook.
1184 hr := HookRetired{ID: h.ID}
1185 if err := tx.Get(&hr); err != nil {
1186 return fmt.Errorf("result for webhook that was no longer in webhook queue or retired webhooks: %v", err)
1187 }
1188 result.Error += "(superseded)"
1189 hr.Results = append(hr.Results, *result)
1190 if err := tx.Update(&hr); err != nil {
1191 return fmt.Errorf("updating retired webhook after webhook was superseded during delivery: %v", err)
1192 }
1193 return nil
1194}
1195
// hookClient is the HTTP client used for all webhook calls. It sets no timeout
// of its own.
var hookClient = &http.Client{Transport: hookTransport()}

// hookTransport returns a clone of the default HTTP transport with settings for
// idle connections tightened: at most 2 idle connections per host, closed after
// 5 seconds of being idle.
func hookTransport() *http.Transport {
	base := http.DefaultTransport.(*http.Transport)
	tr := base.Clone()
	// Keep resource use low during idle periods, which is probably most of the
	// time. During busy periods, the few connections may be used for many events.
	tr.MaxIdleConnsPerHost = 2
	tr.IdleConnTimeout = 5 * time.Second
	return tr
}
1206
1207func HookPost(ctx context.Context, log mlog.Log, hookID int64, attempt int, url, authz string, payload string) (code int, response string, err error) {
1208 req, err := http.NewRequestWithContext(ctx, "POST", url, strings.NewReader(payload))
1209 if err != nil {
1210 return 0, "", fmt.Errorf("new request: %v", err)
1211 }
1212 req.Header.Set("User-Agent", fmt.Sprintf("mox/%s (webhook)", moxvar.Version))
1213 req.Header.Set("Content-Type", "application/json; charset=utf-8")
1214 req.Header.Set("X-Mox-Webhook-ID", fmt.Sprintf("%d", hookID))
1215 req.Header.Set("X-Mox-Webhook-Attempt", fmt.Sprintf("%d", attempt))
1216 if authz != "" {
1217 req.Header.Set("Authorization", authz)
1218 }
1219 t0 := time.Now()
1220 resp, err := hookClient.Do(req)
1221 metricHookRequest.Observe(float64(time.Since(t0)) / float64(time.Second))
1222 if err != nil {
1223 metricHookResult.WithLabelValues("error").Inc()
1224 log.Debugx("webhook http transaction", err)
1225 return 0, "", fmt.Errorf("http transact: %v", err)
1226 }
1227 defer resp.Body.Close()
1228
1229 // Use full http status code for known codes, and a generic "<major>xx" for others.
1230 result := fmt.Sprintf("%d", resp.StatusCode)
1231 if http.StatusText(resp.StatusCode) == "" {
1232 result = fmt.Sprintf("%dxx", resp.StatusCode/100)
1233 }
1234 metricHookResult.WithLabelValues(result).Inc()
1235 log.Debug("webhook http post result", slog.Int("statuscode", resp.StatusCode), slog.Duration("duration", time.Since(t0)))
1236
1237 respbuf, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
1238 if resp.StatusCode != http.StatusOK {
1239 err = fmt.Errorf("http status %q, expected 200 ok", resp.Status)
1240 }
1241 return resp.StatusCode, string(respbuf), err
1242}
1243