10 cryptrand "crypto/rand"
24 "golang.org/x/exp/maps"
26 "golang.org/x/text/unicode/norm"
28 "github.com/mjl-/bstore"
30 "github.com/mjl-/mox/message"
31 "github.com/mjl-/mox/metrics"
32 "github.com/mjl-/mox/mlog"
33 "github.com/mjl-/mox/mox-"
34 "github.com/mjl-/mox/store"
37type importListener struct {
39 Events chan importEvent
40 Register chan bool // Whether register is successful.
43type importEvent struct {
45 SSEMsg []byte // Full SSE message, including event: ... and data: ... \n\n
46 Event any // nil, importCount, importProblem, importDone, importAborted
47 Cancel func() // For cancelling the context causing abort of the import. Set in first, import-registering, event.
50type importAbortRequest struct {
55var importers = struct {
56 Register chan *importListener
57 Unregister chan *importListener
58 Events chan importEvent
59 Abort chan importAbortRequest
61 make(chan *importListener, 1),
62 make(chan *importListener, 1),
63 make(chan importEvent),
64 make(chan importAbortRequest),
67// ImportManage should be run as a goroutine. It manages imports of mboxes/maildirs, propagating progress over SSE connections.
69 log := mlog.New("httpimport", nil)
71 if x := recover(); x != nil {
72 log.Error("import manage panic", slog.Any("err", x))
74 metrics.PanicInc(metrics.Importmanage)
79 MailboxCounts map[string]int
83 Listeners map[*importListener]struct{}
87 imports := map[string]state{} // Token to state.
90 case l := <-importers.Register:
91 // If we have state, send it so the client is up to date.
92 if s, ok := imports[l.Token]; ok {
94 s.Listeners[l] = struct{}{}
96 sendEvent := func(kind string, v any) {
97 buf, err := json.Marshal(v)
99 log.Errorx("marshal event", err, slog.String("kind", kind), slog.Any("event", v))
102 ssemsg := fmt.Sprintf("event: %s\ndata: %s\n\n", kind, buf)
105 case l.Events <- importEvent{kind, []byte(ssemsg), nil, nil}:
107 log.Debug("dropped initial import event to slow consumer")
111 for m, c := range s.MailboxCounts {
112 sendEvent("count", importCount{m, c})
114 for _, p := range s.Problems {
115 sendEvent("problem", importProblem{p})
118 sendEvent("done", importDone{})
119 } else if s.Aborted != nil {
120 sendEvent("aborted", importAborted{})
126 case l := <-importers.Unregister:
127 delete(imports[l.Token].Listeners, l)
129 case e := <-importers.Events:
130 s, ok := imports[e.Token]
133 MailboxCounts: map[string]int{},
134 Listeners: map[*importListener]struct{}{},
139 for l := range s.Listeners {
143 log.Debug("dropped import event to slow consumer")
147 s := imports[e.Token]
148 switch x := e.Event.(type) {
150 s.MailboxCounts[x.Mailbox] = x.Count
152 s.Problems = append(s.Problems, x.Message)
163 case a := <-importers.Abort:
164 s, ok := imports[a.Token]
166 a.Response <- errors.New("import not found")
170 a.Response <- errors.New("import already finished")
177 // Cleanup old state.
178 for t, s := range imports {
179 if len(s.Listeners) > 0 {
182 if s.Done != nil && time.Since(*s.Done) > time.Minute || s.Aborted != nil && time.Since(*s.Aborted) > time.Minute {
189type importCount struct {
193type importProblem struct {
196type importDone struct{}
197type importAborted struct{}
198type importStep struct {
202// importStart prepares the import and launches the goroutine to actually import.
203// importStart is responsible for closing f and removing f.
204func importStart(log mlog.Log, accName string, f *os.File, skipMailboxPrefix string) (string, bool, error) {
207 store.CloseRemoveTempFile(log, f, "upload for import")
211 buf := make([]byte, 16)
212 if _, err := cryptrand.Read(buf); err != nil {
213 return "", false, err
215 token := fmt.Sprintf("%x", buf)
217 if _, err := f.Seek(0, 0); err != nil {
218 return "", false, fmt.Errorf("seek to start of file: %v", err)
221 // Recognize file format.
223 magicZip := []byte{0x50, 0x4b, 0x03, 0x04}
224 magicGzip := []byte{0x1f, 0x8b}
225 magic := make([]byte, 4)
226 if _, err := f.ReadAt(magic, 0); err != nil {
227 return "", true, fmt.Errorf("detecting file format: %v", err)
229 if bytes.Equal(magic, magicZip) {
231 } else if !bytes.Equal(magic[:2], magicGzip) {
232 return "", true, fmt.Errorf("file is not a zip or gzip file")
240 return "", false, fmt.Errorf("stat temporary import zip file: %v", err)
242 zr, err = zip.NewReader(f, fi.Size())
244 return "", true, fmt.Errorf("opening zip file: %v", err)
247 gzr, err := gzip.NewReader(f)
249 return "", true, fmt.Errorf("gunzip: %v", err)
251 tr = tar.NewReader(gzr)
254 acc, err := store.OpenAccount(log, accName)
256 return "", false, fmt.Errorf("open acount: %v", err)
258 acc.Lock() // Not using WithWLock because importMessage is responsible for unlocking.
260 tx, err := acc.DB.Begin(context.Background(), true)
264 log.Check(xerr, "closing account")
265 return "", false, fmt.Errorf("start transaction: %v", err)
268 // Ensure token is registered before returning, with context that can be canceled.
269 ctx, cancel := context.WithCancel(mox.Shutdown)
270 importers.Events <- importEvent{token, []byte(": keepalive\n\n"), nil, cancel}
272 log.Info("starting import")
273 go importMessages(ctx, log.WithCid(mox.Cid()), token, acc, tx, zr, tr, f, skipMailboxPrefix)
274 f = nil // importMessages is now responsible for closing and removing.
276 return token, false, nil
279// importMessages imports the messages from zip/tgz file f.
280// importMessages is responsible for unlocking and closing acc, and closing tx and f.
281func importMessages(ctx context.Context, log mlog.Log, token string, acc *store.Account, tx *bstore.Tx, zr *zip.Reader, tr *tar.Reader, f *os.File, skipMailboxPrefix string) {
282 // If a fatal processing error occurs, we panic with this type.
283 type importError struct{ Err error }
285 // During import we collect all changes and broadcast them at the end, when successful.
286 var changes []store.Change
288 // IDs of delivered messages. If we have to roll back, we have to remove these files.
289 var deliveredIDs []int64
291 sendEvent := func(kind string, v any) {
292 buf, err := json.Marshal(v)
294 log.Errorx("marshal event", err, slog.String("kind", kind), slog.Any("event", v))
297 ssemsg := fmt.Sprintf("event: %s\ndata: %s\n\n", kind, buf)
298 importers.Events <- importEvent{token, []byte(ssemsg), v, nil}
301 canceled := func() bool {
304 sendEvent("aborted", importAborted{})
311 problemf := func(format string, args ...any) {
312 msg := fmt.Sprintf(format, args...)
313 sendEvent("problem", importProblem{Message: msg})
317 store.CloseRemoveTempFile(log, f, "uploaded messages")
319 for _, id := range deliveredIDs {
320 p := acc.MessagePath(id)
322 log.Check(err, "closing message file after import error", slog.String("path", p))
326 log.Check(err, "rolling back transaction")
331 log.Check(err, "closing account")
338 if err, ok := x.(importError); ok {
339 log.Errorx("import error", err.Err)
340 problemf("%s (aborting)", err.Err)
341 sendEvent("aborted", importAborted{})
343 log.Error("import panic", slog.Any("err", x))
345 metrics.PanicInc(metrics.Importmessages)
349 ximportcheckf := func(err error, format string, args ...any) {
351 panic(importError{fmt.Errorf("%s: %s", fmt.Sprintf(format, args...), err)})
355 err := acc.ThreadingWait(log)
356 ximportcheckf(err, "waiting for account thread upgrade")
358 conf, _ := acc.Conf()
360 jf, _, err := acc.OpenJunkFilter(ctx, log)
361 if err != nil && !errors.Is(err, store.ErrNoJunkFilter) {
362 ximportcheckf(err, "open junk filter")
366 err := jf.CloseDiscard()
367 log.Check(err, "closing junk filter")
371 // Mailboxes we imported, and message counts.
372 mailboxes := map[string]store.Mailbox{}
373 messages := map[string]int{}
375 maxSize := acc.QuotaMessageSize()
376 du := store.DiskUsage{ID: 1}
378 ximportcheckf(err, "get disk usage")
381 // For maildirs, we are likely to get a possible dovecot-keywords file after having
382 // imported the messages. Once we see the keywords, we use them. But before that
383 // time we remember which messages are missing keywords. Once the keywords become
384 // available, we'll fix up the flags for the unknown messages.
385 mailboxKeywords := map[string]map[rune]string{} // Mailbox to 'a'-'z' to flag name.
386 mailboxMissingKeywordMessages := map[string]map[int64]string{} // Mailbox to message id to string consisting of the unrecognized flags.
388 // We keep the mailboxes we deliver to up to date with count and keywords (non-system flags).
389 destMailboxCounts := map[int64]store.MailboxCounts{}
390 destMailboxKeywords := map[int64]map[string]bool{}
392 // Previous mailbox an event was sent for. We send an event for new mailboxes, when
393 // another 100 messages were added, when adding a message to another mailbox, and
394 // finally at the end as a closing statement.
395 var prevMailbox string
397 var modseq store.ModSeq // Assigned on first message, used for all messages.
399 trainMessage := func(m *store.Message, p message.Part, pos string) {
400 words, err := jf.ParseMessage(p)
402 problemf("parsing message %s for updating junk filter: %v (continuing)", pos, err)
405 err = jf.Train(ctx, !m.Junk, words)
407 problemf("training junk filter for message %s: %v (continuing)", pos, err)
410 m.TrainedJunk = &m.Junk
413 openTrainMessage := func(m *store.Message) {
414 path := acc.MessagePath(m.ID)
415 f, err := os.Open(path)
417 problemf("opening message again for training junk filter: %v (continuing)", err)
422 log.Check(err, "closing file after training junkfilter")
424 p, err := m.LoadPart(f)
426 problemf("loading parsed message again for training junk filter: %v (continuing)", err)
429 trainMessage(m, p, fmt.Sprintf("message id %d", m.ID))
432 xensureMailbox := func(name string) store.Mailbox {
433 name = norm.NFC.String(name)
434 if strings.ToLower(name) == "inbox" {
438 if mb, ok := mailboxes[name]; ok {
444 for i, e := range strings.Split(name, "/") {
450 if _, ok := mailboxes[p]; ok {
454 q := bstore.QueryTx[store.Mailbox](tx)
455 q.FilterNonzero(store.Mailbox{Name: p})
458 if err == bstore.ErrAbsent {
459 uidvalidity, err := acc.NextUIDValidity(tx)
460 ximportcheckf(err, "finding next uid validity")
463 UIDValidity: uidvalidity,
466 // Do not assign special-use flags. This existing account probably already has such mailboxes.
469 ximportcheckf(err, "inserting mailbox in database")
471 if tx.Get(&store.Subscription{Name: p}) != nil {
472 err := tx.Insert(&store.Subscription{Name: p})
473 ximportcheckf(err, "subscribing to imported mailbox")
475 changes = append(changes, store.ChangeAddMailbox{Mailbox: mb, Flags: []string{`\Subscribed`}})
476 } else if err != nil {
477 ximportcheckf(err, "creating mailbox %s (aborting)", p)
479 if prevMailbox != "" && mb.Name != prevMailbox {
480 sendEvent("count", importCount{prevMailbox, messages[prevMailbox]})
482 mailboxes[mb.Name] = mb
483 sendEvent("count", importCount{mb.Name, 0})
484 prevMailbox = mb.Name
489 xdeliver := func(mb store.Mailbox, m *store.Message, f *os.File, pos string) {
493 log.Check(err, "closing temporary message file for delivery")
494 err := os.Remove(name)
495 log.Check(err, "removing temporary message file for delivery")
498 m.MailboxOrigID = mb.ID
501 if maxSize > 0 && du.MessageSize+addSize > maxSize {
502 ximportcheckf(fmt.Errorf("account over maximum total size %d", maxSize), "checking quota")
507 modseq, err = acc.NextModSeq(tx)
508 ximportcheckf(err, "assigning next modseq")
513 mc := destMailboxCounts[mb.ID]
514 mc.Add(m.MailboxCounts())
515 destMailboxCounts[mb.ID] = mc
517 if len(m.Keywords) > 0 {
518 if destMailboxKeywords[mb.ID] == nil {
519 destMailboxKeywords[mb.ID] = map[string]bool{}
521 for _, k := range m.Keywords {
522 destMailboxKeywords[mb.ID][k] = true
526 // Parse message and store parsed information for later fast retrieval.
527 p, err := message.EnsurePart(log.Logger, false, f, m.Size)
529 problemf("parsing message %s: %s (continuing)", pos, err)
531 m.ParsedBuf, err = json.Marshal(p)
532 ximportcheckf(err, "marshal parsed message structure")
534 // Set fields needed for future threading. By doing it now, DeliverMessage won't
535 // have to parse the Part again.
536 p.SetReaderAt(store.FileMsgReader(m.MsgPrefix, f))
537 m.PrepareThreading(log, &p)
539 if m.Received.IsZero() {
540 if p.Envelope != nil && !p.Envelope.Date.IsZero() {
541 m.Received = p.Envelope.Date
543 m.Received = time.Now()
547 // We set the flags that Deliver would set now and train ourselves. This prevents
548 // Deliver from training, which would open the junk filter, change it, and write it
549 // back to disk, for each message (slow).
550 m.JunkFlagsForMailbox(mb, conf)
551 if jf != nil && m.NeedsTraining() {
552 trainMessage(m, p, pos)
557 const nothreads = true
558 const updateDiskUsage = false
559 if err := acc.DeliverMessage(log, tx, m, f, sync, notrain, nothreads, updateDiskUsage); err != nil {
560 problemf("delivering message %s: %s (continuing)", pos, err)
563 deliveredIDs = append(deliveredIDs, m.ID)
564 changes = append(changes, m.ChangeAddUID())
566 if messages[mb.Name]%100 == 0 || prevMailbox != mb.Name {
567 prevMailbox = mb.Name
568 sendEvent("count", importCount{mb.Name, messages[mb.Name]})
572 ximportMbox := func(mailbox, filename string, r io.Reader) {
574 problemf("empty mailbox name for mbox file %s (skipping)", filename)
577 mb := xensureMailbox(mailbox)
579 mr := store.NewMboxReader(log, store.CreateMessageTemp, filename, r)
581 m, mf, pos, err := mr.Next()
584 } else if err != nil {
585 ximportcheckf(err, "next message in mbox file")
588 xdeliver(mb, m, mf, pos)
592 ximportMaildir := func(mailbox, filename string, r io.Reader) {
594 problemf("empty mailbox name for maildir file %s (skipping)", filename)
597 mb := xensureMailbox(mailbox)
599 f, err := store.CreateMessageTemp(log, "import")
600 ximportcheckf(err, "creating temp message")
603 store.CloseRemoveTempFile(log, f, "message to import")
607 // Copy data, changing bare \n into \r\n.
608 br := bufio.NewReader(r)
609 w := bufio.NewWriter(f)
612 line, err := br.ReadBytes('\n')
613 if err != nil && err != io.EOF {
614 ximportcheckf(err, "reading message")
617 if !bytes.HasSuffix(line, []byte("\r\n")) {
618 line = append(line[:len(line)-1], "\r\n"...)
621 n, err := w.Write(line)
622 ximportcheckf(err, "writing message")
630 ximportcheckf(err, "writing message")
632 var received time.Time
633 t := strings.SplitN(path.Base(filename), ".", 2)
634 if v, err := strconv.ParseInt(t[0], 10, 64); err == nil {
635 received = time.Unix(v, 0)
638 // Parse flags. See https://cr.yp.to/proto/maildir.html.
640 var flags store.Flags
641 keywords := map[string]bool{}
642 t = strings.SplitN(path.Base(filename), ":2,", 2)
644 for _, c := range t[1] {
647 // Passed, doesn't map to a common IMAP flag.
649 flags.Answered = true
659 if c >= 'a' && c <= 'z' {
660 dovecotKeywords, ok := mailboxKeywords[mailbox]
662 // No keywords file seen yet, we'll try later if it comes in.
663 keepFlags += string(c)
664 } else if kw, ok := dovecotKeywords[c]; ok {
665 flagSet(&flags, keywords, kw)
675 Keywords: maps.Keys(keywords),
678 xdeliver(mb, &m, f, filename)
681 if _, ok := mailboxMissingKeywordMessages[mailbox]; !ok {
682 mailboxMissingKeywordMessages[mailbox] = map[int64]string{}
684 mailboxMissingKeywordMessages[mailbox][m.ID] = keepFlags
688 importFile := func(name string, r io.Reader) {
691 if strings.HasPrefix(name, skipMailboxPrefix) {
692 name = strings.TrimPrefix(name[len(skipMailboxPrefix):], "/")
695 if strings.HasSuffix(name, "/") {
696 name = strings.TrimSuffix(name, "/")
697 dir := path.Dir(name)
698 switch path.Base(dir) {
699 case "new", "cur", "tmp":
700 // Maildir, ensure it exists.
701 mailbox := path.Dir(dir)
702 xensureMailbox(mailbox)
704 // Otherwise, this is just a directory that probably holds mbox files and maildirs.
708 if strings.HasSuffix(path.Base(name), ".mbox") {
709 mailbox := name[:len(name)-len(".mbox")]
710 ximportMbox(mailbox, origName, r)
713 dir := path.Dir(name)
714 dirbase := path.Base(dir)
716 case "new", "cur", "tmp":
717 mailbox := path.Dir(dir)
718 ximportMaildir(mailbox, origName, r)
720 if path.Base(name) == "dovecot-keywords" {
721 mailbox := path.Dir(name)
722 dovecotKeywords := map[rune]string{}
723 words, err := store.ParseDovecotKeywordsFlags(r, log)
724 log.Check(err, "parsing dovecot keywords for mailbox", slog.String("mailbox", mailbox))
725 for i, kw := range words {
726 dovecotKeywords['a'+rune(i)] = kw
728 mailboxKeywords[mailbox] = dovecotKeywords
730 for id, chars := range mailboxMissingKeywordMessages[mailbox] {
731 var flags, zeroflags store.Flags
732 keywords := map[string]bool{}
733 for _, c := range chars {
734 kw, ok := dovecotKeywords[c]
736 problemf("unspecified dovecot message flag %c for message id %d (continuing)", c, id)
739 flagSet(&flags, keywords, kw)
741 if flags == zeroflags && len(keywords) == 0 {
745 m := store.Message{ID: id}
747 ximportcheckf(err, "get imported message for flag update")
749 mc := destMailboxCounts[m.MailboxID]
750 mc.Sub(m.MailboxCounts())
753 m.Flags = m.Flags.Set(flags, flags)
754 m.Keywords = maps.Keys(keywords)
755 sort.Strings(m.Keywords)
757 mc.Add(m.MailboxCounts())
758 destMailboxCounts[m.MailboxID] = mc
760 if len(m.Keywords) > 0 {
761 if destMailboxKeywords[m.MailboxID] == nil {
762 destMailboxKeywords[m.MailboxID] = map[string]bool{}
764 for _, k := range m.Keywords {
765 destMailboxKeywords[m.MailboxID][k] = true
769 // We train before updating, training may set m.TrainedJunk.
770 if jf != nil && m.NeedsTraining() {
774 ximportcheckf(err, "updating message after flag update")
775 changes = append(changes, m.ChangeFlags(oflags))
777 delete(mailboxMissingKeywordMessages, mailbox)
779 problemf("unrecognized file %s (skipping)", origName)
785 for _, f := range zr.File {
791 problemf("opening file %s in zip: %v", f.Name, err)
794 importFile(f.Name, zf)
796 log.Check(err, "closing file from zip")
806 } else if err != nil {
807 problemf("reading next tar header: %v (aborting)", err)
810 importFile(h.Name, tr)
815 for _, count := range messages {
818 log.Debug("messages imported", slog.Int("total", total))
820 // Send final update for count of last-imported mailbox.
821 if prevMailbox != "" {
822 sendEvent("count", importCount{prevMailbox, messages[prevMailbox]})
826 if len(deliveredIDs) > 0 {
827 sendEvent("step", importStep{"matching messages with threads"})
828 err = acc.AssignThreads(ctx, log, tx, deliveredIDs[0], 0, io.Discard)
829 ximportcheckf(err, "assigning messages to threads")
832 // Update mailboxes with counts and keywords.
833 for mbID, mc := range destMailboxCounts {
834 mb := store.Mailbox{ID: mbID}
836 ximportcheckf(err, "loading mailbox for counts and keywords")
838 if mb.MailboxCounts != mc {
839 mb.MailboxCounts = mc
840 changes = append(changes, mb.ChangeCounts())
843 keywords := destMailboxKeywords[mb.ID]
845 mb.Keywords, mbKwChanged = store.MergeKeywords(mb.Keywords, maps.Keys(keywords))
848 ximportcheckf(err, "updating mailbox count and keywords")
850 changes = append(changes, mb.ChangeKeywords())
854 err = acc.AddMessageSize(log, tx, addSize)
855 ximportcheckf(err, "updating disk usage after import")
859 ximportcheckf(err, "commit")
863 if err := jf.Close(); err != nil {
864 problemf("saving changes of training junk filter: %v (continuing)", err)
865 log.Errorx("saving changes of training junk filter", err)
870 store.BroadcastChanges(acc, changes)
873 log.Check(err, "closing account after import")
876 sendEvent("done", importDone{})
879func flagSet(flags *store.Flags, keywords map[string]bool, word string) {
881 case "forwarded", "$forwarded":
882 flags.Forwarded = true
883 case "junk", "$junk":
885 case "notjunk", "$notjunk", "nonjunk", "$nonjunk":
887 case "phishing", "$phishing":
888 flags.Phishing = true
889 case "mdnsent", "$mdnsent":
892 if err := store.CheckKeyword(word); err == nil {
893 keywords[word] = true