1package queue
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "net/http"
11 "net/textproto"
12 "runtime/debug"
13 "slices"
14 "strconv"
15 "strings"
16 "time"
17
18 "github.com/prometheus/client_golang/prometheus"
19 "github.com/prometheus/client_golang/prometheus/promauto"
20
21 "github.com/mjl-/bstore"
22
23 "github.com/mjl-/mox/dns"
24 "github.com/mjl-/mox/dsn"
25 "github.com/mjl-/mox/message"
26 "github.com/mjl-/mox/metrics"
27 "github.com/mjl-/mox/mlog"
28 "github.com/mjl-/mox/mox-"
29 "github.com/mjl-/mox/moxvar"
30 "github.com/mjl-/mox/smtp"
31 "github.com/mjl-/mox/store"
32 "github.com/mjl-/mox/webhook"
33 "github.com/mjl-/mox/webops"
34)
35
36var (
37 metricHookRequest = promauto.NewHistogram(
38 prometheus.HistogramOpts{
39 Name: "mox_webhook_request_duration_seconds",
40 Help: "HTTP webhook call duration.",
41 Buckets: []float64{0.01, 0.05, 0.1, 0.5, 1, 5, 10, 20, 30},
42 },
43 )
44 metricHookResult = promauto.NewCounterVec(
45 prometheus.CounterOpts{
46 Name: "mox_webhook_results_total",
47 Help: "HTTP webhook call results.",
48 },
49 []string{"code"}, // Known http status codes (e.g. "404"), or "<major>xx" for unknown http status codes, or "error".
50 )
51)
52
// Hook is a webhook call about a delivery. We'll try delivering with backoff until we succeed or fail.
//
// A Hook is a pending call. When it is canceled or superseded (and presumably
// also when delivered or failed permanently, outside this view) it is
// converted to a HookRetired with Retired.
type Hook struct {
	ID         int64
	QueueMsgID int64             `bstore:"index"` // Original queue Msg/MsgRetired ID. Zero for hooks for incoming messages.
	FromID     string            // As generated by us and returned in webapi call. Can be empty, for incoming messages to our base address.
	MessageID  string            // Of outgoing or incoming messages. Includes <>.
	Subject    string            // Subject of original outgoing message, or of incoming message.
	Extra      map[string]string // From submitted message.

	Account       string `bstore:"nonzero"` // Name of the account the hook is for.
	URL           string `bstore:"nonzero"` // Taken from config when webhook is scheduled.
	Authorization string // Optional value for authorization header to include in HTTP request.
	IsIncoming    bool   // Whether this hook is about an incoming message. Logged as event "incoming".
	OutgoingEvent string // Empty string if not outgoing.
	Payload       string // JSON data to be submitted.

	Submitted   time.Time `bstore:"default now,index"` // Time the hook was queued.
	Attempts    int       // NOTE(review): presumably the number of delivery attempts so far; the code updating it is not in view.
	NextAttempt time.Time `bstore:"nonzero,index"` // Index for fast scheduling.
	Results     []HookResult // Results of attempts, see HookResult. LastResult returns the final entry.
}
74
// HookResult is the result of a single attempt to deliver a webhook.
//
// A HookResult is also appended when a hook is canceled, with only Start and
// Error set (see HookCancel).
type HookResult struct {
	Start    time.Time     // Time the attempt (or cancelation) was started.
	Duration time.Duration // NOTE(review): presumably the time the HTTP call took; set outside this view.
	URL      string
	Success  bool
	Code     int    // eg 200, 404, 500. 2xx implies success.
	Error    string // Error message, e.g. "canceled by admin" for canceled hooks.
	Response string // Max 512 bytes of HTTP response body.
}
85
86// for logging queueing or starting delivery of a hook.
87func (h Hook) attrs() []slog.Attr {
88 event := string(h.OutgoingEvent)
89 if h.IsIncoming {
90 event = "incoming"
91 }
92 return []slog.Attr{
93 slog.Int64("webhookid", h.ID),
94 slog.Int("attempts", h.Attempts),
95 slog.Int64("msgid", h.QueueMsgID),
96 slog.String("account", h.Account),
97 slog.String("url", h.URL),
98 slog.String("fromid", h.FromID),
99 slog.String("messageid", h.MessageID),
100 slog.String("event", event),
101 slog.Time("nextattempt", h.NextAttempt),
102 }
103}
104
105// LastResult returns the last result entry, or an empty result.
106func (h Hook) LastResult() HookResult {
107 if len(h.Results) == 0 {
108 return HookResult{}
109 }
110 return h.Results[len(h.Results)-1]
111}
112
113// Retired returns a HookRetired for a Hook, for insertion into the database.
114func (h Hook) Retired(success bool, lastActivity, keepUntil time.Time) HookRetired {
115 return HookRetired{
116 ID: h.ID,
117 QueueMsgID: h.QueueMsgID,
118 FromID: h.FromID,
119 MessageID: h.MessageID,
120 Subject: h.Subject,
121 Extra: h.Extra,
122 Account: h.Account,
123 URL: h.URL,
124 Authorization: h.Authorization != "",
125 IsIncoming: h.IsIncoming,
126 OutgoingEvent: h.OutgoingEvent,
127 Payload: h.Payload,
128 Submitted: h.Submitted,
129 Attempts: h.Attempts,
130 Results: h.Results,
131 Success: success,
132 LastActivity: lastActivity,
133 KeepUntil: keepUntil,
134 }
135}
136
// HookRetired is a Hook that was delivered/failed/canceled and kept according
// to the configuration.
//
// Retired hooks are removed again after KeepUntil, see cleanupHookRetired.
type HookRetired struct {
	ID         int64             // Same as original Hook.ID.
	QueueMsgID int64             // Original queue Msg or MsgRetired ID. Zero for hooks for incoming messages.
	FromID     string            // As generated by us and returned in webapi call. Can be empty, for incoming messages to our base address.
	MessageID  string            // Of outgoing or incoming messages. Includes <>.
	Subject    string            // Subject of original outgoing message, or of incoming message.
	Extra      map[string]string // From submitted message.

	Account       string `bstore:"nonzero,index Account+LastActivity"`
	URL           string `bstore:"nonzero"` // Taken from config at start of each attempt.
	Authorization bool   // Whether request had authorization without keeping it around.
	IsIncoming    bool
	OutgoingEvent string
	Payload       string // JSON data submitted.

	Submitted      time.Time
	SupersededByID int64 // If not 0, the ID of the Hook that superseded this one (a newer hook for the same QueueMsgID).
	Attempts       int
	Results        []HookResult

	Success      bool      // As passed to Hook.Retired. False for canceled and superseded hooks.
	LastActivity time.Time `bstore:"index"`
	KeepUntil    time.Time `bstore:"index"` // Retired hooks with a KeepUntil in the past are deleted during cleanup.
}
163
164// LastResult returns the last result entry, or an empty result.
165func (h HookRetired) LastResult() HookResult {
166 if len(h.Results) == 0 {
167 return HookResult{}
168 }
169 return h.Results[len(h.Results)-1]
170}
171
172func cleanupHookRetired(done chan struct{}) {
173 log := mlog.New("queue", nil)
174
175 defer func() {
176 x := recover()
177 if x != nil {
178 log.Error("unhandled panic while cleaning up retired webhooks", slog.Any("x", x))
179 debug.PrintStack()
180 metrics.PanicInc(metrics.Queue)
181 }
182 }()
183
184 timer := time.NewTimer(4 * time.Second)
185 for {
186 select {
187 case <-mox.Shutdown.Done():
188 done <- struct{}{}
189 return
190 case <-timer.C:
191 }
192
193 cleanupHookRetiredSingle(log)
194 timer.Reset(time.Hour)
195 }
196}
197
198func cleanupHookRetiredSingle(log mlog.Log) {
199 n, err := bstore.QueryDB[HookRetired](mox.Shutdown, DB).FilterLess("KeepUntil", time.Now()).Delete()
200 log.Check(err, "removing old retired webhooks")
201 if n > 0 {
202 log.Debug("cleaned up retired webhooks", slog.Int("count", n))
203 }
204}
205
206func hookRetiredKeep(account string) time.Duration {
207 keep := 24 * 7 * time.Hour
208 accConf, ok := mox.Conf.Account(account)
209 if ok {
210 keep = accConf.KeepRetiredWebhookPeriod
211 }
212 return keep
213}
214
// HookFilter filters hooks to list or operate on. Used by admin web interface
// and cli.
//
// Only non-empty/non-zero values are applied to the filter. Leaving all fields
// empty/zero matches all hooks.
type HookFilter struct {
	Max         int     // Maximum number of hooks to match, applied as query limit.
	IDs         []int64 // Hook IDs to match.
	Account     string  // Account name to match.
	Submitted   string  // Whether submitted before/after a time relative to now. ">$duration" or "<$duration", also with "now" for duration.
	NextAttempt string  // ">$duration" or "<$duration", also with "now" for duration.
	Event       string  // Including "incoming". Other values are matched against OutgoingEvent.
}
228
229func (f HookFilter) apply(q *bstore.Query[Hook]) error {
230 if len(f.IDs) > 0 {
231 q.FilterIDs(f.IDs)
232 }
233 applyTime := func(field string, s string) error {
234 orig := s
235 var less bool
236 if strings.HasPrefix(s, "<") {
237 less = true
238 } else if !strings.HasPrefix(s, ">") {
239 return fmt.Errorf(`must start with "<" for less or ">" for greater than a duration ago`)
240 }
241 s = strings.TrimSpace(s[1:])
242 var t time.Time
243 if s == "now" {
244 t = time.Now()
245 } else if d, err := time.ParseDuration(s); err != nil {
246 return fmt.Errorf("parsing duration %q: %v", orig, err)
247 } else {
248 t = time.Now().Add(d)
249 }
250 if less {
251 q.FilterLess(field, t)
252 } else {
253 q.FilterGreater(field, t)
254 }
255 return nil
256 }
257 if f.Submitted != "" {
258 if err := applyTime("Submitted", f.Submitted); err != nil {
259 return fmt.Errorf("applying filter for submitted: %v", err)
260 }
261 }
262 if f.NextAttempt != "" {
263 if err := applyTime("NextAttempt", f.NextAttempt); err != nil {
264 return fmt.Errorf("applying filter for next attempt: %v", err)
265 }
266 }
267 if f.Account != "" {
268 q.FilterNonzero(Hook{Account: f.Account})
269 }
270 if f.Event != "" {
271 if f.Event == "incoming" {
272 q.FilterNonzero(Hook{IsIncoming: true})
273 } else {
274 q.FilterNonzero(Hook{OutgoingEvent: f.Event})
275 }
276 }
277 if f.Max != 0 {
278 q.Limit(f.Max)
279 }
280 return nil
281}
282
// HookSort specifies the order in which hooks are returned, and optionally
// the last hook of a previous page, to continue listing after it.
type HookSort struct {
	Field  string // "Submitted" or "NextAttempt"/"".
	LastID int64  // If > 0, we return objects beyond this, less/greater depending on Asc.
	Last   any    // Value of Field for last object. Must be set iff LastID is set. Must be a string with an RFC 3339 timestamp.
	Asc    bool   // Ascending, or descending.
}
289
290func (s HookSort) apply(q *bstore.Query[Hook]) error {
291 switch s.Field {
292 case "", "NextAttempt":
293 s.Field = "NextAttempt"
294 case "Submitted":
295 s.Field = "Submitted"
296 default:
297 return fmt.Errorf("unknown sort order field %q", s.Field)
298 }
299
300 if s.LastID > 0 {
301 ls, ok := s.Last.(string)
302 if !ok {
303 return fmt.Errorf("last should be string with time, not %T %q", s.Last, s.Last)
304 }
305 last, err := time.Parse(time.RFC3339Nano, ls)
306 if err != nil {
307 last, err = time.Parse(time.RFC3339, ls)
308 }
309 if err != nil {
310 return fmt.Errorf("parsing last %q as time: %v", s.Last, err)
311 }
312 q.FilterNotEqual("ID", s.LastID)
313 var fieldEqual func(h Hook) bool
314 if s.Field == "NextAttempt" {
315 fieldEqual = func(h Hook) bool { return h.NextAttempt.Equal(last) }
316 } else {
317 fieldEqual = func(h Hook) bool { return h.Submitted.Equal(last) }
318 }
319 if s.Asc {
320 q.FilterGreaterEqual(s.Field, last)
321 q.FilterFn(func(h Hook) bool {
322 return !fieldEqual(h) || h.ID > s.LastID
323 })
324 } else {
325 q.FilterLessEqual(s.Field, last)
326 q.FilterFn(func(h Hook) bool {
327 return !fieldEqual(h) || h.ID < s.LastID
328 })
329 }
330 }
331 if s.Asc {
332 q.SortAsc(s.Field, "ID")
333 } else {
334 q.SortDesc(s.Field, "ID")
335 }
336 return nil
337}
338
339// HookQueueSize returns the number of webhooks in the queue.
340func HookQueueSize(ctx context.Context) (int, error) {
341 return bstore.QueryDB[Hook](ctx, DB).Count()
342}
343
344// HookList returns webhooks according to filter and sort.
345func HookList(ctx context.Context, filter HookFilter, sort HookSort) ([]Hook, error) {
346 q := bstore.QueryDB[Hook](ctx, DB)
347 if err := filter.apply(q); err != nil {
348 return nil, err
349 }
350 if err := sort.apply(q); err != nil {
351 return nil, err
352 }
353 return q.List()
354}
355
// HookRetiredFilter filters retired hooks to list or operate on. Used by admin
// web interface and cli.
//
// Only non-empty/non-zero values are applied to the filter. Leaving all fields
// empty/zero matches all hooks.
type HookRetiredFilter struct {
	Max          int     // Maximum number of retired hooks to match, applied as query limit.
	IDs          []int64 // Retired hook IDs to match.
	Account      string  // Account name to match.
	Submitted    string  // Whether submitted before/after a time relative to now. ">$duration" or "<$duration", also with "now" for duration.
	LastActivity string  // ">$duration" or "<$duration", also with "now" for duration.
	Event        string  // Including "incoming". Other values are matched against OutgoingEvent.
}
369
370func (f HookRetiredFilter) apply(q *bstore.Query[HookRetired]) error {
371 if len(f.IDs) > 0 {
372 q.FilterIDs(f.IDs)
373 }
374 applyTime := func(field string, s string) error {
375 orig := s
376 var less bool
377 if strings.HasPrefix(s, "<") {
378 less = true
379 } else if !strings.HasPrefix(s, ">") {
380 return fmt.Errorf(`must start with "<" for before or ">" for after a duration`)
381 }
382 s = strings.TrimSpace(s[1:])
383 var t time.Time
384 if s == "now" {
385 t = time.Now()
386 } else if d, err := time.ParseDuration(s); err != nil {
387 return fmt.Errorf("parsing duration %q: %v", orig, err)
388 } else {
389 t = time.Now().Add(d)
390 }
391 if less {
392 q.FilterLess(field, t)
393 } else {
394 q.FilterGreater(field, t)
395 }
396 return nil
397 }
398 if f.Submitted != "" {
399 if err := applyTime("Submitted", f.Submitted); err != nil {
400 return fmt.Errorf("applying filter for submitted: %v", err)
401 }
402 }
403 if f.LastActivity != "" {
404 if err := applyTime("LastActivity", f.LastActivity); err != nil {
405 return fmt.Errorf("applying filter for last activity: %v", err)
406 }
407 }
408 if f.Account != "" {
409 q.FilterNonzero(HookRetired{Account: f.Account})
410 }
411 if f.Event != "" {
412 if f.Event == "incoming" {
413 q.FilterNonzero(HookRetired{IsIncoming: true})
414 } else {
415 q.FilterNonzero(HookRetired{OutgoingEvent: f.Event})
416 }
417 }
418 if f.Max != 0 {
419 q.Limit(f.Max)
420 }
421 return nil
422}
423
// HookRetiredSort specifies the order in which retired hooks are returned, and
// optionally the last retired hook of a previous page, to continue listing
// after it.
type HookRetiredSort struct {
	Field  string // "Submitted" or "LastActivity"/"".
	LastID int64  // If > 0, we return objects beyond this, less/greater depending on Asc.
	Last   any    // Value of Field for last object. Must be set iff LastID is set. Must be a string with an RFC 3339 timestamp.
	Asc    bool   // Ascending, or descending.
}
430
431func (s HookRetiredSort) apply(q *bstore.Query[HookRetired]) error {
432 switch s.Field {
433 case "", "LastActivity":
434 s.Field = "LastActivity"
435 case "Submitted":
436 s.Field = "Submitted"
437 default:
438 return fmt.Errorf("unknown sort order field %q", s.Field)
439 }
440
441 if s.LastID > 0 {
442 ls, ok := s.Last.(string)
443 if !ok {
444 return fmt.Errorf("last should be string with time, not %T %q", s.Last, s.Last)
445 }
446 last, err := time.Parse(time.RFC3339Nano, ls)
447 if err != nil {
448 last, err = time.Parse(time.RFC3339, ls)
449 }
450 if err != nil {
451 return fmt.Errorf("parsing last %q as time: %v", s.Last, err)
452 }
453 q.FilterNotEqual("ID", s.LastID)
454 var fieldEqual func(hr HookRetired) bool
455 if s.Field == "LastActivity" {
456 fieldEqual = func(hr HookRetired) bool { return hr.LastActivity.Equal(last) }
457 } else {
458 fieldEqual = func(hr HookRetired) bool { return hr.Submitted.Equal(last) }
459 }
460 if s.Asc {
461 q.FilterGreaterEqual(s.Field, last)
462 q.FilterFn(func(hr HookRetired) bool {
463 return !fieldEqual(hr) || hr.ID > s.LastID
464 })
465 } else {
466 q.FilterLessEqual(s.Field, last)
467 q.FilterFn(func(hr HookRetired) bool {
468 return !fieldEqual(hr) || hr.ID < s.LastID
469 })
470 }
471 }
472 if s.Asc {
473 q.SortAsc(s.Field, "ID")
474 } else {
475 q.SortDesc(s.Field, "ID")
476 }
477 return nil
478}
479
480// HookRetiredList returns retired webhooks according to filter and sort.
481func HookRetiredList(ctx context.Context, filter HookRetiredFilter, sort HookRetiredSort) ([]HookRetired, error) {
482 q := bstore.QueryDB[HookRetired](ctx, DB)
483 if err := filter.apply(q); err != nil {
484 return nil, err
485 }
486 if err := sort.apply(q); err != nil {
487 return nil, err
488 }
489 return q.List()
490}
491
492// HookNextAttemptAdd adds a duration to the NextAttempt for all matching messages, and
493// kicks the queue.
494func HookNextAttemptAdd(ctx context.Context, filter HookFilter, d time.Duration) (affected int, err error) {
495 err = DB.Write(ctx, func(tx *bstore.Tx) error {
496 q := bstore.QueryTx[Hook](tx)
497 if err := filter.apply(q); err != nil {
498 return err
499 }
500 hooks, err := q.List()
501 if err != nil {
502 return fmt.Errorf("listing matching hooks: %v", err)
503 }
504 for _, h := range hooks {
505 h.NextAttempt = h.NextAttempt.Add(d)
506 if err := tx.Update(&h); err != nil {
507 return err
508 }
509 }
510 affected = len(hooks)
511 return nil
512 })
513 if err != nil {
514 return 0, err
515 }
516 hookqueueKick()
517 return affected, nil
518}
519
520// HookNextAttemptSet sets NextAttempt for all matching messages to a new absolute
521// time and kicks the queue.
522func HookNextAttemptSet(ctx context.Context, filter HookFilter, t time.Time) (affected int, err error) {
523 q := bstore.QueryDB[Hook](ctx, DB)
524 if err := filter.apply(q); err != nil {
525 return 0, err
526 }
527 n, err := q.UpdateNonzero(Hook{NextAttempt: t})
528 if err != nil {
529 return 0, fmt.Errorf("selecting and updating hooks in queue: %v", err)
530 }
531 hookqueueKick()
532 return n, nil
533}
534
535// HookCancel prevents more delivery attempts of the hook, moving it to the
536// retired list if configured.
537func HookCancel(ctx context.Context, log mlog.Log, filter HookFilter) (affected int, err error) {
538 var hooks []Hook
539 err = DB.Write(ctx, func(tx *bstore.Tx) error {
540 q := bstore.QueryTx[Hook](tx)
541 if err := filter.apply(q); err != nil {
542 return err
543 }
544 q.Gather(&hooks)
545 n, err := q.Delete()
546 if err != nil {
547 return fmt.Errorf("selecting and deleting hooks from queue: %v", err)
548 }
549
550 if len(hooks) == 0 {
551 return nil
552 }
553
554 now := time.Now()
555 for _, h := range hooks {
556 keep := hookRetiredKeep(h.Account)
557 if keep > 0 {
558 hr := h.Retired(false, now, now.Add(keep))
559 hr.Results = append(hr.Results, HookResult{Start: now, Error: "canceled by admin"})
560 if err := tx.Insert(&hr); err != nil {
561 return fmt.Errorf("inserting retired hook: %v", err)
562 }
563 }
564 }
565
566 affected = n
567 return nil
568 })
569 if err != nil {
570 return 0, err
571 }
572 for _, h := range hooks {
573 log.Info("canceled hook", h.attrs()...)
574 }
575 hookqueueKick()
576 return affected, nil
577}
578
579func hookCompose(m Msg, url, authz string, event webhook.OutgoingEvent, suppressing bool, code int, secodeOpt string) (Hook, error) {
580 now := time.Now()
581
582 var lastError string
583 if len(m.Results) > 0 {
584 lastError = m.Results[len(m.Results)-1].Error
585 }
586 var ecode string
587 if secodeOpt != "" {
588 ecode = fmt.Sprintf("%d.%s", code/100, secodeOpt)
589 }
590 data := webhook.Outgoing{
591 Event: event,
592 Suppressing: suppressing,
593 QueueMsgID: m.ID,
594 FromID: m.FromID,
595 MessageID: m.MessageID,
596 Subject: m.Subject,
597 WebhookQueued: now,
598 Error: lastError,
599 SMTPCode: code,
600 SMTPEnhancedCode: ecode,
601 Extra: m.Extra,
602 }
603 if data.Extra == nil {
604 data.Extra = map[string]string{}
605 }
606 payload, err := json.Marshal(data)
607 if err != nil {
608 return Hook{}, fmt.Errorf("marshal webhook payload: %v", err)
609 }
610
611 h := Hook{
612 QueueMsgID: m.ID,
613 FromID: m.FromID,
614 MessageID: m.MessageID,
615 Subject: m.Subject,
616 Extra: m.Extra,
617 Account: m.SenderAccount,
618 URL: url,
619 Authorization: authz,
620 IsIncoming: false,
621 OutgoingEvent: string(event),
622 Payload: string(payload),
623 Submitted: now,
624 NextAttempt: now,
625 }
626 return h, nil
627}
628
// Incoming processes a message delivered over SMTP for webhooks. If the message is
// a DSN, a webhook for outgoing deliveries may be scheduled (if configured).
// Otherwise, a webhook for incoming deliveries may be scheduled.
//
// If the localpart of the recipient address contains the catchall separator
// configured for the domain, the part after the separator is taken as fromid
// and looked up among the retired queue messages (MsgRetired). If a message is
// found, a result is added to it and a payload for an outgoing webhook is
// composed, with an event based on the DSN, or "unrecognized" if the message is
// not a DSN that could be processed. For a DSN about a failed delivery with a
// status code, the suppression list is processed too.
//
// If no original message was found, a payload for an incoming webhook is
// composed from the parsed message.
//
// A hook is only queued if the account has the matching (incoming/outgoing)
// webhook configured, and for outgoing webhooks, if the configured events are
// empty or include the event.
func Incoming(ctx context.Context, log mlog.Log, acc *store.Account, messageID string, m store.Message, part message.Part, mailboxName string) error {
	now := time.Now()
	// Payload for the webhook. Set to a webhook.Outgoing when an original message
	// is found for the fromid, otherwise set to a webhook.Incoming further below.
	var data any

	log = log.With(
		slog.Int64("msgid", m.ID),
		slog.String("messageid", messageID),
		slog.String("mailbox", mailboxName),
	)

	// todo future: if there is no fromid in our rcpt address, but this is a 3-part dsn with headers that includes message-id, try matching based on that.
	// todo future: once we implement the SMTP DSN extension, use ENVID when sending (if destination implements it), and start looking for Original-Envelope-ID in the DSN.

	// If this is a DSN for a message we sent, don't deliver a hook for incoming
	// message, but an outgoing status webhook.
	var fromID string
	dom, err := dns.ParseDomain(m.RcptToDomain)
	if err != nil {
		log.Debugx("parsing recipient domain in incoming message", err)
	} else {
		// The fromid is the part of the localpart after the first catchall separator.
		domconf, _ := mox.Conf.Domain(dom)
		if domconf.LocalpartCatchallSeparator != "" {
			t := strings.SplitN(string(m.RcptToLocalpart), domconf.LocalpartCatchallSeparator, 2)
			if len(t) == 2 {
				fromID = t[1]
			}
		}
	}
	var outgoingEvent webhook.OutgoingEvent
	var queueMsgID int64
	var subject string
	if fromID != "" {
		err := DB.Write(ctx, func(tx *bstore.Tx) (rerr error) {
			mr, err := bstore.QueryTx[MsgRetired](tx).FilterNonzero(MsgRetired{FromID: fromID}).Get()
			if err == bstore.ErrAbsent {
				// Not an error, data stays nil and the message is handled as regular
				// incoming message below.
				log.Debug("no original message found for fromid", slog.String("fromid", fromID))
				return nil
			} else if err != nil {
				return fmt.Errorf("looking up original message for fromid: %v", err)
			}

			queueMsgID = mr.ID
			subject = mr.Subject

			log = log.With(slog.String("fromid", fromID))
			log.Debug("processing incoming message about previous delivery for webhooks")

			// We'll record this message in the results.
			mr.LastActivity = now
			mr.Results = append(mr.Results, MsgResult{Start: now, Error: "incoming message"})
			result := &mr.Results[len(mr.Results)-1] // Updated below.

			outgoingEvent = webhook.EventUnrecognized
			var suppressedMsgIDs []int64
			var isDSN bool
			var code int
			var secode string
			// On success of the remainder of this function, compose the payload from
			// the values as set below, and store the retired message with its new
			// result. This also runs for the early returns with a nil error.
			defer func() {
				if rerr == nil {
					var ecode string
					if secode != "" {
						ecode = fmt.Sprintf("%d.%s", code/100, secode)
					}
					data = webhook.Outgoing{
						Event:            outgoingEvent,
						DSN:              isDSN,
						Suppressing:      len(suppressedMsgIDs) > 0,
						QueueMsgID:       mr.ID,
						FromID:           fromID,
						MessageID:        mr.MessageID,
						Subject:          mr.Subject,
						WebhookQueued:    now,
						SMTPCode:         code,
						SMTPEnhancedCode: ecode,
						Extra:            mr.Extra,
					}

					if err := tx.Update(&mr); err != nil {
						rerr = fmt.Errorf("updating retired message after processing: %v", err)
						return
					}
				}
			}()

			// A DSN is a multipart/report of which the second part is a
			// message/delivery-status or message/global-delivery-status.
			if !(part.MediaType == "MULTIPART" && part.MediaSubType == "REPORT" && len(part.Parts) >= 2 && part.Parts[1].MediaType == "MESSAGE" && (part.Parts[1].MediaSubType == "DELIVERY-STATUS" || part.Parts[1].MediaSubType == "GLOBAL-DELIVERY-STATUS")) {
				// Some kind of delivery-related event, but we don't recognize it.
				result.Error = "incoming message not a dsn"
				return nil
			}
			isDSN = true
			dsnutf8 := part.Parts[1].MediaSubType == "GLOBAL-DELIVERY-STATUS"
			dsnmsg, err := dsn.Decode(part.Parts[1].ReaderUTF8OrBinary(), dsnutf8)
			if err != nil {
				log.Infox("parsing dsn message for webhook", err)
				result.Error = fmt.Sprintf("parsing incoming dsn: %v", err)
				return nil
			} else if len(dsnmsg.Recipients) != 1 {
				log.Info("dsn message for webhook does not have exactly one dsn recipient", slog.Int("nrecipients", len(dsnmsg.Recipients)))
				result.Error = fmt.Sprintf("incoming dsn has %d recipients, expecting 1", len(dsnmsg.Recipients))
				return nil
			}

			dsnrcpt := dsnmsg.Recipients[0]

			// Prefer the codes from the SMTP diagnostic code. Otherwise derive them
			// from a 4.x.y or 5.x.y status field.
			if dsnrcpt.DiagnosticCodeSMTP != "" {
				code, secode = parseSMTPCodes(dsnrcpt.DiagnosticCodeSMTP)
			}
			if code == 0 && dsnrcpt.Status != "" {
				if strings.HasPrefix(dsnrcpt.Status, "4.") {
					code = 400
					secode = dsnrcpt.Status[2:]
				} else if strings.HasPrefix(dsnrcpt.Status, "5.") {
					code = 500
					secode = dsnrcpt.Status[2:]
				}
			}
			result.Code = code
			result.Secode = secode
			log.Debug("incoming dsn message", slog.String("action", string(dsnrcpt.Action)), slog.Int("dsncode", code), slog.String("dsnsecode", secode))

			switch s := dsnrcpt.Action; s {
			case dsn.Failed:
				outgoingEvent = webhook.EventFailed

				if code != 0 {
					sc := suppressionCheck{
						MsgID:     mr.ID,
						Account:   acc.Name,
						Recipient: mr.Recipient(),
						Code:      code,
						Secode:    secode,
						Source:    "DSN",
					}
					suppressedMsgIDs, err = suppressionProcess(log, tx, sc)
					if err != nil {
						return fmt.Errorf("processing dsn for suppression list: %v", err)
					}
				} else {
					log.Debug("no code/secode in dsn for failed delivery", slog.Int64("msgid", mr.ID))
				}

			case dsn.Delayed, dsn.Delivered, dsn.Relayed, dsn.Expanded:
				// The DSN action is used as name of the event.
				outgoingEvent = webhook.OutgoingEvent(string(s))
				result.Success = s != dsn.Delayed

			default:
				// Event stays "unrecognized".
				log.Info("unrecognized dsn action", slog.String("action", string(dsnrcpt.Action)))
			}
			return nil
		})
		if err != nil {
			return fmt.Errorf("processing message based on fromid: %v", err)
		}
	}

	accConf, _ := acc.Conf()

	var hookURL, authz string
	var isIncoming bool
	if data == nil {
		// No original message found, this is a regular incoming message.
		if accConf.IncomingWebhook == nil {
			return nil
		}
		hookURL = accConf.IncomingWebhook.URL
		authz = accConf.IncomingWebhook.Authorization

		log.Debug("composing webhook for incoming message")

		structure, err := PartStructure(log, &part)
		if err != nil {
			return fmt.Errorf("parsing part structure: %v", err)
		}

		isIncoming = true
		var rcptTo string
		if m.RcptToDomain != "" {
			rcptTo = m.RcptToLocalpart.String() + "@" + m.RcptToDomain
		}
		in := webhook.Incoming{
			Structure: structure,
			Meta: webhook.IncomingMeta{
				MsgID:               m.ID,
				MailFrom:            m.MailFrom,
				MailFromValidated:   m.MailFromValidated,
				MsgFromValidated:    m.MsgFromValidated,
				RcptTo:              rcptTo,
				DKIMVerifiedDomains: m.DKIMDomains,
				RemoteIP:            m.RemoteIP,
				Received:            m.Received,
				MailboxName:         mailboxName,
			},
		}
		// Replace nil slice with empty slice, for a JSON array instead of null.
		if in.Meta.DKIMVerifiedDomains == nil {
			in.Meta.DKIMVerifiedDomains = []string{}
		}
		if env := part.Envelope; env != nil {
			subject = env.Subject
			in.From = addresses(env.From)
			in.To = addresses(env.To)
			in.CC = addresses(env.CC)
			in.BCC = addresses(env.BCC)
			in.ReplyTo = addresses(env.ReplyTo)
			in.Subject = env.Subject
			in.MessageID = env.MessageID
			in.InReplyTo = env.InReplyTo
			if !env.Date.IsZero() {
				in.Date = &env.Date
			}
		}
		// todo: ideally, we would have this information available in parsed Part, not require parsing headers here.
		h, err := part.Header()
		if err != nil {
			log.Debugx("parsing headers of incoming message", err, slog.Int64("msgid", m.ID))
		} else {
			refs, err := message.ReferencedIDs(h.Values("References"), nil)
			if err != nil {
				log.Debugx("parsing references header", err, slog.Int64("msgid", m.ID))
			}
			// Message-IDs in the webhook data include <>.
			for i, r := range refs {
				refs[i] = "<" + r + ">"
			}
			if refs == nil {
				refs = []string{}
			}
			in.References = refs

			// Check if message is automated. Empty SMTP MAIL FROM indicates this was some kind
			// of service message. Several headers indicate out-of-office replies, messages
			// from mailing or marketing lists. And the content-type can indicate a report
			// (e.g. DSN/MDN).
			in.Meta.Automated = m.MailFrom == "" || isAutomated(h) || part.MediaType == "MULTIPART" && part.MediaSubType == "REPORT"
		}

		// Errors are only logged, the text/html parts are included as returned.
		text, html, _, err := webops.ReadableParts(part, 1*1024*1024)
		if err != nil {
			log.Debugx("looking for text and html content in message", err)
		}
		in.Text = strings.ReplaceAll(text, "\r\n", "\n")
		in.HTML = strings.ReplaceAll(html, "\r\n", "\n")

		data = in
	} else if accConf.OutgoingWebhook == nil {
		return nil
	} else if len(accConf.OutgoingWebhook.Events) == 0 || slices.Contains(accConf.OutgoingWebhook.Events, string(outgoingEvent)) {
		// No configured events means all events are delivered.
		hookURL = accConf.OutgoingWebhook.URL
		authz = accConf.OutgoingWebhook.Authorization
	} else {
		log.Debug("not sending webhook, account not subscribed for event", slog.String("event", string(outgoingEvent)))
		return nil
	}

	payload, err := json.Marshal(data)
	if err != nil {
		return fmt.Errorf("marshal webhook payload: %v", err)
	}

	// Schedule the hook for immediate delivery.
	h := Hook{
		QueueMsgID:    queueMsgID,
		FromID:        fromID,
		MessageID:     messageID,
		Subject:       subject,
		Account:       acc.Name,
		URL:           hookURL,
		Authorization: authz,
		IsIncoming:    isIncoming,
		OutgoingEvent: string(outgoingEvent),
		Payload:       string(payload),
		Submitted:     now,
		NextAttempt:   now,
	}
	err = DB.Write(ctx, func(tx *bstore.Tx) error {
		if err := hookInsert(tx, &h, now, accConf.KeepRetiredWebhookPeriod); err != nil {
			return fmt.Errorf("queueing webhook for incoming message: %v", err)
		}
		return nil
	})
	if err != nil {
		return fmt.Errorf("inserting webhook in database: %v", err)
	}
	log.Debug("queued webhook for incoming message", h.attrs()...)
	hookqueueKick()
	return nil
}
915
916// PartStructure returns a webhook.Structure for a parsed message part.
917func PartStructure(log mlog.Log, p *message.Part) (webhook.Structure, error) {
918 parts := make([]webhook.Structure, len(p.Parts))
919 for i := range p.Parts {
920 var err error
921 parts[i], err = PartStructure(log, &p.Parts[i])
922 if err != nil && !errors.Is(err, message.ErrParamEncoding) {
923 return webhook.Structure{}, err
924 }
925 }
926 disp, filename, err := p.DispositionFilename()
927 if err != nil && errors.Is(err, message.ErrParamEncoding) {
928 log.Debugx("parsing disposition/filename", err)
929 } else if err != nil {
930 return webhook.Structure{}, err
931 }
932 s := webhook.Structure{
933 ContentType: strings.ToLower(p.MediaType + "/" + p.MediaSubType),
934 ContentTypeParams: p.ContentTypeParams,
935 ContentID: p.ContentID,
936 ContentDisposition: strings.ToLower(disp),
937 Filename: filename,
938 DecodedSize: p.DecodedSize,
939 Parts: parts,
940 }
941 // Replace nil map with empty map, for easier to use JSON.
942 if s.ContentTypeParams == nil {
943 s.ContentTypeParams = map[string]string{}
944 }
945 return s, nil
946}
947
// isAutomated returns whether the headers indicate an automated message: a
// non-empty List-Id, List-Unsubscribe, List-Unsubscribe-Post or Precedence
// header, or an Auto-Submitted header with a value other than "no"
// (case-insensitive, ignoring surrounding white space).
func isAutomated(h textproto.MIMEHeader) bool {
	for _, name := range [...]string{"List-Id", "List-Unsubscribe", "List-Unsubscribe-Post", "Precedence"} {
		if h.Get(name) != "" {
			return true
		}
	}
	auto := strings.TrimSpace(h.Get("Auto-Submitted"))
	return auto != "" && !strings.EqualFold(auto, "no")
}
960
// parseSMTPCodes parses an SMTP response line, such as "550 5.1.1 no such
// user", returning the 3-digit SMTP code and, if the second word is an
// enhanced status code starting with "4." or "5.", the enhanced code without
// that leading class (e.g. "1.1").
//
// If the line does not start with a 3-digit code below 600 followed by a
// space, 0 and an empty string are returned.
func parseSMTPCodes(line string) (code int, secode string) {
	t := strings.SplitN(line, " ", 3)
	if len(t) <= 1 || len(t[0]) != 3 {
		return 0, ""
	}
	v, err := strconv.ParseUint(t[0], 10, 64)
	// The parsed value must be checked, not the still-zero result parameter.
	if err != nil || v >= 600 {
		return 0, ""
	}
	if strings.HasPrefix(t[1], "4.") || strings.HasPrefix(t[1], "5.") {
		secode = t[1][2:]
	}
	return int(v), secode
}
975
976// Insert hook into database, but first retire any existing pending hook for
977// QueueMsgID if it is > 0.
978func hookInsert(tx *bstore.Tx, h *Hook, now time.Time, accountKeepPeriod time.Duration) error {
979 if err := tx.Insert(h); err != nil {
980 return fmt.Errorf("insert webhook: %v", err)
981 }
982 if h.QueueMsgID == 0 {
983 return nil
984 }
985
986 // Find existing queued hook for previously msgid from queue. Can be at most one.
987 oh, err := bstore.QueryTx[Hook](tx).FilterNonzero(Hook{QueueMsgID: h.QueueMsgID}).FilterNotEqual("ID", h.ID).Get()
988 if err == bstore.ErrAbsent {
989 return nil
990 } else if err != nil {
991 return fmt.Errorf("get existing webhook before inserting new hook for same queuemsgid %d", h.QueueMsgID)
992 }
993
994 // Retire this queued hook.
995 // This hook may be in the process of being delivered. When delivered, we'll try to
996 // move it from Hook to HookRetired. But that will fail since Hook is already
997 // retired. We detect that situation and update the retired hook with the new
998 // (final) result.
999 if accountKeepPeriod > 0 {
1000 hr := oh.Retired(false, now, now.Add(accountKeepPeriod))
1001 hr.SupersededByID = h.ID
1002 if err := tx.Insert(&hr); err != nil {
1003 return fmt.Errorf("inserting superseded webhook as retired hook: %v", err)
1004 }
1005 }
1006 if err := tx.Delete(&oh); err != nil {
1007 return fmt.Errorf("deleting superseded webhook: %v", err)
1008 }
1009 return nil
1010}
1011
1012func addresses(al []message.Address) []webhook.NameAddress {
1013 l := make([]webhook.NameAddress, len(al))
1014 for i, a := range al {
1015 addr := a.User + "@" + a.Host
1016 pa, err := smtp.ParseAddress(addr)
1017 if err == nil {
1018 addr = pa.Pack(true)
1019 }
1020 l[i] = webhook.NameAddress{
1021 Name: a.Name,
1022 Address: addr,
1023 }
1024 }
1025 return l
1026}
1027
var (
	// hookqueue wakes up the webhook queue loop in startHookQueue. It is buffered
	// with room for one signal; hookqueueKick sends without blocking, so multiple
	// kicks while one is pending collapse into a single wakeup.
	hookqueue = make(chan struct{}, 1)

	// hookDeliveryResults receives the URL of a webhook each time hookDeliver
	// finishes an attempt, so startHookQueue can mark that URL as no longer busy.
	hookDeliveryResults = make(chan string, 1)
)
1032
1033func hookqueueKick() {
1034 select {
1035 case hookqueue <- struct{}{}:
1036 default:
1037 }
1038}
1039
1040func startHookQueue(done chan struct{}) {
1041 log := mlog.New("queue", nil)
1042 busyHookURLs := map[string]struct{}{}
1043 timer := time.NewTimer(0)
1044 for {
1045 select {
1046 case <-mox.Shutdown.Done():
1047 for len(busyHookURLs) > 0 {
1048 url := <-hookDeliveryResults
1049 delete(busyHookURLs, url)
1050 }
1051 done <- struct{}{}
1052 return
1053 case <-hookqueue:
1054 case <-timer.C:
1055 case url := <-hookDeliveryResults:
1056 delete(busyHookURLs, url)
1057 }
1058
1059 if len(busyHookURLs) >= maxConcurrentHookDeliveries {
1060 continue
1061 }
1062
1063 hookLaunchWork(log, busyHookURLs)
1064 timer.Reset(hookNextWork(mox.Shutdown, log, busyHookURLs))
1065 }
1066}
1067
1068func hookNextWork(ctx context.Context, log mlog.Log, busyURLs map[string]struct{}) time.Duration {
1069 q := bstore.QueryDB[Hook](ctx, DB)
1070 if len(busyURLs) > 0 {
1071 var urls []any
1072 for u := range busyURLs {
1073 urls = append(urls, u)
1074 }
1075 q.FilterNotEqual("URL", urls...)
1076 }
1077 q.SortAsc("NextAttempt")
1078 q.Limit(1)
1079 h, err := q.Get()
1080 if err == bstore.ErrAbsent {
1081 return 24 * time.Hour
1082 } else if err != nil {
1083 log.Errorx("finding time for next webhook delivery attempt", err)
1084 return 1 * time.Minute
1085 }
1086 return time.Until(h.NextAttempt)
1087}
1088
1089func hookLaunchWork(log mlog.Log, busyURLs map[string]struct{}) int {
1090 q := bstore.QueryDB[Hook](mox.Shutdown, DB)
1091 q.FilterLessEqual("NextAttempt", time.Now())
1092 q.SortAsc("NextAttempt")
1093 q.Limit(maxConcurrentHookDeliveries)
1094 if len(busyURLs) > 0 {
1095 var urls []any
1096 for u := range busyURLs {
1097 urls = append(urls, u)
1098 }
1099 q.FilterNotEqual("URL", urls...)
1100 }
1101 var hooks []Hook
1102 seen := map[string]bool{}
1103 err := q.ForEach(func(h Hook) error {
1104 u := h.URL
1105 if _, ok := busyURLs[u]; !ok && !seen[u] {
1106 seen[u] = true
1107 hooks = append(hooks, h)
1108 }
1109 return nil
1110 })
1111 if err != nil {
1112 log.Errorx("querying for work in webhook queue", err)
1113 mox.Sleep(mox.Shutdown, 1*time.Second)
1114 return -1
1115 }
1116
1117 for _, h := range hooks {
1118 busyURLs[h.URL] = struct{}{}
1119 go hookDeliver(log, h)
1120 }
1121 return len(hooks)
1122}
1123
// hookIntervals holds the backoff delays for webhook delivery, indexed by the
// number of attempts already made. hookDeliver applies a small random jitter on
// top, and uses twice the last interval once the attempts run past this table.
var hookIntervals = []time.Duration{
	1 * time.Minute,
	2 * time.Minute,
	4 * time.Minute,
	15 * time.Minute / 2, // 7.5 minutes.
	15 * time.Minute,
	30 * time.Minute,
	1 * time.Hour,
	2 * time.Hour,
	4 * time.Hour,
	8 * time.Hour,
	16 * time.Hour,
}
1131
1132func hookDeliver(log mlog.Log, h Hook) {
1133 ctx := mox.Shutdown
1134
1135 qlog := log.WithCid(mox.Cid())
1136 qlog.Debug("attempting to deliver webhook", h.attrs()...)
1137 qlog = qlog.With(slog.Int64("webhookid", h.ID))
1138
1139 defer func() {
1140 hookDeliveryResults <- h.URL
1141
1142 x := recover()
1143 if x != nil {
1144 qlog.Error("webhook deliver panic", slog.Any("panic", x))
1145 debug.PrintStack()
1146 metrics.PanicInc(metrics.Queue)
1147 }
1148 }()
1149
1150 // todo: should we get a new webhook url from the config before attempting? would intervene with our "urls busy" approach. may not be worth it.
1151
1152 // Set Attempts & NextAttempt early. In case of failures while processing, at least
1153 // we won't try again immediately. We do backoff at intervals:
1154 var backoff time.Duration
1155 if h.Attempts < len(hookIntervals) {
1156 backoff = hookIntervals[h.Attempts]
1157 } else {
1158 backoff = hookIntervals[len(hookIntervals)-1] * time.Duration(2)
1159 }
1160 backoff += time.Duration(jitter.IntN(200)-100) * backoff / 10000
1161 h.Attempts++
1162 now := time.Now()
1163 h.NextAttempt = now.Add(backoff)
1164 h.Results = append(h.Results, HookResult{Start: now, URL: h.URL, Error: resultErrorDelivering})
1165 result := &h.Results[len(h.Results)-1]
1166 if err := DB.Update(mox.Shutdown, &h); err != nil {
1167 qlog.Errorx("storing webhook delivery attempt", err)
1168 return
1169 }
1170
1171 hctx, cancel := context.WithTimeout(ctx, 60*time.Second)
1172 defer cancel()
1173 t0 := time.Now()
1174 code, response, err := HookPost(hctx, qlog, h.ID, h.Attempts, h.URL, h.Authorization, h.Payload)
1175 result.Duration = time.Since(t0)
1176 result.Success = err == nil
1177 result.Code = code
1178 result.Error = ""
1179 result.Response = response
1180 if err != nil {
1181 result.Error = err.Error()
1182 }
1183 if err != nil && h.Attempts <= len(hookIntervals) {
1184 // We'll try again later, so only update existing record.
1185 qlog.Debugx("webhook delivery failed, will try again later", err)
1186 xerr := DB.Write(context.Background(), func(tx *bstore.Tx) error {
1187 if err := tx.Update(&h); err == bstore.ErrAbsent {
1188 return updateRetiredHook(tx, h, result)
1189 } else if err != nil {
1190 return fmt.Errorf("update webhook after retryable failure: %v", err)
1191 }
1192 return nil
1193 })
1194 qlog.Check(xerr, "updating failed webhook delivery attempt in database", slog.String("deliveryerr", err.Error()))
1195 return
1196 }
1197
1198 qlog.Debugx("webhook delivery completed", err, slog.Bool("success", result.Success))
1199
1200 // Move Hook to HookRetired.
1201 err = DB.Write(context.Background(), func(tx *bstore.Tx) error {
1202 if err := tx.Delete(&h); err == bstore.ErrAbsent {
1203 return updateRetiredHook(tx, h, result)
1204 } else if err != nil {
1205 return fmt.Errorf("removing webhook from database: %v", err)
1206 }
1207 keep := hookRetiredKeep(h.Account)
1208 if keep > 0 {
1209 hr := h.Retired(result.Success, t0, t0.Add(keep))
1210 if err := tx.Insert(&hr); err != nil {
1211 return fmt.Errorf("inserting retired webhook in database: %v", err)
1212 }
1213 }
1214 return nil
1215 })
1216 qlog.Check(err, "moving delivered webhook from to retired hooks")
1217}
1218
1219func updateRetiredHook(tx *bstore.Tx, h Hook, result *HookResult) error {
1220 // Hook is gone. It may have been superseded and moved to HookRetired while we were
1221 // delivering it. If so, add the result to the retired hook.
1222 hr := HookRetired{ID: h.ID}
1223 if err := tx.Get(&hr); err != nil {
1224 return fmt.Errorf("result for webhook that was no longer in webhook queue or retired webhooks: %v", err)
1225 }
1226 result.Error += "(superseded)"
1227 hr.Results = append(hr.Results, *result)
1228 if err := tx.Update(&hr); err != nil {
1229 return fmt.Errorf("updating retired webhook after webhook was superseded during delivery: %v", err)
1230 }
1231 return nil
1232}
1233
// hookClient is the HTTP client used for webhook calls. It sets no timeout of
// its own, requests are bounded by the context passed to HookPost.
var hookClient = &http.Client{Transport: hookTransport()}

