1package dmarcdb
2
3// Sending TLS reports and DMARC reports is very similar. See ../dmarcdb/eval.go:/similar and ../tlsrptsend/send.go:/similar.
4
5import (
6 "compress/gzip"
7 "context"
8 "encoding/xml"
9 "errors"
10 "fmt"
11 "io"
12 "log/slog"
13 "mime"
14 "mime/multipart"
15 "net/textproto"
16 "net/url"
17 "os"
18 "path/filepath"
19 "runtime/debug"
20 "slices"
21 "sort"
22 "strings"
23 "sync"
24 "time"
25
26 "golang.org/x/exp/maps"
27
28 "github.com/prometheus/client_golang/prometheus"
29 "github.com/prometheus/client_golang/prometheus/promauto"
30
31 "github.com/mjl-/bstore"
32
33 "github.com/mjl-/mox/dkim"
34 "github.com/mjl-/mox/dmarc"
35 "github.com/mjl-/mox/dmarcrpt"
36 "github.com/mjl-/mox/dns"
37 "github.com/mjl-/mox/message"
38 "github.com/mjl-/mox/metrics"
39 "github.com/mjl-/mox/mlog"
40 "github.com/mjl-/mox/mox-"
41 "github.com/mjl-/mox/moxio"
42 "github.com/mjl-/mox/moxvar"
43 "github.com/mjl-/mox/publicsuffix"
44 "github.com/mjl-/mox/queue"
45 "github.com/mjl-/mox/smtp"
46 "github.com/mjl-/mox/store"
47)
48
49var (
50 metricReport = promauto.NewCounter(
51 prometheus.CounterOpts{
52 Name: "mox_dmarcdb_report_queued_total",
53 Help: "Total messages with DMARC aggregate/error reports queued.",
54 },
55 )
56 metricReportError = promauto.NewCounter(
57 prometheus.CounterOpts{
58 Name: "mox_dmarcdb_report_error_total",
59 Help: "Total errors while composing or queueing DMARC aggregate/error reports.",
60 },
61 )
62)
63
64var (
65 EvalDBTypes = []any{Evaluation{}, SuppressAddress{}} // Types stored in DB.
66 // Exported for backups. For incoming deliveries the SMTP server adds evaluations
67 // to the database. Every hour, a goroutine wakes up that gathers evaluations from
68 // the last hour(s), sends a report, and removes the evaluations from the database.
69 EvalDB *bstore.DB
70 evalMutex sync.Mutex
71)
72
73// Evaluation is the result of an evaluation of a DMARC policy, to be included
74// in a DMARC report.
75type Evaluation struct {
76 ID int64
77
78 // Domain where DMARC policy was found, could be the organizational domain while
79 // evaluation was for a subdomain. Unicode. Same as domain found in
80 // PolicyPublished. A separate field for its index.
81 PolicyDomain string `bstore:"index"`
82
83 // Time of evaluation, determines which report (covering whole hours) this
84 // evaluation will be included in.
85 Evaluated time.Time `bstore:"default now"`
86
87 // If optional, this evaluation is not a reason to send a DMARC report, but it will
88 // be included when a report is sent due to other non-optional evaluations. Set for
89 // evaluations of incoming DMARC reports. We don't want such deliveries causing us to
90 // send a report, or we would keep exchanging reporting messages forever. Also set
91 // for when evaluation is a DMARC reject for domains we haven't positively
92 // interacted with, to prevent being used to flood an unsuspecting domain with
93 // reports.
94 Optional bool
95
96 // Effective aggregate reporting interval in hours. Between 1 and 24, rounded up
97 // from seconds from policy to first number that can divide 24.
98 IntervalHours int
99
100 // "rua" in DMARC record, we only store evaluations for records with aggregate reporting addresses, so always non-empty.
101 Addresses []string
102
103 // Policy used for evaluation. We don't store the "fo" field for failure reporting
104 // options, since we don't send failure reports for individual messages.
105 PolicyPublished dmarcrpt.PolicyPublished
106
107 // For "row" in a report record.
108 SourceIP string
109 Disposition dmarcrpt.Disposition
110 AlignedDKIMPass bool
111 AlignedSPFPass bool
112 OverrideReasons []dmarcrpt.PolicyOverrideReason
113
114 // For "identifiers" in a report record.
115 EnvelopeTo string
116 EnvelopeFrom string
117 HeaderFrom string
118
119 // For "auth_results" in a report record.
120 DKIMResults []dmarcrpt.DKIMAuthResult
121 SPFResults []dmarcrpt.SPFAuthResult
122}
123
124// SuppressAddress is a reporting address for which outgoing DMARC reports
125// will be suppressed for a period.
126type SuppressAddress struct {
127 ID int64
128 Inserted time.Time `bstore:"default now"`
129 ReportingAddress string `bstore:"unique"`
130 Until time.Time `bstore:"nonzero"`
131 Comment string
132}
133
134var dmarcResults = map[bool]dmarcrpt.DMARCResult{
135 false: dmarcrpt.DMARCFail,
136 true: dmarcrpt.DMARCPass,
137}
138
139// ReportRecord turns an evaluation into a record that can be included in a
140// report.
141func (e Evaluation) ReportRecord(count int) dmarcrpt.ReportRecord {
142 return dmarcrpt.ReportRecord{
143 Row: dmarcrpt.Row{
144 SourceIP: e.SourceIP,
145 Count: count,
146 PolicyEvaluated: dmarcrpt.PolicyEvaluated{
147 Disposition: e.Disposition,
148 DKIM: dmarcResults[e.AlignedDKIMPass],
149 SPF: dmarcResults[e.AlignedSPFPass],
150 Reasons: e.OverrideReasons,
151 },
152 },
153 Identifiers: dmarcrpt.Identifiers{
154 EnvelopeTo: e.EnvelopeTo,
155 EnvelopeFrom: e.EnvelopeFrom,
156 HeaderFrom: e.HeaderFrom,
157 },
158 AuthResults: dmarcrpt.AuthResults{
159 DKIM: e.DKIMResults,
160 SPF: e.SPFResults,
161 },
162 }
163}
164
165func evalDB(ctx context.Context) (rdb *bstore.DB, rerr error) {
166 evalMutex.Lock()
167 defer evalMutex.Unlock()
168 if EvalDB == nil {
169 p := mox.DataDirPath("dmarceval.db")
170 os.MkdirAll(filepath.Dir(p), 0770)
171 db, err := bstore.Open(ctx, p, &bstore.Options{Timeout: 5 * time.Second, Perm: 0660}, EvalDBTypes...)
172 if err != nil {
173 return nil, err
174 }
175 EvalDB = db
176 }
177 return EvalDB, nil
178}
179
180var intervalOpts = []int{24, 12, 8, 6, 4, 3, 2}
181
182func intervalHours(seconds int) int {
183 hours := (seconds + 3600 - 1) / 3600
184 for _, opt := range intervalOpts {
185 if hours >= opt {
186 return opt
187 }
188 }
189 return 1
190}
191
192// AddEvaluation adds the result of a DMARC evaluation for an incoming message
193// to the database.
194//
195// AddEvaluation sets Evaluation.IntervalHours based on
196// aggregateReportingIntervalSeconds.
197func AddEvaluation(ctx context.Context, aggregateReportingIntervalSeconds int, e *Evaluation) error {
198 e.IntervalHours = intervalHours(aggregateReportingIntervalSeconds)
199
200 db, err := evalDB(ctx)
201 if err != nil {
202 return err
203 }
204
205 e.ID = 0
206 return db.Insert(ctx, e)
207}
208
209// Evaluations returns all evaluations in the database.
210func Evaluations(ctx context.Context) ([]Evaluation, error) {
211 db, err := evalDB(ctx)
212 if err != nil {
213 return nil, err
214 }
215
216 q := bstore.QueryDB[Evaluation](ctx, db)
217 q.SortAsc("Evaluated")
218 return q.List()
219}
220
221// EvaluationStat summarizes stored evaluations, for inclusion in an upcoming
222// aggregate report, for a domain.
223type EvaluationStat struct {
224 Domain dns.Domain
225 Dispositions []string
226 Count int
227 SendReport bool
228}
229
230// EvaluationStats returns evaluation counts and report-sending status per domain.
231func EvaluationStats(ctx context.Context) (map[string]EvaluationStat, error) {
232 db, err := evalDB(ctx)
233 if err != nil {
234 return nil, err
235 }
236
237 r := map[string]EvaluationStat{}
238
239 err = bstore.QueryDB[Evaluation](ctx, db).ForEach(func(e Evaluation) error {
240 if stat, ok := r[e.PolicyDomain]; ok {
241 if !slices.Contains(stat.Dispositions, string(e.Disposition)) {
242 stat.Dispositions = append(stat.Dispositions, string(e.Disposition))
243 }
244 stat.Count++
245 stat.SendReport = stat.SendReport || !e.Optional
246 r[e.PolicyDomain] = stat
247 } else {
248 dom, err := dns.ParseDomain(e.PolicyDomain)
249 if err != nil {
250 return fmt.Errorf("parsing domain %q: %v", e.PolicyDomain, err)
251 }
252 r[e.PolicyDomain] = EvaluationStat{
253 Domain: dom,
254 Dispositions: []string{string(e.Disposition)},
255 Count: 1,
256 SendReport: !e.Optional,
257 }
258 }
259 return nil
260 })
261 return r, err
262}
263
264// EvaluationsDomain returns all evaluations for a domain.
265func EvaluationsDomain(ctx context.Context, domain dns.Domain) ([]Evaluation, error) {
266 db, err := evalDB(ctx)
267 if err != nil {
268 return nil, err
269 }
270
271 q := bstore.QueryDB[Evaluation](ctx, db)
272 q.FilterNonzero(Evaluation{PolicyDomain: domain.Name()})
273 q.SortAsc("Evaluated")
274 return q.List()
275}
276
277// RemoveEvaluationsDomain removes evaluations for domain so they won't be sent in
278// an aggregate report.
279func RemoveEvaluationsDomain(ctx context.Context, domain dns.Domain) error {
280 db, err := evalDB(ctx)
281 if err != nil {
282 return err
283 }
284
285 q := bstore.QueryDB[Evaluation](ctx, db)
286 q.FilterNonzero(Evaluation{PolicyDomain: domain.Name()})
287 _, err = q.Delete()
288 return err
289}
290
291var jitterRand = mox.NewPseudoRand()
292
293// time to sleep until next whole hour t, replaced by tests.
294// Jitter so we don't cause load at exactly whole hours, other processes may
295// already be doing that.
296var jitteredTimeUntil = func(t time.Time) time.Duration {
297 return time.Until(t.Add(time.Duration(30+jitterRand.Intn(60)) * time.Second))
298}
299
300// Start launches a goroutine that wakes up at each whole hour (plus jitter) and
301// sends DMARC reports to domains that requested them.
302func Start(resolver dns.Resolver) {
303 go func() {
304 log := mlog.New("dmarcdb", nil)
305
306 defer func() {
307 // In case of panic don't take the whole program down.
308 x := recover()
309 if x != nil {
310 log.Error("recover from panic", slog.Any("panic", x))
311 debug.PrintStack()
312 metrics.PanicInc(metrics.Dmarcdb)
313 }
314 }()
315
316 timer := time.NewTimer(time.Hour)
317 defer timer.Stop()
318
319 ctx := mox.Shutdown
320
321 db, err := evalDB(ctx)
322 if err != nil {
323 log.Errorx("opening dmarc evaluations database for sending dmarc aggregate reports, not sending reports", err)
324 return
325 }
326
327 for {
328 now := time.Now()
329 nextEnd := nextWholeHour(now)
330 timer.Reset(jitteredTimeUntil(nextEnd))
331
332 select {
333 case <-ctx.Done():
334 log.Info("dmarc aggregate report sender shutting down")
335 return
336 case <-timer.C:
337 }
338
339 // Gather report intervals we want to process now. Multiples of hours that can
340 // divide 24, starting from UTC.
341 // ../rfc/7489:1750
342 utchour := nextEnd.UTC().Hour()
343 if utchour == 0 {
344 utchour = 24
345 }
346 intervals := []int{}
347 for _, ival := range intervalOpts {
348 if ival*(utchour/ival) == utchour {
349 intervals = append(intervals, ival)
350 }
351 }
352 intervals = append(intervals, 1)
353
354 // Remove evaluations older than 48 hours (2 reports with the default and maximum
355 // 24 hour interval). They should have been processed by now. We may have kept them
356 // during temporary errors, but persistent temporary errors shouldn't fill up our
357 // database. This also cleans up evaluations that were all optional for a domain.
358 _, err := bstore.QueryDB[Evaluation](ctx, db).FilterLess("Evaluated", nextEnd.Add(-48*time.Hour)).Delete()
359 log.Check(err, "removing stale dmarc evaluations from database")
360
361 clog := log.WithCid(mox.Cid())
362 clog.Info("sending dmarc aggregate reports", slog.Time("end", nextEnd.UTC()), slog.Any("intervals", intervals))
363 if err := sendReports(ctx, clog, resolver, db, nextEnd, intervals); err != nil {
364 clog.Errorx("sending dmarc aggregate reports", err)
365 metricReportError.Inc()
366 } else {
367 clog.Info("finished sending dmarc aggregate reports")
368 }
369 }
370 }()
371}
372
373func nextWholeHour(now time.Time) time.Time {
374 t := now
375 t = t.Add(time.Hour)
376 return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location())
377}
378
379// We don't send reports at full speed. In the future, we could try to stretch out
380// reports a bit smarter. E.g. over 5 minutes with some minimum interval, and
381// perhaps faster and in parallel when there are lots of reports. Perhaps also
382// depending on reporting interval (faster for 1h, slower for 24h).
383// Replaced by tests.
384var sleepBetween = func(ctx context.Context, between time.Duration) (ok bool) {
385 t := time.NewTimer(between)
386 select {
387 case <-ctx.Done():
388 t.Stop()
389 return false
390 case <-t.C:
391 return true
392 }
393}
394
395// sendReports gathers all policy domains that have evaluations that should
396// receive a DMARC report and sends a report to each.
397func sendReports(ctx context.Context, log mlog.Log, resolver dns.Resolver, db *bstore.DB, endTime time.Time, intervals []int) error {
398 ivals := make([]any, len(intervals))
399 for i, v := range intervals {
400 ivals[i] = v
401 }
402
403 destDomains := map[string]bool{}
404
405 // Gather all domains that we plan to send to.
406 nsend := 0
407 q := bstore.QueryDB[Evaluation](ctx, db)
408 q.FilterLess("Evaluated", endTime)
409 q.FilterEqual("IntervalHours", ivals...)
410 err := q.ForEach(func(e Evaluation) error {
411 if !e.Optional && !destDomains[e.PolicyPublished.Domain] {
412 nsend++
413 }
414 destDomains[e.PolicyPublished.Domain] = destDomains[e.PolicyPublished.Domain] || !e.Optional
415 return nil
416 })
417 if err != nil {
418 return fmt.Errorf("looking for domains to send reports to: %v", err)
419 }
420
421 var wg sync.WaitGroup
422
423 // Sleep in between sending reports. We process hourly, and spread the reports over
424 // the hour, but with max 5 minute interval.
425 between := 45 * time.Minute
426 if nsend > 0 {
427 between /= time.Duration(nsend)
428 }
429 if between > 5*time.Minute {
430 between = 5 * time.Minute
431 }
432
433 // Attempt to send report to each domain.
434 n := 0
435 for d, send := range destDomains {
436 // Cleanup evaluations for domain with only optionals.
437 if !send {
438 removeEvaluations(ctx, log, db, endTime, d)
439 continue
440 }
441
442 if n > 0 {
443 if ok := sleepBetween(ctx, between); !ok {
444 return nil
445 }
446 }
447 n++
448
449 // Send in goroutine, so a slow process doesn't block progress.
450 wg.Add(1)
451 go func(domain string) {
452 defer func() {
453 // In case of panic don't take the whole program down.
454 x := recover()
455 if x != nil {
456 log.Error("unhandled panic in dmarcdb sendReports", slog.Any("panic", x))
457 debug.PrintStack()
458 metrics.PanicInc(metrics.Dmarcdb)
459 }
460 }()
461 defer wg.Done()
462
463 rlog := log.WithCid(mox.Cid()).With(slog.Any("domain", domain))
464 rlog.Info("sending dmarc report")
465 if _, err := sendReportDomain(ctx, rlog, resolver, db, endTime, domain); err != nil {
466 rlog.Errorx("sending dmarc aggregate report to domain", err)
467 metricReportError.Inc()
468 }
469 }(d)
470 }
471
472 wg.Wait()
473
474 return nil
475}
476
477type recipient struct {
478 address smtp.Address
479 maxSize uint64
480}
481
482func parseRecipient(log mlog.Log, uri dmarc.URI) (r recipient, ok bool) {
483 log = log.With(slog.Any("uri", uri.Address))
484
485 u, err := url.Parse(uri.Address)
486 if err != nil {
487 log.Debugx("parsing uri in dmarc record rua value", err)
488 return r, false
489 }
490 if !strings.EqualFold(u.Scheme, "mailto") {
491 log.Debug("skipping unrecognized scheme in dmarc record rua value")
492 return r, false
493 }
494 addr, err := smtp.ParseAddress(u.Opaque)
495 if err != nil {
496 log.Debugx("parsing mailto uri in dmarc record rua value", err)
497 return r, false
498 }
499
500 r = recipient{addr, uri.MaxSize}
501 // ../rfc/7489:1197
502 switch uri.Unit {
503 case "k", "K":
504 r.maxSize *= 1024
505 case "m", "M":
506 r.maxSize *= 1024 * 1024
507 case "g", "G":
508 r.maxSize *= 1024 * 1024 * 1024
509 case "t", "T":
510 // Oh yeah, terabyte-sized reports!
511 r.maxSize *= 1024 * 1024 * 1024 * 1024
512 case "":
513 default:
514 log.Debug("unrecognized max size unit in dmarc record rua value", slog.String("unit", uri.Unit))
515 return r, false
516 }
517
518 return r, true
519}
520
521func removeEvaluations(ctx context.Context, log mlog.Log, db *bstore.DB, endTime time.Time, domain string) {
522 q := bstore.QueryDB[Evaluation](ctx, db)
523 q.FilterLess("Evaluated", endTime)
524 q.FilterNonzero(Evaluation{PolicyDomain: domain})
525 _, err := q.Delete()
526 log.Check(err, "removing evaluations after processing for dmarc aggregate report")
527}
528
529// replaceable for testing.
530var queueAdd = queue.Add
531
532func sendReportDomain(ctx context.Context, log mlog.Log, resolver dns.Resolver, db *bstore.DB, endTime time.Time, domain string) (cleanup bool, rerr error) {
533 dom, err := dns.ParseDomain(domain)
534 if err != nil {
535 return false, fmt.Errorf("parsing domain for sending reports: %v", err)
536 }
537
538 // We'll cleanup records by default.
539 cleanup = true
540 // If we encounter a temporary error we cancel cleanup of evaluations on error.
541 tempError := false
542
543 defer func() {
544 if !cleanup || tempError {
545 log.Debug("not cleaning up evaluations after attempting to send dmarc aggregate report")
546 } else {
547 removeEvaluations(ctx, log, db, endTime, domain)
548 }
549 }()
550
551 // We're going to build up this report.
552 report := dmarcrpt.Feedback{
553 Version: "1.0",
554 ReportMetadata: dmarcrpt.ReportMetadata{
555 OrgName: mox.Conf.Static.HostnameDomain.ASCII,
556 Email: "postmaster@" + mox.Conf.Static.HostnameDomain.ASCII,
557 // ReportID and DateRange are set after we've seen evaluations.
558 // Errors is filled below when we encounter problems.
559 },
560 // We'll fill the records below.
561 Records: []dmarcrpt.ReportRecord{},
562 }
563
564 var errors []string // For report.ReportMetaData.Errors
565
566 // Check if we should be sending a report at all: if there are rua URIs in the
567 // current DMARC record. The interval may have changed too, but we'll flush out our
568 // evaluations regardless. We always use the latest DMARC record when sending, but
569 // we'll lump all policies of the last interval into one report.
570 // ../rfc/7489:1714
571 status, _, record, _, _, err := dmarc.Lookup(ctx, log.Logger, resolver, dom)
572 if err != nil {
573 // todo future: we could perhaps still send this report, assuming the values we know. in case of temporary error, we could also schedule again regardless of next interval hour (we would now only retry a 24h-interval report after 24h passed).
574 // Remove records unless it was a temporary error. We'll try again next round.
575 cleanup = status != dmarc.StatusTemperror
576 return cleanup, fmt.Errorf("looking up current dmarc record for reporting address: %v", err)
577 }
578
579 var recipients []recipient
580
581 // Gather all aggregate reporting addresses to try to send to. We'll start with
582 // those in the initial DMARC record, but will follow external reporting addresses
583 // and possibly update the list.
584 for _, uri := range record.AggregateReportAddresses {
585 r, ok := parseRecipient(log, uri)
586 if !ok {
587 continue
588 }
589
590 // Check if domain of rua recipient has the same organizational domain as for the
591 // evaluations. If not, we need to verify we are allowed to send.
592 ruaOrgDom := publicsuffix.Lookup(ctx, log.Logger, r.address.Domain)
593 evalOrgDom := publicsuffix.Lookup(ctx, log.Logger, dom)
594
595 if ruaOrgDom == evalOrgDom {
596 recipients = append(recipients, r)
597 continue
598 }
599
600 // Verify and follow addresses in other organizational domain through
601 // <policydomain>._report._dmarc.<host> lookup.
602 // ../rfc/7489:1556
603 accepts, status, records, _, _, err := dmarc.LookupExternalReportsAccepted(ctx, log.Logger, resolver, evalOrgDom, r.address.Domain)
604 log.Debugx("checking if rua address with different organization domain has opted into receiving dmarc reports", err,
605 slog.Any("policydomain", evalOrgDom),
606 slog.Any("destinationdomain", r.address.Domain),
607 slog.Bool("accepts", accepts),
608 slog.Any("status", status))
609 if status == dmarc.StatusTemperror {
610 // With a temporary error, we'll try to get the report the delivered anyway,
611 // perhaps there are multiple recipients.
612 // ../rfc/7489:1578
613 tempError = true
614 errors = append(errors, "temporary error checking authorization for report delegation to external address")
615 }
616 if !accepts {
617 errors = append(errors, fmt.Sprintf("rua %s is external domain that does not opt-in to receiving dmarc records through _report dmarc record", r.address))
618 continue
619 }
620
621 // We can follow a _report DMARC DNS record once. In that record, a domain may
622 // specify alternative addresses that we should send reports to instead. Such
623 // alternative address(es) must have the same host. If not, we ignore the new
624 // value. Behaviour for multiple records and/or multiple new addresses is
625 // underspecified. We'll replace an address with one or more new addresses, and
626 // keep the original if there was no candidate (which covers the case of invalid
627 // alternative addresses and no new address specified).
628 // ../rfc/7489:1600
629 foundReplacement := false
630 rlog := log.With(slog.Any("followedaddress", uri.Address))
631 for _, record := range records {
632 for _, exturi := range record.AggregateReportAddresses {
633 extr, ok := parseRecipient(rlog, exturi)
634 if !ok {
635 continue
636 }
637 if extr.address.Domain != r.address.Domain {
638 rlog.Debug("rua address in external _report dmarc record has different host than initial dmarc record, ignoring new name", slog.Any("externaladdress", extr.address))
639 errors = append(errors, fmt.Sprintf("rua %s is external domain with a replacement address %s with different host", r.address, extr.address))
640 } else {
641 rlog.Debug("using replacement rua address from external _report dmarc record", slog.Any("externaladdress", extr.address))
642 foundReplacement = true
643 recipients = append(recipients, extr)
644 }
645 }
646 }
647 if !foundReplacement {
648 recipients = append(recipients, r)
649 }
650 }
651
652 if len(recipients) == 0 {
653 // No reports requested, perfectly fine, no work to do for us.
654 log.Debug("no aggregate reporting addresses configured")
655 return true, nil
656 }
657
658 // We count idential records. Can be common with a domain sending quite some email.
659 // Though less if the sending domain has many IPs. In the future, we may want to
660 // remove some details from records so we can aggregate them into fewer rows.
661 type recordCount struct {
662 dmarcrpt.ReportRecord
663 count int
664 }
665 counts := map[string]recordCount{}
666
667 var first, last Evaluation // For daterange.
668 var sendReport bool
669
670 q := bstore.QueryDB[Evaluation](ctx, db)
671 q.FilterLess("Evaluated", endTime)
672 q.FilterNonzero(Evaluation{PolicyDomain: domain})
673 q.SortAsc("Evaluated")
674 err = q.ForEach(func(e Evaluation) error {
675 if first.ID == 0 {
676 first = e
677 }
678 last = e
679
680 record := e.ReportRecord(0)
681
682 // todo future: if we see many unique records from a single ip (exact ipv4 or ipv6 subnet), we may want to coalesce them into a single record, leaving out the fields that make them: a single ip could cause a report to contain many records with many unique domains, selectors, etc. it may compress relatively well, but the reports could still be huge.
683
684 // Simple but inefficient way to aggregate identical records. We may want to turn
685 // records into smaller representation in the future.
686 recbuf, err := xml.Marshal(record)
687 if err != nil {
688 return fmt.Errorf("xml marshal of report record: %v", err)
689 }
690 recstr := string(recbuf)
691 counts[recstr] = recordCount{record, counts[recstr].count + 1}
692 if !e.Optional {
693 sendReport = true
694 }
695 return nil
696 })
697 if err != nil {
698 return false, fmt.Errorf("gathering evaluations for report: %v", err)
699 }
700
701 if !sendReport {
702 log.Debug("no non-optional evaluations for domain, not sending dmarc aggregate report")
703 return true, nil
704 }
705
706 // Set begin and end date range. We try to set it to whole intervals as requested
707 // by the domain owner. The typical, default and maximum interval is 24 hours. But
708 // we allow any whole number of hours that can divide 24 hours. If we have an
709 // evaluation that is older, we may have had a failure to send earlier. We include
710 // those earlier intervals in this report as well.
711 //
712 // Although "end" could be interpreted as exclusive, to be on the safe side
713 // regarding client behaviour, and (related) to mimic large existing DMARC report
714 // senders, we set it to the last second of the period this report covers.
715 report.ReportMetadata.DateRange.End = endTime.Add(-time.Second).Unix()
716 interval := time.Duration(first.IntervalHours) * time.Hour
717 beginTime := endTime.Add(-interval)
718 for first.Evaluated.Before(beginTime) {
719 beginTime = beginTime.Add(-interval)
720 }
721 report.ReportMetadata.DateRange.Begin = beginTime.Unix()
722
723 // yyyymmddHH, we only send one report per hour, so should be unique per policy
724 // domain. We also add a truly unique id based on first evaluation id used without
725 // revealing the number of evaluations we have. Reuse of ReceivedID is not great,
726 // but shouldn't hurt.
727 report.ReportMetadata.ReportID = endTime.UTC().Format("20060102.15") + "." + mox.ReceivedID(first.ID)
728
729 // We may include errors we encountered when composing the report. We
730 // don't currently include errors about dmarc evaluations, e.g. DNS
731 // lookup errors during incoming deliveries.
732 report.ReportMetadata.Errors = errors
733
734 // We'll fill this with the last-used record, not the one we fetch fresh from DSN.
735 // They will almost always be the same, but if not, the fresh record was never
736 // actually used for evaluations, so no point in reporting it.
737 report.PolicyPublished = last.PolicyPublished
738
739 // Process records in-order for testable results.
740 recstrs := maps.Keys(counts)
741 sort.Strings(recstrs)
742 for _, recstr := range recstrs {
743 rc := counts[recstr]
744 rc.ReportRecord.Row.Count = rc.count
745 report.Records = append(report.Records, rc.ReportRecord)
746 }
747
748 reportFile, err := store.CreateMessageTemp(log, "dmarcreportout")
749 if err != nil {
750 return false, fmt.Errorf("creating temporary file for outgoing dmarc aggregate report: %v", err)
751 }
752 defer store.CloseRemoveTempFile(log, reportFile, "generated dmarc aggregate report")
753
754 gzw := gzip.NewWriter(reportFile)
755 _, err = fmt.Fprint(gzw, xml.Header)
756 enc := xml.NewEncoder(gzw)
757 enc.Indent("", "\t") // Keep up pretention that xml is human-readable.
758 if err == nil {
759 err = enc.Encode(report)
760 }
761 if err == nil {
762 err = enc.Close()
763 }
764 if err == nil {
765 err = gzw.Close()
766 }
767 if err != nil {
768 return true, fmt.Errorf("writing dmarc aggregate report as xml with gzip: %v", err)
769 }
770
771 msgf, err := store.CreateMessageTemp(log, "dmarcreportmsgout")
772 if err != nil {
773 return false, fmt.Errorf("creating temporary message file with outgoing dmarc aggregate report: %v", err)
774 }
775 defer store.CloseRemoveTempFile(log, msgf, "message with generated dmarc aggregate report")
776
777 // We are sending reports from our host's postmaster address. In a
778 // typical setup the host is a subdomain of a configured domain with
779 // DKIM keys, so we can DKIM-sign our reports. SPF should pass anyway.
780 // A single report can contain deliveries from a single policy domain
781 // to multiple of our configured domains.
782 from := smtp.Address{Localpart: "postmaster", Domain: mox.Conf.Static.HostnameDomain}
783
784 // Subject follows the form in RFC. ../rfc/7489:1871
785 subject := fmt.Sprintf("Report Domain: %s Submitter: %s Report-ID: <%s>", dom.ASCII, mox.Conf.Static.HostnameDomain.ASCII, report.ReportMetadata.ReportID)
786
787 // Human-readable part for convenience. ../rfc/7489:1803
788 text := fmt.Sprintf(`Attached is an aggregate DMARC report with results of evaluations of the DMARC
789policy of your domain for messages received by us that have your domain in the
790message From header. You are receiving this message because your address is
791specified in the "rua" field of the DMARC record for your domain.
792
793Report domain: %s
794Submitter: %s
795Report-ID: %s
796Period: %s - %s UTC
797`, dom, mox.Conf.Static.HostnameDomain, report.ReportMetadata.ReportID, beginTime.UTC().Format(time.DateTime), endTime.UTC().Format(time.DateTime))
798
799 // The attached file follows the naming convention from the RFC. ../rfc/7489:1812
800 reportFilename := fmt.Sprintf("%s!%s!%d!%d.xml.gz", mox.Conf.Static.HostnameDomain.ASCII, dom.ASCII, beginTime.Unix(), endTime.Add(-time.Second).Unix())
801
802 var addrs []message.NameAddress
803 for _, rcpt := range recipients {
804 addrs = append(addrs, message.NameAddress{Address: rcpt.address})
805 }
806
807 // Compose the message.
808 msgPrefix, has8bit, smtputf8, messageID, err := composeAggregateReport(ctx, log, msgf, from, addrs, subject, text, reportFilename, reportFile)
809 if err != nil {
810 return false, fmt.Errorf("composing message with outgoing dmarc aggregate report: %v", err)
811 }
812
813 // Get size of message after all compression and encodings (base64 makes it big
814 // again), and go through potentials recipients (rua). If they are willing to
815 // accept the report, queue it.
816 msgInfo, err := msgf.Stat()
817 if err != nil {
818 return false, fmt.Errorf("stat message with outgoing dmarc aggregate report: %v", err)
819 }
820 msgSize := int64(len(msgPrefix)) + msgInfo.Size()
821 var queued bool
822 for _, rcpt := range recipients {
823 // If recipient is on suppression list, we won't queue the reporting message.
824 q := bstore.QueryDB[SuppressAddress](ctx, db)
825 q.FilterNonzero(SuppressAddress{ReportingAddress: rcpt.address.Path().String()})
826 q.FilterGreater("Until", time.Now())
827 exists, err := q.Exists()
828 if err != nil {
829 return false, fmt.Errorf("querying suppress list: %v", err)
830 }
831 if exists {
832 log.Info("suppressing outgoing dmarc aggregate report", slog.Any("reportingaddress", rcpt.address))
833 continue
834 }
835
836 // Only send to addresses where we don't exceed their size limit. The RFC mentions
837 // the size of the report, but then continues about the size after compression and
838 // transport encodings (i.e. gzip and the mime base64 attachment, so the intention
839 // is probably to compare against the size of the message that contains the report.
840 // ../rfc/7489:1773
841 if rcpt.maxSize > 0 && msgSize > int64(rcpt.maxSize) {
842 continue
843 }
844
845 qm := queue.MakeMsg(from.Path(), rcpt.address.Path(), has8bit, smtputf8, msgSize, messageID, []byte(msgPrefix), nil, time.Now(), subject)
846 // Don't try as long as regular deliveries, and stop before we would send the
847 // delayed DSN. Though we also won't send that due to IsDMARCReport.
848 qm.MaxAttempts = 5
849 qm.IsDMARCReport = true
850
851 err = queueAdd(ctx, log, mox.Conf.Static.Postmaster.Account, msgf, qm)
852 if err != nil {
853 tempError = true
854 log.Errorx("queueing message with dmarc aggregate report", err)
855 metricReportError.Inc()
856 } else {
857 log.Debug("dmarc aggregate report queued", slog.Any("recipient", rcpt.address))
858 queued = true
859 metricReport.Inc()
860 }
861 }
862
863 if !queued {
864 if err := sendErrorReport(ctx, log, db, from, addrs, dom, report.ReportMetadata.ReportID, msgSize); err != nil {
865 log.Errorx("sending dmarc error reports", err)
866 metricReportError.Inc()
867 }
868 }
869
870 // Regardless of whether we queued a report, we are not going to keep the
871 // evaluations around. Though this can be overridden if tempError is set.
872 // ../rfc/7489:1785
873
874 return true, nil
875}
876
877func composeAggregateReport(ctx context.Context, log mlog.Log, mf *os.File, fromAddr smtp.Address, recipients []message.NameAddress, subject, text, filename string, reportXMLGzipFile *os.File) (msgPrefix string, has8bit, smtputf8 bool, messageID string, rerr error) {
878 // We only use smtputf8 if we have to, with a utf-8 localpart. For IDNA, we use ASCII domains.
879 smtputf8 = fromAddr.Localpart.IsInternational()
880 for _, r := range recipients {
881 if smtputf8 {
882 smtputf8 = r.Address.Localpart.IsInternational()
883 break
884 }
885 }
886 xc := message.NewComposer(mf, 100*1024*1024, smtputf8)
887 defer func() {
888 x := recover()
889 if x == nil {
890 return
891 }
892 if err, ok := x.(error); ok && errors.Is(err, message.ErrCompose) {
893 rerr = err
894 return
895 }
896 panic(x)
897 }()
898
899 xc.HeaderAddrs("From", []message.NameAddress{{Address: fromAddr}})
900 xc.HeaderAddrs("To", recipients)
901 xc.Subject(subject)
902 messageID = fmt.Sprintf("<%s>", mox.MessageIDGen(xc.SMTPUTF8))
903 xc.Header("Message-Id", messageID)
904 xc.Header("Date", time.Now().Format(message.RFC5322Z))
905 xc.Header("User-Agent", "mox/"+moxvar.Version)
906 xc.Header("MIME-Version", "1.0")
907
908 // Multipart message, with a text/plain and the report attached.
909 mp := multipart.NewWriter(xc)
910 xc.Header("Content-Type", fmt.Sprintf(`multipart/mixed; boundary="%s"`, mp.Boundary()))
911 xc.Line()
912
913 // Textual part, just mentioning this is a DMARC report.
914 textBody, ct, cte := xc.TextPart("plain", text)
915 textHdr := textproto.MIMEHeader{}
916 textHdr.Set("Content-Type", ct)
917 textHdr.Set("Content-Transfer-Encoding", cte)
918 textp, err := mp.CreatePart(textHdr)
919 xc.Checkf(err, "adding text part to message")
920 _, err = textp.Write(textBody)
921 xc.Checkf(err, "writing text part")
922
923 // DMARC report as attachment.
924 ahdr := textproto.MIMEHeader{}
925 ahdr.Set("Content-Type", "application/gzip")
926 ahdr.Set("Content-Transfer-Encoding", "base64")
927 cd := mime.FormatMediaType("attachment", map[string]string{"filename": filename})
928 ahdr.Set("Content-Disposition", cd)
929 ap, err := mp.CreatePart(ahdr)
930 xc.Checkf(err, "adding dmarc aggregate report to message")
931 wc := moxio.Base64Writer(ap)
932 _, err = io.Copy(wc, &moxio.AtReader{R: reportXMLGzipFile})
933 xc.Checkf(err, "adding attachment")
934 err = wc.Close()
935 xc.Checkf(err, "flushing attachment")
936
937 err = mp.Close()
938 xc.Checkf(err, "closing multipart")
939
940 xc.Flush()
941
942 msgPrefix = dkimSign(ctx, log, fromAddr, xc.SMTPUTF8, mf)
943
944 return msgPrefix, xc.Has8bit, xc.SMTPUTF8, messageID, nil
945}
946
947// Though this functionality is quite underspecified, we'll do our best to send our
948// an error report in case our report is too large for all recipients.
949// ../rfc/7489:1918
950func sendErrorReport(ctx context.Context, log mlog.Log, db *bstore.DB, fromAddr smtp.Address, recipients []message.NameAddress, reportDomain dns.Domain, reportID string, reportMsgSize int64) error {
951 log.Debug("no reporting addresses willing to accept report given size, queuing short error message")
952
953 msgf, err := store.CreateMessageTemp(log, "dmarcreportmsg-out")
954 if err != nil {
955 return fmt.Errorf("creating temporary message file for outgoing dmarc error report: %v", err)
956 }
957 defer store.CloseRemoveTempFile(log, msgf, "outgoing dmarc error report message")
958
959 var recipientStrs []string
960 for _, rcpt := range recipients {
961 recipientStrs = append(recipientStrs, rcpt.Address.String())
962 }
963
964 subject := fmt.Sprintf("DMARC aggregate reporting error report for %s", reportDomain.ASCII)
965 // ../rfc/7489:1926
966 text := fmt.Sprintf(`Report-Date: %s
967Report-Domain: %s
968Report-ID: %s
969Report-Size: %d
970Submitter: %s
971Submitting-URI: %s
972`, time.Now().Format(message.RFC5322Z), reportDomain.ASCII, reportID, reportMsgSize, mox.Conf.Static.HostnameDomain.ASCII, strings.Join(recipientStrs, ","))
973 text = strings.ReplaceAll(text, "\n", "\r\n")
974
975 msgPrefix, has8bit, smtputf8, messageID, err := composeErrorReport(ctx, log, msgf, fromAddr, recipients, subject, text)
976 if err != nil {
977 return err
978 }
979
980 msgInfo, err := msgf.Stat()
981 if err != nil {
982 return fmt.Errorf("stat message with outgoing dmarc error report: %v", err)
983 }
984 msgSize := int64(len(msgPrefix)) + msgInfo.Size()
985
986 for _, rcpt := range recipients {
987 // If recipient is on suppression list, we won't queue the reporting message.
988 q := bstore.QueryDB[SuppressAddress](ctx, db)
989 q.FilterNonzero(SuppressAddress{ReportingAddress: rcpt.Address.Path().String()})
990 q.FilterGreater("Until", time.Now())
991 exists, err := q.Exists()
992 if err != nil {
993 return fmt.Errorf("querying suppress list: %v", err)
994 }
995 if exists {
996 log.Info("suppressing outgoing dmarc error report", slog.Any("reportingaddress", rcpt.Address))
997 continue
998 }
999
1000 qm := queue.MakeMsg(fromAddr.Path(), rcpt.Address.Path(), has8bit, smtputf8, msgSize, messageID, []byte(msgPrefix), nil, time.Now(), subject)
1001 // Don't try as long as regular deliveries, and stop before we would send the
1002 // delayed DSN. Though we also won't send that due to IsDMARCReport.
1003 qm.MaxAttempts = 5
1004 qm.IsDMARCReport = true
1005
1006 if err := queueAdd(ctx, log, mox.Conf.Static.Postmaster.Account, msgf, qm); err != nil {
1007 log.Errorx("queueing message with dmarc error report", err)
1008 metricReportError.Inc()
1009 } else {
1010 log.Debug("dmarc error report queued", slog.Any("recipient", rcpt))
1011 metricReport.Inc()
1012 }
1013 }
1014 return nil
1015}
1016
1017func composeErrorReport(ctx context.Context, log mlog.Log, mf *os.File, fromAddr smtp.Address, recipients []message.NameAddress, subject, text string) (msgPrefix string, has8bit, smtputf8 bool, messageID string, rerr error) {
1018 // We only use smtputf8 if we have to, with a utf-8 localpart. For IDNA, we use ASCII domains.
1019 smtputf8 = fromAddr.Localpart.IsInternational()
1020 for _, r := range recipients {
1021 if smtputf8 {
1022 smtputf8 = r.Address.Localpart.IsInternational()
1023 break
1024 }
1025 }
1026 xc := message.NewComposer(mf, 100*1024*1024, smtputf8)
1027 defer func() {
1028 x := recover()
1029 if x == nil {
1030 return
1031 }
1032 if err, ok := x.(error); ok && errors.Is(err, message.ErrCompose) {
1033 rerr = err
1034 return
1035 }
1036 panic(x)
1037 }()
1038
1039 xc.HeaderAddrs("From", []message.NameAddress{{Address: fromAddr}})
1040 xc.HeaderAddrs("To", recipients)
1041 xc.Header("Subject", subject)
1042 messageID = fmt.Sprintf("<%s>", mox.MessageIDGen(xc.SMTPUTF8))
1043 xc.Header("Message-Id", messageID)
1044 xc.Header("Date", time.Now().Format(message.RFC5322Z))
1045 xc.Header("User-Agent", "mox/"+moxvar.Version)
1046 xc.Header("MIME-Version", "1.0")
1047
1048 textBody, ct, cte := xc.TextPart("plain", text)
1049 xc.Header("Content-Type", ct)
1050 xc.Header("Content-Transfer-Encoding", cte)
1051 xc.Line()
1052 _, err := xc.Write(textBody)
1053 xc.Checkf(err, "writing text")
1054
1055 xc.Flush()
1056
1057 msgPrefix = dkimSign(ctx, log, fromAddr, smtputf8, mf)
1058
1059 return msgPrefix, xc.Has8bit, xc.SMTPUTF8, messageID, nil
1060}
1061
1062func dkimSign(ctx context.Context, log mlog.Log, fromAddr smtp.Address, smtputf8 bool, mf *os.File) string {
1063 // Add DKIM-Signature headers if we have a key for (a higher) domain than the from
1064 // address, which is a host name. A signature will only be useful with higher-level
1065 // domains if they have a relaxed dkim check (which is the default). If the dkim
1066 // check is strict, there is no harm, there will simply not be a dkim pass.
1067 fd := fromAddr.Domain
1068 var zerodom dns.Domain
1069 for fd != zerodom {
1070 confDom, ok := mox.Conf.Domain(fd)
1071 selectors := mox.DKIMSelectors(confDom.DKIM)
1072 if len(selectors) > 0 {
1073 dkimHeaders, err := dkim.Sign(ctx, log.Logger, fromAddr.Localpart, fd, selectors, smtputf8, mf)
1074 if err != nil {
1075 log.Errorx("dkim-signing dmarc report, continuing without signature", err)
1076 metricReportError.Inc()
1077 return ""
1078 }
1079 return dkimHeaders
1080 } else if ok {
1081 return ""
1082 }
1083
1084 var nfd dns.Domain
1085 _, nfd.ASCII, _ = strings.Cut(fd.ASCII, ".")
1086 _, nfd.Unicode, _ = strings.Cut(fd.Unicode, ".")
1087 fd = nfd
1088 }
1089 return ""
1090}
1091
1092// SuppressAdd adds an address to the suppress list.
1093func SuppressAdd(ctx context.Context, ba *SuppressAddress) error {
1094 db, err := evalDB(ctx)
1095 if err != nil {
1096 return err
1097 }
1098
1099 return db.Insert(ctx, ba)
1100}
1101
1102// SuppressList returns all reporting addresses on the suppress list.
1103func SuppressList(ctx context.Context) ([]SuppressAddress, error) {
1104 db, err := evalDB(ctx)
1105 if err != nil {
1106 return nil, err
1107 }
1108
1109 return bstore.QueryDB[SuppressAddress](ctx, db).SortDesc("ID").List()
1110}
1111
1112// SuppressRemove removes a reporting address record from the suppress list.
1113func SuppressRemove(ctx context.Context, id int64) error {
1114 db, err := evalDB(ctx)
1115 if err != nil {
1116 return err
1117 }
1118
1119 return db.Delete(ctx, &SuppressAddress{ID: id})
1120}
1121
1122// SuppressUpdate updates the until field of a reporting address record.
1123func SuppressUpdate(ctx context.Context, id int64, until time.Time) error {
1124 db, err := evalDB(ctx)
1125 if err != nil {
1126 return err
1127 }
1128
1129 ba := SuppressAddress{ID: id}
1130 err = db.Get(ctx, &ba)
1131 if err != nil {
1132 return err
1133 }
1134 ba.Until = until
1135 return db.Update(ctx, &ba)
1136}
1137