1package webaccount
2
3import (
4 "archive/tar"
5 "archive/zip"
6 "bufio"
7 "bytes"
8 "compress/gzip"
9 "context"
10 cryptrand "crypto/rand"
11 "encoding/json"
12 "errors"
13 "fmt"
14 "io"
15 "log/slog"
16 "maps"
17 "os"
18 "path"
19 "runtime/debug"
20 "slices"
21 "strconv"
22 "strings"
23 "time"
24
25 "golang.org/x/text/unicode/norm"
26
27 "github.com/mjl-/bstore"
28
29 "github.com/mjl-/mox/message"
30 "github.com/mjl-/mox/metrics"
31 "github.com/mjl-/mox/mlog"
32 "github.com/mjl-/mox/mox-"
33 "github.com/mjl-/mox/store"
34)
35
36type importListener struct {
37 Token string
38 Events chan importEvent
39 Register chan bool // Whether register is successful.
40}
41
42type importEvent struct {
43 Token string
44 SSEMsg []byte // Full SSE message, including event: ... and data: ... \n\n
45 Event any // nil, importCount, importProblem, importDone, importAborted
46 Cancel func() // For cancelling the context causing abort of the import. Set in first, import-registering, event.
47}
48
49type importAbortRequest struct {
50 Token string
51 Response chan error
52}
53
54var importers = struct {
55 Register chan *importListener
56 Unregister chan *importListener
57 Events chan importEvent
58 Abort chan importAbortRequest
59 Stop chan struct{}
60}{
61 make(chan *importListener, 1),
62 make(chan *importListener, 1),
63 make(chan importEvent),
64 make(chan importAbortRequest),
65 make(chan struct{}),
66}
67
68// ImportManage should be run as a goroutine, it manages imports of mboxes/maildirs, propagating progress over SSE connections.
69func ImportManage() {
70 log := mlog.New("httpimport", nil)
71 defer func() {
72 if x := recover(); x != nil {
73 log.Error("import manage panic", slog.Any("err", x))
74 debug.PrintStack()
75 metrics.PanicInc(metrics.Importmanage)
76 }
77 }()
78
79 type state struct {
80 MailboxCounts map[string]int
81 Problems []string
82 Done *time.Time
83 Aborted *time.Time
84 Listeners map[*importListener]struct{}
85 Cancel func()
86 }
87
88 imports := map[string]state{} // Token to state.
89 for {
90 select {
91 case l := <-importers.Register:
92 // If we have state, send it so the client is up to date.
93 s, ok := imports[l.Token]
94 l.Register <- ok
95 if !ok {
96 break
97 }
98 s.Listeners[l] = struct{}{}
99
100 sendEvent := func(kind string, v any) {
101 buf, err := json.Marshal(v)
102 if err != nil {
103 log.Errorx("marshal event", err, slog.String("kind", kind), slog.Any("event", v))
104 return
105 }
106 ssemsg := fmt.Sprintf("event: %s\ndata: %s\n\n", kind, buf)
107
108 select {
109 case l.Events <- importEvent{kind, []byte(ssemsg), nil, nil}:
110 default:
111 log.Debug("dropped initial import event to slow consumer")
112 }
113 }
114
115 for m, c := range s.MailboxCounts {
116 sendEvent("count", importCount{m, c})
117 }
118 for _, p := range s.Problems {
119 sendEvent("problem", importProblem{p})
120 }
121 if s.Done != nil {
122 sendEvent("done", importDone{})
123 } else if s.Aborted != nil {
124 sendEvent("aborted", importAborted{})
125 }
126
127 case l := <-importers.Unregister:
128 delete(imports[l.Token].Listeners, l)
129
130 case e := <-importers.Events:
131 s, ok := imports[e.Token]
132 if !ok {
133 s = state{
134 MailboxCounts: map[string]int{},
135 Listeners: map[*importListener]struct{}{},
136 Cancel: e.Cancel,
137 }
138 imports[e.Token] = s
139 }
140 for l := range s.Listeners {
141 select {
142 case l.Events <- e:
143 default:
144 log.Debug("dropped import event to slow consumer")
145 }
146 }
147 if e.Event != nil {
148 s := imports[e.Token]
149 switch x := e.Event.(type) {
150 case importCount:
151 s.MailboxCounts[x.Mailbox] = x.Count
152 case importProblem:
153 s.Problems = append(s.Problems, x.Message)
154 case importDone:
155 now := time.Now()
156 s.Done = &now
157 case importAborted:
158 now := time.Now()
159 s.Aborted = &now
160 }
161 imports[e.Token] = s
162 }
163
164 case a := <-importers.Abort:
165 s, ok := imports[a.Token]
166 if !ok {
167 a.Response <- errors.New("import not found")
168 return
169 }
170 if s.Done != nil {
171 a.Response <- errors.New("import already finished")
172 return
173 }
174 s.Cancel()
175 a.Response <- nil
176
177 case <-importers.Stop:
178 return
179 }
180
181 // Cleanup old state.
182 for t, s := range imports {
183 if len(s.Listeners) > 0 {
184 continue
185 }
186 if s.Done != nil && time.Since(*s.Done) > time.Minute || s.Aborted != nil && time.Since(*s.Aborted) > time.Minute {
187 delete(imports, t)
188 }
189 }
190 }
191}
192
193type importCount struct {
194 Mailbox string
195 Count int
196}
197type importProblem struct {
198 Message string
199}
200type importDone struct{}
201type importAborted struct{}
202type importStep struct {
203 Title string
204}
205
206// importStart prepare the import and launches the goroutine to actually import.
207// importStart is responsible for closing f and removing f.
208func importStart(log mlog.Log, accName string, f *os.File, skipMailboxPrefix string) (string, bool, error) {
209 defer func() {
210 if f != nil {
211 store.CloseRemoveTempFile(log, f, "upload for import")
212 }
213 }()
214
215 buf := make([]byte, 16)
216 if _, err := cryptrand.Read(buf); err != nil {
217 return "", false, err
218 }
219 token := fmt.Sprintf("%x", buf)
220
221 if _, err := f.Seek(0, 0); err != nil {
222 return "", false, fmt.Errorf("seek to start of file: %v", err)
223 }
224
225 // Recognize file format.
226 var iszip bool
227 magicZip := []byte{0x50, 0x4b, 0x03, 0x04}
228 magicGzip := []byte{0x1f, 0x8b}
229 magic := make([]byte, 4)
230 if _, err := f.ReadAt(magic, 0); err != nil {
231 return "", true, fmt.Errorf("detecting file format: %v", err)
232 }
233 if bytes.Equal(magic, magicZip) {
234 iszip = true
235 } else if !bytes.Equal(magic[:2], magicGzip) {
236 return "", true, fmt.Errorf("file is not a zip or gzip file")
237 }
238
239 var zr *zip.Reader
240 var tr *tar.Reader
241 if iszip {
242 fi, err := f.Stat()
243 if err != nil {
244 return "", false, fmt.Errorf("stat temporary import zip file: %v", err)
245 }
246 zr, err = zip.NewReader(f, fi.Size())
247 if err != nil {
248 return "", true, fmt.Errorf("opening zip file: %v", err)
249 }
250 } else {
251 gzr, err := gzip.NewReader(f)
252 if err != nil {
253 return "", true, fmt.Errorf("gunzip: %v", err)
254 }
255 tr = tar.NewReader(gzr)
256 }
257
258 acc, err := store.OpenAccount(log, accName, false)
259 if err != nil {
260 return "", false, fmt.Errorf("open acount: %v", err)
261 }
262 acc.Lock() // Not using WithWLock because importMessage is responsible for unlocking.
263
264 tx, err := acc.DB.Begin(context.Background(), true)
265 if err != nil {
266 acc.Unlock()
267 xerr := acc.Close()
268 log.Check(xerr, "closing account")
269 return "", false, fmt.Errorf("start transaction: %v", err)
270 }
271
272 // Ensure token is registered before returning, with context that can be canceled.
273 ctx, cancel := context.WithCancel(mox.Shutdown)
274 importers.Events <- importEvent{token, []byte(": keepalive\n\n"), nil, cancel}
275
276 log.Info("starting import")
277 go importMessages(ctx, log.WithCid(mox.Cid()), token, acc, tx, zr, tr, f, skipMailboxPrefix)
278 f = nil // importMessages is now responsible for closing and removing.
279
280 return token, false, nil
281}
282
283// importMessages imports the messages from zip/tgz file f.
284// importMessages is responsible for unlocking and closing acc, and closing tx and f.
285func importMessages(ctx context.Context, log mlog.Log, token string, acc *store.Account, tx *bstore.Tx, zr *zip.Reader, tr *tar.Reader, f *os.File, skipMailboxPrefix string) {
286 // If a fatal processing error occurs, we panic with this type.
287 type importError struct{ Err error }
288
289 // During import we collect all changes and broadcast them at the end, when successful.
290 var changes []store.Change
291
292 // ID's of delivered messages. If we have to rollback, we have to remove this files.
293 var newIDs []int64
294
295 sendEvent := func(kind string, v any) {
296 buf, err := json.Marshal(v)
297 if err != nil {
298 log.Errorx("marshal event", err, slog.String("kind", kind), slog.Any("event", v))
299 return
300 }
301 ssemsg := fmt.Sprintf("event: %s\ndata: %s\n\n", kind, buf)
302 importers.Events <- importEvent{token, []byte(ssemsg), v, nil}
303 }
304
305 canceled := func() bool {
306 select {
307 case <-ctx.Done():
308 sendEvent("aborted", importAborted{})
309 return true
310 default:
311 return false
312 }
313 }
314
315 problemf := func(format string, args ...any) {
316 msg := fmt.Sprintf(format, args...)
317 sendEvent("problem", importProblem{Message: msg})
318 }
319
320 defer func() {
321 store.CloseRemoveTempFile(log, f, "uploaded messages")
322
323 for _, id := range newIDs {
324 p := acc.MessagePath(id)
325 err := os.Remove(p)
326 log.Check(err, "closing message file after import error", slog.String("path", p))
327 }
328 if tx != nil {
329 err := tx.Rollback()
330 log.Check(err, "rolling back transaction")
331 }
332 if acc != nil {
333 acc.Unlock()
334 err := acc.Close()
335 log.Check(err, "closing account")
336 }
337
338 x := recover()
339 if x == nil {
340 return
341 }
342 if err, ok := x.(importError); ok {
343 log.Errorx("import error", err.Err)
344 problemf("%s (aborting)", err.Err)
345 sendEvent("aborted", importAborted{})
346 } else {
347 log.Error("import panic", slog.Any("err", x))
348 debug.PrintStack()
349 metrics.PanicInc(metrics.Importmessages)
350 }
351 }()
352
353 ximportcheckf := func(err error, format string, args ...any) {
354 if err != nil {
355 panic(importError{fmt.Errorf("%s: %s", fmt.Sprintf(format, args...), err)})
356 }
357 }
358
359 err := acc.ThreadingWait(log)
360 ximportcheckf(err, "waiting for account thread upgrade")
361
362 conf, _ := acc.Conf()
363
364 jf, _, err := acc.OpenJunkFilter(ctx, log)
365 if err != nil && !errors.Is(err, store.ErrNoJunkFilter) {
366 ximportcheckf(err, "open junk filter")
367 }
368 defer func() {
369 if jf != nil {
370 err := jf.CloseDiscard()
371 log.Check(err, "closing junk filter")
372 }
373 }()
374
375 // Mailboxes we imported, and message counts.
376 mailboxNames := map[string]*store.Mailbox{}
377 mailboxIDs := map[int64]*store.Mailbox{}
378 mailboxKeywordCounts := map[int64]int{}
379 messages := map[string]int{}
380
381 maxSize := acc.QuotaMessageSize()
382 du := store.DiskUsage{ID: 1}
383 err = tx.Get(&du)
384 ximportcheckf(err, "get disk usage")
385 var addSize int64
386
387 // For maildirs, we are likely to get a possible dovecot-keywords file after having
388 // imported the messages. Once we see the keywords, we use them. But before that
389 // time we remember which messages miss keywords. Once the keywords become
390 // available, we'll fix up the flags for the unknown messages
391 mailboxKeywords := map[string]map[rune]string{} // Mailbox to 'a'-'z' to flag name.
392 mailboxMissingKeywordMessages := map[string]map[int64]string{} // Mailbox to message id to string consisting of the unrecognized flags.
393
394 // Previous mailbox an event was sent for. We send an event for new mailboxes, when
395 // another 100 messages were added, when adding a message to another mailbox, and
396 // finally at the end as a closing statement.
397 var prevMailbox string
398
399 var modseq store.ModSeq // Assigned on first message, used for all messages.
400
401 trainMessage := func(m *store.Message, p message.Part, pos string) {
402 words, err := jf.ParseMessage(p)
403 if err != nil {
404 problemf("parsing message %s for updating junk filter: %v (continuing)", pos, err)
405 return
406 }
407 err = jf.Train(ctx, !m.Junk, words)
408 if err != nil {
409 problemf("training junk filter for message %s: %v (continuing)", pos, err)
410 return
411 }
412 m.TrainedJunk = &m.Junk
413 }
414
415 openTrainMessage := func(m *store.Message) {
416 path := acc.MessagePath(m.ID)
417 f, err := os.Open(path)
418 if err != nil {
419 problemf("opening message again for training junk filter: %v (continuing)", err)
420 return
421 }
422 defer func() {
423 err := f.Close()
424 log.Check(err, "closing file after training junkfilter")
425 }()
426 p, err := m.LoadPart(f)
427 if err != nil {
428 problemf("loading parsed message again for training junk filter: %v (continuing)", err)
429 return
430 }
431 trainMessage(m, p, fmt.Sprintf("message id %d", m.ID))
432 }
433
434 xensureMailbox := func(name string) *store.Mailbox {
435 // Ensure name is normalized.
436 name = norm.NFC.String(name)
437 name, _, err := store.CheckMailboxName(name, true)
438 ximportcheckf(err, "checking mailbox name")
439
440 if mb, ok := mailboxNames[name]; ok {
441 return mb
442 }
443
444 var p string
445 var mb *store.Mailbox
446 var parent store.Mailbox
447 for i, e := range strings.Split(name, "/") {
448 if i == 0 {
449 p = e
450 } else {
451 p = path.Join(p, e)
452 }
453 if _, ok := mailboxNames[p]; ok {
454 continue
455 }
456
457 mb, err = acc.MailboxFind(tx, p)
458 ximportcheckf(err, "looking up mailbox %s to import to (aborting)", p)
459 if mb == nil {
460 uidvalidity, err := acc.NextUIDValidity(tx)
461 ximportcheckf(err, "finding next uid validity")
462
463 if modseq == 0 {
464 var err error
465 modseq, err = acc.NextModSeq(tx)
466 ximportcheckf(err, "assigning next modseq")
467 }
468
469 mb = &store.Mailbox{
470 CreateSeq: modseq,
471 ModSeq: modseq,
472 ParentID: parent.ID,
473 Name: p,
474 UIDValidity: uidvalidity,
475 UIDNext: 1,
476 HaveCounts: true,
477 // Do not assign special-use flags. This existing account probably already has such mailboxes.
478 }
479 err = tx.Insert(mb)
480 ximportcheckf(err, "inserting mailbox in database")
481 parent = *mb
482
483 if tx.Get(&store.Subscription{Name: p}) != nil {
484 err := tx.Insert(&store.Subscription{Name: p})
485 ximportcheckf(err, "subscribing to imported mailbox")
486 }
487 changes = append(changes, store.ChangeAddMailbox{Mailbox: *mb, Flags: []string{`\Subscribed`}})
488 }
489 if prevMailbox != "" && mb.Name != prevMailbox {
490 sendEvent("count", importCount{prevMailbox, messages[prevMailbox]})
491 }
492 mailboxKeywordCounts[mb.ID] = len(mb.Keywords)
493 mailboxNames[mb.Name] = mb
494 mailboxIDs[mb.ID] = mb
495 sendEvent("count", importCount{mb.Name, 0})
496 prevMailbox = mb.Name
497 }
498 return mb
499 }
500
501 xdeliver := func(mb *store.Mailbox, m *store.Message, f *os.File, pos string) {
502 defer store.CloseRemoveTempFile(log, f, "message file for import")
503 m.MailboxID = mb.ID
504 m.MailboxOrigID = mb.ID
505
506 addSize += m.Size
507 if maxSize > 0 && du.MessageSize+addSize > maxSize {
508 ximportcheckf(fmt.Errorf("account over maximum total size %d", maxSize), "checking quota")
509 }
510
511 if modseq == 0 {
512 var err error
513 modseq, err = acc.NextModSeq(tx)
514 ximportcheckf(err, "assigning next modseq")
515 }
516 m.CreateSeq = modseq
517 m.ModSeq = modseq
518
519 // Parse message and store parsed information for later fast retrieval.
520 p, err := message.EnsurePart(log.Logger, false, f, m.Size)
521 if err != nil {
522 problemf("parsing message %s: %s (continuing)", pos, err)
523 }
524 m.ParsedBuf, err = json.Marshal(p)
525 ximportcheckf(err, "marshal parsed message structure")
526
527 // Set fields needed for future threading. By doing it now, MessageAdd won't
528 // have to parse the Part again.
529 p.SetReaderAt(store.FileMsgReader(m.MsgPrefix, f))
530 m.PrepareThreading(log, &p)
531
532 if m.Received.IsZero() {
533 if p.Envelope != nil && !p.Envelope.Date.IsZero() {
534 m.Received = p.Envelope.Date
535 } else {
536 m.Received = time.Now()
537 }
538 }
539
540 // We set the flags that Deliver would set now and train ourselves. This prevents
541 // Deliver from training, which would open the junk filter, change it, and write it
542 // back to disk, for each message (slow).
543 m.JunkFlagsForMailbox(*mb, conf)
544 if jf != nil && m.NeedsTraining() {
545 trainMessage(m, p, pos)
546 }
547
548 opts := store.AddOpts{
549 SkipDirSync: true,
550 SkipTraining: true,
551 SkipThreads: true,
552 SkipUpdateDiskUsage: true,
553 SkipCheckQuota: true,
554 SkipPreview: true,
555 }
556 if err := acc.MessageAdd(log, tx, mb, m, f, opts); err != nil {
557 problemf("delivering message %s: %s (continuing)", pos, err)
558 return
559 }
560 newIDs = append(newIDs, m.ID)
561 changes = append(changes, m.ChangeAddUID())
562 messages[mb.Name]++
563 if messages[mb.Name]%100 == 0 || prevMailbox != mb.Name {
564 prevMailbox = mb.Name
565 sendEvent("count", importCount{mb.Name, messages[mb.Name]})
566 }
567 }
568
569 ximportMbox := func(mailbox, filename string, r io.Reader) {
570 if mailbox == "" {
571 problemf("empty mailbox name for mbox file %s (skipping)", filename)
572 return
573 }
574 mb := xensureMailbox(mailbox)
575
576 mr := store.NewMboxReader(log, store.CreateMessageTemp, filename, r)
577 for {
578 m, mf, pos, err := mr.Next()
579 if err == io.EOF {
580 break
581 } else if err != nil {
582 ximportcheckf(err, "next message in mbox file")
583 }
584
585 xdeliver(mb, m, mf, pos)
586 }
587 }
588
589 ximportMaildir := func(mailbox, filename string, r io.Reader) {
590 if mailbox == "" {
591 problemf("empty mailbox name for maildir file %s (skipping)", filename)
592 return
593 }
594 mb := xensureMailbox(mailbox)
595
596 f, err := store.CreateMessageTemp(log, "import")
597 ximportcheckf(err, "creating temp message")
598 defer func() {
599 if f != nil {
600 store.CloseRemoveTempFile(log, f, "message to import")
601 }
602 }()
603
604 // Copy data, changing bare \n into \r\n.
605 br := bufio.NewReader(r)
606 w := bufio.NewWriter(f)
607 var size int64
608 for {
609 line, err := br.ReadBytes('\n')
610 if err != nil && err != io.EOF {
611 ximportcheckf(err, "reading message")
612 }
613 if len(line) > 0 {
614 if !bytes.HasSuffix(line, []byte("\r\n")) {
615 line = append(line[:len(line)-1], "\r\n"...)
616 }
617
618 n, err := w.Write(line)
619 ximportcheckf(err, "writing message")
620 size += int64(n)
621 }
622 if err == io.EOF {
623 break
624 }
625 }
626 err = w.Flush()
627 ximportcheckf(err, "writing message")
628
629 var received time.Time
630 t := strings.SplitN(path.Base(filename), ".", 2)
631 if v, err := strconv.ParseInt(t[0], 10, 64); err == nil {
632 received = time.Unix(v, 0)
633 }
634
635 // Parse flags. See https://cr.yp.to/proto/maildir.html.
636 var keepFlags string
637 var flags store.Flags
638 keywords := map[string]bool{}
639 t = strings.SplitN(path.Base(filename), ":2,", 2)
640 if len(t) == 2 {
641 for _, c := range t[1] {
642 switch c {
643 case 'P':
644 // Passed, doesn't map to a common IMAP flag.
645 case 'R':
646 flags.Answered = true
647 case 'S':
648 flags.Seen = true
649 case 'T':
650 flags.Deleted = true
651 case 'D':
652 flags.Draft = true
653 case 'F':
654 flags.Flagged = true
655 default:
656 if c >= 'a' && c <= 'z' {
657 dovecotKeywords, ok := mailboxKeywords[mailbox]
658 if !ok {
659 // No keywords file seen yet, we'll try later if it comes in.
660 keepFlags += string(c)
661 } else if kw, ok := dovecotKeywords[c]; ok {
662 flagSet(&flags, keywords, kw)
663 }
664 }
665 }
666 }
667 }
668
669 m := store.Message{
670 Received: received,
671 Flags: flags,
672 Keywords: slices.Sorted(maps.Keys(keywords)),
673 Size: size,
674 }
675 xdeliver(mb, &m, f, filename)
676 f = nil
677 if keepFlags != "" {
678 if _, ok := mailboxMissingKeywordMessages[mailbox]; !ok {
679 mailboxMissingKeywordMessages[mailbox] = map[int64]string{}
680 }
681 mailboxMissingKeywordMessages[mailbox][m.ID] = keepFlags
682 }
683 }
684
685 importFile := func(name string, r io.Reader) {
686 origName := name
687
688 if strings.HasPrefix(name, skipMailboxPrefix) {
689 name = strings.TrimPrefix(name[len(skipMailboxPrefix):], "/")
690 }
691
692 if strings.HasSuffix(name, "/") {
693 name = strings.TrimSuffix(name, "/")
694 dir := path.Dir(name)
695 switch path.Base(dir) {
696 case "new", "cur", "tmp":
697 // Maildir, ensure it exists.
698 mailbox := path.Dir(dir)
699 xensureMailbox(mailbox)
700 }
701 // Otherwise, this is just a directory that probably holds mbox files and maildirs.
702 return
703 }
704
705 if strings.HasSuffix(path.Base(name), ".mbox") {
706 mailbox := name[:len(name)-len(".mbox")]
707 ximportMbox(mailbox, origName, r)
708 return
709 }
710 dir := path.Dir(name)
711 dirbase := path.Base(dir)
712 switch dirbase {
713 case "new", "cur", "tmp":
714 mailbox := path.Dir(dir)
715 ximportMaildir(mailbox, origName, r)
716 return
717 }
718
719 if path.Base(name) != "dovecot-keywords" {
720 problemf("unrecognized file %s (skipping)", origName)
721 return
722 }
723
724 // Handle dovecot-keywords.
725 mailbox := path.Dir(name)
726 dovecotKeywords := map[rune]string{}
727 words, err := store.ParseDovecotKeywordsFlags(r, log)
728 log.Check(err, "parsing dovecot keywords for mailbox", slog.String("mailbox", mailbox))
729 for i, kw := range words {
730 dovecotKeywords['a'+rune(i)] = kw
731 }
732 mailboxKeywords[mailbox] = dovecotKeywords
733
734 for id, chars := range mailboxMissingKeywordMessages[mailbox] {
735 var flags, zeroflags store.Flags
736 keywords := map[string]bool{}
737 for _, c := range chars {
738 kw, ok := dovecotKeywords[c]
739 if !ok {
740 problemf("unspecified dovecot message flag %c for message id %d (continuing)", c, id)
741 continue
742 }
743 flagSet(&flags, keywords, kw)
744 }
745 if flags == zeroflags && len(keywords) == 0 {
746 continue
747 }
748
749 m := store.Message{ID: id}
750 err := tx.Get(&m)
751 ximportcheckf(err, "get imported message for flag update")
752
753 mb := mailboxIDs[m.MailboxID]
754 mb.Sub(m.MailboxCounts())
755
756 oflags := m.Flags
757 m.Flags = m.Flags.Set(flags, flags)
758 m.Keywords = slices.Sorted(maps.Keys(keywords))
759
760 mb.Add(m.MailboxCounts())
761
762 mb.Keywords, _ = store.MergeKeywords(mb.Keywords, m.Keywords)
763
764 // We train before updating, training may set m.TrainedJunk.
765 if jf != nil && m.NeedsTraining() {
766 openTrainMessage(&m)
767 }
768 err = tx.Update(&m)
769 ximportcheckf(err, "updating message after flag update")
770 changes = append(changes, m.ChangeFlags(oflags))
771 }
772 delete(mailboxMissingKeywordMessages, mailbox)
773 }
774
775 if zr != nil {
776 for _, f := range zr.File {
777 if canceled() {
778 return
779 }
780 zf, err := f.Open()
781 if err != nil {
782 problemf("opening file %s in zip: %v", f.Name, err)
783 continue
784 }
785 importFile(f.Name, zf)
786 err = zf.Close()
787 log.Check(err, "closing file from zip")
788 }
789 } else {
790 for {
791 if canceled() {
792 return
793 }
794 h, err := tr.Next()
795 if err == io.EOF {
796 break
797 } else if err != nil {
798 problemf("reading next tar header: %v (aborting)", err)
799 return
800 }
801 importFile(h.Name, tr)
802 }
803 }
804
805 total := 0
806 for _, count := range messages {
807 total += count
808 }
809 log.Debug("messages imported", slog.Int("total", total))
810
811 // Send final update for count of last-imported mailbox.
812 if prevMailbox != "" {
813 sendEvent("count", importCount{prevMailbox, messages[prevMailbox]})
814 }
815
816 // Match threads.
817 if len(newIDs) > 0 {
818 sendEvent("step", importStep{"matching messages with threads"})
819 err = acc.AssignThreads(ctx, log, tx, newIDs[0], 0, io.Discard)
820 ximportcheckf(err, "assigning messages to threads")
821 }
822
823 // Update mailboxes with counts and keywords.
824 for _, mb := range mailboxIDs {
825 err = tx.Update(mb)
826 ximportcheckf(err, "updating mailbox count and keywords")
827
828 changes = append(changes, mb.ChangeCounts())
829 if len(mb.Keywords) != mailboxKeywordCounts[mb.ID] {
830 changes = append(changes, mb.ChangeKeywords())
831 }
832 }
833
834 err = acc.AddMessageSize(log, tx, addSize)
835 ximportcheckf(err, "updating disk usage after import")
836
837 err = tx.Commit()
838 tx = nil
839 ximportcheckf(err, "commit")
840 newIDs = nil
841
842 if jf != nil {
843 if err := jf.Close(); err != nil {
844 problemf("saving changes of training junk filter: %v (continuing)", err)
845 log.Errorx("saving changes of training junk filter", err)
846 }
847 jf = nil
848 }
849
850 store.BroadcastChanges(acc, changes)
851 acc.Unlock()
852 err = acc.Close()
853 log.Check(err, "closing account after import")
854 acc = nil
855
856 sendEvent("done", importDone{})
857}
858
859func flagSet(flags *store.Flags, keywords map[string]bool, word string) {
860 switch word {
861 case "forwarded", "$forwarded":
862 flags.Forwarded = true
863 case "junk", "$junk":
864 flags.Junk = true
865 case "notjunk", "$notjunk", "nonjunk", "$nonjunk":
866 flags.Notjunk = true
867 case "phishing", "$phishing":
868 flags.Phishing = true
869 case "mdnsent", "$mdnsent":
870 flags.MDNSent = true
871 default:
872 if err := store.CheckKeyword(word); err == nil {
873 keywords[word] = true
874 }
875 }
876}
877