1// Package queue is in charge of outgoing messages, queueing them when submitted,
2// attempting a first delivery over SMTP, retrying with backoff and sending DSNs
3// for delayed or failed deliveries.
18 "golang.org/x/exp/slog"
19 "golang.org/x/net/proxy"
21 "github.com/prometheus/client_golang/prometheus"
22 "github.com/prometheus/client_golang/prometheus/promauto"
24 "github.com/mjl-/bstore"
26 "github.com/mjl-/mox/config"
27 "github.com/mjl-/mox/dns"
28 "github.com/mjl-/mox/dsn"
29 "github.com/mjl-/mox/metrics"
30 "github.com/mjl-/mox/mlog"
31 "github.com/mjl-/mox/mox-"
32 "github.com/mjl-/mox/moxio"
33 "github.com/mjl-/mox/smtp"
34 "github.com/mjl-/mox/smtpclient"
35 "github.com/mjl-/mox/store"
36 "github.com/mjl-/mox/tlsrpt"
37 "github.com/mjl-/mox/tlsrptdb"
41 metricConnection = promauto.NewCounterVec(
42 prometheus.CounterOpts{
43 Name: "mox_queue_connection_total",
44 Help: "Queue client connections, outgoing.",
47 "result", // "ok", "timeout", "canceled", "error"
50 metricDelivery = promauto.NewHistogramVec(
51 prometheus.HistogramOpts{
52 Name: "mox_queue_delivery_duration_seconds",
53 Help: "SMTP client delivery attempt to single host.",
54 Buckets: []float64{0.01, 0.05, 0.100, 0.5, 1, 5, 10, 20, 30, 60, 120},
57 "attempt", // Number of attempts.
58 "transport", // empty for default direct delivery.
59 "tlsmode", // immediate, requiredstarttls, opportunistic, skip (from smtpclient.TLSMode), with optional +mtasts and/or +dane.
60 "result", // ok, timeout, canceled, temperror, permerror, error
65var jitter = mox.NewPseudoRand()
67var DBTypes = []any{Msg{}} // Types stored in DB.
68var DB *bstore.DB // Exported for making backups.
70// Set for mox localserve, to prevent queueing.
73// Msg is a message in the queue.
75// Use MakeMsg to make a message with fields that Add needs. Add will further set
76// queueing related fields.
79 Queued time.Time `bstore:"default now"`
80 SenderAccount string // Failures are delivered back to this local account. Also used for routing.
81 SenderLocalpart smtp.Localpart // Should be a local user and domain.
82 SenderDomain dns.IPDomain
83 RecipientLocalpart smtp.Localpart // Typically a remote user and domain.
84 RecipientDomain dns.IPDomain
85 RecipientDomainStr string // For filtering.
86 Attempts int // Next attempt is based on last attempt and exponential back off based on attempts.
87 MaxAttempts int // Max number of attempts before giving up. If 0, then the default of 8 attempts is used instead.
88 DialedIPs map[string][]net.IP // For each host, the IPs that were dialed. Used for IP selection for later attempts.
89 NextAttempt time.Time // For scheduling.
90 LastAttempt *time.Time
93 Has8bit bool // Whether message contains bytes with high bit set, determines whether 8BITMIME SMTP extension is needed.
94 SMTPUTF8 bool // Whether message requires use of SMTPUTF8.
95 IsDMARCReport bool // Delivery failures for DMARC reports are handled differently.
96 IsTLSReport bool // Delivery failures for TLS reports are handled differently.
97 Size int64 // Full size of message, combined MsgPrefix with contents of message file.
98 MessageID string // Used when composing a DSN, in its References header.
101 // If set, this message is a DSN and this is a version using utf-8, for the case
102 // the remote MTA supports smtputf8. In this case, Size and MsgPrefix are not
106 // If non-empty, the transport to use for this message. Can be set through cli or
107 // admin interface. If empty (the default for a submitted message), regular routing
111 // RequireTLS influences TLS verification during delivery.
113 // If nil, the recipient domain policy is followed (MTA-STS and/or DANE), falling
114 // back to optional opportunistic non-verified STARTTLS.
116 // If RequireTLS is true (through SMTP REQUIRETLS extension or webmail submit),
117 // MTA-STS or DANE is required, as well as REQUIRETLS support by the next hop
120 // If RequireTLS is false (through message header "TLS-Required: No"), the recipient
121 // domain's policy is ignored if it does not lead to a successful TLS connection,
122 // i.e. falling back to SMTP delivery with unverified STARTTLS or plain text.
127// Sender of message as used in MAIL FROM.
128func (m Msg) Sender() smtp.Path {
129 return smtp.Path{Localpart: m.SenderLocalpart, IPDomain: m.SenderDomain}
132// Recipient of message as used in RCPT TO.
133func (m Msg) Recipient() smtp.Path {
134 return smtp.Path{Localpart: m.RecipientLocalpart, IPDomain: m.RecipientDomain}
137// MessagePath returns the path where the message is stored.
138func (m Msg) MessagePath() string {
139 return mox.DataDirPath(filepath.Join("queue", store.MessagePath(m.ID)))
142// Init opens the queue database without starting delivery.
144 qpath := mox.DataDirPath(filepath.FromSlash("queue/index.db"))
145 os.MkdirAll(filepath.Dir(qpath), 0770)
147 if _, err := os.Stat(qpath); err != nil && os.IsNotExist(err) {
152 DB, err = bstore.Open(mox.Shutdown, qpath, &bstore.Options{Timeout: 5 * time.Second, Perm: 0660}, DBTypes...)
157 return fmt.Errorf("open queue database: %s", err)
162// Shutdown closes the queue database. The delivery process isn't stopped. For tests only.
166 mlog.New("queue", nil).Errorx("closing queue db", err)
171// List returns all messages in the delivery queue.
172// Ordered by earliest delivery attempt first.
173func List(ctx context.Context) ([]Msg, error) {
174 qmsgs, err := bstore.QueryDB[Msg](ctx, DB).List()
178 sort.Slice(qmsgs, func(i, j int) bool {
181 la := a.LastAttempt != nil
182 lb := b.LastAttempt != nil
185 } else if la && !lb {
188 if !la && !lb || a.LastAttempt.Equal(*b.LastAttempt) {
191 return a.LastAttempt.Before(*b.LastAttempt)
196// Count returns the number of messages in the delivery queue.
197func Count(ctx context.Context) (int, error) {
198 return bstore.QueryDB[Msg](ctx, DB).Count()
201// MakeMsg is a convenience function that sets the commonly used fields for a Msg.
202func MakeMsg(senderAccount string, sender, recipient smtp.Path, has8bit, smtputf8 bool, size int64, messageID string, prefix []byte, requireTLS *bool) Msg {
204 SenderAccount: mox.Conf.Static.Postmaster.Account,
205 SenderLocalpart: sender.Localpart,
206 SenderDomain: sender.IPDomain,
207 RecipientLocalpart: recipient.Localpart,
208 RecipientDomain: recipient.IPDomain,
212 MessageID: messageID,
214 RequireTLS: requireTLS,
218// Add a new message to the queue. The queue is kicked immediately to start a
219// first delivery attempt.
221// ID must be 0 and will be set after inserting in the queue.
223// Add sets derived fields like RecipientDomainStr, and fields related to queueing,
224// such as Queued, NextAttempt, LastAttempt, LastError.
225func Add(ctx context.Context, log mlog.Log, qm *Msg, msgFile *os.File) error {
226 // todo: Add should accept multiple rcptTo if they are for the same domain. so we can queue them for delivery in one (or just a few) session(s), transferring the data only once.
../rfc/5321:3759
229 return fmt.Errorf("id of queued message must be 0")
231 qm.Queued = time.Now()
233 qm.NextAttempt = qm.Queued
236 qm.RecipientDomainStr = formatIPDomain(qm.RecipientDomain)
239 if qm.SenderAccount == "" {
240 return fmt.Errorf("cannot queue with localserve without local account")
242 acc, err := store.OpenAccount(log, qm.SenderAccount)
244 return fmt.Errorf("opening sender account for immediate delivery with localserve: %v", err)
248 log.Check(err, "closing account")
250 m := store.Message{Size: qm.Size, MsgPrefix: qm.MsgPrefix}
251 conf, _ := acc.Conf()
252 dest := conf.Destinations[qm.Sender().String()]
253 acc.WithWLock(func() {
254 err = acc.DeliverDestination(log, dest, &m, msgFile)
257 return fmt.Errorf("delivering message: %v", err)
259 log.Debug("immediately delivered from queue to sender")
263 tx, err := DB.Begin(ctx, true)
265 return fmt.Errorf("begin transaction: %w", err)
269 if err := tx.Rollback(); err != nil {
270 log.Errorx("rollback for queue", err)
275 if err := tx.Insert(qm); err != nil {
279 dst := qm.MessagePath()
282 err := os.Remove(dst)
283 log.Check(err, "removing destination message file for queue", slog.String("path", dst))
286 dstDir := filepath.Dir(dst)
287 os.MkdirAll(dstDir, 0770)
288 if err := moxio.LinkOrCopy(log, dst, msgFile.Name(), nil, true); err != nil {
289 return fmt.Errorf("linking/copying message to new file: %s", err)
290 } else if err := moxio.SyncDir(log, dstDir); err != nil {
291 return fmt.Errorf("sync directory: %v", err)
294 if err := tx.Commit(); err != nil {
295 return fmt.Errorf("commit transaction: %s", err)
304func formatIPDomain(d dns.IPDomain) string {
306 return "[" + d.IP.String() + "]"
308 return d.Domain.Name()
312 kick = make(chan struct{}, 1)
313 deliveryResult = make(chan string, 1)
318 case kick <- struct{}{}:
323// Kick sets the NextAttempt for messages matching all filter parameters (ID,
324// toDomain, recipient) that are nonzero, and kicks the queue, attempting delivery
325// of those messages. If all parameters are zero, all messages are kicked. If
326// transport is set, the delivery attempts for the matching messages will use the
327// transport. An empty string is the default transport, i.e. direct delivery.
328// Returns number of messages queued for immediate delivery.
329func Kick(ctx context.Context, ID int64, toDomain, recipient string, transport *string) (int, error) {
330 q := bstore.QueryDB[Msg](ctx, DB)
335 q.FilterEqual("RecipientDomainStr", toDomain)
338 q.FilterFn(func(qm Msg) bool {
339 return qm.Recipient().XString(true) == recipient
342 up := map[string]any{"NextAttempt": time.Now()}
343 if transport != nil {
344 if *transport != "" {
345 _, ok := mox.Conf.Static.Transports[*transport]
347 return 0, fmt.Errorf("unknown transport %q", *transport)
350 up["Transport"] = *transport
352 n, err := q.UpdateFields(up)
354 return 0, fmt.Errorf("selecting and updating messages in queue: %v", err)
360// Drop removes messages from the queue that match all nonzero parameters.
361// If all parameters are zero, all messages are removed.
362// Returns number of messages removed.
363func Drop(ctx context.Context, log mlog.Log, ID int64, toDomain string, recipient string) (int, error) {
364 q := bstore.QueryDB[Msg](ctx, DB)
369 q.FilterEqual("RecipientDomainStr", toDomain)
372 q.FilterFn(func(qm Msg) bool {
373 return qm.Recipient().XString(true) == recipient
380 return 0, fmt.Errorf("selecting and deleting messages from queue: %v", err)
382 for _, m := range msgs {
384 if err := os.Remove(p); err != nil {
385 log.Errorx("removing queue message from file system", err, slog.Int64("queuemsgid", m.ID), slog.String("path", p))
391// SaveRequireTLS updates the RequireTLS field of the message with id.
392func SaveRequireTLS(ctx context.Context, id int64, requireTLS *bool) error {
393 return DB.Write(ctx, func(tx *bstore.Tx) error {
395 if err := tx.Get(&m); err != nil {
396 return fmt.Errorf("get message: %w", err)
398 m.RequireTLS = requireTLS
403type ReadReaderAtCloser interface {
408// OpenMessage opens a message present in the queue.
409func OpenMessage(ctx context.Context, id int64) (ReadReaderAtCloser, error) {
411 err := DB.Get(ctx, &qm)
415 f, err := os.Open(qm.MessagePath())
417 return nil, fmt.Errorf("open message file: %s", err)
419 r := store.FileMsgReader(qm.MsgPrefix, f)
423const maxConcurrentDeliveries = 10
425// Start opens the database by calling Init, then starts the delivery process.
426func Start(resolver dns.Resolver, done chan struct{}) error {
427 if err := Init(); err != nil {
431 log := mlog.New("queue", nil)
435 // Map keys are either dns.Domain.Name()'s, or string-formatted IP addresses.
436 busyDomains := map[string]struct{}{}
438 timer := time.NewTimer(0)
442 case <-mox.Shutdown.Done():
447 case domain := <-deliveryResult:
448 delete(busyDomains, domain)
451 if len(busyDomains) >= maxConcurrentDeliveries {
455 launchWork(log, resolver, busyDomains)
456 timer.Reset(nextWork(mox.Shutdown, log, busyDomains))
462func nextWork(ctx context.Context, log mlog.Log, busyDomains map[string]struct{}) time.Duration {
463 q := bstore.QueryDB[Msg](ctx, DB)
464 if len(busyDomains) > 0 {
466 for d := range busyDomains {
467 doms = append(doms, d)
469 q.FilterNotEqual("RecipientDomainStr", doms...)
471 q.SortAsc("NextAttempt")
474 if err == bstore.ErrAbsent {
475 return 24 * time.Hour
476 } else if err != nil {
477 log.Errorx("finding time for next delivery attempt", err)
478 return 1 * time.Minute
480 return time.Until(qm.NextAttempt)
483func launchWork(log mlog.Log, resolver dns.Resolver, busyDomains map[string]struct{}) int {
484 q := bstore.QueryDB[Msg](mox.Shutdown, DB)
485 q.FilterLessEqual("NextAttempt", time.Now())
486 q.SortAsc("NextAttempt")
487 q.Limit(maxConcurrentDeliveries)
488 if len(busyDomains) > 0 {
490 for d := range busyDomains {
491 doms = append(doms, d)
493 q.FilterNotEqual("RecipientDomainStr", doms...)
495 msgs, err := q.List()
497 log.Errorx("querying for work in queue", err)
498 mox.Sleep(mox.Shutdown, 1*time.Second)
502 for _, m := range msgs {
503 busyDomains[formatIPDomain(m.RecipientDomain)] = struct{}{}
504 go deliver(log, resolver, m)
509// Remove message from queue in database and file system.
510func queueDelete(ctx context.Context, msgID int64) error {
511 if err := DB.Delete(ctx, &Msg{ID: msgID}); err != nil {
514 // If removing from database fails, we'll also leave the file in the file system.
516 p := mox.DataDirPath(filepath.Join("queue", store.MessagePath(msgID)))
517 if err := os.Remove(p); err != nil {
518 return fmt.Errorf("removing queue message from file system: %v", err)
524// deliver attempts to deliver a message.
525// The queue is updated, either by removing a delivered or permanently failed
526// message, or updating the time for the next attempt. A DSN may be sent.
527func deliver(log mlog.Log, resolver dns.Resolver, m Msg) {
530 qlog := log.WithCid(mox.Cid()).With(slog.Any("from", m.Sender()),
531 slog.Any("recipient", m.Recipient()),
532 slog.Int("attempts", m.Attempts),
533 slog.Int64("msgid", m.ID))
536 deliveryResult <- formatIPDomain(m.RecipientDomain)
540 qlog.Error("deliver panic", slog.Any("panic", x))
542 metrics.PanicInc(metrics.Queue)
546 // We register this attempt by setting last_attempt, and already next_attempt time
548 // in the future with exponential backoff. If we run into trouble delivering below,
548 // at least we won't be bothering the receiving server with our problems.
549 // Delivery attempts: immediately, 7.5m, 15m, 30m, 1h, 2h (send delayed DSN), 4h,
550 // 8h, 16h (send permanent failure DSN).
553 backoff := time.Duration(7*60+30+jitter.Intn(10)-5) * time.Second
554 for i := 0; i < m.Attempts; i++ {
555 backoff *= time.Duration(2)
560 m.NextAttempt = now.Add(backoff)
561 qup := bstore.QueryDB[Msg](mox.Shutdown, DB)
563 update := Msg{Attempts: m.Attempts, NextAttempt: m.NextAttempt, LastAttempt: m.LastAttempt}
564 if _, err := qup.UpdateNonzero(update); err != nil {
565 qlog.Errorx("storing delivery attempt", err)
569 // Find route for transport to use for delivery attempt.
570 var transport config.Transport
571 var transportName string
572 if m.Transport != "" {
574 transport, ok = mox.Conf.Static.Transports[m.Transport]
577 fail(ctx, qlog, m, backoff, false, remoteMTA, "", fmt.Sprintf("cannot find transport %q", m.Transport))
580 transportName = m.Transport
582 route := findRoute(m.Attempts-1, m)
583 transport = route.ResolvedTransport
584 transportName = route.Transport
587 if transportName != "" {
588 qlog = qlog.With(slog.String("transport", transportName))
589 qlog.Debug("delivering with transport")
592 // We gather TLS connection successes and failures during delivery, and we store
593 // them in tlsrptdb. Every 24 hours we send an email with a report to the recipient
594 // domains that opt in via a TLSRPT DNS record. For us, the tricky part is
595 // collecting all reporting information. We've got several TLS modes
596 // (opportunistic, DANE and/or MTA-STS (PKIX), overrides due to Require TLS).
597 // Failures can happen at various levels: MTA-STS policies (apply to whole delivery
598 // attempt/domain), MX targets (possibly multiple per delivery attempt, both for
599 // MTA-STS and DANE).
601 // Once the SMTP client has tried a TLS handshake, we register success/failure,
602 // regardless of what happens next on the connection. We also register failures
603 // when they happen before we get to the SMTP client, but only if they are related
604 // to TLS (and some DNSSEC).
605 var recipientDomainResult tlsrpt.Result
606 var hostResults []tlsrpt.Result
608 if mox.Conf.Static.NoOutgoingTLSReports || m.RecipientDomain.IsIP() {
613 dayUTC := now.UTC().Format("20060102")
615 // See if this contains a failure. If not, we'll mark TLS results for delivering
616 // DMARC reports SendReport false, so we won't as easily get into a report sending
619 for _, result := range hostResults {
620 if result.Summary.TotalFailureSessionCount > 0 {
625 if recipientDomainResult.Summary.TotalFailureSessionCount > 0 {
629 results := make([]tlsrptdb.TLSResult, 0, 1+len(hostResults))
630 tlsaPolicyDomains := map[string]bool{}
631 addResult := func(r tlsrpt.Result, isHost bool) {
632 var zerotype tlsrpt.PolicyType
633 if r.Policy.Type == zerotype {
637 // Ensure we store policy domain in unicode in database.
638 policyDomain, err := dns.ParseDomain(r.Policy.Domain)
640 qlog.Errorx("parsing policy domain for tls result", err, slog.String("policydomain", r.Policy.Domain))
644 if r.Policy.Type == tlsrpt.TLSA {
645 tlsaPolicyDomains[policyDomain.ASCII] = true
648 tlsResult := tlsrptdb.TLSResult{
649 PolicyDomain: policyDomain.Name(),
651 RecipientDomain: m.RecipientDomain.Domain.Name(),
653 SendReport: !m.IsTLSReport && (!m.IsDMARCReport || failure),
654 Results: []tlsrpt.Result{r},
656 results = append(results, tlsResult)
658 for _, result := range hostResults {
659 addResult(result, true)
661 // If we were delivering to a mail host directly (not a domain with MX records), we
662 // are more likely to get a TLSA policy than an STS policy. Don't potentially
663 // confuse operators with both a tlsa and no-policy-found result.
665 if recipientDomainResult.Policy.Type != tlsrpt.NoPolicyFound || !tlsaPolicyDomains[recipientDomainResult.Policy.Domain] {
666 addResult(recipientDomainResult, false)
669 if len(results) > 0 {
670 err := tlsrptdb.AddTLSResults(context.Background(), results)
671 qlog.Check(err, "adding tls results to database for upcoming tlsrpt report")
675 var dialer smtpclient.Dialer = &net.Dialer{}
676 if transport.Submissions != nil {
677 deliverSubmit(qlog, resolver, dialer, m, backoff, transportName, transport.Submissions, true, 465)
678 } else if transport.Submission != nil {
679 deliverSubmit(qlog, resolver, dialer, m, backoff, transportName, transport.Submission, false, 587)
680 } else if transport.SMTP != nil {
681 // todo future: perhaps also gather tlsrpt results for submissions.
682 deliverSubmit(qlog, resolver, dialer, m, backoff, transportName, transport.SMTP, false, 25)
684 ourHostname := mox.Conf.Static.HostnameDomain
685 if transport.Socks != nil {
686 socksdialer, err := proxy.SOCKS5("tcp", transport.Socks.Address, nil, &net.Dialer{})
688 fail(ctx, qlog, m, backoff, false, dsn.NameIP{}, "", fmt.Sprintf("socks dialer: %v", err))
690 } else if d, ok := socksdialer.(smtpclient.Dialer); !ok {
691 fail(ctx, qlog, m, backoff, false, dsn.NameIP{}, "", "socks dialer is not a contextdialer")
696 ourHostname = transport.Socks.Hostname
698 recipientDomainResult, hostResults = deliverDirect(qlog, resolver, dialer, ourHostname, transportName, m, backoff)
702func findRoute(attempt int, m Msg) config.Route {
703 routesAccount, routesDomain, routesGlobal := mox.Conf.Routes(m.SenderAccount, m.SenderDomain.Domain)
704 if r, ok := findRouteInList(attempt, m, routesAccount); ok {
707 if r, ok := findRouteInList(attempt, m, routesDomain); ok {
710 if r, ok := findRouteInList(attempt, m, routesGlobal); ok {
713 return config.Route{}
716func findRouteInList(attempt int, m Msg, routes []config.Route) (config.Route, bool) {
717 for _, r := range routes {
718 if routeMatch(attempt, m, r) {
722 return config.Route{}, false
725func routeMatch(attempt int, m Msg, r config.Route) bool {
726 return attempt >= r.MinimumAttempts && routeMatchDomain(r.FromDomainASCII, m.SenderDomain.Domain) && routeMatchDomain(r.ToDomainASCII, m.RecipientDomain.Domain)
729func routeMatchDomain(l []string, d dns.Domain) bool {
733 for _, e := range l {
734 if d.ASCII == e || strings.HasPrefix(e, ".") && (d.ASCII == e[1:] || strings.HasSuffix(d.ASCII, e)) {