19 "golang.org/x/exp/maps"
21 "github.com/mjl-/mox/config"
22 "github.com/mjl-/mox/message"
23 "github.com/mjl-/mox/metrics"
24 "github.com/mjl-/mox/mox-"
25 "github.com/mjl-/mox/store"
28// todo: add option to trust imported messages, causing us to look at Authentication-Results and Received-SPF headers and add e.g. verified SPF/DKIM/DMARC domains to our store, to jumpstart reputation.
// importCommonHelp is help text about importing mbox/maildir archives that is
// spliced into the help string of import commands (see cmdImportMaildir). It
// explains that the running mox process reads the archive itself, how imported
// messages affect the junk filter, the special handling of the Sent mailbox,
// and the alternative of importing through the account web page.
30const importCommonHelp = `The mbox/maildir archive is accessed and imported by the running mox process, so
31it must have access to the archive files. The default suggested systemd service
32file isolates mox from most of the file system, with only the "data/" directory
33accessible, so you may want to put the mbox/maildir archive files in a
34directory like "data/import/" to make it available to mox.
36By default, messages will train the junk filter based on their flags and, if
37"automatic junk flags" configuration is set, based on mailbox naming.
39If the destination mailbox is the Sent mailbox, the recipients of the messages
40are added to the message metadata, causing later incoming messages from these
41recipients to be accepted, unless other reputation signals prevent that.
43Users can also import mailboxes/messages through the account web page by
44uploading a zip or tgz file with mbox and/or maildirs.
// cmdImportMaildir is the command handler for importing a maildir into an
// account. It sets the parameter summary ("accountname mailboxname maildir")
// and the help text (which embeds importCommonHelp), then passes the three
// arguments to ctlcmdImport over the ctl connection returned by xctl(), with
// mbox set to false.
47func cmdImportMaildir(c *cmd) {
48 c.params = "accountname mailboxname maildir"
49 c.help = `Import a maildir into an account.
51` + importCommonHelp + `
52Mailbox flags, like "seen", "answered", will be imported. An optional
53dovecot-keywords file can specify additional flags, like Forwarded/Junk/NotJunk.
60 ctlcmdImport(xctl(), false, args[0], args[1], args[2])
// cmdImportMbox is the command handler for importing an mbox file into an
// account. It sets the parameter summary ("accountname mailboxname mbox") and
// the help text, then passes the three arguments to ctlcmdImport over the ctl
// connection returned by xctl(), with mbox set to true.
63func cmdImportMbox(c *cmd) {
64 c.params = "accountname mailboxname mbox"
65 c.help = `Import an mbox into an account.
67Using mbox is not recommended, maildir is a better defined format.
75 ctlcmdImport(xctl(), true, args[0], args[1], args[2])
// cmdXImportMaildir is the command handler for importing a maildir by
// directly accessing the data directory. Unlike cmdImportMaildir, its first
// parameter is an account directory ("accountdir") rather than an account
// name. The help text refers to "mox help import maildir" for details.
78func cmdXImportMaildir(c *cmd) {
80 c.params = "accountdir mailboxname maildir"
81 c.help = `Import a maildir into an account by directly accessing the data directory.
84See "mox help import maildir" for details.
// cmdXImportMbox is the command handler for importing an mbox file by
// directly accessing the data directory. Unlike cmdImportMbox, its first
// parameter is an account directory ("accountdir") rather than an account
// name. The help text refers to "mox help import mbox" for details.
89func cmdXImportMbox(c *cmd) {
91 c.params = "accountdir mailboxname mbox"
92 c.help = `Import an mbox into an account by directly accessing the data directory.
94See "mox help import mbox" for details.
// xcmdXImport runs an import against an account directory on disk.
//
// The first argument must be a path of the form .../accounts/<name>: the last
// element is used as the account name and the directory two levels up becomes
// the data directory of a minimal configuration that is set up here. The
// import itself is then run through the regular ctl code path, with client
// and server connected by net.Pipe. mbox selects between an mbox (true) and a
// maildir (false) source. Setup failures terminate the process (log.Fatalf,
// xcheckf).
99func xcmdXImport(mbox bool, c *cmd) {
105 accountdir := args[0]
106 account := filepath.Base(accountdir)
108 // Set up the mox config so the account can be opened.
109 if filepath.Base(filepath.Dir(accountdir)) != "accounts" {
110 log.Fatalf("accountdir must be of the form .../accounts/<name>")
113 mox.Conf.Static.DataDir, err = filepath.Abs(filepath.Dir(filepath.Dir(accountdir)))
114 xcheckf(err, "making absolute datadir")
115 mox.ConfigStaticPath = "fake.conf"
116 mox.Conf.DynamicLastCheck = time.Now().Add(time.Hour) // Silence errors about config file.
117 mox.Conf.Dynamic.Accounts = map[string]config.Account{
120 defer store.Switchboard()()
	// A net.Pipe connection stands in for the ctl socket: servectlcmd serves one
	// end in a goroutine while ctlcmdImport drives the other end as the client.
122 cconn, sconn := net.Pipe()
123 clientctl := ctl{conn: cconn, r: bufio.NewReader(cconn), log: c.log}
124 serverctl := ctl{conn: sconn, r: bufio.NewReader(sconn), log: c.log}
125 go servectlcmd(context.Background(), &serverctl, func() {})
127 ctlcmdImport(&clientctl, mbox, account, args[1], args[2])
// ctlcmdImport is the client side of an import over a ctl connection.
//
// It sends the command name ("importmbox" or "importmaildir"; mbox is the
// parameter that distinguishes the two kinds of import) and reports on
// standard error: first "importing...", then the count from every response
// line that starts with "progress ", and finally the total as "<count>
// imported". A response that is not the expected "ok" terminates the process
// through log.Fatalf. The mailbox name is compared case-insensitively with
// "Inbox".
130func ctlcmdImport(ctl *ctl, mbox bool, account, mailbox, src string) {
132 ctl.xwrite("importmbox")
134 ctl.xwrite("importmaildir")
137 if strings.EqualFold(mailbox, "Inbox") {
143 fmt.Fprintln(os.Stderr, "importing...")
146 if strings.HasPrefix(line, "progress ") {
147 n := line[len("progress "):]
148 fmt.Fprintf(os.Stderr, "%s...\n", n)
152 log.Fatalf("import, expected ok, got %q", line)
157 fmt.Fprintf(os.Stderr, "%s imported\n", count)
// importctl is the server side of an "importmaildir"/"importmbox" ctl command.
//
// It reads the account and mailbox from ctl, opens the account and the source
// (an mbox file, or the "new" and "cur" subdirectories of a maildir), and
// delivers every message it reads inside one database transaction. While
// doing so it ensures the destination mailbox exists, collects message
// keywords for the mailbox, checks the account's maximum total message size,
// and trains the junk filter if one is available. After all messages are
// delivered it assigns threads, updates the mailbox counts and keywords, adds
// the imported size to the account, commits, and broadcasts the changes.
//
// "progress <n>" lines are written to ctl during the import and the total
// count at the end. On failure the error is logged and reported over ctl
// (ctl.xerror), and the message files in deliveredIDs are cleaned up.
// mbox selects between an mbox (true) and a maildir (false) source.
160func importctl(ctx context.Context, ctl *ctl, mbox bool) {
162 > "importmaildir" or "importmbox"
165 > src (mbox file or maildir directory)
167 < "progress" count (zero or more times, once for every 1000 messages)
168 < "ok" when done, or error
169 < count (of total imported messages, only if not error)
171 account := ctl.xread()
172 mailbox := ctl.xread()
179 ctl.log.Info("importing messages",
180 slog.String("kind", kind),
181 slog.String("account", account),
182 slog.String("mailbox", mailbox),
183 slog.String("source", src))
187 var mdnewf, mdcurf *os.File
188 var msgreader store.MsgSource
190 // Open account, creating a database file if it doesn't exist yet. It must be known
191 // in the configuration file.
192 a, err := store.OpenAccount(ctl.log, account)
193 ctl.xcheck(err, "opening account")
197 ctl.log.Check(err, "closing account after import")
201 err = a.ThreadingWait(ctl.log)
202 ctl.xcheck(err, "waiting for account thread upgrade")
207 ctl.log.Check(err, "closing mbox file after import")
210 err := mdnewf.Close()
211 ctl.log.Check(err, "closing maildir new after import")
214 err := mdcurf.Close()
215 ctl.log.Check(err, "closing maildir cur after import")
219 // Messages don't always have a junk flag set. We'll assume anything in a mailbox
220 // starting with junk or spam is junk mail.
222 // First check if we can access the mbox/maildir.
223 // Mox needs to be able to access those files, the user running the import command
224 // may be a different user who can access the files.
226 mboxf, err = os.Open(src)
227 ctl.xcheck(err, "open mbox file")
228 msgreader = store.NewMboxReader(ctl.log, store.CreateMessageTemp, src, mboxf)
230 mdnewf, err = os.Open(filepath.Join(src, "new"))
231 ctl.xcheck(err, "open subdir new of maildir")
232 mdcurf, err = os.Open(filepath.Join(src, "cur"))
233 ctl.xcheck(err, "open subdir cur of maildir")
234 msgreader = store.NewMaildirReader(ctl.log, store.CreateMessageTemp, mdnewf, mdcurf)
237 tx, err := a.DB.Begin(ctx, true)
238 ctl.xcheck(err, "begin transaction")
242 ctl.log.Check(err, "rolling back transaction")
246 // All preparations done. Good to go.
249 // We will be delivering messages. If we fail halfway, we need to remove the created msg files.
250 var deliveredIDs []int64
259 ctl.log.Error("import error", slog.String("panic", fmt.Sprintf("%v", x)))
261 metrics.PanicInc(metrics.Import)
263 ctl.log.Error("import error")
266 for _, id := range deliveredIDs {
267 p := a.MessagePath(id)
269 ctl.log.Check(err, "closing message file after import error", slog.String("path", p))
272 ctl.xerror(fmt.Sprintf("import error: %v", x))
275 var changes []store.Change
277 var modseq store.ModSeq // Assigned on first delivered messages, used for all messages.
279 xdeliver := func(m *store.Message, mf *os.File) {
280 // todo: possibly set dmarcdomain to the domain of the from address? at least for non-spams that have been seen. otherwise user would start without any reputations. the assumption would be that the user has accepted email and deemed it legit, coming from the indicated sender.
284 const nothreads = true
	// Disk usage is not updated per delivered message; the total is added once
	// through a.AddMessageSize after all messages have been delivered.
285 const updateDiskUsage = false
286 err := a.DeliverMessage(ctl.log, tx, m, mf, sync, notrain, nothreads, updateDiskUsage)
287 ctl.xcheck(err, "delivering message")
288 deliveredIDs = append(deliveredIDs, m.ID)
289 ctl.log.Debug("delivered message", slog.Int64("id", m.ID))
290 changes = append(changes, m.ChangeAddUID())
293 // todo: one goroutine for reading messages, one for parsing the message, one adding to database, one for junk filter training.
296 // Ensure mailbox exists.
298 mb, changes, err = a.MailboxEnsure(tx, mailbox, true)
299 ctl.xcheck(err, "ensuring mailbox exists")
301 // We ensure keywords in messages make it to the mailbox as well.
302 mailboxKeywords := map[string]bool{}
304 jf, _, err := a.OpenJunkFilter(ctx, ctl.log)
305 if err != nil && !errors.Is(err, store.ErrNoJunkFilter) {
306 ctl.xcheck(err, "open junk filter")
311 ctl.xcheck(err, "close junk filter")
317 maxSize := a.QuotaMessageSize()
319 du := store.DiskUsage{ID: 1}
321 ctl.xcheck(err, "get disk usage")
323 process := func(m *store.Message, msgf *os.File, origPath string) {
324 defer store.CloseRemoveTempFile(ctl.log, msgf, "message to import")
327 if maxSize > 0 && du.MessageSize+addSize > maxSize {
328 ctl.xcheck(fmt.Errorf("account over maximum total message size %d", maxSize), "checking quota")
331 for _, kw := range m.Keywords {
332 mailboxKeywords[kw] = true
334 mb.Add(m.MailboxCounts())
336 // Parse message and store parsed information for later fast retrieval.
337 p, err := message.EnsurePart(ctl.log.Logger, false, msgf, m.Size)
339 ctl.log.Infox("parsing message, continuing", err, slog.String("path", origPath))
341 m.ParsedBuf, err = json.Marshal(p)
342 ctl.xcheck(err, "marshal parsed message structure")
344 // Set fields needed for future threading. By doing it now, DeliverMessage won't
345 // have to parse the Part again.
346 p.SetReaderAt(store.FileMsgReader(m.MsgPrefix, msgf))
347 m.PrepareThreading(ctl.log, &p)
349 if m.Received.IsZero() {
350 if p.Envelope != nil && !p.Envelope.Date.IsZero() {
351 m.Received = p.Envelope.Date
353 m.Received = time.Now()
357 // We set the flags that Deliver would set now and train ourselves. This prevents
358 // Deliver from training, which would open the junk filter, change it, and write it
359 // back to disk, for each message (slow).
360 m.JunkFlagsForMailbox(mb, conf)
361 if jf != nil && m.NeedsTraining() {
362 if words, err := jf.ParseMessage(p); err != nil {
363 ctl.log.Infox("parsing message for updating junk filter", err, slog.String("parse", ""), slog.String("path", origPath))
365 err = jf.Train(ctx, !m.Junk, words)
366 ctl.xcheck(err, "training junk filter")
367 m.TrainedJunk = &m.Junk
373 modseq, err = a.NextModSeq(tx)
374 ctl.xcheck(err, "assigning next modseq")
378 m.MailboxOrigID = mb.ID
385 ctl.xwrite(fmt.Sprintf("progress %d", n))
390 m, msgf, origPath, err := msgreader.Next()
394 ctl.xcheck(err, "reading next message")
396 process(m, msgf, origPath)
	// Messages were delivered with nothreads set, so threads are assigned here in
	// one pass, starting at the first delivered message ID.
400 if len(deliveredIDs) > 0 {
401 err = a.AssignThreads(ctx, ctl.log, tx, deliveredIDs[0], 0, io.Discard)
402 ctl.xcheck(err, "assigning messages to threads")
405 // Get mailbox again, uidnext is likely updated.
406 mc := mb.MailboxCounts
408 ctl.xcheck(err, "get mailbox")
409 mb.MailboxCounts = mc
411 // If there are any new keywords, update the mailbox.
413 mb.Keywords, mbKwChanged = store.MergeKeywords(mb.Keywords, maps.Keys(mailboxKeywords))
415 changes = append(changes, mb.ChangeKeywords())
419 ctl.xcheck(err, "updating message counts and keywords in mailbox")
420 changes = append(changes, mb.ChangeCounts())
422 err = a.AddMessageSize(ctl.log, tx, addSize)
	// NOTE(review): every other failure in this function is reported with
	// ctl.xcheck, but this one uses xcheckf (also used by the command-line setup
	// code in xcmdXImport) — confirm this is intended and not an oversight.
423 xcheckf(err, "updating total message size")
426 ctl.xcheck(err, "commit")
428 ctl.log.Info("delivered messages through import", slog.Int("count", len(deliveredIDs)))
431 store.BroadcastChanges(a, changes)
435 ctl.xcheck(err, "closing account")
439 ctl.xwrite(fmt.Sprintf("%d", n))