// hookTransport returns a copy of the default HTTP transport with tighter idle
// connection settings: at most 2 idle connections per host, closed after 5
// seconds of idling. This limits resources consumed during idle periods,
// probably most of the time, while busy periods can still reuse the few
// connections for many events.
func hookTransport() *http.Transport {
	base := http.DefaultTransport.(*http.Transport)
	tr := base.Clone()
	tr.MaxIdleConnsPerHost = 2
	tr.IdleConnTimeout = 5 * time.Second
	return tr
}
1244
1245func HookPost(ctx context.Context, log mlog.Log, hookID int64, attempt int, url, authz string, payload string) (code int, response string, err error) {
1246 req, err := http.NewRequestWithContext(ctx, "POST", url, strings.NewReader(payload))
1247 if err != nil {
1248 return 0, "", fmt.Errorf("new request: %v", err)
1249 }
1250 req.Header.Set("User-Agent", fmt.Sprintf("mox/%s (webhook)", moxvar.Version))
1251 req.Header.Set("Content-Type", "application/json; charset=utf-8")
1252 req.Header.Set("X-Mox-Webhook-ID", fmt.Sprintf("%d", hookID))
1253 req.Header.Set("X-Mox-Webhook-Attempt", fmt.Sprintf("%d", attempt))
1254 if authz != "" {
1255 req.Header.Set("Authorization", authz)
1256 }
1257 t0 := time.Now()
1258 resp, err := hookClient.Do(req)
1259 metricHookRequest.Observe(float64(time.Since(t0)) / float64(time.Second))
1260 if err != nil {
1261 metricHookResult.WithLabelValues("error").Inc()
1262 log.Debugx("webhook http transaction", err)
1263 return 0, "", fmt.Errorf("http transact: %v", err)
1264 }
1265 defer resp.Body.Close()
1266
1267 // Use full http status code for known codes, and a generic "<major>xx" for others.
1268 result := fmt.Sprintf("%d", resp.StatusCode)
1269 if http.StatusText(resp.StatusCode) == "" {
1270 result = fmt.Sprintf("%dxx", resp.StatusCode/100)
1271 }
1272 metricHookResult.WithLabelValues(result).Inc()
1273 log.Debug("webhook http post result", slog.Int("statuscode", resp.StatusCode), slog.Duration("duration", time.Since(t0)))
1274
1275 respbuf, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
1276 if resp.StatusCode != http.StatusOK {
1277 err = fmt.Errorf("http status %q, expected 200 ok", resp.Status)
1278 }
1279 return resp.StatusCode, string(respbuf), err
1280}
1281