19 "golang.org/x/text/unicode/norm"
21 "github.com/mjl-/mox/config"
22 "github.com/mjl-/mox/message"
23 "github.com/mjl-/mox/metrics"
24 "github.com/mjl-/mox/mox-"
25 "github.com/mjl-/mox/moxio"
26 "github.com/mjl-/mox/store"
29// todo: add option to trust imported messages, causing us to look at Authentication-Results and Received-SPF headers and add eg verified spf/dkim/dmarc domains to our store, to jumpstart reputation.
// importCommonHelp is help text about importing mbox/maildir archives. It is
// spliced into the help string of cmdImportMaildir. It explains that the running
// mox process reads the archive files, how imported messages train the junk
// filter, what happens for the Sent mailbox, that imports are also possible
// through the account web page, and that importing twice creates duplicates.
31const importCommonHelp = `The mbox/maildir archive is accessed and imported by the running mox process, so
32it must have access to the archive files. The default suggested systemd service
33file isolates mox from most of the file system, with only the "data/" directory
34accessible, so you may want to put the mbox/maildir archive files in a
35directory like "data/import/" to make it available to mox.
37By default, messages will train the junk filter based on their flags and, if
38"automatic junk flags" configuration is set, based on mailbox naming.
40If the destination mailbox is the Sent mailbox, the recipients of the messages
41are added to the message metadata, causing later incoming messages from these
42recipients to be accepted, unless other reputation signals prevent that.
44Users can also import mailboxes/messages through the account web page by
45uploading a zip or tgz file with mbox and/or maildirs.
47Messages are imported even if already present. Importing messages twice will
48result in duplicate messages.
// cmdImportMaildir sets up the maildir import command: it sets the parameter
// summary and the help text (which includes importCommonHelp), then calls
// ctlcmdImport with mbox=false, passing args[0], args[1] and args[2] as
// account, mailbox and source.
// NOTE(review): the statements that produce args are not visible in this view;
// presumably they are the parsed command-line parameters — confirm.
51func cmdImportMaildir(c *cmd) {
52 c.params = "accountname mailboxname maildir"
53 c.help = `Import a maildir into an account.
55` + importCommonHelp + `
56Mailbox flags, like "seen", "answered", will be imported. An optional
57dovecot-keywords file can specify additional flags, like Forwarded/Junk/NotJunk.
64 ctlcmdImport(xctl(), false, args[0], args[1], args[2])
// cmdImportMbox sets up the mbox import command: it sets the parameter summary
// and the help text (which notes that maildir is the better defined format),
// then calls ctlcmdImport with mbox=true, passing args[0], args[1] and args[2]
// as account, mailbox and source.
// NOTE(review): the statements that produce args are not visible in this view;
// presumably they are the parsed command-line parameters — confirm.
67func cmdImportMbox(c *cmd) {
68 c.params = "accountname mailboxname mbox"
69 c.help = `Import an mbox into an account.
71Using mbox is not recommended, maildir is a better defined format.
79 ctlcmdImport(xctl(), true, args[0], args[1], args[2])
// cmdXImportMaildir sets up the maildir import variant that takes an account
// directory instead of an account name. Per its help text it imports by
// directly accessing the data directory, and it points to
// "mox help import maildir" for details.
// NOTE(review): the rest of the body is not visible in this view; presumably
// it delegates to xcmdXImport with mbox=false — confirm.
82func cmdXImportMaildir(c *cmd) {
84 c.params = "accountdir mailboxname maildir"
85 c.help = `Import a maildir into an account by directly accessing the data directory.
88See "mox help import maildir" for details.
// cmdXImportMbox sets up the mbox import variant that takes an account
// directory instead of an account name. Per its help text it imports by
// directly accessing the data directory, and it points to
// "mox help import mbox" for details.
// NOTE(review): the rest of the body is not visible in this view; presumably
// it delegates to xcmdXImport with mbox=true — confirm.
93func cmdXImportMbox(c *cmd) {
95 c.params = "accountdir mailboxname mbox"
96 c.help = `Import an mbox into an account by directly accessing the data directory.
98See "mox help import mbox" for details.
// xcmdXImport runs an import against an account directory given as args[0],
// which must have the form .../accounts/<name>. It points the mox
// configuration at the data directory two levels above accountdir, creates a
// client and a server ctl connected through net.Pipe, starts servectlcmd on
// the server end in a goroutine, and runs ctlcmdImport on the client end with
// args[1] as mailbox and args[2] as source. The mbox flag is passed through to
// ctlcmdImport unchanged.
// NOTE(review): how args is obtained from c is not visible in this view.
103func xcmdXImport(mbox bool, c *cmd) {
109 accountdir := args[0]
 // The account name is the last path element of accountdir.
110 account := filepath.Base(accountdir)
112 // Set up the mox config so the account can be opened.
113 if filepath.Base(filepath.Dir(accountdir)) != "accounts" {
114 log.Fatalf("accountdir must be of the form .../accounts/<name>")
 // The data directory is the parent of the "accounts" directory, made absolute.
117 mox.Conf.Static.DataDir, err = filepath.Abs(filepath.Dir(filepath.Dir(accountdir)))
118 xcheckf(err, "making absolute datadir")
119 mox.ConfigStaticPath = "fake.conf"
120 mox.Conf.DynamicLastCheck = time.Now().Add(time.Hour) // Silence errors about config file.
121 mox.Conf.Dynamic.Accounts = map[string]config.Account{
 // Switchboard() is called right here; the function it returns is deferred.
124 defer store.Switchboard()()
 // Two ends of a net.Pipe, each wrapped in a ctl with its own buffered reader.
126 cconn, sconn := net.Pipe()
127 clientctl := ctl{conn: cconn, r: bufio.NewReader(cconn), log: c.log}
128 serverctl := ctl{conn: sconn, r: bufio.NewReader(sconn), log: c.log}
129 go servectlcmd(context.Background(), &serverctl, 0, func() {})
131 ctlcmdImport(&clientctl, mbox, account, args[1], args[2])
// ctlcmdImport is the client side of an import over a ctl connection. It
// writes the command ("importmbox" or "importmaildir") to xctl, prints the
// count from each "progress " line it receives to stderr, aborts with
// log.Fatalf when a line is not the expected ok, and finally reads the total
// count and prints it to stderr.
// NOTE(review): several lines are not visible in this view (the branch that
// picks the command, the writes of account/mailbox/src, the read loop, and
// the body of the "Inbox" check). Presumably the mbox flag selects
// "importmbox" — confirm.
134func ctlcmdImport(xctl *ctl, mbox bool, account, mailbox, src string) {
136 xctl.xwrite("importmbox")
138 xctl.xwrite("importmaildir")
 // Case-insensitive match on the mailbox name "Inbox".
141 if strings.EqualFold(mailbox, "Inbox") {
147 fmt.Fprintln(os.Stderr, "importing...")
150 if strings.HasPrefix(line, "progress ") {
151 n := line[len("progress "):]
152 fmt.Fprintf(os.Stderr, "%s...\n", n)
156 log.Fatalf("import, expected ok, got %q", line)
160 count := xctl.xread()
161 fmt.Fprintf(os.Stderr, "%s imported\n", count)
// ximportctl is the server side of an import. It reads the request from xctl,
// opens the account and the mbox file or maildir, adds the messages to the
// mailbox inside a database transaction, and writes progress and the final
// count back over xctl.
//
// The lines starting with ">" and "<" right below describe the protocol: ">"
// lines are written by the client (see ctlcmdImport), "<" lines are written
// back by this function.
// NOTE(review): many lines of this function (branch headers, deferred cleanup
// closures, the read loop) are not visible in this view.
164func ximportctl(ctx context.Context, xctl *ctl, mbox bool) {
166 > "importmaildir" or "importmbox"
169 > src (mbox file or maildir directory)
171 < "progress" count (zero or more times, once for every 1000 messages)
172 < "ok" when done, or error
173 < count (of total imported messages, only if not error)
175 account := xctl.xread()
176 mailbox := xctl.xread()
183 xctl.log.Info("importing messages",
184 slog.String("kind", kind),
185 slog.String("account", account),
186 slog.String("mailbox", mailbox),
187 slog.String("source", src))
191 var mdnewf, mdcurf *os.File
192 var msgreader store.MsgSource
194 // Ensure normalized form.
195 mailbox = norm.NFC.String(mailbox)
196 mailbox, _, err = store.CheckMailboxName(mailbox, true)
197 xctl.xcheck(err, "checking mailbox name")
199 // Open account, creating a database file if it doesn't exist yet. It must be known
200 // in the configuration file.
201 a, err := store.OpenAccount(xctl.log, account, false)
202 xctl.xcheck(err, "opening account")
206 xctl.log.Check(err, "closing account after import")
210 err = a.ThreadingWait(xctl.log)
211 xctl.xcheck(err, "waiting for account thread upgrade")
216 xctl.log.Check(err, "closing mbox file after import")
219 err := mdnewf.Close()
220 xctl.log.Check(err, "closing maildir new after import")
223 err := mdcurf.Close()
224 xctl.log.Check(err, "closing maildir cur after import")
228 // Messages don't always have a junk flag set. We'll assume anything in a mailbox
229 // starting with junk or spam is junk mail.
231 // First check if we can access the mbox/maildir.
232 // Mox needs to be able to access those files, the user running the import command
233 // may be a different user who can access the files.
235 mboxf, err = os.Open(src)
236 xctl.xcheck(err, "open mbox file")
237 msgreader = store.NewMboxReader(xctl.log, store.CreateMessageTemp, src, mboxf)
 // For a maildir, both the "new" and "cur" subdirectories of src are opened.
239 mdnewf, err = os.Open(filepath.Join(src, "new"))
240 xctl.xcheck(err, "open subdir new of maildir")
241 mdcurf, err = os.Open(filepath.Join(src, "cur"))
242 xctl.xcheck(err, "open subdir cur of maildir")
243 msgreader = store.NewMaildirReader(xctl.log, store.CreateMessageTemp, mdnewf, mdcurf)
246 // todo: one goroutine for reading messages, one for parsing the message, one adding to database, one for junk filter training.
249 var changes []store.Change
251 tx, err := a.DB.Begin(ctx, true)
252 xctl.xcheck(err, "begin transaction")
256 xctl.log.Check(err, "rolling back transaction")
260 // All preparations done. Good to go.
263 // We will be delivering messages. If we fail halfway, we need to remove the created msg files.
272 xctl.log.Error("import error", slog.String("panic", fmt.Sprintf("%v", x)))
274 metrics.PanicInc(metrics.Import)
276 xctl.log.Error("import error")
279 for _, id := range newIDs {
280 p := a.MessagePath(id)
282 xctl.log.Check(err, "closing message file after import error", slog.String("path", p))
286 xctl.xerror(fmt.Sprintf("import error: %v", x))
289 var modseq store.ModSeq // Assigned on first delivered messages, used for all messages.
291 // Ensure mailbox exists.
293 mb, changes, err = a.MailboxEnsure(tx, mailbox, true, store.SpecialUse{}, &modseq)
294 xctl.xcheck(err, "ensuring mailbox exists")
 // Remember the keyword count so a keyword change can be detected after the import.
296 nkeywords := len(mb.Keywords)
 // A missing junk filter (store.ErrNoJunkFilter) is not treated as an error here.
298 jf, _, err := a.OpenJunkFilter(ctx, xctl.log)
299 if err != nil && !errors.Is(err, store.ErrNoJunkFilter) {
300 xctl.xcheck(err, "open junk filter")
304 err = jf.CloseDiscard()
305 xctl.xcheck(err, "close junk filter")
311 maxSize := a.QuotaMessageSize()
313 du := store.DiskUsage{ID: 1}
315 xctl.xcheck(err, "get disk usage")
 // Set of directories holding added message files; synced before the commit.
317 msgDirs := map[string]struct{}{}
319 process := func(m *store.Message, msgf *os.File, origPath string) {
320 defer store.CloseRemoveTempFile(xctl.log, msgf, "message to import")
 // The quota check only applies when maxSize is positive.
323 if maxSize > 0 && du.MessageSize+addSize > maxSize {
324 xctl.xcheck(fmt.Errorf("account over maximum total message size %d", maxSize), "checking quota")
327 // Parse message and store parsed information for later fast retrieval.
328 p, err := message.EnsurePart(xctl.log.Logger, false, msgf, m.Size)
330 xctl.log.Infox("parsing message, continuing", err, slog.String("path", origPath))
332 m.ParsedBuf, err = json.Marshal(p)
333 xctl.xcheck(err, "marshal parsed message structure")
335 // Set fields needed for future threading. By doing it now, MessageAdd won't
336 // have to parse the Part again.
337 p.SetReaderAt(store.FileMsgReader(m.MsgPrefix, msgf))
338 m.PrepareThreading(xctl.log, &p)
 // Messages without a received time get one from the envelope Date or time.Now().
340 if m.Received.IsZero() {
341 if p.Envelope != nil && !p.Envelope.Date.IsZero() {
342 m.Received = p.Envelope.Date
344 m.Received = time.Now()
348 m.JunkFlagsForMailbox(mb, conf)
349 if jf != nil && m.NeedsTraining() {
350 if words, err := jf.ParseMessage(p); err != nil {
351 xctl.log.Infox("parsing message for updating junk filter", err, slog.String("parse", ""), slog.String("path", origPath))
353 err = jf.Train(ctx, !m.Junk, words)
354 xctl.xcheck(err, "training junk filter")
355 m.TrainedJunk = &m.Junk
361 modseq, err = a.NextModSeq(tx)
362 xctl.xcheck(err, "assigning next modseq")
367 m.MailboxOrigID = mb.ID
371 // todo: possibly set dmarcdomain to the domain of the from address? at least for non-spams that have been seen. otherwise user would start without any reputations. the assumption would be that the user has accepted email and deemed it legit, coming from the indicated sender.
372 opts := store.AddOpts{
375 SkipThreads: true, // We do this efficiently when we have all messages.
376 SkipUpdateDiskUsage: true, // We do this once at the end.
377 SkipCheckQuota: true, // We check before.
378 SkipPreview: true, // We'll do this on-demand when messages are requested. Saves time.
380 err = a.MessageAdd(xctl.log, tx, &mb, m, msgf, opts)
381 xctl.xcheck(err, "delivering message")
382 newIDs = append(newIDs, m.ID)
383 changes = append(changes, m.ChangeAddUID())
385 msgDirs[filepath.Dir(a.MessagePath(m.ID))] = struct{}{}
389 xctl.xwrite(fmt.Sprintf("progress %d", n))
394 m, msgf, origPath, err := msgreader.Next()
398 xctl.xcheck(err, "reading next message")
400 process(m, msgf, origPath)
 // Threading was skipped per message (SkipThreads); assign threads now, starting at the first new ID.
405 err = a.AssignThreads(ctx, xctl.log, tx, newIDs[0], 0, io.Discard)
406 xctl.xcheck(err, "assigning messages to threads")
409 changes = append(changes, mb.ChangeCounts())
410 if nkeywords != len(mb.Keywords) {
411 changes = append(changes, mb.ChangeKeywords())
415 xctl.xcheck(err, "updating message counts and keywords in mailbox")
 // Disk usage updates were skipped per message (SkipUpdateDiskUsage); add the total once.
417 err = a.AddMessageSize(xctl.log, tx, addSize)
418 xctl.xcheck(err, "updating total message size")
 // Sync each directory recorded in msgDirs.
420 for msgDir := range msgDirs {
421 err := moxio.SyncDir(xctl.log, msgDir)
422 xctl.xcheck(err, "sync dir")
427 xctl.log.Check(err, "close junk filter")
432 xctl.xcheck(err, "commit")
434 xctl.log.Info("delivered messages through import", slog.Int("count", len(newIDs)))
 // Publish the collected changes; this comes after the commit check above.
437 store.BroadcastChanges(a, changes)
441 xctl.xcheck(err, "closing account")
445 xctl.xwrite(fmt.Sprintf("%d", n))