1package main
2
3import (
4 "bytes"
5 "context"
6 "errors"
7 "fmt"
8 "io"
9 "io/fs"
10 "log/slog"
11 "os"
12 "path/filepath"
13 "strings"
14 "time"
15
16 "github.com/mjl-/bstore"
17
18 "github.com/mjl-/mox/dmarcdb"
19 "github.com/mjl-/mox/mox-"
20 "github.com/mjl-/mox/moxvar"
21 "github.com/mjl-/mox/mtastsdb"
22 "github.com/mjl-/mox/queue"
23 "github.com/mjl-/mox/store"
24 "github.com/mjl-/mox/tlsrptdb"
25)
26
27func backupctl(ctx context.Context, ctl *ctl) {
28 /* protocol:
29 > "backup"
30 > destdir
31 > "verbose" or ""
32 < stream
33 < "ok" or error
34 */
35
36 // Convention in this function: variables containing "src" or "dst" are file system
37 // paths that can be passed to os.Open and such. Variables with dirs/paths without
38 // "src" or "dst" are incomplete paths relative to the source or destination data
39 // directories.
40
41 dstDataDir := ctl.xread()
42 verbose := ctl.xread() == "verbose"
43
44 // Set when an error is encountered. At the end, we warn if set.
45 var incomplete bool
46
47 // We'll be writing output, and logging both to mox and the ctl stream.
48 writer := ctl.writer()
49
50 // Format easily readable output for the user.
51 formatLog := func(prefix, text string, err error, attrs ...slog.Attr) []byte {
52 var b bytes.Buffer
53 fmt.Fprint(&b, prefix)
54 fmt.Fprint(&b, text)
55 if err != nil {
56 fmt.Fprint(&b, ": "+err.Error())
57 }
58 for _, a := range attrs {
59 fmt.Fprintf(&b, "; %s=%v", a.Key, a.Value)
60 }
61 fmt.Fprint(&b, "\n")
62 return b.Bytes()
63 }
64
65 // Log an error to both the mox service as the user running "mox backup".
66 pkglogx := func(prefix, text string, err error, attrs ...slog.Attr) {
67 ctl.log.Errorx(text, err, attrs...)
68
69 _, werr := writer.Write(formatLog(prefix, text, err, attrs...))
70 ctl.xcheck(werr, "write to ctl")
71 }
72
73 // Log an error but don't mark backup as failed.
74 xwarnx := func(text string, err error, attrs ...slog.Attr) {
75 pkglogx("warning: ", text, err, attrs...)
76 }
77
78 // Log an error that causes the backup to be marked as failed. We typically
79 // continue processing though.
80 xerrx := func(text string, err error, attrs ...slog.Attr) {
81 incomplete = true
82 pkglogx("error: ", text, err, attrs...)
83 }
84
85 // If verbose is enabled, log to the cli command. Always log as info level.
86 xvlog := func(text string, attrs ...slog.Attr) {
87 ctl.log.Info(text, attrs...)
88 if verbose {
89 _, werr := writer.Write(formatLog("", text, nil, attrs...))
90 ctl.xcheck(werr, "write to ctl")
91 }
92 }
93
94 if _, err := os.Stat(dstDataDir); err == nil {
95 xwarnx("destination data directory already exists", nil, slog.String("dir", dstDataDir))
96 }
97
98 srcDataDir := filepath.Clean(mox.DataDirPath("."))
99
100 // When creating a file in the destination, we first ensure its directory exists.
101 // We track which directories we created, to prevent needless syscalls.
102 createdDirs := map[string]struct{}{}
103 ensureDestDir := func(dstpath string) {
104 dstdir := filepath.Dir(dstpath)
105 if _, ok := createdDirs[dstdir]; !ok {
106 err := os.MkdirAll(dstdir, 0770)
107 if err != nil {
108 xerrx("creating directory", err)
109 }
110 createdDirs[dstdir] = struct{}{}
111 }
112 }
113
114 // Backup a single file by copying (never hardlinking, the file may change).
115 backupFile := func(path string) {
116 tmFile := time.Now()
117 srcpath := filepath.Join(srcDataDir, path)
118 dstpath := filepath.Join(dstDataDir, path)
119
120 sf, err := os.Open(srcpath)
121 if err != nil {
122 xerrx("open source file (not backed up)", err, slog.String("srcpath", srcpath), slog.String("dstpath", dstpath))
123 return
124 }
125 defer sf.Close()
126
127 ensureDestDir(dstpath)
128 df, err := os.OpenFile(dstpath, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0660)
129 if err != nil {
130 xerrx("creating destination file (not backed up)", err, slog.String("srcpath", srcpath), slog.String("dstpath", dstpath))
131 return
132 }
133 defer func() {
134 if df != nil {
135 df.Close()
136 }
137 }()
138 if _, err := io.Copy(df, sf); err != nil {
139 xerrx("copying file (not backed up properly)", err, slog.String("srcpath", srcpath), slog.String("dstpath", dstpath))
140 return
141 }
142 err = df.Close()
143 df = nil
144 if err != nil {
145 xerrx("closing destination file (not backed up properly)", err, slog.String("srcpath", srcpath), slog.String("dstpath", dstpath))
146 return
147 }
148 xvlog("backed up file", slog.String("path", path), slog.Duration("duration", time.Since(tmFile)))
149 }
150
151 // Back up the files in a directory (by copying).
152 backupDir := func(dir string) {
153 tmDir := time.Now()
154 srcdir := filepath.Join(srcDataDir, dir)
155 dstdir := filepath.Join(dstDataDir, dir)
156 err := filepath.WalkDir(srcdir, func(srcpath string, d fs.DirEntry, err error) error {
157 if err != nil {
158 xerrx("walking file (not backed up)", err, slog.String("srcpath", srcpath))
159 return nil
160 }
161 if d.IsDir() {
162 return nil
163 }
164 backupFile(srcpath[len(srcDataDir)+1:])
165 return nil
166 })
167 if err != nil {
168 xerrx("copying directory (not backed up properly)", err,
169 slog.String("srcdir", srcdir),
170 slog.String("dstdir", dstdir),
171 slog.Duration("duration", time.Since(tmDir)))
172 return
173 }
174 xvlog("backed up directory", slog.String("dir", dir), slog.Duration("duration", time.Since(tmDir)))
175 }
176
177 // Backup a database by copying it in a readonly transaction.
178 // Always logs on error, so caller doesn't have to, but also returns the error so
179 // callers can see result.
180 backupDB := func(db *bstore.DB, path string) (rerr error) {
181 defer func() {
182 if rerr != nil {
183 xerrx("backing up database", rerr, slog.String("path", path))
184 }
185 }()
186
187 tmDB := time.Now()
188
189 dstpath := filepath.Join(dstDataDir, path)
190 ensureDestDir(dstpath)
191 df, err := os.OpenFile(dstpath, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0660)
192 if err != nil {
193 return fmt.Errorf("creating destination file: %v", err)
194 }
195 defer func() {
196 if df != nil {
197 df.Close()
198 }
199 }()
200 err = db.Read(ctx, func(tx *bstore.Tx) error {
201 // Using regular WriteTo seems fine, and fast. It just copies pages.
202 //
203 // bolt.Compact is slower, it writes all key/value pairs, building up new data
204 // structures. My compacted test database was ~60% of original size. Lz4 on the
205 // uncompacted database got it to 14%. Lz4 on the compacted database got it to 13%.
206 // Backups are likely archived somewhere with compression, so we don't compact.
207 //
208 // Tests with WriteTo and os.O_DIRECT were slower than without O_DIRECT, but
209 // probably because everything fit in the page cache. It may be better to use
210 // O_DIRECT when copying many large or inactive databases.
211 _, err := tx.WriteTo(df)
212 return err
213 })
214 if err != nil {
215 return fmt.Errorf("copying database: %v", err)
216 }
217 err = df.Close()
218 df = nil
219 if err != nil {
220 return fmt.Errorf("closing destination database after copy: %v", err)
221 }
222 xvlog("backed up database file", slog.String("path", path), slog.Duration("duration", time.Since(tmDB)))
223 return nil
224 }
225
226 // Try to create a hardlink. Fall back to copying the file (e.g. when on different file system).
227 warnedHardlink := false // We warn once about failing to hardlink.
228 linkOrCopy := func(srcpath, dstpath string) (bool, error) {
229 ensureDestDir(dstpath)
230
231 if err := os.Link(srcpath, dstpath); err == nil {
232 return true, nil
233 } else if os.IsNotExist(err) {
234 // No point in trying with regular copy, we would warn twice.
235 return false, err
236 } else if !warnedHardlink {
237 xwarnx("creating hardlink to message failed, will be doing regular file copies and not warn again", err, slog.String("srcpath", srcpath), slog.String("dstpath", dstpath))
238 warnedHardlink = true
239 }
240
241 // Fall back to copying.
242 sf, err := os.Open(srcpath)
243 if err != nil {
244 return false, fmt.Errorf("open source path %s: %v", srcpath, err)
245 }
246 defer func() {
247 err := sf.Close()
248 ctl.log.Check(err, "closing copied source file")
249 }()
250
251 df, err := os.OpenFile(dstpath, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0660)
252 if err != nil {
253 return false, fmt.Errorf("create destination path %s: %v", dstpath, err)
254 }
255 defer func() {
256 if df != nil {
257 err := df.Close()
258 ctl.log.Check(err, "closing partial destination file")
259 }
260 }()
261 if _, err := io.Copy(df, sf); err != nil {
262 return false, fmt.Errorf("coping: %v", err)
263 }
264 err = df.Close()
265 df = nil
266 if err != nil {
267 return false, fmt.Errorf("closing destination file: %v", err)
268 }
269 return false, nil
270 }
271
272 // Start making the backup.
273 tmStart := time.Now()
274
275 ctl.log.Print("making backup", slog.String("destdir", dstDataDir))
276
277 err := os.MkdirAll(dstDataDir, 0770)
278 if err != nil {
279 xerrx("creating destination data directory", err)
280 }
281
282 if err := os.WriteFile(filepath.Join(dstDataDir, "moxversion"), []byte(moxvar.Version), 0660); err != nil {
283 xerrx("writing moxversion", err)
284 }
285 backupDB(dmarcdb.ReportsDB, "dmarcrpt.db")
286 backupDB(dmarcdb.EvalDB, "dmarceval.db")
287 backupDB(mtastsdb.DB, "mtasts.db")
288 backupDB(tlsrptdb.ReportDB, "tlsrpt.db")
289 backupDB(tlsrptdb.ResultDB, "tlsrptresult.db")
290 backupFile("receivedid.key")
291
292 // Acme directory is optional.
293 srcAcmeDir := filepath.Join(srcDataDir, "acme")
294 if _, err := os.Stat(srcAcmeDir); err == nil {
295 backupDir("acme")
296 } else if err != nil && !os.IsNotExist(err) {
297 xerrx("copying acme/", err)
298 }
299
300 // Copy the queue database and all message files.
301 backupQueue := func(path string) {
302 tmQueue := time.Now()
303
304 if err := backupDB(queue.DB, path); err != nil {
305 xerrx("queue not backed up", err, slog.String("path", path), slog.Duration("duration", time.Since(tmQueue)))
306 return
307 }
308
309 dstdbpath := filepath.Join(dstDataDir, path)
310 db, err := bstore.Open(ctx, dstdbpath, &bstore.Options{MustExist: true}, queue.DBTypes...)
311 if err != nil {
312 xerrx("open copied queue database", err, slog.String("dstpath", dstdbpath), slog.Duration("duration", time.Since(tmQueue)))
313 return
314 }
315
316 defer func() {
317 if db != nil {
318 err := db.Close()
319 ctl.log.Check(err, "closing new queue db")
320 }
321 }()
322
323 // Link/copy known message files. Warn if files are missing or unexpected
324 // (though a message file could have been removed just now due to delivery, or a
325 // new message may have been queued).
326 tmMsgs := time.Now()
327 seen := map[string]struct{}{}
328 var nlinked, ncopied int
329 err = bstore.QueryDB[queue.Msg](ctx, db).ForEach(func(m queue.Msg) error {
330 mp := store.MessagePath(m.ID)
331 seen[mp] = struct{}{}
332 srcpath := filepath.Join(srcDataDir, "queue", mp)
333 dstpath := filepath.Join(dstDataDir, "queue", mp)
334 if linked, err := linkOrCopy(srcpath, dstpath); err != nil {
335 xerrx("linking/copying queue message", err, slog.String("srcpath", srcpath), slog.String("dstpath", dstpath))
336 } else if linked {
337 nlinked++
338 } else {
339 ncopied++
340 }
341 return nil
342 })
343 if err != nil {
344 xerrx("processing queue messages (not backed up properly)", err, slog.Duration("duration", time.Since(tmMsgs)))
345 } else {
346 xvlog("queue message files linked/copied",
347 slog.Int("linked", nlinked),
348 slog.Int("copied", ncopied),
349 slog.Duration("duration", time.Since(tmMsgs)))
350 }
351
352 // Read through all files in queue directory and warn about anything we haven't handled yet.
353 tmWalk := time.Now()
354 srcqdir := filepath.Join(srcDataDir, "queue")
355 err = filepath.WalkDir(srcqdir, func(srcqpath string, d fs.DirEntry, err error) error {
356 if err != nil {
357 xerrx("walking files in queue", err, slog.String("srcpath", srcqpath))
358 return nil
359 }
360 if d.IsDir() {
361 return nil
362 }
363 p := srcqpath[len(srcqdir)+1:]
364 if _, ok := seen[p]; ok {
365 return nil
366 }
367 if p == "index.db" {
368 return nil
369 }
370 qp := filepath.Join("queue", p)
371 xwarnx("backing up unrecognized file in queue directory", nil, slog.String("path", qp))
372 backupFile(qp)
373 return nil
374 })
375 if err != nil {
376 xerrx("walking queue directory (not backed up properly)", err, slog.String("dir", "queue"), slog.Duration("duration", time.Since(tmWalk)))
377 } else {
378 xvlog("walked queue directory", slog.Duration("duration", time.Since(tmWalk)))
379 }
380
381 xvlog("queue backed finished", slog.Duration("duration", time.Since(tmQueue)))
382 }
383 backupQueue(filepath.FromSlash("queue/index.db"))
384
385 backupAccount := func(acc *store.Account) {
386 defer acc.Close()
387
388 tmAccount := time.Now()
389
390 // Copy database file.
391 dbpath := filepath.Join("accounts", acc.Name, "index.db")
392 err := backupDB(acc.DB, dbpath)
393 if err != nil {
394 xerrx("copying account database", err, slog.String("path", dbpath), slog.Duration("duration", time.Since(tmAccount)))
395 }
396
397 // todo: should document/check not taking a rlock on account.
398
399 // Copy junkfilter files, if configured.
400 if jf, _, err := acc.OpenJunkFilter(ctx, ctl.log); err != nil {
401 if !errors.Is(err, store.ErrNoJunkFilter) {
402 xerrx("opening junk filter for account (not backed up)", err)
403 }
404 } else {
405 db := jf.DB()
406 jfpath := filepath.Join("accounts", acc.Name, "junkfilter.db")
407 backupDB(db, jfpath)
408 bloompath := filepath.Join("accounts", acc.Name, "junkfilter.bloom")
409 backupFile(bloompath)
410 db = nil
411 err := jf.Close()
412 ctl.log.Check(err, "closing junkfilter")
413 }
414
415 dstdbpath := filepath.Join(dstDataDir, dbpath)
416 db, err := bstore.Open(ctx, dstdbpath, &bstore.Options{MustExist: true}, store.DBTypes...)
417 if err != nil {
418 xerrx("open copied account database", err, slog.String("dstpath", dstdbpath), slog.Duration("duration", time.Since(tmAccount)))
419 return
420 }
421
422 defer func() {
423 if db != nil {
424 err := db.Close()
425 ctl.log.Check(err, "close account database")
426 }
427 }()
428
429 // Link/copy known message files. Warn if files are missing or unexpected (though a
430 // message file could have been added just now due to delivery, or a message have
431 // been removed).
432 tmMsgs := time.Now()
433 seen := map[string]struct{}{}
434 var nlinked, ncopied int
435 err = bstore.QueryDB[store.Message](ctx, db).FilterEqual("Expunged", false).ForEach(func(m store.Message) error {
436 mp := store.MessagePath(m.ID)
437 seen[mp] = struct{}{}
438 amp := filepath.Join("accounts", acc.Name, "msg", mp)
439 srcpath := filepath.Join(srcDataDir, amp)
440 dstpath := filepath.Join(dstDataDir, amp)
441 if linked, err := linkOrCopy(srcpath, dstpath); err != nil {
442 xerrx("linking/copying account message", err, slog.String("srcpath", srcpath), slog.String("dstpath", dstpath))
443 } else if linked {
444 nlinked++
445 } else {
446 ncopied++
447 }
448 return nil
449 })
450 if err != nil {
451 xerrx("processing account messages (not backed up properly)", err, slog.Duration("duration", time.Since(tmMsgs)))
452 } else {
453 xvlog("account message files linked/copied",
454 slog.Int("linked", nlinked),
455 slog.Int("copied", ncopied),
456 slog.Duration("duration", time.Since(tmMsgs)))
457 }
458
459 // Read through all files in account directory and warn about anything we haven't handled yet.
460 tmWalk := time.Now()
461 srcadir := filepath.Join(srcDataDir, "accounts", acc.Name)
462 err = filepath.WalkDir(srcadir, func(srcapath string, d fs.DirEntry, err error) error {
463 if err != nil {
464 xerrx("walking files in account", err, slog.String("srcpath", srcapath))
465 return nil
466 }
467 if d.IsDir() {
468 return nil
469 }
470 p := srcapath[len(srcadir)+1:]
471 l := strings.Split(p, string(filepath.Separator))
472 if l[0] == "msg" {
473 mp := filepath.Join(l[1:]...)
474 if _, ok := seen[mp]; ok {
475 return nil
476 }
477 }
478 switch p {
479 case "index.db", "junkfilter.db", "junkfilter.bloom":
480 return nil
481 }
482 ap := filepath.Join("accounts", acc.Name, p)
483 if strings.HasPrefix(p, "msg"+string(filepath.Separator)) {
484 xwarnx("backing up unrecognized file in account message directory (should be moved away)", nil, slog.String("path", ap))
485 } else {
486 xwarnx("backing up unrecognized file in account directory", nil, slog.String("path", ap))
487 }
488 backupFile(ap)
489 return nil
490 })
491 if err != nil {
492 xerrx("walking account directory (not backed up properly)", err, slog.String("srcdir", srcadir), slog.Duration("duration", time.Since(tmWalk)))
493 } else {
494 xvlog("walked account directory", slog.Duration("duration", time.Since(tmWalk)))
495 }
496
497 xvlog("account backup finished", slog.String("dir", filepath.Join("accounts", acc.Name)), slog.Duration("duration", time.Since(tmAccount)))
498 }
499
500 // For each configured account, open it, make a copy of the database and
501 // hardlink/copy the messages. We track the accounts we handled, and skip the
502 // account directories when handling "all other files" below.
503 accounts := map[string]struct{}{}
504 for _, accName := range mox.Conf.Accounts() {
505 acc, err := store.OpenAccount(ctl.log, accName)
506 if err != nil {
507 xerrx("opening account for copying (will try to copy as regular files later)", err, slog.String("account", accName))
508 continue
509 }
510 accounts[accName] = struct{}{}
511 backupAccount(acc)
512 }
513
514 // Copy all other files, that aren't part of the known files, databases, queue or accounts.
515 tmWalk := time.Now()
516 err = filepath.WalkDir(srcDataDir, func(srcpath string, d fs.DirEntry, err error) error {
517 if err != nil {
518 xerrx("walking path", err, slog.String("path", srcpath))
519 return nil
520 }
521
522 if srcpath == srcDataDir {
523 return nil
524 }
525 p := srcpath[len(srcDataDir)+1:]
526 if p == "queue" || p == "acme" || p == "tmp" {
527 return fs.SkipDir
528 }
529 l := strings.Split(p, string(filepath.Separator))
530 if len(l) >= 2 && l[0] == "accounts" {
531 name := l[1]
532 if _, ok := accounts[name]; ok {
533 return fs.SkipDir
534 }
535 }
536
537 // Only files are explicitly backed up.
538 if d.IsDir() {
539 return nil
540 }
541
542 switch p {
543 case "dmarcrpt.db", "dmarceval.db", "mtasts.db", "tlsrpt.db", "tlsrptresult.db", "receivedid.key", "ctl":
544 // Already handled.
545 return nil
546 case "lastknownversion": // Optional file, not yet handled.
547 default:
548 xwarnx("backing up unrecognized file", nil, slog.String("path", p))
549 }
550 backupFile(p)
551 return nil
552 })
553 if err != nil {
554 xerrx("walking other files (not backed up properly)", err, slog.Duration("duration", time.Since(tmWalk)))
555 } else {
556 xvlog("walking other files finished", slog.Duration("duration", time.Since(tmWalk)))
557 }
558
559 xvlog("backup finished", slog.Duration("duration", time.Since(tmStart)))
560
561 writer.xclose()
562
563 if incomplete {
564 ctl.xwrite("errors were encountered during backup")
565 } else {
566 ctl.xwriteok()
567 }
568}
569