16 "github.com/mjl-/bstore"
18 "github.com/mjl-/mox/config"
19 "github.com/mjl-/mox/dns"
20 "github.com/mjl-/mox/dsn"
21 "github.com/mjl-/mox/mlog"
22 "github.com/mjl-/mox/mox-"
23 "github.com/mjl-/mox/sasl"
24 "github.com/mjl-/mox/smtp"
25 "github.com/mjl-/mox/smtpclient"
26 "github.com/mjl-/mox/store"
27 "github.com/mjl-/mox/webhook"
30// todo: reuse connection? do fewer concurrently (other than with direct delivery).
32// deliverSubmit delivers msgs via another SMTP server, e.g. relaying to a smart host, possibly
33// with authentication (submission).
//
// The connection is made to transport.DNSHost on the selected port, an SMTP
// session is set up with the chosen TLS mode, and all recipients are handed to
// client.DeliverMultiple. Failures of the operation as a whole (REQUIRETLS that
// the transport cannot satisfy, dial errors, SMTP session errors, failure to
// open the message file) are recorded for all msgs through failMsgsDB with the
// given backoff. Per-recipient outcomes are handled by processDeliveries.
// The overall result is observed in metricDelivery and logged at debug level.
//
// NOTE(review): the uses of dialTLS and defaultPort are not on the lines shown
// here; presumably they select immediate TLS and the fallback port — confirm.
34func deliverSubmit(qlog mlog.Log, resolver dns.Resolver, dialer smtpclient.Dialer, msgs []*Msg, backoff time.Duration, transportName string, transport *config.TransportSMTP, dialTLS bool, defaultPort int) {
35 // todo: configurable timeouts
37 // For convenience, all messages share the same relevant values.
40 port := transport.Port
	// TLS mode defaults to required STARTTLS. The branches below switch to
	// immediate TLS, keep required STARTTLS for STARTTLSInsecureSkipVerify, or
	// skip TLS entirely for NoSTARTTLS.
45 tlsMode := smtpclient.TLSRequiredStartTLS
48 tlsMode = smtpclient.TLSImmediate
49 } else if transport.STARTTLSInsecureSkipVerify {
50 tlsMode = smtpclient.TLSRequiredStartTLS
52 } else if transport.NoSTARTTLS {
53 tlsMode = smtpclient.TLSSkip
57 // Prepare values for logging/metrics. They are updated for various error
58 // conditions later on.
60 var submiterr error // Of whole operation, nil for partial failure/success.
62 failed := len(msgs) // Reset and updated after smtp transaction.
	// Summarize the outcome once; r is used both as metric label and in the
	// debug log. The duration is observed in (fractional) seconds.
64 r := deliveryResult(submiterr, delivered, failed)
65 d := float64(time.Since(start)) / float64(time.Second)
66 metricDelivery.WithLabelValues(fmt.Sprintf("%d", m0.Attempts), transportName, string(tlsMode), r).Observe(d)
68 qlog.Debugx("queue deliversubmit result", submiterr,
69 slog.Any("host", transport.DNSHost),
70 slog.Int("port", port),
71 slog.Int("attempt", m0.Attempts),
72 slog.String("result", r),
73 slog.Int("delivered", delivered),
74 slog.Int("failed", failed),
75 slog.Any("tlsmode", tlsMode),
76 slog.Bool("tlspkix", tlsPKIX),
77 slog.Duration("duration", time.Since(start)))
80 // todo: SMTP-DANE should be used when relaying on port 25.
83 // todo: for submission, understand SRV records, and even DANE.
87 // If submit was done with REQUIRETLS extension for SMTP, we must verify TLS
88 // certificates. If our submission connection is not configured that way, abort.
89 requireTLS := m0.RequireTLS != nil && *m0.RequireTLS
	// REQUIRETLS can only be honored when TLS is mandatory (required STARTTLS or
	// immediate TLS) and tlsPKIX is set. Otherwise all messages are failed with
	// smtp.C554TransactionFailed and smtp.SePol7MissingReqTLS30.
90 if requireTLS && (tlsMode != smtpclient.TLSRequiredStartTLS && tlsMode != smtpclient.TLSImmediate || !tlsPKIX) {
91 submiterr = smtpclient.Error{
93 Code: smtp.C554TransactionFailed,
94 Secode: smtp.SePol7MissingReqTLS30,
95 Err: fmt.Errorf("transport %s: message requires verified tls but transport does not verify tls", transportName),
97 failMsgsDB(qlog, msgs, m0.DialedIPs, backoff, dsn.NameIP{}, submiterr)
	// Gathering IPs and dialing share a single 30 second deadline.
	// NOTE(review): the declaration of ctx is not on the lines shown here — confirm.
101 dialctx, dialcancel := context.WithTimeout(ctx, 30*time.Second)
103 if msgs[0].DialedIPs == nil {
104 msgs[0].DialedIPs = map[string][]net.IP{}
107 _, _, _, ips, _, err := smtpclient.GatherIPs(dialctx, qlog.Logger, resolver, "ip", dns.IPDomain{Domain: transport.DNSHost}, m0.DialedIPs)
110 conn, _, err = smtpclient.Dial(dialctx, qlog.Logger, dialer, dns.IPDomain{Domain: transport.DNSHost}, ips, port, m0.DialedIPs, mox.Conf.Static.SpecifiedSMTPListenIPs)
	// addr is only used in log and error messages below, and passed on to
	// processDeliveries as the remote address.
112 addr := net.JoinHostPort(transport.Host, fmt.Sprintf("%d", port))
	// Classify the dial outcome for the connection metric.
117 case errors.Is(err, os.ErrDeadlineExceeded), errors.Is(err, context.DeadlineExceeded):
119 case errors.Is(err, context.Canceled):
124 metricConnection.WithLabelValues(result).Inc()
128 qlog.Check(err, "closing connection")
130 qlog.Errorx("dialing for submission", err, slog.String("remote", addr))
131 submiterr = fmt.Errorf("transport %s: dialing %s for submission: %w", transportName, addr, err)
132 failMsgsDB(qlog, msgs, m0.DialedIPs, backoff, dsn.NameIP{}, submiterr)
	// auth stays nil when the transport has no authentication configured.
	// Otherwise it selects a SASL client by iterating over
	// a.EffectiveMechanisms and checking each against the mechanisms passed in
	// by the caller, together with the TLS connection state cs (nil without TLS).
137 var auth func(mechanisms []string, cs *tls.ConnectionState) (sasl.Client, error)
138 if transport.Auth != nil {
140 auth = func(mechanisms []string, cs *tls.ConnectionState) (sasl.Client, error) {
141 var supportsscramsha1plus, supportsscramsha256plus bool
142 for _, mech := range a.EffectiveMechanisms {
143 if !slices.Contains(mechanisms, mech) {
	// NOTE(review): these flags are only true when TLS connection state is
	// available; they are passed to the non-PLUS SCRAM constructors below. Their
	// exact meaning depends on the sasl package — confirm.
145 case "SCRAM-SHA-1-PLUS":
146 supportsscramsha1plus = cs != nil
147 case "SCRAM-SHA-256-PLUS":
148 supportsscramsha256plus = cs != nil
	// The -PLUS variants need the TLS connection state and are only used when
	// cs is available.
152 if mech == "SCRAM-SHA-256-PLUS" && cs != nil {
153 return sasl.NewClientSCRAMSHA256PLUS(a.Username, a.Password, *cs), nil
154 } else if mech == "SCRAM-SHA-256" {
155 return sasl.NewClientSCRAMSHA256(a.Username, a.Password, supportsscramsha256plus), nil
156 } else if mech == "SCRAM-SHA-1-PLUS" && cs != nil {
157 return sasl.NewClientSCRAMSHA1PLUS(a.Username, a.Password, *cs), nil
158 } else if mech == "SCRAM-SHA-1" {
159 return sasl.NewClientSCRAMSHA1(a.Username, a.Password, supportsscramsha1plus), nil
160 } else if mech == "CRAM-MD5" {
161 return sasl.NewClientCRAMMD5(a.Username, a.Password), nil
162 } else if mech == "PLAIN" {
163 return sasl.NewClientPlain(a.Username, a.Password), nil
165 return nil, fmt.Errorf("internal error: unrecognized authentication mechanism %q for transport %s", mech, transportName)
168 // No mutually supported algorithm.
	// Setting up the SMTP session has its own 60 second deadline, derived from
	// context.Background() rather than from ctx.
172 clientctx, clientcancel := context.WithTimeout(context.Background(), 60*time.Second)
174 opts := smtpclient.Opts{
176 RootCAs: mox.Conf.Static.TLS.CertPool,
178 client, err := smtpclient.New(clientctx, qlog.Logger, conn, tlsMode, tlsPKIX, mox.Conf.Static.HostnameDomain, transport.DNSHost, opts)
180 smtperr, ok := err.(smtpclient.Error)
181 var remoteMTA dsn.NameIP
182 submiterr = fmt.Errorf("transport %s: establishing smtp session with %s for submission: %w", transportName, addr, err)
184 remoteMTA.Name = transport.Host
185 smtperr.Err = submiterr
188 qlog.Errorx("establishing smtp session for submission", submiterr, slog.String("remote", addr))
189 failMsgsDB(qlog, msgs, m0.DialedIPs, backoff, remoteMTA, submiterr)
193 err := client.Close()
194 qlog.Check(err, "closing smtp client after delivery")
198 var msgr io.ReadCloser
200 var req8bit, reqsmtputf8 bool
	// If m0 carries DSNUTF8 data and the server supports SMTPUTF8, those bytes
	// are sent and size is their length. Otherwise the message file at
	// m0.MessagePath() is read through store.FileMsgReader.
201 if len(m0.DSNUTF8) > 0 && client.SupportsSMTPUTF8() {
202 msgr = io.NopCloser(bytes.NewReader(m0.DSNUTF8))
204 size = int64(len(m0.DSNUTF8))
206 req8bit = m0.Has8bit // todo: not require this, but just try to submit?
209 p := m0.MessagePath()
212 qlog.Errorx("opening message for delivery", err, slog.String("remote", addr), slog.String("path", p))
213 submiterr = fmt.Errorf("transport %s: opening message file for submission: %w", transportName, err)
214 failMsgsDB(qlog, msgs, m0.DialedIPs, backoff, dsn.NameIP{}, submiterr)
217 msgr = store.FileMsgReader(m0.MsgPrefix, f)
220 qlog.Check(err, "closing message after delivery attempt")
	// Allow 60 seconds plus one second per whole MiB of message size for the
	// delivery.
224 deliverctx, delivercancel := context.WithTimeout(context.Background(), time.Duration(60+size/(1024*1024))*time.Second)
225 defer delivercancel()
	// One recipient per queued message, in the same order as msgs, so that
	// processDeliveries can match rcptErrs[i] to msgs[i].
226 rcpts := make([]string, len(msgs))
227 for i, m := range msgs {
228 rcpts[i] = m.Recipient().String()
230 rcptErrs, submiterr := client.DeliverMultiple(deliverctx, m0.Sender().String(), rcpts, size, msgr, req8bit, reqsmtputf8, requireTLS)
231 if submiterr != nil {
232 qlog.Infox("smtp transaction for delivery failed", submiterr)
234 failed, delivered = processDeliveries(qlog, m0, msgs, addr, transport.Host, backoff, rcptErrs, submiterr)
237// processDeliveries processes failed and successful deliveries, retiring/removing
238// messages from the queue and queueing webhooks.
240// Also used by deliverLocalserve.
241func processDeliveries(qlog mlog.Log, m0 *Msg, msgs []*Msg, remoteAddr string, remoteHost string, backoff time.Duration, rcptErrs []smtpclient.Response, submiterr error) (failed, delivered int) {
243 for i, m := range msgs {
245 slog.Int64("msgid", m.ID),
246 slog.Any("recipient", m.Recipient()))
249 if err == nil && len(rcptErrs) > i {
250 if rcptErrs[i].Code != smtp.C250Completed {
251 err = smtpclient.Error(rcptErrs[i])
255 smtperr, ok := err.(smtpclient.Error)
256 err = fmt.Errorf("delivering message to %s: %w", remoteAddr, err)
257 var remoteMTA dsn.NameIP
259 remoteMTA.Name = remoteHost
263 qmlog.Errorx("submitting message", err, slog.String("remote", remoteAddr))
264 failMsgsDB(qmlog, []*Msg{m}, m0.DialedIPs, backoff, remoteMTA, err)
268 m.markResult(resp.Code, resp.Secode, "", true)
269 delMsgs = append(delMsgs, *m)
270 qmlog.Info("delivered from queue with transport")
274 if len(delMsgs) > 0 {
275 err := DB.Write(context.Background(), func(tx *bstore.Tx) error {
276 return retireMsgs(qlog, tx, webhook.EventDelivered, 0, "", nil, delMsgs...)
279 qlog.Errorx("remove queue message from database after delivery", err)
280 } else if err := removeMsgsFS(qlog, delMsgs...); err != nil {
281 qlog.Errorx("remove queue message from file system after delivery", err)