1package main
2
3import (
4 "bufio"
5 "context"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "log"
11 "log/slog"
12 "net"
13 "os"
14 "path/filepath"
15 "runtime/debug"
16 "strings"
17 "time"
18
19 "golang.org/x/text/unicode/norm"
20
21 "github.com/mjl-/mox/config"
22 "github.com/mjl-/mox/message"
23 "github.com/mjl-/mox/metrics"
24 "github.com/mjl-/mox/mox-"
25 "github.com/mjl-/mox/moxio"
26 "github.com/mjl-/mox/store"
27)
28
29// todo: add option to trust imported messages, causing us to look at Authentication-Results and Received-SPF headers and add eg verified spf/dkim/dmarc domains to our store, to jumpstart reputation.
30
// importCommonHelp is the help text shared by the "import maildir" and
// "import mbox" commands. It is concatenated into their c.help strings.
const importCommonHelp = `The mbox/maildir archive is accessed and imported by the running mox process, so
it must have access to the archive files. The default suggested systemd service
file isolates mox from most of the file system, with only the "data/" directory
accessible, so you may want to put the mbox/maildir archive files in a
directory like "data/import/" to make it available to mox.

By default, messages will train the junk filter based on their flags and, if
"automatic junk flags" configuration is set, based on mailbox naming.

If the destination mailbox is the Sent mailbox, the recipients of the messages
are added to the message metadata, causing later incoming messages from these
recipients to be accepted, unless other reputation signals prevent that.

Users can also import mailboxes/messages through the account web page by
uploading a zip or tgz file with mbox and/or maildirs.

Messages are imported even if already present. Importing messages twice will
result in duplicate messages.
`
50
51func cmdImportMaildir(c *cmd) {
52 c.params = "accountname mailboxname maildir"
53 c.help = `Import a maildir into an account.
54
55` + importCommonHelp + `
56Mailbox flags, like "seen", "answered", will be imported. An optional
57dovecot-keywords file can specify additional flags, like Forwarded/Junk/NotJunk.
58`
59 args := c.Parse()
60 if len(args) != 3 {
61 c.Usage()
62 }
63 mustLoadConfig()
64 ctlcmdImport(xctl(), false, args[0], args[1], args[2])
65}
66
67func cmdImportMbox(c *cmd) {
68 c.params = "accountname mailboxname mbox"
69 c.help = `Import an mbox into an account.
70
71Using mbox is not recommended, maildir is a better defined format.
72
73` + importCommonHelp
74 args := c.Parse()
75 if len(args) != 3 {
76 c.Usage()
77 }
78 mustLoadConfig()
79 ctlcmdImport(xctl(), true, args[0], args[1], args[2])
80}
81
82func cmdXImportMaildir(c *cmd) {
83 c.unlisted = true
84 c.params = "accountdir mailboxname maildir"
85 c.help = `Import a maildir into an account by directly accessing the data directory.
86
87
88See "mox help import maildir" for details.
89`
90 xcmdXImport(false, c)
91}
92
93func cmdXImportMbox(c *cmd) {
94 c.unlisted = true
95 c.params = "accountdir mailboxname mbox"
96 c.help = `Import an mbox into an account by directly accessing the data directory.
97
98See "mox help import mbox" for details.
99`
100 xcmdXImport(true, c)
101}
102
103func xcmdXImport(mbox bool, c *cmd) {
104 args := c.Parse()
105 if len(args) != 3 {
106 c.Usage()
107 }
108
109 accountdir := args[0]
110 account := filepath.Base(accountdir)
111
112 // Set up the mox config so the account can be opened.
113 if filepath.Base(filepath.Dir(accountdir)) != "accounts" {
114 log.Fatalf("accountdir must be of the form .../accounts/<name>")
115 }
116 var err error
117 mox.Conf.Static.DataDir, err = filepath.Abs(filepath.Dir(filepath.Dir(accountdir)))
118 xcheckf(err, "making absolute datadir")
119 mox.ConfigStaticPath = "fake.conf"
120 mox.Conf.DynamicLastCheck = time.Now().Add(time.Hour) // Silence errors about config file.
121 mox.Conf.Dynamic.Accounts = map[string]config.Account{
122 account: {},
123 }
124 defer store.Switchboard()()
125
126 cconn, sconn := net.Pipe()
127 clientctl := ctl{conn: cconn, r: bufio.NewReader(cconn), log: c.log}
128 serverctl := ctl{conn: sconn, r: bufio.NewReader(sconn), log: c.log}
129 go servectlcmd(context.Background(), &serverctl, 0, func() {})
130
131 ctlcmdImport(&clientctl, mbox, account, args[1], args[2])
132}
133
134func ctlcmdImport(xctl *ctl, mbox bool, account, mailbox, src string) {
135 if mbox {
136 xctl.xwrite("importmbox")
137 } else {
138 xctl.xwrite("importmaildir")
139 }
140 xctl.xwrite(account)
141 if strings.EqualFold(mailbox, "Inbox") {
142 mailbox = "Inbox"
143 }
144 xctl.xwrite(mailbox)
145 xctl.xwrite(src)
146 xctl.xreadok()
147 fmt.Fprintln(os.Stderr, "importing...")
148 for {
149 line := xctl.xread()
150 if strings.HasPrefix(line, "progress ") {
151 n := line[len("progress "):]
152 fmt.Fprintf(os.Stderr, "%s...\n", n)
153 continue
154 }
155 if line != "ok" {
156 log.Fatalf("import, expected ok, got %q", line)
157 }
158 break
159 }
160 count := xctl.xread()
161 fmt.Fprintf(os.Stderr, "%s imported\n", count)
162}
163
164func ximportctl(ctx context.Context, xctl *ctl, mbox bool) {
165 /* protocol:
166 > "importmaildir" or "importmbox"
167 > account
168 > mailbox
169 > src (mbox file or maildir directory)
170 < "ok" or error
171 < "progress" count (zero or more times, once for every 1000 messages)
172 < "ok" when done, or error
173 < count (of total imported messages, only if not error)
174 */
175 account := xctl.xread()
176 mailbox := xctl.xread()
177 src := xctl.xread()
178
179 kind := "maildir"
180 if mbox {
181 kind = "mbox"
182 }
183 xctl.log.Info("importing messages",
184 slog.String("kind", kind),
185 slog.String("account", account),
186 slog.String("mailbox", mailbox),
187 slog.String("source", src))
188
189 var err error
190 var mboxf *os.File
191 var mdnewf, mdcurf *os.File
192 var msgreader store.MsgSource
193
194 // Ensure normalized form.
195 mailbox = norm.NFC.String(mailbox)
196 mailbox, _, err = store.CheckMailboxName(mailbox, true)
197 xctl.xcheck(err, "checking mailbox name")
198
199 // Open account, creating a database file if it doesn't exist yet. It must be known
200 // in the configuration file.
201 a, err := store.OpenAccount(xctl.log, account, false)
202 xctl.xcheck(err, "opening account")
203 defer func() {
204 if a != nil {
205 err := a.Close()
206 xctl.log.Check(err, "closing account after import")
207 }
208 }()
209
210 err = a.ThreadingWait(xctl.log)
211 xctl.xcheck(err, "waiting for account thread upgrade")
212
213 defer func() {
214 if mboxf != nil {
215 err := mboxf.Close()
216 xctl.log.Check(err, "closing mbox file after import")
217 }
218 if mdnewf != nil {
219 err := mdnewf.Close()
220 xctl.log.Check(err, "closing maildir new after import")
221 }
222 if mdcurf != nil {
223 err := mdcurf.Close()
224 xctl.log.Check(err, "closing maildir cur after import")
225 }
226 }()
227
228 // Messages don't always have a junk flag set. We'll assume anything in a mailbox
229 // starting with junk or spam is junk mail.
230
231 // First check if we can access the mbox/maildir.
232 // Mox needs to be able to access those files, the user running the import command
233 // may be a different user who can access the files.
234 if mbox {
235 mboxf, err = os.Open(src)
236 xctl.xcheck(err, "open mbox file")
237 msgreader = store.NewMboxReader(xctl.log, store.CreateMessageTemp, src, mboxf)
238 } else {
239 mdnewf, err = os.Open(filepath.Join(src, "new"))
240 xctl.xcheck(err, "open subdir new of maildir")
241 mdcurf, err = os.Open(filepath.Join(src, "cur"))
242 xctl.xcheck(err, "open subdir cur of maildir")
243 msgreader = store.NewMaildirReader(xctl.log, store.CreateMessageTemp, mdnewf, mdcurf)
244 }
245
246 // todo: one goroutine for reading messages, one for parsing the message, one adding to database, one for junk filter training.
247 n := 0
248 a.WithWLock(func() {
249 var changes []store.Change
250
251 tx, err := a.DB.Begin(ctx, true)
252 xctl.xcheck(err, "begin transaction")
253 defer func() {
254 if tx != nil {
255 err := tx.Rollback()
256 xctl.log.Check(err, "rolling back transaction")
257 }
258 }()
259
260 // All preparations done. Good to go.
261 xctl.xwriteok()
262
263 // We will be delivering messages. If we fail halfway, we need to remove the created msg files.
264 var newIDs []int64
265 defer func() {
266 x := recover()
267 if x == nil {
268 return
269 }
270
271 if x != xctl.x {
272 xctl.log.Error("import error", slog.String("panic", fmt.Sprintf("%v", x)))
273 debug.PrintStack()
274 metrics.PanicInc(metrics.Import)
275 } else {
276 xctl.log.Error("import error")
277 }
278
279 for _, id := range newIDs {
280 p := a.MessagePath(id)
281 err := os.Remove(p)
282 xctl.log.Check(err, "closing message file after import error", slog.String("path", p))
283 }
284 newIDs = nil
285
286 xctl.xerror(fmt.Sprintf("import error: %v", x))
287 }()
288
289 var modseq store.ModSeq // Assigned on first delivered messages, used for all messages.
290
291 // Ensure mailbox exists.
292 var mb store.Mailbox
293 mb, changes, err = a.MailboxEnsure(tx, mailbox, true, store.SpecialUse{}, &modseq)
294 xctl.xcheck(err, "ensuring mailbox exists")
295
296 nkeywords := len(mb.Keywords)
297
298 jf, _, err := a.OpenJunkFilter(ctx, xctl.log)
299 if err != nil && !errors.Is(err, store.ErrNoJunkFilter) {
300 xctl.xcheck(err, "open junk filter")
301 }
302 defer func() {
303 if jf != nil {
304 err = jf.CloseDiscard()
305 xctl.xcheck(err, "close junk filter")
306 }
307 }()
308
309 conf, _ := a.Conf()
310
311 maxSize := a.QuotaMessageSize()
312 var addSize int64
313 du := store.DiskUsage{ID: 1}
314 err = tx.Get(&du)
315 xctl.xcheck(err, "get disk usage")
316
317 msgDirs := map[string]struct{}{}
318
319 process := func(m *store.Message, msgf *os.File, origPath string) {
320 defer store.CloseRemoveTempFile(xctl.log, msgf, "message to import")
321
322 addSize += m.Size
323 if maxSize > 0 && du.MessageSize+addSize > maxSize {
324 xctl.xcheck(fmt.Errorf("account over maximum total message size %d", maxSize), "checking quota")
325 }
326
327 // Parse message and store parsed information for later fast retrieval.
328 p, err := message.EnsurePart(xctl.log.Logger, false, msgf, m.Size)
329 if err != nil {
330 xctl.log.Infox("parsing message, continuing", err, slog.String("path", origPath))
331 }
332 m.ParsedBuf, err = json.Marshal(p)
333 xctl.xcheck(err, "marshal parsed message structure")
334
335 // Set fields needed for future threading. By doing it now, MessageAdd won't
336 // have to parse the Part again.
337 p.SetReaderAt(store.FileMsgReader(m.MsgPrefix, msgf))
338 m.PrepareThreading(xctl.log, &p)
339
340 if m.Received.IsZero() {
341 if p.Envelope != nil && !p.Envelope.Date.IsZero() {
342 m.Received = p.Envelope.Date
343 } else {
344 m.Received = time.Now()
345 }
346 }
347
348 m.JunkFlagsForMailbox(mb, conf)
349 if jf != nil && m.NeedsTraining() {
350 if words, err := jf.ParseMessage(p); err != nil {
351 xctl.log.Infox("parsing message for updating junk filter", err, slog.String("parse", ""), slog.String("path", origPath))
352 } else {
353 err = jf.Train(ctx, !m.Junk, words)
354 xctl.xcheck(err, "training junk filter")
355 m.TrainedJunk = &m.Junk
356 }
357 }
358
359 if modseq == 0 {
360 var err error
361 modseq, err = a.NextModSeq(tx)
362 xctl.xcheck(err, "assigning next modseq")
363 mb.ModSeq = modseq
364 }
365
366 m.MailboxID = mb.ID
367 m.MailboxOrigID = mb.ID
368 m.CreateSeq = modseq
369 m.ModSeq = modseq
370
371 // todo: possibly set dmarcdomain to the domain of the from address? at least for non-spams that have been seen. otherwise user would start without any reputations. the assumption would be that the user has accepted email and deemed it legit, coming from the indicated sender.
372 opts := store.AddOpts{
373 SkipDirSync: true,
374 SkipTraining: true,
375 SkipThreads: true, // We do this efficiently when we have all messages.
376 SkipUpdateDiskUsage: true, // We do this once at the end.
377 SkipCheckQuota: true, // We check before.
378 SkipPreview: true, // We'll do this on-demand when messages are requested. Saves time.
379 }
380 err = a.MessageAdd(xctl.log, tx, &mb, m, msgf, opts)
381 xctl.xcheck(err, "delivering message")
382 newIDs = append(newIDs, m.ID)
383 changes = append(changes, m.ChangeAddUID())
384
385 msgDirs[filepath.Dir(a.MessagePath(m.ID))] = struct{}{}
386
387 n++
388 if n%1000 == 0 {
389 xctl.xwrite(fmt.Sprintf("progress %d", n))
390 }
391 }
392
393 for {
394 m, msgf, origPath, err := msgreader.Next()
395 if err == io.EOF {
396 break
397 }
398 xctl.xcheck(err, "reading next message")
399
400 process(m, msgf, origPath)
401 }
402
403 // Match threads.
404 if len(newIDs) > 0 {
405 err = a.AssignThreads(ctx, xctl.log, tx, newIDs[0], 0, io.Discard)
406 xctl.xcheck(err, "assigning messages to threads")
407 }
408
409 changes = append(changes, mb.ChangeCounts())
410 if nkeywords != len(mb.Keywords) {
411 changes = append(changes, mb.ChangeKeywords())
412 }
413
414 err = tx.Update(&mb)
415 xctl.xcheck(err, "updating message counts and keywords in mailbox")
416
417 err = a.AddMessageSize(xctl.log, tx, addSize)
418 xctl.xcheck(err, "updating total message size")
419
420 for msgDir := range msgDirs {
421 err := moxio.SyncDir(xctl.log, msgDir)
422 xctl.xcheck(err, "sync dir")
423 }
424
425 if jf != nil {
426 err := jf.Close()
427 xctl.log.Check(err, "close junk filter")
428 jf = nil
429 }
430
431 err = tx.Commit()
432 xctl.xcheck(err, "commit")
433 tx = nil
434 xctl.log.Info("delivered messages through import", slog.Int("count", len(newIDs)))
435 newIDs = nil
436
437 store.BroadcastChanges(a, changes)
438 })
439
440 err = a.Close()
441 xctl.xcheck(err, "closing account")
442 a = nil
443
444 xctl.xwriteok()
445 xctl.xwrite(fmt.Sprintf("%d", n))
446}
447