1// Package queue is in charge of outgoing messages, queueing them when submitted,
2// attempting a first delivery over SMTP, retrying with backoff and sending DSNs
3// for delayed or failed deliveries.
4package queue
5
6import (
7 "bytes"
8 "context"
9 "errors"
10 "fmt"
11 "io"
12 "log/slog"
13 "net"
14 "os"
15 "path/filepath"
16 "runtime/debug"
17 "slices"
18 "strings"
19 "time"
20
21 "golang.org/x/net/proxy"
22
23 "github.com/prometheus/client_golang/prometheus"
24 "github.com/prometheus/client_golang/prometheus/promauto"
25
26 "github.com/mjl-/bstore"
27
28 "github.com/mjl-/mox/config"
29 "github.com/mjl-/mox/dns"
30 "github.com/mjl-/mox/dsn"
31 "github.com/mjl-/mox/metrics"
32 "github.com/mjl-/mox/mlog"
33 "github.com/mjl-/mox/mox-"
34 "github.com/mjl-/mox/moxio"
35 "github.com/mjl-/mox/moxvar"
36 "github.com/mjl-/mox/smtp"
37 "github.com/mjl-/mox/smtpclient"
38 "github.com/mjl-/mox/store"
39 "github.com/mjl-/mox/tlsrpt"
40 "github.com/mjl-/mox/tlsrptdb"
41 "github.com/mjl-/mox/webapi"
42 "github.com/mjl-/mox/webhook"
43)
44
// ErrFromID indicates a FromID was present when adding a message to the queue,
// but it wasn't unique among queued and retired messages. Add returns errors
// wrapping it, so callers can check with errors.Is.
var ErrFromID = errors.New("fromid not unique")
48
// Prometheus metrics for the outgoing message queue.
var (
	// Outgoing SMTP client connections made by the queue, by result.
	metricConnection = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "mox_queue_connection_total",
			Help: "Queue client connections, outgoing.",
		},
		[]string{
			"result", // "ok", "timeout", "canceled", "error"
		},
	)
	// Duration of a single delivery attempt to a single host.
	metricDelivery = promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "mox_queue_delivery_duration_seconds",
			Help:    "SMTP client delivery attempt to single host.",
			Buckets: []float64{0.01, 0.05, 0.100, 0.5, 1, 5, 10, 20, 30, 60, 120},
		},
		[]string{
			"attempt",   // Number of attempts.
			"transport", // empty for default direct delivery.
			"tlsmode",   // immediate, requiredstarttls, opportunistic, skip (from smtpclient.TLSMode), with optional +mtasts and/or +dane.
			"result",    // ok, timeout, canceled, temperror, permerror, error
		},
	)
	// Number of queued messages on hold. Set to a full recount by metricHoldUpdate.
	metricHold = promauto.NewGauge(
		prometheus.GaugeOpts{
			Name: "mox_queue_hold",
			Help: "Messages in queue that are on hold.",
		},
	)
)
79
// jitter is a pseudo-random source for this package.
// NOTE(review): its uses are not in this part of the file; presumably it
// randomizes retry scheduling — confirm.
var jitter = mox.NewPseudoRand()

// DBTypes lists the types stored in the queue database, passed to bstore.Open by
// Init.
var DBTypes = []any{Msg{}, HoldRule{}, MsgRetired{}, webapi.Suppression{}, Hook{}, HookRetired{}} // Types stored in DB.
// DB is the queue database, opened by Init and closed by Shutdown.
var DB *bstore.DB // Exported for making backups.

// FutureReleaseIntervalMax is the maximum interval, from the time of submission,
// after which delivery may be requested to start (FUTURERELEASE).
const FutureReleaseIntervalMax = 60 * 24 * time.Hour

