19 "golang.org/x/exp/maps"
21 "github.com/mjl-/mox/config"
22 "github.com/mjl-/mox/message"
23 "github.com/mjl-/mox/metrics"
24 "github.com/mjl-/mox/mox-"
25 "github.com/mjl-/mox/store"
28// todo: add option to trust imported messages, causing us to look at Authentication-Results and Received-SPF headers and add eg verified spf/dkim/dmarc domains to our store, to jumpstart reputation.
30const importCommonHelp = `The mbox/maildir archive is accessed and imported by the running mox process, so
31it must have access to the archive files. The default suggested systemd service
32file isolates mox from most of the file system, with only the "data/" directory
33accessible, so you may want to put the mbox/maildir archive files in a
34directory like "data/import/" to make it available to mox.
36By default, messages will train the junk filter based on their flags and, if
37"automatic junk flags" configuration is set, based on mailbox naming.
39If the destination mailbox is the Sent mailbox, the recipients of the messages
40are added to the message metadata, causing later incoming messages from these
41recipients to be accepted, unless other reputation signals prevent that.
43Users can also import mailboxes/messages through the account web page by
44uploading a zip or tgz file with mbox and/or maildirs.
46Messages are imported even if already present. Importing messages twice will
47result in duplicate messages.
50func cmdImportMaildir(c *cmd) {
51 c.params = "accountname mailboxname maildir"
52 c.help = `Import a maildir into an account.
54` + importCommonHelp + `
55Mailbox flags, like "seen", "answered", will be imported. An optional
56dovecot-keywords file can specify additional flags, like Forwarded/Junk/NotJunk.
63 ctlcmdImport(xctl(), false, args[0], args[1], args[2])
66func cmdImportMbox(c *cmd) {
67 c.params = "accountname mailboxname mbox"
68 c.help = `Import an mbox into an account.
70Using mbox is not recommended, maildir is a better defined format.
78 ctlcmdImport(xctl(), true, args[0], args[1], args[2])
81func cmdXImportMaildir(c *cmd) {
83 c.params = "accountdir mailboxname maildir"
84 c.help = `Import a maildir into an account by directly accessing the data directory.
87See "mox help import maildir" for details.
92func cmdXImportMbox(c *cmd) {
94 c.params = "accountdir mailboxname mbox"
95 c.help = `Import an mbox into an account by directly accessing the data directory.
97See "mox help import mbox" for details.
102func xcmdXImport(mbox bool, c *cmd) {
108 accountdir := args[0]
109 account := filepath.Base(accountdir)
111 // Set up the mox config so the account can be opened.
112 if filepath.Base(filepath.Dir(accountdir)) != "accounts" {
113 log.Fatalf("accountdir must be of the form .../accounts/<name>")
116 mox.Conf.Static.DataDir, err = filepath.Abs(filepath.Dir(filepath.Dir(accountdir)))
117 xcheckf(err, "making absolute datadir")
118 mox.ConfigStaticPath = "fake.conf"
119 mox.Conf.DynamicLastCheck = time.Now().Add(time.Hour) // Silence errors about config file.
120 mox.Conf.Dynamic.Accounts = map[string]config.Account{
123 defer store.Switchboard()()
125 cconn, sconn := net.Pipe()
126 clientctl := ctl{conn: cconn, r: bufio.NewReader(cconn), log: c.log}
127 serverctl := ctl{conn: sconn, r: bufio.NewReader(sconn), log: c.log}
128 go servectlcmd(context.Background(), &serverctl, func() {})
130 ctlcmdImport(&clientctl, mbox, account, args[1], args[2])
133func ctlcmdImport(ctl *ctl, mbox bool, account, mailbox, src string) {
135 ctl.xwrite("importmbox")
137 ctl.xwrite("importmaildir")
140 if strings.EqualFold(mailbox, "Inbox") {
146 fmt.Fprintln(os.Stderr, "importing...")
149 if strings.HasPrefix(line, "progress ") {
150 n := line[len("progress "):]
151 fmt.Fprintf(os.Stderr, "%s...\n", n)
155 log.Fatalf("import, expected ok, got %q", line)
160 fmt.Fprintf(os.Stderr, "%s imported\n", count)
163func importctl(ctx context.Context, ctl *ctl, mbox bool) {
165 > "importmaildir" or "importmbox"
168 > src (mbox file or maildir directory)
170 < "progress" count (zero or more times, once for every 1000 messages)
171 < "ok" when done, or error
172 < count (of total imported messages, only if not error)
174 account := ctl.xread()
175 mailbox := ctl.xread()
182 ctl.log.Info("importing messages",
183 slog.String("kind", kind),
184 slog.String("account", account),
185 slog.String("mailbox", mailbox),
186 slog.String("source", src))
190 var mdnewf, mdcurf *os.File
191 var msgreader store.MsgSource
193 // Open account, creating a database file if it doesn't exist yet. It must be known
194 // in the configuration file.
195 a, err := store.OpenAccount(ctl.log, account)
196 ctl.xcheck(err, "opening account")
200 ctl.log.Check(err, "closing account after import")
204 err = a.ThreadingWait(ctl.log)
205 ctl.xcheck(err, "waiting for account thread upgrade")
210 ctl.log.Check(err, "closing mbox file after import")
213 err := mdnewf.Close()
214 ctl.log.Check(err, "closing maildir new after import")
217 err := mdcurf.Close()
218 ctl.log.Check(err, "closing maildir cur after import")
222 // Messages don't always have a junk flag set. We'll assume anything in a mailbox
223 // starting with junk or spam is junk mail.
225 // First check if we can access the mbox/maildir.
226 // Mox needs to be able to access those files, the user running the import command
227 // may be a different user who can access the files.
229 mboxf, err = os.Open(src)
230 ctl.xcheck(err, "open mbox file")
231 msgreader = store.NewMboxReader(ctl.log, store.CreateMessageTemp, src, mboxf)
233 mdnewf, err = os.Open(filepath.Join(src, "new"))
234 ctl.xcheck(err, "open subdir new of maildir")
235 mdcurf, err = os.Open(filepath.Join(src, "cur"))
236 ctl.xcheck(err, "open subdir cur of maildir")
237 msgreader = store.NewMaildirReader(ctl.log, store.CreateMessageTemp, mdnewf, mdcurf)
240 tx, err := a.DB.Begin(ctx, true)
241 ctl.xcheck(err, "begin transaction")
245 ctl.log.Check(err, "rolling back transaction")
249 // All preparations done. Good to go.
252 // We will be delivering messages. If we fail halfway, we need to remove the created msg files.
253 var deliveredIDs []int64
262 ctl.log.Error("import error", slog.String("panic", fmt.Sprintf("%v", x)))
264 metrics.PanicInc(metrics.Import)
266 ctl.log.Error("import error")
269 for _, id := range deliveredIDs {
270 p := a.MessagePath(id)
272 ctl.log.Check(err, "closing message file after import error", slog.String("path", p))
275 ctl.xerror(fmt.Sprintf("import error: %v", x))
278 var changes []store.Change
280 var modseq store.ModSeq // Assigned on first delivered messages, used for all messages.
282 xdeliver := func(m *store.Message, mf *os.File) {
283 // todo: possibly set dmarcdomain to the domain of the from address? at least for non-spams that have been seen. otherwise user would start without any reputations. the assumption would be that the user has accepted email and deemed it legit, coming from the indicated sender.
287 const nothreads = true
288 const updateDiskUsage = false
289 err := a.DeliverMessage(ctl.log, tx, m, mf, sync, notrain, nothreads, updateDiskUsage)
290 ctl.xcheck(err, "delivering message")
291 deliveredIDs = append(deliveredIDs, m.ID)
292 ctl.log.Debug("delivered message", slog.Int64("id", m.ID))
293 changes = append(changes, m.ChangeAddUID())
296 // todo: one goroutine for reading messages, one for parsing the message, one adding to database, one for junk filter training.
299 // Ensure mailbox exists.
301 mb, changes, err = a.MailboxEnsure(tx, mailbox, true)
302 ctl.xcheck(err, "ensuring mailbox exists")
304 // We ensure keywords in messages make it to the mailbox as well.
305 mailboxKeywords := map[string]bool{}
307 jf, _, err := a.OpenJunkFilter(ctx, ctl.log)
308 if err != nil && !errors.Is(err, store.ErrNoJunkFilter) {
309 ctl.xcheck(err, "open junk filter")
314 ctl.xcheck(err, "close junk filter")
320 maxSize := a.QuotaMessageSize()
322 du := store.DiskUsage{ID: 1}
324 ctl.xcheck(err, "get disk usage")
326 process := func(m *store.Message, msgf *os.File, origPath string) {
327 defer store.CloseRemoveTempFile(ctl.log, msgf, "message to import")
330 if maxSize > 0 && du.MessageSize+addSize > maxSize {
331 ctl.xcheck(fmt.Errorf("account over maximum total message size %d", maxSize), "checking quota")
334 for _, kw := range m.Keywords {
335 mailboxKeywords[kw] = true
337 mb.Add(m.MailboxCounts())
339 // Parse message and store parsed information for later fast retrieval.
340 p, err := message.EnsurePart(ctl.log.Logger, false, msgf, m.Size)
342 ctl.log.Infox("parsing message, continuing", err, slog.String("path", origPath))
344 m.ParsedBuf, err = json.Marshal(p)
345 ctl.xcheck(err, "marshal parsed message structure")
347 // Set fields needed for future threading. By doing it now, DeliverMessage won't
348 // have to parse the Part again.
349 p.SetReaderAt(store.FileMsgReader(m.MsgPrefix, msgf))
350 m.PrepareThreading(ctl.log, &p)
352 if m.Received.IsZero() {
353 if p.Envelope != nil && !p.Envelope.Date.IsZero() {
354 m.Received = p.Envelope.Date
356 m.Received = time.Now()
360 // We set the flags that Deliver would set now and train ourselves. This prevents
361 // Deliver from training, which would open the junk filter, change it, and write it
362 // back to disk, for each message (slow).
363 m.JunkFlagsForMailbox(mb, conf)
364 if jf != nil && m.NeedsTraining() {
365 if words, err := jf.ParseMessage(p); err != nil {
366 ctl.log.Infox("parsing message for updating junk filter", err, slog.String("parse", ""), slog.String("path", origPath))
368 err = jf.Train(ctx, !m.Junk, words)
369 ctl.xcheck(err, "training junk filter")
370 m.TrainedJunk = &m.Junk
376 modseq, err = a.NextModSeq(tx)
377 ctl.xcheck(err, "assigning next modseq")
381 m.MailboxOrigID = mb.ID
388 ctl.xwrite(fmt.Sprintf("progress %d", n))
393 m, msgf, origPath, err := msgreader.Next()
397 ctl.xcheck(err, "reading next message")
399 process(m, msgf, origPath)
403 if len(deliveredIDs) > 0 {
404 err = a.AssignThreads(ctx, ctl.log, tx, deliveredIDs[0], 0, io.Discard)
405 ctl.xcheck(err, "assigning messages to threads")
408 // Get mailbox again, uidnext is likely updated.
409 mc := mb.MailboxCounts
411 ctl.xcheck(err, "get mailbox")
412 mb.MailboxCounts = mc
414 // If there are any new keywords, update the mailbox.
416 mb.Keywords, mbKwChanged = store.MergeKeywords(mb.Keywords, maps.Keys(mailboxKeywords))
418 changes = append(changes, mb.ChangeKeywords())
422 ctl.xcheck(err, "updating message counts and keywords in mailbox")
423 changes = append(changes, mb.ChangeCounts())
425 err = a.AddMessageSize(ctl.log, tx, addSize)
426 xcheckf(err, "updating total message size")
429 ctl.xcheck(err, "commit")
431 ctl.log.Info("delivered messages through import", slog.Int("count", len(deliveredIDs)))
434 store.BroadcastChanges(a, changes)
438 ctl.xcheck(err, "closing account")
442 ctl.xwrite(fmt.Sprintf("%d", n))