1package webaccount
2
3import (
4 "archive/tar"
5 "archive/zip"
6 "bufio"
7 "bytes"
8 "compress/gzip"
9 "context"
10 cryptrand "crypto/rand"
11 "encoding/json"
12 "errors"
13 "fmt"
14 "io"
15 "log/slog"
16 "os"
17 "path"
18 "runtime/debug"
19 "sort"
20 "strconv"
21 "strings"
22 "time"
23
24 "golang.org/x/exp/maps"
25
26 "golang.org/x/text/unicode/norm"
27
28 "github.com/mjl-/bstore"
29
30 "github.com/mjl-/mox/message"
31 "github.com/mjl-/mox/metrics"
32 "github.com/mjl-/mox/mlog"
33 "github.com/mjl-/mox/mox-"
34 "github.com/mjl-/mox/store"
35)
36
37type importListener struct {
38 Token string
39 Events chan importEvent
40 Register chan bool // Whether register is successful.
41}
42
43type importEvent struct {
44 Token string
45 SSEMsg []byte // Full SSE message, including event: ... and data: ... \n\n
46 Event any // nil, importCount, importProblem, importDone, importAborted
47 Cancel func() // For cancelling the context causing abort of the import. Set in first, import-registering, event.
48}
49
50type importAbortRequest struct {
51 Token string
52 Response chan error
53}
54
55var importers = struct {
56 Register chan *importListener
57 Unregister chan *importListener
58 Events chan importEvent
59 Abort chan importAbortRequest
60 Stop chan struct{}
61}{
62 make(chan *importListener, 1),
63 make(chan *importListener, 1),
64 make(chan importEvent),
65 make(chan importAbortRequest),
66 make(chan struct{}),
67}
68
69// ImportManage should be run as a goroutine, it manages imports of mboxes/maildirs, propagating progress over SSE connections.
70func ImportManage() {
71 log := mlog.New("httpimport", nil)
72 defer func() {
73 if x := recover(); x != nil {
74 log.Error("import manage panic", slog.Any("err", x))
75 debug.PrintStack()
76 metrics.PanicInc(metrics.Importmanage)
77 }
78 }()
79
80 type state struct {
81 MailboxCounts map[string]int
82 Problems []string
83 Done *time.Time
84 Aborted *time.Time
85 Listeners map[*importListener]struct{}
86 Cancel func()
87 }
88
89 imports := map[string]state{} // Token to state.
90 for {
91 select {
92 case l := <-importers.Register:
93 // If we have state, send it so the client is up to date.
94 s, ok := imports[l.Token]
95 l.Register <- ok
96 if !ok {
97 break
98 }
99 s.Listeners[l] = struct{}{}
100
101 sendEvent := func(kind string, v any) {
102 buf, err := json.Marshal(v)
103 if err != nil {
104 log.Errorx("marshal event", err, slog.String("kind", kind), slog.Any("event", v))
105 return
106 }
107 ssemsg := fmt.Sprintf("event: %s\ndata: %s\n\n", kind, buf)
108
109 select {
110 case l.Events <- importEvent{kind, []byte(ssemsg), nil, nil}:
111 default:
112 log.Debug("dropped initial import event to slow consumer")
113 }
114 }
115
116 for m, c := range s.MailboxCounts {
117 sendEvent("count", importCount{m, c})
118 }
119 for _, p := range s.Problems {
120 sendEvent("problem", importProblem{p})
121 }
122 if s.Done != nil {
123 sendEvent("done", importDone{})
124 } else if s.Aborted != nil {
125 sendEvent("aborted", importAborted{})
126 }
127
128 case l := <-importers.Unregister:
129 delete(imports[l.Token].Listeners, l)
130
131 case e := <-importers.Events:
132 s, ok := imports[e.Token]
133 if !ok {
134 s = state{
135 MailboxCounts: map[string]int{},
136 Listeners: map[*importListener]struct{}{},
137 Cancel: e.Cancel,
138 }
139 imports[e.Token] = s
140 }
141 for l := range s.Listeners {
142 select {
143 case l.Events <- e:
144 default:
145 log.Debug("dropped import event to slow consumer")
146 }
147 }
148 if e.Event != nil {
149 s := imports[e.Token]
150 switch x := e.Event.(type) {
151 case importCount:
152 s.MailboxCounts[x.Mailbox] = x.Count
153 case importProblem:
154 s.Problems = append(s.Problems, x.Message)
155 case importDone:
156 now := time.Now()
157 s.Done = &now
158 case importAborted:
159 now := time.Now()
160 s.Aborted = &now
161 }
162 imports[e.Token] = s
163 }
164
165 case a := <-importers.Abort:
166 s, ok := imports[a.Token]
167 if !ok {
168 a.Response <- errors.New("import not found")
169 return
170 }
171 if s.Done != nil {
172 a.Response <- errors.New("import already finished")
173 return
174 }
175 s.Cancel()
176 a.Response <- nil
177
178 case <-importers.Stop:
179 return
180 }
181
182 // Cleanup old state.
183 for t, s := range imports {
184 if len(s.Listeners) > 0 {
185 continue
186 }
187 if s.Done != nil && time.Since(*s.Done) > time.Minute || s.Aborted != nil && time.Since(*s.Aborted) > time.Minute {
188 delete(imports, t)
189 }
190 }
191 }
192}
193
194type importCount struct {
195 Mailbox string
196 Count int
197}
198type importProblem struct {
199 Message string
200}
201type importDone struct{}
202type importAborted struct{}
203type importStep struct {
204 Title string
205}
206
207// importStart prepare the import and launches the goroutine to actually import.
208// importStart is responsible for closing f and removing f.
209func importStart(log mlog.Log, accName string, f *os.File, skipMailboxPrefix string) (string, bool, error) {
210 defer func() {
211 if f != nil {
212 store.CloseRemoveTempFile(log, f, "upload for import")
213 }
214 }()
215
216 buf := make([]byte, 16)
217 if _, err := cryptrand.Read(buf); err != nil {
218 return "", false, err
219 }
220 token := fmt.Sprintf("%x", buf)
221
222 if _, err := f.Seek(0, 0); err != nil {
223 return "", false, fmt.Errorf("seek to start of file: %v", err)
224 }
225
226 // Recognize file format.
227 var iszip bool
228 magicZip := []byte{0x50, 0x4b, 0x03, 0x04}
229 magicGzip := []byte{0x1f, 0x8b}
230 magic := make([]byte, 4)
231 if _, err := f.ReadAt(magic, 0); err != nil {
232 return "", true, fmt.Errorf("detecting file format: %v", err)
233 }
234 if bytes.Equal(magic, magicZip) {
235 iszip = true
236 } else if !bytes.Equal(magic[:2], magicGzip) {
237 return "", true, fmt.Errorf("file is not a zip or gzip file")
238 }
239
240 var zr *zip.Reader
241 var tr *tar.Reader
242 if iszip {
243 fi, err := f.Stat()
244 if err != nil {
245 return "", false, fmt.Errorf("stat temporary import zip file: %v", err)
246 }
247 zr, err = zip.NewReader(f, fi.Size())
248 if err != nil {
249 return "", true, fmt.Errorf("opening zip file: %v", err)
250 }
251 } else {
252 gzr, err := gzip.NewReader(f)
253 if err != nil {
254 return "", true, fmt.Errorf("gunzip: %v", err)
255 }
256 tr = tar.NewReader(gzr)
257 }
258
259 acc, err := store.OpenAccount(log, accName)
260 if err != nil {
261 return "", false, fmt.Errorf("open acount: %v", err)
262 }
263 acc.Lock() // Not using WithWLock because importMessage is responsible for unlocking.
264
265 tx, err := acc.DB.Begin(context.Background(), true)
266 if err != nil {
267 acc.Unlock()
268 xerr := acc.Close()
269 log.Check(xerr, "closing account")
270 return "", false, fmt.Errorf("start transaction: %v", err)
271 }
272
273 // Ensure token is registered before returning, with context that can be canceled.
274 ctx, cancel := context.WithCancel(mox.Shutdown)
275 importers.Events <- importEvent{token, []byte(": keepalive\n\n"), nil, cancel}
276
277 log.Info("starting import")
278 go importMessages(ctx, log.WithCid(mox.Cid()), token, acc, tx, zr, tr, f, skipMailboxPrefix)
279 f = nil // importMessages is now responsible for closing and removing.
280
281 return token, false, nil
282}
283
284// importMessages imports the messages from zip/tgz file f.
285// importMessages is responsible for unlocking and closing acc, and closing tx and f.
286func importMessages(ctx context.Context, log mlog.Log, token string, acc *store.Account, tx *bstore.Tx, zr *zip.Reader, tr *tar.Reader, f *os.File, skipMailboxPrefix string) {
287 // If a fatal processing error occurs, we panic with this type.
288 type importError struct{ Err error }
289
290 // During import we collect all changes and broadcast them at the end, when successful.
291 var changes []store.Change
292
293 // ID's of delivered messages. If we have to rollback, we have to remove this files.
294 var deliveredIDs []int64
295
296 sendEvent := func(kind string, v any) {
297 buf, err := json.Marshal(v)
298 if err != nil {
299 log.Errorx("marshal event", err, slog.String("kind", kind), slog.Any("event", v))
300 return
301 }
302 ssemsg := fmt.Sprintf("event: %s\ndata: %s\n\n", kind, buf)
303 importers.Events <- importEvent{token, []byte(ssemsg), v, nil}
304 }
305
306 canceled := func() bool {
307 select {
308 case <-ctx.Done():
309 sendEvent("aborted", importAborted{})
310 return true
311 default:
312 return false
313 }
314 }
315
316 problemf := func(format string, args ...any) {
317 msg := fmt.Sprintf(format, args...)
318 sendEvent("problem", importProblem{Message: msg})
319 }
320
321 defer func() {
322 store.CloseRemoveTempFile(log, f, "uploaded messages")
323
324 for _, id := range deliveredIDs {
325 p := acc.MessagePath(id)
326 err := os.Remove(p)
327 log.Check(err, "closing message file after import error", slog.String("path", p))
328 }
329 if tx != nil {
330 err := tx.Rollback()
331 log.Check(err, "rolling back transaction")
332 }
333 if acc != nil {
334 acc.Unlock()
335 err := acc.Close()
336 log.Check(err, "closing account")
337 }
338
339 x := recover()
340 if x == nil {
341 return
342 }
343 if err, ok := x.(importError); ok {
344 log.Errorx("import error", err.Err)
345 problemf("%s (aborting)", err.Err)
346 sendEvent("aborted", importAborted{})
347 } else {
348 log.Error("import panic", slog.Any("err", x))
349 debug.PrintStack()
350 metrics.PanicInc(metrics.Importmessages)
351 }
352 }()
353
354 ximportcheckf := func(err error, format string, args ...any) {
355 if err != nil {
356 panic(importError{fmt.Errorf("%s: %s", fmt.Sprintf(format, args...), err)})
357 }
358 }
359
360 err := acc.ThreadingWait(log)
361 ximportcheckf(err, "waiting for account thread upgrade")
362
363 conf, _ := acc.Conf()
364
365 jf, _, err := acc.OpenJunkFilter(ctx, log)
366 if err != nil && !errors.Is(err, store.ErrNoJunkFilter) {
367 ximportcheckf(err, "open junk filter")
368 }
369 defer func() {
370 if jf != nil {
371 err := jf.CloseDiscard()
372 log.Check(err, "closing junk filter")
373 }
374 }()
375
376 // Mailboxes we imported, and message counts.
377 mailboxes := map[string]store.Mailbox{}
378 messages := map[string]int{}
379
380 maxSize := acc.QuotaMessageSize()
381 du := store.DiskUsage{ID: 1}
382 err = tx.Get(&du)
383 ximportcheckf(err, "get disk usage")
384 var addSize int64
385
386 // For maildirs, we are likely to get a possible dovecot-keywords file after having
387 // imported the messages. Once we see the keywords, we use them. But before that
388 // time we remember which messages miss a keywords. Once the keywords become
389 // available, we'll fix up the flags for the unknown messages
390 mailboxKeywords := map[string]map[rune]string{} // Mailbox to 'a'-'z' to flag name.
391 mailboxMissingKeywordMessages := map[string]map[int64]string{} // Mailbox to message id to string consisting of the unrecognized flags.
392
393 // We keep the mailboxes we deliver to up to date with count and keywords (non-system flags).
394 destMailboxCounts := map[int64]store.MailboxCounts{}
395 destMailboxKeywords := map[int64]map[string]bool{}
396
397 // Previous mailbox an event was sent for. We send an event for new mailboxes, when
398 // another 100 messages were added, when adding a message to another mailbox, and
399 // finally at the end as a closing statement.
400 var prevMailbox string
401
402 var modseq store.ModSeq // Assigned on first message, used for all messages.
403
404 trainMessage := func(m *store.Message, p message.Part, pos string) {
405 words, err := jf.ParseMessage(p)
406 if err != nil {
407 problemf("parsing message %s for updating junk filter: %v (continuing)", pos, err)
408 return
409 }
410 err = jf.Train(ctx, !m.Junk, words)
411 if err != nil {
412 problemf("training junk filter for message %s: %v (continuing)", pos, err)
413 return
414 }
415 m.TrainedJunk = &m.Junk
416 }
417
418 openTrainMessage := func(m *store.Message) {
419 path := acc.MessagePath(m.ID)
420 f, err := os.Open(path)
421 if err != nil {
422 problemf("opening message again for training junk filter: %v (continuing)", err)
423 return
424 }
425 defer func() {
426 err := f.Close()
427 log.Check(err, "closing file after training junkfilter")
428 }()
429 p, err := m.LoadPart(f)
430 if err != nil {
431 problemf("loading parsed message again for training junk filter: %v (continuing)", err)
432 return
433 }
434 trainMessage(m, p, fmt.Sprintf("message id %d", m.ID))
435 }
436
437 xensureMailbox := func(name string) store.Mailbox {
438 name = norm.NFC.String(name)
439 if strings.ToLower(name) == "inbox" {
440 name = "Inbox"
441 }
442
443 if mb, ok := mailboxes[name]; ok {
444 return mb
445 }
446
447 var p string
448 var mb store.Mailbox
449 for i, e := range strings.Split(name, "/") {
450 if i == 0 {
451 p = e
452 } else {
453 p = path.Join(p, e)
454 }
455 if _, ok := mailboxes[p]; ok {
456 continue
457 }
458
459 q := bstore.QueryTx[store.Mailbox](tx)
460 q.FilterNonzero(store.Mailbox{Name: p})
461 var err error
462 mb, err = q.Get()
463 if err == bstore.ErrAbsent {
464 uidvalidity, err := acc.NextUIDValidity(tx)
465 ximportcheckf(err, "finding next uid validity")
466 mb = store.Mailbox{
467 Name: p,
468 UIDValidity: uidvalidity,
469 UIDNext: 1,
470 HaveCounts: true,
471 // Do not assign special-use flags. This existing account probably already has such mailboxes.
472 }
473 err = tx.Insert(&mb)
474 ximportcheckf(err, "inserting mailbox in database")
475
476 if tx.Get(&store.Subscription{Name: p}) != nil {
477 err := tx.Insert(&store.Subscription{Name: p})
478 ximportcheckf(err, "subscribing to imported mailbox")
479 }
480 changes = append(changes, store.ChangeAddMailbox{Mailbox: mb, Flags: []string{`\Subscribed`}})
481 } else if err != nil {
482 ximportcheckf(err, "creating mailbox %s (aborting)", p)
483 }
484 if prevMailbox != "" && mb.Name != prevMailbox {
485 sendEvent("count", importCount{prevMailbox, messages[prevMailbox]})
486 }
487 mailboxes[mb.Name] = mb
488 sendEvent("count", importCount{mb.Name, 0})
489 prevMailbox = mb.Name
490 }
491 return mb
492 }
493
494 xdeliver := func(mb store.Mailbox, m *store.Message, f *os.File, pos string) {
495 defer func() {
496 name := f.Name()
497 err = f.Close()
498 log.Check(err, "closing temporary message file for delivery")
499 err := os.Remove(name)
500 log.Check(err, "removing temporary message file for delivery")
501 }()
502 m.MailboxID = mb.ID
503 m.MailboxOrigID = mb.ID
504
505 addSize += m.Size
506 if maxSize > 0 && du.MessageSize+addSize > maxSize {
507 ximportcheckf(fmt.Errorf("account over maximum total size %d", maxSize), "checking quota")
508 }
509
510 if modseq == 0 {
511 var err error
512 modseq, err = acc.NextModSeq(tx)
513 ximportcheckf(err, "assigning next modseq")
514 }
515 m.CreateSeq = modseq
516 m.ModSeq = modseq
517
518 mc := destMailboxCounts[mb.ID]
519 mc.Add(m.MailboxCounts())
520 destMailboxCounts[mb.ID] = mc
521
522 if len(m.Keywords) > 0 {
523 if destMailboxKeywords[mb.ID] == nil {
524 destMailboxKeywords[mb.ID] = map[string]bool{}
525 }
526 for _, k := range m.Keywords {
527 destMailboxKeywords[mb.ID][k] = true
528 }
529 }
530
531 // Parse message and store parsed information for later fast retrieval.
532 p, err := message.EnsurePart(log.Logger, false, f, m.Size)
533 if err != nil {
534 problemf("parsing message %s: %s (continuing)", pos, err)
535 }
536 m.ParsedBuf, err = json.Marshal(p)
537 ximportcheckf(err, "marshal parsed message structure")
538
539 // Set fields needed for future threading. By doing it now, DeliverMessage won't
540 // have to parse the Part again.
541 p.SetReaderAt(store.FileMsgReader(m.MsgPrefix, f))
542 m.PrepareThreading(log, &p)
543
544 if m.Received.IsZero() {
545 if p.Envelope != nil && !p.Envelope.Date.IsZero() {
546 m.Received = p.Envelope.Date
547 } else {
548 m.Received = time.Now()
549 }
550 }
551
552 // We set the flags that Deliver would set now and train ourselves. This prevents
553 // Deliver from training, which would open the junk filter, change it, and write it
554 // back to disk, for each message (slow).
555 m.JunkFlagsForMailbox(mb, conf)
556 if jf != nil && m.NeedsTraining() {
557 trainMessage(m, p, pos)
558 }
559
560 const sync = false
561 const notrain = true
562 const nothreads = true
563 const updateDiskUsage = false
564 if err := acc.DeliverMessage(log, tx, m, f, sync, notrain, nothreads, updateDiskUsage); err != nil {
565 problemf("delivering message %s: %s (continuing)", pos, err)
566 return
567 }
568 deliveredIDs = append(deliveredIDs, m.ID)
569 changes = append(changes, m.ChangeAddUID())
570 messages[mb.Name]++
571 if messages[mb.Name]%100 == 0 || prevMailbox != mb.Name {
572 prevMailbox = mb.Name
573 sendEvent("count", importCount{mb.Name, messages[mb.Name]})
574 }
575 }
576
577 ximportMbox := func(mailbox, filename string, r io.Reader) {
578 if mailbox == "" {
579 problemf("empty mailbox name for mbox file %s (skipping)", filename)
580 return
581 }
582 mb := xensureMailbox(mailbox)
583
584 mr := store.NewMboxReader(log, store.CreateMessageTemp, filename, r)
585 for {
586 m, mf, pos, err := mr.Next()
587 if err == io.EOF {
588 break
589 } else if err != nil {
590 ximportcheckf(err, "next message in mbox file")
591 }
592
593 xdeliver(mb, m, mf, pos)
594 }
595 }
596
597 ximportMaildir := func(mailbox, filename string, r io.Reader) {
598 if mailbox == "" {
599 problemf("empty mailbox name for maildir file %s (skipping)", filename)
600 return
601 }
602 mb := xensureMailbox(mailbox)
603
604 f, err := store.CreateMessageTemp(log, "import")
605 ximportcheckf(err, "creating temp message")
606 defer func() {
607 if f != nil {
608 store.CloseRemoveTempFile(log, f, "message to import")
609 }
610 }()
611
612 // Copy data, changing bare \n into \r\n.
613 br := bufio.NewReader(r)
614 w := bufio.NewWriter(f)
615 var size int64
616 for {
617 line, err := br.ReadBytes('\n')
618 if err != nil && err != io.EOF {
619 ximportcheckf(err, "reading message")
620 }
621 if len(line) > 0 {
622 if !bytes.HasSuffix(line, []byte("\r\n")) {
623 line = append(line[:len(line)-1], "\r\n"...)
624 }
625
626 n, err := w.Write(line)
627 ximportcheckf(err, "writing message")
628 size += int64(n)
629 }
630 if err == io.EOF {
631 break
632 }
633 }
634 err = w.Flush()
635 ximportcheckf(err, "writing message")
636
637 var received time.Time
638 t := strings.SplitN(path.Base(filename), ".", 2)
639 if v, err := strconv.ParseInt(t[0], 10, 64); err == nil {
640 received = time.Unix(v, 0)
641 }
642
643 // Parse flags. See https://cr.yp.to/proto/maildir.html.
644 var keepFlags string
645 var flags store.Flags
646 keywords := map[string]bool{}
647 t = strings.SplitN(path.Base(filename), ":2,", 2)
648 if len(t) == 2 {
649 for _, c := range t[1] {
650 switch c {
651 case 'P':
652 // Passed, doesn't map to a common IMAP flag.
653 case 'R':
654 flags.Answered = true
655 case 'S':
656 flags.Seen = true
657 case 'T':
658 flags.Deleted = true
659 case 'D':
660 flags.Draft = true
661 case 'F':
662 flags.Flagged = true
663 default:
664 if c >= 'a' && c <= 'z' {
665 dovecotKeywords, ok := mailboxKeywords[mailbox]
666 if !ok {
667 // No keywords file seen yet, we'll try later if it comes in.
668 keepFlags += string(c)
669 } else if kw, ok := dovecotKeywords[c]; ok {
670 flagSet(&flags, keywords, kw)
671 }
672 }
673 }
674 }
675 }
676
677 m := store.Message{
678 Received: received,
679 Flags: flags,
680 Keywords: maps.Keys(keywords),
681 Size: size,
682 }
683 xdeliver(mb, &m, f, filename)
684 f = nil
685 if keepFlags != "" {
686 if _, ok := mailboxMissingKeywordMessages[mailbox]; !ok {
687 mailboxMissingKeywordMessages[mailbox] = map[int64]string{}
688 }
689 mailboxMissingKeywordMessages[mailbox][m.ID] = keepFlags
690 }
691 }
692
693 importFile := func(name string, r io.Reader) {
694 origName := name
695
696 if strings.HasPrefix(name, skipMailboxPrefix) {
697 name = strings.TrimPrefix(name[len(skipMailboxPrefix):], "/")
698 }
699
700 if strings.HasSuffix(name, "/") {
701 name = strings.TrimSuffix(name, "/")
702 dir := path.Dir(name)
703 switch path.Base(dir) {
704 case "new", "cur", "tmp":
705 // Maildir, ensure it exists.
706 mailbox := path.Dir(dir)
707 xensureMailbox(mailbox)
708 }
709 // Otherwise, this is just a directory that probably holds mbox files and maildirs.
710 return
711 }
712
713 if strings.HasSuffix(path.Base(name), ".mbox") {
714 mailbox := name[:len(name)-len(".mbox")]
715 ximportMbox(mailbox, origName, r)
716 return
717 }
718 dir := path.Dir(name)
719 dirbase := path.Base(dir)
720 switch dirbase {
721 case "new", "cur", "tmp":
722 mailbox := path.Dir(dir)
723 ximportMaildir(mailbox, origName, r)
724 default:
725 if path.Base(name) == "dovecot-keywords" {
726 mailbox := path.Dir(name)
727 dovecotKeywords := map[rune]string{}
728 words, err := store.ParseDovecotKeywordsFlags(r, log)
729 log.Check(err, "parsing dovecot keywords for mailbox", slog.String("mailbox", mailbox))
730 for i, kw := range words {
731 dovecotKeywords['a'+rune(i)] = kw
732 }
733 mailboxKeywords[mailbox] = dovecotKeywords
734
735 for id, chars := range mailboxMissingKeywordMessages[mailbox] {
736 var flags, zeroflags store.Flags
737 keywords := map[string]bool{}
738 for _, c := range chars {
739 kw, ok := dovecotKeywords[c]
740 if !ok {
741 problemf("unspecified dovecot message flag %c for message id %d (continuing)", c, id)
742 continue
743 }
744 flagSet(&flags, keywords, kw)
745 }
746 if flags == zeroflags && len(keywords) == 0 {
747 continue
748 }
749
750 m := store.Message{ID: id}
751 err := tx.Get(&m)
752 ximportcheckf(err, "get imported message for flag update")
753
754 mc := destMailboxCounts[m.MailboxID]
755 mc.Sub(m.MailboxCounts())
756
757 oflags := m.Flags
758 m.Flags = m.Flags.Set(flags, flags)
759 m.Keywords = maps.Keys(keywords)
760 sort.Strings(m.Keywords)
761
762 mc.Add(m.MailboxCounts())
763 destMailboxCounts[m.MailboxID] = mc
764
765 if len(m.Keywords) > 0 {
766 if destMailboxKeywords[m.MailboxID] == nil {
767 destMailboxKeywords[m.MailboxID] = map[string]bool{}
768 }
769 for _, k := range m.Keywords {
770 destMailboxKeywords[m.MailboxID][k] = true
771 }
772 }
773
774 // We train before updating, training may set m.TrainedJunk.
775 if jf != nil && m.NeedsTraining() {
776 openTrainMessage(&m)
777 }
778 err = tx.Update(&m)
779 ximportcheckf(err, "updating message after flag update")
780 changes = append(changes, m.ChangeFlags(oflags))
781 }
782 delete(mailboxMissingKeywordMessages, mailbox)
783 } else {
784 problemf("unrecognized file %s (skipping)", origName)
785 }
786 }
787 }
788
789 if zr != nil {
790 for _, f := range zr.File {
791 if canceled() {
792 return
793 }
794 zf, err := f.Open()
795 if err != nil {
796 problemf("opening file %s in zip: %v", f.Name, err)
797 continue
798 }
799 importFile(f.Name, zf)
800 err = zf.Close()
801 log.Check(err, "closing file from zip")
802 }
803 } else {
804 for {
805 if canceled() {
806 return
807 }
808 h, err := tr.Next()
809 if err == io.EOF {
810 break
811 } else if err != nil {
812 problemf("reading next tar header: %v (aborting)", err)
813 return
814 }
815 importFile(h.Name, tr)
816 }
817 }
818
819 total := 0
820 for _, count := range messages {
821 total += count
822 }
823 log.Debug("messages imported", slog.Int("total", total))
824
825 // Send final update for count of last-imported mailbox.
826 if prevMailbox != "" {
827 sendEvent("count", importCount{prevMailbox, messages[prevMailbox]})
828 }
829
830 // Match threads.
831 if len(deliveredIDs) > 0 {
832 sendEvent("step", importStep{"matching messages with threads"})
833 err = acc.AssignThreads(ctx, log, tx, deliveredIDs[0], 0, io.Discard)
834 ximportcheckf(err, "assigning messages to threads")
835 }
836
837 // Update mailboxes with counts and keywords.
838 for mbID, mc := range destMailboxCounts {
839 mb := store.Mailbox{ID: mbID}
840 err := tx.Get(&mb)
841 ximportcheckf(err, "loading mailbox for counts and keywords")
842
843 if mb.MailboxCounts != mc {
844 mb.MailboxCounts = mc
845 changes = append(changes, mb.ChangeCounts())
846 }
847
848 keywords := destMailboxKeywords[mb.ID]
849 var mbKwChanged bool
850 mb.Keywords, mbKwChanged = store.MergeKeywords(mb.Keywords, maps.Keys(keywords))
851
852 err = tx.Update(&mb)
853 ximportcheckf(err, "updating mailbox count and keywords")
854 if mbKwChanged {
855 changes = append(changes, mb.ChangeKeywords())
856 }
857 }
858
859 err = acc.AddMessageSize(log, tx, addSize)
860 ximportcheckf(err, "updating disk usage after import")
861
862 err = tx.Commit()
863 tx = nil
864 ximportcheckf(err, "commit")
865 deliveredIDs = nil
866
867 if jf != nil {
868 if err := jf.Close(); err != nil {
869 problemf("saving changes of training junk filter: %v (continuing)", err)
870 log.Errorx("saving changes of training junk filter", err)
871 }
872 jf = nil
873 }
874
875 store.BroadcastChanges(acc, changes)
876 acc.Unlock()
877 err = acc.Close()
878 log.Check(err, "closing account after import")
879 acc = nil
880
881 sendEvent("done", importDone{})
882}
883
884func flagSet(flags *store.Flags, keywords map[string]bool, word string) {
885 switch word {
886 case "forwarded", "$forwarded":
887 flags.Forwarded = true
888 case "junk", "$junk":
889 flags.Junk = true
890 case "notjunk", "$notjunk", "nonjunk", "$nonjunk":
891 flags.Notjunk = true
892 case "phishing", "$phishing":
893 flags.Phishing = true
894 case "mdnsent", "$mdnsent":
895 flags.MDNSent = true
896 default:
897 if err := store.CheckKeyword(word); err == nil {
898 keywords[word] = true
899 }
900 }
901}
902