10	cryptrand "crypto/rand"
 
24	"golang.org/x/exp/maps"
 
26	"golang.org/x/text/unicode/norm"
 
28	"github.com/mjl-/bstore"
 
30	"github.com/mjl-/mox/message"
 
31	"github.com/mjl-/mox/metrics"
 
32	"github.com/mjl-/mox/mlog"
 
33	"github.com/mjl-/mox/mox-"
 
34	"github.com/mjl-/mox/store"
 
37type importListener struct {
 
39	Events   chan importEvent
 
40	Register chan bool // Whether register is successful.
 
43type importEvent struct {
 
45	SSEMsg []byte // Full SSE message, including event: ... and data: ... \n\n
 
46	Event  any    // nil, importCount, importProblem, importDone, importAborted
 
47	Cancel func() // For cancelling the context causing abort of the import. Set in first, import-registering, event.
 
50type importAbortRequest struct {
 
55var importers = struct {
 
56	Register   chan *importListener
 
57	Unregister chan *importListener
 
58	Events     chan importEvent
 
59	Abort      chan importAbortRequest
 
62	make(chan *importListener, 1),
 
63	make(chan *importListener, 1),
 
64	make(chan importEvent),
 
65	make(chan importAbortRequest),
 
69// ImportManage should be run as a goroutine. It manages imports of mboxes/maildirs, propagating progress over SSE connections.
 
71	log := mlog.New("httpimport", nil)
 
73		if x := recover(); x != nil {
 
74			log.Error("import manage panic", slog.Any("err", x))
 
76			metrics.PanicInc(metrics.Importmanage)
 
81		MailboxCounts map[string]int
 
85		Listeners     map[*importListener]struct{}
 
89	imports := map[string]state{} // Token to state.
 
92		case l := <-importers.Register:
 
93			// If we have state, send it so the client is up to date.
 
94			s, ok := imports[l.Token]
 
99			s.Listeners[l] = struct{}{}
 
101			sendEvent := func(kind string, v any) {
 
102				buf, err := json.Marshal(v)
 
104					log.Errorx("marshal event", err, slog.String("kind", kind), slog.Any("event", v))
 
107				ssemsg := fmt.Sprintf("event: %s\ndata: %s\n\n", kind, buf)
 
110				case l.Events <- importEvent{kind, []byte(ssemsg), nil, nil}:
 
112					log.Debug("dropped initial import event to slow consumer")
 
116			for m, c := range s.MailboxCounts {
 
117				sendEvent("count", importCount{m, c})
 
119			for _, p := range s.Problems {
 
120				sendEvent("problem", importProblem{p})
 
123				sendEvent("done", importDone{})
 
124			} else if s.Aborted != nil {
 
125				sendEvent("aborted", importAborted{})
 
128		case l := <-importers.Unregister:
 
129			delete(imports[l.Token].Listeners, l)
 
131		case e := <-importers.Events:
 
132			s, ok := imports[e.Token]
 
135					MailboxCounts: map[string]int{},
 
136					Listeners:     map[*importListener]struct{}{},
 
141			for l := range s.Listeners {
 
145					log.Debug("dropped import event to slow consumer")
 
149				s := imports[e.Token]
 
150				switch x := e.Event.(type) {
 
152					s.MailboxCounts[x.Mailbox] = x.Count
 
154					s.Problems = append(s.Problems, x.Message)
 
165		case a := <-importers.Abort:
 
166			s, ok := imports[a.Token]
 
168				a.Response <- errors.New("import not found")
 
172				a.Response <- errors.New("import already finished")
 
178		case <-importers.Stop:
 
182		// Cleanup old state.
 
183		for t, s := range imports {
 
184			if len(s.Listeners) > 0 {
 
187			if s.Done != nil && time.Since(*s.Done) > time.Minute || s.Aborted != nil && time.Since(*s.Aborted) > time.Minute {
 
194type importCount struct {
 
198type importProblem struct {
 
// importDone is the (empty) payload sent with the "done" event; it carries no
// data beyond the event kind itself.
201type importDone struct{}

// importAborted is the (empty) payload sent with the "aborted" event; it carries
// no data beyond the event kind itself.
202type importAborted struct{}
 
203type importStep struct {
 
207// importStart prepares the import and launches the goroutine to actually import.
 
208// importStart is responsible for closing f and removing f.
 
209func importStart(log mlog.Log, accName string, f *os.File, skipMailboxPrefix string) (string, bool, error) {
 
212			store.CloseRemoveTempFile(log, f, "upload for import")
 
216	buf := make([]byte, 16)
 
217	if _, err := cryptrand.Read(buf); err != nil {
 
218		return "", false, err
 
220	token := fmt.Sprintf("%x", buf)
 
222	if _, err := f.Seek(0, 0); err != nil {
 
223		return "", false, fmt.Errorf("seek to start of file: %v", err)
 
226	// Recognize file format.
 
228	magicZip := []byte{0x50, 0x4b, 0x03, 0x04}
 
229	magicGzip := []byte{0x1f, 0x8b}
 
230	magic := make([]byte, 4)
 
231	if _, err := f.ReadAt(magic, 0); err != nil {
 
232		return "", true, fmt.Errorf("detecting file format: %v", err)
 
234	if bytes.Equal(magic, magicZip) {
 
236	} else if !bytes.Equal(magic[:2], magicGzip) {
 
237		return "", true, fmt.Errorf("file is not a zip or gzip file")
 
245			return "", false, fmt.Errorf("stat temporary import zip file: %v", err)
 
247		zr, err = zip.NewReader(f, fi.Size())
 
249			return "", true, fmt.Errorf("opening zip file: %v", err)
 
252		gzr, err := gzip.NewReader(f)
 
254			return "", true, fmt.Errorf("gunzip: %v", err)
 
256		tr = tar.NewReader(gzr)
 
259	acc, err := store.OpenAccount(log, accName)
 
261		return "", false, fmt.Errorf("open acount: %v", err)
 
263	acc.Lock() // Not using WithWLock because importMessage is responsible for unlocking.
 
265	tx, err := acc.DB.Begin(context.Background(), true)
 
269		log.Check(xerr, "closing account")
 
270		return "", false, fmt.Errorf("start transaction: %v", err)
 
273	// Ensure token is registered before returning, with context that can be canceled.
 
274	ctx, cancel := context.WithCancel(mox.Shutdown)
 
275	importers.Events <- importEvent{token, []byte(": keepalive\n\n"), nil, cancel}
 
277	log.Info("starting import")
 
278	go importMessages(ctx, log.WithCid(mox.Cid()), token, acc, tx, zr, tr, f, skipMailboxPrefix)
 
279	f = nil // importMessages is now responsible for closing and removing.
 
281	return token, false, nil
 
284// importMessages imports the messages from zip/tgz file f.
 
285// importMessages is responsible for unlocking and closing acc, and closing tx and f.
 
286func importMessages(ctx context.Context, log mlog.Log, token string, acc *store.Account, tx *bstore.Tx, zr *zip.Reader, tr *tar.Reader, f *os.File, skipMailboxPrefix string) {
 
287	// If a fatal processing error occurs, we panic with this type.
 
288	type importError struct{ Err error }
 
290	// During import we collect all changes and broadcast them at the end, when successful.
 
291	var changes []store.Change
 
293	// IDs of delivered messages. If we have to roll back, we have to remove these files.
 
294	var deliveredIDs []int64
 
296	sendEvent := func(kind string, v any) {
 
297		buf, err := json.Marshal(v)
 
299			log.Errorx("marshal event", err, slog.String("kind", kind), slog.Any("event", v))
 
302		ssemsg := fmt.Sprintf("event: %s\ndata: %s\n\n", kind, buf)
 
303		importers.Events <- importEvent{token, []byte(ssemsg), v, nil}
 
306	canceled := func() bool {
 
309			sendEvent("aborted", importAborted{})
 
316	problemf := func(format string, args ...any) {
 
317		msg := fmt.Sprintf(format, args...)
 
318		sendEvent("problem", importProblem{Message: msg})
 
322		store.CloseRemoveTempFile(log, f, "uploaded messages")
 
324		for _, id := range deliveredIDs {
 
325			p := acc.MessagePath(id)
 
327			log.Check(err, "closing message file after import error", slog.String("path", p))
 
331			log.Check(err, "rolling back transaction")
 
336			log.Check(err, "closing account")
 
343		if err, ok := x.(importError); ok {
 
344			log.Errorx("import error", err.Err)
 
345			problemf("%s (aborting)", err.Err)
 
346			sendEvent("aborted", importAborted{})
 
348			log.Error("import panic", slog.Any("err", x))
 
350			metrics.PanicInc(metrics.Importmessages)
 
354	ximportcheckf := func(err error, format string, args ...any) {
 
356			panic(importError{fmt.Errorf("%s: %s", fmt.Sprintf(format, args...), err)})
 
360	err := acc.ThreadingWait(log)
 
361	ximportcheckf(err, "waiting for account thread upgrade")
 
363	conf, _ := acc.Conf()
 
365	jf, _, err := acc.OpenJunkFilter(ctx, log)
 
366	if err != nil && !errors.Is(err, store.ErrNoJunkFilter) {
 
367		ximportcheckf(err, "open junk filter")
 
371			err := jf.CloseDiscard()
 
372			log.Check(err, "closing junk filter")
 
376	// Mailboxes we imported, and message counts.
 
377	mailboxes := map[string]store.Mailbox{}
 
378	messages := map[string]int{}
 
380	maxSize := acc.QuotaMessageSize()
 
381	du := store.DiskUsage{ID: 1}
 
383	ximportcheckf(err, "get disk usage")
 
386	// For maildirs, we are likely to get a possible dovecot-keywords file after having
 
387	// imported the messages. Once we see the keywords, we use them. But before that
 
388	// time we remember which messages are missing keywords. Once the keywords become
 
389	// available, we'll fix up the flags for the unknown messages.
 
390	mailboxKeywords := map[string]map[rune]string{}                // Mailbox to 'a'-'z' to flag name.
 
391	mailboxMissingKeywordMessages := map[string]map[int64]string{} // Mailbox to message id to string consisting of the unrecognized flags.
 
393	// We keep the mailboxes we deliver to up to date with count and keywords (non-system flags).
 
394	destMailboxCounts := map[int64]store.MailboxCounts{}
 
395	destMailboxKeywords := map[int64]map[string]bool{}
 
397	// Previous mailbox an event was sent for. We send an event for new mailboxes, when
 
398	// another 100 messages were added, when adding a message to another mailbox, and
 
399	// finally at the end as a closing statement.
 
400	var prevMailbox string
 
402	var modseq store.ModSeq // Assigned on first message, used for all messages.
 
404	trainMessage := func(m *store.Message, p message.Part, pos string) {
 
405		words, err := jf.ParseMessage(p)
 
407			problemf("parsing message %s for updating junk filter: %v (continuing)", pos, err)
 
410		err = jf.Train(ctx, !m.Junk, words)
 
412			problemf("training junk filter for message %s: %v (continuing)", pos, err)
 
415		m.TrainedJunk = &m.Junk
 
418	openTrainMessage := func(m *store.Message) {
 
419		path := acc.MessagePath(m.ID)
 
420		f, err := os.Open(path)
 
422			problemf("opening message again for training junk filter: %v (continuing)", err)
 
427			log.Check(err, "closing file after training junkfilter")
 
429		p, err := m.LoadPart(f)
 
431			problemf("loading parsed message again for training junk filter: %v (continuing)", err)
 
434		trainMessage(m, p, fmt.Sprintf("message id %d", m.ID))
 
437	xensureMailbox := func(name string) store.Mailbox {
 
438		name = norm.NFC.String(name)
 
439		if strings.ToLower(name) == "inbox" {
 
443		if mb, ok := mailboxes[name]; ok {
 
449		for i, e := range strings.Split(name, "/") {
 
455			if _, ok := mailboxes[p]; ok {
 
459			q := bstore.QueryTx[store.Mailbox](tx)
 
460			q.FilterNonzero(store.Mailbox{Name: p})
 
463			if err == bstore.ErrAbsent {
 
464				uidvalidity, err := acc.NextUIDValidity(tx)
 
465				ximportcheckf(err, "finding next uid validity")
 
468					UIDValidity: uidvalidity,
 
471					// Do not assign special-use flags. This existing account probably already has such mailboxes.
 
474				ximportcheckf(err, "inserting mailbox in database")
 
476				if tx.Get(&store.Subscription{Name: p}) != nil {
 
477					err := tx.Insert(&store.Subscription{Name: p})
 
478					ximportcheckf(err, "subscribing to imported mailbox")
 
480				changes = append(changes, store.ChangeAddMailbox{Mailbox: mb, Flags: []string{`\Subscribed`}})
 
481			} else if err != nil {
 
482				ximportcheckf(err, "creating mailbox %s (aborting)", p)
 
484			if prevMailbox != "" && mb.Name != prevMailbox {
 
485				sendEvent("count", importCount{prevMailbox, messages[prevMailbox]})
 
487			mailboxes[mb.Name] = mb
 
488			sendEvent("count", importCount{mb.Name, 0})
 
489			prevMailbox = mb.Name
 
494	xdeliver := func(mb store.Mailbox, m *store.Message, f *os.File, pos string) {
 
498			log.Check(err, "closing temporary message file for delivery")
 
499			err := os.Remove(name)
 
500			log.Check(err, "removing temporary message file for delivery")
 
503		m.MailboxOrigID = mb.ID
 
506		if maxSize > 0 && du.MessageSize+addSize > maxSize {
 
507			ximportcheckf(fmt.Errorf("account over maximum total size %d", maxSize), "checking quota")
 
512			modseq, err = acc.NextModSeq(tx)
 
513			ximportcheckf(err, "assigning next modseq")
 
518		mc := destMailboxCounts[mb.ID]
 
519		mc.Add(m.MailboxCounts())
 
520		destMailboxCounts[mb.ID] = mc
 
522		if len(m.Keywords) > 0 {
 
523			if destMailboxKeywords[mb.ID] == nil {
 
524				destMailboxKeywords[mb.ID] = map[string]bool{}
 
526			for _, k := range m.Keywords {
 
527				destMailboxKeywords[mb.ID][k] = true
 
531		// Parse message and store parsed information for later fast retrieval.
 
532		p, err := message.EnsurePart(log.Logger, false, f, m.Size)
 
534			problemf("parsing message %s: %s (continuing)", pos, err)
 
536		m.ParsedBuf, err = json.Marshal(p)
 
537		ximportcheckf(err, "marshal parsed message structure")
 
539		// Set fields needed for future threading. By doing it now, DeliverMessage won't
 
540		// have to parse the Part again.
 
541		p.SetReaderAt(store.FileMsgReader(m.MsgPrefix, f))
 
542		m.PrepareThreading(log, &p)
 
544		if m.Received.IsZero() {
 
545			if p.Envelope != nil && !p.Envelope.Date.IsZero() {
 
546				m.Received = p.Envelope.Date
 
548				m.Received = time.Now()
 
552		// We set the flags that Deliver would set now and train ourselves. This prevents
 
553		// Deliver from training, which would open the junk filter, change it, and write it
 
554		// back to disk, for each message (slow).
 
555		m.JunkFlagsForMailbox(mb, conf)
 
556		if jf != nil && m.NeedsTraining() {
 
557			trainMessage(m, p, pos)
 
562		const nothreads = true
 
563		const updateDiskUsage = false
 
564		if err := acc.DeliverMessage(log, tx, m, f, sync, notrain, nothreads, updateDiskUsage); err != nil {
 
565			problemf("delivering message %s: %s (continuing)", pos, err)
 
568		deliveredIDs = append(deliveredIDs, m.ID)
 
569		changes = append(changes, m.ChangeAddUID())
 
571		if messages[mb.Name]%100 == 0 || prevMailbox != mb.Name {
 
572			prevMailbox = mb.Name
 
573			sendEvent("count", importCount{mb.Name, messages[mb.Name]})
 
577	ximportMbox := func(mailbox, filename string, r io.Reader) {
 
579			problemf("empty mailbox name for mbox file %s (skipping)", filename)
 
582		mb := xensureMailbox(mailbox)
 
584		mr := store.NewMboxReader(log, store.CreateMessageTemp, filename, r)
 
586			m, mf, pos, err := mr.Next()
 
589			} else if err != nil {
 
590				ximportcheckf(err, "next message in mbox file")
 
593			xdeliver(mb, m, mf, pos)
 
597	ximportMaildir := func(mailbox, filename string, r io.Reader) {
 
599			problemf("empty mailbox name for maildir file %s (skipping)", filename)
 
602		mb := xensureMailbox(mailbox)
 
604		f, err := store.CreateMessageTemp(log, "import")
 
605		ximportcheckf(err, "creating temp message")
 
608				store.CloseRemoveTempFile(log, f, "message to import")
 
612		// Copy data, changing bare \n into \r\n.
 
613		br := bufio.NewReader(r)
 
614		w := bufio.NewWriter(f)
 
617			line, err := br.ReadBytes('\n')
 
618			if err != nil && err != io.EOF {
 
619				ximportcheckf(err, "reading message")
 
622				if !bytes.HasSuffix(line, []byte("\r\n")) {
 
623					line = append(line[:len(line)-1], "\r\n"...)
 
626				n, err := w.Write(line)
 
627				ximportcheckf(err, "writing message")
 
635		ximportcheckf(err, "writing message")
 
637		var received time.Time
 
638		t := strings.SplitN(path.Base(filename), ".", 2)
 
639		if v, err := strconv.ParseInt(t[0], 10, 64); err == nil {
 
640			received = time.Unix(v, 0)
 
643		// Parse flags. See https://cr.yp.to/proto/maildir.html.
 
645		var flags store.Flags
 
646		keywords := map[string]bool{}
 
647		t = strings.SplitN(path.Base(filename), ":2,", 2)
 
649			for _, c := range t[1] {
 
652					// Passed, doesn't map to a common IMAP flag.
 
654					flags.Answered = true
 
664					if c >= 'a' && c <= 'z' {
 
665						dovecotKeywords, ok := mailboxKeywords[mailbox]
 
667							// No keywords file seen yet, we'll try later if it comes in.
 
668							keepFlags += string(c)
 
669						} else if kw, ok := dovecotKeywords[c]; ok {
 
670							flagSet(&flags, keywords, kw)
 
680			Keywords: maps.Keys(keywords),
 
683		xdeliver(mb, &m, f, filename)
 
686			if _, ok := mailboxMissingKeywordMessages[mailbox]; !ok {
 
687				mailboxMissingKeywordMessages[mailbox] = map[int64]string{}
 
689			mailboxMissingKeywordMessages[mailbox][m.ID] = keepFlags
 
693	importFile := func(name string, r io.Reader) {
 
696		if strings.HasPrefix(name, skipMailboxPrefix) {
 
697			name = strings.TrimPrefix(name[len(skipMailboxPrefix):], "/")
 
700		if strings.HasSuffix(name, "/") {
 
701			name = strings.TrimSuffix(name, "/")
 
702			dir := path.Dir(name)
 
703			switch path.Base(dir) {
 
704			case "new", "cur", "tmp":
 
705				// Maildir, ensure it exists.
 
706				mailbox := path.Dir(dir)
 
707				xensureMailbox(mailbox)
 
709			// Otherwise, this is just a directory that probably holds mbox files and maildirs.
 
713		if strings.HasSuffix(path.Base(name), ".mbox") {
 
714			mailbox := name[:len(name)-len(".mbox")]
 
715			ximportMbox(mailbox, origName, r)
 
718		dir := path.Dir(name)
 
719		dirbase := path.Base(dir)
 
721		case "new", "cur", "tmp":
 
722			mailbox := path.Dir(dir)
 
723			ximportMaildir(mailbox, origName, r)
 
725			if path.Base(name) == "dovecot-keywords" {
 
726				mailbox := path.Dir(name)
 
727				dovecotKeywords := map[rune]string{}
 
728				words, err := store.ParseDovecotKeywordsFlags(r, log)
 
729				log.Check(err, "parsing dovecot keywords for mailbox", slog.String("mailbox", mailbox))
 
730				for i, kw := range words {
 
731					dovecotKeywords['a'+rune(i)] = kw
 
733				mailboxKeywords[mailbox] = dovecotKeywords
 
735				for id, chars := range mailboxMissingKeywordMessages[mailbox] {
 
736					var flags, zeroflags store.Flags
 
737					keywords := map[string]bool{}
 
738					for _, c := range chars {
 
739						kw, ok := dovecotKeywords[c]
 
741							problemf("unspecified dovecot message flag %c for message id %d (continuing)", c, id)
 
744						flagSet(&flags, keywords, kw)
 
746					if flags == zeroflags && len(keywords) == 0 {
 
750					m := store.Message{ID: id}
 
752					ximportcheckf(err, "get imported message for flag update")
 
754					mc := destMailboxCounts[m.MailboxID]
 
755					mc.Sub(m.MailboxCounts())
 
758					m.Flags = m.Flags.Set(flags, flags)
 
759					m.Keywords = maps.Keys(keywords)
 
760					sort.Strings(m.Keywords)
 
762					mc.Add(m.MailboxCounts())
 
763					destMailboxCounts[m.MailboxID] = mc
 
765					if len(m.Keywords) > 0 {
 
766						if destMailboxKeywords[m.MailboxID] == nil {
 
767							destMailboxKeywords[m.MailboxID] = map[string]bool{}
 
769						for _, k := range m.Keywords {
 
770							destMailboxKeywords[m.MailboxID][k] = true
 
774					// We train before updating, training may set m.TrainedJunk.
 
775					if jf != nil && m.NeedsTraining() {
 
779					ximportcheckf(err, "updating message after flag update")
 
780					changes = append(changes, m.ChangeFlags(oflags))
 
782				delete(mailboxMissingKeywordMessages, mailbox)
 
784				problemf("unrecognized file %s (skipping)", origName)
 
790		for _, f := range zr.File {
 
796				problemf("opening file %s in zip: %v", f.Name, err)
 
799			importFile(f.Name, zf)
 
801			log.Check(err, "closing file from zip")
 
811			} else if err != nil {
 
812				problemf("reading next tar header: %v (aborting)", err)
 
815			importFile(h.Name, tr)
 
820	for _, count := range messages {
 
823	log.Debug("messages imported", slog.Int("total", total))
 
825	// Send final update for count of last-imported mailbox.
 
826	if prevMailbox != "" {
 
827		sendEvent("count", importCount{prevMailbox, messages[prevMailbox]})
 
831	if len(deliveredIDs) > 0 {
 
832		sendEvent("step", importStep{"matching messages with threads"})
 
833		err = acc.AssignThreads(ctx, log, tx, deliveredIDs[0], 0, io.Discard)
 
834		ximportcheckf(err, "assigning messages to threads")
 
837	// Update mailboxes with counts and keywords.
 
838	for mbID, mc := range destMailboxCounts {
 
839		mb := store.Mailbox{ID: mbID}
 
841		ximportcheckf(err, "loading mailbox for counts and keywords")
 
843		if mb.MailboxCounts != mc {
 
844			mb.MailboxCounts = mc
 
845			changes = append(changes, mb.ChangeCounts())
 
848		keywords := destMailboxKeywords[mb.ID]
 
850		mb.Keywords, mbKwChanged = store.MergeKeywords(mb.Keywords, maps.Keys(keywords))
 
853		ximportcheckf(err, "updating mailbox count and keywords")
 
855			changes = append(changes, mb.ChangeKeywords())
 
859	err = acc.AddMessageSize(log, tx, addSize)
 
860	ximportcheckf(err, "updating disk usage after import")
 
864	ximportcheckf(err, "commit")
 
868		if err := jf.Close(); err != nil {
 
869			problemf("saving changes of training junk filter: %v (continuing)", err)
 
870			log.Errorx("saving changes of training junk filter", err)
 
875	store.BroadcastChanges(acc, changes)
 
878	log.Check(err, "closing account after import")
 
881	sendEvent("done", importDone{})
 
884func flagSet(flags *store.Flags, keywords map[string]bool, word string) {
 
886	case "forwarded", "$forwarded":
 
887		flags.Forwarded = true
 
888	case "junk", "$junk":
 
890	case "notjunk", "$notjunk", "nonjunk", "$nonjunk":
 
892	case "phishing", "$phishing":
 
893		flags.Phishing = true
 
894	case "mdnsent", "$mdnsent":
 
897		if err := store.CheckKeyword(word); err == nil {
 
898			keywords[word] = true