1package webaccount
2
3import (
4 "archive/tar"
5 "archive/zip"
6 "bufio"
7 "bytes"
8 "compress/gzip"
9 "context"
10 cryptrand "crypto/rand"
11 "encoding/json"
12 "errors"
13 "fmt"
14 "io"
15 "log/slog"
16 "os"
17 "path"
18 "runtime/debug"
19 "sort"
20 "strconv"
21 "strings"
22 "time"
23
24 "golang.org/x/exp/maps"
25
26 "golang.org/x/text/unicode/norm"
27
28 "github.com/mjl-/bstore"
29
30 "github.com/mjl-/mox/message"
31 "github.com/mjl-/mox/metrics"
32 "github.com/mjl-/mox/mlog"
33 "github.com/mjl-/mox/mox-"
34 "github.com/mjl-/mox/store"
35)
36
37type importListener struct {
38 Token string
39 Events chan importEvent
40 Register chan bool // Whether register is successful.
41}
42
43type importEvent struct {
44 Token string
45 SSEMsg []byte // Full SSE message, including event: ... and data: ... \n\n
46 Event any // nil, importCount, importProblem, importDone, importAborted
47 Cancel func() // For cancelling the context causing abort of the import. Set in first, import-registering, event.
48}
49
50type importAbortRequest struct {
51 Token string
52 Response chan error
53}
54
55var importers = struct {
56 Register chan *importListener
57 Unregister chan *importListener
58 Events chan importEvent
59 Abort chan importAbortRequest
60}{
61 make(chan *importListener, 1),
62 make(chan *importListener, 1),
63 make(chan importEvent),
64 make(chan importAbortRequest),
65}
66
67// ImportManage should be run as a goroutine, it manages imports of mboxes/maildirs, propagating progress over SSE connections.
68func ImportManage() {
69 log := mlog.New("httpimport", nil)
70 defer func() {
71 if x := recover(); x != nil {
72 log.Error("import manage panic", slog.Any("err", x))
73 debug.PrintStack()
74 metrics.PanicInc(metrics.Importmanage)
75 }
76 }()
77
78 type state struct {
79 MailboxCounts map[string]int
80 Problems []string
81 Done *time.Time
82 Aborted *time.Time
83 Listeners map[*importListener]struct{}
84 Cancel func()
85 }
86
87 imports := map[string]state{} // Token to state.
88 for {
89 select {
90 case l := <-importers.Register:
91 // If we have state, send it so the client is up to date.
92 if s, ok := imports[l.Token]; ok {
93 l.Register <- true
94 s.Listeners[l] = struct{}{}
95
96 sendEvent := func(kind string, v any) {
97 buf, err := json.Marshal(v)
98 if err != nil {
99 log.Errorx("marshal event", err, slog.String("kind", kind), slog.Any("event", v))
100 return
101 }
102 ssemsg := fmt.Sprintf("event: %s\ndata: %s\n\n", kind, buf)
103
104 select {
105 case l.Events <- importEvent{kind, []byte(ssemsg), nil, nil}:
106 default:
107 log.Debug("dropped initial import event to slow consumer")
108 }
109 }
110
111 for m, c := range s.MailboxCounts {
112 sendEvent("count", importCount{m, c})
113 }
114 for _, p := range s.Problems {
115 sendEvent("problem", importProblem{p})
116 }
117 if s.Done != nil {
118 sendEvent("done", importDone{})
119 } else if s.Aborted != nil {
120 sendEvent("aborted", importAborted{})
121 }
122 } else {
123 l.Register <- false
124 }
125
126 case l := <-importers.Unregister:
127 delete(imports[l.Token].Listeners, l)
128
129 case e := <-importers.Events:
130 s, ok := imports[e.Token]
131 if !ok {
132 s = state{
133 MailboxCounts: map[string]int{},
134 Listeners: map[*importListener]struct{}{},
135 Cancel: e.Cancel,
136 }
137 imports[e.Token] = s
138 }
139 for l := range s.Listeners {
140 select {
141 case l.Events <- e:
142 default:
143 log.Debug("dropped import event to slow consumer")
144 }
145 }
146 if e.Event != nil {
147 s := imports[e.Token]
148 switch x := e.Event.(type) {
149 case importCount:
150 s.MailboxCounts[x.Mailbox] = x.Count
151 case importProblem:
152 s.Problems = append(s.Problems, x.Message)
153 case importDone:
154 now := time.Now()
155 s.Done = &now
156 case importAborted:
157 now := time.Now()
158 s.Aborted = &now
159 }
160 imports[e.Token] = s
161 }
162
163 case a := <-importers.Abort:
164 s, ok := imports[a.Token]
165 if !ok {
166 a.Response <- errors.New("import not found")
167 return
168 }
169 if s.Done != nil {
170 a.Response <- errors.New("import already finished")
171 return
172 }
173 s.Cancel()
174 a.Response <- nil
175 }
176
177 // Cleanup old state.
178 for t, s := range imports {
179 if len(s.Listeners) > 0 {
180 continue
181 }
182 if s.Done != nil && time.Since(*s.Done) > time.Minute || s.Aborted != nil && time.Since(*s.Aborted) > time.Minute {
183 delete(imports, t)
184 }
185 }
186 }
187}
188
189type importCount struct {
190 Mailbox string
191 Count int
192}
193type importProblem struct {
194 Message string
195}
196type importDone struct{}
197type importAborted struct{}
198type importStep struct {
199 Title string
200}
201
202// importStart prepare the import and launches the goroutine to actually import.
203// importStart is responsible for closing f and removing f.
204func importStart(log mlog.Log, accName string, f *os.File, skipMailboxPrefix string) (string, bool, error) {
205 defer func() {
206 if f != nil {
207 store.CloseRemoveTempFile(log, f, "upload for import")
208 }
209 }()
210
211 buf := make([]byte, 16)
212 if _, err := cryptrand.Read(buf); err != nil {
213 return "", false, err
214 }
215 token := fmt.Sprintf("%x", buf)
216
217 if _, err := f.Seek(0, 0); err != nil {
218 return "", false, fmt.Errorf("seek to start of file: %v", err)
219 }
220
221 // Recognize file format.
222 var iszip bool
223 magicZip := []byte{0x50, 0x4b, 0x03, 0x04}
224 magicGzip := []byte{0x1f, 0x8b}
225 magic := make([]byte, 4)
226 if _, err := f.ReadAt(magic, 0); err != nil {
227 return "", true, fmt.Errorf("detecting file format: %v", err)
228 }
229 if bytes.Equal(magic, magicZip) {
230 iszip = true
231 } else if !bytes.Equal(magic[:2], magicGzip) {
232 return "", true, fmt.Errorf("file is not a zip or gzip file")
233 }
234
235 var zr *zip.Reader
236 var tr *tar.Reader
237 if iszip {
238 fi, err := f.Stat()
239 if err != nil {
240 return "", false, fmt.Errorf("stat temporary import zip file: %v", err)
241 }
242 zr, err = zip.NewReader(f, fi.Size())
243 if err != nil {
244 return "", true, fmt.Errorf("opening zip file: %v", err)
245 }
246 } else {
247 gzr, err := gzip.NewReader(f)
248 if err != nil {
249 return "", true, fmt.Errorf("gunzip: %v", err)
250 }
251 tr = tar.NewReader(gzr)
252 }
253
254 acc, err := store.OpenAccount(log, accName)
255 if err != nil {
256 return "", false, fmt.Errorf("open acount: %v", err)
257 }
258 acc.Lock() // Not using WithWLock because importMessage is responsible for unlocking.
259
260 tx, err := acc.DB.Begin(context.Background(), true)
261 if err != nil {
262 acc.Unlock()
263 xerr := acc.Close()
264 log.Check(xerr, "closing account")
265 return "", false, fmt.Errorf("start transaction: %v", err)
266 }
267
268 // Ensure token is registered before returning, with context that can be canceled.
269 ctx, cancel := context.WithCancel(mox.Shutdown)
270 importers.Events <- importEvent{token, []byte(": keepalive\n\n"), nil, cancel}
271
272 log.Info("starting import")
273 go importMessages(ctx, log.WithCid(mox.Cid()), token, acc, tx, zr, tr, f, skipMailboxPrefix)
274 f = nil // importMessages is now responsible for closing and removing.
275
276 return token, false, nil
277}
278
279// importMessages imports the messages from zip/tgz file f.
280// importMessages is responsible for unlocking and closing acc, and closing tx and f.
281func importMessages(ctx context.Context, log mlog.Log, token string, acc *store.Account, tx *bstore.Tx, zr *zip.Reader, tr *tar.Reader, f *os.File, skipMailboxPrefix string) {
282 // If a fatal processing error occurs, we panic with this type.
283 type importError struct{ Err error }
284
285 // During import we collect all changes and broadcast them at the end, when successful.
286 var changes []store.Change
287
288 // ID's of delivered messages. If we have to rollback, we have to remove this files.
289 var deliveredIDs []int64
290
291 sendEvent := func(kind string, v any) {
292 buf, err := json.Marshal(v)
293 if err != nil {
294 log.Errorx("marshal event", err, slog.String("kind", kind), slog.Any("event", v))
295 return
296 }
297 ssemsg := fmt.Sprintf("event: %s\ndata: %s\n\n", kind, buf)
298 importers.Events <- importEvent{token, []byte(ssemsg), v, nil}
299 }
300
301 canceled := func() bool {
302 select {
303 case <-ctx.Done():
304 sendEvent("aborted", importAborted{})
305 return true
306 default:
307 return false
308 }
309 }
310
311 problemf := func(format string, args ...any) {
312 msg := fmt.Sprintf(format, args...)
313 sendEvent("problem", importProblem{Message: msg})
314 }
315
316 defer func() {
317 store.CloseRemoveTempFile(log, f, "uploaded messages")
318
319 for _, id := range deliveredIDs {
320 p := acc.MessagePath(id)
321 err := os.Remove(p)
322 log.Check(err, "closing message file after import error", slog.String("path", p))
323 }
324 if tx != nil {
325 err := tx.Rollback()
326 log.Check(err, "rolling back transaction")
327 }
328 if acc != nil {
329 acc.Unlock()
330 err := acc.Close()
331 log.Check(err, "closing account")
332 }
333
334 x := recover()
335 if x == nil {
336 return
337 }
338 if err, ok := x.(importError); ok {
339 log.Errorx("import error", err.Err)
340 problemf("%s (aborting)", err.Err)
341 sendEvent("aborted", importAborted{})
342 } else {
343 log.Error("import panic", slog.Any("err", x))
344 debug.PrintStack()
345 metrics.PanicInc(metrics.Importmessages)
346 }
347 }()
348
349 ximportcheckf := func(err error, format string, args ...any) {
350 if err != nil {
351 panic(importError{fmt.Errorf("%s: %s", fmt.Sprintf(format, args...), err)})
352 }
353 }
354
355 err := acc.ThreadingWait(log)
356 ximportcheckf(err, "waiting for account thread upgrade")
357
358 conf, _ := acc.Conf()
359
360 jf, _, err := acc.OpenJunkFilter(ctx, log)
361 if err != nil && !errors.Is(err, store.ErrNoJunkFilter) {
362 ximportcheckf(err, "open junk filter")
363 }
364 defer func() {
365 if jf != nil {
366 err := jf.CloseDiscard()
367 log.Check(err, "closing junk filter")
368 }
369 }()
370
371 // Mailboxes we imported, and message counts.
372 mailboxes := map[string]store.Mailbox{}
373 messages := map[string]int{}
374
375 maxSize := acc.QuotaMessageSize()
376 du := store.DiskUsage{ID: 1}
377 err = tx.Get(&du)
378 ximportcheckf(err, "get disk usage")
379 var addSize int64
380
381 // For maildirs, we are likely to get a possible dovecot-keywords file after having
382 // imported the messages. Once we see the keywords, we use them. But before that
383 // time we remember which messages miss a keywords. Once the keywords become
384 // available, we'll fix up the flags for the unknown messages
385 mailboxKeywords := map[string]map[rune]string{} // Mailbox to 'a'-'z' to flag name.
386 mailboxMissingKeywordMessages := map[string]map[int64]string{} // Mailbox to message id to string consisting of the unrecognized flags.
387
388 // We keep the mailboxes we deliver to up to date with count and keywords (non-system flags).
389 destMailboxCounts := map[int64]store.MailboxCounts{}
390 destMailboxKeywords := map[int64]map[string]bool{}
391
392 // Previous mailbox an event was sent for. We send an event for new mailboxes, when
393 // another 100 messages were added, when adding a message to another mailbox, and
394 // finally at the end as a closing statement.
395 var prevMailbox string
396
397 var modseq store.ModSeq // Assigned on first message, used for all messages.
398
399 trainMessage := func(m *store.Message, p message.Part, pos string) {
400 words, err := jf.ParseMessage(p)
401 if err != nil {
402 problemf("parsing message %s for updating junk filter: %v (continuing)", pos, err)
403 return
404 }
405 err = jf.Train(ctx, !m.Junk, words)
406 if err != nil {
407 problemf("training junk filter for message %s: %v (continuing)", pos, err)
408 return
409 }
410 m.TrainedJunk = &m.Junk
411 }
412
413 openTrainMessage := func(m *store.Message) {
414 path := acc.MessagePath(m.ID)
415 f, err := os.Open(path)
416 if err != nil {
417 problemf("opening message again for training junk filter: %v (continuing)", err)
418 return
419 }
420 defer func() {
421 err := f.Close()
422 log.Check(err, "closing file after training junkfilter")
423 }()
424 p, err := m.LoadPart(f)
425 if err != nil {
426 problemf("loading parsed message again for training junk filter: %v (continuing)", err)
427 return
428 }
429 trainMessage(m, p, fmt.Sprintf("message id %d", m.ID))
430 }
431
432 xensureMailbox := func(name string) store.Mailbox {
433 name = norm.NFC.String(name)
434 if strings.ToLower(name) == "inbox" {
435 name = "Inbox"
436 }
437
438 if mb, ok := mailboxes[name]; ok {
439 return mb
440 }
441
442 var p string
443 var mb store.Mailbox
444 for i, e := range strings.Split(name, "/") {
445 if i == 0 {
446 p = e
447 } else {
448 p = path.Join(p, e)
449 }
450 if _, ok := mailboxes[p]; ok {
451 continue
452 }
453
454 q := bstore.QueryTx[store.Mailbox](tx)
455 q.FilterNonzero(store.Mailbox{Name: p})
456 var err error
457 mb, err = q.Get()
458 if err == bstore.ErrAbsent {
459 uidvalidity, err := acc.NextUIDValidity(tx)
460 ximportcheckf(err, "finding next uid validity")
461 mb = store.Mailbox{
462 Name: p,
463 UIDValidity: uidvalidity,
464 UIDNext: 1,
465 HaveCounts: true,
466 // Do not assign special-use flags. This existing account probably already has such mailboxes.
467 }
468 err = tx.Insert(&mb)
469 ximportcheckf(err, "inserting mailbox in database")
470
471 if tx.Get(&store.Subscription{Name: p}) != nil {
472 err := tx.Insert(&store.Subscription{Name: p})
473 ximportcheckf(err, "subscribing to imported mailbox")
474 }
475 changes = append(changes, store.ChangeAddMailbox{Mailbox: mb, Flags: []string{`\Subscribed`}})
476 } else if err != nil {
477 ximportcheckf(err, "creating mailbox %s (aborting)", p)
478 }
479 if prevMailbox != "" && mb.Name != prevMailbox {
480 sendEvent("count", importCount{prevMailbox, messages[prevMailbox]})
481 }
482 mailboxes[mb.Name] = mb
483 sendEvent("count", importCount{mb.Name, 0})
484 prevMailbox = mb.Name
485 }
486 return mb
487 }
488
489 xdeliver := func(mb store.Mailbox, m *store.Message, f *os.File, pos string) {
490 defer func() {
491 name := f.Name()
492 err = f.Close()
493 log.Check(err, "closing temporary message file for delivery")
494 err := os.Remove(name)
495 log.Check(err, "removing temporary message file for delivery")
496 }()
497 m.MailboxID = mb.ID
498 m.MailboxOrigID = mb.ID
499
500 addSize += m.Size
501 if maxSize > 0 && du.MessageSize+addSize > maxSize {
502 ximportcheckf(fmt.Errorf("account over maximum total size %d", maxSize), "checking quota")
503 }
504
505 if modseq == 0 {
506 var err error
507 modseq, err = acc.NextModSeq(tx)
508 ximportcheckf(err, "assigning next modseq")
509 }
510 m.CreateSeq = modseq
511 m.ModSeq = modseq
512
513 mc := destMailboxCounts[mb.ID]
514 mc.Add(m.MailboxCounts())
515 destMailboxCounts[mb.ID] = mc
516
517 if len(m.Keywords) > 0 {
518 if destMailboxKeywords[mb.ID] == nil {
519 destMailboxKeywords[mb.ID] = map[string]bool{}
520 }
521 for _, k := range m.Keywords {
522 destMailboxKeywords[mb.ID][k] = true
523 }
524 }
525
526 // Parse message and store parsed information for later fast retrieval.
527 p, err := message.EnsurePart(log.Logger, false, f, m.Size)
528 if err != nil {
529 problemf("parsing message %s: %s (continuing)", pos, err)
530 }
531 m.ParsedBuf, err = json.Marshal(p)
532 ximportcheckf(err, "marshal parsed message structure")
533
534 // Set fields needed for future threading. By doing it now, DeliverMessage won't
535 // have to parse the Part again.
536 p.SetReaderAt(store.FileMsgReader(m.MsgPrefix, f))
537 m.PrepareThreading(log, &p)
538
539 if m.Received.IsZero() {
540 if p.Envelope != nil && !p.Envelope.Date.IsZero() {
541 m.Received = p.Envelope.Date
542 } else {
543 m.Received = time.Now()
544 }
545 }
546
547 // We set the flags that Deliver would set now and train ourselves. This prevents
548 // Deliver from training, which would open the junk filter, change it, and write it
549 // back to disk, for each message (slow).
550 m.JunkFlagsForMailbox(mb, conf)
551 if jf != nil && m.NeedsTraining() {
552 trainMessage(m, p, pos)
553 }
554
555 const sync = false
556 const notrain = true
557 const nothreads = true
558 const updateDiskUsage = false
559 if err := acc.DeliverMessage(log, tx, m, f, sync, notrain, nothreads, updateDiskUsage); err != nil {
560 problemf("delivering message %s: %s (continuing)", pos, err)
561 return
562 }
563 deliveredIDs = append(deliveredIDs, m.ID)
564 changes = append(changes, m.ChangeAddUID())
565 messages[mb.Name]++
566 if messages[mb.Name]%100 == 0 || prevMailbox != mb.Name {
567 prevMailbox = mb.Name
568 sendEvent("count", importCount{mb.Name, messages[mb.Name]})
569 }
570 }
571
572 ximportMbox := func(mailbox, filename string, r io.Reader) {
573 if mailbox == "" {
574 problemf("empty mailbox name for mbox file %s (skipping)", filename)
575 return
576 }
577 mb := xensureMailbox(mailbox)
578
579 mr := store.NewMboxReader(log, store.CreateMessageTemp, filename, r)
580 for {
581 m, mf, pos, err := mr.Next()
582 if err == io.EOF {
583 break
584 } else if err != nil {
585 ximportcheckf(err, "next message in mbox file")
586 }
587
588 xdeliver(mb, m, mf, pos)
589 }
590 }
591
592 ximportMaildir := func(mailbox, filename string, r io.Reader) {
593 if mailbox == "" {
594 problemf("empty mailbox name for maildir file %s (skipping)", filename)
595 return
596 }
597 mb := xensureMailbox(mailbox)
598
599 f, err := store.CreateMessageTemp(log, "import")
600 ximportcheckf(err, "creating temp message")
601 defer func() {
602 if f != nil {
603 store.CloseRemoveTempFile(log, f, "message to import")
604 }
605 }()
606
607 // Copy data, changing bare \n into \r\n.
608 br := bufio.NewReader(r)
609 w := bufio.NewWriter(f)
610 var size int64
611 for {
612 line, err := br.ReadBytes('\n')
613 if err != nil && err != io.EOF {
614 ximportcheckf(err, "reading message")
615 }
616 if len(line) > 0 {
617 if !bytes.HasSuffix(line, []byte("\r\n")) {
618 line = append(line[:len(line)-1], "\r\n"...)
619 }
620
621 n, err := w.Write(line)
622 ximportcheckf(err, "writing message")
623 size += int64(n)
624 }
625 if err == io.EOF {
626 break
627 }
628 }
629 err = w.Flush()
630 ximportcheckf(err, "writing message")
631
632 var received time.Time
633 t := strings.SplitN(path.Base(filename), ".", 2)
634 if v, err := strconv.ParseInt(t[0], 10, 64); err == nil {
635 received = time.Unix(v, 0)
636 }
637
638 // Parse flags. See https://cr.yp.to/proto/maildir.html.
639 var keepFlags string
640 var flags store.Flags
641 keywords := map[string]bool{}
642 t = strings.SplitN(path.Base(filename), ":2,", 2)
643 if len(t) == 2 {
644 for _, c := range t[1] {
645 switch c {
646 case 'P':
647 // Passed, doesn't map to a common IMAP flag.
648 case 'R':
649 flags.Answered = true
650 case 'S':
651 flags.Seen = true
652 case 'T':
653 flags.Deleted = true
654 case 'D':
655 flags.Draft = true
656 case 'F':
657 flags.Flagged = true
658 default:
659 if c >= 'a' && c <= 'z' {
660 dovecotKeywords, ok := mailboxKeywords[mailbox]
661 if !ok {
662 // No keywords file seen yet, we'll try later if it comes in.
663 keepFlags += string(c)
664 } else if kw, ok := dovecotKeywords[c]; ok {
665 flagSet(&flags, keywords, kw)
666 }
667 }
668 }
669 }
670 }
671
672 m := store.Message{
673 Received: received,
674 Flags: flags,
675 Keywords: maps.Keys(keywords),
676 Size: size,
677 }
678 xdeliver(mb, &m, f, filename)
679 f = nil
680 if keepFlags != "" {
681 if _, ok := mailboxMissingKeywordMessages[mailbox]; !ok {
682 mailboxMissingKeywordMessages[mailbox] = map[int64]string{}
683 }
684 mailboxMissingKeywordMessages[mailbox][m.ID] = keepFlags
685 }
686 }
687
688 importFile := func(name string, r io.Reader) {
689 origName := name
690
691 if strings.HasPrefix(name, skipMailboxPrefix) {
692 name = strings.TrimPrefix(name[len(skipMailboxPrefix):], "/")
693 }
694
695 if strings.HasSuffix(name, "/") {
696 name = strings.TrimSuffix(name, "/")
697 dir := path.Dir(name)
698 switch path.Base(dir) {
699 case "new", "cur", "tmp":
700 // Maildir, ensure it exists.
701 mailbox := path.Dir(dir)
702 xensureMailbox(mailbox)
703 }
704 // Otherwise, this is just a directory that probably holds mbox files and maildirs.
705 return
706 }
707
708 if strings.HasSuffix(path.Base(name), ".mbox") {
709 mailbox := name[:len(name)-len(".mbox")]
710 ximportMbox(mailbox, origName, r)
711 return
712 }
713 dir := path.Dir(name)
714 dirbase := path.Base(dir)
715 switch dirbase {
716 case "new", "cur", "tmp":
717 mailbox := path.Dir(dir)
718 ximportMaildir(mailbox, origName, r)
719 default:
720 if path.Base(name) == "dovecot-keywords" {
721 mailbox := path.Dir(name)
722 dovecotKeywords := map[rune]string{}
723 words, err := store.ParseDovecotKeywordsFlags(r, log)
724 log.Check(err, "parsing dovecot keywords for mailbox", slog.String("mailbox", mailbox))
725 for i, kw := range words {
726 dovecotKeywords['a'+rune(i)] = kw
727 }
728 mailboxKeywords[mailbox] = dovecotKeywords
729
730 for id, chars := range mailboxMissingKeywordMessages[mailbox] {
731 var flags, zeroflags store.Flags
732 keywords := map[string]bool{}
733 for _, c := range chars {
734 kw, ok := dovecotKeywords[c]
735 if !ok {
736 problemf("unspecified dovecot message flag %c for message id %d (continuing)", c, id)
737 continue
738 }
739 flagSet(&flags, keywords, kw)
740 }
741 if flags == zeroflags && len(keywords) == 0 {
742 continue
743 }
744
745 m := store.Message{ID: id}
746 err := tx.Get(&m)
747 ximportcheckf(err, "get imported message for flag update")
748
749 mc := destMailboxCounts[m.MailboxID]
750 mc.Sub(m.MailboxCounts())
751
752 oflags := m.Flags
753 m.Flags = m.Flags.Set(flags, flags)
754 m.Keywords = maps.Keys(keywords)
755 sort.Strings(m.Keywords)
756
757 mc.Add(m.MailboxCounts())
758 destMailboxCounts[m.MailboxID] = mc
759
760 if len(m.Keywords) > 0 {
761 if destMailboxKeywords[m.MailboxID] == nil {
762 destMailboxKeywords[m.MailboxID] = map[string]bool{}
763 }
764 for _, k := range m.Keywords {
765 destMailboxKeywords[m.MailboxID][k] = true
766 }
767 }
768
769 // We train before updating, training may set m.TrainedJunk.
770 if jf != nil && m.NeedsTraining() {
771 openTrainMessage(&m)
772 }
773 err = tx.Update(&m)
774 ximportcheckf(err, "updating message after flag update")
775 changes = append(changes, m.ChangeFlags(oflags))
776 }
777 delete(mailboxMissingKeywordMessages, mailbox)
778 } else {
779 problemf("unrecognized file %s (skipping)", origName)
780 }
781 }
782 }
783
784 if zr != nil {
785 for _, f := range zr.File {
786 if canceled() {
787 return
788 }
789 zf, err := f.Open()
790 if err != nil {
791 problemf("opening file %s in zip: %v", f.Name, err)
792 continue
793 }
794 importFile(f.Name, zf)
795 err = zf.Close()
796 log.Check(err, "closing file from zip")
797 }
798 } else {
799 for {
800 if canceled() {
801 return
802 }
803 h, err := tr.Next()
804 if err == io.EOF {
805 break
806 } else if err != nil {
807 problemf("reading next tar header: %v (aborting)", err)
808 return
809 }
810 importFile(h.Name, tr)
811 }
812 }
813
814 total := 0
815 for _, count := range messages {
816 total += count
817 }
818 log.Debug("messages imported", slog.Int("total", total))
819
820 // Send final update for count of last-imported mailbox.
821 if prevMailbox != "" {
822 sendEvent("count", importCount{prevMailbox, messages[prevMailbox]})
823 }
824
825 // Match threads.
826 if len(deliveredIDs) > 0 {
827 sendEvent("step", importStep{"matching messages with threads"})
828 err = acc.AssignThreads(ctx, log, tx, deliveredIDs[0], 0, io.Discard)
829 ximportcheckf(err, "assigning messages to threads")
830 }
831
832 // Update mailboxes with counts and keywords.
833 for mbID, mc := range destMailboxCounts {
834 mb := store.Mailbox{ID: mbID}
835 err := tx.Get(&mb)
836 ximportcheckf(err, "loading mailbox for counts and keywords")
837
838 if mb.MailboxCounts != mc {
839 mb.MailboxCounts = mc
840 changes = append(changes, mb.ChangeCounts())
841 }
842
843 keywords := destMailboxKeywords[mb.ID]
844 var mbKwChanged bool
845 mb.Keywords, mbKwChanged = store.MergeKeywords(mb.Keywords, maps.Keys(keywords))
846
847 err = tx.Update(&mb)
848 ximportcheckf(err, "updating mailbox count and keywords")
849 if mbKwChanged {
850 changes = append(changes, mb.ChangeKeywords())
851 }
852 }
853
854 err = acc.AddMessageSize(log, tx, addSize)
855 ximportcheckf(err, "updating disk usage after import")
856
857 err = tx.Commit()
858 tx = nil
859 ximportcheckf(err, "commit")
860 deliveredIDs = nil
861
862 if jf != nil {
863 if err := jf.Close(); err != nil {
864 problemf("saving changes of training junk filter: %v (continuing)", err)
865 log.Errorx("saving changes of training junk filter", err)
866 }
867 jf = nil
868 }
869
870 store.BroadcastChanges(acc, changes)
871 acc.Unlock()
872 err = acc.Close()
873 log.Check(err, "closing account after import")
874 acc = nil
875
876 sendEvent("done", importDone{})
877}
878
879func flagSet(flags *store.Flags, keywords map[string]bool, word string) {
880 switch word {
881 case "forwarded", "$forwarded":
882 flags.Forwarded = true
883 case "junk", "$junk":
884 flags.Junk = true
885 case "notjunk", "$notjunk", "nonjunk", "$nonjunk":
886 flags.Notjunk = true
887 case "phishing", "$phishing":
888 flags.Phishing = true
889 case "mdnsent", "$mdnsent":
890 flags.MDNSent = true
891 default:
892 if err := store.CheckKeyword(word); err == nil {
893 keywords[word] = true
894 }
895 }
896}
897