10 cryptrand "crypto/rand"
25 "golang.org/x/text/unicode/norm"
27 "github.com/mjl-/bstore"
29 "github.com/mjl-/mox/message"
30 "github.com/mjl-/mox/metrics"
31 "github.com/mjl-/mox/mlog"
32 "github.com/mjl-/mox/mox-"
33 "github.com/mjl-/mox/store"
// importListener is a subscriber to the progress of one import, typically an
// SSE connection. ImportManage receives it on importers.Register and
// importers.Unregister, and finds the import by the listener's Token.
36type importListener struct {
// Events receives the events for this listener. ImportManage sends on it
// without blocking and drops events when the channel is not ready.
38 Events chan importEvent
39 Register chan bool // Whether register is successful.
// importEvent is a progress event for one import. Import goroutines send it on
// importers.Events; ImportManage records it in its per-token state and
// forwards it to that token's listeners.
42type importEvent struct {
44 SSEMsg []byte // Full SSE message, including event: ... and data: ... \n\n
45 Event any // nil, importCount, importProblem, importStep, importDone, importAborted
46 Cancel func() // For cancelling the context causing abort of the import. Set in first, import-registering, event.
// importAbortRequest asks ImportManage to abort the import with the given
// token. ImportManage replies on the request's response channel, with an error
// when the import is not found or has already finished.
49type importAbortRequest struct {
// importers holds the channels over which callers (e.g. the HTTP import
// handlers) and import goroutines talk to ImportManage, the goroutine that
// owns all import state.
54var importers = struct {
55 Register chan *importListener
56 Unregister chan *importListener
57 Events chan importEvent
58 Abort chan importAbortRequest
// Register and Unregister get a buffer of one. Events and Abort are
// unbuffered, so a send on them blocks until ImportManage receives it.
61 make(chan *importListener, 1),
62 make(chan *importListener, 1),
63 make(chan importEvent),
64 make(chan importAbortRequest),
68// ImportManage should be run as a goroutine, it manages imports of mboxes/maildirs, propagating progress over SSE connections.
//
// All import state lives in this goroutine, keyed by import token: per-mailbox
// counts, problems, done/aborted times and the registered listeners. A newly
// registered listener is first sent the state accumulated so far. Later events
// are fanned out to all listeners of the token without blocking. Abort requests
// for unknown or already finished imports are answered with an error. Imports
// that have no listeners and that finished or aborted more than a minute ago
// are considered for cleanup.
70 log := mlog.New("httpimport", nil)
// Log a panic in this goroutine and count it in the panic metric.
72 if x := recover(); x != nil {
73 log.Error("import manage panic", slog.Any("err", x))
75 metrics.PanicInc(metrics.Importmanage)
// Per-mailbox message counts, from the latest "count" event for each mailbox.
80 MailboxCounts map[string]int
// Listeners currently registered for this import.
84 Listeners map[*importListener]struct{}
88 imports := map[string]state{} // Token to state.
91 case l := <-importers.Register:
92 // If we have state, send it so the client is up to date.
93 s, ok := imports[l.Token]
98 s.Listeners[l] = struct{}{}
// sendEvent sends one event to the new listener only. It drops the event when
// the listener's Events channel is not ready instead of blocking this goroutine.
100 sendEvent := func(kind string, v any) {
101 buf, err := json.Marshal(v)
103 log.Errorx("marshal event", err, slog.String("kind", kind), slog.Any("event", v))
106 ssemsg := fmt.Sprintf("event: %s\ndata: %s\n\n", kind, buf)
109 case l.Events <- importEvent{kind, []byte(ssemsg), nil, nil}:
111 log.Debug("dropped initial import event to slow consumer")
// Replay the state. Mailbox counts come out in map iteration order, which is
// unspecified.
115 for m, c := range s.MailboxCounts {
116 sendEvent("count", importCount{m, c})
118 for _, p := range s.Problems {
119 sendEvent("problem", importProblem{p})
122 sendEvent("done", importDone{})
123 } else if s.Aborted != nil {
124 sendEvent("aborted", importAborted{})
127 case l := <-importers.Unregister:
// For an unknown token, the zero state has a nil Listeners map. Deleting from a
// nil map is a no-op.
128 delete(imports[l.Token].Listeners, l)
130 case e := <-importers.Events:
// The first event for a token (the one sent by importStart) creates its state.
131 s, ok := imports[e.Token]
134 MailboxCounts: map[string]int{},
135 Listeners: map[*importListener]struct{}{},
// Forward to every listener without blocking; a slow listener misses events.
140 for l := range s.Listeners {
144 log.Debug("dropped import event to slow consumer")
// Record the event in the state, so it can be replayed to later listeners.
148 s := imports[e.Token]
149 switch x := e.Event.(type) {
151 s.MailboxCounts[x.Mailbox] = x.Count
153 s.Problems = append(s.Problems, x.Message)
164 case a := <-importers.Abort:
165 s, ok := imports[a.Token]
167 a.Response <- errors.New("import not found")
171 a.Response <- errors.New("import already finished")
177 case <-importers.Stop:
181 // Cleanup old state.
182 for t, s := range imports {
183 if len(s.Listeners) > 0 {
// && binds tighter than ||. The condition is "done over a minute ago" or
// "aborted over a minute ago".
186 if s.Done != nil && time.Since(*s.Done) > time.Minute || s.Aborted != nil && time.Since(*s.Aborted) > time.Minute {
// importCount is sent as a "count" event: the number of messages imported so
// far into a mailbox.
193type importCount struct {
// importProblem is sent as a "problem" event, with a human-readable message
// about a part of the import that was skipped, failed or caused an abort.
197type importProblem struct {
// importDone is sent as the "done" event after the import has been committed.
200type importDone struct{}
// importAborted is sent as the "aborted" event when the import was canceled or
// stopped by a fatal error.
201type importAborted struct{}
// importStep is sent as a "step" event naming a phase of the import, such as
// "matching messages with threads".
202type importStep struct {
206// importStart prepares the import and launches the goroutine to actually import.
207// importStart is responsible for closing f and removing f.
//
// The uploaded file must be a zip file or a gzip-compressed tar file. On
// success it returns the token with which the import's progress is tracked.
// The bool result is true for errors caused by the uploaded file itself
// (unrecognized format, unreadable zip or gzip data). It is false for internal
// errors such as failing to seek, open the account or start the transaction.
208func importStart(log mlog.Log, accName string, f *os.File, skipMailboxPrefix string) (string, bool, error) {
211 store.CloseRemoveTempFile(log, f, "upload for import")
// Random token identifying this import, hex-encoded.
// NOTE(review): the error from cryptrand.Read is ignored. crypto/rand.Read is
// documented to never return an error since Go 1.24; check it on older Go.
216 cryptrand.Read(buf[:])
217 token := fmt.Sprintf("%x", buf)
219 if _, err := f.Seek(0, 0); err != nil {
220 return "", false, fmt.Errorf("seek to start of file: %v", err)
223 // Recognize file format.
// Zip files start with "PK\x03\x04", gzip files with 0x1f 0x8b. For a file
// shorter than 4 bytes, ReadAt fails and the error is reported as a user error.
225 magicZip := []byte{0x50, 0x4b, 0x03, 0x04}
226 magicGzip := []byte{0x1f, 0x8b}
227 magic := make([]byte, 4)
228 if _, err := f.ReadAt(magic, 0); err != nil {
229 return "", true, fmt.Errorf("detecting file format: %v", err)
231 if bytes.Equal(magic, magicZip) {
233 } else if !bytes.Equal(magic[:2], magicGzip) {
234 return "", true, fmt.Errorf("file is not a zip or gzip file")
242 return "", false, fmt.Errorf("stat temporary import zip file: %v", err)
244 zr, err = zip.NewReader(f, fi.Size())
246 return "", true, fmt.Errorf("opening zip file: %v", err)
// A gzip file is expected to contain a tar archive.
249 gzr, err := gzip.NewReader(f)
251 return "", true, fmt.Errorf("gunzip: %v", err)
253 tr = tar.NewReader(gzr)
256 acc, err := store.OpenAccount(log, accName, false)
// NOTE(review): "acount" in the error message below is a typo for "account".
258 return "", false, fmt.Errorf("open acount: %v", err)
260 acc.Lock() // Not using WithWLock because importMessages is responsible for unlocking.
262 tx, err := acc.DB.Begin(context.Background(), true)
266 log.Check(xerr, "closing account")
267 return "", false, fmt.Errorf("start transaction: %v", err)
270 // Ensure token is registered before returning, with context that can be canceled.
// The context derives from mox.Shutdown, so presumably shutdown also cancels
// the import. importers.Events is unbuffered, so this send waits until
// ImportManage takes it. The payload is an SSE comment line (": keepalive").
271 ctx, cancel := context.WithCancel(mox.Shutdown)
272 importers.Events <- importEvent{token, []byte(": keepalive\n\n"), nil, cancel}
274 log.Info("starting import")
275 go importMessages(ctx, log.WithCid(mox.Cid()), token, acc, tx, zr, tr, f, skipMailboxPrefix)
276 f = nil // importMessages is now responsible for closing and removing.
278 return token, false, nil
281// importMessages imports the messages from zip/tgz file f.
282// importMessages is responsible for unlocking and closing acc, and closing tx and f.
//
// One of zr (zip) or tr (tar from a gzip stream) is expected to be set, as
// prepared by importStart. All messages are added within the single
// transaction tx. Progress, problems and the final outcome ("done" or
// "aborted") are sent as events for token to ImportManage.
//
// A fatal error panics with importError. The deferred recover turns that into
// a "problem" and an "aborted" event. On failure, the deferred cleanup is meant
// to roll back tx and remove the files of delivered messages (newIDs). Changes
// are broadcast only after a successful commit.
283func importMessages(ctx context.Context, log mlog.Log, token string, acc *store.Account, tx *bstore.Tx, zr *zip.Reader, tr *tar.Reader, f *os.File, skipMailboxPrefix string) {
284 // If a fatal processing error occurs, we panic with this type.
285 type importError struct{ Err error }
287 // During import we collect all changes and broadcast them at the end, when successful.
288 var changes []store.Change
290 // IDs of delivered messages. If we have to roll back, we have to remove these files.
// sendEvent formats v as an SSE event of the given kind and hands it to
// ImportManage. Unlike ImportManage's own sends, this send blocks until received.
293 sendEvent := func(kind string, v any) {
294 buf, err := json.Marshal(v)
296 log.Errorx("marshal event", err, slog.String("kind", kind), slog.Any("event", v))
299 ssemsg := fmt.Sprintf("event: %s\ndata: %s\n\n", kind, buf)
300 importers.Events <- importEvent{token, []byte(ssemsg), v, nil}
// canceled reports whether the import was canceled (presumably by checking
// ctx). When it was, it sends an "aborted" event.
303 canceled := func() bool {
306 sendEvent("aborted", importAborted{})
// problemf reports a problem to listeners. It does not stop the import by
// itself; the messages say "(continuing)", "(skipping)" or "(aborting)".
313 problemf := func(format string, args ...any) {
314 msg := fmt.Sprintf(format, args...)
315 sendEvent("problem", importProblem{Message: msg})
319 store.CloseRemoveTempFile(log, f, "uploaded messages")
321 for _, id := range newIDs {
322 p := acc.MessagePath(id)
// NOTE(review): this log message says "closing" while the comment on newIDs
// says these files are removed; confirm the message matches the operation.
324 log.Check(err, "closing message file after import error", slog.String("path", p))
328 log.Check(err, "rolling back transaction")
333 log.Check(err, "closing account")
// An importError panic is a reported failure: log it, send a problem and abort.
// Any other panic is logged and counted in the panic metric.
340 if err, ok := x.(importError); ok {
341 log.Errorx("import error", err.Err)
342 problemf("%s (aborting)", err.Err)
343 sendEvent("aborted", importAborted{})
345 log.Error("import panic", slog.Any("err", x))
347 metrics.PanicInc(metrics.Importmessages)
// ximportcheckf aborts the import when err is non-nil. It panics with an
// importError whose message is the formatted context followed by err.
351 ximportcheckf := func(err error, format string, args ...any) {
353 panic(importError{fmt.Errorf("%s: %s", fmt.Sprintf(format, args...), err)})
357 err := acc.ThreadingWait(log)
358 ximportcheckf(err, "waiting for account thread upgrade")
360 conf, _ := acc.Conf()
// A missing junk filter (store.ErrNoJunkFilter) is tolerated. Training below is
// guarded by jf != nil.
362 jf, _, err := acc.OpenJunkFilter(ctx, log)
363 if err != nil && !errors.Is(err, store.ErrNoJunkFilter) {
364 ximportcheckf(err, "open junk filter")
// Closing with CloseDiscard drops training changes. They are only saved by the
// jf.Close after a successful commit.
368 err := jf.CloseDiscard()
369 log.Check(err, "closing junk filter")
373 // Mailboxes we imported, and message counts.
374 mailboxNames := map[string]*store.Mailbox{}
375 mailboxIDs := map[int64]*store.Mailbox{}
// Keyword count of each mailbox when first used. At the end, a different count
// means a keywords change must be broadcast.
376 mailboxKeywordCounts := map[int64]int{}
377 messages := map[string]int{}
// Quota: maxSize is the account's size limit (only enforced when > 0). du is
// the current disk usage, read from the database before importing.
379 maxSize := acc.QuotaMessageSize()
380 du := store.DiskUsage{ID: 1}
382 ximportcheckf(err, "get disk usage")
385 // For maildirs, we are likely to get a possible dovecot-keywords file after having
386 // imported the messages. Once we see the keywords, we use them. But before that
387 // time we remember which messages miss keywords. Once the keywords become
388 // available, we'll fix up the flags for the unknown messages.
389 mailboxKeywords := map[string]map[rune]string{} // Mailbox to 'a'-'z' to flag name.
390 mailboxMissingKeywordMessages := map[string]map[int64]string{} // Mailbox to message id to string consisting of the unrecognized flags.
392 // Previous mailbox an event was sent for. We send an event for new mailboxes, when
393 // another 100 messages were added, when adding a message to another mailbox, and
394 // finally at the end as a closing statement.
395 var prevMailbox string
397 var modseq store.ModSeq // Assigned when first needed (creating a mailbox or adding a message), then reused.
// trainMessage trains the junk filter on the parsed message p, passing !m.Junk
// (presumably the "ham" argument), and records in m.TrainedJunk what it was
// trained as. Failures are reported as problems and the import continues.
// NOTE(review): m.TrainedJunk points at m.Junk itself, so it mirrors any later
// in-memory change to m.Junk; confirm this aliasing is intended.
399 trainMessage := func(m *store.Message, p message.Part, pos string) {
400 words, err := jf.ParseMessage(p)
402 problemf("parsing message %s for updating junk filter: %v (continuing)", pos, err)
405 err = jf.Train(ctx, !m.Junk, words)
407 problemf("training junk filter for message %s: %v (continuing)", pos, err)
410 m.TrainedJunk = &m.Junk
// openTrainMessage trains on a message that is already stored. It reopens the
// message file from the account and loads its parsed part, e.g. after the
// message's flags were updated from a dovecot-keywords file.
413 openTrainMessage := func(m *store.Message) {
414 path := acc.MessagePath(m.ID)
415 f, err := os.Open(path)
417 problemf("opening message again for training junk filter: %v (continuing)", err)
422 log.Check(err, "closing file after training junkfilter")
424 p, err := m.LoadPart(f)
426 problemf("loading parsed message again for training junk filter: %v (continuing)", err)
429 trainMessage(m, p, fmt.Sprintf("message id %d", m.ID))
// xensureMailbox returns the mailbox with the given name. It NFC-normalizes and
// validates the name, then creates the mailbox and any missing parents in tx
// and subscribes to them. It also sends count events. Errors abort the import
// through ximportcheckf.
432 xensureMailbox := func(name string) *store.Mailbox {
433 // Ensure name is normalized.
434 name = norm.NFC.String(name)
435 name, _, err := store.CheckMailboxName(name, true)
436 ximportcheckf(err, "checking mailbox name")
438 if mb, ok := mailboxNames[name]; ok {
443 var mb *store.Mailbox
444 var parent store.Mailbox
// Walk the "/"-separated path, ensuring each level exists.
445 for i, e := range strings.Split(name, "/") {
451 if _, ok := mailboxNames[p]; ok {
455 mb, err = acc.MailboxFind(tx, p)
456 ximportcheckf(err, "looking up mailbox %s to import to (aborting)", p)
458 uidvalidity, err := acc.NextUIDValidity(tx)
459 ximportcheckf(err, "finding next uid validity")
// Plain assignment, not :=, so the modseq shared with the rest of
// importMessages is set.
463 modseq, err = acc.NextModSeq(tx)
464 ximportcheckf(err, "assigning next modseq")
472 UIDValidity: uidvalidity,
475 // Do not assign special-use flags. This existing account probably already has such mailboxes.
478 ximportcheckf(err, "inserting mailbox in database")
// NOTE(review): any error from tx.Get, not only "not found", leads to an
// insert attempt here.
481 if tx.Get(&store.Subscription{Name: p}) != nil {
482 err := tx.Insert(&store.Subscription{Name: p})
483 ximportcheckf(err, "subscribing to imported mailbox")
485 changes = append(changes, store.ChangeAddMailbox{Mailbox: *mb, Flags: []string{`\Subscribed`}})
// Send the final count of the previous mailbox before switching to this one.
487 if prevMailbox != "" && mb.Name != prevMailbox {
488 sendEvent("count", importCount{prevMailbox, messages[prevMailbox]})
490 mailboxKeywordCounts[mb.ID] = len(mb.Keywords)
491 mailboxNames[mb.Name] = mb
492 mailboxIDs[mb.ID] = mb
493 sendEvent("count", importCount{mb.Name, 0})
494 prevMailbox = mb.Name
// xdeliver adds message m, whose content is in temporary file f, to mailbox mb
// in tx. pos identifies the message in problem reports. f is always closed and
// removed.
//
// Quota checking and the disk usage update are done by this import, not by
// MessageAdd. Parse and delivery failures are reported as problems and the
// import continues. Exceeding the quota aborts the import.
499 xdeliver := func(mb *store.Mailbox, m *store.Message, f *os.File, pos string) {
500 defer store.CloseRemoveTempFile(log, f, "message file for import")
502 m.MailboxOrigID = mb.ID
// Abort when the current usage plus addSize (the running total of imported
// message sizes) exceeds the quota.
505 if maxSize > 0 && du.MessageSize+addSize > maxSize {
506 ximportcheckf(fmt.Errorf("account over maximum total size %d", maxSize), "checking quota")
511 modseq, err = acc.NextModSeq(tx)
512 ximportcheckf(err, "assigning next modseq")
517 // Parse message and store parsed information for later fast retrieval.
518 p, err := message.EnsurePart(log.Logger, false, f, m.Size)
520 problemf("parsing message %s: %s (continuing)", pos, err)
522 m.ParsedBuf, err = json.Marshal(p)
523 ximportcheckf(err, "marshal parsed message structure")
525 // Set fields needed for future threading. By doing it now, MessageAdd won't
526 // have to parse the Part again.
527 p.SetReaderAt(store.FileMsgReader(m.MsgPrefix, f))
528 m.PrepareThreading(log, &p)
// Without a received time, fall back to the envelope date, then to now.
530 if m.Received.IsZero() {
531 if p.Envelope != nil && !p.Envelope.Date.IsZero() {
532 m.Received = p.Envelope.Date
534 m.Received = time.Now()
538 // We set the flags that Deliver would set now and train ourselves. This prevents
539 // Deliver from training, which would open the junk filter, change it, and write it
540 // back to disk, for each message (slow).
541 m.JunkFlagsForMailbox(*mb, conf)
542 if jf != nil && m.NeedsTraining() {
543 trainMessage(m, p, pos)
// Quota and disk usage are handled by the import itself. The quota is checked
// above, and the total size is added with AddMessageSize after all messages.
546 opts := store.AddOpts{
550 SkipUpdateDiskUsage: true,
551 SkipCheckQuota: true,
554 if err := acc.MessageAdd(log, tx, mb, m, f, opts); err != nil {
555 problemf("delivering message %s: %s (continuing)", pos, err)
558 newIDs = append(newIDs, m.ID)
559 changes = append(changes, m.ChangeAddUID(*mb))
// Progress event every 100 messages, or when switching to another mailbox.
561 if messages[mb.Name]%100 == 0 || prevMailbox != mb.Name {
562 prevMailbox = mb.Name
563 sendEvent("count", importCount{mb.Name, messages[mb.Name]})
// ximportMbox delivers each message from the mbox data in r into mailbox,
// creating the mailbox if needed. filename is used in problem reports. An
// empty mailbox name is reported as a problem and the file is skipped. Errors
// reading the next message abort the import.
567 ximportMbox := func(mailbox, filename string, r io.Reader) {
569 problemf("empty mailbox name for mbox file %s (skipping)", filename)
572 mb := xensureMailbox(mailbox)
574 mr := store.NewMboxReader(log, store.CreateMessageTemp, filename, r)
576 m, mf, pos, err := mr.Next()
579 } else if err != nil {
580 ximportcheckf(err, "next message in mbox file")
583 xdeliver(mb, m, mf, pos)
// ximportMaildir delivers the single maildir message in r, named filename, into
// mailbox (created if needed).
//
// - Bare "\n" line endings become "\r\n".
// - The received time is taken from the file name's numeric prefix when it
//   parses.
// - The maildir info after ":2," is mapped to flags and keywords.
// - Dovecot keyword letters 'a'-'z' seen before the mailbox's dovecot-keywords
//   file are remembered in mailboxMissingKeywordMessages and applied later.
587 ximportMaildir := func(mailbox, filename string, r io.Reader) {
589 problemf("empty mailbox name for maildir file %s (skipping)", filename)
592 mb := xensureMailbox(mailbox)
594 f, err := store.CreateMessageTemp(log, "import")
595 ximportcheckf(err, "creating temp message")
598 store.CloseRemoveTempFile(log, f, "message to import")
602 // Copy data, changing bare \n into \r\n.
603 br := bufio.NewReader(r)
604 w := bufio.NewWriter(f)
607 line, err := br.ReadBytes('\n')
608 if err != nil && err != io.EOF {
609 ximportcheckf(err, "reading message")
// NOTE(review): ReadBytes returns a final line that lacks '\n' together with
// io.EOF. For such a line, slicing off the last byte below drops real content
// (e.g. "abc" becomes "ab\r\n"). An empty line would make len(line)-1 negative.
// Confirm the surrounding code guards both cases.
612 if !bytes.HasSuffix(line, []byte("\r\n")) {
613 line = append(line[:len(line)-1], "\r\n"...)
616 n, err := w.Write(line)
617 ximportcheckf(err, "writing message")
625 ximportcheckf(err, "writing message")
// Maildir file names conventionally start with the delivery time in Unix
// seconds. If that prefix does not parse, received stays zero.
627 var received time.Time
628 t := strings.SplitN(path.Base(filename), ".", 2)
629 if v, err := strconv.ParseInt(t[0], 10, 64); err == nil {
630 received = time.Unix(v, 0)
633 // Parse flags. See https://cr.yp.to/proto/maildir.html.
635 var flags store.Flags
636 keywords := map[string]bool{}
637 t = strings.SplitN(path.Base(filename), ":2,", 2)
639 for _, c := range t[1] {
642 // Passed, doesn't map to a common IMAP flag.
644 flags.Answered = true
// Lowercase letters are dovecot keywords, resolved through the mailbox's
// dovecot-keywords file.
654 if c >= 'a' && c <= 'z' {
655 dovecotKeywords, ok := mailboxKeywords[mailbox]
657 // No keywords file seen yet, we'll try later if it comes in.
658 keepFlags += string(c)
659 } else if kw, ok := dovecotKeywords[c]; ok {
660 flagSet(&flags, keywords, kw)
670 Keywords: slices.Sorted(maps.Keys(keywords)),
673 xdeliver(mb, &m, f, filename)
// Remember the unresolved letters by message ID, for the fix-up when this
// mailbox's dovecot-keywords file is imported.
676 if _, ok := mailboxMissingKeywordMessages[mailbox]; !ok {
677 mailboxMissingKeywordMessages[mailbox] = map[int64]string{}
679 mailboxMissingKeywordMessages[mailbox][m.ID] = keepFlags
// importFile imports one archive entry. name is its path in the archive, and
// skipMailboxPrefix is stripped from it first. Entries are handled as follows:
//
// - Directories named new/cur/tmp are meant to ensure their maildir mailbox
//   exists.
// - "*.mbox" files are imported as mbox.
// - Files inside new/cur/tmp are imported as maildir messages.
// - "dovecot-keywords" files define the keyword letters of their maildir, and
//   messages imported before the file was seen are fixed up.
// - Anything else is reported and skipped.
683 importFile := func(name string, r io.Reader) {
// NOTE(review): this is a plain string prefix match, not a path-component
// match. A prefix "a" also strips the "a" from "ab/...".
686 if strings.HasPrefix(name, skipMailboxPrefix) {
687 name = strings.TrimPrefix(name[len(skipMailboxPrefix):], "/")
// Directory entries end in "/".
690 if strings.HasSuffix(name, "/") {
691 name = strings.TrimSuffix(name, "/")
// NOTE(review): for a directory entry "Inbox/new/", name is now "Inbox/new",
// dir is "Inbox", and path.Base(dir) is "Inbox", not "new". So the case below
// does not match the maildir's own new/cur/tmp directories, and empty maildirs
// are not created. It looks like switching on path.Base(name) with
// mailbox := dir was intended; confirm.
692 dir := path.Dir(name)
693 switch path.Base(dir) {
694 case "new", "cur", "tmp":
695 // Maildir, ensure it exists.
696 mailbox := path.Dir(dir)
697 xensureMailbox(mailbox)
699 // Otherwise, this is just a directory that probably holds mbox files and maildirs.
703 if strings.HasSuffix(path.Base(name), ".mbox") {
704 mailbox := name[:len(name)-len(".mbox")]
705 ximportMbox(mailbox, origName, r)
// A file directly inside new/, cur/ or tmp/ is a maildir message. Its mailbox
// is the directory containing those.
708 dir := path.Dir(name)
709 dirbase := path.Base(dir)
711 case "new", "cur", "tmp":
712 mailbox := path.Dir(dir)
713 ximportMaildir(mailbox, origName, r)
717 if path.Base(name) != "dovecot-keywords" {
718 problemf("unrecognized file %s (skipping)", origName)
722 // Handle dovecot-keywords.
723 mailbox := path.Dir(name)
724 dovecotKeywords := map[rune]string{}
725 words, err := store.ParseDovecotKeywordsFlags(r, log)
726 log.Check(err, "parsing dovecot keywords for mailbox", slog.String("mailbox", mailbox))
// Keyword i gets letter 'a'+i. ximportMaildir only looks up 'a'-'z', so
// keywords past the 26th are never used.
727 for i, kw := range words {
728 dovecotKeywords['a'+rune(i)] = kw
730 mailboxKeywords[mailbox] = dovecotKeywords
// Fix up messages of this mailbox that were imported before this file was seen:
// resolve their remembered letters now, then update the stored message, its
// mailbox counts and keywords.
732 for id, chars := range mailboxMissingKeywordMessages[mailbox] {
733 var flags, zeroflags store.Flags
734 keywords := map[string]bool{}
735 for _, c := range chars {
736 kw, ok := dovecotKeywords[c]
738 problemf("unspecified dovecot message flag %c for message id %d (continuing)", c, id)
741 flagSet(&flags, keywords, kw)
743 if flags == zeroflags && len(keywords) == 0 {
747 m := store.Message{ID: id}
749 ximportcheckf(err, "get imported message for flag update")
// Recompute mailbox counts: subtract the message with its old flags, then add
// it back with the new ones.
751 mb := mailboxIDs[m.MailboxID]
752 mb.Sub(m.MailboxCounts())
755 m.Flags = m.Flags.Set(flags, flags)
756 m.Keywords = slices.Sorted(maps.Keys(keywords))
758 mb.Add(m.MailboxCounts())
760 mb.Keywords, _ = store.MergeKeywords(mb.Keywords, m.Keywords)
762 // We train before updating, training may set m.TrainedJunk.
763 if jf != nil && m.NeedsTraining() {
767 ximportcheckf(err, "updating message after flag update")
768 changes = append(changes, m.ChangeFlags(oflags, *mb))
770 delete(mailboxMissingKeywordMessages, mailbox)
// Import every entry of the zip archive, or otherwise of the tar archive.
// Within the zip loop, f shadows the uploaded archive file parameter.
774 for _, f := range zr.File {
780 problemf("opening file %s in zip: %v", f.Name, err)
783 importFile(f.Name, zf)
785 log.Check(err, "closing file from zip")
// A tar header read error is reported as a problem marked "(aborting)".
795 } else if err != nil {
796 problemf("reading next tar header: %v (aborting)", err)
799 importFile(h.Name, tr)
804 for _, count := range messages {
807 log.Debug("messages imported", slog.Int("total", total))
809 // Send final update for count of last-imported mailbox.
810 if prevMailbox != "" {
811 sendEvent("count", importCount{prevMailbox, messages[prevMailbox]})
// NOTE(review): newIDs[0] requires at least one delivered message; confirm
// this is only reached when newIDs is non-empty.
816 sendEvent("step", importStep{"matching messages with threads"})
817 err = acc.AssignThreads(ctx, log, tx, newIDs[0], 0, io.Discard)
818 ximportcheckf(err, "assigning messages to threads")
821 // Update mailboxes with counts and keywords.
822 for _, mb := range mailboxIDs {
824 ximportcheckf(err, "updating mailbox count and keywords")
826 changes = append(changes, mb.ChangeCounts())
// Only announce keywords for mailboxes whose keyword count changed during the import.
827 if len(mb.Keywords) != mailboxKeywordCounts[mb.ID] {
828 changes = append(changes, mb.ChangeKeywords())
// Disk usage was not updated per message (SkipUpdateDiskUsage), so add the
// total size of the imported messages now.
832 err = acc.AddMessageSize(log, tx, addSize)
833 ximportcheckf(err, "updating disk usage after import")
837 ximportcheckf(err, "commit")
// Save the junk filter's training only after the commit succeeded. A failure
// here is reported, but the committed import stands.
841 if err := jf.Close(); err != nil {
842 problemf("saving changes of training junk filter: %v (continuing)", err)
843 log.Errorx("saving changes of training junk filter", err)
// Broadcast only now that the transaction is committed.
848 store.BroadcastChanges(acc, changes)
851 log.Check(err, "closing account after import")
854 sendEvent("done", importDone{})
// flagSet applies a keyword word, e.g. from a dovecot-keywords file, to flags
// and keywords. Well-known keywords (with or without "$" prefix) set a
// corresponding flag field, such as Forwarded or Phishing. Other words are
// added to keywords only when store.CheckKeyword accepts them.
857func flagSet(flags *store.Flags, keywords map[string]bool, word string) {
859 case "forwarded", "$forwarded":
860 flags.Forwarded = true
861 case "junk", "$junk":
863 case "notjunk", "$notjunk", "nonjunk", "$nonjunk":
865 case "phishing", "$phishing":
866 flags.Phishing = true
867 case "mdnsent", "$mdnsent":
870 if err := store.CheckKeyword(word); err == nil {
871 keywords[word] = true