1package queue
2
3import (
4 "bytes"
5 "context"
6 "crypto/tls"
7 "errors"
8 "fmt"
9 "io"
10 "log/slog"
11 "net"
12 "os"
13 "slices"
14 "time"
15
16 "github.com/mjl-/mox/config"
17 "github.com/mjl-/mox/dns"
18 "github.com/mjl-/mox/dsn"
19 "github.com/mjl-/mox/mlog"
20 "github.com/mjl-/mox/mox-"
21 "github.com/mjl-/mox/sasl"
22 "github.com/mjl-/mox/smtp"
23 "github.com/mjl-/mox/smtpclient"
24 "github.com/mjl-/mox/store"
25)
26
27// todo: reuse connection? do fewer concurrently (other than with direct delivery).
28
29// deliver via another SMTP server, e.g. relaying to a smart host, possibly
30// with authentication (submission).
31func deliverSubmit(qlog mlog.Log, resolver dns.Resolver, dialer smtpclient.Dialer, msgs []*Msg, backoff time.Duration, transportName string, transport *config.TransportSMTP, dialTLS bool, defaultPort int) {
32 // todo: configurable timeouts
33
34 // For convenience, all messages share the same relevant values.
35 m0 := msgs[0]
36
37 port := transport.Port
38 if port == 0 {
39 port = defaultPort
40 }
41
42 tlsMode := smtpclient.TLSRequiredStartTLS
43 tlsPKIX := true
44 if dialTLS {
45 tlsMode = smtpclient.TLSImmediate
46 } else if transport.STARTTLSInsecureSkipVerify {
47 tlsMode = smtpclient.TLSRequiredStartTLS
48 tlsPKIX = false
49 } else if transport.NoSTARTTLS {
50 tlsMode = smtpclient.TLSSkip
51 tlsPKIX = false
52 }
53
54 // Prepare values for logging/metrics. They are updated for various error
55 // conditions later on.
56 start := time.Now()
57 var submiterr error // Of whole operation, nil for partial failure/success.
58 var delivered int
59 failed := len(msgs) // Reset and updated after smtp transaction.
60 defer func() {
61 r := deliveryResult(submiterr, delivered, failed)
62 d := float64(time.Since(start)) / float64(time.Second)
63 metricDelivery.WithLabelValues(fmt.Sprintf("%d", m0.Attempts), transportName, string(tlsMode), r).Observe(d)
64
65 qlog.Debugx("queue deliversubmit result", submiterr,
66 slog.Any("host", transport.DNSHost),
67 slog.Int("port", port),
68 slog.Int("attempt", m0.Attempts),
69 slog.String("result", r),
70 slog.Int("delivered", delivered),
71 slog.Int("failed", failed),
72 slog.Any("tlsmode", tlsMode),
73 slog.Bool("tlspkix", tlsPKIX),
74 slog.Duration("duration", time.Since(start)))
75 }()
76
77 // todo: SMTP-DANE should be used when relaying on port 25.
78 // ../rfc/7672:1261
79
80 // todo: for submission, understand SRV records, and even DANE.
81
82 ctx := mox.Shutdown
83
84 // If submit was done with REQUIRETLS extension for SMTP, we must verify TLS
85 // certificates. If our submission connection is not configured that way, abort.
86 requireTLS := m0.RequireTLS != nil && *m0.RequireTLS
87 if requireTLS && (tlsMode != smtpclient.TLSRequiredStartTLS && tlsMode != smtpclient.TLSImmediate || !tlsPKIX) {
88 submiterr = smtpclient.Error{
89 Permanent: true,
90 Code: smtp.C554TransactionFailed,
91 Secode: smtp.SePol7MissingReqTLS30,
92 Err: fmt.Errorf("transport %s: message requires verified tls but transport does not verify tls", transportName),
93 }
94 fail(ctx, qlog, msgs, m0.DialedIPs, backoff, dsn.NameIP{}, submiterr)
95 return
96 }
97
98 dialctx, dialcancel := context.WithTimeout(ctx, 30*time.Second)
99 defer dialcancel()
100 if msgs[0].DialedIPs == nil {
101 msgs[0].DialedIPs = map[string][]net.IP{}
102 m0 = msgs[0]
103 }
104 _, _, _, ips, _, err := smtpclient.GatherIPs(dialctx, qlog.Logger, resolver, dns.IPDomain{Domain: transport.DNSHost}, m0.DialedIPs)
105 var conn net.Conn
106 if err == nil {
107 conn, _, err = smtpclient.Dial(dialctx, qlog.Logger, dialer, dns.IPDomain{Domain: transport.DNSHost}, ips, port, m0.DialedIPs, mox.Conf.Static.SpecifiedSMTPListenIPs)
108 }
109 addr := net.JoinHostPort(transport.Host, fmt.Sprintf("%d", port))
110 var result string
111 switch {
112 case err == nil:
113 result = "ok"
114 case errors.Is(err, os.ErrDeadlineExceeded), errors.Is(err, context.DeadlineExceeded):
115 result = "timeout"
116 case errors.Is(err, context.Canceled):
117 result = "canceled"
118 default:
119 result = "error"
120 }
121 metricConnection.WithLabelValues(result).Inc()
122 if err != nil {
123 if conn != nil {
124 err := conn.Close()
125 qlog.Check(err, "closing connection")
126 }
127 qlog.Errorx("dialing for submission", err, slog.String("remote", addr))
128 submiterr = fmt.Errorf("transport %s: dialing %s for submission: %w", transportName, addr, err)
129 fail(ctx, qlog, msgs, m0.DialedIPs, backoff, dsn.NameIP{}, submiterr)
130 return
131 }
132 dialcancel()
133
134 var auth func(mechanisms []string, cs *tls.ConnectionState) (sasl.Client, error)
135 if transport.Auth != nil {
136 a := transport.Auth
137 auth = func(mechanisms []string, cs *tls.ConnectionState) (sasl.Client, error) {
138 var supportsscramsha1plus, supportsscramsha256plus bool
139 for _, mech := range a.EffectiveMechanisms {
140 if !slices.Contains(mechanisms, mech) {
141 switch mech {
142 case "SCRAM-SHA-1-PLUS":
143 supportsscramsha1plus = cs != nil
144 case "SCRAM-SHA-256-PLUS":
145 supportsscramsha256plus = cs != nil
146 }
147 continue
148 }
149 if mech == "SCRAM-SHA-256-PLUS" && cs != nil {
150 return sasl.NewClientSCRAMSHA256PLUS(a.Username, a.Password, *cs), nil
151 } else if mech == "SCRAM-SHA-256" {
152 return sasl.NewClientSCRAMSHA256(a.Username, a.Password, supportsscramsha256plus), nil
153 } else if mech == "SCRAM-SHA-1-PLUS" && cs != nil {
154 return sasl.NewClientSCRAMSHA1PLUS(a.Username, a.Password, *cs), nil
155 } else if mech == "SCRAM-SHA-1" {
156 return sasl.NewClientSCRAMSHA1(a.Username, a.Password, supportsscramsha1plus), nil
157 } else if mech == "CRAM-MD5" {
158 return sasl.NewClientCRAMMD5(a.Username, a.Password), nil
159 } else if mech == "PLAIN" {
160 return sasl.NewClientPlain(a.Username, a.Password), nil
161 }
162 return nil, fmt.Errorf("internal error: unrecognized authentication mechanism %q for transport %s", mech, transportName)
163 }
164
165 // No mutually supported algorithm.
166 return nil, nil
167 }
168 }
169 clientctx, clientcancel := context.WithTimeout(context.Background(), 60*time.Second)
170 defer clientcancel()
171 opts := smtpclient.Opts{
172 Auth: auth,
173 RootCAs: mox.Conf.Static.TLS.CertPool,
174 }
175 client, err := smtpclient.New(clientctx, qlog.Logger, conn, tlsMode, tlsPKIX, mox.Conf.Static.HostnameDomain, transport.DNSHost, opts)
176 if err != nil {
177 smtperr, ok := err.(smtpclient.Error)
178 var remoteMTA dsn.NameIP
179 submiterr = fmt.Errorf("transport %s: establishing smtp session with %s for submission: %w", transportName, addr, err)
180 if ok {
181 remoteMTA.Name = transport.Host
182 smtperr.Err = submiterr
183 submiterr = smtperr
184 }
185 qlog.Errorx("establishing smtp session for submission", submiterr, slog.String("remote", addr))
186 fail(ctx, qlog, msgs, m0.DialedIPs, backoff, remoteMTA, submiterr)
187 return
188 }
189 defer func() {
190 err := client.Close()
191 qlog.Check(err, "closing smtp client after delivery")
192 }()
193 clientcancel()
194
195 var msgr io.ReadCloser
196 var size int64
197 var req8bit, reqsmtputf8 bool
198 if len(m0.DSNUTF8) > 0 && client.SupportsSMTPUTF8() {
199 msgr = io.NopCloser(bytes.NewReader(m0.DSNUTF8))
200 reqsmtputf8 = true
201 size = int64(len(m0.DSNUTF8))
202 } else {
203 req8bit = m0.Has8bit // todo: not require this, but just try to submit?
204 size = m0.Size
205
206 p := m0.MessagePath()
207 f, err := os.Open(p)
208 if err != nil {
209 qlog.Errorx("opening message for delivery", err, slog.String("remote", addr), slog.String("path", p))
210 submiterr = fmt.Errorf("transport %s: opening message file for submission: %w", transportName, err)
211 fail(ctx, qlog, msgs, m0.DialedIPs, backoff, dsn.NameIP{}, submiterr)
212 return
213 }
214 msgr = store.FileMsgReader(m0.MsgPrefix, f)
215 defer func() {
216 err := msgr.Close()
217 qlog.Check(err, "closing message after delivery attempt")
218 }()
219 }
220
221 deliverctx, delivercancel := context.WithTimeout(context.Background(), time.Duration(60+size/(1024*1024))*time.Second)
222 defer delivercancel()
223 rcpts := make([]string, len(msgs))
224 for i, m := range msgs {
225 rcpts[i] = m.Recipient().String()
226 }
227 rcptErrs, submiterr := client.DeliverMultiple(deliverctx, m0.Sender().String(), rcpts, size, msgr, req8bit, reqsmtputf8, requireTLS)
228 if submiterr != nil {
229 qlog.Infox("smtp transaction for delivery failed", submiterr)
230 }
231 failed = 0 // Reset, we are looking at the SMTP results below.
232 var delIDs []int64
233 for i, m := range msgs {
234 qmlog := qlog.With(
235 slog.Int64("msgid", m.ID),
236 slog.Any("recipient", m.Recipient()))
237
238 err := submiterr
239 if err == nil && len(rcptErrs) > i {
240 if rcptErrs[i].Code != smtp.C250Completed {
241 err = smtpclient.Error(rcptErrs[i])
242 }
243 }
244 if err != nil {
245 smtperr, ok := err.(smtpclient.Error)
246 err = fmt.Errorf("transport %s: submitting message to %s: %w", transportName, addr, err)
247 var remoteMTA dsn.NameIP
248 if ok {
249 remoteMTA.Name = transport.Host
250 smtperr.Err = err
251 err = smtperr
252 }
253 qmlog.Errorx("submitting message", err, slog.String("remote", addr))
254 fail(ctx, qmlog, []*Msg{m}, m0.DialedIPs, backoff, remoteMTA, err)
255 failed++
256 } else {
257 delIDs = append(delIDs, m.ID)
258 qmlog.Info("delivered from queue with transport")
259 delivered++
260 }
261 }
262 if len(delIDs) > 0 {
263 if err := queueDelete(context.Background(), delIDs...); err != nil {
264 qlog.Errorx("deleting message from queue after delivery", err)
265 }
266 }
267}
268