1package main
2
3import (
4 "bufio"
5 "context"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "log"
11 "log/slog"
12 "net"
13 "os"
14 "path/filepath"
15 "runtime/debug"
16 "strings"
17 "time"
18
19 "golang.org/x/exp/maps"
20
21 "github.com/mjl-/mox/config"
22 "github.com/mjl-/mox/message"
23 "github.com/mjl-/mox/metrics"
24 "github.com/mjl-/mox/mox-"
25 "github.com/mjl-/mox/store"
26)
27
// todo: add option to trust imported messages, causing us to look at Authentication-Results and Received-SPF headers and add e.g. verified spf/dkim/dmarc domains to our store, to jumpstart reputation.
29
// importCommonHelp is the part of the help text shared by the maildir and mbox
// import commands. It is appended to the command-specific introduction in
// cmdImportMaildir and cmdImportMbox.
const importCommonHelp = `The mbox/maildir archive is accessed and imported by the running mox process, so
it must have access to the archive files. The default suggested systemd service
file isolates mox from most of the file system, with only the "data/" directory
accessible, so you may want to put the mbox/maildir archive files in a
directory like "data/import/" to make it available to mox.

By default, messages will train the junk filter based on their flags and, if
"automatic junk flags" configuration is set, based on mailbox naming.

If the destination mailbox is the Sent mailbox, the recipients of the messages
are added to the message metadata, causing later incoming messages from these
recipients to be accepted, unless other reputation signals prevent that.

Users can also import mailboxes/messages through the account web page by
uploading a zip or tgz file with mbox and/or maildirs.

Messages are imported even if already present. Importing messages twice will
result in duplicate messages.
`
49
50func cmdImportMaildir(c *cmd) {
51 c.params = "accountname mailboxname maildir"
52 c.help = `Import a maildir into an account.
53
54` + importCommonHelp + `
55Mailbox flags, like "seen", "answered", will be imported. An optional
56dovecot-keywords file can specify additional flags, like Forwarded/Junk/NotJunk.
57`
58 args := c.Parse()
59 if len(args) != 3 {
60 c.Usage()
61 }
62 mustLoadConfig()
63 ctlcmdImport(xctl(), false, args[0], args[1], args[2])
64}
65
66func cmdImportMbox(c *cmd) {
67 c.params = "accountname mailboxname mbox"
68 c.help = `Import an mbox into an account.
69
70Using mbox is not recommended, maildir is a better defined format.
71
72` + importCommonHelp
73 args := c.Parse()
74 if len(args) != 3 {
75 c.Usage()
76 }
77 mustLoadConfig()
78 ctlcmdImport(xctl(), true, args[0], args[1], args[2])
79}
80
81func cmdXImportMaildir(c *cmd) {
82 c.unlisted = true
83 c.params = "accountdir mailboxname maildir"
84 c.help = `Import a maildir into an account by directly accessing the data directory.
85
86
87See "mox help import maildir" for details.
88`
89 xcmdXImport(false, c)
90}
91
92func cmdXImportMbox(c *cmd) {
93 c.unlisted = true
94 c.params = "accountdir mailboxname mbox"
95 c.help = `Import an mbox into an account by directly accessing the data directory.
96
97See "mox help import mbox" for details.
98`
99 xcmdXImport(true, c)
100}
101
// xcmdXImport implements the direct-access import commands. Rather than
// talking to a running mox process, it sets up a minimal in-memory mox
// configuration containing only the account named by the first argument. It
// then speaks the regular ctl import protocol over an in-process net.Pipe,
// with servectlcmd serving the other end in a goroutine.
//
// Arguments: accountdir (a path of the form .../accounts/<name>; a trailing
// slash is accepted), mailbox name, and the mbox file or maildir directory.
// mbox selects mbox (true) or maildir (false) as the source format.
func xcmdXImport(mbox bool, c *cmd) {
	args := c.Parse()
	if len(args) != 3 {
		c.Usage()
	}

	// Clean first: with a trailing slash ("data/accounts/foo/"), filepath.Dir
	// returns the account directory itself, which would make the "accounts" check
	// below fail and compute the wrong data directory.
	accountdir := filepath.Clean(args[0])
	account := filepath.Base(accountdir)

	// Set up the mox config so the account can be opened.
	if filepath.Base(filepath.Dir(accountdir)) != "accounts" {
		log.Fatalf("accountdir must be of the form .../accounts/<name>")
	}
	var err error
	mox.Conf.Static.DataDir, err = filepath.Abs(filepath.Dir(filepath.Dir(accountdir)))
	xcheckf(err, "making absolute datadir")
	mox.ConfigStaticPath = "fake.conf"
	mox.Conf.DynamicLastCheck = time.Now().Add(time.Hour) // Silence errors about config file.
	mox.Conf.Dynamic.Accounts = map[string]config.Account{
		account: {},
	}
	defer store.Switchboard()()

	// Serve the ctl protocol in-process: the server side runs in a goroutine and
	// we act as the client over an in-memory pipe.
	cconn, sconn := net.Pipe()
	clientctl := ctl{conn: cconn, r: bufio.NewReader(cconn), log: c.log}
	serverctl := ctl{conn: sconn, r: bufio.NewReader(sconn), log: c.log}
	go servectlcmd(context.Background(), &serverctl, func() {})

	ctlcmdImport(&clientctl, mbox, account, args[1], args[2])
}
132
// ctlcmdImport is the client side of the ctl import protocol (see importctl).
// It sends the import command ("importmbox" or "importmaildir"), account,
// mailbox and source path. It then waits for the import to finish, printing
// progress and the final number of imported messages to stderr.
//
// A mailbox name matching "Inbox" case-insensitively is sent as "Inbox". Any
// reply other than a progress line or "ok" while waiting is fatal.
func ctlcmdImport(ctl *ctl, mbox bool, account, mailbox, src string) {
	command := "importmaildir"
	if mbox {
		command = "importmbox"
	}
	if strings.EqualFold(mailbox, "Inbox") {
		mailbox = "Inbox"
	}
	for _, field := range []string{command, account, mailbox, src} {
		ctl.xwrite(field)
	}
	ctl.xreadok()
	fmt.Fprintln(os.Stderr, "importing...")

	// Zero or more progress lines, terminated by "ok".
	for {
		line := ctl.xread()
		progress, isProgress := strings.CutPrefix(line, "progress ")
		if !isProgress {
			if line != "ok" {
				log.Fatalf("import, expected ok, got %q", line)
			}
			break
		}
		fmt.Fprintf(os.Stderr, "%s...\n", progress)
	}

	count := ctl.xread()
	fmt.Fprintf(os.Stderr, "%s imported\n", count)
}
162
// importctl implements the server side of the "importmaildir" and "importmbox"
// ctl commands. It imports all messages from an mbox file or maildir directory
// into a mailbox of an account within a single database transaction. Where
// applicable it trains the junk filter, and it reports progress to the client.
// The command word itself is presumably consumed by the dispatching caller,
// since this function starts by reading the account name.
//
// If the import fails halfway, the transaction is rolled back and the message
// files already delivered by this import are removed. The error is then
// reported to the client through ctl.
func importctl(ctx context.Context, ctl *ctl, mbox bool) {
	/* protocol:
	> "importmaildir" or "importmbox"
	> account
	> mailbox
	> src (mbox file or maildir directory)
	< "ok" or error
	< "progress" count (zero or more times, once for every 1000 messages)
	< "ok" when done, or error
	< count (of total imported messages, only if not error)
	*/
	account := ctl.xread()
	mailbox := ctl.xread()
	src := ctl.xread()

	kind := "maildir"
	if mbox {
		kind = "mbox"
	}
	ctl.log.Info("importing messages",
		slog.String("kind", kind),
		slog.String("account", account),
		slog.String("mailbox", mailbox),
		slog.String("source", src))

	var err error
	var mboxf *os.File
	var mdnewf, mdcurf *os.File
	var msgreader store.MsgSource

	// Open account, creating a database file if it doesn't exist yet. It must be known
	// in the configuration file.
	a, err := store.OpenAccount(ctl.log, account)
	ctl.xcheck(err, "opening account")
	defer func() {
		// a is set to nil after an explicit, checked close at the end.
		if a != nil {
			err := a.Close()
			ctl.log.Check(err, "closing account after import")
		}
	}()

	err = a.ThreadingWait(ctl.log)
	ctl.xcheck(err, "waiting for account thread upgrade")

	defer func() {
		if mboxf != nil {
			err := mboxf.Close()
			ctl.log.Check(err, "closing mbox file after import")
		}
		if mdnewf != nil {
			err := mdnewf.Close()
			ctl.log.Check(err, "closing maildir new after import")
		}
		if mdcurf != nil {
			err := mdcurf.Close()
			ctl.log.Check(err, "closing maildir cur after import")
		}
	}()

	// First check if we can access the mbox/maildir.
	// Mox needs to be able to access those files, the user running the import command
	// may be a different user who can access the files.
	if mbox {
		mboxf, err = os.Open(src)
		ctl.xcheck(err, "open mbox file")
		msgreader = store.NewMboxReader(ctl.log, store.CreateMessageTemp, src, mboxf)
	} else {
		mdnewf, err = os.Open(filepath.Join(src, "new"))
		ctl.xcheck(err, "open subdir new of maildir")
		mdcurf, err = os.Open(filepath.Join(src, "cur"))
		ctl.xcheck(err, "open subdir cur of maildir")
		msgreader = store.NewMaildirReader(ctl.log, store.CreateMessageTemp, mdnewf, mdcurf)
	}

	tx, err := a.DB.Begin(ctx, true)
	ctl.xcheck(err, "begin transaction")
	defer func() {
		// tx is set to nil after a successful commit, so this only rolls back on failure.
		if tx != nil {
			err := tx.Rollback()
			ctl.log.Check(err, "rolling back transaction")
		}
	}()

	// All preparations done. Good to go.
	ctl.xwriteok()

	// We will be delivering messages. If we fail halfway, we need to remove the created msg files.
	var deliveredIDs []int64

	defer func() {
		x := recover()
		if x == nil {
			return
		}

		if x != ctl.x {
			ctl.log.Error("import error", slog.String("panic", fmt.Sprintf("%v", x)))
			debug.PrintStack()
			metrics.PanicInc(metrics.Import)
		} else {
			ctl.log.Error("import error")
		}

		// deliveredIDs is cleared after commit, so only uncommitted deliveries are removed.
		for _, id := range deliveredIDs {
			p := a.MessagePath(id)
			err := os.Remove(p)
			ctl.log.Check(err, "removing message file after import error", slog.String("path", p))
		}

		ctl.xerror(fmt.Sprintf("import error: %v", x))
	}()

	var changes []store.Change

	var modseq store.ModSeq // Assigned on first delivered messages, used for all messages.

	xdeliver := func(m *store.Message, mf *os.File) {
		// todo: possibly set dmarcdomain to the domain of the from address? at least for non-spams that have been seen. otherwise user would start without any reputations. the assumption would be that the user has accepted email and deemed it legit, coming from the indicated sender.

		// Training, threading and disk usage accounting are done in bulk by the
		// import itself, not per message by DeliverMessage.
		const sync = false
		const notrain = true
		const nothreads = true
		const updateDiskUsage = false
		err := a.DeliverMessage(ctl.log, tx, m, mf, sync, notrain, nothreads, updateDiskUsage)
		ctl.xcheck(err, "delivering message")
		deliveredIDs = append(deliveredIDs, m.ID)
		ctl.log.Debug("delivered message", slog.Int64("id", m.ID))
		changes = append(changes, m.ChangeAddUID())
	}

	// todo: one goroutine for reading messages, one for parsing the message, one adding to database, one for junk filter training.
	n := 0
	a.WithWLock(func() {
		// Ensure mailbox exists.
		var mb store.Mailbox
		mb, changes, err = a.MailboxEnsure(tx, mailbox, true)
		ctl.xcheck(err, "ensuring mailbox exists")

		// We ensure keywords in messages make it to the mailbox as well.
		mailboxKeywords := map[string]bool{}

		// A missing junk filter is not an error: training is then skipped.
		jf, _, err := a.OpenJunkFilter(ctx, ctl.log)
		if err != nil && !errors.Is(err, store.ErrNoJunkFilter) {
			ctl.xcheck(err, "open junk filter")
		}
		defer func() {
			if jf != nil {
				err = jf.Close()
				ctl.xcheck(err, "close junk filter")
			}
		}()

		conf, _ := a.Conf()

		// Quota: the total size of messages already stored plus all imported
		// messages must not exceed the account maximum, if one is set.
		maxSize := a.QuotaMessageSize()
		var addSize int64
		du := store.DiskUsage{ID: 1}
		err = tx.Get(&du)
		ctl.xcheck(err, "get disk usage")

		process := func(m *store.Message, msgf *os.File, origPath string) {
			defer store.CloseRemoveTempFile(ctl.log, msgf, "message to import")

			addSize += m.Size
			if maxSize > 0 && du.MessageSize+addSize > maxSize {
				ctl.xcheck(fmt.Errorf("account over maximum total message size %d", maxSize), "checking quota")
			}

			for _, kw := range m.Keywords {
				mailboxKeywords[kw] = true
			}
			mb.Add(m.MailboxCounts())

			// Parse message and store parsed information for later fast retrieval.
			// Parse errors are logged but do not stop the import.
			p, err := message.EnsurePart(ctl.log.Logger, false, msgf, m.Size)
			if err != nil {
				ctl.log.Infox("parsing message, continuing", err, slog.String("path", origPath))
			}
			m.ParsedBuf, err = json.Marshal(p)
			ctl.xcheck(err, "marshal parsed message structure")

			// Set fields needed for future threading. By doing it now, DeliverMessage won't
			// have to parse the Part again.
			p.SetReaderAt(store.FileMsgReader(m.MsgPrefix, msgf))
			m.PrepareThreading(ctl.log, &p)

			// Fall back to the message Date header, then the current time, when the
			// source did not provide a received time.
			if m.Received.IsZero() {
				if p.Envelope != nil && !p.Envelope.Date.IsZero() {
					m.Received = p.Envelope.Date
				} else {
					m.Received = time.Now()
				}
			}

			// Messages don't always have a junk flag set. We set the flags that Deliver
			// would set (derived from the mailbox and account configuration) now and train
			// ourselves. This prevents Deliver from training, which would open the junk
			// filter, change it, and write it back to disk, for each message (slow).
			m.JunkFlagsForMailbox(mb, conf)
			if jf != nil && m.NeedsTraining() {
				if words, err := jf.ParseMessage(p); err != nil {
					ctl.log.Infox("parsing message for updating junk filter", err, slog.String("parse", ""), slog.String("path", origPath))
				} else {
					err = jf.Train(ctx, !m.Junk, words)
					ctl.xcheck(err, "training junk filter")
					m.TrainedJunk = &m.Junk
				}
			}

			if modseq == 0 {
				var err error
				modseq, err = a.NextModSeq(tx)
				ctl.xcheck(err, "assigning next modseq")
			}

			m.MailboxID = mb.ID
			m.MailboxOrigID = mb.ID
			m.CreateSeq = modseq
			m.ModSeq = modseq
			xdeliver(m, msgf)

			n++
			if n%1000 == 0 {
				ctl.xwrite(fmt.Sprintf("progress %d", n))
			}
		}

		for {
			m, msgf, origPath, err := msgreader.Next()
			if err == io.EOF {
				break
			}
			ctl.xcheck(err, "reading next message")

			process(m, msgf, origPath)
		}

		// Match threads.
		if len(deliveredIDs) > 0 {
			err = a.AssignThreads(ctx, ctl.log, tx, deliveredIDs[0], 0, io.Discard)
			ctl.xcheck(err, "assigning messages to threads")
		}

		// Get mailbox again, uidnext is likely updated. Keep the counts we
		// accumulated in memory.
		mc := mb.MailboxCounts
		err = tx.Get(&mb)
		ctl.xcheck(err, "get mailbox")
		mb.MailboxCounts = mc

		// If there are any new keywords, update the mailbox.
		var mbKwChanged bool
		mb.Keywords, mbKwChanged = store.MergeKeywords(mb.Keywords, maps.Keys(mailboxKeywords))
		if mbKwChanged {
			changes = append(changes, mb.ChangeKeywords())
		}

		err = tx.Update(&mb)
		ctl.xcheck(err, "updating message counts and keywords in mailbox")
		changes = append(changes, mb.ChangeCounts())

		// Report failures through ctl like every other step of this server-side
		// handler, so the transaction is rolled back, delivered files are removed and
		// the client gets the error.
		err = a.AddMessageSize(ctl.log, tx, addSize)
		ctl.xcheck(err, "updating total message size")

		err = tx.Commit()
		ctl.xcheck(err, "commit")
		tx = nil
		ctl.log.Info("delivered messages through import", slog.Int("count", len(deliveredIDs)))
		// Committed: the message files must no longer be removed on later errors.
		deliveredIDs = nil

		store.BroadcastChanges(a, changes)
	})

	err = a.Close()
	ctl.xcheck(err, "closing account")
	a = nil

	ctl.xwriteok()
	ctl.xwrite(fmt.Sprintf("%d", n))
}
444