1package queue
2
3import (
4 "bufio"
5 "context"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net"
10 "os"
11 "slices"
12 "strings"
13 "time"
14
15 "github.com/prometheus/client_golang/prometheus"
16 "github.com/prometheus/client_golang/prometheus/promauto"
17
18 "github.com/mjl-/bstore"
19
20 "github.com/mjl-/mox/dns"
21 "github.com/mjl-/mox/dsn"
22 "github.com/mjl-/mox/message"
23 "github.com/mjl-/mox/mlog"
24 "github.com/mjl-/mox/mox-"
25 "github.com/mjl-/mox/smtp"
26 "github.com/mjl-/mox/smtpclient"
27 "github.com/mjl-/mox/store"
28 "github.com/mjl-/mox/webhook"
29)
30
31var (
32 metricDMARCReportFailure = promauto.NewCounter(
33 prometheus.CounterOpts{
34 Name: "mox_queue_dmarcreport_failure_total",
35 Help: "Permanent failures to deliver a DMARC report.",
36 },
37 )
38)
39
40// failMsgsDB calls failMsgsTx with a new transaction, logging transaction errors.
41func failMsgsDB(qlog mlog.Log, msgs []*Msg, dialedIPs map[string][]net.IP, backoff time.Duration, remoteMTA dsn.NameIP, err error) {
42 xerr := DB.Write(context.Background(), func(tx *bstore.Tx) error {
43 failMsgsTx(qlog, tx, msgs, dialedIPs, backoff, remoteMTA, err)
44 return nil
45 })
46 if xerr != nil {
47 for _, m := range msgs {
48 qlog.Errorx("error marking delivery as failed", xerr,
49 slog.String("delivererr", err.Error()),
50 slog.Int64("msgid", m.ID),
51 slog.Any("recipient", m.Recipient()),
52 slog.Duration("backoff", backoff),
53 slog.Time("nextattempt", m.NextAttempt))
54 }
55 }
56 kick()
57}
58
59// todo: perhaps put some of the params in a delivery struct so we don't pass all the params all the time?
60
61// failMsgsTx processes a failure to deliver msgs. If the error is permanent, a DSN
62// is delivered to the sender account.
63// Caller must call kick() after commiting the transaction for any (re)scheduling
64// of messages and webhooks.
65func failMsgsTx(qlog mlog.Log, tx *bstore.Tx, msgs []*Msg, dialedIPs map[string][]net.IP, backoff time.Duration, remoteMTA dsn.NameIP, err error) {
66 // todo future: when we implement relaying, we should be able to send DSNs to non-local users. and possibly specify a null mailfrom. ../rfc/5321:1503
67 // todo future: when we implement relaying, and a dsn cannot be delivered, and requiretls was active, we cannot drop the message. instead deliver to local postmaster? though ../rfc/8689:383 may intend to say the dsn should be delivered without requiretls?
68 // todo future: when we implement smtp dsn extension, parameter RET=FULL must be disregarded for messages with REQUIRETLS. ../rfc/8689:379
69
70 m0 := msgs[0]
71
72 var smtpLines []string
73 var cerr smtpclient.Error
74 var permanent bool
75 var errmsg = err.Error()
76 var code int
77 var secodeOpt string
78 var event webhook.OutgoingEvent
79 if errors.As(err, &cerr) {
80 if cerr.Line != "" {
81 smtpLines = append([]string{cerr.Line}, cerr.MoreLines...)
82 }
83 permanent = cerr.Permanent
84 code = cerr.Code
85 secodeOpt = cerr.Secode
86 }
87 qlog = qlog.With(
88 slog.Bool("permanent", permanent),
89 slog.Int("code", code),
90 slog.String("secode", secodeOpt),
91 )
92
93 ids := make([]int64, len(msgs))
94 for i, m := range msgs {
95 ids[i] = m.ID
96 }
97
98 if permanent || m0.MaxAttempts == 0 && m0.Attempts >= 8 || m0.MaxAttempts > 0 && m0.Attempts >= m0.MaxAttempts {
99 event = webhook.EventFailed
100 if errors.Is(err, errSuppressed) {
101 event = webhook.EventSuppressed
102 }
103
104 rmsgs := make([]Msg, len(msgs))
105 var scl []suppressionCheck
106 for i, m := range msgs {
107 rm := *m
108 rm.DialedIPs = dialedIPs
109 rm.markResult(code, secodeOpt, errmsg, false)
110
111 qmlog := qlog.With(slog.Int64("msgid", rm.ID), slog.Any("recipient", m.Recipient()))
112 qmlog.Errorx("permanent failure delivering from queue", err)
113 deliverDSNFailure(qmlog, rm, remoteMTA, secodeOpt, errmsg, smtpLines)
114
115 rmsgs[i] = rm
116
117 // If this was an smtp error from remote, we'll pass the failure to the
118 // suppression list.
119 if code == 0 {
120 continue
121 }
122 sc := suppressionCheck{
123 MsgID: rm.ID,
124 Account: rm.SenderAccount,
125 Recipient: rm.Recipient(),
126 Code: code,
127 Secode: secodeOpt,
128 Source: "queue",
129 }
130 scl = append(scl, sc)
131 }
132 var suppressedMsgIDs []int64
133 if len(scl) > 0 {
134 var err error
135 suppressedMsgIDs, err = suppressionProcess(qlog, tx, scl...)
136 if err != nil {
137 qlog.Errorx("processing delivery failure in suppression list", err)
138 return
139 }
140 }
141 err := retireMsgs(qlog, tx, event, code, secodeOpt, suppressedMsgIDs, rmsgs...)
142 if err != nil {
143 qlog.Errorx("deleting queue messages from database after permanent failure", err)
144 } else if err := removeMsgsFS(qlog, rmsgs...); err != nil {
145 qlog.Errorx("remove queue messages from file system after permanent failure", err)
146 }
147
148 return
149 }
150
151 if m0.Attempts == 5 {
152 // We've attempted deliveries at these intervals: 0, 7.5m, 15m, 30m, 1h, 2u.
153 // Let sender know delivery is delayed.
154
155 retryUntil := m0.LastAttempt.Add((4 + 8 + 16) * time.Hour)
156 for _, m := range msgs {
157 qmlog := qlog.With(slog.Int64("msgid", m.ID), slog.Any("recipient", m.Recipient()))
158 qmlog.Errorx("temporary failure delivering from queue, sending delayed dsn", err, slog.Duration("backoff", backoff))
159 deliverDSNDelay(qmlog, *m, remoteMTA, secodeOpt, errmsg, smtpLines, retryUntil)
160 }
161 } else {
162 for _, m := range msgs {
163 qlog.Errorx("temporary failure delivering from queue", err,
164 slog.Int64("msgid", m.ID),
165 slog.Any("recipient", m.Recipient()),
166 slog.Duration("backoff", backoff),
167 slog.Time("nextattempt", m0.NextAttempt))
168 }
169 }
170
171 process := func() error {
172 // Update DialedIPs in message, and record the result.
173 qup := bstore.QueryTx[Msg](tx)
174 qup.FilterIDs(ids)
175 umsgs, err := qup.List()
176 if err != nil {
177 return fmt.Errorf("retrieving messages for marking temporary delivery error: %v", err)
178 }
179 for _, um := range umsgs {
180 // All messages should have the same DialedIPs.
181 um.DialedIPs = dialedIPs
182 um.markResult(code, secodeOpt, errmsg, false)
183 if err := tx.Update(&um); err != nil {
184 return fmt.Errorf("updating message after temporary failure to deliver: %v", err)
185 }
186 }
187
188 // If configured, we'll queue webhooks for delivery.
189 accConf, ok := mox.Conf.Account(m0.SenderAccount)
190 if !(ok && accConf.OutgoingWebhook != nil && (len(accConf.OutgoingWebhook.Events) == 0 || slices.Contains(accConf.OutgoingWebhook.Events, string(webhook.EventDelayed)))) {
191 return nil
192 }
193
194 hooks := make([]Hook, len(msgs))
195 for i, m := range msgs {
196 var err error
197 hooks[i], err = hookCompose(*m, accConf.OutgoingWebhook.URL, accConf.OutgoingWebhook.Authorization, webhook.EventDelayed, false, code, secodeOpt)
198 if err != nil {
199 return fmt.Errorf("composing webhook for failed delivery attempt for msg id %d: %v", m.ID, err)
200 }
201 }
202 now := time.Now()
203 for i := range hooks {
204 if err := hookInsert(tx, &hooks[i], now, accConf.KeepRetiredWebhookPeriod); err != nil {
205 return fmt.Errorf("inserting webhook into queue: %v", err)
206 }
207 qlog.Debug("queueing webhook for temporary delivery errors", hooks[i].attrs()...)
208 }
209 return nil
210 }
211 if err := process(); err != nil {
212 qlog.Errorx("processing temporary delivery error", err, slog.String("deliveryerror", errmsg))
213 }
214}
215
216func deliverDSNFailure(log mlog.Log, m Msg, remoteMTA dsn.NameIP, secodeOpt, errmsg string, smtpLines []string) {
217 const subject = "mail delivery failed"
218 message := fmt.Sprintf(`
219Delivery has failed permanently for your email to:
220
221 %s
222
223No further deliveries will be attempted.
224
225Error during the last delivery attempt:
226
227 %s
228`, m.Recipient().XString(m.SMTPUTF8), errmsg)
229 if len(smtpLines) > 0 {
230 message += "\nFull SMTP response:\n\n\t" + strings.Join(smtpLines, "\n\t") + "\n"
231 }
232
233 deliverDSN(log, m, remoteMTA, secodeOpt, errmsg, smtpLines, true, nil, subject, message)
234}
235
236func deliverDSNDelay(log mlog.Log, m Msg, remoteMTA dsn.NameIP, secodeOpt, errmsg string, smtpLines []string, retryUntil time.Time) {
237 // Should not happen, but doesn't hurt to prevent sending delayed delivery
238 // notifications for DMARC reports. We don't want to waste postmaster attention.
239 if m.IsDMARCReport {
240 return
241 }
242
243 const subject = "mail delivery delayed"
244 message := fmt.Sprintf(`
245Delivery has been delayed of your email to:
246
247 %s
248
249Next attempts to deliver: in 4 hours, 8 hours and 16 hours.
250If these attempts all fail, you will receive a notice.
251
252Error during the last delivery attempt:
253
254 %s
255`, m.Recipient().XString(false), errmsg)
256 if len(smtpLines) > 0 {
257 message += "\nFull SMTP response:\n\n\t" + strings.Join(smtpLines, "\n\t") + "\n"
258 }
259
260 deliverDSN(log, m, remoteMTA, secodeOpt, errmsg, smtpLines, false, &retryUntil, subject, message)
261}
262
263// We only queue DSNs for delivery failures for emails submitted by authenticated
264// users. So we are delivering to local users. ../rfc/5321:1466
265// ../rfc/5321:1494
266// ../rfc/7208:490
267func deliverDSN(log mlog.Log, m Msg, remoteMTA dsn.NameIP, secodeOpt, errmsg string, smtpLines []string, permanent bool, retryUntil *time.Time, subject, textBody string) {
268 kind := "delayed delivery"
269 if permanent {
270 kind = "failure"
271 }
272
273 qlog := func(text string, err error) {
274 log.Errorx("queue dsn: "+text+": sender will not be informed about dsn", err, slog.String("sender", m.Sender().XString(m.SMTPUTF8)), slog.String("kind", kind))
275 }
276
277 msgf, err := os.Open(m.MessagePath())
278 if err != nil {
279 qlog("opening queued message", err)
280 return
281 }
282 msgr := store.FileMsgReader(m.MsgPrefix, msgf)
283 defer func() {
284 err := msgr.Close()
285 log.Check(err, "closing message reader after queuing dsn")
286 }()
287 headers, err := message.ReadHeaders(bufio.NewReader(msgr))
288 if err != nil {
289 qlog("reading headers of queued message", err)
290 return
291 }
292
293 var action dsn.Action
294 var status string
295 if permanent {
296 status = "5."
297 action = dsn.Failed
298 } else {
299 action = dsn.Delayed
300 status = "4."
301 }
302 if secodeOpt != "" {
303 status += secodeOpt
304 } else {
305 status += "0.0"
306 }
307
308 // ../rfc/3461:1329
309 var smtpDiag string
310 if len(smtpLines) > 0 {
311 smtpDiag = strings.Join(smtpLines, " ")
312 }
313
314 dsnMsg := &dsn.Message{
315 SMTPUTF8: m.SMTPUTF8,
316 From: smtp.Path{Localpart: "postmaster", IPDomain: dns.IPDomain{Domain: mox.Conf.Static.HostnameDomain}},
317 To: m.Sender(),
318 Subject: subject,
319 MessageID: mox.MessageIDGen(false),
320 References: m.MessageID,
321 TextBody: textBody,
322
323 ReportingMTA: mox.Conf.Static.HostnameDomain.ASCII,
324 ArrivalDate: m.Queued,
325 FutureReleaseRequest: m.FutureReleaseRequest,
326
327 Recipients: []dsn.Recipient{
328 {
329 FinalRecipient: m.Recipient(),
330 Action: action,
331 Status: status,
332 StatusComment: errmsg,
333 RemoteMTA: remoteMTA,
334 DiagnosticCodeSMTP: smtpDiag,
335 LastAttemptDate: *m.LastAttempt,
336 WillRetryUntil: retryUntil,
337 },
338 },
339
340 Original: headers,
341 }
342 msgData, err := dsnMsg.Compose(log, m.SMTPUTF8)
343 if err != nil {
344 qlog("composing dsn", err)
345 return
346 }
347
348 prefix := []byte("Return-Path: <" + dsnMsg.From.XString(m.SMTPUTF8) + ">\r\n" + "Delivered-To: " + m.Sender().XString(m.SMTPUTF8) + "\r\n")
349 msgData = append(prefix, msgData...)
350
351 mailbox := "Inbox"
352 senderAccount := m.SenderAccount
353 if m.IsDMARCReport {
354 // senderAccount should already by postmaster, but doesn't hurt to ensure it.
355 senderAccount = mox.Conf.Static.Postmaster.Account
356 }
357 acc, err := store.OpenAccount(log, senderAccount)
358 if err != nil {
359 acc, err = store.OpenAccount(log, mox.Conf.Static.Postmaster.Account)
360 if err != nil {
361 qlog("looking up postmaster account after sender account was not found", err)
362 return
363 }
364 mailbox = mox.Conf.Static.Postmaster.Mailbox
365 }
366 defer func() {
367 err := acc.Close()
368 log.Check(err, "queue dsn: closing account", slog.String("sender", m.Sender().XString(m.SMTPUTF8)), slog.String("kind", kind))
369 }()
370
371 msgFile, err := store.CreateMessageTemp(log, "queue-dsn")
372 if err != nil {
373 qlog("creating temporary message file", err)
374 return
375 }
376 defer store.CloseRemoveTempFile(log, msgFile, "dsn message")
377
378 msgWriter := message.NewWriter(msgFile)
379 if _, err := msgWriter.Write(msgData); err != nil {
380 qlog("writing dsn message", err)
381 return
382 }
383
384 msg := store.Message{
385 Received: time.Now(),
386 Size: msgWriter.Size,
387 MsgPrefix: []byte{},
388 DSN: true,
389 }
390
391 // If this is a DMARC report, deliver it as seen message to a submailbox of the
392 // postmaster mailbox. We mark it as seen so it doesn't waste postmaster attention,
393 // but we deliver them so they can be checked in case of problems.
394 if m.IsDMARCReport {
395 mailbox = fmt.Sprintf("%s/dmarc", mox.Conf.Static.Postmaster.Mailbox)
396 msg.Seen = true
397 metricDMARCReportFailure.Inc()
398 log.Info("delivering dsn for failure to deliver outgoing dmarc report")
399 }
400
401 acc.WithWLock(func() {
402 if err := acc.DeliverMailbox(log, mailbox, &msg, msgFile); err != nil {
403 qlog("delivering dsn to mailbox", err)
404 return
405 }
406 })
407}
408