1package queue
2
3import (
4 "bytes"
5 "context"
6 "crypto/tls"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "net"
12 "os"
13 "slices"
14 "time"
15
16 "github.com/mjl-/bstore"
17
18 "github.com/mjl-/mox/config"
19 "github.com/mjl-/mox/dns"
20 "github.com/mjl-/mox/dsn"
21 "github.com/mjl-/mox/mlog"
22 "github.com/mjl-/mox/mox-"
23 "github.com/mjl-/mox/sasl"
24 "github.com/mjl-/mox/smtp"
25 "github.com/mjl-/mox/smtpclient"
26 "github.com/mjl-/mox/store"
27 "github.com/mjl-/mox/webhook"
28)
29
30// todo: reuse connection? do fewer concurrently (other than with direct delivery).
31
32// deliver via another SMTP server, e.g. relaying to a smart host, possibly
33// with authentication (submission).
34func deliverSubmit(qlog mlog.Log, resolver dns.Resolver, dialer smtpclient.Dialer, msgs []*Msg, backoff time.Duration, transportName string, transport *config.TransportSMTP, dialTLS bool, defaultPort int) {
35 // todo: configurable timeouts
36
37 // For convenience, all messages share the same relevant values.
38 m0 := msgs[0]
39
40 port := transport.Port
41 if port == 0 {
42 port = defaultPort
43 }
44
45 tlsMode := smtpclient.TLSRequiredStartTLS
46 tlsPKIX := true
47 if dialTLS {
48 tlsMode = smtpclient.TLSImmediate
49 } else if transport.STARTTLSInsecureSkipVerify {
50 tlsMode = smtpclient.TLSRequiredStartTLS
51 tlsPKIX = false
52 } else if transport.NoSTARTTLS {
53 tlsMode = smtpclient.TLSSkip
54 tlsPKIX = false
55 }
56
57 // Prepare values for logging/metrics. They are updated for various error
58 // conditions later on.
59 start := time.Now()
60 var submiterr error // Of whole operation, nil for partial failure/success.
61 var delivered int
62 failed := len(msgs) // Reset and updated after smtp transaction.
63 defer func() {
64 r := deliveryResult(submiterr, delivered, failed)
65 d := float64(time.Since(start)) / float64(time.Second)
66 metricDelivery.WithLabelValues(fmt.Sprintf("%d", m0.Attempts), transportName, string(tlsMode), r).Observe(d)
67
68 qlog.Debugx("queue deliversubmit result", submiterr,
69 slog.Any("host", transport.DNSHost),
70 slog.Int("port", port),
71 slog.Int("attempt", m0.Attempts),
72 slog.String("result", r),
73 slog.Int("delivered", delivered),
74 slog.Int("failed", failed),
75 slog.Any("tlsmode", tlsMode),
76 slog.Bool("tlspkix", tlsPKIX),
77 slog.Duration("duration", time.Since(start)))
78 }()
79
80 // todo: SMTP-DANE should be used when relaying on port 25.
81 // ../rfc/7672:1261
82
83 // todo: for submission, understand SRV records, and even DANE.
84
85 ctx := mox.Shutdown
86
87 // If submit was done with REQUIRETLS extension for SMTP, we must verify TLS
88 // certificates. If our submission connection is not configured that way, abort.
89 requireTLS := m0.RequireTLS != nil && *m0.RequireTLS
90 if requireTLS && (tlsMode != smtpclient.TLSRequiredStartTLS && tlsMode != smtpclient.TLSImmediate || !tlsPKIX) {
91 submiterr = smtpclient.Error{
92 Permanent: true,
93 Code: smtp.C554TransactionFailed,
94 Secode: smtp.SePol7MissingReqTLS30,
95 Err: fmt.Errorf("transport %s: message requires verified tls but transport does not verify tls", transportName),
96 }
97 failMsgsDB(qlog, msgs, m0.DialedIPs, backoff, dsn.NameIP{}, submiterr)
98 return
99 }
100
101 dialctx, dialcancel := context.WithTimeout(ctx, 30*time.Second)
102 defer dialcancel()
103 if msgs[0].DialedIPs == nil {
104 msgs[0].DialedIPs = map[string][]net.IP{}
105 m0 = msgs[0]
106 }
107 _, _, _, ips, _, err := smtpclient.GatherIPs(dialctx, qlog.Logger, resolver, "ip", dns.IPDomain{Domain: transport.DNSHost}, m0.DialedIPs)
108 var conn net.Conn
109 if err == nil {
110 conn, _, err = smtpclient.Dial(dialctx, qlog.Logger, dialer, dns.IPDomain{Domain: transport.DNSHost}, ips, port, m0.DialedIPs, mox.Conf.Static.SpecifiedSMTPListenIPs)
111 }
112 addr := net.JoinHostPort(transport.Host, fmt.Sprintf("%d", port))
113 var result string
114 switch {
115 case err == nil:
116 result = "ok"
117 case errors.Is(err, os.ErrDeadlineExceeded), errors.Is(err, context.DeadlineExceeded):
118 result = "timeout"
119 case errors.Is(err, context.Canceled):
120 result = "canceled"
121 default:
122 result = "error"
123 }
124 metricConnection.WithLabelValues(result).Inc()
125 if err != nil {
126 if conn != nil {
127 err := conn.Close()
128 qlog.Check(err, "closing connection")
129 }
130 qlog.Errorx("dialing for submission", err, slog.String("remote", addr))
131 submiterr = fmt.Errorf("transport %s: dialing %s for submission: %w", transportName, addr, err)
132 failMsgsDB(qlog, msgs, m0.DialedIPs, backoff, dsn.NameIP{}, submiterr)
133 return
134 }
135 dialcancel()
136
137 var auth func(mechanisms []string, cs *tls.ConnectionState) (sasl.Client, error)
138 if transport.Auth != nil {
139 a := transport.Auth
140 auth = func(mechanisms []string, cs *tls.ConnectionState) (sasl.Client, error) {
141 var supportsscramsha1plus, supportsscramsha256plus bool
142 for _, mech := range a.EffectiveMechanisms {
143 if !slices.Contains(mechanisms, mech) {
144 switch mech {
145 case "SCRAM-SHA-1-PLUS":
146 supportsscramsha1plus = cs != nil
147 case "SCRAM-SHA-256-PLUS":
148 supportsscramsha256plus = cs != nil
149 }
150 continue
151 }
152 if mech == "SCRAM-SHA-256-PLUS" && cs != nil {
153 return sasl.NewClientSCRAMSHA256PLUS(a.Username, a.Password, *cs), nil
154 } else if mech == "SCRAM-SHA-256" {
155 return sasl.NewClientSCRAMSHA256(a.Username, a.Password, supportsscramsha256plus), nil
156 } else if mech == "SCRAM-SHA-1-PLUS" && cs != nil {
157 return sasl.NewClientSCRAMSHA1PLUS(a.Username, a.Password, *cs), nil
158 } else if mech == "SCRAM-SHA-1" {
159 return sasl.NewClientSCRAMSHA1(a.Username, a.Password, supportsscramsha1plus), nil
160 } else if mech == "CRAM-MD5" {
161 return sasl.NewClientCRAMMD5(a.Username, a.Password), nil
162 } else if mech == "PLAIN" {
163 return sasl.NewClientPlain(a.Username, a.Password), nil
164 }
165 return nil, fmt.Errorf("internal error: unrecognized authentication mechanism %q for transport %s", mech, transportName)
166 }
167
168 // No mutually supported algorithm.
169 return nil, nil
170 }
171 }
172 clientctx, clientcancel := context.WithTimeout(context.Background(), 60*time.Second)
173 defer clientcancel()
174 opts := smtpclient.Opts{
175 Auth: auth,
176 RootCAs: mox.Conf.Static.TLS.CertPool,
177 }
178 client, err := smtpclient.New(clientctx, qlog.Logger, conn, tlsMode, tlsPKIX, mox.Conf.Static.HostnameDomain, transport.DNSHost, opts)
179 if err != nil {
180 smtperr, ok := err.(smtpclient.Error)
181 var remoteMTA dsn.NameIP
182 submiterr = fmt.Errorf("transport %s: establishing smtp session with %s for submission: %w", transportName, addr, err)
183 if ok {
184 remoteMTA.Name = transport.Host
185 smtperr.Err = submiterr
186 submiterr = smtperr
187 }
188 qlog.Errorx("establishing smtp session for submission", submiterr, slog.String("remote", addr))
189 failMsgsDB(qlog, msgs, m0.DialedIPs, backoff, remoteMTA, submiterr)
190 return
191 }
192 defer func() {
193 err := client.Close()
194 qlog.Check(err, "closing smtp client after delivery")
195 }()
196 clientcancel()
197
198 var msgr io.ReadCloser
199 var size int64
200 var req8bit, reqsmtputf8 bool
201 if len(m0.DSNUTF8) > 0 && client.SupportsSMTPUTF8() {
202 msgr = io.NopCloser(bytes.NewReader(m0.DSNUTF8))
203 reqsmtputf8 = true
204 size = int64(len(m0.DSNUTF8))
205 } else {
206 req8bit = m0.Has8bit // todo: not require this, but just try to submit?
207 size = m0.Size
208
209 p := m0.MessagePath()
210 f, err := os.Open(p)
211 if err != nil {
212 qlog.Errorx("opening message for delivery", err, slog.String("remote", addr), slog.String("path", p))
213 submiterr = fmt.Errorf("transport %s: opening message file for submission: %w", transportName, err)
214 failMsgsDB(qlog, msgs, m0.DialedIPs, backoff, dsn.NameIP{}, submiterr)
215 return
216 }
217 msgr = store.FileMsgReader(m0.MsgPrefix, f)
218 defer func() {
219 err := msgr.Close()
220 qlog.Check(err, "closing message after delivery attempt")
221 }()
222 }
223
224 deliverctx, delivercancel := context.WithTimeout(context.Background(), time.Duration(60+size/(1024*1024))*time.Second)
225 defer delivercancel()
226 rcpts := make([]string, len(msgs))
227 for i, m := range msgs {
228 rcpts[i] = m.Recipient().String()
229 }
230 rcptErrs, submiterr := client.DeliverMultiple(deliverctx, m0.Sender().String(), rcpts, size, msgr, req8bit, reqsmtputf8, requireTLS)
231 if submiterr != nil {
232 qlog.Infox("smtp transaction for delivery failed", submiterr)
233 }
234 failed, delivered = processDeliveries(qlog, m0, msgs, addr, transport.Host, backoff, rcptErrs, submiterr)
235}
236
237// Process failures and successful deliveries, retiring/removing messages from
238// queue, queueing webhooks.
239//
240// Also used by deliverLocalserve.
241func processDeliveries(qlog mlog.Log, m0 *Msg, msgs []*Msg, remoteAddr string, remoteHost string, backoff time.Duration, rcptErrs []smtpclient.Response, submiterr error) (failed, delivered int) {
242 var delMsgs []Msg
243 for i, m := range msgs {
244 qmlog := qlog.With(
245 slog.Int64("msgid", m.ID),
246 slog.Any("recipient", m.Recipient()))
247
248 err := submiterr
249 if err == nil && len(rcptErrs) > i {
250 if rcptErrs[i].Code != smtp.C250Completed {
251 err = smtpclient.Error(rcptErrs[i])
252 }
253 }
254 if err != nil {
255 smtperr, ok := err.(smtpclient.Error)
256 err = fmt.Errorf("delivering message to %s: %w", remoteAddr, err)
257 var remoteMTA dsn.NameIP
258 if ok {
259 remoteMTA.Name = remoteHost
260 smtperr.Err = err
261 err = smtperr
262 }
263 qmlog.Errorx("submitting message", err, slog.String("remote", remoteAddr))
264 failMsgsDB(qmlog, []*Msg{m}, m0.DialedIPs, backoff, remoteMTA, err)
265 failed++
266 } else {
267 resp := rcptErrs[i]
268 m.markResult(resp.Code, resp.Secode, "", true)
269 delMsgs = append(delMsgs, *m)
270 qmlog.Info("delivered from queue with transport")
271 delivered++
272 }
273 }
274 if len(delMsgs) > 0 {
275 err := DB.Write(context.Background(), func(tx *bstore.Tx) error {
276 return retireMsgs(qlog, tx, webhook.EventDelivered, 0, "", nil, delMsgs...)
277 })
278 if err != nil {
279 qlog.Errorx("remove queue message from database after delivery", err)
280 } else if err := removeMsgsFS(qlog, delMsgs...); err != nil {
281 qlog.Errorx("remove queue message from file system after delivery", err)
282 }
283 kick()
284 }
285 return
286}
287