// Localserve is set for mox localserve, to prevent queueing.
var Localserve bool
90
// HoldRule is a set of conditions that cause a matching message to be marked as on
// hold when it is queued. If all conditions are empty, the rule matches all
// messages, effectively pausing the entire queue. When a rule is added through
// HoldRuleAdd, already queued messages matching all of its non-empty conditions
// are put on hold as well.
type HoldRule struct {
	ID                 int64
	Account            string     // Sender account, compared with Msg.SenderAccount.
	SenderDomain       dns.Domain // Sender domain; SenderDomainStr is derived from it by HoldRuleAdd.
	RecipientDomain    dns.Domain // Recipient domain; RecipientDomainStr is derived from it by HoldRuleAdd.
	SenderDomainStr    string     // Unicode. Compared with Msg.SenderDomainStr.
	RecipientDomainStr string     // Unicode. Compared with Msg.RecipientDomainStr.
}
102
103func (pr HoldRule) All() bool {
104 pr.ID = 0
105 return pr == HoldRule{}
106}
107
108func (pr HoldRule) matches(m Msg) bool {
109 return pr.All() || pr.Account == m.SenderAccount || pr.SenderDomainStr == m.SenderDomainStr || pr.RecipientDomainStr == m.RecipientDomainStr
110}
111
// Msg is a message in the queue.
//
// Use MakeMsg to make a message with fields that Add needs. Add further sets
// derived fields such as SenderDomainStr, RecipientDomainStr and SenderAccount.
type Msg struct {
	ID int64

	// A message for multiple recipients will get a BaseID that is identical to the
	// first Msg.ID queued. The message contents will be identical for each recipient,
	// including MsgPrefix. If other properties are identical too, including recipient
	// domain, multiple Msgs may be delivered in a single SMTP transaction. For
	// messages with a single recipient, this field will be 0.
	BaseID int64 `bstore:"index"`

	Queued             time.Time      `bstore:"default now"` // Time of queueing. Defaults to time of insert (bstore default) if zero.
	Hold               bool           // If set, delivery won't be attempted.
	SenderAccount      string         // Failures are delivered back to this local account. Also used for routing.
	SenderLocalpart    smtp.Localpart // Should be a local user and domain.
	SenderDomain       dns.IPDomain
	SenderDomainStr    string         // For filtering, unicode. Set by Add from SenderDomain.
	FromID             string         // For transactional messages, used to match later DSNs.
	RecipientLocalpart smtp.Localpart // Typically a remote user and domain.
	RecipientDomain    dns.IPDomain
	RecipientDomainStr string              // For filtering, unicode domain. Can also contain ip enclosed in []. Set by Add from RecipientDomain.
	Attempts           int                 // Next attempt is based on last attempt and exponential back off based on attempts.
	MaxAttempts        int                 // Max number of attempts before giving up. If 0, then the default of 8 attempts is used instead.
	DialedIPs          map[string][]net.IP // For each host, the IPs that were dialed. Used for IP selection for later attempts.
	NextAttempt        time.Time           // For scheduling.
	LastAttempt        *time.Time
	Results            []MsgResult // Results of delivery attempts, last one is the most recent, see markResult.

	Has8bit       bool   // Whether message contains bytes with high bit set, determines whether 8BITMIME SMTP extension is needed.
	SMTPUTF8      bool   // Whether message requires use of SMTPUTF8.
	IsDMARCReport bool   // Delivery failures for DMARC reports are handled differently.
	IsTLSReport   bool   // Delivery failures for TLS reports are handled differently.
	Size          int64  // Full size of message, combined MsgPrefix with contents of message file.
	MessageID     string // Message-ID header, including <>. Used when composing a DSN, in its References header.
	MsgPrefix     []byte // Data to send before the contents from the file, typically with headers like DKIM-Signature.
	Subject       string // For context about delivery.

	// If set, this message is a DSN and this is a version using utf-8, for the case
	// the remote MTA supports smtputf8. In this case, Size and MsgPrefix are not
	// relevant.
	DSNUTF8 []byte

	// If non-empty, the transport to use for this message. Can be set through cli or
	// admin interface. If empty (the default for a submitted message), regular routing
	// rules apply.
	Transport string

	// RequireTLS influences TLS verification during delivery.
	//
	// If nil, the recipient domain policy is followed (MTA-STS and/or DANE), falling
	// back to optional opportunistic non-verified STARTTLS.
	//
	// If RequireTLS is true (through SMTP REQUIRETLS extension or webmail submit),
	// MTA-STS or DANE is required, as well as REQUIRETLS support by the next hop
	// server.
	//
	// If RequireTLS is false (through message header "TLS-Required: No"), the
	// recipient domain's policy is ignored if it does not lead to a successful TLS
	// connection, i.e. falling back to SMTP delivery with unverified STARTTLS or plain
	// text.
	RequireTLS *bool
	// ../rfc/8689:250

	// For DSNs, where the original FUTURERELEASE value must be included as per-message
	// field. This field should be of the form "for;" plus interval, or "until;" plus
	// utc date-time.
	FutureReleaseRequest string
	// ../rfc/4865:305

	Extra map[string]string // Extra information, for transactional email.
}
185
// MsgResult is the result (or work in progress) of a delivery attempt.
type MsgResult struct {
	Start    time.Time     // Start of the attempt.
	Duration time.Duration // Time since Start, set by markResult when the result is recorded.
	Success  bool
	Code     int    // Presumably the SMTP reply code, 0 if none — confirm with callers.
	Secode   string // Presumably the enhanced status code, empty if none — confirm with callers.
	Error    string // Error message; resultErrorDelivering while the attempt is in progress.
	// todo: store smtp trace for failed deliveries for debugging, perhaps also for successful deliveries.
}
196
// resultErrorDelivering is stored in MsgResult.Error while delivery is in
// progress. markResult replaces it after success/error, completing that result
// instead of appending a new one.
const resultErrorDelivering = "delivering..."
199
200// markResult updates/adds a delivery result.
201func (m *Msg) markResult(code int, secode string, errmsg string, success bool) {
202 if len(m.Results) == 0 || m.Results[len(m.Results)-1].Error != resultErrorDelivering {
203 m.Results = append(m.Results, MsgResult{Start: time.Now()})
204 }
205 result := &m.Results[len(m.Results)-1]
206 result.Duration = time.Since(result.Start)
207 result.Code = code
208 result.Secode = secode
209 result.Error = errmsg
210 result.Success = success
211}
212
213// LastResult returns the last result entry, or an empty result.
214func (m *Msg) LastResult() MsgResult {
215 if len(m.Results) == 0 {
216 return MsgResult{Start: time.Now()}
217 }
218 return m.Results[len(m.Results)-1]
219}
220
221// Sender of message as used in MAIL FROM.
222func (m Msg) Sender() smtp.Path {
223 return smtp.Path{Localpart: m.SenderLocalpart, IPDomain: m.SenderDomain}
224}
225
226// Recipient of message as used in RCPT TO.
227func (m Msg) Recipient() smtp.Path {
228 return smtp.Path{Localpart: m.RecipientLocalpart, IPDomain: m.RecipientDomain}
229}
230
231// MessagePath returns the path where the message is stored.
232func (m Msg) MessagePath() string {
233 return mox.DataDirPath(filepath.Join("queue", store.MessagePath(m.ID)))
234}
235
236// todo: store which transport (if any) was actually used in MsgResult, based on routes.
237
238// Retired returns a MsgRetired for the message, for history of deliveries.
239func (m Msg) Retired(success bool, t, keepUntil time.Time) MsgRetired {
240 return MsgRetired{
241 ID: m.ID,
242 BaseID: m.BaseID,
243 Queued: m.Queued,
244 SenderAccount: m.SenderAccount,
245 SenderLocalpart: m.SenderLocalpart,
246 SenderDomainStr: m.SenderDomainStr,
247 FromID: m.FromID,
248 RecipientLocalpart: m.RecipientLocalpart,
249 RecipientDomain: m.RecipientDomain,
250 RecipientDomainStr: m.RecipientDomainStr,
251 Attempts: m.Attempts,
252 MaxAttempts: m.MaxAttempts,
253 DialedIPs: m.DialedIPs,
254 LastAttempt: m.LastAttempt,
255 Results: m.Results,
256 Has8bit: m.Has8bit,
257 SMTPUTF8: m.SMTPUTF8,
258 IsDMARCReport: m.IsDMARCReport,
259 IsTLSReport: m.IsTLSReport,
260 Size: m.Size,
261 MessageID: m.MessageID,
262 Subject: m.Subject,
263 Transport: m.Transport,
264 RequireTLS: m.RequireTLS,
265 FutureReleaseRequest: m.FutureReleaseRequest,
266 Extra: m.Extra,
267
268 RecipientAddress: smtp.Path{Localpart: m.RecipientLocalpart, IPDomain: m.RecipientDomain}.XString(true),
269 Success: success,
270 LastActivity: t,
271 KeepUntil: keepUntil,
272 }
273}
274
// MsgRetired is a message for which delivery completed, either successfully, or
// failed/canceled. Retired messages are only stored if so configured, and will be
// cleaned up after the configured period. Most fields are copied from the Msg by
// Msg.Retired.
type MsgRetired struct {
	ID int64 // Same ID as it was as Msg.ID.

	BaseID             int64
	Queued             time.Time
	SenderAccount      string         // Failures are delivered back to this local account. Also used for routing.
	SenderLocalpart    smtp.Localpart // Should be a local user and domain.
	SenderDomainStr    string         // For filtering, unicode. Parsed again by Sender.
	FromID             string         `bstore:"index"` // Used to match DSNs.
	RecipientLocalpart smtp.Localpart // Typically a remote user and domain.
	RecipientDomain    dns.IPDomain
	RecipientDomainStr string              // For filtering, unicode.
	Attempts           int                 // Number of delivery attempts made, copied from Msg.
	MaxAttempts        int                 // Max number of attempts before giving up. If 0, then the default of 8 attempts was used instead.
	DialedIPs          map[string][]net.IP // For each host, the IPs that were dialed, copied from Msg.
	LastAttempt        *time.Time
	Results            []MsgResult

	Has8bit       bool   // Whether message contains bytes with high bit set, determines whether 8BITMIME SMTP extension is needed.
	SMTPUTF8      bool   // Whether message requires use of SMTPUTF8.
	IsDMARCReport bool   // Delivery failures for DMARC reports are handled differently.
	IsTLSReport   bool   // Delivery failures for TLS reports are handled differently.
	Size          int64  // Full size of message, combined MsgPrefix with contents of message file.
	MessageID     string // Used when composing a DSN, in its References header.
	Subject       string // For context about delivery.

	Transport            string
	RequireTLS           *bool
	FutureReleaseRequest string

	Extra map[string]string // Extra information, for transactional email.

	LastActivity     time.Time `bstore:"index"`
	RecipientAddress string    `bstore:"index RecipientAddress+LastActivity"` // Full recipient address, for lookups by address.
	Success          bool      // Whether delivery to next hop succeeded.
	KeepUntil        time.Time `bstore:"index"` // Time after which the retired message can be cleaned up.
}
315
316// Sender of message as used in MAIL FROM.
317func (m MsgRetired) Sender() (path smtp.Path, err error) {
318 path.Localpart = m.RecipientLocalpart
319 if strings.HasPrefix(m.SenderDomainStr, "[") && strings.HasSuffix(m.SenderDomainStr, "]") {
320 s := m.SenderDomainStr[1 : len(m.SenderDomainStr)-1]
321 path.IPDomain.IP = net.ParseIP(s)
322 if path.IPDomain.IP == nil {
323 err = fmt.Errorf("parsing ip address %q: %v", s, err)
324 }
325 } else {
326 path.IPDomain.Domain, err = dns.ParseDomain(m.SenderDomainStr)
327 }
328 return
329}
330
331// Recipient of message as used in RCPT TO.
332func (m MsgRetired) Recipient() smtp.Path {
333 return smtp.Path{Localpart: m.RecipientLocalpart, IPDomain: m.RecipientDomain}
334}
335
336// LastResult returns the last result entry, or an empty result.
337func (m MsgRetired) LastResult() MsgResult {
338 if len(m.Results) == 0 {
339 return MsgResult{}
340 }
341 return m.Results[len(m.Results)-1]
342}
343
344// Init opens the queue database without starting delivery.
345func Init() error {
346 qpath := mox.DataDirPath(filepath.FromSlash("queue/index.db"))
347 os.MkdirAll(filepath.Dir(qpath), 0770)
348 isNew := false
349 if _, err := os.Stat(qpath); err != nil && os.IsNotExist(err) {
350 isNew = true
351 }
352
353 var err error
354 log := mlog.New("queue", nil)
355 opts := bstore.Options{Timeout: 5 * time.Second, Perm: 0660, RegisterLogger: moxvar.RegisterLogger(qpath, log.Logger)}
356 DB, err = bstore.Open(mox.Shutdown, qpath, &opts, DBTypes...)
357 if err == nil {
358 err = DB.Read(mox.Shutdown, func(tx *bstore.Tx) error {
359 return metricHoldUpdate(tx)
360 })
361 }
362 if err != nil {
363 if isNew {
364 err := os.Remove(qpath)
365 log.Check(err, "removing new queue database file after error")
366 }
367 return fmt.Errorf("open queue database: %s", err)
368 }
369 return nil
370}
371
372// When we update the gauge, we just get the full current value, not try to account
373// for adds/removes.
374func metricHoldUpdate(tx *bstore.Tx) error {
375 count, err := bstore.QueryTx[Msg](tx).FilterNonzero(Msg{Hold: true}).Count()
376 if err != nil {
377 return fmt.Errorf("querying messages on hold for metric: %v", err)
378 }
379 metricHold.Set(float64(count))
380 return nil
381}
382
383// Shutdown closes the queue database. The delivery process isn't stopped. For tests only.
384func Shutdown() {
385 err := DB.Close()
386 if err != nil {
387 mlog.New("queue", nil).Errorx("closing queue db", err)
388 }
389 DB = nil
390}
391
392// todo: the filtering & sorting can use improvements. too much duplicated code (variants between {Msg,Hook}{,Retired}. Sort has pagination fields, some untyped.
393
// Filter filters messages to list or operate on. Used by admin web interface
// and cli.
//
// Only non-empty/non-zero values are applied to the filter. Leaving all fields
// empty/zero matches all messages.
type Filter struct {
	Max         int     // Maximum number of messages to match.
	IDs         []int64 // Message IDs.
	Account     string  // Sender account.
	From        string  // Substring of sender address.
	To          string  // Substring of recipient address.
	Hold        *bool   // Hold status, if non-nil.
	Submitted   string  // Whether submitted before/after a time relative to now. ">$duration" or "<$duration", also with "now" for duration.
	NextAttempt string  // ">$duration" or "<$duration", also with "now" for duration.
	Transport   *string // Exact transport name, if non-nil; the empty string matches the default routing.
}
410
411func (f Filter) apply(q *bstore.Query[Msg]) error {
412 if len(f.IDs) > 0 {
413 q.FilterIDs(f.IDs)
414 }
415 applyTime := func(field string, s string) error {
416 orig := s
417 var before bool
418 if strings.HasPrefix(s, "<") {
419 before = true
420 } else if !strings.HasPrefix(s, ">") {
421 return fmt.Errorf(`must start with "<" for before or ">" for after a duration`)
422 }
423 s = strings.TrimSpace(s[1:])
424 var t time.Time
425 if s == "now" {
426 t = time.Now()
427 } else if d, err := time.ParseDuration(s); err != nil {
428 return fmt.Errorf("parsing duration %q: %v", orig, err)
429 } else {
430 t = time.Now().Add(d)
431 }
432 if before {
433 q.FilterLess(field, t)
434 } else {
435 q.FilterGreater(field, t)
436 }
437 return nil
438 }
439 if f.Hold != nil {
440 q.FilterEqual("Hold", *f.Hold)
441 }
442 if f.Submitted != "" {
443 if err := applyTime("Queued", f.Submitted); err != nil {
444 return fmt.Errorf("applying filter for submitted: %v", err)
445 }
446 }
447 if f.NextAttempt != "" {
448 if err := applyTime("NextAttempt", f.NextAttempt); err != nil {
449 return fmt.Errorf("applying filter for next attempt: %v", err)
450 }
451 }
452 if f.Account != "" {
453 q.FilterNonzero(Msg{SenderAccount: f.Account})
454 }
455 if f.Transport != nil {
456 q.FilterEqual("Transport", *f.Transport)
457 }
458 if f.From != "" || f.To != "" {
459 q.FilterFn(func(m Msg) bool {
460 return f.From != "" && strings.Contains(m.Sender().XString(true), f.From) || f.To != "" && strings.Contains(m.Recipient().XString(true), f.To)
461 })
462 }
463 if f.Max != 0 {
464 q.Limit(f.Max)
465 }
466 return nil
467}
468
// Sort specifies the order of listed queue messages, and supports pagination by
// continuing after the last message of a previous page.
type Sort struct {
	Field  string // "Queued" or "NextAttempt"/"".
	LastID int64  // If > 0, we return objects beyond this, less/greater depending on Asc.
	Last   any    // Value of Field for last object, as string with RFC3339 time. Must be set iff LastID is set.
	Asc    bool   // Ascending, or descending.
}
475
476func (s Sort) apply(q *bstore.Query[Msg]) error {
477 switch s.Field {
478 case "", "NextAttempt":
479 s.Field = "NextAttempt"
480 case "Queued":
481 s.Field = "Queued"
482 default:
483 return fmt.Errorf("unknown sort order field %q", s.Field)
484 }
485
486 if s.LastID > 0 {
487 ls, ok := s.Last.(string)
488 if !ok {
489 return fmt.Errorf("last should be string with time, not %T %q", s.Last, s.Last)
490 }
491 last, err := time.Parse(time.RFC3339Nano, ls)
492 if err != nil {
493 last, err = time.Parse(time.RFC3339, ls)
494 }
495 if err != nil {
496 return fmt.Errorf("parsing last %q as time: %v", s.Last, err)
497 }
498 q.FilterNotEqual("ID", s.LastID)
499 var fieldEqual func(m Msg) bool
500 if s.Field == "NextAttempt" {
501 fieldEqual = func(m Msg) bool { return m.NextAttempt.Equal(last) }
502 } else {
503 fieldEqual = func(m Msg) bool { return m.Queued.Equal(last) }
504 }
505 if s.Asc {
506 q.FilterGreaterEqual(s.Field, last)
507 q.FilterFn(func(m Msg) bool {
508 return !fieldEqual(m) || m.ID > s.LastID
509 })
510 } else {
511 q.FilterLessEqual(s.Field, last)
512 q.FilterFn(func(m Msg) bool {
513 return !fieldEqual(m) || m.ID < s.LastID
514 })
515 }
516 }
517 if s.Asc {
518 q.SortAsc(s.Field, "ID")
519 } else {
520 q.SortDesc(s.Field, "ID")
521 }
522 return nil
523}
524
525// List returns max 100 messages matching filter in the delivery queue.
526// By default, orders by next delivery attempt.
527func List(ctx context.Context, filter Filter, sort Sort) ([]Msg, error) {
528 q := bstore.QueryDB[Msg](ctx, DB)
529 if err := filter.apply(q); err != nil {
530 return nil, err
531 }
532 if err := sort.apply(q); err != nil {
533 return nil, err
534 }
535 qmsgs, err := q.List()
536 if err != nil {
537 return nil, err
538 }
539 return qmsgs, nil
540}
541
542// Count returns the number of messages in the delivery queue.
543func Count(ctx context.Context) (int, error) {
544 return bstore.QueryDB[Msg](ctx, DB).Count()
545}
546
547// HoldRuleList returns all hold rules.
548func HoldRuleList(ctx context.Context) ([]HoldRule, error) {
549 return bstore.QueryDB[HoldRule](ctx, DB).List()
550}
551
552// HoldRuleAdd adds a new hold rule causing newly submitted messages to be marked
553// as "on hold", and existing matching messages too.
554func HoldRuleAdd(ctx context.Context, log mlog.Log, hr HoldRule) (HoldRule, error) {
555 var n int
556 err := DB.Write(ctx, func(tx *bstore.Tx) error {
557 hr.ID = 0
558 hr.SenderDomainStr = hr.SenderDomain.Name()
559 hr.RecipientDomainStr = hr.RecipientDomain.Name()
560 if err := tx.Insert(&hr); err != nil {
561 return err
562 }
563 log.Info("adding hold rule", slog.Any("holdrule", hr))
564
565 q := bstore.QueryTx[Msg](tx)
566 if !hr.All() {
567 q.FilterNonzero(Msg{
568 SenderAccount: hr.Account,
569 SenderDomainStr: hr.SenderDomainStr,
570 RecipientDomainStr: hr.RecipientDomainStr,
571 })
572 }
573 var err error
574 n, err = q.UpdateField("Hold", true)
575 if err != nil {
576 return fmt.Errorf("marking existing matching messages in queue on hold: %v", err)
577 }
578 return metricHoldUpdate(tx)
579 })
580 if err != nil {
581 return HoldRule{}, err
582 }
583 log.Info("marked messages in queue as on hold", slog.Int("messages", n))
584 msgqueueKick()
585 return hr, nil
586}
587
588// HoldRuleRemove removes a hold rule. The Hold field of existing messages are not
589// changed.
590func HoldRuleRemove(ctx context.Context, log mlog.Log, holdRuleID int64) error {
591 return DB.Write(ctx, func(tx *bstore.Tx) error {
592 hr := HoldRule{ID: holdRuleID}
593 if err := tx.Get(&hr); err != nil {
594 return err
595 }
596 log.Info("removing hold rule", slog.Any("holdrule", hr))
597 return tx.Delete(HoldRule{ID: holdRuleID})
598 })
599}
600
601// MakeMsg is a convenience function that sets the commonly used fields for a Msg.
602// messageID should include <>.
603func MakeMsg(sender, recipient smtp.Path, has8bit, smtputf8 bool, size int64, messageID string, prefix []byte, requireTLS *bool, next time.Time, subject string) Msg {
604 return Msg{
605 SenderLocalpart: sender.Localpart,
606 SenderDomain: sender.IPDomain,
607 RecipientLocalpart: recipient.Localpart,
608 RecipientDomain: recipient.IPDomain,
609 Has8bit: has8bit,
610 SMTPUTF8: smtputf8,
611 Size: size,
612 MessageID: messageID,
613 MsgPrefix: prefix,
614 Subject: subject,
615 RequireTLS: requireTLS,
616 Queued: time.Now(),
617 NextAttempt: next,
618 }
619}
620
621// Add one or more new messages to the queue. If the sender paths and MsgPrefix are
622// identical, they'll get the same BaseID, so they can be delivered in a single
623// SMTP transaction, with a single DATA command, but may be split into multiple
624// transactions if errors/limits are encountered. The queue is kicked immediately
625// to start a first delivery attempt.
626//
627// ID of the messagse must be 0 and will be set after inserting in the queue.
628//
629// Add sets derived fields like SenderDomainStr and RecipientDomainStr, and fields
630// related to queueing, such as Queued, NextAttempt.
631func Add(ctx context.Context, log mlog.Log, senderAccount string, msgFile *os.File, qml ...Msg) error {
632 if len(qml) == 0 {
633 return fmt.Errorf("must queue at least one message")
634 }
635
636 base := true
637
638 for i, qm := range qml {
639 if qm.ID != 0 {
640 return fmt.Errorf("id of queued messages must be 0")
641 }
642 // Sanity check, internal consistency.
643 qml[i].SenderDomainStr = formatIPDomain(qm.SenderDomain)
644 qml[i].RecipientDomainStr = formatIPDomain(qm.RecipientDomain)
645 if base && i > 0 && qm.Sender().String() != qml[0].Sender().String() || !bytes.Equal(qm.MsgPrefix, qml[0].MsgPrefix) {
646 base = false
647 }
648 }
649
650 tx, err := DB.Begin(ctx, true)
651 if err != nil {
652 return fmt.Errorf("begin transaction: %w", err)
653 }
654 defer func() {
655 if tx != nil {
656 if err := tx.Rollback(); err != nil {
657 log.Errorx("rollback for queue", err)
658 }
659 }
660 }()
661
662 // Mark messages Hold if they match a hold rule.
663 holdRules, err := bstore.QueryTx[HoldRule](tx).List()
664 if err != nil {
665 return fmt.Errorf("getting queue hold rules")
666 }
667
668 // Insert messages into queue. If multiple messages are to be delivered in a single
669 // transaction, they all get a non-zero BaseID that is the Msg.ID of the first
670 // message inserted.
671 var baseID int64
672 for i := range qml {
673 // FromIDs must be unique if present. We don't have a unique index because values
674 // can be the empty string. We check in both Msg and MsgRetired, both are relevant
675 // for uniquely identifying a message sent in the past.
676 if fromID := qml[i].FromID; fromID != "" {
677 if exists, err := bstore.QueryTx[Msg](tx).FilterNonzero(Msg{FromID: fromID}).Exists(); err != nil {
678 return fmt.Errorf("looking up fromid: %v", err)
679 } else if exists {
680 return fmt.Errorf("%w: fromid %q already present in message queue", ErrFromID, fromID)
681 }
682 if exists, err := bstore.QueryTx[MsgRetired](tx).FilterNonzero(MsgRetired{FromID: fromID}).Exists(); err != nil {
683 return fmt.Errorf("looking up fromid: %v", err)
684 } else if exists {
685 return fmt.Errorf("%w: fromid %q already present in retired message queue", ErrFromID, fromID)
686 }
687 }
688
689 qml[i].SenderAccount = senderAccount
690 qml[i].BaseID = baseID
691 for _, hr := range holdRules {
692 if hr.matches(qml[i]) {
693 qml[i].Hold = true
694 break
695 }
696 }
697 if err := tx.Insert(&qml[i]); err != nil {
698 return err
699 }
700 if base && i == 0 && len(qml) > 1 {
701 baseID = qml[i].ID
702 qml[i].BaseID = baseID
703 if err := tx.Update(&qml[i]); err != nil {
704 return err
705 }
706 }
707 }
708
709 var paths []string
710 defer func() {
711 for _, p := range paths {
712 err := os.Remove(p)
713 log.Check(err, "removing destination message file for queue", slog.String("path", p))
714 }
715 }()
716
717 syncDirs := map[string]struct{}{}
718
719 for _, qm := range qml {
720 dst := qm.MessagePath()
721 paths = append(paths, dst)
722
723 dstDir := filepath.Dir(dst)
724 if _, ok := syncDirs[dstDir]; !ok {
725 os.MkdirAll(dstDir, 0770)
726 syncDirs[dstDir] = struct{}{}
727 }
728
729 if err := moxio.LinkOrCopy(log, dst, msgFile.Name(), nil, true); err != nil {
730 return fmt.Errorf("linking/copying message to new file: %s", err)
731 }
732 }
733
734 for dir := range syncDirs {
735 if err := moxio.SyncDir(log, dir); err != nil {
736 return fmt.Errorf("sync directory: %v", err)
737 }
738 }
739
740 for _, m := range qml {
741 if m.Hold {
742 if err := metricHoldUpdate(tx); err != nil {
743 return err
744 }
745 break
746 }
747 }
748
749 if err := tx.Commit(); err != nil {
750 return fmt.Errorf("commit transaction: %s", err)
751 }
752 tx = nil
753 paths = nil
754
755 msgqueueKick()
756
757 return nil
758}
759
760func formatIPDomain(d dns.IPDomain) string {
761 if len(d.IP) > 0 {
762 return "[" + d.IP.String() + "]"
763 }
764 return d.Domain.Name()
765}
766
var (
	// msgqueue holds at most one pending wake-up signal, sent without blocking by
	// msgqueueKick. NOTE(review): the receiving side is not in this part of the
	// file; presumably it is the delivery loop — confirm.
	msgqueue = make(chan struct{}, 1)
	// NOTE(review): deliveryResults is not used in this part of the file;
	// presumably delivery goroutines report completion on it — confirm.
	deliveryResults = make(chan string, 1)
)
771
// kick signals both the message queue (msgqueueKick) and the webhook queue
// (hookqueueKick, defined elsewhere in the package).
func kick() {
	msgqueueKick()
	hookqueueKick()
}
776
// msgqueueKick signals the message queue through msgqueue without blocking.
func msgqueueKick() {
	select {
	case msgqueue <- struct{}{}:
	default:
		// The buffer (capacity 1) is full, so a signal is already pending.
	}
}
783
784// NextAttemptAdd adds a duration to the NextAttempt for all matching messages, and
785// kicks the queue.
786func NextAttemptAdd(ctx context.Context, filter Filter, d time.Duration) (affected int, err error) {
787 err = DB.Write(ctx, func(tx *bstore.Tx) error {
788 q := bstore.QueryTx[Msg](tx)
789 if err := filter.apply(q); err != nil {
790 return err
791 }
792 msgs, err := q.List()
793 if err != nil {
794 return fmt.Errorf("listing matching messages: %v", err)
795 }
796 for _, m := range msgs {
797 m.NextAttempt = m.NextAttempt.Add(d)
798 if err := tx.Update(&m); err != nil {
799 return err
800 }
801 }
802 affected = len(msgs)
803 return nil
804 })
805 if err != nil {
806 return 0, err
807 }
808 msgqueueKick()
809 return affected, nil
810}
811
812// NextAttemptSet sets NextAttempt for all matching messages to a new time, and
813// kicks the queue.
814func NextAttemptSet(ctx context.Context, filter Filter, t time.Time) (affected int, err error) {
815 q := bstore.QueryDB[Msg](ctx, DB)
816 if err := filter.apply(q); err != nil {
817 return 0, err
818 }
819 n, err := q.UpdateNonzero(Msg{NextAttempt: t})
820 if err != nil {
821 return 0, fmt.Errorf("selecting and updating messages in queue: %v", err)
822 }
823 msgqueueKick()
824 return n, nil
825}
826
827// HoldSet sets Hold for all matching messages and kicks the queue.
828func HoldSet(ctx context.Context, filter Filter, hold bool) (affected int, err error) {
829 err = DB.Write(ctx, func(tx *bstore.Tx) error {
830 q := bstore.QueryTx[Msg](tx)
831 if err := filter.apply(q); err != nil {
832 return err
833 }
834 n, err := q.UpdateFields(map[string]any{"Hold": hold})
835 if err != nil {
836 return fmt.Errorf("selecting and updating messages in queue: %v", err)
837 }
838 affected = n
839 return metricHoldUpdate(tx)
840 })
841 if err != nil {
842 return 0, err
843 }
844 msgqueueKick()
845 return affected, nil
846}
847
848// TransportSet changes the transport to use for the matching messages.
849func TransportSet(ctx context.Context, filter Filter, transport string) (affected int, err error) {
850 q := bstore.QueryDB[Msg](ctx, DB)
851 if err := filter.apply(q); err != nil {
852 return 0, err
853 }
854 n, err := q.UpdateFields(map[string]any{"Transport": transport})
855 if err != nil {
856 return 0, fmt.Errorf("selecting and updating messages in queue: %v", err)
857 }
858 msgqueueKick()
859 return n, nil
860}
861
862// Fail marks matching messages as failed for delivery, delivers a DSN to the
863// sender, and sends a webhook.
864//
865// Returns number of messages removed, which can be non-zero even in case of an
866// error.
867func Fail(ctx context.Context, log mlog.Log, f Filter) (affected int, err error) {
868 return failDrop(ctx, log, f, true)
869}
870
871// Drop removes matching messages from the queue. Messages are added as retired
872// message, webhooks with the "canceled" event are queued.
873//
874// Returns number of messages removed, which can be non-zero even in case of an
875// error.
876func Drop(ctx context.Context, log mlog.Log, f Filter) (affected int, err error) {
877 return failDrop(ctx, log, f, false)
878}
879
880func failDrop(ctx context.Context, log mlog.Log, filter Filter, fail bool) (affected int, err error) {
881 var msgs []Msg
882 err = DB.Write(ctx, func(tx *bstore.Tx) error {
883 q := bstore.QueryTx[Msg](tx)
884 if err := filter.apply(q); err != nil {
885 return err
886 }
887 var err error
888 msgs, err = q.List()
889 if err != nil {
890 return fmt.Errorf("getting messages to delete: %v", err)
891 }
892
893 if len(msgs) == 0 {
894 return nil
895 }
896
897 now := time.Now()
898 var remoteMTA dsn.NameIP
899 for i := range msgs {
900 result := MsgResult{
901 Start: now,
902 Error: "delivery canceled by admin",
903 }
904 msgs[i].Results = append(msgs[i].Results, result)
905 if fail {
906 if msgs[i].LastAttempt == nil {
907 msgs[i].LastAttempt = &now
908 }
909 deliverDSNFailure(log, msgs[i], remoteMTA, "", result.Error, nil)
910 }
911 }
912 event := webhook.EventCanceled
913 if fail {
914 event = webhook.EventFailed
915 }
916 if err := retireMsgs(log, tx, event, 0, "", nil, msgs...); err != nil {
917 return fmt.Errorf("removing queue messages from database: %w", err)
918 }
919 return metricHoldUpdate(tx)
920 })
921 if err != nil {
922 return 0, err
923 }
924 if len(msgs) > 0 {
925 if err := removeMsgsFS(log, msgs...); err != nil {
926 return len(msgs), fmt.Errorf("removing queue messages from file system: %w", err)
927 }
928 }
929 kick()
930 return len(msgs), nil
931}
932
933// RequireTLSSet updates the RequireTLS field of matching messages.
934func RequireTLSSet(ctx context.Context, filter Filter, requireTLS *bool) (affected int, err error) {
935 q := bstore.QueryDB[Msg](ctx, DB)
936 if err := filter.apply(q); err != nil {
937 return 0, err
938 }
939 n, err := q.UpdateFields(map[string]any{"RequireTLS": requireTLS})
940 msgqueueKick()
941 return n, err
942}
943
// RetiredFilter filters retired messages to list or operate on. Used by admin web
// interface and cli.
//
// Only non-empty/non-zero values are applied to the filter. Leaving all fields
// empty/zero matches all messages.
type RetiredFilter struct {
	Max          int     // Maximum number of messages to match.
	IDs          []int64 // Message IDs.
	Account      string  // Sender account.
	From         string  // Substring of sender address.
	To           string  // Substring of recipient address.
	Submitted    string  // Whether submitted before/after a time relative to now. ">$duration" or "<$duration", also with "now" for duration.
	LastActivity string  // ">$duration" or "<$duration", also with "now" for duration.
	Transport    *string // Exact transport name, if non-nil.
	Success      *bool   // Whether delivery succeeded, if non-nil.
}
960
961func (f RetiredFilter) apply(q *bstore.Query[MsgRetired]) error {
962 if len(f.IDs) > 0 {
963 q.FilterIDs(f.IDs)
964 }
965 applyTime := func(field string, s string) error {
966 orig := s
967 var before bool
968 if strings.HasPrefix(s, "<") {
969 before = true
970 } else if !strings.HasPrefix(s, ">") {
971 return fmt.Errorf(`must start with "<" for before or ">" for after a duration`)
972 }
973 s = strings.TrimSpace(s[1:])
974 var t time.Time
975 if s == "now" {
976 t = time.Now()
977 } else if d, err := time.ParseDuration(s); err != nil {
978 return fmt.Errorf("parsing duration %q: %v", orig, err)
979 } else {
980 t = time.Now().Add(d)
981 }
982 if before {
983 q.FilterLess(field, t)
984 } else {
985 q.FilterGreater(field, t)
986 }
987 return nil
988 }
989 if f.Submitted != "" {
990 if err := applyTime("Queued", f.Submitted); err != nil {
991 return fmt.Errorf("applying filter for submitted: %v", err)
992 }
993 }
994 if f.LastActivity != "" {
995 if err := applyTime("LastActivity", f.LastActivity); err != nil {
996 return fmt.Errorf("applying filter for last activity: %v", err)
997 }
998 }
999 if f.Account != "" {
1000 q.FilterNonzero(MsgRetired{SenderAccount: f.Account})
1001 }
1002 if f.Transport != nil {
1003 q.FilterEqual("Transport", *f.Transport)
1004 }
1005 if f.From != "" || f.To != "" {
1006 q.FilterFn(func(m MsgRetired) bool {
1007 return f.From != "" && strings.Contains(m.SenderLocalpart.String()+"@"+m.SenderDomainStr, f.From) || f.To != "" && strings.Contains(m.Recipient().XString(true), f.To)
1008 })
1009 }
1010 if f.Success != nil {
1011 q.FilterEqual("Success", *f.Success)
1012 }
1013 if f.Max != 0 {
1014 q.Limit(f.Max)
1015 }
1016 return nil
1017}
1018
// RetiredSort specifies the sort order of, and the position to continue from
// (pagination) when, listing retired messages with RetiredList.
type RetiredSort struct {
	Field  string // "Queued" or "LastActivity"/"".
	LastID int64  // If > 0, we return objects beyond this, less/greater depending on Asc.
	Last   any    // Value of Field for last object. Must be set iff LastID is set. Must be a string with a time in RFC3339 format (fractional seconds allowed).
	Asc    bool   // Ascending, or descending.
}
1025
1026func (s RetiredSort) apply(q *bstore.Query[MsgRetired]) error {
1027 switch s.Field {
1028 case "", "LastActivity":
1029 s.Field = "LastActivity"
1030 case "Queued":
1031 s.Field = "Queued"
1032 default:
1033 return fmt.Errorf("unknown sort order field %q", s.Field)
1034 }
1035
1036 if s.LastID > 0 {
1037 ls, ok := s.Last.(string)
1038 if !ok {
1039 return fmt.Errorf("last should be string with time, not %T %q", s.Last, s.Last)
1040 }
1041 last, err := time.Parse(time.RFC3339Nano, ls)
1042 if err != nil {
1043 last, err = time.Parse(time.RFC3339, ls)
1044 }
1045 if err != nil {
1046 return fmt.Errorf("parsing last %q as time: %v", s.Last, err)
1047 }
1048 q.FilterNotEqual("ID", s.LastID)
1049 var fieldEqual func(m MsgRetired) bool
1050 if s.Field == "LastActivity" {
1051 fieldEqual = func(m MsgRetired) bool { return m.LastActivity.Equal(last) }
1052 } else {
1053 fieldEqual = func(m MsgRetired) bool { return m.Queued.Equal(last) }
1054 }
1055 if s.Asc {
1056 q.FilterGreaterEqual(s.Field, last)
1057 q.FilterFn(func(mr MsgRetired) bool {
1058 return !fieldEqual(mr) || mr.ID > s.LastID
1059 })
1060 } else {
1061 q.FilterLessEqual(s.Field, last)
1062 q.FilterFn(func(mr MsgRetired) bool {
1063 return !fieldEqual(mr) || mr.ID < s.LastID
1064 })
1065 }
1066 }
1067 if s.Asc {
1068 q.SortAsc(s.Field, "ID")
1069 } else {
1070 q.SortDesc(s.Field, "ID")
1071 }
1072 return nil
1073}
1074
1075// RetiredList returns retired messages.
1076func RetiredList(ctx context.Context, filter RetiredFilter, sort RetiredSort) ([]MsgRetired, error) {
1077 q := bstore.QueryDB[MsgRetired](ctx, DB)
1078 if err := filter.apply(q); err != nil {
1079 return nil, err
1080 }
1081 if err := sort.apply(q); err != nil {
1082 return nil, err
1083 }
1084 return q.List()
1085}
1086
// ReadReaderAtCloser is a reader supporting both sequential reads (io.Reader)
// and random-access reads (io.ReaderAt) that must be closed after use. It is the
// type returned by OpenMessage.
type ReadReaderAtCloser interface {
	io.ReadCloser
	io.ReaderAt
}
1091
1092// OpenMessage opens a message present in the queue.
1093func OpenMessage(ctx context.Context, id int64) (ReadReaderAtCloser, error) {
1094 qm := Msg{ID: id}
1095 err := DB.Get(ctx, &qm)
1096 if err != nil {
1097 return nil, err
1098 }
1099 f, err := os.Open(qm.MessagePath())
1100 if err != nil {
1101 return nil, fmt.Errorf("open message file: %s", err)
1102 }
1103 r := store.FileMsgReader(qm.MsgPrefix, f)
1104 return r, err
1105}
1106
// Concurrency limits. maxConcurrentDeliveries bounds the number of recipient
// domains with a delivery in progress (see startQueue/launchWork).
// maxConcurrentHookDeliveries is the corresponding limit for webhook deliveries
// (presumably enforced by the hook queue; not visible here).
const (
	maxConcurrentDeliveries     = 10
	maxConcurrentHookDeliveries = 10
)
1109
1110// Start opens the database by calling Init, then starts the delivery and cleanup
1111// processes.
1112func Start(resolver dns.Resolver, done chan struct{}) error {
1113 if err := Init(); err != nil {
1114 return err
1115 }
1116
1117 go startQueue(resolver, done)
1118 go startHookQueue(done)
1119
1120 go cleanupMsgRetired(done)
1121 go cleanupHookRetired(done)
1122
1123 return nil
1124}
1125
1126func cleanupMsgRetired(done chan struct{}) {
1127 log := mlog.New("queue", nil)
1128
1129 defer func() {
1130 x := recover()
1131 if x != nil {
1132 log.Error("unhandled panic in cleanupMsgRetired", slog.Any("x", x))
1133 debug.PrintStack()
1134 metrics.PanicInc(metrics.Queue)
1135 }
1136 }()
1137
1138 timer := time.NewTimer(3 * time.Second)
1139 for {
1140 select {
1141 case <-mox.Shutdown.Done():
1142 done <- struct{}{}
1143 return
1144 case <-timer.C:
1145 }
1146
1147 cleanupMsgRetiredSingle(log)
1148 timer.Reset(time.Hour)
1149 }
1150}
1151
1152func cleanupMsgRetiredSingle(log mlog.Log) {
1153 n, err := bstore.QueryDB[MsgRetired](mox.Shutdown, DB).FilterLess("KeepUntil", time.Now()).Delete()
1154 log.Check(err, "removing old retired messages")
1155 if n > 0 {
1156 log.Debug("cleaned up retired messages", slog.Int("count", n))
1157 }
1158}
1159
1160func startQueue(resolver dns.Resolver, done chan struct{}) {
1161 // High-level delivery strategy advice: ../rfc/5321:3685
1162 log := mlog.New("queue", nil)
1163
1164 // Map keys are either dns.Domain.Name()'s, or string-formatted IP addresses.
1165 busyDomains := map[string]struct{}{}
1166
1167 timer := time.NewTimer(0)
1168
1169 for {
1170 select {
1171 case <-mox.Shutdown.Done():
1172 for len(busyDomains) > 0 {
1173 domain := <-deliveryResults
1174 delete(busyDomains, domain)
1175 }
1176 done <- struct{}{}
1177 return
1178 case <-msgqueue:
1179 case <-timer.C:
1180 case domain := <-deliveryResults:
1181 delete(busyDomains, domain)
1182 }
1183
1184 if len(busyDomains) >= maxConcurrentDeliveries {
1185 continue
1186 }
1187
1188 launchWork(log, resolver, busyDomains)
1189 timer.Reset(nextWork(mox.Shutdown, log, busyDomains))
1190 }
1191}
1192
1193func nextWork(ctx context.Context, log mlog.Log, busyDomains map[string]struct{}) time.Duration {
1194 q := bstore.QueryDB[Msg](ctx, DB)
1195 if len(busyDomains) > 0 {
1196 var doms []any
1197 for d := range busyDomains {
1198 doms = append(doms, d)
1199 }
1200 q.FilterNotEqual("RecipientDomainStr", doms...)
1201 }
1202 q.FilterEqual("Hold", false)
1203 q.SortAsc("NextAttempt")
1204 q.Limit(1)
1205 qm, err := q.Get()
1206 if err == bstore.ErrAbsent {
1207 return 24 * time.Hour
1208 } else if err != nil {
1209 log.Errorx("finding time for next delivery attempt", err)
1210 return 1 * time.Minute
1211 }
1212 return time.Until(qm.NextAttempt)
1213}
1214
1215func launchWork(log mlog.Log, resolver dns.Resolver, busyDomains map[string]struct{}) int {
1216 q := bstore.QueryDB[Msg](mox.Shutdown, DB)
1217 q.FilterLessEqual("NextAttempt", time.Now())
1218 q.FilterEqual("Hold", false)
1219 q.SortAsc("NextAttempt")
1220 q.Limit(maxConcurrentDeliveries)
1221 if len(busyDomains) > 0 {
1222 var doms []any
1223 for d := range busyDomains {
1224 doms = append(doms, d)
1225 }
1226 q.FilterNotEqual("RecipientDomainStr", doms...)
1227 }
1228 var msgs []Msg
1229 seen := map[string]bool{}
1230 err := q.ForEach(func(m Msg) error {
1231 dom := m.RecipientDomainStr
1232 if _, ok := busyDomains[dom]; !ok && !seen[dom] {
1233 seen[dom] = true
1234 msgs = append(msgs, m)
1235 }
1236 return nil
1237 })
1238 if err != nil {
1239 log.Errorx("querying for work in queue", err)
1240 mox.Sleep(mox.Shutdown, 1*time.Second)
1241 return -1
1242 }
1243
1244 for _, m := range msgs {
1245 busyDomains[m.RecipientDomainStr] = struct{}{}
1246 go deliver(log, resolver, m)
1247 }
1248 return len(msgs)
1249}
1250
// todo future: we may consider keeping message files around for a while after retiring, especially for failed deliveries, so we can inspect what exactly wasn't delivered.
1252
1253func removeMsgsFS(log mlog.Log, msgs ...Msg) error {
1254 var errs []string
1255 for _, m := range msgs {
1256 p := mox.DataDirPath(filepath.Join("queue", store.MessagePath(m.ID)))
1257 if err := os.Remove(p); err != nil {
1258 errs = append(errs, fmt.Sprintf("%s: %v", p, err))
1259 }
1260 }
1261 if len(errs) > 0 {
1262 return fmt.Errorf("removing message files from queue: %s", strings.Join(errs, "; "))
1263 }
1264 return nil
1265}
1266
1267// Move one or more messages to retire list or remove it. Webhooks are scheduled.
1268// IDs of msgs in suppressedMsgIDs caused a suppression to be added.
1269//
1270// Callers should update Msg.Results before calling.
1271//
1272// Callers must remove the messages from the file system afterwards, see
1273// removeMsgsFS. Callers must also kick the message and webhook queues.
1274func retireMsgs(log mlog.Log, tx *bstore.Tx, event webhook.OutgoingEvent, code int, secode string, suppressedMsgIDs []int64, msgs ...Msg) error {
1275 now := time.Now()
1276
1277 var hooks []Hook
1278 m0 := msgs[0]
1279 accConf, ok := mox.Conf.Account(m0.SenderAccount)
1280 var hookURL string
1281 if accConf.OutgoingWebhook != nil {
1282 hookURL = accConf.OutgoingWebhook.URL
1283 }
1284 log.Debug("retiring messages from queue", slog.Any("event", event), slog.String("account", m0.SenderAccount), slog.Bool("ok", ok), slog.String("webhookurl", hookURL))
1285 if hookURL != "" && (len(accConf.OutgoingWebhook.Events) == 0 || slices.Contains(accConf.OutgoingWebhook.Events, string(event))) {
1286 for _, m := range msgs {
1287 suppressing := slices.Contains(suppressedMsgIDs, m.ID)
1288 h, err := hookCompose(m, hookURL, accConf.OutgoingWebhook.Authorization, event, suppressing, code, secode)
1289 if err != nil {
1290 log.Errorx("composing webhooks while retiring messages from queue, not queueing hook for message", err, slog.Int64("msgid", m.ID), slog.Any("recipient", m.Recipient()))
1291 } else {
1292 hooks = append(hooks, h)
1293 }
1294 }
1295 }
1296
1297 msgKeep := 24 * 7 * time.Hour
1298 hookKeep := 24 * 7 * time.Hour
1299 if ok {
1300 msgKeep = accConf.KeepRetiredMessagePeriod
1301 hookKeep = accConf.KeepRetiredWebhookPeriod
1302 }
1303
1304 for _, m := range msgs {
1305 if err := tx.Delete(&m); err != nil {
1306 return err
1307 }
1308 }
1309 if msgKeep > 0 {
1310 for _, m := range msgs {
1311 rm := m.Retired(event == webhook.EventDelivered, now, now.Add(msgKeep))
1312 if err := tx.Insert(&rm); err != nil {
1313 return err
1314 }
1315 }
1316 }
1317
1318 for i := range hooks {
1319 if err := hookInsert(tx, &hooks[i], now, hookKeep); err != nil {
1320 return fmt.Errorf("enqueueing webhooks while retiring messages from queue: %v", err)
1321 }
1322 }
1323
1324 if len(hooks) > 0 {
1325 for _, h := range hooks {
1326 log.Debug("queued webhook while retiring message from queue", h.attrs()...)
1327 }
1328 hookqueueKick()
1329 }
1330 return nil
1331}
1332
// deliver attempts to deliver a message.
// The queue is updated, either by removing a delivered or permanently failed
// message, or updating the time for the next attempt. A DSN may be sent.
//
// Outline:
//   - In a single write transaction, the attempt is registered: Attempts is
//     incremented, LastAttempt set, NextAttempt set with exponential backoff, and
//     a placeholder Result appended.
//   - The message is failed early (within the same transaction) if the sender's
//     domain is disabled, the recipient is on the sender account's suppression
//     list, or the configured transport does not exist.
//   - Other queued messages with the same BaseID, recipient domain, attempt count,
//     RequireTLS setting and transport are gathered, to be delivered together.
//   - After commit, delivery happens through localserve, a submission/SMTP
//     transport, or directly (optionally through a SOCKS5 proxy).
//   - TLS results collected during direct delivery are stored in tlsrptdb by a
//     deferred function, for later TLSRPT reports.
//
// When deliver returns (also after a panic), the recipient domain is sent on
// deliveryResults so the queue loop no longer considers it busy.
func deliver(log mlog.Log, resolver dns.Resolver, m0 Msg) {
	ctx := mox.Shutdown

	qlog := log.WithCid(mox.Cid()).With(
		slog.Any("from", m0.Sender()),
		slog.Int("attempts", m0.Attempts))

	defer func() {
		// Always release the domain in the queue loop first, even when panicking.
		deliveryResults <- formatIPDomain(m0.RecipientDomain)

		x := recover()
		if x != nil {
			qlog.Error("deliver panic", slog.Any("panic", x), slog.Int64("msgid", m0.ID), slog.Any("recipient", m0.Recipient()))
			debug.PrintStack()
			metrics.PanicInc(metrics.Queue)
		}
	}()

	// We'll use a single transaction for the various checks, committing as soon as
	// we're done with it.
	xtx, err := DB.Begin(mox.Shutdown, true)
	if err != nil {
		qlog.Errorx("transaction for gathering messages to deliver", err)
		return
	}
	// Roll back unless a code path below committed the transaction and set xtx to nil.
	defer func() {
		if xtx != nil {
			err := xtx.Rollback()
			qlog.Check(err, "rolling back transaction after error delivering")
		}
	}()

	// We register this attempt by setting LastAttempt, adding an empty Result, and
	// already setting NextAttempt in the future with exponential backoff. If we run
	// into trouble delivery below, at least we won't be bothering the receiving server
	// with our problems.
	// Delivery attempts: immediately, 7.5m, 15m, 30m, 1h, 2h (send delayed DSN), 4h,
	// 8h, 16h (send permanent failure DSN).
	// ../rfc/5321:3703
	// todo future: make the back off times configurable. ../rfc/5321:3713
	now := time.Now()
	var backoff time.Duration
	var origNextAttempt time.Time
	prepare := func() error {
		// Refresh message within transaction.
		m0 = Msg{ID: m0.ID}
		if err := xtx.Get(&m0); err != nil {
			return fmt.Errorf("get message to be delivered: %v", err)
		}

		// Base of 7.5 minutes with a few seconds of jitter (-5s to +4s), doubled for
		// each previous attempt.
		backoff = time.Duration(7*60+30+jitter.IntN(10)-5) * time.Second
		for range m0.Attempts {
			backoff *= time.Duration(2)
		}
		m0.Attempts++
		// Remembered for gathering other messages that were due no later than this one.
		origNextAttempt = m0.NextAttempt
		m0.LastAttempt = &now
		m0.NextAttempt = now.Add(backoff)
		m0.Results = append(m0.Results, MsgResult{Start: now, Error: resultErrorDelivering})
		if err := xtx.Update(&m0); err != nil {
			return fmt.Errorf("update message to be delivered: %v", err)
		}
		return nil
	}
	if err := prepare(); err != nil {
		qlog.Errorx("storing delivery attempt", err, slog.Int64("msgid", m0.ID), slog.Any("recipient", m0.Recipient()))
		return
	}

	var remoteMTA dsn.NameIP // Zero value, will not be included in DSN. ../rfc/3464:1027

	// If domain of sender is currently disabled, fail the delivery attempt.
	if domConf, _ := mox.Conf.Domain(m0.SenderDomain.Domain); domConf.Disabled {
		failMsgsTx(qlog, xtx, []*Msg{&m0}, m0.DialedIPs, backoff, remoteMTA, fmt.Errorf("domain of sender temporarily disabled"))
		err = xtx.Commit()
		qlog.Check(err, "commit processing failure to deliver messages")
		xtx = nil
		kick()
		return
	}

	// Check if recipient is on suppression list. If so, fail delivery.
	path := smtp.Path{Localpart: m0.RecipientLocalpart, IPDomain: m0.RecipientDomain}
	baseAddr := baseAddress(path).XString(true)
	qsup := bstore.QueryTx[webapi.Suppression](xtx)
	qsup.FilterNonzero(webapi.Suppression{Account: m0.SenderAccount, BaseAddress: baseAddr})
	exists, err := qsup.Exists()
	if err != nil || exists {
		if err != nil {
			// The message is not failed. Committing below keeps the registered attempt,
			// so delivery is retried after the backoff.
			qlog.Errorx("checking whether recipient address is in suppression list", err)
		} else {
			err := fmt.Errorf("not delivering to recipient address %s: %w", path.XString(true), errSuppressed)
			err = smtpclient.Error{Permanent: true, Err: err}
			failMsgsTx(qlog, xtx, []*Msg{&m0}, m0.DialedIPs, backoff, remoteMTA, err)
		}
		err = xtx.Commit()
		qlog.Check(err, "commit processing failure to deliver messages")
		xtx = nil
		kick()
		return
	}

	// resolveTransport returns the transport name and configuration for mm: its
	// explicitly set transport (ok is false if that is not configured), or
	// otherwise the transport of the first matching route.
	resolveTransport := func(mm Msg) (string, config.Transport, bool) {
		if mm.Transport != "" {
			transport, ok := mox.Conf.Static.Transports[mm.Transport]
			if !ok {
				return "", config.Transport{}, false
			}
			return mm.Transport, transport, ok
		}
		route := findRoute(mm.Attempts, mm)
		return route.Transport, route.ResolvedTransport, true
	}

	// Find route for transport to use for delivery attempt.
	// Routes are matched on the attempt count from before this attempt (prepare
	// already incremented it), so temporarily undo the increment.
	m0.Attempts--
	transportName, transport, transportOK := resolveTransport(m0)
	m0.Attempts++
	if !transportOK {
		failMsgsTx(qlog, xtx, []*Msg{&m0}, m0.DialedIPs, backoff, remoteMTA, fmt.Errorf("cannot find transport %q", m0.Transport))
		err = xtx.Commit()
		qlog.Check(err, "commit processing failure to deliver messages")
		xtx = nil
		kick()
		return
	}

	if transportName != "" {
		qlog = qlog.With(slog.String("transport", transportName))
		qlog.Debug("delivering with transport")
	}

	// Attempt to gather more recipients for this identical message, only with the same
	// recipient domain, and under the same conditions (recipientdomain, attempts,
	// requiretls, transport). ../rfc/5321:3759
	msgs := []*Msg{&m0}
	if m0.BaseID != 0 {
		gather := func() error {
			q := bstore.QueryTx[Msg](xtx)
			// Attempts-1: other messages have not been incremented for this attempt yet.
			q.FilterNonzero(Msg{BaseID: m0.BaseID, RecipientDomainStr: m0.RecipientDomainStr, Attempts: m0.Attempts - 1})
			q.FilterNotEqual("ID", m0.ID)
			q.FilterLessEqual("NextAttempt", origNextAttempt)
			q.FilterEqual("Hold", false)
			err := q.ForEach(func(xm Msg) error {
				// RequireTLS must be equal: both nil, or both set with the same value.
				mrtls := m0.RequireTLS != nil
				xmrtls := xm.RequireTLS != nil
				if mrtls != xmrtls || mrtls && *m0.RequireTLS != *xm.RequireTLS {
					return nil
				}
				tn, _, ok := resolveTransport(xm)
				if ok && tn == transportName {
					msgs = append(msgs, &xm)
				}
				return nil
			})
			if err != nil {
				return fmt.Errorf("looking up more recipients: %v", err)
			}

			// Mark these additional messages as attempted too.
			for _, mm := range msgs[1:] {
				mm.Attempts++
				mm.NextAttempt = m0.NextAttempt
				mm.LastAttempt = m0.LastAttempt
				mm.Results = append(mm.Results, MsgResult{Start: now, Error: resultErrorDelivering})
				if err := xtx.Update(mm); err != nil {
					return fmt.Errorf("updating more message recipients for smtp transaction: %v", err)
				}
			}
			return nil
		}
		if err := gather(); err != nil {
			qlog.Errorx("error finding more recipients for message, will attempt to send to single recipient", err)
			msgs = msgs[:1]
		}
	}

	if err := xtx.Commit(); err != nil {
		qlog.Errorx("commit of preparation to deliver", err, slog.Any("msgid", m0.ID))
		return
	}
	xtx = nil

	if len(msgs) > 1 {
		ids := make([]int64, len(msgs))
		rcpts := make([]smtp.Path, len(msgs))
		for i, m := range msgs {
			ids[i] = m.ID
			rcpts[i] = m.Recipient()
		}
		qlog.Debug("delivering to multiple recipients", slog.Any("msgids", ids), slog.Any("recipients", rcpts))
	} else {
		qlog.Debug("delivering to single recipient", slog.Any("msgid", m0.ID), slog.Any("recipient", m0.Recipient()))
	}

	if Localserve {
		deliverLocalserve(ctx, qlog, msgs, backoff)
		return
	}

	// We gather TLS connection successes and failures during delivery, and we store
	// them in tlsrptdb. Every 24 hours we send an email with a report to the recipient
	// domains that opt in via a TLSRPT DNS record. For us, the tricky part is
	// collecting all reporting information. We've got several TLS modes
	// (opportunistic, DANE and/or MTA-STS (PKIX), overrides due to Require TLS).
	// Failures can happen at various levels: MTA-STS policies (apply to whole delivery
	// attempt/domain), MX targets (possibly multiple per delivery attempt, both for
	// MTA-STS and DANE).
	//
	// Once the SMTP client has tried a TLS handshake, we register success/failure,
	// regardless of what happens next on the connection. We also register failures
	// when they happen before we get to the SMTP client, but only if they are related
	// to TLS (and some DNSSEC).
	var recipientDomainResult tlsrpt.Result
	var hostResults []tlsrpt.Result
	defer func() {
		if mox.Conf.Static.NoOutgoingTLSReports || m0.RecipientDomain.IsIP() {
			return
		}

		now := time.Now()
		dayUTC := now.UTC().Format("20060102")

		// See if this contains a failure. If not, we'll mark TLS results for delivering
		// DMARC reports SendReport false, so we won't as easily get into a report sending
		// loop.
		var failure bool
		for _, result := range hostResults {
			if result.Summary.TotalFailureSessionCount > 0 {
				failure = true
				break
			}
		}
		if recipientDomainResult.Summary.TotalFailureSessionCount > 0 {
			failure = true
		}

		results := make([]tlsrptdb.TLSResult, 0, 1+len(hostResults))
		tlsaPolicyDomains := map[string]bool{}
		// addResult converts r into a database record. Results without a policy type
		// (zero value, e.g. when nothing was recorded) are skipped.
		addResult := func(r tlsrpt.Result, isHost bool) {
			var zerotype tlsrpt.PolicyType
			if r.Policy.Type == zerotype {
				return
			}

			// Ensure we store policy domain in unicode in database.
			policyDomain, err := dns.ParseDomain(r.Policy.Domain)
			if err != nil {
				qlog.Errorx("parsing policy domain for tls result", err, slog.String("policydomain", r.Policy.Domain))
				return
			}

			if r.Policy.Type == tlsrpt.TLSA {
				tlsaPolicyDomains[policyDomain.ASCII] = true
			}

			tlsResult := tlsrptdb.TLSResult{
				PolicyDomain:    policyDomain.Name(),
				DayUTC:          dayUTC,
				RecipientDomain: m0.RecipientDomain.Domain.Name(),
				IsHost:          isHost,
				SendReport:      !m0.IsTLSReport && (!m0.IsDMARCReport || failure),
				Results:         []tlsrpt.Result{r},
			}
			results = append(results, tlsResult)
		}
		for _, result := range hostResults {
			addResult(result, true)
		}
		// If we were delivering to a mail host directly (not a domain with MX records), we
		// are more likely to get a TLSA policy than an STS policy. Don't potentially
		// confuse operators with both a tlsa and no-policy-found result.
		// todo spec: ../rfc/8460:440 an explicit no-sts-policy result would be useful.
		if recipientDomainResult.Policy.Type != tlsrpt.NoPolicyFound || !tlsaPolicyDomains[recipientDomainResult.Policy.Domain] {
			addResult(recipientDomainResult, false)
		}

		if len(results) > 0 {
			err := tlsrptdb.AddTLSResults(context.Background(), results)
			qlog.Check(err, "adding tls results to database for upcoming tlsrpt report")
		}
	}()

	// Deliver through the configured transport. Only direct delivery (the final
	// else branch) returns TLS results for the deferred reporting above.
	var dialer smtpclient.Dialer = &net.Dialer{}
	if transport.Submissions != nil {
		deliverSubmit(qlog, resolver, dialer, msgs, backoff, transportName, transport.Submissions, true, 465)
	} else if transport.Submission != nil {
		deliverSubmit(qlog, resolver, dialer, msgs, backoff, transportName, transport.Submission, false, 587)
	} else if transport.SMTP != nil {
		// todo future: perhaps also gather tlsrpt results for submissions.
		deliverSubmit(qlog, resolver, dialer, msgs, backoff, transportName, transport.SMTP, false, 25)
	} else {
		ourHostname := mox.Conf.Static.HostnameDomain
		if transport.Socks != nil {
			socksdialer, err := proxy.SOCKS5("tcp", transport.Socks.Address, nil, &net.Dialer{})
			if err != nil {
				failMsgsDB(qlog, msgs, msgs[0].DialedIPs, backoff, dsn.NameIP{}, fmt.Errorf("socks dialer: %v", err))
				return
			} else if d, ok := socksdialer.(smtpclient.Dialer); !ok {
				failMsgsDB(qlog, msgs, msgs[0].DialedIPs, backoff, dsn.NameIP{}, fmt.Errorf("socks dialer is not a contextdialer"))
				return
			} else {
				dialer = d
			}
			// Through a SOCKS proxy, announce the hostname configured for it.
			ourHostname = transport.Socks.Hostname
		}
		recipientDomainResult, hostResults = deliverDirect(qlog, resolver, dialer, ourHostname, transportName, transport.Direct, msgs, backoff)
	}
}
1645
1646func findRoute(attempt int, m Msg) config.Route {
1647 routesAccount, routesDomain, routesGlobal := mox.Conf.Routes(m.SenderAccount, m.SenderDomain.Domain)
1648 if r, ok := findRouteInList(attempt, m, routesAccount); ok {
1649 return r
1650 }
1651 if r, ok := findRouteInList(attempt, m, routesDomain); ok {
1652 return r
1653 }
1654 if r, ok := findRouteInList(attempt, m, routesGlobal); ok {
1655 return r
1656 }
1657 return config.Route{}
1658}
1659
1660func findRouteInList(attempt int, m Msg, routes []config.Route) (config.Route, bool) {
1661 for _, r := range routes {
1662 if routeMatch(attempt, m, r) {
1663 return r, true
1664 }
1665 }
1666 return config.Route{}, false
1667}
1668
1669func routeMatch(attempt int, m Msg, r config.Route) bool {
1670 return attempt >= r.MinimumAttempts && routeMatchDomain(r.FromDomainASCII, m.SenderDomain.Domain) && routeMatchDomain(r.ToDomainASCII, m.RecipientDomain.Domain)
1671}
1672
1673func routeMatchDomain(l []string, d dns.Domain) bool {
1674 if len(l) == 0 {
1675 return true
1676 }
1677 for _, e := range l {
1678 if d.ASCII == e || strings.HasPrefix(e, ".") && (d.ASCII == e[1:] || strings.HasSuffix(d.ASCII, e)) {
1679 return true
1680 }
1681 }
1682 return false
1683}
1684
1685// Returns string representing delivery result for err, and number of delivered and
1686// failed messages.
1687//
1688// Values: ok, okpartial, timeout, canceled, temperror, permerror, error.
1689func deliveryResult(err error, delivered, failed int) string {
1690 var cerr smtpclient.Error
1691 switch {
1692 case err == nil:
1693 if delivered == 0 {
1694 return "error"
1695 } else if failed > 0 {
1696 return "okpartial"
1697 }
1698 return "ok"
1699 case errors.Is(err, os.ErrDeadlineExceeded), errors.Is(err, context.DeadlineExceeded):
1700 return "timeout"
1701 case errors.Is(err, context.Canceled):
1702 return "canceled"
1703 case errors.As(err, &cerr):
1704 if cerr.Permanent {
1705 return "permerror"
1706 }
1707 return "temperror"
1708 }
1709 return "error"
1710}
1711