1package queue
2
3import (
4 "bytes"
5 "context"
6 "crypto/tls"
7 "errors"
8 "fmt"
9 "io"
10 "net"
11 "os"
12 "time"
13
14 "golang.org/x/exp/slices"
15 "golang.org/x/exp/slog"
16
17 "github.com/mjl-/mox/config"
18 "github.com/mjl-/mox/dns"
19 "github.com/mjl-/mox/dsn"
20 "github.com/mjl-/mox/mlog"
21 "github.com/mjl-/mox/mox-"
22 "github.com/mjl-/mox/sasl"
23 "github.com/mjl-/mox/smtp"
24 "github.com/mjl-/mox/smtpclient"
25 "github.com/mjl-/mox/store"
26)
27
28// todo: reuse connection? do fewer concurrently (other than with direct delivery).
29
30// deliver via another SMTP server, e.g. relaying to a smart host, possibly
31// with authentication (submission).
32func deliverSubmit(qlog mlog.Log, resolver dns.Resolver, dialer smtpclient.Dialer, m Msg, backoff time.Duration, transportName string, transport *config.TransportSMTP, dialTLS bool, defaultPort int) {
33 // todo: configurable timeouts
34
35 port := transport.Port
36 if port == 0 {
37 port = defaultPort
38 }
39
40 tlsMode := smtpclient.TLSRequiredStartTLS
41 tlsPKIX := true
42 if dialTLS {
43 tlsMode = smtpclient.TLSImmediate
44 } else if transport.STARTTLSInsecureSkipVerify {
45 tlsMode = smtpclient.TLSRequiredStartTLS
46 tlsPKIX = false
47 } else if transport.NoSTARTTLS {
48 tlsMode = smtpclient.TLSSkip
49 tlsPKIX = false
50 }
51 start := time.Now()
52 var deliveryResult string
53 var permanent bool
54 var secodeOpt string
55 var errmsg string
56 var success bool
57 defer func() {
58 metricDelivery.WithLabelValues(fmt.Sprintf("%d", m.Attempts), transportName, string(tlsMode), deliveryResult).Observe(float64(time.Since(start)) / float64(time.Second))
59 qlog.Debug("queue deliversubmit result",
60 slog.Any("host", transport.DNSHost),
61 slog.Int("port", port),
62 slog.Int("attempt", m.Attempts),
63 slog.Bool("permanent", permanent),
64 slog.String("secodeopt", secodeOpt),
65 slog.String("errmsg", errmsg),
66 slog.Bool("ok", success),
67 slog.Duration("duration", time.Since(start)))
68 }()
69
70 // todo: SMTP-DANE should be used when relaying on port 25.
71 // ../rfc/7672:1261
72
73 // todo: for submission, understand SRV records, and even DANE.
74
75 ctx := mox.Shutdown
76
77 // If submit was done with REQUIRETLS extension for SMTP, we must verify TLS
78 // certificates. If our submission connection is not configured that way, abort.
79 requireTLS := m.RequireTLS != nil && *m.RequireTLS
80 if requireTLS && (tlsMode != smtpclient.TLSRequiredStartTLS && tlsMode != smtpclient.TLSImmediate || !tlsPKIX) {
81 errmsg = fmt.Sprintf("transport %s: message requires verified tls but transport does not verify tls", transportName)
82 fail(ctx, qlog, m, backoff, true, dsn.NameIP{}, smtp.SePol7MissingReqTLS, errmsg)
83 return
84 }
85
86 dialctx, dialcancel := context.WithTimeout(ctx, 30*time.Second)
87 defer dialcancel()
88 if m.DialedIPs == nil {
89 m.DialedIPs = map[string][]net.IP{}
90 }
91 _, _, _, ips, _, err := smtpclient.GatherIPs(dialctx, qlog.Logger, resolver, dns.IPDomain{Domain: transport.DNSHost}, m.DialedIPs)
92 var conn net.Conn
93 if err == nil {
94 if m.DialedIPs == nil {
95 m.DialedIPs = map[string][]net.IP{}
96 }
97 conn, _, err = smtpclient.Dial(dialctx, qlog.Logger, dialer, dns.IPDomain{Domain: transport.DNSHost}, ips, port, m.DialedIPs, mox.Conf.Static.SpecifiedSMTPListenIPs)
98 }
99 addr := net.JoinHostPort(transport.Host, fmt.Sprintf("%d", port))
100 var result string
101 switch {
102 case err == nil:
103 result = "ok"
104 case errors.Is(err, os.ErrDeadlineExceeded), errors.Is(err, context.DeadlineExceeded):
105 result = "timeout"
106 case errors.Is(err, context.Canceled):
107 result = "canceled"
108 default:
109 result = "error"
110 }
111 metricConnection.WithLabelValues(result).Inc()
112 if err != nil {
113 if conn != nil {
114 err := conn.Close()
115 qlog.Check(err, "closing connection")
116 }
117 qlog.Errorx("dialing for submission", err, slog.String("remote", addr))
118 errmsg = fmt.Sprintf("transport %s: dialing %s for submission: %v", transportName, addr, err)
119 fail(ctx, qlog, m, backoff, false, dsn.NameIP{}, "", errmsg)
120 return
121 }
122 dialcancel()
123
124 var auth func(mechanisms []string, cs *tls.ConnectionState) (sasl.Client, error)
125 if transport.Auth != nil {
126 a := transport.Auth
127 auth = func(mechanisms []string, cs *tls.ConnectionState) (sasl.Client, error) {
128 var supportsscramsha1plus, supportsscramsha256plus bool
129 for _, mech := range a.EffectiveMechanisms {
130 if !slices.Contains(mechanisms, mech) {
131 switch mech {
132 case "SCRAM-SHA-1-PLUS":
133 supportsscramsha1plus = cs != nil
134 case "SCRAM-SHA-256-PLUS":
135 supportsscramsha256plus = cs != nil
136 }
137 continue
138 }
139 if mech == "SCRAM-SHA-256-PLUS" && cs != nil {
140 return sasl.NewClientSCRAMSHA256PLUS(a.Username, a.Password, *cs), nil
141 } else if mech == "SCRAM-SHA-256" {
142 return sasl.NewClientSCRAMSHA256(a.Username, a.Password, supportsscramsha256plus), nil
143 } else if mech == "SCRAM-SHA-1-PLUS" && cs != nil {
144 return sasl.NewClientSCRAMSHA1PLUS(a.Username, a.Password, *cs), nil
145 } else if mech == "SCRAM-SHA-1" {
146 return sasl.NewClientSCRAMSHA1(a.Username, a.Password, supportsscramsha1plus), nil
147 } else if mech == "CRAM-MD5" {
148 return sasl.NewClientCRAMMD5(a.Username, a.Password), nil
149 } else if mech == "PLAIN" {
150 return sasl.NewClientPlain(a.Username, a.Password), nil
151 }
152 return nil, fmt.Errorf("internal error: unrecognized authentication mechanism %q for transport %s", mech, transportName)
153 }
154
155 // No mutually supported algorithm.
156 return nil, nil
157 }
158 }
159 clientctx, clientcancel := context.WithTimeout(context.Background(), 60*time.Second)
160 defer clientcancel()
161 opts := smtpclient.Opts{
162 Auth: auth,
163 RootCAs: mox.Conf.Static.TLS.CertPool,
164 }
165 client, err := smtpclient.New(clientctx, qlog.Logger, conn, tlsMode, tlsPKIX, mox.Conf.Static.HostnameDomain, transport.DNSHost, opts)
166 if err != nil {
167 smtperr, ok := err.(smtpclient.Error)
168 var remoteMTA dsn.NameIP
169 if ok {
170 remoteMTA.Name = transport.Host
171 }
172 qlog.Errorx("establishing smtp session for submission", err, slog.String("remote", addr))
173 errmsg = fmt.Sprintf("transport %s: establishing smtp session with %s for submission: %v", transportName, addr, err)
174 secodeOpt = smtperr.Secode
175 fail(ctx, qlog, m, backoff, false, remoteMTA, secodeOpt, errmsg)
176 return
177 }
178 defer func() {
179 err := client.Close()
180 qlog.Check(err, "closing smtp client after delivery")
181 }()
182 clientcancel()
183
184 var msgr io.ReadCloser
185 var size int64
186 var req8bit, reqsmtputf8 bool
187 if len(m.DSNUTF8) > 0 && client.SupportsSMTPUTF8() {
188 msgr = io.NopCloser(bytes.NewReader(m.DSNUTF8))
189 reqsmtputf8 = true
190 size = int64(len(m.DSNUTF8))
191 } else {
192 req8bit = m.Has8bit // todo: not require this, but just try to submit?
193 size = m.Size
194
195 p := m.MessagePath()
196 f, err := os.Open(p)
197 if err != nil {
198 qlog.Errorx("opening message for delivery", err, slog.String("remote", addr), slog.String("path", p))
199 errmsg = fmt.Sprintf("transport %s: opening message file for submission: %v", transportName, err)
200 fail(ctx, qlog, m, backoff, false, dsn.NameIP{}, "", errmsg)
201 return
202 }
203 msgr = store.FileMsgReader(m.MsgPrefix, f)
204 defer func() {
205 err := msgr.Close()
206 qlog.Check(err, "closing message after delivery attempt")
207 }()
208 }
209
210 deliverctx, delivercancel := context.WithTimeout(context.Background(), time.Duration(60+size/(1024*1024))*time.Second)
211 defer delivercancel()
212 err = client.Deliver(deliverctx, m.Sender().String(), m.Recipient().String(), size, msgr, req8bit, reqsmtputf8, requireTLS)
213 if err != nil {
214 qlog.Infox("delivery failed", err)
215 }
216 var cerr smtpclient.Error
217 switch {
218 case err == nil:
219 deliveryResult = "ok"
220 success = true
221 case errors.Is(err, os.ErrDeadlineExceeded), errors.Is(err, context.DeadlineExceeded):
222 deliveryResult = "timeout"
223 case errors.Is(err, context.Canceled):
224 deliveryResult = "canceled"
225 case errors.As(err, &cerr):
226 deliveryResult = "temperror"
227 if cerr.Permanent {
228 deliveryResult = "permerror"
229 }
230 default:
231 deliveryResult = "error"
232 }
233 if err != nil {
234 smtperr, ok := err.(smtpclient.Error)
235 var remoteMTA dsn.NameIP
236 if ok {
237 remoteMTA.Name = transport.Host
238 }
239 qlog.Errorx("submitting email", err, slog.String("remote", addr))
240 permanent = smtperr.Permanent
241 secodeOpt = smtperr.Secode
242 errmsg = fmt.Sprintf("transport %s: submitting email to %s: %v", transportName, addr, err)
243 fail(ctx, qlog, m, backoff, permanent, remoteMTA, secodeOpt, errmsg)
244 return
245 }
246 qlog.Info("delivered from queue with transport")
247 if err := queueDelete(context.Background(), m.ID); err != nil {
248 qlog.Errorx("deleting message from queue after delivery", err)
249 }
250}
251