1package main
2
3import (
4 "bufio"
5 "context"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "log"
11 "log/slog"
12 "maps"
13 "net"
14 "os"
15 "path/filepath"
16 "runtime/debug"
17 "sort"
18 "strconv"
19 "strings"
20 "time"
21
22 "github.com/mjl-/bstore"
23
24 "github.com/mjl-/mox/config"
25 "github.com/mjl-/mox/dns"
26 "github.com/mjl-/mox/message"
27 "github.com/mjl-/mox/metrics"
28 "github.com/mjl-/mox/mlog"
29 "github.com/mjl-/mox/mox-"
30 "github.com/mjl-/mox/queue"
31 "github.com/mjl-/mox/smtp"
32 "github.com/mjl-/mox/store"
33 "github.com/mjl-/mox/webapi"
34)
35
36// ctl represents a connection to the ctl unix domain socket of a running mox instance.
37// ctl provides functions to read/write commands/responses/data streams.
38type ctl struct {
39 cmd string // Set for server-side of commands.
40 conn net.Conn
41 r *bufio.Reader // Set for first reader.
42 x any // If set, errors are handled by calling panic(x) instead of log.Fatal.
43 log mlog.Log // If set, along with x, logging is done here.
44}
45
46// xctl opens a ctl connection.
47func xctl() *ctl {
48 p := mox.DataDirPath("ctl")
49 conn, err := net.Dial("unix", p)
50 if err != nil {
51 log.Fatalf("connecting to control socket at %q: %v", p, err)
52 }
53 ctl := &ctl{conn: conn}
54 version := ctl.xread()
55 if version != "ctlv0" {
56 log.Fatalf("ctl protocol mismatch, got %q, expected ctlv0", version)
57 }
58 return ctl
59}
60
61// Interpret msg as an error.
62// If ctl.x is set, the string is also written to the ctl to be interpreted as error by the other party.
63func (c *ctl) xerror(msg string) {
64 if c.x == nil {
65 log.Fatalln(msg)
66 }
67 c.log.Debugx("ctl error", fmt.Errorf("%s", msg), slog.String("cmd", c.cmd))
68 c.xwrite(msg)
69 panic(c.x)
70}
71
72// Check if err is not nil. If so, handle error through ctl.x or log.Fatal. If
73// ctl.x is set, the error string is written to ctl, to be interpreted as an error
74// by the command reading from ctl.
75func (c *ctl) xcheck(err error, msg string) {
76 if err == nil {
77 return
78 }
79 if c.x == nil {
80 log.Fatalf("%s: %s", msg, err)
81 }
82 c.log.Debugx(msg, err, slog.String("cmd", c.cmd))
83 fmt.Fprintf(c.conn, "%s: %s\n", msg, err)
84 panic(c.x)
85}
86
87// Read a line and return it without trailing newline.
88func (c *ctl) xread() string {
89 if c.r == nil {
90 c.r = bufio.NewReader(c.conn)
91 }
92 line, err := c.r.ReadString('\n')
93 c.xcheck(err, "read from ctl")
94 return strings.TrimSuffix(line, "\n")
95}
96
97// Read a line. If not "ok", the string is interpreted as an error.
98func (c *ctl) xreadok() {
99 line := c.xread()
100 if line != "ok" {
101 c.xerror(line)
102 }
103}
104
105// Write a string, typically a command or parameter.
106func (c *ctl) xwrite(text string) {
107 _, err := fmt.Fprintln(c.conn, text)
108 c.xcheck(err, "write")
109}
110
111// Write "ok" to indicate success.
112func (c *ctl) xwriteok() {
113 c.xwrite("ok")
114}
115
116// Copy data from a stream from ctl to dst.
117func (c *ctl) xstreamto(dst io.Writer) {
118 _, err := io.Copy(dst, c.reader())
119 c.xcheck(err, "reading message")
120}
121
122// Copy data from src to a stream to ctl.
123func (c *ctl) xstreamfrom(src io.Reader) {
124 w := c.writer()
125 _, err := io.Copy(w, src)
126 c.xcheck(err, "copying")
127 w.xclose()
128}
129
130// Writer returns an io.Writer for a data stream to ctl.
131// When done writing, caller must call xclose to signal the end of the stream.
132// Behaviour of "x" is copied from ctl.
133func (c *ctl) writer() *ctlwriter {
134 return &ctlwriter{cmd: c.cmd, conn: c.conn, x: c.x, log: c.log}
135}
136
137// Reader returns an io.Reader for a data stream from ctl.
138// Behaviour of "x" is copied from ctl.
139func (c *ctl) reader() *ctlreader {
140 if c.r == nil {
141 c.r = bufio.NewReader(c.conn)
142 }
143 return &ctlreader{cmd: c.cmd, conn: c.conn, r: c.r, x: c.x, log: c.log}
144}
145
146/*
147Ctlwriter and ctlreader implement the writing and reading a data stream. They
148implement the io.Writer and io.Reader interface. In the protocol below each
149non-data message ends with a newline that is typically stripped when
150interpreting.
151
152Zero or more data transactions:
153
154 > "123" (for data size) or an error message
155 > data, 123 bytes
156 < "ok" or an error message
157
158Followed by a end of stream indicated by zero data bytes message:
159
160 > "0"
161*/
162
163type ctlwriter struct {
164 cmd string // Set for server-side of commands.
165 conn net.Conn // Ctl socket from which messages are read.
166 buf []byte // Scratch buffer, for reading response.
167 x any // If not nil, errors in Write and xcheckf are handled with panic(x), otherwise with a log.Fatal.
168 log mlog.Log
169}
170
171func (s *ctlwriter) Write(buf []byte) (int, error) {
172 _, err := fmt.Fprintf(s.conn, "%d\n", len(buf))
173 s.xcheck(err, "write count")
174 _, err = s.conn.Write(buf)
175 s.xcheck(err, "write data")
176 if s.buf == nil {
177 s.buf = make([]byte, 512)
178 }
179 n, err := s.conn.Read(s.buf)
180 s.xcheck(err, "reading response to write")
181 line := strings.TrimSuffix(string(s.buf[:n]), "\n")
182 if line != "ok" {
183 s.xerror(line)
184 }
185 return len(buf), nil
186}
187
188func (s *ctlwriter) xerror(msg string) {
189 if s.x == nil {
190 log.Fatalln(msg)
191 } else {
192 s.log.Debugx("error", fmt.Errorf("%s", msg), slog.String("cmd", s.cmd))
193 panic(s.x)
194 }
195}
196
197func (s *ctlwriter) xcheck(err error, msg string) {
198 if err == nil {
199 return
200 }
201 if s.x == nil {
202 log.Fatalf("%s: %s", msg, err)
203 } else {
204 s.log.Debugx(msg, err, slog.String("cmd", s.cmd))
205 panic(s.x)
206 }
207}
208
209func (s *ctlwriter) xclose() {
210 _, err := fmt.Fprintf(s.conn, "0\n")
211 s.xcheck(err, "write eof")
212}
213
214type ctlreader struct {
215 cmd string // Set for server-side of command.
216 conn net.Conn // For writing "ok" after reading.
217 r *bufio.Reader // Buffered ctl socket.
218 err error // If set, returned for each read. can also be io.EOF.
219 npending int // Number of bytes that can still be read until a new count line must be read.
220 x any // If set, errors are handled with panic(x) instead of log.Fatal.
221 log mlog.Log // If x is set, logging goes to log.
222}
223
224func (s *ctlreader) Read(buf []byte) (N int, Err error) {
225 if s.err != nil {
226 return 0, s.err
227 }
228 if s.npending == 0 {
229 line, err := s.r.ReadString('\n')
230 s.xcheck(err, "reading count")
231 line = strings.TrimSuffix(line, "\n")
232 n, err := strconv.ParseInt(line, 10, 32)
233 if err != nil {
234 s.xerror(line)
235 }
236 if n == 0 {
237 s.err = io.EOF
238 return 0, s.err
239 }
240 s.npending = int(n)
241 }
242 rn := len(buf)
243 if rn > s.npending {
244 rn = s.npending
245 }
246 n, err := s.r.Read(buf[:rn])
247 s.xcheck(err, "read from ctl")
248 s.npending -= n
249 if s.npending == 0 {
250 _, err = fmt.Fprintln(s.conn, "ok")
251 s.xcheck(err, "writing ok after reading")
252 }
253 return n, err
254}
255
256func (s *ctlreader) xerror(msg string) {
257 if s.x == nil {
258 log.Fatalln(msg)
259 } else {
260 s.log.Debugx("error", fmt.Errorf("%s", msg), slog.String("cmd", s.cmd))
261 panic(s.x)
262 }
263}
264
265func (s *ctlreader) xcheck(err error, msg string) {
266 if err == nil {
267 return
268 }
269 if s.x == nil {
270 log.Fatalf("%s: %s", msg, err)
271 } else {
272 s.log.Debugx(msg, err, slog.String("cmd", s.cmd))
273 panic(s.x)
274 }
275}
276
277// servectl handles requests on the unix domain socket "ctl", e.g. for graceful shutdown, local mail delivery.
278func servectl(ctx context.Context, log mlog.Log, conn net.Conn, shutdown func()) {
279 log.Debug("ctl connection")
280
281 var stop = struct{}{} // Sentinel value for panic and recover.
282 ctl := &ctl{conn: conn, x: stop, log: log}
283 defer func() {
284 x := recover()
285 if x == nil || x == stop {
286 return
287 }
288 log.Error("servectl panic", slog.Any("err", x), slog.String("cmd", ctl.cmd))
289 debug.PrintStack()
290 metrics.PanicInc(metrics.Ctl)
291 }()
292
293 defer conn.Close()
294
295 ctl.xwrite("ctlv0")
296 for {
297 servectlcmd(ctx, ctl, shutdown)
298 }
299}
300
301func xparseJSON(ctl *ctl, s string, v any) {
302 dec := json.NewDecoder(strings.NewReader(s))
303 dec.DisallowUnknownFields()
304 err := dec.Decode(v)
305 ctl.xcheck(err, "parsing from ctl as json")
306}
307
308func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
309 log := ctl.log
310 cmd := ctl.xread()
311 ctl.cmd = cmd
312 log.Info("ctl command", slog.String("cmd", cmd))
313 switch cmd {
314 case "stop":
315 shutdown()
316 os.Exit(0)
317
318 case "deliver":
319 /* The protocol, double quoted are literals.
320
321 > "deliver"
322 > address
323 < "ok"
324 > stream
325 < "ok"
326 */
327
328 to := ctl.xread()
329 a, addr, err := store.OpenEmail(log, to)
330 ctl.xcheck(err, "lookup destination address")
331
332 msgFile, err := store.CreateMessageTemp(log, "ctl-deliver")
333 ctl.xcheck(err, "creating temporary message file")
334 defer store.CloseRemoveTempFile(log, msgFile, "deliver message")
335 mw := message.NewWriter(msgFile)
336 ctl.xwriteok()
337
338 ctl.xstreamto(mw)
339 err = msgFile.Sync()
340 ctl.xcheck(err, "syncing message to storage")
341
342 m := store.Message{
343 Received: time.Now(),
344 Size: mw.Size,
345 }
346
347 a.WithWLock(func() {
348 err := a.DeliverDestination(log, addr, &m, msgFile)
349 ctl.xcheck(err, "delivering message")
350 log.Info("message delivered through ctl", slog.Any("to", to))
351 })
352
353 err = a.Close()
354 ctl.xcheck(err, "closing account")
355 ctl.xwriteok()
356
357 case "setaccountpassword":
358 /* protocol:
359 > "setaccountpassword"
360 > account
361 > password
362 < "ok" or error
363 */
364
365 account := ctl.xread()
366 pw := ctl.xread()
367
368 acc, err := store.OpenAccount(log, account)
369 ctl.xcheck(err, "open account")
370 defer func() {
371 if acc != nil {
372 err := acc.Close()
373 log.Check(err, "closing account after setting password")
374 }
375 }()
376
377 err = acc.SetPassword(log, pw)
378 ctl.xcheck(err, "setting password")
379 err = acc.Close()
380 ctl.xcheck(err, "closing account")
381 acc = nil
382 ctl.xwriteok()
383
384 case "queueholdruleslist":
385 /* protocol:
386 > "queueholdruleslist"
387 < "ok"
388 < stream
389 */
390 l, err := queue.HoldRuleList(ctx)
391 ctl.xcheck(err, "listing hold rules")
392 ctl.xwriteok()
393 xw := ctl.writer()
394 fmt.Fprintln(xw, "hold rules:")
395 for _, hr := range l {
396 var elems []string
397 if hr.Account != "" {
398 elems = append(elems, fmt.Sprintf("account %q", hr.Account))
399 }
400 var zerodom dns.Domain
401 if hr.SenderDomain != zerodom {
402 elems = append(elems, fmt.Sprintf("sender domain %q", hr.SenderDomain.Name()))
403 }
404 if hr.RecipientDomain != zerodom {
405 elems = append(elems, fmt.Sprintf("sender domain %q", hr.RecipientDomain.Name()))
406 }
407 if len(elems) == 0 {
408 fmt.Fprintf(xw, "id %d: all messages\n", hr.ID)
409 } else {
410 fmt.Fprintf(xw, "id %d: %s\n", hr.ID, strings.Join(elems, ", "))
411 }
412 }
413 if len(l) == 0 {
414 fmt.Fprint(xw, "(none)\n")
415 }
416 xw.xclose()
417
418 case "queueholdrulesadd":
419 /* protocol:
420 > "queueholdrulesadd"
421 > account
422 > senderdomainstr
423 > recipientdomainstr
424 < "ok" or error
425 */
426 var hr queue.HoldRule
427 hr.Account = ctl.xread()
428 senderdomstr := ctl.xread()
429 rcptdomstr := ctl.xread()
430 var err error
431 hr.SenderDomain, err = dns.ParseDomain(senderdomstr)
432 ctl.xcheck(err, "parsing sender domain")
433 hr.RecipientDomain, err = dns.ParseDomain(rcptdomstr)
434 ctl.xcheck(err, "parsing recipient domain")
435 hr, err = queue.HoldRuleAdd(ctx, log, hr)
436 ctl.xcheck(err, "add hold rule")
437 ctl.xwriteok()
438
439 case "queueholdrulesremove":
440 /* protocol:
441 > "queueholdrulesremove"
442 > id
443 < "ok" or error
444 */
445 id, err := strconv.ParseInt(ctl.xread(), 10, 64)
446 ctl.xcheck(err, "parsing id")
447 err = queue.HoldRuleRemove(ctx, log, id)
448 ctl.xcheck(err, "remove hold rule")
449 ctl.xwriteok()
450
451 case "queuelist":
452 /* protocol:
453 > "queuelist"
454 > filters as json
455 > sort as json
456 < "ok"
457 < stream
458 */
459 var f queue.Filter
460 xparseJSON(ctl, ctl.xread(), &f)
461 var s queue.Sort
462 xparseJSON(ctl, ctl.xread(), &s)
463 qmsgs, err := queue.List(ctx, f, s)
464 ctl.xcheck(err, "listing queue")
465 ctl.xwriteok()
466
467 xw := ctl.writer()
468 fmt.Fprintln(xw, "messages:")
469 for _, qm := range qmsgs {
470 var lastAttempt string
471 if qm.LastAttempt != nil {
472 lastAttempt = time.Since(*qm.LastAttempt).Round(time.Second).String()
473 }
474 fmt.Fprintf(xw, "%5d %s from:%s to:%s next %s last %s error %q\n", qm.ID, qm.Queued.Format(time.RFC3339), qm.Sender().LogString(), qm.Recipient().LogString(), -time.Since(qm.NextAttempt).Round(time.Second), lastAttempt, qm.LastResult().Error)
475 }
476 if len(qmsgs) == 0 {
477 fmt.Fprint(xw, "(none)\n")
478 }
479 xw.xclose()
480
481 case "queueholdset":
482 /* protocol:
483 > "queueholdset"
484 > queuefilters as json
485 > "true" or "false"
486 < "ok" or error
487 < count
488 */
489
490 var f queue.Filter
491 xparseJSON(ctl, ctl.xread(), &f)
492 hold := ctl.xread() == "true"
493 count, err := queue.HoldSet(ctx, f, hold)
494 ctl.xcheck(err, "setting on hold status for messages")
495 ctl.xwriteok()
496 ctl.xwrite(fmt.Sprintf("%d", count))
497
498 case "queueschedule":
499 /* protocol:
500 > "queueschedule"
501 > queuefilters as json
502 > relative to now
503 > duration
504 < "ok" or error
505 < count
506 */
507
508 var f queue.Filter
509 xparseJSON(ctl, ctl.xread(), &f)
510 relnow := ctl.xread()
511 d, err := time.ParseDuration(ctl.xread())
512 ctl.xcheck(err, "parsing duration for next delivery attempt")
513 var count int
514 if relnow == "" {
515 count, err = queue.NextAttemptAdd(ctx, f, d)
516 } else {
517 count, err = queue.NextAttemptSet(ctx, f, time.Now().Add(d))
518 }
519 ctl.xcheck(err, "setting next delivery attempts in queue")
520 ctl.xwriteok()
521 ctl.xwrite(fmt.Sprintf("%d", count))
522
523 case "queuetransport":
524 /* protocol:
525 > "queuetransport"
526 > queuefilters as json
527 > transport
528 < "ok" or error
529 < count
530 */
531
532 var f queue.Filter
533 xparseJSON(ctl, ctl.xread(), &f)
534 transport := ctl.xread()
535 count, err := queue.TransportSet(ctx, f, transport)
536 ctl.xcheck(err, "adding to next delivery attempts in queue")
537 ctl.xwriteok()
538 ctl.xwrite(fmt.Sprintf("%d", count))
539
540 case "queuerequiretls":
541 /* protocol:
542 > "queuerequiretls"
543 > queuefilters as json
544 > reqtls (empty string, "true" or "false")
545 < "ok" or error
546 < count
547 */
548
549 var f queue.Filter
550 xparseJSON(ctl, ctl.xread(), &f)
551 reqtls := ctl.xread()
552 var req *bool
553 switch reqtls {
554 case "":
555 case "true":
556 v := true
557 req = &v
558 case "false":
559 v := false
560 req = &v
561 default:
562 ctl.xcheck(fmt.Errorf("unknown value %q", reqtls), "parsing value")
563 }
564 count, err := queue.RequireTLSSet(ctx, f, req)
565 ctl.xcheck(err, "setting tls requirements on messages in queue")
566 ctl.xwriteok()
567 ctl.xwrite(fmt.Sprintf("%d", count))
568
569 case "queuefail":
570 /* protocol:
571 > "queuefail"
572 > queuefilters as json
573 < "ok" or error
574 < count
575 */
576
577 var f queue.Filter
578 xparseJSON(ctl, ctl.xread(), &f)
579 count, err := queue.Fail(ctx, log, f)
580 ctl.xcheck(err, "marking messages from queue as failed")
581 ctl.xwriteok()
582 ctl.xwrite(fmt.Sprintf("%d", count))
583
584 case "queuedrop":
585 /* protocol:
586 > "queuedrop"
587 > queuefilters as json
588 < "ok" or error
589 < count
590 */
591
592 var f queue.Filter
593 xparseJSON(ctl, ctl.xread(), &f)
594 count, err := queue.Drop(ctx, log, f)
595 ctl.xcheck(err, "dropping messages from queue")
596 ctl.xwriteok()
597 ctl.xwrite(fmt.Sprintf("%d", count))
598
599 case "queuedump":
600 /* protocol:
601 > "queuedump"
602 > id
603 < "ok" or error
604 < stream
605 */
606
607 idstr := ctl.xread()
608 id, err := strconv.ParseInt(idstr, 10, 64)
609 if err != nil {
610 ctl.xcheck(err, "parsing id")
611 }
612 mr, err := queue.OpenMessage(ctx, id)
613 ctl.xcheck(err, "opening message")
614 defer func() {
615 err := mr.Close()
616 log.Check(err, "closing message from queue")
617 }()
618 ctl.xwriteok()
619 ctl.xstreamfrom(mr)
620
621 case "queueretiredlist":
622 /* protocol:
623 > "queueretiredlist"
624 > filters as json
625 > sort as json
626 < "ok"
627 < stream
628 */
629 var f queue.RetiredFilter
630 xparseJSON(ctl, ctl.xread(), &f)
631 var s queue.RetiredSort
632 xparseJSON(ctl, ctl.xread(), &s)
633 qmsgs, err := queue.RetiredList(ctx, f, s)
634 ctl.xcheck(err, "listing retired queue")
635 ctl.xwriteok()
636
637 xw := ctl.writer()
638 fmt.Fprintln(xw, "retired messages:")
639 for _, qm := range qmsgs {
640 var lastAttempt string
641 if qm.LastAttempt != nil {
642 lastAttempt = time.Since(*qm.LastAttempt).Round(time.Second).String()
643 }
644 result := "failure"
645 if qm.Success {
646 result = "success"
647 }
648 sender, err := qm.Sender()
649 xcheckf(err, "parsing sender")
650 fmt.Fprintf(xw, "%5d %s %s from:%s to:%s last %s error %q\n", qm.ID, qm.Queued.Format(time.RFC3339), result, sender.LogString(), qm.Recipient().LogString(), lastAttempt, qm.LastResult().Error)
651 }
652 if len(qmsgs) == 0 {
653 fmt.Fprint(xw, "(none)\n")
654 }
655 xw.xclose()
656
657 case "queueretiredprint":
658 /* protocol:
659 > "queueretiredprint"
660 > id
661 < "ok"
662 < stream
663 */
664 idstr := ctl.xread()
665 id, err := strconv.ParseInt(idstr, 10, 64)
666 if err != nil {
667 ctl.xcheck(err, "parsing id")
668 }
669 l, err := queue.RetiredList(ctx, queue.RetiredFilter{IDs: []int64{id}}, queue.RetiredSort{})
670 ctl.xcheck(err, "getting retired messages")
671 if len(l) == 0 {
672 ctl.xcheck(errors.New("not found"), "getting retired message")
673 }
674 m := l[0]
675 ctl.xwriteok()
676 xw := ctl.writer()
677 enc := json.NewEncoder(xw)
678 enc.SetIndent("", "\t")
679 err = enc.Encode(m)
680 ctl.xcheck(err, "encode retired message")
681 xw.xclose()
682
683 case "queuehooklist":
684 /* protocol:
685 > "queuehooklist"
686 > filters as json
687 > sort as json
688 < "ok"
689 < stream
690 */
691 var f queue.HookFilter
692 xparseJSON(ctl, ctl.xread(), &f)
693 var s queue.HookSort
694 xparseJSON(ctl, ctl.xread(), &s)
695 hooks, err := queue.HookList(ctx, f, s)
696 ctl.xcheck(err, "listing webhooks")
697 ctl.xwriteok()
698
699 xw := ctl.writer()
700 fmt.Fprintln(xw, "webhooks:")
701 for _, h := range hooks {
702 var lastAttempt string
703 if len(h.Results) > 0 {
704 lastAttempt = time.Since(h.LastResult().Start).Round(time.Second).String()
705 }
706 fmt.Fprintf(xw, "%5d %s account:%s next %s last %s error %q url %s\n", h.ID, h.Submitted.Format(time.RFC3339), h.Account, time.Until(h.NextAttempt).Round(time.Second), lastAttempt, h.LastResult().Error, h.URL)
707 }
708 if len(hooks) == 0 {
709 fmt.Fprint(xw, "(none)\n")
710 }
711 xw.xclose()
712
713 case "queuehookschedule":
714 /* protocol:
715 > "queuehookschedule"
716 > hookfilters as json
717 > relative to now
718 > duration
719 < "ok" or error
720 < count
721 */
722
723 var f queue.HookFilter
724 xparseJSON(ctl, ctl.xread(), &f)
725 relnow := ctl.xread()
726 d, err := time.ParseDuration(ctl.xread())
727 ctl.xcheck(err, "parsing duration for next delivery attempt")
728 var count int
729 if relnow == "" {
730 count, err = queue.HookNextAttemptAdd(ctx, f, d)
731 } else {
732 count, err = queue.HookNextAttemptSet(ctx, f, time.Now().Add(d))
733 }
734 ctl.xcheck(err, "setting next delivery attempts in queue")
735 ctl.xwriteok()
736 ctl.xwrite(fmt.Sprintf("%d", count))
737
738 case "queuehookcancel":
739 /* protocol:
740 > "queuehookcancel"
741 > hookfilters as json
742 < "ok" or error
743 < count
744 */
745
746 var f queue.HookFilter
747 xparseJSON(ctl, ctl.xread(), &f)
748 count, err := queue.HookCancel(ctx, log, f)
749 ctl.xcheck(err, "canceling webhooks in queue")
750 ctl.xwriteok()
751 ctl.xwrite(fmt.Sprintf("%d", count))
752
753 case "queuehookprint":
754 /* protocol:
755 > "queuehookprint"
756 > id
757 < "ok"
758 < stream
759 */
760 idstr := ctl.xread()
761 id, err := strconv.ParseInt(idstr, 10, 64)
762 if err != nil {
763 ctl.xcheck(err, "parsing id")
764 }
765 l, err := queue.HookList(ctx, queue.HookFilter{IDs: []int64{id}}, queue.HookSort{})
766 ctl.xcheck(err, "getting webhooks")
767 if len(l) == 0 {
768 ctl.xcheck(errors.New("not found"), "getting webhook")
769 }
770 h := l[0]
771 ctl.xwriteok()
772 xw := ctl.writer()
773 enc := json.NewEncoder(xw)
774 enc.SetIndent("", "\t")
775 err = enc.Encode(h)
776 ctl.xcheck(err, "encode webhook")
777 xw.xclose()
778
779 case "queuehookretiredlist":
780 /* protocol:
781 > "queuehookretiredlist"
782 > filters as json
783 > sort as json
784 < "ok"
785 < stream
786 */
787 var f queue.HookRetiredFilter
788 xparseJSON(ctl, ctl.xread(), &f)
789 var s queue.HookRetiredSort
790 xparseJSON(ctl, ctl.xread(), &s)
791 l, err := queue.HookRetiredList(ctx, f, s)
792 ctl.xcheck(err, "listing retired webhooks")
793 ctl.xwriteok()
794
795 xw := ctl.writer()
796 fmt.Fprintln(xw, "retired webhooks:")
797 for _, h := range l {
798 var lastAttempt string
799 if len(h.Results) > 0 {
800 lastAttempt = time.Since(h.LastResult().Start).Round(time.Second).String()
801 }
802 result := "success"
803 if !h.Success {
804 result = "failure"
805 }
806 fmt.Fprintf(xw, "%5d %s %s account:%s last %s error %q url %s\n", h.ID, h.Submitted.Format(time.RFC3339), result, h.Account, lastAttempt, h.LastResult().Error, h.URL)
807 }
808 if len(l) == 0 {
809 fmt.Fprint(xw, "(none)\n")
810 }
811 xw.xclose()
812
813 case "queuehookretiredprint":
814 /* protocol:
815 > "queuehookretiredprint"
816 > id
817 < "ok"
818 < stream
819 */
820 idstr := ctl.xread()
821 id, err := strconv.ParseInt(idstr, 10, 64)
822 if err != nil {
823 ctl.xcheck(err, "parsing id")
824 }
825 l, err := queue.HookRetiredList(ctx, queue.HookRetiredFilter{IDs: []int64{id}}, queue.HookRetiredSort{})
826 ctl.xcheck(err, "getting retired webhooks")
827 if len(l) == 0 {
828 ctl.xcheck(errors.New("not found"), "getting retired webhook")
829 }
830 h := l[0]
831 ctl.xwriteok()
832 xw := ctl.writer()
833 enc := json.NewEncoder(xw)
834 enc.SetIndent("", "\t")
835 err = enc.Encode(h)
836 ctl.xcheck(err, "encode retired webhook")
837 xw.xclose()
838
839 case "queuesuppresslist":
840 /* protocol:
841 > "queuesuppresslist"
842 > account (or empty)
843 < "ok" or error
844 < stream
845 */
846
847 account := ctl.xread()
848 l, err := queue.SuppressionList(ctx, account)
849 ctl.xcheck(err, "listing suppressions")
850 ctl.xwriteok()
851 xw := ctl.writer()
852 fmt.Fprintln(xw, "suppressions (account, address, manual, time added, base adddress, reason):")
853 for _, sup := range l {
854 manual := "No"
855 if sup.Manual {
856 manual = "Yes"
857 }
858 fmt.Fprintf(xw, "%q\t%q\t%s\t%s\t%q\t%q\n", sup.Account, sup.OriginalAddress, manual, sup.Created.Round(time.Second), sup.BaseAddress, sup.Reason)
859 }
860 if len(l) == 0 {
861 fmt.Fprintln(xw, "(none)")
862 }
863 xw.xclose()
864
865 case "queuesuppressadd":
866 /* protocol:
867 > "queuesuppressadd"
868 > account
869 > address
870 < "ok" or error
871 */
872
873 account := ctl.xread()
874 address := ctl.xread()
875 _, ok := mox.Conf.Account(account)
876 if !ok {
877 ctl.xcheck(errors.New("unknown account"), "looking up account")
878 }
879 addr, err := smtp.ParseAddress(address)
880 ctl.xcheck(err, "parsing address")
881 sup := webapi.Suppression{
882 Account: account,
883 Manual: true,
884 Reason: "added through mox cli",
885 }
886 err = queue.SuppressionAdd(ctx, addr.Path(), &sup)
887 ctl.xcheck(err, "adding suppression")
888 ctl.xwriteok()
889
890 case "queuesuppressremove":
891 /* protocol:
892 > "queuesuppressremove"
893 > account
894 > address
895 < "ok" or error
896 */
897
898 account := ctl.xread()
899 address := ctl.xread()
900 addr, err := smtp.ParseAddress(address)
901 ctl.xcheck(err, "parsing address")
902 err = queue.SuppressionRemove(ctx, account, addr.Path())
903 ctl.xcheck(err, "removing suppression")
904 ctl.xwriteok()
905
906 case "queuesuppresslookup":
907 /* protocol:
908 > "queuesuppresslookup"
909 > account or empty
910 > address
911 < "ok" or error
912 < stream
913 */
914
915 account := ctl.xread()
916 address := ctl.xread()
917 if account != "" {
918 _, ok := mox.Conf.Account(account)
919 if !ok {
920 ctl.xcheck(errors.New("unknown account"), "looking up account")
921 }
922 }
923 addr, err := smtp.ParseAddress(address)
924 ctl.xcheck(err, "parsing address")
925 sup, err := queue.SuppressionLookup(ctx, account, addr.Path())
926 ctl.xcheck(err, "looking up suppression")
927 ctl.xwriteok()
928 xw := ctl.writer()
929 if sup == nil {
930 fmt.Fprintln(xw, "not present")
931 } else {
932 manual := "no"
933 if sup.Manual {
934 manual = "yes"
935 }
936 fmt.Fprintf(xw, "present\nadded: %s\nmanual: %s\nbase address: %s\nreason: %q\n", sup.Created.Round(time.Second), manual, sup.BaseAddress, sup.Reason)
937 }
938 xw.xclose()
939
940 case "importmaildir", "importmbox":
941 mbox := cmd == "importmbox"
942 importctl(ctx, ctl, mbox)
943
944 case "domainadd":
945 /* protocol:
946 > "domainadd"
947 > domain
948 > account
949 > localpart
950 < "ok" or error
951 */
952 domain := ctl.xread()
953 account := ctl.xread()
954 localpart := ctl.xread()
955 d, err := dns.ParseDomain(domain)
956 ctl.xcheck(err, "parsing domain")
957 err = mox.DomainAdd(ctx, d, account, smtp.Localpart(localpart))
958 ctl.xcheck(err, "adding domain")
959 ctl.xwriteok()
960
961 case "domainrm":
962 /* protocol:
963 > "domainrm"
964 > domain
965 < "ok" or error
966 */
967 domain := ctl.xread()
968 d, err := dns.ParseDomain(domain)
969 ctl.xcheck(err, "parsing domain")
970 err = mox.DomainRemove(ctx, d)
971 ctl.xcheck(err, "removing domain")
972 ctl.xwriteok()
973
974 case "accountadd":
975 /* protocol:
976 > "accountadd"
977 > account
978 > address
979 < "ok" or error
980 */
981 account := ctl.xread()
982 address := ctl.xread()
983 err := mox.AccountAdd(ctx, account, address)
984 ctl.xcheck(err, "adding account")
985 ctl.xwriteok()
986
987 case "accountrm":
988 /* protocol:
989 > "accountrm"
990 > account
991 < "ok" or error
992 */
993 account := ctl.xread()
994 err := mox.AccountRemove(ctx, account)
995 ctl.xcheck(err, "removing account")
996 ctl.xwriteok()
997
998 case "addressadd":
999 /* protocol:
1000 > "addressadd"
1001 > address
1002 > account
1003 < "ok" or error
1004 */
1005 address := ctl.xread()
1006 account := ctl.xread()
1007 err := mox.AddressAdd(ctx, address, account)
1008 ctl.xcheck(err, "adding address")
1009 ctl.xwriteok()
1010
1011 case "addressrm":
1012 /* protocol:
1013 > "addressrm"
1014 > address
1015 < "ok" or error
1016 */
1017 address := ctl.xread()
1018 err := mox.AddressRemove(ctx, address)
1019 ctl.xcheck(err, "removing address")
1020 ctl.xwriteok()
1021
1022 case "aliaslist":
1023 /* protocol:
1024 > "aliaslist"
1025 > domain
1026 < "ok" or error
1027 < stream
1028 */
1029 domain := ctl.xread()
1030 d, err := dns.ParseDomain(domain)
1031 ctl.xcheck(err, "parsing domain")
1032 dc, ok := mox.Conf.Domain(d)
1033 if !ok {
1034 ctl.xcheck(errors.New("no such domain"), "listing aliases")
1035 }
1036 ctl.xwriteok()
1037 w := ctl.writer()
1038 for _, a := range dc.Aliases {
1039 lp, err := smtp.ParseLocalpart(a.LocalpartStr)
1040 ctl.xcheck(err, "parsing alias localpart")
1041 fmt.Fprintln(w, smtp.NewAddress(lp, a.Domain).Pack(true))
1042 }
1043 w.xclose()
1044
1045 case "aliasprint":
1046 /* protocol:
1047 > "aliasprint"
1048 > address
1049 < "ok" or error
1050 < stream
1051 */
1052 address := ctl.xread()
1053 _, alias, ok := mox.Conf.AccountDestination(address)
1054 if !ok {
1055 ctl.xcheck(errors.New("no such address"), "looking up alias")
1056 } else if alias == nil {
1057 ctl.xcheck(errors.New("address not an alias"), "looking up alias")
1058 }
1059 ctl.xwriteok()
1060 w := ctl.writer()
1061 fmt.Fprintf(w, "# postpublic %v\n", alias.PostPublic)
1062 fmt.Fprintf(w, "# listmembers %v\n", alias.ListMembers)
1063 fmt.Fprintf(w, "# allowmsgfrom %v\n", alias.AllowMsgFrom)
1064 fmt.Fprintln(w, "# members:")
1065 for _, a := range alias.Addresses {
1066 fmt.Fprintln(w, a)
1067 }
1068 w.xclose()
1069
1070 case "aliasadd":
1071 /* protocol:
1072 > "aliasadd"
1073 > address
1074 > json alias
1075 < "ok" or error
1076 */
1077 address := ctl.xread()
1078 line := ctl.xread()
1079 addr, err := smtp.ParseAddress(address)
1080 ctl.xcheck(err, "parsing address")
1081 var alias config.Alias
1082 xparseJSON(ctl, line, &alias)
1083 err = mox.AliasAdd(ctx, addr, alias)
1084 ctl.xcheck(err, "adding alias")
1085 ctl.xwriteok()
1086
1087 case "aliasupdate":
1088 /* protocol:
1089 > "aliasupdate"
1090 > alias
1091 > "true" or "false" for postpublic
1092 > "true" or "false" for listmembers
1093 > "true" or "false" for allowmsgfrom
1094 < "ok" or error
1095 */
1096 address := ctl.xread()
1097 postpublic := ctl.xread()
1098 listmembers := ctl.xread()
1099 allowmsgfrom := ctl.xread()
1100 addr, err := smtp.ParseAddress(address)
1101 ctl.xcheck(err, "parsing address")
1102 err = mox.DomainSave(ctx, addr.Domain.Name(), func(d *config.Domain) error {
1103 a, ok := d.Aliases[addr.Localpart.String()]
1104 if !ok {
1105 return fmt.Errorf("alias does not exist")
1106 }
1107
1108 switch postpublic {
1109 case "false":
1110 a.PostPublic = false
1111 case "true":
1112 a.PostPublic = true
1113 }
1114 switch listmembers {
1115 case "false":
1116 a.ListMembers = false
1117 case "true":
1118 a.ListMembers = true
1119 }
1120 switch allowmsgfrom {
1121 case "false":
1122 a.AllowMsgFrom = false
1123 case "true":
1124 a.AllowMsgFrom = true
1125 }
1126
1127 d.Aliases = maps.Clone(d.Aliases)
1128 d.Aliases[addr.Localpart.String()] = a
1129 return nil
1130 })
1131 ctl.xcheck(err, "saving alias")
1132 ctl.xwriteok()
1133
1134 case "aliasrm":
1135 /* protocol:
1136 > "aliasrm"
1137 > alias
1138 < "ok" or error
1139 */
1140 address := ctl.xread()
1141 addr, err := smtp.ParseAddress(address)
1142 ctl.xcheck(err, "parsing address")
1143 err = mox.AliasRemove(ctx, addr)
1144 ctl.xcheck(err, "removing alias")
1145 ctl.xwriteok()
1146
1147 case "aliasaddaddr":
1148 /* protocol:
1149 > "aliasaddaddr"
1150 > alias
1151 > addresses as json
1152 < "ok" or error
1153 */
1154 address := ctl.xread()
1155 line := ctl.xread()
1156 addr, err := smtp.ParseAddress(address)
1157 ctl.xcheck(err, "parsing address")
1158 var addresses []string
1159 xparseJSON(ctl, line, &addresses)
1160 err = mox.AliasAddressesAdd(ctx, addr, addresses)
1161 ctl.xcheck(err, "adding addresses to alias")
1162 ctl.xwriteok()
1163
1164 case "aliasrmaddr":
1165 /* protocol:
1166 > "aliasrmaddr"
1167 > alias
1168 > addresses as json
1169 < "ok" or error
1170 */
1171 address := ctl.xread()
1172 line := ctl.xread()
1173 addr, err := smtp.ParseAddress(address)
1174 ctl.xcheck(err, "parsing address")
1175 var addresses []string
1176 xparseJSON(ctl, line, &addresses)
1177 err = mox.AliasAddressesRemove(ctx, addr, addresses)
1178 ctl.xcheck(err, "removing addresses to alias")
1179 ctl.xwriteok()
1180
1181 case "loglevels":
1182 /* protocol:
1183 > "loglevels"
1184 < "ok"
1185 < stream
1186 */
1187 ctl.xwriteok()
1188 l := mox.Conf.LogLevels()
1189 keys := []string{}
1190 for k := range l {
1191 keys = append(keys, k)
1192 }
1193 sort.Slice(keys, func(i, j int) bool {
1194 return keys[i] < keys[j]
1195 })
1196 s := ""
1197 for _, k := range keys {
1198 ks := k
1199 if ks == "" {
1200 ks = "(default)"
1201 }
1202 s += ks + ": " + mlog.LevelStrings[l[k]] + "\n"
1203 }
1204 ctl.xstreamfrom(strings.NewReader(s))
1205
1206 case "setloglevels":
1207 /* protocol:
1208 > "setloglevels"
1209 > pkg
1210 > level (if empty, log level for pkg will be unset)
1211 < "ok" or error
1212 */
1213 pkg := ctl.xread()
1214 levelstr := ctl.xread()
1215 if levelstr == "" {
1216 mox.Conf.LogLevelRemove(log, pkg)
1217 } else {
1218 level, ok := mlog.Levels[levelstr]
1219 if !ok {
1220 ctl.xerror("bad level")
1221 }
1222 mox.Conf.LogLevelSet(log, pkg, level)
1223 }
1224 ctl.xwriteok()
1225
1226 case "retrain":
1227 /* protocol:
1228 > "retrain"
1229 > account
1230 < "ok" or error
1231 */
1232 account := ctl.xread()
1233 acc, err := store.OpenAccount(log, account)
1234 ctl.xcheck(err, "open account")
1235 defer func() {
1236 if acc != nil {
1237 err := acc.Close()
1238 log.Check(err, "closing account after retraining")
1239 }
1240 }()
1241
1242 acc.WithWLock(func() {
1243 conf, _ := acc.Conf()
1244 if conf.JunkFilter == nil {
1245 ctl.xcheck(store.ErrNoJunkFilter, "looking for junk filter")
1246 }
1247
1248 // Remove existing junk filter files.
1249 basePath := mox.DataDirPath("accounts")
1250 dbPath := filepath.Join(basePath, acc.Name, "junkfilter.db")
1251 bloomPath := filepath.Join(basePath, acc.Name, "junkfilter.bloom")
1252 err := os.Remove(dbPath)
1253 log.Check(err, "removing old junkfilter database file", slog.String("path", dbPath))
1254 err = os.Remove(bloomPath)
1255 log.Check(err, "removing old junkfilter bloom filter file", slog.String("path", bloomPath))
1256
1257 // Open junk filter, this creates new files.
1258 jf, _, err := acc.OpenJunkFilter(ctx, log)
1259 ctl.xcheck(err, "open new junk filter")
1260 defer func() {
1261 if jf == nil {
1262 return
1263 }
1264 err := jf.Close()
1265 log.Check(err, "closing junk filter during cleanup")
1266 }()
1267
1268 // Read through messages with junk or nonjunk flag set, and train them.
1269 var total, trained int
1270 q := bstore.QueryDB[store.Message](ctx, acc.DB)
1271 q.FilterEqual("Expunged", false)
1272 err = q.ForEach(func(m store.Message) error {
1273 total++
1274 ok, err := acc.TrainMessage(ctx, log, jf, m)
1275 if ok {
1276 trained++
1277 }
1278 return err
1279 })
1280 ctl.xcheck(err, "training messages")
1281 log.Info("retrained messages", slog.Int("total", total), slog.Int("trained", trained))
1282
1283 // Close junk filter, marking success.
1284 err = jf.Close()
1285 jf = nil
1286 ctl.xcheck(err, "closing junk filter")
1287 })
1288 ctl.xwriteok()
1289
1290 case "recalculatemailboxcounts":
1291 /* protocol:
1292 > "recalculatemailboxcounts"
1293 > account
1294 < "ok" or error
1295 < stream
1296 */
1297 account := ctl.xread()
1298 acc, err := store.OpenAccount(log, account)
1299 ctl.xcheck(err, "open account")
1300 defer func() {
1301 if acc != nil {
1302 err := acc.Close()
1303 log.Check(err, "closing account after recalculating mailbox counts")
1304 }
1305 }()
1306 ctl.xwriteok()
1307
1308 w := ctl.writer()
1309
1310 acc.WithWLock(func() {
1311 var changes []store.Change
1312 err = acc.DB.Write(ctx, func(tx *bstore.Tx) error {
1313 var totalSize int64
1314 err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
1315 mc, err := mb.CalculateCounts(tx)
1316 if err != nil {
1317 return fmt.Errorf("calculating counts for mailbox %q: %w", mb.Name, err)
1318 }
1319 totalSize += mc.Size
1320
1321 if !mb.HaveCounts || mc != mb.MailboxCounts {
1322 _, err := fmt.Fprintf(w, "for %s setting new counts %s (was %s)\n", mb.Name, mc, mb.MailboxCounts)
1323 ctl.xcheck(err, "write")
1324 mb.HaveCounts = true
1325 mb.MailboxCounts = mc
1326 if err := tx.Update(&mb); err != nil {
1327 return fmt.Errorf("storing new counts for %q: %v", mb.Name, err)
1328 }
1329 changes = append(changes, mb.ChangeCounts())
1330 }
1331 return nil
1332 })
1333 if err != nil {
1334 return err
1335 }
1336
1337 du := store.DiskUsage{ID: 1}
1338 if err := tx.Get(&du); err != nil {
1339 return fmt.Errorf("get disk usage: %v", err)
1340 }
1341 if du.MessageSize != totalSize {
1342 _, err := fmt.Fprintf(w, "setting new total message size %d (was %d)\n", totalSize, du.MessageSize)
1343 ctl.xcheck(err, "write")
1344 du.MessageSize = totalSize
1345 if err := tx.Update(&du); err != nil {
1346 return fmt.Errorf("update disk usage: %v", err)
1347 }
1348 }
1349 return nil
1350 })
1351 ctl.xcheck(err, "write transaction for mailbox counts")
1352
1353 store.BroadcastChanges(acc, changes)
1354 })
1355 w.xclose()
1356
1357 case "fixmsgsize":
1358 /* protocol:
1359 > "fixmsgsize"
1360 > account or empty
1361 < "ok" or error
1362 < stream
1363 */
1364
1365 accountOpt := ctl.xread()
1366 ctl.xwriteok()
1367 w := ctl.writer()
1368
1369 var foundProblem bool
1370 const batchSize = 10000
1371
1372 xfixmsgsize := func(accName string) {
1373 acc, err := store.OpenAccount(log, accName)
1374 ctl.xcheck(err, "open account")
1375 defer func() {
1376 err := acc.Close()
1377 log.Check(err, "closing account after fixing message sizes")
1378 }()
1379
1380 total := 0
1381 var lastID int64
1382 for {
1383 var n int
1384
1385 acc.WithRLock(func() {
1386 mailboxCounts := map[int64]store.Mailbox{} // For broadcasting.
1387
1388 // Don't process all message in one transaction, we could block the account for too long.
1389 err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
1390 q := bstore.QueryTx[store.Message](tx)
1391 q.FilterEqual("Expunged", false)
1392 q.FilterGreater("ID", lastID)
1393 q.Limit(batchSize)
1394 q.SortAsc("ID")
1395 return q.ForEach(func(m store.Message) error {
1396 lastID = m.ID
1397 n++
1398
1399 p := acc.MessagePath(m.ID)
1400 st, err := os.Stat(p)
1401 if err != nil {
1402 mb := store.Mailbox{ID: m.MailboxID}
1403 if xerr := tx.Get(&mb); xerr != nil {
1404 _, werr := fmt.Fprintf(w, "get mailbox id %d for message with file error: %v\n", mb.ID, xerr)
1405 ctl.xcheck(werr, "write")
1406 }
1407 _, werr := fmt.Fprintf(w, "checking file %s for message %d in mailbox %q (id %d): %v (continuing)\n", p, m.ID, mb.Name, mb.ID, err)
1408 ctl.xcheck(werr, "write")
1409 return nil
1410 }
1411 filesize := st.Size()
1412 correctSize := int64(len(m.MsgPrefix)) + filesize
1413 if m.Size == correctSize {
1414 return nil
1415 }
1416
1417 foundProblem = true
1418
1419 mb := store.Mailbox{ID: m.MailboxID}
1420 if err := tx.Get(&mb); err != nil {
1421 _, werr := fmt.Fprintf(w, "get mailbox id %d for message with file size mismatch: %v\n", mb.ID, err)
1422 ctl.xcheck(werr, "write")
1423 }
1424 _, err = fmt.Fprintf(w, "fixing message %d in mailbox %q (id %d) with incorrect size %d, should be %d (len msg prefix %d + on-disk file %s size %d)\n", m.ID, mb.Name, mb.ID, m.Size, correctSize, len(m.MsgPrefix), p, filesize)
1425 ctl.xcheck(err, "write")
1426
1427 // We assume that the original message size was accounted as stored in the mailbox
1428 // total size. If this isn't correct, the user can always run
1429 // recalculatemailboxcounts.
1430 mb.Size -= m.Size
1431 mb.Size += correctSize
1432 if err := tx.Update(&mb); err != nil {
1433 return fmt.Errorf("update mailbox counts: %v", err)
1434 }
1435 mailboxCounts[mb.ID] = mb
1436
1437 m.Size = correctSize
1438
1439 mr := acc.MessageReader(m)
1440 part, err := message.EnsurePart(log.Logger, false, mr, m.Size)
1441 if err != nil {
1442 _, werr := fmt.Fprintf(w, "parsing message %d again: %v (continuing)\n", m.ID, err)
1443 ctl.xcheck(werr, "write")
1444 }
1445 m.ParsedBuf, err = json.Marshal(part)
1446 if err != nil {
1447 return fmt.Errorf("marshal parsed message: %v", err)
1448 }
1449 total++
1450 if err := tx.Update(&m); err != nil {
1451 return fmt.Errorf("update message: %v", err)
1452 }
1453 return nil
1454 })
1455
1456 })
1457 ctl.xcheck(err, "find and fix wrong message sizes")
1458
1459 var changes []store.Change
1460 for _, mb := range mailboxCounts {
1461 changes = append(changes, mb.ChangeCounts())
1462 }
1463 store.BroadcastChanges(acc, changes)
1464 })
1465 if n < batchSize {
1466 break
1467 }
1468 }
1469 _, err = fmt.Fprintf(w, "%d message size(s) fixed for account %s\n", total, accName)
1470 ctl.xcheck(err, "write")
1471 }
1472
1473 if accountOpt != "" {
1474 xfixmsgsize(accountOpt)
1475 } else {
1476 for i, accName := range mox.Conf.Accounts() {
1477 var line string
1478 if i > 0 {
1479 line = "\n"
1480 }
1481 _, err := fmt.Fprintf(w, "%sFixing message sizes in account %s...\n", line, accName)
1482 ctl.xcheck(err, "write")
1483 xfixmsgsize(accName)
1484 }
1485 }
1486 if foundProblem {
1487 _, err := fmt.Fprintf(w, "\nProblems were found and fixed. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n")
1488 ctl.xcheck(err, "write")
1489 }
1490
1491 w.xclose()
1492
1493 case "reparse":
1494 /* protocol:
1495 > "reparse"
1496 > account or empty
1497 < "ok" or error
1498 < stream
1499 */
1500
1501 accountOpt := ctl.xread()
1502 ctl.xwriteok()
1503 w := ctl.writer()
1504
1505 const batchSize = 100
1506
1507 xreparseAccount := func(accName string) {
1508 acc, err := store.OpenAccount(log, accName)
1509 ctl.xcheck(err, "open account")
1510 defer func() {
1511 err := acc.Close()
1512 log.Check(err, "closing account after reparsing messages")
1513 }()
1514
1515 total := 0
1516 var lastID int64
1517 for {
1518 var n int
1519 // Don't process all message in one transaction, we could block the account for too long.
1520 err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
1521 q := bstore.QueryTx[store.Message](tx)
1522 q.FilterEqual("Expunged", false)
1523 q.FilterGreater("ID", lastID)
1524 q.Limit(batchSize)
1525 q.SortAsc("ID")
1526 return q.ForEach(func(m store.Message) error {
1527 lastID = m.ID
1528 mr := acc.MessageReader(m)
1529 p, err := message.EnsurePart(log.Logger, false, mr, m.Size)
1530 if err != nil {
1531 _, err := fmt.Fprintf(w, "parsing message %d: %v (continuing)\n", m.ID, err)
1532 ctl.xcheck(err, "write")
1533 }
1534 m.ParsedBuf, err = json.Marshal(p)
1535 if err != nil {
1536 return fmt.Errorf("marshal parsed message: %v", err)
1537 }
1538 total++
1539 n++
1540 if err := tx.Update(&m); err != nil {
1541 return fmt.Errorf("update message: %v", err)
1542 }
1543 return nil
1544 })
1545
1546 })
1547 ctl.xcheck(err, "update messages with parsed mime structure")
1548 if n < batchSize {
1549 break
1550 }
1551 }
1552 _, err = fmt.Fprintf(w, "%d message(s) reparsed for account %s\n", total, accName)
1553 ctl.xcheck(err, "write")
1554 }
1555
1556 if accountOpt != "" {
1557 xreparseAccount(accountOpt)
1558 } else {
1559 for i, accName := range mox.Conf.Accounts() {
1560 var line string
1561 if i > 0 {
1562 line = "\n"
1563 }
1564 _, err := fmt.Fprintf(w, "%sReparsing account %s...\n", line, accName)
1565 ctl.xcheck(err, "write")
1566 xreparseAccount(accName)
1567 }
1568 }
1569 w.xclose()
1570
1571 case "reassignthreads":
1572 /* protocol:
1573 > "reassignthreads"
1574 > account or empty
1575 < "ok" or error
1576 < stream
1577 */
1578
1579 accountOpt := ctl.xread()
1580 ctl.xwriteok()
1581 w := ctl.writer()
1582
1583 xreassignThreads := func(accName string) {
1584 acc, err := store.OpenAccount(log, accName)
1585 ctl.xcheck(err, "open account")
1586 defer func() {
1587 err := acc.Close()
1588 log.Check(err, "closing account after reassigning threads")
1589 }()
1590
1591 // We don't want to step on an existing upgrade process.
1592 err = acc.ThreadingWait(log)
1593 ctl.xcheck(err, "waiting for threading upgrade to finish")
1594 // todo: should we try to continue if the threading upgrade failed? only if there is a chance it will succeed this time...
1595
1596 // todo: reassigning isn't atomic (in a single transaction), ideally it would be (bstore would need to be able to handle large updates).
1597 const batchSize = 50000
1598 total, err := acc.ResetThreading(ctx, log, batchSize, true)
1599 ctl.xcheck(err, "resetting threading fields")
1600 _, err = fmt.Fprintf(w, "New thread base subject assigned to %d message(s), starting to reassign threads...\n", total)
1601 ctl.xcheck(err, "write")
1602
1603 // Assign threads again. Ideally we would do this in a single transaction, but
1604 // bstore/boltdb cannot handle so many pending changes, so we set a high batchsize.
1605 err = acc.AssignThreads(ctx, log, nil, 0, 50000, w)
1606 ctl.xcheck(err, "reassign threads")
1607
1608 _, err = fmt.Fprintf(w, "Threads reassigned. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n")
1609 ctl.xcheck(err, "write")
1610 }
1611
1612 if accountOpt != "" {
1613 xreassignThreads(accountOpt)
1614 } else {
1615 for i, accName := range mox.Conf.Accounts() {
1616 var line string
1617 if i > 0 {
1618 line = "\n"
1619 }
1620 _, err := fmt.Fprintf(w, "%sReassigning threads for account %s...\n", line, accName)
1621 ctl.xcheck(err, "write")
1622 xreassignThreads(accName)
1623 }
1624 }
1625 w.xclose()
1626
1627 case "backup":
1628 backupctl(ctx, ctl)
1629
1630 default:
1631 log.Info("unrecognized command", slog.String("cmd", cmd))
1632 ctl.xwrite("unrecognized command")
1633 return
1634 }
1635}
1636