1package queue
2
3import (
4 "bytes"
5 "context"
6 "crypto/tls"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "net"
12 "os"
13 "slices"
14 "time"
15
16 "github.com/mjl-/bstore"
17
18 "github.com/mjl-/mox/config"
19 "github.com/mjl-/mox/dns"
20 "github.com/mjl-/mox/dsn"
21 "github.com/mjl-/mox/mlog"
22 "github.com/mjl-/mox/mox-"
23 "github.com/mjl-/mox/sasl"
24 "github.com/mjl-/mox/smtp"
25 "github.com/mjl-/mox/smtpclient"
26 "github.com/mjl-/mox/store"
27 "github.com/mjl-/mox/webhook"
28)
29
30// todo: reuse connection? do fewer concurrently (other than with direct delivery).
31
32// deliver via another SMTP server, e.g. relaying to a smart host, possibly
33// with authentication (submission).
34func deliverSubmit(qlog mlog.Log, resolver dns.Resolver, dialer smtpclient.Dialer, msgs []*Msg, backoff time.Duration, transportName string, transport *config.TransportSMTP, dialTLS bool, defaultPort int) {
35 // todo: configurable timeouts
36
37 // For convenience, all messages share the same relevant values.
38 m0 := msgs[0]
39
40 port := transport.Port
41 if port == 0 {
42 port = defaultPort
43 }
44
45 tlsMode := smtpclient.TLSRequiredStartTLS
46 tlsPKIX := true
47 if dialTLS {
48 tlsMode = smtpclient.TLSImmediate
49 } else if transport.STARTTLSInsecureSkipVerify {
50 tlsMode = smtpclient.TLSRequiredStartTLS
51 tlsPKIX = false
52 } else if transport.NoSTARTTLS {
53 tlsMode = smtpclient.TLSSkip
54 tlsPKIX = false
55 }
56
57 // Prepare values for logging/metrics. They are updated for various error
58 // conditions later on.
59 start := time.Now()
60 var submiterr error // Of whole operation, nil for partial failure/success.
61 var delivered int
62 failed := len(msgs) // Reset and updated after smtp transaction.
63 defer func() {
64 r := deliveryResult(submiterr, delivered, failed)
65 d := float64(time.Since(start)) / float64(time.Second)
66 metricDelivery.WithLabelValues(fmt.Sprintf("%d", m0.Attempts), transportName, string(tlsMode), r).Observe(d)
67
68 qlog.Debugx("queue deliversubmit result", submiterr,
69 slog.Any("host", transport.DNSHost),
70 slog.Int("port", port),
71 slog.String("result", r),
72 slog.Int("delivered", delivered),
73 slog.Int("failed", failed),
74 slog.Any("tlsmode", tlsMode),
75 slog.Bool("tlspkix", tlsPKIX),
76 slog.Duration("duration", time.Since(start)))
77 }()
78
79 // todo: SMTP-DANE should be used when relaying on port 25.
80 // ../rfc/7672:1261
81
82 // todo: for submission, understand SRV records, and even DANE.
83
84 ctx := mox.Shutdown
85
86 // If submit was done with REQUIRETLS extension for SMTP, we must verify TLS
87 // certificates. If our submission connection is not configured that way, abort.
88 requireTLS := m0.RequireTLS != nil && *m0.RequireTLS
89 if requireTLS && (tlsMode != smtpclient.TLSRequiredStartTLS && tlsMode != smtpclient.TLSImmediate || !tlsPKIX) {
90 submiterr = smtpclient.Error{
91 Permanent: true,
92 Code: smtp.C554TransactionFailed,
93 Secode: smtp.SePol7MissingReqTLS30,
94 Err: fmt.Errorf("transport %s: message requires verified tls but transport does not verify tls", transportName),
95 }
96 failMsgsDB(qlog, msgs, m0.DialedIPs, backoff, dsn.NameIP{}, submiterr)
97 return
98 }
99
100 dialctx, dialcancel := context.WithTimeout(ctx, 30*time.Second)
101 defer dialcancel()
102 if msgs[0].DialedIPs == nil {
103 msgs[0].DialedIPs = map[string][]net.IP{}
104 m0 = msgs[0]
105 }
106 _, _, _, ips, _, err := smtpclient.GatherIPs(dialctx, qlog.Logger, resolver, "ip", dns.IPDomain{Domain: transport.DNSHost}, m0.DialedIPs)
107 var conn net.Conn
108 if err == nil {
109 conn, _, err = smtpclient.Dial(dialctx, qlog.Logger, dialer, dns.IPDomain{Domain: transport.DNSHost}, ips, port, m0.DialedIPs, mox.Conf.Static.SpecifiedSMTPListenIPs)
110 }
111 addr := net.JoinHostPort(transport.Host, fmt.Sprintf("%d", port))
112 var result string
113 switch {
114 case err == nil:
115 result = "ok"
116 case errors.Is(err, os.ErrDeadlineExceeded), errors.Is(err, context.DeadlineExceeded):
117 result = "timeout"
118 case errors.Is(err, context.Canceled):
119 result = "canceled"
120 default:
121 result = "error"
122 }
123 metricConnection.WithLabelValues(result).Inc()
124 if err != nil {
125 if conn != nil {
126 err := conn.Close()
127 qlog.Check(err, "closing connection")
128 }
129 qlog.Errorx("dialing for submission", err, slog.String("remote", addr))
130 submiterr = fmt.Errorf("transport %s: dialing %s for submission: %w", transportName, addr, err)
131 failMsgsDB(qlog, msgs, m0.DialedIPs, backoff, dsn.NameIP{}, submiterr)
132 return
133 }
134 dialcancel()
135
136 var auth func(mechanisms []string, cs *tls.ConnectionState) (sasl.Client, error)
137 if transport.Auth != nil {
138 a := transport.Auth
139 auth = func(mechanisms []string, cs *tls.ConnectionState) (sasl.Client, error) {
140 var supportsscramsha1plus, supportsscramsha256plus bool
141 for _, mech := range a.EffectiveMechanisms {
142 if !slices.Contains(mechanisms, mech) {
143 switch mech {
144 case "SCRAM-SHA-1-PLUS":
145 supportsscramsha1plus = cs != nil
146 case "SCRAM-SHA-256-PLUS":
147 supportsscramsha256plus = cs != nil
148 }
149 continue
150 }
151 if mech == "SCRAM-SHA-256-PLUS" && cs != nil {
152 return sasl.NewClientSCRAMSHA256PLUS(a.Username, a.Password, *cs), nil
153 } else if mech == "SCRAM-SHA-256" {
154 return sasl.NewClientSCRAMSHA256(a.Username, a.Password, supportsscramsha256plus), nil
155 } else if mech == "SCRAM-SHA-1-PLUS" && cs != nil {
156 return sasl.NewClientSCRAMSHA1PLUS(a.Username, a.Password, *cs), nil
157 } else if mech == "SCRAM-SHA-1" {
158 return sasl.NewClientSCRAMSHA1(a.Username, a.Password, supportsscramsha1plus), nil
159 } else if mech == "CRAM-MD5" {
160 return sasl.NewClientCRAMMD5(a.Username, a.Password), nil
161 } else if mech == "PLAIN" {
162 return sasl.NewClientPlain(a.Username, a.Password), nil
163 }
164 return nil, fmt.Errorf("internal error: unrecognized authentication mechanism %q for transport %s", mech, transportName)
165 }
166
167 // No mutually supported algorithm.
168 return nil, nil
169 }
170 }
171 clientctx, clientcancel := context.WithTimeout(context.Background(), 60*time.Second)
172 defer clientcancel()
173 opts := smtpclient.Opts{
174 Auth: auth,
175 RootCAs: mox.Conf.Static.TLS.CertPool,
176 }
177 client, err := smtpclient.New(clientctx, qlog.Logger, conn, tlsMode, tlsPKIX, mox.Conf.Static.HostnameDomain, transport.DNSHost, opts)
178 if err != nil {
179 smtperr, ok := err.(smtpclient.Error)
180 var remoteMTA dsn.NameIP
181 submiterr = fmt.Errorf("transport %s: establishing smtp session with %s for submission: %w", transportName, addr, err)
182 if ok {
183 remoteMTA.Name = transport.Host
184 smtperr.Err = submiterr
185 submiterr = smtperr
186 }
187 qlog.Errorx("establishing smtp session for submission", submiterr, slog.String("remote", addr))
188 failMsgsDB(qlog, msgs, m0.DialedIPs, backoff, remoteMTA, submiterr)
189 return
190 }
191 defer func() {
192 err := client.Close()
193 qlog.Check(err, "closing smtp client after delivery")
194 }()
195 clientcancel()
196
197 var msgr io.ReadCloser
198 var size int64
199 var req8bit, reqsmtputf8 bool
200 if len(m0.DSNUTF8) > 0 && client.SupportsSMTPUTF8() {
201 msgr = io.NopCloser(bytes.NewReader(m0.DSNUTF8))
202 reqsmtputf8 = true
203 size = int64(len(m0.DSNUTF8))
204 } else {
205 req8bit = m0.Has8bit // todo: not require this, but just try to submit?
206 size = m0.Size
207
208 p := m0.MessagePath()
209 f, err := os.Open(p)
210 if err != nil {
211 qlog.Errorx("opening message for delivery", err, slog.String("remote", addr), slog.String("path", p))
212 submiterr = fmt.Errorf("transport %s: opening message file for submission: %w", transportName, err)
213 failMsgsDB(qlog, msgs, m0.DialedIPs, backoff, dsn.NameIP{}, submiterr)
214 return
215 }
216 msgr = store.FileMsgReader(m0.MsgPrefix, f)
217 defer func() {
218 if msgr != nil {
219 err := msgr.Close()
220 qlog.Check(err, "closing message after delivery attempt")
221 }
222 }()
223 }
224
225 deliverctx, delivercancel := context.WithTimeout(context.Background(), time.Duration(60+size/(1024*1024))*time.Second)
226 defer delivercancel()
227 rcpts := make([]string, len(msgs))
228 for i, m := range msgs {
229 rcpts[i] = m.Recipient().String()
230 }
231 rcptErrs, submiterr := client.DeliverMultiple(deliverctx, m0.Sender().String(), rcpts, size, msgr, req8bit, reqsmtputf8, requireTLS)
232 if submiterr != nil {
233 qlog.Infox("smtp transaction for delivery failed", submiterr)
234 }
235
236 // Must close before processing, because that may try to remove the message file,
237 // and on Windows we can't have it open when we remove it.
238 cerr := msgr.Close()
239 qlog.Check(cerr, "closing message after delivery attempt")
240 msgr = nil
241
242 failed, delivered = processDeliveries(qlog, m0, msgs, addr, transport.Host, backoff, rcptErrs, submiterr)
243}
244
245// Process failures and successful deliveries, retiring/removing messages from
246// queue, queueing webhooks.
247//
248// Also used by deliverLocalserve.
249func processDeliveries(qlog mlog.Log, m0 *Msg, msgs []*Msg, remoteAddr string, remoteHost string, backoff time.Duration, rcptErrs []smtpclient.Response, submiterr error) (failed, delivered int) {
250 var delMsgs []Msg
251 for i, m := range msgs {
252 qmlog := qlog.With(
253 slog.Int64("msgid", m.ID),
254 slog.Any("recipient", m.Recipient()))
255
256 err := submiterr
257 if err == nil && len(rcptErrs) > i {
258 if rcptErrs[i].Code != smtp.C250Completed {
259 err = smtpclient.Error(rcptErrs[i])
260 }
261 }
262 if err != nil {
263 smtperr, ok := err.(smtpclient.Error)
264 err = fmt.Errorf("delivering message to %s: %w", remoteAddr, err)
265 var remoteMTA dsn.NameIP
266 if ok {
267 remoteMTA.Name = remoteHost
268 smtperr.Err = err
269 err = smtperr
270 }
271 qmlog.Errorx("submitting message", err, slog.String("remote", remoteAddr))
272 failMsgsDB(qmlog, []*Msg{m}, m0.DialedIPs, backoff, remoteMTA, err)
273 failed++
274 } else {
275 resp := rcptErrs[i]
276 m.markResult(resp.Code, resp.Secode, "", true)
277 delMsgs = append(delMsgs, *m)
278 qmlog.Info("delivered from queue with transport")
279 delivered++
280 }
281 }
282 if len(delMsgs) > 0 {
283 err := DB.Write(context.Background(), func(tx *bstore.Tx) error {
284 return retireMsgs(qlog, tx, webhook.EventDelivered, 0, "", nil, delMsgs...)
285 })
286 if err != nil {
287 qlog.Errorx("remove queue message from database after delivery", err)
288 } else if err := removeMsgsFS(qlog, delMsgs...); err != nil {
289 qlog.Errorx("remove queue message from file system after delivery", err)
290 }
291 kick()
292 }
293 return
294}
295