10 cryptrand "crypto/rand"
25 "golang.org/x/text/unicode/norm"
27 "github.com/mjl-/bstore"
29 "github.com/mjl-/mox/message"
30 "github.com/mjl-/mox/metrics"
31 "github.com/mjl-/mox/mlog"
32 "github.com/mjl-/mox/mox-"
33 "github.com/mjl-/mox/store"
// importListener is a consumer of progress events for a single import.
// ImportManage receives listeners on importers.Register and
// importers.Unregister and finds their import through l.Token.
36type importListener struct {
// ImportManage sends events here without blocking; an event is dropped when the
// listener does not keep up.
38 Events chan importEvent
39 Register chan bool // Whether register is successful.
// importEvent is a single update about an import. It is sent on
// importers.Events, where ImportManage looks up the import by e.Token and
// forwards the event to the listeners registered for that import.
42type importEvent struct {
44 SSEMsg []byte // Full SSE message, including event: ... and data: ... \n\n
45 Event any // nil, importCount, importProblem, importDone, importAborted, importStep
46 Cancel func() // For cancelling the context causing abort of the import. Set in first, import-registering, event.
// importAbortRequest is sent on importers.Abort to ask ImportManage to abort the
// import identified by Token. ImportManage answers on Response, e.g. with the
// error "import not found" for an unknown token.
49type importAbortRequest struct {
// importers holds the channels used to communicate with the ImportManage
// goroutine.
54var importers = struct {
55 Register chan *importListener
56 Unregister chan *importListener
57 Events chan importEvent
58 Abort chan importAbortRequest
// The listener channels get a buffer of 1. The event and abort channels are
// unbuffered, so a send on those blocks until ImportManage receives it.
61 make(chan *importListener, 1),
62 make(chan *importListener, 1),
63 make(chan importEvent),
64 make(chan importAbortRequest),
68// ImportManage should be run as a goroutine, it manages imports of mboxes/maildirs, propagating progress over SSE connections.
//
// It keeps state per import token. The state is updated from the events received
// on importers.Events and replayed to listeners when they register.
70 log := mlog.New("httpimport", nil)
// A panic in this goroutine is logged and counted in the metrics.
72 if x := recover(); x != nil {
73 log.Error("import manage panic", slog.Any("err", x))
75 metrics.PanicInc(metrics.Importmanage)
// State kept per import: the latest message count per mailbox and the currently
// registered listeners.
80 MailboxCounts map[string]int
84 Listeners map[*importListener]struct{}
88 imports := map[string]state{} // Token to state.
91 case l := <-importers.Register:
92 // If we have state, send it so the client is up to date.
93 s, ok := imports[l.Token]
98 s.Listeners[l] = struct{}{}
// sendEvent delivers a single replayed event to the new listener only.
100 sendEvent := func(kind string, v any) {
101 buf, err := json.Marshal(v)
103 log.Errorx("marshal event", err, slog.String("kind", kind), slog.Any("event", v))
106 ssemsg := fmt.Sprintf("event: %s\ndata: %s\n\n", kind, buf)
// NOTE(review): the first positional field is set to kind here, while the other
// importEvent literals in this file pass the import token first; confirm that
// is intended.
109 case l.Events <- importEvent{kind, []byte(ssemsg), nil, nil}:
111 log.Debug("dropped initial import event to slow consumer")
// Replay the current mailbox counts and the problems so far, followed by the
// closing done/aborted event when the import has already ended.
115 for m, c := range s.MailboxCounts {
116 sendEvent("count", importCount{m, c})
118 for _, p := range s.Problems {
119 sendEvent("problem", importProblem{p})
122 sendEvent("done", importDone{})
123 } else if s.Aborted != nil {
124 sendEvent("aborted", importAborted{})
127 case l := <-importers.Unregister:
// For an unknown token, the zero state has a nil Listeners map, and delete on a
// nil map is a no-op.
128 delete(imports[l.Token].Listeners, l)
130 case e := <-importers.Events:
131 s, ok := imports[e.Token]
// New state starts with empty maps; presumably created for the first event of a
// token (the condition is not visible here).
134 MailboxCounts: map[string]int{},
135 Listeners: map[*importListener]struct{}{},
// Forward the event to the listeners of this import, dropping it for listeners
// that are not ready to receive.
140 for l := range s.Listeners {
144 log.Debug("dropped import event to slow consumer")
// Record the event in the state, so it can be replayed to listeners that
// register later.
148 s := imports[e.Token]
149 switch x := e.Event.(type) {
151 s.MailboxCounts[x.Mailbox] = x.Count
153 s.Problems = append(s.Problems, x.Message)
164 case a := <-importers.Abort:
165 s, ok := imports[a.Token]
// Requests for unknown or already finished imports are answered with an error.
167 a.Response <- errors.New("import not found")
171 a.Response <- errors.New("import already finished")
177 case <-importers.Stop:
181 // Cleanup old state.
182 for t, s := range imports {
183 if len(s.Listeners) > 0 {
// && binds tighter than ||: this matches imports that were either done or
// aborted more than a minute ago.
186 if s.Done != nil && time.Since(*s.Done) > time.Minute || s.Aborted != nil && time.Since(*s.Aborted) > time.Minute {
// importCount is the event payload with the message count for a mailbox. It is
// sent with SSE event name "count".
193type importCount struct {
// importProblem is the event payload with a message about a problem encountered
// during the import. It is sent with SSE event name "problem".
197type importProblem struct {
// importDone is the empty event payload sent with SSE event name "done".
200type importDone struct{}
// importAborted is the empty event payload sent with SSE event name "aborted".
201type importAborted struct{}
// importStep is the event payload with a description of the processing step
// being started, e.g. "matching messages with threads". It is sent with SSE
// event name "step".
202type importStep struct {
206// importStart prepares the import and launches the goroutine to actually import.
207// importStart is responsible for closing f and removing f.
//
// It returns the token that identifies the import. The bool is true for errors
// caused by the uploaded file itself (unrecognized or unreadable zip/gzip data),
// and false for the other errors and on success.
208func importStart(log mlog.Log, accName string, f *os.File, skipMailboxPrefix string) (string, bool, error) {
211 store.CloseRemoveTempFile(log, f, "upload for import")
// The token is 16 random bytes, hex-encoded.
215 buf := make([]byte, 16)
216 if _, err := cryptrand.Read(buf); err != nil {
217 return "", false, err
219 token := fmt.Sprintf("%x", buf)
221 if _, err := f.Seek(0, 0); err != nil {
222 return "", false, fmt.Errorf("seek to start of file: %v", err)
225 // Recognize file format.
227 magicZip := []byte{0x50, 0x4b, 0x03, 0x04}
228 magicGzip := []byte{0x1f, 0x8b}
229 magic := make([]byte, 4)
// ReadAt returns an error when fewer than len(magic) bytes can be read, so
// files shorter than 4 bytes are rejected here.
230 if _, err := f.ReadAt(magic, 0); err != nil {
231 return "", true, fmt.Errorf("detecting file format: %v", err)
233 if bytes.Equal(magic, magicZip) {
// For gzip only the first two bytes are compared.
235 } else if !bytes.Equal(magic[:2], magicGzip) {
236 return "", true, fmt.Errorf("file is not a zip or gzip file")
244 return "", false, fmt.Errorf("stat temporary import zip file: %v", err)
246 zr, err = zip.NewReader(f, fi.Size())
248 return "", true, fmt.Errorf("opening zip file: %v", err)
// Gzip data is read as a gzip-compressed tar archive.
251 gzr, err := gzip.NewReader(f)
253 return "", true, fmt.Errorf("gunzip: %v", err)
255 tr = tar.NewReader(gzr)
258 acc, err := store.OpenAccount(log, accName, false)
// NOTE(review): typo "acount" in the error message below.
260 return "", false, fmt.Errorf("open acount: %v", err)
262 acc.Lock() // Not using WithWLock because importMessages is responsible for unlocking.
264 tx, err := acc.DB.Begin(context.Background(), true)
268 log.Check(xerr, "closing account")
269 return "", false, fmt.Errorf("start transaction: %v", err)
272 // Ensure token is registered before returning, with context that can be canceled.
// The context is derived from mox.Shutdown. The registering event carries an SSE
// comment line as message, and the cancel function.
273 ctx, cancel := context.WithCancel(mox.Shutdown)
274 importers.Events <- importEvent{token, []byte(": keepalive\n\n"), nil, cancel}
276 log.Info("starting import")
277 go importMessages(ctx, log.WithCid(mox.Cid()), token, acc, tx, zr, tr, f, skipMailboxPrefix)
278 f = nil // importMessages is now responsible for closing and removing.
280 return token, false, nil
283// importMessages imports the messages from zip/tgz file f.
284// importMessages is responsible for unlocking and closing acc, and closing tx and f.
//
// Progress and problems are reported as events on importers.Events for the given
// token. Fatal errors are raised as importError panics, which are reported as a
// problem followed by an "aborted" event.
285func importMessages(ctx context.Context, log mlog.Log, token string, acc *store.Account, tx *bstore.Tx, zr *zip.Reader, tr *tar.Reader, f *os.File, skipMailboxPrefix string) {
286 // If a fatal processing error occurs, we panic with this type.
287 type importError struct{ Err error }
289 // During import we collect all changes and broadcast them at the end, when successful.
290 var changes []store.Change
292 // ID's of delivered messages. If we have to rollback, we have to remove these files.
// sendEvent marshals v to JSON and sends it to ImportManage as SSE message of
// the given kind, with v itself as the Event.
295 sendEvent := func(kind string, v any) {
296 buf, err := json.Marshal(v)
298 log.Errorx("marshal event", err, slog.String("kind", kind), slog.Any("event", v))
301 ssemsg := fmt.Sprintf("event: %s\ndata: %s\n\n", kind, buf)
302 importers.Events <- importEvent{token, []byte(ssemsg), v, nil}
// canceled sends an "aborted" event; presumably only when ctx is done (the check
// is not visible here).
305 canceled := func() bool {
308 sendEvent("aborted", importAborted{})
// problemf sends a formatted message as "problem" event. It does not stop the
// import by itself.
315 problemf := func(format string, args ...any) {
316 msg := fmt.Sprintf(format, args...)
317 sendEvent("problem", importProblem{Message: msg})
// Cleanup of the uploaded file and, going by the log messages, of the message
// files for newIDs, the transaction and the account (the conditions are not
// visible here).
321 store.CloseRemoveTempFile(log, f, "uploaded messages")
323 for _, id := range newIDs {
324 p := acc.MessagePath(id)
326 log.Check(err, "closing message file after import error", slog.String("path", p))
330 log.Check(err, "rolling back transaction")
335 log.Check(err, "closing account")
// An importError is the expected way to abort and is reported to the listeners.
// Any other panic value is logged and counted in the metrics.
342 if err, ok := x.(importError); ok {
343 log.Errorx("import error", err.Err)
344 problemf("%s (aborting)", err.Err)
345 sendEvent("aborted", importAborted{})
347 log.Error("import panic", slog.Any("err", x))
349 metrics.PanicInc(metrics.Importmessages)
// ximportcheckf aborts the import by panicking with an importError combining the
// formatted message and err; presumably only for a non-nil err (the check is not
// visible here).
353 ximportcheckf := func(err error, format string, args ...any) {
355 panic(importError{fmt.Errorf("%s: %s", fmt.Sprintf(format, args...), err)})
359 err := acc.ThreadingWait(log)
360 ximportcheckf(err, "waiting for account thread upgrade")
362 conf, _ := acc.Conf()
// A missing junk filter (store.ErrNoJunkFilter) is not an error.
364 jf, _, err := acc.OpenJunkFilter(ctx, log)
365 if err != nil && !errors.Is(err, store.ErrNoJunkFilter) {
366 ximportcheckf(err, "open junk filter")
370 err := jf.CloseDiscard()
371 log.Check(err, "closing junk filter")
375 // Mailboxes we imported, and message counts.
376 mailboxNames := map[string]*store.Mailbox{}
377 mailboxIDs := map[int64]*store.Mailbox{}
// Number of keywords of a mailbox when it was first used by the import, for
// detecting at the end whether keywords were added.
378 mailboxKeywordCounts := map[int64]int{}
379 messages := map[string]int{}
381 maxSize := acc.QuotaMessageSize()
382 du := store.DiskUsage{ID: 1}
384 ximportcheckf(err, "get disk usage")
387 // For maildirs, we are likely to get a possible dovecot-keywords file after having
388 // imported the messages. Once we see the keywords, we use them. But before that
389 // time we remember which messages miss keywords. Once the keywords become
390 // available, we'll fix up the flags for the unknown messages.
391 mailboxKeywords := map[string]map[rune]string{} // Mailbox to 'a'-'z' to flag name.
392 mailboxMissingKeywordMessages := map[string]map[int64]string{} // Mailbox to message id to string consisting of the unrecognized flags.
394 // Previous mailbox an event was sent for. We send an event for new mailboxes, when
395 // another 100 messages were added, when adding a message to another mailbox, and
396 // finally at the end as a closing statement.
397 var prevMailbox string
399 var modseq store.ModSeq // Assigned on first message, used for all messages.
// trainMessage trains the junk filter with the words of parsed message p and
// records in m that it was trained. Failures are reported as problems.
401 trainMessage := func(m *store.Message, p message.Part, pos string) {
402 words, err := jf.ParseMessage(p)
404 problemf("parsing message %s for updating junk filter: %v (continuing)", pos, err)
407 err = jf.Train(ctx, !m.Junk, words)
409 problemf("training junk filter for message %s: %v (continuing)", pos, err)
412 m.TrainedJunk = &m.Junk
// openTrainMessage opens the stored message file of m again, loads its parsed
// form and trains the junk filter with it.
415 openTrainMessage := func(m *store.Message) {
416 path := acc.MessagePath(m.ID)
417 f, err := os.Open(path)
419 problemf("opening message again for training junk filter: %v (continuing)", err)
424 log.Check(err, "closing file after training junkfilter")
426 p, err := m.LoadPart(f)
428 problemf("loading parsed message again for training junk filter: %v (continuing)", err)
431 trainMessage(m, p, fmt.Sprintf("message id %d", m.ID))
// xensureMailbox returns the mailbox for name. Mailboxes seen earlier in this
// import come from mailboxNames. Otherwise the "/"-separated elements of name are
// walked, looking up each mailbox in the database and, going by the calls below,
// creating and subscribing to it where needed.
434 xensureMailbox := func(name string) *store.Mailbox {
435 // Ensure name is normalized.
436 name = norm.NFC.String(name)
437 name, _, err := store.CheckMailboxName(name, true)
438 ximportcheckf(err, "checking mailbox name")
440 if mb, ok := mailboxNames[name]; ok {
445 var mb *store.Mailbox
446 var parent store.Mailbox
447 for i, e := range strings.Split(name, "/") {
453 if _, ok := mailboxNames[p]; ok {
457 mb, err = acc.MailboxFind(tx, p)
458 ximportcheckf(err, "looking up mailbox %s to import to (aborting)", p)
460 uidvalidity, err := acc.NextUIDValidity(tx)
461 ximportcheckf(err, "finding next uid validity")
465 modseq, err = acc.NextModSeq(tx)
466 ximportcheckf(err, "assigning next modseq")
474 UIDValidity: uidvalidity,
477 // Do not assign special-use flags. This existing account probably already has such mailboxes.
480 ximportcheckf(err, "inserting mailbox in database")
// An error from Get is taken to mean there is no subscription yet.
483 if tx.Get(&store.Subscription{Name: p}) != nil {
484 err := tx.Insert(&store.Subscription{Name: p})
485 ximportcheckf(err, "subscribing to imported mailbox")
487 changes = append(changes, store.ChangeAddMailbox{Mailbox: *mb, Flags: []string{`\Subscribed`}})
// Send the closing count for the previous mailbox before starting on this one.
489 if prevMailbox != "" && mb.Name != prevMailbox {
490 sendEvent("count", importCount{prevMailbox, messages[prevMailbox]})
492 mailboxKeywordCounts[mb.ID] = len(mb.Keywords)
493 mailboxNames[mb.Name] = mb
494 mailboxIDs[mb.ID] = mb
495 sendEvent("count", importCount{mb.Name, 0})
496 prevMailbox = mb.Name
// xdeliver adds message m, with its contents in f, to mailbox mb. f is always
// closed and removed. pos identifies the message in problem reports.
501 xdeliver := func(mb *store.Mailbox, m *store.Message, f *os.File, pos string) {
502 defer store.CloseRemoveTempFile(log, f, "message file for import")
504 m.MailboxOrigID = mb.ID
// Quota is checked here against the disk usage plus the size added by this
// import, MessageAdd is told to skip its own check.
507 if maxSize > 0 && du.MessageSize+addSize > maxSize {
508 ximportcheckf(fmt.Errorf("account over maximum total size %d", maxSize), "checking quota")
513 modseq, err = acc.NextModSeq(tx)
514 ximportcheckf(err, "assigning next modseq")
519 // Parse message and store parsed information for later fast retrieval.
520 p, err := message.EnsurePart(log.Logger, false, f, m.Size)
522 problemf("parsing message %s: %s (continuing)", pos, err)
524 m.ParsedBuf, err = json.Marshal(p)
525 ximportcheckf(err, "marshal parsed message structure")
527 // Set fields needed for future threading. By doing it now, MessageAdd won't
528 // have to parse the Part again.
529 p.SetReaderAt(store.FileMsgReader(m.MsgPrefix, f))
530 m.PrepareThreading(log, &p)
// Without a received time, fall back to the date from the message envelope, or
// else the current time.
532 if m.Received.IsZero() {
533 if p.Envelope != nil && !p.Envelope.Date.IsZero() {
534 m.Received = p.Envelope.Date
536 m.Received = time.Now()
540 // We set the flags that Deliver would set now and train ourselves. This prevents
541 // Deliver from training, which would open the junk filter, change it, and write it
542 // back to disk, for each message (slow).
543 m.JunkFlagsForMailbox(*mb, conf)
544 if jf != nil && m.NeedsTraining() {
545 trainMessage(m, p, pos)
// Disk usage is updated once for the whole import, with acc.AddMessageSize
// before the commit.
548 opts := store.AddOpts{
552 SkipUpdateDiskUsage: true,
553 SkipCheckQuota: true,
556 if err := acc.MessageAdd(log, tx, mb, m, f, opts); err != nil {
557 problemf("delivering message %s: %s (continuing)", pos, err)
560 newIDs = append(newIDs, m.ID)
561 changes = append(changes, m.ChangeAddUID())
563 if messages[mb.Name]%100 == 0 || prevMailbox != mb.Name {
564 prevMailbox = mb.Name
565 sendEvent("count", importCount{mb.Name, messages[mb.Name]})
// ximportMbox delivers the messages read from mbox data r to mailbox. filename
// is used in problem reports.
569 ximportMbox := func(mailbox, filename string, r io.Reader) {
571 problemf("empty mailbox name for mbox file %s (skipping)", filename)
574 mb := xensureMailbox(mailbox)
576 mr := store.NewMboxReader(log, store.CreateMessageTemp, filename, r)
578 m, mf, pos, err := mr.Next()
581 } else if err != nil {
582 ximportcheckf(err, "next message in mbox file")
585 xdeliver(mb, m, mf, pos)
// ximportMaildir delivers the single maildir message read from r to mailbox. The
// received time and flags are taken from filename.
589 ximportMaildir := func(mailbox, filename string, r io.Reader) {
591 problemf("empty mailbox name for maildir file %s (skipping)", filename)
594 mb := xensureMailbox(mailbox)
596 f, err := store.CreateMessageTemp(log, "import")
597 ximportcheckf(err, "creating temp message")
600 store.CloseRemoveTempFile(log, f, "message to import")
604 // Copy data, changing bare \n into \r\n.
605 br := bufio.NewReader(r)
606 w := bufio.NewWriter(f)
609 line, err := br.ReadBytes('\n')
610 if err != nil && err != io.EOF {
611 ximportcheckf(err, "reading message")
// NOTE(review): a final line without trailing "\n" (returned together with
// io.EOF) also lacks the "\r\n" suffix, so its last byte would be replaced below
// unless a guard exists in lines not visible here; confirm.
614 if !bytes.HasSuffix(line, []byte("\r\n")) {
615 line = append(line[:len(line)-1], "\r\n"...)
618 n, err := w.Write(line)
619 ximportcheckf(err, "writing message")
627 ximportcheckf(err, "writing message")
// The part of the file name before the first "." is parsed as a decimal unix
// timestamp in seconds for the received time.
629 var received time.Time
630 t := strings.SplitN(path.Base(filename), ".", 2)
631 if v, err := strconv.ParseInt(t[0], 10, 64); err == nil {
632 received = time.Unix(v, 0)
635 // Parse flags. See https://cr.yp.to/proto/maildir.html.
637 var flags store.Flags
638 keywords := map[string]bool{}
// Flag characters follow ":2," in the file name.
639 t = strings.SplitN(path.Base(filename), ":2,", 2)
641 for _, c := range t[1] {
644 // Passed, doesn't map to a common IMAP flag.
646 flags.Answered = true
// Lower-case letters refer to the keywords of the mailbox from its
// dovecot-keywords file.
656 if c >= 'a' && c <= 'z' {
657 dovecotKeywords, ok := mailboxKeywords[mailbox]
659 // No keywords file seen yet, we'll try later if it comes in.
660 keepFlags += string(c)
661 } else if kw, ok := dovecotKeywords[c]; ok {
662 flagSet(&flags, keywords, kw)
672 Keywords: slices.Sorted(maps.Keys(keywords)),
675 xdeliver(mb, &m, f, filename)
// Remember the unresolved flag characters of this message, for fixing up when
// the dovecot-keywords file is seen.
678 if _, ok := mailboxMissingKeywordMessages[mailbox]; !ok {
679 mailboxMissingKeywordMessages[mailbox] = map[int64]string{}
681 mailboxMissingKeywordMessages[mailbox][m.ID] = keepFlags
// importFile handles a single entry from the archive: a directory, an mbox file,
// a maildir message or a dovecot-keywords file. Other files are reported as
// problem.
685 importFile := func(name string, r io.Reader) {
688 if strings.HasPrefix(name, skipMailboxPrefix) {
689 name = strings.TrimPrefix(name[len(skipMailboxPrefix):], "/")
// Names ending in a slash are directories.
692 if strings.HasSuffix(name, "/") {
693 name = strings.TrimSuffix(name, "/")
694 dir := path.Dir(name)
695 switch path.Base(dir) {
696 case "new", "cur", "tmp":
697 // Maildir, ensure it exists.
698 mailbox := path.Dir(dir)
699 xensureMailbox(mailbox)
701 // Otherwise, this is just a directory that probably holds mbox files and maildirs.
// For an mbox file, the mailbox name is the file name without ".mbox".
705 if strings.HasSuffix(path.Base(name), ".mbox") {
706 mailbox := name[:len(name)-len(".mbox")]
707 ximportMbox(mailbox, origName, r)
// For a file in a new/cur/tmp directory, the mailbox is the parent of that
// directory.
710 dir := path.Dir(name)
711 dirbase := path.Base(dir)
713 case "new", "cur", "tmp":
714 mailbox := path.Dir(dir)
715 ximportMaildir(mailbox, origName, r)
719 if path.Base(name) != "dovecot-keywords" {
720 problemf("unrecognized file %s (skipping)", origName)
724 // Handle dovecot-keywords.
725 mailbox := path.Dir(name)
726 dovecotKeywords := map[rune]string{}
// A parse error is only logged; the keywords that were returned are still used.
727 words, err := store.ParseDovecotKeywordsFlags(r, log)
728 log.Check(err, "parsing dovecot keywords for mailbox", slog.String("mailbox", mailbox))
// Keyword i is referenced by the i'th letter starting at 'a'.
729 for i, kw := range words {
730 dovecotKeywords['a'+rune(i)] = kw
732 mailboxKeywords[mailbox] = dovecotKeywords
// Fix up the flags and keywords of messages in this mailbox that were imported
// before the keywords were known.
734 for id, chars := range mailboxMissingKeywordMessages[mailbox] {
735 var flags, zeroflags store.Flags
736 keywords := map[string]bool{}
737 for _, c := range chars {
738 kw, ok := dovecotKeywords[c]
740 problemf("unspecified dovecot message flag %c for message id %d (continuing)", c, id)
743 flagSet(&flags, keywords, kw)
745 if flags == zeroflags && len(keywords) == 0 {
749 m := store.Message{ID: id}
751 ximportcheckf(err, "get imported message for flag update")
// The mailbox counts are adjusted by subtracting the counts for the message
// before the change and adding them again after.
753 mb := mailboxIDs[m.MailboxID]
754 mb.Sub(m.MailboxCounts())
757 m.Flags = m.Flags.Set(flags, flags)
758 m.Keywords = slices.Sorted(maps.Keys(keywords))
760 mb.Add(m.MailboxCounts())
762 mb.Keywords, _ = store.MergeKeywords(mb.Keywords, m.Keywords)
764 // We train before updating, training may set m.TrainedJunk.
765 if jf != nil && m.NeedsTraining() {
769 ximportcheckf(err, "updating message after flag update")
770 changes = append(changes, m.ChangeFlags(oflags))
772 delete(mailboxMissingKeywordMessages, mailbox)
// Process all entries of the zip file, or of the tar archive.
776 for _, f := range zr.File {
782 problemf("opening file %s in zip: %v", f.Name, err)
785 importFile(f.Name, zf)
787 log.Check(err, "closing file from zip")
797 } else if err != nil {
798 problemf("reading next tar header: %v (aborting)", err)
801 importFile(h.Name, tr)
806 for _, count := range messages {
809 log.Debug("messages imported", slog.Int("total", total))
811 // Send final update for count of last-imported mailbox.
812 if prevMailbox != "" {
813 sendEvent("count", importCount{prevMailbox, messages[prevMailbox]})
// NOTE(review): newIDs[0] requires at least one delivered message; the guard is
// not visible here, confirm it exists.
818 sendEvent("step", importStep{"matching messages with threads"})
819 err = acc.AssignThreads(ctx, log, tx, newIDs[0], 0, io.Discard)
820 ximportcheckf(err, "assigning messages to threads")
823 // Update mailboxes with counts and keywords.
824 for _, mb := range mailboxIDs {
826 ximportcheckf(err, "updating mailbox count and keywords")
828 changes = append(changes, mb.ChangeCounts())
// A different number of keywords than at the start means keywords were added.
829 if len(mb.Keywords) != mailboxKeywordCounts[mb.ID] {
830 changes = append(changes, mb.ChangeKeywords())
834 err = acc.AddMessageSize(log, tx, addSize)
835 ximportcheckf(err, "updating disk usage after import")
839 ximportcheckf(err, "commit")
// A failure to save the junk filter is reported and logged, but does not abort
// the import.
843 if err := jf.Close(); err != nil {
844 problemf("saving changes of training junk filter: %v (continuing)", err)
845 log.Errorx("saving changes of training junk filter", err)
850 store.BroadcastChanges(acc, changes)
853 log.Check(err, "closing account after import")
856 sendEvent("done", importDone{})
// flagSet sets the field in flags that corresponds to word, which is matched
// with and without a leading "$", e.g. "forwarded" and "$forwarded". A word is
// added to keywords when store.CheckKeyword accepts it; presumably only for
// words that are not one of the flag names (the default case is not visible
// here).
859func flagSet(flags *store.Flags, keywords map[string]bool, word string) {
861 case "forwarded", "$forwarded":
862 flags.Forwarded = true
863 case "junk", "$junk":
865 case "notjunk", "$notjunk", "nonjunk", "$nonjunk":
867 case "phishing", "$phishing":
868 flags.Phishing = true
869 case "mdnsent", "$mdnsent":
872 if err := store.CheckKeyword(word); err == nil {
873 keywords[word] = true