1package main
2
3import (
4 "bufio"
5 "context"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "log"
11 "log/slog"
12 "net"
13 "os"
14 "path/filepath"
15 "runtime/debug"
16 "strings"
17 "time"
18
19 "golang.org/x/exp/maps"
20
21 "github.com/mjl-/mox/config"
22 "github.com/mjl-/mox/message"
23 "github.com/mjl-/mox/metrics"
24 "github.com/mjl-/mox/mox-"
25 "github.com/mjl-/mox/store"
26)
27
28// todo: add option to trust imported messages, causing us to look at Authentication-Results and Received-SPF headers and add eg verified spf/dkim/dmarc domains to our store, to jumpstart reputation.
29
// importCommonHelp is help text shared by the "import maildir" and "import mbox"
// commands. It is concatenated into their c.help strings, so any change here
// shows up in the help output of both commands.
const importCommonHelp = `The mbox/maildir archive is accessed and imported by the running mox process, so
it must have access to the archive files. The default suggested systemd service
file isolates mox from most of the file system, with only the "data/" directory
accessible, so you may want to put the mbox/maildir archive files in a
directory like "data/import/" to make it available to mox.

By default, messages will train the junk filter based on their flags and, if
"automatic junk flags" configuration is set, based on mailbox naming.

If the destination mailbox is the Sent mailbox, the recipients of the messages
are added to the message metadata, causing later incoming messages from these
recipients to be accepted, unless other reputation signals prevent that.

Users can also import mailboxes/messages through the account web page by
uploading a zip or tgz file with mbox and/or maildirs.
`
46
47func cmdImportMaildir(c *cmd) {
48 c.params = "accountname mailboxname maildir"
49 c.help = `Import a maildir into an account.
50
51` + importCommonHelp + `
52Mailbox flags, like "seen", "answered", will be imported. An optional
53dovecot-keywords file can specify additional flags, like Forwarded/Junk/NotJunk.
54`
55 args := c.Parse()
56 if len(args) != 3 {
57 c.Usage()
58 }
59 mustLoadConfig()
60 ctlcmdImport(xctl(), false, args[0], args[1], args[2])
61}
62
63func cmdImportMbox(c *cmd) {
64 c.params = "accountname mailboxname mbox"
65 c.help = `Import an mbox into an account.
66
67Using mbox is not recommended, maildir is a better defined format.
68
69` + importCommonHelp
70 args := c.Parse()
71 if len(args) != 3 {
72 c.Usage()
73 }
74 mustLoadConfig()
75 ctlcmdImport(xctl(), true, args[0], args[1], args[2])
76}
77
78func cmdXImportMaildir(c *cmd) {
79 c.unlisted = true
80 c.params = "accountdir mailboxname maildir"
81 c.help = `Import a maildir into an account by directly accessing the data directory.
82
83
84See "mox help import maildir" for details.
85`
86 xcmdXImport(false, c)
87}
88
89func cmdXImportMbox(c *cmd) {
90 c.unlisted = true
91 c.params = "accountdir mailboxname mbox"
92 c.help = `Import an mbox into an account by directly accessing the data directory.
93
94See "mox help import mbox" for details.
95`
96 xcmdXImport(true, c)
97}
98
99func xcmdXImport(mbox bool, c *cmd) {
100 args := c.Parse()
101 if len(args) != 3 {
102 c.Usage()
103 }
104
105 accountdir := args[0]
106 account := filepath.Base(accountdir)
107
108 // Set up the mox config so the account can be opened.
109 if filepath.Base(filepath.Dir(accountdir)) != "accounts" {
110 log.Fatalf("accountdir must be of the form .../accounts/<name>")
111 }
112 var err error
113 mox.Conf.Static.DataDir, err = filepath.Abs(filepath.Dir(filepath.Dir(accountdir)))
114 xcheckf(err, "making absolute datadir")
115 mox.ConfigStaticPath = "fake.conf"
116 mox.Conf.DynamicLastCheck = time.Now().Add(time.Hour) // Silence errors about config file.
117 mox.Conf.Dynamic.Accounts = map[string]config.Account{
118 account: {},
119 }
120 defer store.Switchboard()()
121
122 cconn, sconn := net.Pipe()
123 clientctl := ctl{conn: cconn, r: bufio.NewReader(cconn), log: c.log}
124 serverctl := ctl{conn: sconn, r: bufio.NewReader(sconn), log: c.log}
125 go servectlcmd(context.Background(), &serverctl, func() {})
126
127 ctlcmdImport(&clientctl, mbox, account, args[1], args[2])
128}
129
130func ctlcmdImport(ctl *ctl, mbox bool, account, mailbox, src string) {
131 if mbox {
132 ctl.xwrite("importmbox")
133 } else {
134 ctl.xwrite("importmaildir")
135 }
136 ctl.xwrite(account)
137 if strings.EqualFold(mailbox, "Inbox") {
138 mailbox = "Inbox"
139 }
140 ctl.xwrite(mailbox)
141 ctl.xwrite(src)
142 ctl.xreadok()
143 fmt.Fprintln(os.Stderr, "importing...")
144 for {
145 line := ctl.xread()
146 if strings.HasPrefix(line, "progress ") {
147 n := line[len("progress "):]
148 fmt.Fprintf(os.Stderr, "%s...\n", n)
149 continue
150 }
151 if line != "ok" {
152 log.Fatalf("import, expected ok, got %q", line)
153 }
154 break
155 }
156 count := ctl.xread()
157 fmt.Fprintf(os.Stderr, "%s imported\n", count)
158}
159
160func importctl(ctx context.Context, ctl *ctl, mbox bool) {
161 /* protocol:
162 > "importmaildir" or "importmbox"
163 > account
164 > mailbox
165 > src (mbox file or maildir directory)
166 < "ok" or error
167 < "progress" count (zero or more times, once for every 1000 messages)
168 < "ok" when done, or error
169 < count (of total imported messages, only if not error)
170 */
171 account := ctl.xread()
172 mailbox := ctl.xread()
173 src := ctl.xread()
174
175 kind := "maildir"
176 if mbox {
177 kind = "mbox"
178 }
179 ctl.log.Info("importing messages",
180 slog.String("kind", kind),
181 slog.String("account", account),
182 slog.String("mailbox", mailbox),
183 slog.String("source", src))
184
185 var err error
186 var mboxf *os.File
187 var mdnewf, mdcurf *os.File
188 var msgreader store.MsgSource
189
190 // Open account, creating a database file if it doesn't exist yet. It must be known
191 // in the configuration file.
192 a, err := store.OpenAccount(ctl.log, account)
193 ctl.xcheck(err, "opening account")
194 defer func() {
195 if a != nil {
196 err := a.Close()
197 ctl.log.Check(err, "closing account after import")
198 }
199 }()
200
201 err = a.ThreadingWait(ctl.log)
202 ctl.xcheck(err, "waiting for account thread upgrade")
203
204 defer func() {
205 if mboxf != nil {
206 err := mboxf.Close()
207 ctl.log.Check(err, "closing mbox file after import")
208 }
209 if mdnewf != nil {
210 err := mdnewf.Close()
211 ctl.log.Check(err, "closing maildir new after import")
212 }
213 if mdcurf != nil {
214 err := mdcurf.Close()
215 ctl.log.Check(err, "closing maildir cur after import")
216 }
217 }()
218
219 // Messages don't always have a junk flag set. We'll assume anything in a mailbox
220 // starting with junk or spam is junk mail.
221
222 // First check if we can access the mbox/maildir.
223 // Mox needs to be able to access those files, the user running the import command
224 // may be a different user who can access the files.
225 if mbox {
226 mboxf, err = os.Open(src)
227 ctl.xcheck(err, "open mbox file")
228 msgreader = store.NewMboxReader(ctl.log, store.CreateMessageTemp, src, mboxf)
229 } else {
230 mdnewf, err = os.Open(filepath.Join(src, "new"))
231 ctl.xcheck(err, "open subdir new of maildir")
232 mdcurf, err = os.Open(filepath.Join(src, "cur"))
233 ctl.xcheck(err, "open subdir cur of maildir")
234 msgreader = store.NewMaildirReader(ctl.log, store.CreateMessageTemp, mdnewf, mdcurf)
235 }
236
237 tx, err := a.DB.Begin(ctx, true)
238 ctl.xcheck(err, "begin transaction")
239 defer func() {
240 if tx != nil {
241 err := tx.Rollback()
242 ctl.log.Check(err, "rolling back transaction")
243 }
244 }()
245
246 // All preparations done. Good to go.
247 ctl.xwriteok()
248
249 // We will be delivering messages. If we fail halfway, we need to remove the created msg files.
250 var deliveredIDs []int64
251
252 defer func() {
253 x := recover()
254 if x == nil {
255 return
256 }
257
258 if x != ctl.x {
259 ctl.log.Error("import error", slog.String("panic", fmt.Sprintf("%v", x)))
260 debug.PrintStack()
261 metrics.PanicInc(metrics.Import)
262 } else {
263 ctl.log.Error("import error")
264 }
265
266 for _, id := range deliveredIDs {
267 p := a.MessagePath(id)
268 err := os.Remove(p)
269 ctl.log.Check(err, "closing message file after import error", slog.String("path", p))
270 }
271
272 ctl.xerror(fmt.Sprintf("import error: %v", x))
273 }()
274
275 var changes []store.Change
276
277 var modseq store.ModSeq // Assigned on first delivered messages, used for all messages.
278
279 xdeliver := func(m *store.Message, mf *os.File) {
280 // todo: possibly set dmarcdomain to the domain of the from address? at least for non-spams that have been seen. otherwise user would start without any reputations. the assumption would be that the user has accepted email and deemed it legit, coming from the indicated sender.
281
282 const sync = false
283 const notrain = true
284 const nothreads = true
285 const updateDiskUsage = false
286 err := a.DeliverMessage(ctl.log, tx, m, mf, sync, notrain, nothreads, updateDiskUsage)
287 ctl.xcheck(err, "delivering message")
288 deliveredIDs = append(deliveredIDs, m.ID)
289 ctl.log.Debug("delivered message", slog.Int64("id", m.ID))
290 changes = append(changes, m.ChangeAddUID())
291 }
292
293 // todo: one goroutine for reading messages, one for parsing the message, one adding to database, one for junk filter training.
294 n := 0
295 a.WithWLock(func() {
296 // Ensure mailbox exists.
297 var mb store.Mailbox
298 mb, changes, err = a.MailboxEnsure(tx, mailbox, true)
299 ctl.xcheck(err, "ensuring mailbox exists")
300
301 // We ensure keywords in messages make it to the mailbox as well.
302 mailboxKeywords := map[string]bool{}
303
304 jf, _, err := a.OpenJunkFilter(ctx, ctl.log)
305 if err != nil && !errors.Is(err, store.ErrNoJunkFilter) {
306 ctl.xcheck(err, "open junk filter")
307 }
308 defer func() {
309 if jf != nil {
310 err = jf.Close()
311 ctl.xcheck(err, "close junk filter")
312 }
313 }()
314
315 conf, _ := a.Conf()
316
317 maxSize := a.QuotaMessageSize()
318 var addSize int64
319 du := store.DiskUsage{ID: 1}
320 err = tx.Get(&du)
321 ctl.xcheck(err, "get disk usage")
322
323 process := func(m *store.Message, msgf *os.File, origPath string) {
324 defer store.CloseRemoveTempFile(ctl.log, msgf, "message to import")
325
326 addSize += m.Size
327 if maxSize > 0 && du.MessageSize+addSize > maxSize {
328 ctl.xcheck(fmt.Errorf("account over maximum total message size %d", maxSize), "checking quota")
329 }
330
331 for _, kw := range m.Keywords {
332 mailboxKeywords[kw] = true
333 }
334 mb.Add(m.MailboxCounts())
335
336 // Parse message and store parsed information for later fast retrieval.
337 p, err := message.EnsurePart(ctl.log.Logger, false, msgf, m.Size)
338 if err != nil {
339 ctl.log.Infox("parsing message, continuing", err, slog.String("path", origPath))
340 }
341 m.ParsedBuf, err = json.Marshal(p)
342 ctl.xcheck(err, "marshal parsed message structure")
343
344 // Set fields needed for future threading. By doing it now, DeliverMessage won't
345 // have to parse the Part again.
346 p.SetReaderAt(store.FileMsgReader(m.MsgPrefix, msgf))
347 m.PrepareThreading(ctl.log, &p)
348
349 if m.Received.IsZero() {
350 if p.Envelope != nil && !p.Envelope.Date.IsZero() {
351 m.Received = p.Envelope.Date
352 } else {
353 m.Received = time.Now()
354 }
355 }
356
357 // We set the flags that Deliver would set now and train ourselves. This prevents
358 // Deliver from training, which would open the junk filter, change it, and write it
359 // back to disk, for each message (slow).
360 m.JunkFlagsForMailbox(mb, conf)
361 if jf != nil && m.NeedsTraining() {
362 if words, err := jf.ParseMessage(p); err != nil {
363 ctl.log.Infox("parsing message for updating junk filter", err, slog.String("parse", ""), slog.String("path", origPath))
364 } else {
365 err = jf.Train(ctx, !m.Junk, words)
366 ctl.xcheck(err, "training junk filter")
367 m.TrainedJunk = &m.Junk
368 }
369 }
370
371 if modseq == 0 {
372 var err error
373 modseq, err = a.NextModSeq(tx)
374 ctl.xcheck(err, "assigning next modseq")
375 }
376
377 m.MailboxID = mb.ID
378 m.MailboxOrigID = mb.ID
379 m.CreateSeq = modseq
380 m.ModSeq = modseq
381 xdeliver(m, msgf)
382
383 n++
384 if n%1000 == 0 {
385 ctl.xwrite(fmt.Sprintf("progress %d", n))
386 }
387 }
388
389 for {
390 m, msgf, origPath, err := msgreader.Next()
391 if err == io.EOF {
392 break
393 }
394 ctl.xcheck(err, "reading next message")
395
396 process(m, msgf, origPath)
397 }
398
399 // Match threads.
400 if len(deliveredIDs) > 0 {
401 err = a.AssignThreads(ctx, ctl.log, tx, deliveredIDs[0], 0, io.Discard)
402 ctl.xcheck(err, "assigning messages to threads")
403 }
404
405 // Get mailbox again, uidnext is likely updated.
406 mc := mb.MailboxCounts
407 err = tx.Get(&mb)
408 ctl.xcheck(err, "get mailbox")
409 mb.MailboxCounts = mc
410
411 // If there are any new keywords, update the mailbox.
412 var mbKwChanged bool
413 mb.Keywords, mbKwChanged = store.MergeKeywords(mb.Keywords, maps.Keys(mailboxKeywords))
414 if mbKwChanged {
415 changes = append(changes, mb.ChangeKeywords())
416 }
417
418 err = tx.Update(&mb)
419 ctl.xcheck(err, "updating message counts and keywords in mailbox")
420 changes = append(changes, mb.ChangeCounts())
421
422 err = a.AddMessageSize(ctl.log, tx, addSize)
423 xcheckf(err, "updating total message size")
424
425 err = tx.Commit()
426 ctl.xcheck(err, "commit")
427 tx = nil
428 ctl.log.Info("delivered messages through import", slog.Int("count", len(deliveredIDs)))
429 deliveredIDs = nil
430
431 store.BroadcastChanges(a, changes)
432 })
433
434 err = a.Close()
435 ctl.xcheck(err, "closing account")
436 a = nil
437
438 ctl.xwriteok()
439 ctl.xwrite(fmt.Sprintf("%d", n))
440}
441