1// Package queue is in charge of outgoing messages, queueing them when submitted,
2// attempting a first delivery over SMTP, retrying with backoff and sending DSNs
3// for delayed or failed deliveries.
4package queue
5
6import (
7 "context"
8 "errors"
9 "fmt"
10 "io"
11 "log/slog"
12 "net"
13 "os"
14 "path/filepath"
15 "runtime/debug"
16 "sort"
17 "strings"
18 "time"
19
20 "golang.org/x/net/proxy"
21
22 "github.com/prometheus/client_golang/prometheus"
23 "github.com/prometheus/client_golang/prometheus/promauto"
24
25 "github.com/mjl-/bstore"
26
27 "github.com/mjl-/mox/config"
28 "github.com/mjl-/mox/dns"
29 "github.com/mjl-/mox/dsn"
30 "github.com/mjl-/mox/metrics"
31 "github.com/mjl-/mox/mlog"
32 "github.com/mjl-/mox/mox-"
33 "github.com/mjl-/mox/moxio"
34 "github.com/mjl-/mox/smtp"
35 "github.com/mjl-/mox/smtpclient"
36 "github.com/mjl-/mox/store"
37 "github.com/mjl-/mox/tlsrpt"
38 "github.com/mjl-/mox/tlsrptdb"
39)
40
41var (
42 metricConnection = promauto.NewCounterVec(
43 prometheus.CounterOpts{
44 Name: "mox_queue_connection_total",
45 Help: "Queue client connections, outgoing.",
46 },
47 []string{
48 "result", // "ok", "timeout", "canceled", "error"
49 },
50 )
51 metricDelivery = promauto.NewHistogramVec(
52 prometheus.HistogramOpts{
53 Name: "mox_queue_delivery_duration_seconds",
54 Help: "SMTP client delivery attempt to single host.",
55 Buckets: []float64{0.01, 0.05, 0.100, 0.5, 1, 5, 10, 20, 30, 60, 120},
56 },
57 []string{
58 "attempt", // Number of attempts.
59 "transport", // empty for default direct delivery.
60 "tlsmode", // immediate, requiredstarttls, opportunistic, skip (from smtpclient.TLSMode), with optional +mtasts and/or +dane.
61 "result", // ok, timeout, canceled, temperror, permerror, error
62 },
63 )
64)
65
66var jitter = mox.NewPseudoRand()
67
68var DBTypes = []any{Msg{}} // Types stored in DB.
69var DB *bstore.DB // Exported for making backups.
70
71// Allow requesting delivery starting from up to this interval from time of submission.
72const FutureReleaseIntervalMax = 60 * 24 * time.Hour
73
74// Set for mox localserve, to prevent queueing.
75var Localserve bool
76
77// Msg is a message in the queue.
78//
79// Use MakeMsg to make a message with fields that Add needs. Add will further set
80// queueing related fields.
81type Msg struct {
82 ID int64
83
84 // A message for multiple recipients will get a BaseID that is identical to the
85 // first Msg.ID queued. The message contents will be identical for each recipient,
86 // including MsgPrefix. If other properties are identical too, including recipient
87 // domain, multiple Msgs may be delivered in a single SMTP transaction. For
88 // messages with a single recipient, this field will be 0.
89 BaseID int64 `bstore:"index"`
90
91 Queued time.Time `bstore:"default now"`
92 SenderAccount string // Failures are delivered back to this local account. Also used for routing.
93 SenderLocalpart smtp.Localpart // Should be a local user and domain.
94 SenderDomain dns.IPDomain
95 RecipientLocalpart smtp.Localpart // Typically a remote user and domain.
96 RecipientDomain dns.IPDomain
97 RecipientDomainStr string // For filtering.
98 Attempts int // Next attempt is based on last attempt and exponential back off based on attempts.
99 MaxAttempts int // Max number of attempts before giving up. If 0, then the default of 8 attempts is used instead.
100 DialedIPs map[string][]net.IP // For each host, the IPs that were dialed. Used for IP selection for later attempts.
101 NextAttempt time.Time // For scheduling.
102 LastAttempt *time.Time
103 LastError string
104
105 Has8bit bool // Whether message contains bytes with high bit set, determines whether 8BITMIME SMTP extension is needed.
106 SMTPUTF8 bool // Whether message requires use of SMTPUTF8.
107 IsDMARCReport bool // Delivery failures for DMARC reports are handled differently.
108 IsTLSReport bool // Delivery failures for TLS reports are handled differently.
109 Size int64 // Full size of message, combined MsgPrefix with contents of message file.
110 MessageID string // Used when composing a DSN, in its References header.
111 MsgPrefix []byte
112
113 // If set, this message is a DSN and this is a version using utf-8, for the case
114 // the remote MTA supports smtputf8. In this case, Size and MsgPrefix are not
115 // relevant.
116 DSNUTF8 []byte
117
118 // If non-empty, the transport to use for this message. Can be set through cli or
119 // admin interface. If empty (the default for a submitted message), regular routing
120 // rules apply.
121 Transport string
122
123 // RequireTLS influences TLS verification during delivery.
124 //
125 // If nil, the recipient domain policy is followed (MTA-STS and/or DANE), falling
126 // back to optional opportunistic non-verified STARTTLS.
127 //
128 // If RequireTLS is true (through SMTP REQUIRETLS extension or webmail submit),
129 // MTA-STS or DANE is required, as well as REQUIRETLS support by the next hop
130 // server.
131 //
132 // If RequireTLS is false (through messag header "TLS-Required: No"), the recipient
133 // domain's policy is ignored if it does not lead to a successful TLS connection,
134 // i.e. falling back to SMTP delivery with unverified STARTTLS or plain text.
135 RequireTLS *bool
136 // ../rfc/8689:250
137
138 // For DSNs, where the original FUTURERELEASE value must be included as per-message
139 // field. This field should be of the form "for;" plus interval, or "until;" plus
140 // utc date-time.
141 FutureReleaseRequest string
142 // ../rfc/4865:305
143}
144
145// Sender of message as used in MAIL FROM.
146func (m Msg) Sender() smtp.Path {
147 return smtp.Path{Localpart: m.SenderLocalpart, IPDomain: m.SenderDomain}
148}
149
150// Recipient of message as used in RCPT TO.
151func (m Msg) Recipient() smtp.Path {
152 return smtp.Path{Localpart: m.RecipientLocalpart, IPDomain: m.RecipientDomain}
153}
154
155// MessagePath returns the path where the message is stored.
156func (m Msg) MessagePath() string {
157 return mox.DataDirPath(filepath.Join("queue", store.MessagePath(m.ID)))
158}
159
160// Init opens the queue database without starting delivery.
161func Init() error {
162 qpath := mox.DataDirPath(filepath.FromSlash("queue/index.db"))
163 os.MkdirAll(filepath.Dir(qpath), 0770)
164 isNew := false
165 if _, err := os.Stat(qpath); err != nil && os.IsNotExist(err) {
166 isNew = true
167 }
168
169 var err error
170 DB, err = bstore.Open(mox.Shutdown, qpath, &bstore.Options{Timeout: 5 * time.Second, Perm: 0660}, DBTypes...)
171 if err != nil {
172 if isNew {
173 os.Remove(qpath)
174 }
175 return fmt.Errorf("open queue database: %s", err)
176 }
177 return nil
178}
179
180// Shutdown closes the queue database. The delivery process isn't stopped. For tests only.
181func Shutdown() {
182 err := DB.Close()
183 if err != nil {
184 mlog.New("queue", nil).Errorx("closing queue db", err)
185 }
186 DB = nil
187}
188
189// List returns all messages in the delivery queue.
190// Ordered by earliest delivery attempt first.
191func List(ctx context.Context) ([]Msg, error) {
192 qmsgs, err := bstore.QueryDB[Msg](ctx, DB).List()
193 if err != nil {
194 return nil, err
195 }
196 sort.Slice(qmsgs, func(i, j int) bool {
197 a := qmsgs[i]
198 b := qmsgs[j]
199 la := a.LastAttempt != nil
200 lb := b.LastAttempt != nil
201 if !la && lb {
202 return true
203 } else if la && !lb {
204 return false
205 }
206 if !la && !lb || a.LastAttempt.Equal(*b.LastAttempt) {
207 return a.ID < b.ID
208 }
209 return a.LastAttempt.Before(*b.LastAttempt)
210 })
211 return qmsgs, nil
212}
213
214// Count returns the number of messages in the delivery queue.
215func Count(ctx context.Context) (int, error) {
216 return bstore.QueryDB[Msg](ctx, DB).Count()
217}
218
219// MakeMsg is a convenience function that sets the commonly used fields for a Msg.
220func MakeMsg(sender, recipient smtp.Path, has8bit, smtputf8 bool, size int64, messageID string, prefix []byte, requireTLS *bool, next time.Time) Msg {
221 return Msg{
222 SenderLocalpart: sender.Localpart,
223 SenderDomain: sender.IPDomain,
224 RecipientLocalpart: recipient.Localpart,
225 RecipientDomain: recipient.IPDomain,
226 RecipientDomainStr: formatIPDomain(recipient.IPDomain),
227 Has8bit: has8bit,
228 SMTPUTF8: smtputf8,
229 Size: size,
230 MessageID: messageID,
231 MsgPrefix: prefix,
232 RequireTLS: requireTLS,
233 Queued: time.Now(),
234 NextAttempt: next,
235 }
236}
237
238// Add one or more new messages to the queue. They'll get the same BaseID, so they
239// can be delivered in a single SMTP transaction, with a single DATA command, but
240// may be split into multiple transactions if errors/limits are encountered. The
241// queue is kicked immediately to start a first delivery attempt.
242//
243// ID of the messagse must be 0 and will be set after inserting in the queue.
244//
245// Add sets derived fields like RecipientDomainStr, and fields related to queueing,
246// such as Queued, NextAttempt, LastAttempt, LastError.
247func Add(ctx context.Context, log mlog.Log, senderAccount string, msgFile *os.File, qml ...Msg) error {
248 if len(qml) == 0 {
249 return fmt.Errorf("must queue at least one message")
250 }
251
252 for _, qm := range qml {
253 if qm.ID != 0 {
254 return fmt.Errorf("id of queued messages must be 0")
255 }
256 if qm.RecipientDomainStr == "" {
257 return fmt.Errorf("recipient domain cannot be empty")
258 }
259 // Sanity check, internal consistency.
260 rcptDom := formatIPDomain(qm.RecipientDomain)
261 if qm.RecipientDomainStr != rcptDom {
262 return fmt.Errorf("mismatch between recipient domain and string form of domain")
263 }
264 }
265
266 if Localserve {
267 if senderAccount == "" {
268 return fmt.Errorf("cannot queue with localserve without local account")
269 }
270 acc, err := store.OpenAccount(log, senderAccount)
271 if err != nil {
272 return fmt.Errorf("opening sender account for immediate delivery with localserve: %v", err)
273 }
274 defer func() {
275 err := acc.Close()
276 log.Check(err, "closing account")
277 }()
278 conf, _ := acc.Conf()
279 err = nil
280 acc.WithWLock(func() {
281 for i, qm := range qml {
282 qml[i].SenderAccount = senderAccount
283 m := store.Message{Size: qm.Size, MsgPrefix: qm.MsgPrefix}
284 dest := conf.Destinations[qm.Sender().String()]
285 err = acc.DeliverDestination(log, dest, &m, msgFile)
286 if err != nil {
287 err = fmt.Errorf("delivering message: %v", err)
288 return // Returned again outside WithWLock.
289 }
290 }
291 })
292 if err == nil {
293 log.Debug("immediately delivered from queue to sender")
294 }
295 return err
296 }
297
298 tx, err := DB.Begin(ctx, true)
299 if err != nil {
300 return fmt.Errorf("begin transaction: %w", err)
301 }
302 defer func() {
303 if tx != nil {
304 if err := tx.Rollback(); err != nil {
305 log.Errorx("rollback for queue", err)
306 }
307 }
308 }()
309
310 // Insert messages into queue. If there are multiple messages, they all get a
311 // non-zero BaseID that is the Msg.ID of the first message inserted.
312 var baseID int64
313 for i := range qml {
314 qml[i].SenderAccount = senderAccount
315 qml[i].BaseID = baseID
316 if err := tx.Insert(&qml[i]); err != nil {
317 return err
318 }
319 if i == 0 && len(qml) > 1 {
320 baseID = qml[i].ID
321 qml[i].BaseID = baseID
322 if err := tx.Update(&qml[i]); err != nil {
323 return err
324 }
325 }
326 }
327
328 var paths []string
329 defer func() {
330 for _, p := range paths {
331 err := os.Remove(p)
332 log.Check(err, "removing destination message file for queue", slog.String("path", p))
333 }
334 }()
335
336 for _, qm := range qml {
337 dst := qm.MessagePath()
338 paths = append(paths, dst)
339 dstDir := filepath.Dir(dst)
340 os.MkdirAll(dstDir, 0770)
341 if err := moxio.LinkOrCopy(log, dst, msgFile.Name(), nil, true); err != nil {
342 return fmt.Errorf("linking/copying message to new file: %s", err)
343 } else if err := moxio.SyncDir(log, dstDir); err != nil {
344 return fmt.Errorf("sync directory: %v", err)
345 }
346 }
347
348 if err := tx.Commit(); err != nil {
349 return fmt.Errorf("commit transaction: %s", err)
350 }
351 tx = nil
352 paths = nil
353
354 queuekick()
355 return nil
356}
357
358func formatIPDomain(d dns.IPDomain) string {
359 if len(d.IP) > 0 {
360 return "[" + d.IP.String() + "]"
361 }
362 return d.Domain.Name()
363}
364
365var (
366 kick = make(chan struct{}, 1)
367 deliveryResults = make(chan string, 1)
368)
369
370func queuekick() {
371 select {
372 case kick <- struct{}{}:
373 default:
374 }
375}
376
377// Kick sets the NextAttempt for messages matching all filter parameters (ID,
378// toDomain, recipient) that are nonzero, and kicks the queue, attempting delivery
379// of those messages. If all parameters are zero, all messages are kicked. If
380// transport is set, the delivery attempts for the matching messages will use the
381// transport. An empty string is the default transport, i.e. direct delivery.
382// Returns number of messages queued for immediate delivery.
383func Kick(ctx context.Context, ID int64, toDomain, recipient string, transport *string) (int, error) {
384 q := bstore.QueryDB[Msg](ctx, DB)
385 if ID > 0 {
386 q.FilterID(ID)
387 }
388 if toDomain != "" {
389 q.FilterEqual("RecipientDomainStr", toDomain)
390 }
391 if recipient != "" {
392 q.FilterFn(func(qm Msg) bool {
393 return qm.Recipient().XString(true) == recipient
394 })
395 }
396 up := map[string]any{"NextAttempt": time.Now()}
397 if transport != nil {
398 if *transport != "" {
399 _, ok := mox.Conf.Static.Transports[*transport]
400 if !ok {
401 return 0, fmt.Errorf("unknown transport %q", *transport)
402 }
403 }
404 up["Transport"] = *transport
405 }
406 n, err := q.UpdateFields(up)
407 if err != nil {
408 return 0, fmt.Errorf("selecting and updating messages in queue: %v", err)
409 }
410 queuekick()
411 return n, nil
412}
413
414// Drop removes messages from the queue that match all nonzero parameters.
415// If all parameters are zero, all messages are removed.
416// Returns number of messages removed.
417func Drop(ctx context.Context, log mlog.Log, ID int64, toDomain string, recipient string) (int, error) {
418 q := bstore.QueryDB[Msg](ctx, DB)
419 if ID > 0 {
420 q.FilterID(ID)
421 }
422 if toDomain != "" {
423 q.FilterEqual("RecipientDomainStr", toDomain)
424 }
425 if recipient != "" {
426 q.FilterFn(func(qm Msg) bool {
427 return qm.Recipient().XString(true) == recipient
428 })
429 }
430 var msgs []Msg
431 q.Gather(&msgs)
432 n, err := q.Delete()
433 if err != nil {
434 return 0, fmt.Errorf("selecting and deleting messages from queue: %v", err)
435 }
436 for _, m := range msgs {
437 p := m.MessagePath()
438 if err := os.Remove(p); err != nil {
439 log.Errorx("removing queue message from file system", err, slog.Int64("queuemsgid", m.ID), slog.String("path", p))
440 }
441 }
442 return n, nil
443}
444
445// SaveRequireTLS updates the RequireTLS field of the message with id.
446func SaveRequireTLS(ctx context.Context, id int64, requireTLS *bool) error {
447 return DB.Write(ctx, func(tx *bstore.Tx) error {
448 m := Msg{ID: id}
449 if err := tx.Get(&m); err != nil {
450 return fmt.Errorf("get message: %w", err)
451 }
452 m.RequireTLS = requireTLS
453 return tx.Update(&m)
454 })
455}
456
457type ReadReaderAtCloser interface {
458 io.ReadCloser
459 io.ReaderAt
460}
461
462// OpenMessage opens a message present in the queue.
463func OpenMessage(ctx context.Context, id int64) (ReadReaderAtCloser, error) {
464 qm := Msg{ID: id}
465 err := DB.Get(ctx, &qm)
466 if err != nil {
467 return nil, err
468 }
469 f, err := os.Open(qm.MessagePath())
470 if err != nil {
471 return nil, fmt.Errorf("open message file: %s", err)
472 }
473 r := store.FileMsgReader(qm.MsgPrefix, f)
474 return r, err
475}
476
477const maxConcurrentDeliveries = 10
478
479// Start opens the database by calling Init, then starts the delivery process.
480func Start(resolver dns.Resolver, done chan struct{}) error {
481 if err := Init(); err != nil {
482 return err
483 }
484
485 log := mlog.New("queue", nil)
486
487 // High-level delivery strategy advice: ../rfc/5321:3685
488 go func() {
489 // Map keys are either dns.Domain.Name()'s, or string-formatted IP addresses.
490 busyDomains := map[string]struct{}{}
491
492 timer := time.NewTimer(0)
493
494 for {
495 select {
496 case <-mox.Shutdown.Done():
497 done <- struct{}{}
498 return
499 case <-kick:
500 case <-timer.C:
501 case domain := <-deliveryResults:
502 delete(busyDomains, domain)
503 }
504
505 if len(busyDomains) >= maxConcurrentDeliveries {
506 continue
507 }
508
509 launchWork(log, resolver, busyDomains)
510 timer.Reset(nextWork(mox.Shutdown, log, busyDomains))
511 }
512 }()
513 return nil
514}
515
516func nextWork(ctx context.Context, log mlog.Log, busyDomains map[string]struct{}) time.Duration {
517 q := bstore.QueryDB[Msg](ctx, DB)
518 if len(busyDomains) > 0 {
519 var doms []any
520 for d := range busyDomains {
521 doms = append(doms, d)
522 }
523 q.FilterNotEqual("RecipientDomainStr", doms...)
524 }
525 q.SortAsc("NextAttempt")
526 q.Limit(1)
527 qm, err := q.Get()
528 if err == bstore.ErrAbsent {
529 return 24 * time.Hour
530 } else if err != nil {
531 log.Errorx("finding time for next delivery attempt", err)
532 return 1 * time.Minute
533 }
534 return time.Until(qm.NextAttempt)
535}
536
537func launchWork(log mlog.Log, resolver dns.Resolver, busyDomains map[string]struct{}) int {
538 q := bstore.QueryDB[Msg](mox.Shutdown, DB)
539 q.FilterLessEqual("NextAttempt", time.Now())
540 q.SortAsc("NextAttempt")
541 q.Limit(maxConcurrentDeliveries)
542 if len(busyDomains) > 0 {
543 var doms []any
544 for d := range busyDomains {
545 doms = append(doms, d)
546 }
547 q.FilterNotEqual("RecipientDomainStr", doms...)
548 }
549 var msgs []Msg
550 seen := map[string]bool{}
551 err := q.ForEach(func(m Msg) error {
552 dom := m.RecipientDomainStr
553 if _, ok := busyDomains[dom]; !ok && !seen[dom] {
554 seen[dom] = true
555 msgs = append(msgs, m)
556 }
557 return nil
558 })
559 if err != nil {
560 log.Errorx("querying for work in queue", err)
561 mox.Sleep(mox.Shutdown, 1*time.Second)
562 return -1
563 }
564
565 for _, m := range msgs {
566 busyDomains[m.RecipientDomainStr] = struct{}{}
567 go deliver(log, resolver, m)
568 }
569 return len(msgs)
570}
571
572// Remove message from queue in database and file system.
573func queueDelete(ctx context.Context, msgIDs ...int64) error {
574 err := DB.Write(ctx, func(tx *bstore.Tx) error {
575 for _, id := range msgIDs {
576 if err := tx.Delete(&Msg{ID: id}); err != nil {
577 return err
578 }
579 }
580 return nil
581 })
582 if err != nil {
583 return err
584 }
585 // If removing from database fails, we'll also leave the file in the file system.
586
587 var errs []string
588 for _, id := range msgIDs {
589 p := mox.DataDirPath(filepath.Join("queue", store.MessagePath(id)))
590 if err := os.Remove(p); err != nil {
591 errs = append(errs, fmt.Sprintf("%s: %v", p, err))
592 }
593 }
594 if len(errs) > 0 {
595 return fmt.Errorf("removing message files from queue: %s", strings.Join(errs, "; "))
596 }
597 return nil
598}
599
600// deliver attempts to deliver a message.
601// The queue is updated, either by removing a delivered or permanently failed
602// message, or updating the time for the next attempt. A DSN may be sent.
603func deliver(log mlog.Log, resolver dns.Resolver, m Msg) {
604 ctx := mox.Shutdown
605
606 qlog := log.WithCid(mox.Cid()).With(
607 slog.Any("from", m.Sender()),
608 slog.Int("attempts", m.Attempts))
609
610 defer func() {
611 deliveryResults <- formatIPDomain(m.RecipientDomain)
612
613 x := recover()
614 if x != nil {
615 qlog.Error("deliver panic", slog.Any("panic", x), slog.Int64("msgid", m.ID), slog.Any("recipient", m.Recipient()))
616 debug.PrintStack()
617 metrics.PanicInc(metrics.Queue)
618 }
619 }()
620
621 // We register this attempt by setting last_attempt, and already next_attempt time
622 // in the future with exponential backoff. If we run into trouble delivery below,
623 // at least we won't be bothering the receiving server with our problems.
624 // Delivery attempts: immediately, 7.5m, 15m, 30m, 1h, 2h (send delayed DSN), 4h,
625 // 8h, 16h (send permanent failure DSN).
626 // ../rfc/5321:3703
627 // todo future: make the back off times configurable. ../rfc/5321:3713
628 backoff := time.Duration(7*60+30+jitter.Intn(10)-5) * time.Second
629 for i := 0; i < m.Attempts; i++ {
630 backoff *= time.Duration(2)
631 }
632 m.Attempts++
633 origNextAttempt := m.NextAttempt
634 now := time.Now()
635 m.LastAttempt = &now
636 m.NextAttempt = now.Add(backoff)
637 qup := bstore.QueryDB[Msg](mox.Shutdown, DB)
638 qup.FilterID(m.ID)
639 update := Msg{Attempts: m.Attempts, NextAttempt: m.NextAttempt, LastAttempt: m.LastAttempt}
640 if _, err := qup.UpdateNonzero(update); err != nil {
641 qlog.Errorx("storing delivery attempt", err, slog.Int64("msgid", m.ID), slog.Any("recipient", m.Recipient()))
642 return
643 }
644
645 resolveTransport := func(mm Msg) (string, config.Transport, bool) {
646 if mm.Transport != "" {
647 transport, ok := mox.Conf.Static.Transports[mm.Transport]
648 if !ok {
649 return "", config.Transport{}, false
650 }
651 return mm.Transport, transport, ok
652 }
653 route := findRoute(mm.Attempts, mm)
654 return route.Transport, route.ResolvedTransport, true
655 }
656
657 // Find route for transport to use for delivery attempt.
658 m.Attempts--
659 transportName, transport, transportOK := resolveTransport(m)
660 m.Attempts++
661 if !transportOK {
662 var remoteMTA dsn.NameIP // Zero value, will not be included in DSN. ../rfc/3464:1027
663 fail(ctx, qlog, []*Msg{&m}, m.DialedIPs, backoff, remoteMTA, fmt.Errorf("cannot find transport %q", m.Transport))
664 return
665 }
666
667 if transportName != "" {
668 qlog = qlog.With(slog.String("transport", transportName))
669 qlog.Debug("delivering with transport")
670 }
671
672 // Attempt to gather more recipients for this identical message, only with the same
673 // recipient domain, and under the same conditions (recipientdomain, attempts,
674 // requiretls, transport). ../rfc/5321:3759
675 msgs := []*Msg{&m}
676 if m.BaseID != 0 {
677 err := DB.Write(mox.Shutdown, func(tx *bstore.Tx) error {
678 q := bstore.QueryTx[Msg](tx)
679 q.FilterNonzero(Msg{BaseID: m.BaseID, RecipientDomainStr: m.RecipientDomainStr, Attempts: m.Attempts - 1})
680 q.FilterNotEqual("ID", m.ID)
681 q.FilterLessEqual("NextAttempt", origNextAttempt)
682 err := q.ForEach(func(xm Msg) error {
683 mrtls := m.RequireTLS != nil
684 xmrtls := xm.RequireTLS != nil
685 if mrtls != xmrtls || mrtls && *m.RequireTLS != *xm.RequireTLS {
686 return nil
687 }
688 tn, _, ok := resolveTransport(xm)
689 if ok && tn == transportName {
690 msgs = append(msgs, &xm)
691 }
692 return nil
693 })
694 if err != nil {
695 return fmt.Errorf("looking up more recipients: %v", err)
696 }
697
698 // Mark these additional messages as attempted too.
699 for _, mm := range msgs[1:] {
700 mm.Attempts++
701 mm.NextAttempt = m.NextAttempt
702 mm.LastAttempt = m.LastAttempt
703 if err := tx.Update(mm); err != nil {
704 return fmt.Errorf("updating more message recipients for smtp transaction: %v", err)
705 }
706 }
707 return nil
708 })
709 if err != nil {
710 qlog.Errorx("error finding more recipients for message, will attempt to send to single recipient", err)
711 msgs = msgs[:1]
712 }
713 }
714 if len(msgs) > 1 {
715 ids := make([]int64, len(msgs))
716 rcpts := make([]smtp.Path, len(msgs))
717 for i, m := range msgs {
718 ids[i] = m.ID
719 rcpts[i] = m.Recipient()
720 }
721 qlog.Debug("delivering to multiple recipients", slog.Any("msgids", ids), slog.Any("recipients", rcpts))
722 } else {
723 qlog.Debug("delivering to single recipient", slog.Any("msgid", m.ID), slog.Any("recipient", m.Recipient()))
724 }
725
726 // We gather TLS connection successes and failures during delivery, and we store
727 // them in tlsrptdb. Every 24 hours we send an email with a report to the recipient
728 // domains that opt in via a TLSRPT DNS record. For us, the tricky part is
729 // collecting all reporting information. We've got several TLS modes
730 // (opportunistic, DANE and/or MTA-STS (PKIX), overrides due to Require TLS).
731 // Failures can happen at various levels: MTA-STS policies (apply to whole delivery
732 // attempt/domain), MX targets (possibly multiple per delivery attempt, both for
733 // MTA-STS and DANE).
734 //
735 // Once the SMTP client has tried a TLS handshake, we register success/failure,
736 // regardless of what happens next on the connection. We also register failures
737 // when they happen before we get to the SMTP client, but only if they are related
738 // to TLS (and some DNSSEC).
739 var recipientDomainResult tlsrpt.Result
740 var hostResults []tlsrpt.Result
741 defer func() {
742 if mox.Conf.Static.NoOutgoingTLSReports || m.RecipientDomain.IsIP() {
743 return
744 }
745
746 now := time.Now()
747 dayUTC := now.UTC().Format("20060102")
748
749 // See if this contains a failure. If not, we'll mark TLS results for delivering
750 // DMARC reports SendReport false, so we won't as easily get into a report sending
751 // loop.
752 var failure bool
753 for _, result := range hostResults {
754 if result.Summary.TotalFailureSessionCount > 0 {
755 failure = true
756 break
757 }
758 }
759 if recipientDomainResult.Summary.TotalFailureSessionCount > 0 {
760 failure = true
761 }
762
763 results := make([]tlsrptdb.TLSResult, 0, 1+len(hostResults))
764 tlsaPolicyDomains := map[string]bool{}
765 addResult := func(r tlsrpt.Result, isHost bool) {
766 var zerotype tlsrpt.PolicyType
767 if r.Policy.Type == zerotype {
768 return
769 }
770
771 // Ensure we store policy domain in unicode in database.
772 policyDomain, err := dns.ParseDomain(r.Policy.Domain)
773 if err != nil {
774 qlog.Errorx("parsing policy domain for tls result", err, slog.String("policydomain", r.Policy.Domain))
775 return
776 }
777
778 if r.Policy.Type == tlsrpt.TLSA {
779 tlsaPolicyDomains[policyDomain.ASCII] = true
780 }
781
782 tlsResult := tlsrptdb.TLSResult{
783 PolicyDomain: policyDomain.Name(),
784 DayUTC: dayUTC,
785 RecipientDomain: m.RecipientDomain.Domain.Name(),
786 IsHost: isHost,
787 SendReport: !m.IsTLSReport && (!m.IsDMARCReport || failure),
788 Results: []tlsrpt.Result{r},
789 }
790 results = append(results, tlsResult)
791 }
792 for _, result := range hostResults {
793 addResult(result, true)
794 }
795 // If we were delivering to a mail host directly (not a domain with MX records), we
796 // are more likely to get a TLSA policy than an STS policy. Don't potentially
797 // confuse operators with both a tlsa and no-policy-found result.
798 // todo spec: ../rfc/8460:440 an explicit no-sts-policy result would be useful.
799 if recipientDomainResult.Policy.Type != tlsrpt.NoPolicyFound || !tlsaPolicyDomains[recipientDomainResult.Policy.Domain] {
800 addResult(recipientDomainResult, false)
801 }
802
803 if len(results) > 0 {
804 err := tlsrptdb.AddTLSResults(context.Background(), results)
805 qlog.Check(err, "adding tls results to database for upcoming tlsrpt report")
806 }
807 }()
808
809 var dialer smtpclient.Dialer = &net.Dialer{}
810 if transport.Submissions != nil {
811 deliverSubmit(qlog, resolver, dialer, msgs, backoff, transportName, transport.Submissions, true, 465)
812 } else if transport.Submission != nil {
813 deliverSubmit(qlog, resolver, dialer, msgs, backoff, transportName, transport.Submission, false, 587)
814 } else if transport.SMTP != nil {
815 // todo future: perhaps also gather tlsrpt results for submissions.
816 deliverSubmit(qlog, resolver, dialer, msgs, backoff, transportName, transport.SMTP, false, 25)
817 } else {
818 ourHostname := mox.Conf.Static.HostnameDomain
819 if transport.Socks != nil {
820 socksdialer, err := proxy.SOCKS5("tcp", transport.Socks.Address, nil, &net.Dialer{})
821 if err != nil {
822 fail(ctx, qlog, msgs, msgs[0].DialedIPs, backoff, dsn.NameIP{}, fmt.Errorf("socks dialer: %v", err))
823 return
824 } else if d, ok := socksdialer.(smtpclient.Dialer); !ok {
825 fail(ctx, qlog, msgs, msgs[0].DialedIPs, backoff, dsn.NameIP{}, fmt.Errorf("socks dialer is not a contextdialer"))
826 return
827 } else {
828 dialer = d
829 }
830 ourHostname = transport.Socks.Hostname
831 }
832 recipientDomainResult, hostResults = deliverDirect(qlog, resolver, dialer, ourHostname, transportName, msgs, backoff)
833 }
834}
835
836func findRoute(attempt int, m Msg) config.Route {
837 routesAccount, routesDomain, routesGlobal := mox.Conf.Routes(m.SenderAccount, m.SenderDomain.Domain)
838 if r, ok := findRouteInList(attempt, m, routesAccount); ok {
839 return r
840 }
841 if r, ok := findRouteInList(attempt, m, routesDomain); ok {
842 return r
843 }
844 if r, ok := findRouteInList(attempt, m, routesGlobal); ok {
845 return r
846 }
847 return config.Route{}
848}
849
850func findRouteInList(attempt int, m Msg, routes []config.Route) (config.Route, bool) {
851 for _, r := range routes {
852 if routeMatch(attempt, m, r) {
853 return r, true
854 }
855 }
856 return config.Route{}, false
857}
858
859func routeMatch(attempt int, m Msg, r config.Route) bool {
860 return attempt >= r.MinimumAttempts && routeMatchDomain(r.FromDomainASCII, m.SenderDomain.Domain) && routeMatchDomain(r.ToDomainASCII, m.RecipientDomain.Domain)
861}
862
863func routeMatchDomain(l []string, d dns.Domain) bool {
864 if len(l) == 0 {
865 return true
866 }
867 for _, e := range l {
868 if d.ASCII == e || strings.HasPrefix(e, ".") && (d.ASCII == e[1:] || strings.HasSuffix(d.ASCII, e)) {
869 return true
870 }
871 }
872 return false
873}
874
875// Returns string representing delivery result for err, and number of delivered and
876// failed messages.
877//
878// Values: ok, okpartial, timeout, canceled, temperror, permerror, error.
879func deliveryResult(err error, delivered, failed int) string {
880 var cerr smtpclient.Error
881 switch {
882 case err == nil:
883 if delivered == 0 {
884 return "error"
885 } else if failed > 0 {
886 return "okpartial"
887 }
888 return "ok"
889 case errors.Is(err, os.ErrDeadlineExceeded), errors.Is(err, context.DeadlineExceeded):
890 return "timeout"
891 case errors.Is(err, context.Canceled):
892 return "canceled"
893 case errors.As(err, &cerr):
894 if cerr.Permanent {
895 return "permerror"
896 }
897 return "temperror"
898 }
899 return "error"
900}
901