1package main
2
3import (
4 "bufio"
5 "context"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "log"
11 "log/slog"
12 "maps"
13 "net"
14 "os"
15 "path/filepath"
16 "runtime/debug"
17 "sort"
18 "strconv"
19 "strings"
20 "time"
21
22 "github.com/mjl-/bstore"
23
24 "github.com/mjl-/mox/config"
25 "github.com/mjl-/mox/dns"
26 "github.com/mjl-/mox/message"
27 "github.com/mjl-/mox/metrics"
28 "github.com/mjl-/mox/mlog"
29 "github.com/mjl-/mox/mox-"
30 "github.com/mjl-/mox/queue"
31 "github.com/mjl-/mox/smtp"
32 "github.com/mjl-/mox/store"
33 "github.com/mjl-/mox/webapi"
34)
35
// ctl represents a connection to the ctl unix domain socket of a running mox instance.
// ctl provides functions to read/write commands/responses/data streams.
//
// Errors are not returned by the x-prefixed methods: they either terminate the
// process through log.Fatal (x is nil) or abort with panic(x) (x is set).
type ctl struct {
	cmd  string        // Command being handled. Set for server-side of commands, included in log lines.
	conn net.Conn      // The ctl socket, used directly for writes.
	r    *bufio.Reader // Buffered reader on conn. Created lazily by the first xread or reader call.
	x    any           // If set, errors are handled by calling panic(x) instead of log.Fatal.
	log  mlog.Log      // Only used when x is set: errors are logged here before panicking.
}
45
46// xctl opens a ctl connection.
47func xctl() *ctl {
48 p := mox.DataDirPath("ctl")
49 conn, err := net.Dial("unix", p)
50 if err != nil {
51 log.Fatalf("connecting to control socket at %q: %v", p, err)
52 }
53 ctl := &ctl{conn: conn}
54 version := ctl.xread()
55 if version != "ctlv0" {
56 log.Fatalf("ctl protocol mismatch, got %q, expected ctlv0", version)
57 }
58 return ctl
59}
60
61// Interpret msg as an error.
62// If ctl.x is set, the string is also written to the ctl to be interpreted as error by the other party.
63func (c *ctl) xerror(msg string) {
64 if c.x == nil {
65 log.Fatalln(msg)
66 }
67 c.log.Debugx("ctl error", fmt.Errorf("%s", msg), slog.String("cmd", c.cmd))
68 c.xwrite(msg)
69 panic(c.x)
70}
71
72// Check if err is not nil. If so, handle error through ctl.x or log.Fatal. If
73// ctl.x is set, the error string is written to ctl, to be interpreted as an error
74// by the command reading from ctl.
75func (c *ctl) xcheck(err error, msg string) {
76 if err == nil {
77 return
78 }
79 if c.x == nil {
80 log.Fatalf("%s: %s", msg, err)
81 }
82 c.log.Debugx(msg, err, slog.String("cmd", c.cmd))
83 fmt.Fprintf(c.conn, "%s: %s\n", msg, err)
84 panic(c.x)
85}
86
87// Read a line and return it without trailing newline.
88func (c *ctl) xread() string {
89 if c.r == nil {
90 c.r = bufio.NewReader(c.conn)
91 }
92 line, err := c.r.ReadString('\n')
93 c.xcheck(err, "read from ctl")
94 return strings.TrimSuffix(line, "\n")
95}
96
97// Read a line. If not "ok", the string is interpreted as an error.
98func (c *ctl) xreadok() {
99 line := c.xread()
100 if line != "ok" {
101 c.xerror(line)
102 }
103}
104
105// Write a string, typically a command or parameter.
106func (c *ctl) xwrite(text string) {
107 _, err := fmt.Fprintln(c.conn, text)
108 c.xcheck(err, "write")
109}
110
111// Write "ok" to indicate success.
112func (c *ctl) xwriteok() {
113 c.xwrite("ok")
114}
115
116// Copy data from a stream from ctl to dst.
117func (c *ctl) xstreamto(dst io.Writer) {
118 _, err := io.Copy(dst, c.reader())
119 c.xcheck(err, "reading message")
120}
121
122// Copy data from src to a stream to ctl.
123func (c *ctl) xstreamfrom(src io.Reader) {
124 w := c.writer()
125 _, err := io.Copy(w, src)
126 c.xcheck(err, "copying")
127 w.xclose()
128}
129
130// Writer returns an io.Writer for a data stream to ctl.
131// When done writing, caller must call xclose to signal the end of the stream.
132// Behaviour of "x" is copied from ctl.
133func (c *ctl) writer() *ctlwriter {
134 return &ctlwriter{cmd: c.cmd, conn: c.conn, x: c.x, log: c.log}
135}
136
137// Reader returns an io.Reader for a data stream from ctl.
138// Behaviour of "x" is copied from ctl.
139func (c *ctl) reader() *ctlreader {
140 if c.r == nil {
141 c.r = bufio.NewReader(c.conn)
142 }
143 return &ctlreader{cmd: c.cmd, conn: c.conn, r: c.r, x: c.x, log: c.log}
144}
145
/*
Ctlwriter and ctlreader implement writing and reading of a data stream. They
implement the io.Writer and io.Reader interfaces. In the protocol below each
non-data message ends with a newline that is typically stripped when
interpreting.

Zero or more data transactions:

	> "123" (for data size) or an error message
	> data, 123 bytes
	< "ok" or an error message

Followed by an end of stream, indicated by a message with zero data bytes:

	> "0"
*/
162
// ctlwriter is an io.Writer that sends a data stream over a ctl connection.
// Each Write is one data transaction; xclose sends the end-of-stream marker.
type ctlwriter struct {
	cmd  string   // Set for server-side of commands, included in log lines.
	conn net.Conn // Ctl socket to which counts and data are written, and from which the responses are read.
	buf  []byte   // Scratch buffer for reading the response to a write. Allocated on first Write.
	x    any      // If not nil, errors in Write, xerror and xcheck are handled with panic(x), otherwise with a log.Fatal.
	log  mlog.Log // Only used when x is not nil: errors are logged here before panicking.
}
170
171func (s *ctlwriter) Write(buf []byte) (int, error) {
172 _, err := fmt.Fprintf(s.conn, "%d\n", len(buf))
173 s.xcheck(err, "write count")
174 _, err = s.conn.Write(buf)
175 s.xcheck(err, "write data")
176 if s.buf == nil {
177 s.buf = make([]byte, 512)
178 }
179 n, err := s.conn.Read(s.buf)
180 s.xcheck(err, "reading response to write")
181 line := strings.TrimSuffix(string(s.buf[:n]), "\n")
182 if line != "ok" {
183 s.xerror(line)
184 }
185 return len(buf), nil
186}
187
188func (s *ctlwriter) xerror(msg string) {
189 if s.x == nil {
190 log.Fatalln(msg)
191 } else {
192 s.log.Debugx("error", fmt.Errorf("%s", msg), slog.String("cmd", s.cmd))
193 panic(s.x)
194 }
195}
196
197func (s *ctlwriter) xcheck(err error, msg string) {
198 if err == nil {
199 return
200 }
201 if s.x == nil {
202 log.Fatalf("%s: %s", msg, err)
203 } else {
204 s.log.Debugx(msg, err, slog.String("cmd", s.cmd))
205 panic(s.x)
206 }
207}
208
209func (s *ctlwriter) xclose() {
210 _, err := fmt.Fprintf(s.conn, "0\n")
211 s.xcheck(err, "write eof")
212}
213
// ctlreader is an io.Reader for a data stream received over a ctl connection.
// It reads count lines and data from r, and acknowledges each fully read data
// transaction by writing "ok" to conn.
type ctlreader struct {
	cmd      string        // Set for server-side of command, included in log lines.
	conn     net.Conn      // For writing "ok" after reading.
	r        *bufio.Reader // Buffered ctl socket.
	err      error         // If set, returned for each read. Set to io.EOF once the end-of-stream marker has been read.
	npending int           // Number of bytes that can still be read until a new count line must be read.
	x        any           // If set, errors are handled with panic(x) instead of log.Fatal.
	log      mlog.Log      // If x is set, logging goes to log.
}
223
224func (s *ctlreader) Read(buf []byte) (N int, Err error) {
225 if s.err != nil {
226 return 0, s.err
227 }
228 if s.npending == 0 {
229 line, err := s.r.ReadString('\n')
230 s.xcheck(err, "reading count")
231 line = strings.TrimSuffix(line, "\n")
232 n, err := strconv.ParseInt(line, 10, 32)
233 if err != nil {
234 s.xerror(line)
235 }
236 if n == 0 {
237 s.err = io.EOF
238 return 0, s.err
239 }
240 s.npending = int(n)
241 }
242 rn := len(buf)
243 if rn > s.npending {
244 rn = s.npending
245 }
246 n, err := s.r.Read(buf[:rn])
247 s.xcheck(err, "read from ctl")
248 s.npending -= n
249 if s.npending == 0 {
250 _, err = fmt.Fprintln(s.conn, "ok")
251 s.xcheck(err, "writing ok after reading")
252 }
253 return n, err
254}
255
256func (s *ctlreader) xerror(msg string) {
257 if s.x == nil {
258 log.Fatalln(msg)
259 } else {
260 s.log.Debugx("error", fmt.Errorf("%s", msg), slog.String("cmd", s.cmd))
261 panic(s.x)
262 }
263}
264
265func (s *ctlreader) xcheck(err error, msg string) {
266 if err == nil {
267 return
268 }
269 if s.x == nil {
270 log.Fatalf("%s: %s", msg, err)
271 } else {
272 s.log.Debugx(msg, err, slog.String("cmd", s.cmd))
273 panic(s.x)
274 }
275}
276
277// servectl handles requests on the unix domain socket "ctl", e.g. for graceful shutdown, local mail delivery.
278func servectl(ctx context.Context, log mlog.Log, conn net.Conn, shutdown func()) {
279 log.Debug("ctl connection")
280
281 var stop = struct{}{} // Sentinel value for panic and recover.
282 ctl := &ctl{conn: conn, x: stop, log: log}
283 defer func() {
284 x := recover()
285 if x == nil || x == stop {
286 return
287 }
288 log.Error("servectl panic", slog.Any("err", x), slog.String("cmd", ctl.cmd))
289 debug.PrintStack()
290 metrics.PanicInc(metrics.Ctl)
291 }()
292
293 defer conn.Close()
294
295 ctl.xwrite("ctlv0")
296 for {
297 servectlcmd(ctx, ctl, shutdown)
298 }
299}
300
301func xparseJSON(ctl *ctl, s string, v any) {
302 dec := json.NewDecoder(strings.NewReader(s))
303 dec.DisallowUnknownFields()
304 err := dec.Decode(v)
305 ctl.xcheck(err, "parsing from ctl as json")
306}
307
308func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
309 log := ctl.log
310 cmd := ctl.xread()
311 ctl.cmd = cmd
312 log.Info("ctl command", slog.String("cmd", cmd))
313 switch cmd {
314 case "stop":
315 shutdown()
316 os.Exit(0)
317
318 case "deliver":
319 /* The protocol, double quoted are literals.
320
321 > "deliver"
322 > address
323 < "ok"
324 > stream
325 < "ok"
326 */
327
328 to := ctl.xread()
329 a, addr, err := store.OpenEmail(log, to)
330 ctl.xcheck(err, "lookup destination address")
331
332 msgFile, err := store.CreateMessageTemp(log, "ctl-deliver")
333 ctl.xcheck(err, "creating temporary message file")
334 defer store.CloseRemoveTempFile(log, msgFile, "deliver message")
335 mw := message.NewWriter(msgFile)
336 ctl.xwriteok()
337
338 ctl.xstreamto(mw)
339 err = msgFile.Sync()
340 ctl.xcheck(err, "syncing message to storage")
341
342 m := store.Message{
343 Received: time.Now(),
344 Size: mw.Size,
345 }
346
347 a.WithWLock(func() {
348 err := a.DeliverDestination(log, addr, &m, msgFile)
349 ctl.xcheck(err, "delivering message")
350 log.Info("message delivered through ctl", slog.Any("to", to))
351 })
352
353 err = a.Close()
354 ctl.xcheck(err, "closing account")
355 ctl.xwriteok()
356
357 case "setaccountpassword":
358 /* protocol:
359 > "setaccountpassword"
360 > account
361 > password
362 < "ok" or error
363 */
364
365 account := ctl.xread()
366 pw := ctl.xread()
367
368 acc, err := store.OpenAccount(log, account)
369 ctl.xcheck(err, "open account")
370 defer func() {
371 if acc != nil {
372 err := acc.Close()
373 log.Check(err, "closing account after setting password")
374 }
375 }()
376
377 err = acc.SetPassword(log, pw)
378 ctl.xcheck(err, "setting password")
379 err = acc.Close()
380 ctl.xcheck(err, "closing account")
381 acc = nil
382 ctl.xwriteok()
383
384 case "queueholdruleslist":
385 /* protocol:
386 > "queueholdruleslist"
387 < "ok"
388 < stream
389 */
390 l, err := queue.HoldRuleList(ctx)
391 ctl.xcheck(err, "listing hold rules")
392 ctl.xwriteok()
393 xw := ctl.writer()
394 fmt.Fprintln(xw, "hold rules:")
395 for _, hr := range l {
396 var elems []string
397 if hr.Account != "" {
398 elems = append(elems, fmt.Sprintf("account %q", hr.Account))
399 }
400 var zerodom dns.Domain
401 if hr.SenderDomain != zerodom {
402 elems = append(elems, fmt.Sprintf("sender domain %q", hr.SenderDomain.Name()))
403 }
404 if hr.RecipientDomain != zerodom {
405 elems = append(elems, fmt.Sprintf("sender domain %q", hr.RecipientDomain.Name()))
406 }
407 if len(elems) == 0 {
408 fmt.Fprintf(xw, "id %d: all messages\n", hr.ID)
409 } else {
410 fmt.Fprintf(xw, "id %d: %s\n", hr.ID, strings.Join(elems, ", "))
411 }
412 }
413 if len(l) == 0 {
414 fmt.Fprint(xw, "(none)\n")
415 }
416 xw.xclose()
417
418 case "queueholdrulesadd":
419 /* protocol:
420 > "queueholdrulesadd"
421 > account
422 > senderdomainstr
423 > recipientdomainstr
424 < "ok" or error
425 */
426 var hr queue.HoldRule
427 hr.Account = ctl.xread()
428 senderdomstr := ctl.xread()
429 rcptdomstr := ctl.xread()
430 var err error
431 hr.SenderDomain, err = dns.ParseDomain(senderdomstr)
432 ctl.xcheck(err, "parsing sender domain")
433 hr.RecipientDomain, err = dns.ParseDomain(rcptdomstr)
434 ctl.xcheck(err, "parsing recipient domain")
435 hr, err = queue.HoldRuleAdd(ctx, log, hr)
436 ctl.xcheck(err, "add hold rule")
437 ctl.xwriteok()
438
439 case "queueholdrulesremove":
440 /* protocol:
441 > "queueholdrulesremove"
442 > id
443 < "ok" or error
444 */
445 idstr := ctl.xread()
446 id, err := strconv.ParseInt(idstr, 10, 64)
447 ctl.xcheck(err, "parsing id")
448 err = queue.HoldRuleRemove(ctx, log, id)
449 ctl.xcheck(err, "remove hold rule")
450 ctl.xwriteok()
451
452 case "queuelist":
453 /* protocol:
454 > "queuelist"
455 > filters as json
456 > sort as json
457 < "ok"
458 < stream
459 */
460 filterline := ctl.xread()
461 sortline := ctl.xread()
462 var f queue.Filter
463 xparseJSON(ctl, filterline, &f)
464 var s queue.Sort
465 xparseJSON(ctl, sortline, &s)
466 qmsgs, err := queue.List(ctx, f, s)
467 ctl.xcheck(err, "listing queue")
468 ctl.xwriteok()
469
470 xw := ctl.writer()
471 fmt.Fprintln(xw, "messages:")
472 for _, qm := range qmsgs {
473 var lastAttempt string
474 if qm.LastAttempt != nil {
475 lastAttempt = time.Since(*qm.LastAttempt).Round(time.Second).String()
476 }
477 fmt.Fprintf(xw, "%5d %s from:%s to:%s next %s last %s error %q\n", qm.ID, qm.Queued.Format(time.RFC3339), qm.Sender().LogString(), qm.Recipient().LogString(), -time.Since(qm.NextAttempt).Round(time.Second), lastAttempt, qm.LastResult().Error)
478 }
479 if len(qmsgs) == 0 {
480 fmt.Fprint(xw, "(none)\n")
481 }
482 xw.xclose()
483
484 case "queueholdset":
485 /* protocol:
486 > "queueholdset"
487 > queuefilters as json
488 > "true" or "false"
489 < "ok" or error
490 < count
491 */
492
493 filterline := ctl.xread()
494 hold := ctl.xread() == "true"
495 var f queue.Filter
496 xparseJSON(ctl, filterline, &f)
497 count, err := queue.HoldSet(ctx, f, hold)
498 ctl.xcheck(err, "setting on hold status for messages")
499 ctl.xwriteok()
500 ctl.xwrite(fmt.Sprintf("%d", count))
501
502 case "queueschedule":
503 /* protocol:
504 > "queueschedule"
505 > queuefilters as json
506 > relative to now
507 > duration
508 < "ok" or error
509 < count
510 */
511
512 filterline := ctl.xread()
513 relnow := ctl.xread()
514 duration := ctl.xread()
515 var f queue.Filter
516 xparseJSON(ctl, filterline, &f)
517 d, err := time.ParseDuration(duration)
518 ctl.xcheck(err, "parsing duration for next delivery attempt")
519 var count int
520 if relnow == "" {
521 count, err = queue.NextAttemptAdd(ctx, f, d)
522 } else {
523 count, err = queue.NextAttemptSet(ctx, f, time.Now().Add(d))
524 }
525 ctl.xcheck(err, "setting next delivery attempts in queue")
526 ctl.xwriteok()
527 ctl.xwrite(fmt.Sprintf("%d", count))
528
529 case "queuetransport":
530 /* protocol:
531 > "queuetransport"
532 > queuefilters as json
533 > transport
534 < "ok" or error
535 < count
536 */
537
538 filterline := ctl.xread()
539 transport := ctl.xread()
540 var f queue.Filter
541 xparseJSON(ctl, filterline, &f)
542 count, err := queue.TransportSet(ctx, f, transport)
543 ctl.xcheck(err, "adding to next delivery attempts in queue")
544 ctl.xwriteok()
545 ctl.xwrite(fmt.Sprintf("%d", count))
546
547 case "queuerequiretls":
548 /* protocol:
549 > "queuerequiretls"
550 > queuefilters as json
551 > reqtls (empty string, "true" or "false")
552 < "ok" or error
553 < count
554 */
555
556 filterline := ctl.xread()
557 reqtls := ctl.xread()
558 var req *bool
559 switch reqtls {
560 case "":
561 case "true":
562 v := true
563 req = &v
564 case "false":
565 v := false
566 req = &v
567 default:
568 ctl.xcheck(fmt.Errorf("unknown value %q", reqtls), "parsing value")
569 }
570 var f queue.Filter
571 xparseJSON(ctl, filterline, &f)
572 count, err := queue.RequireTLSSet(ctx, f, req)
573 ctl.xcheck(err, "setting tls requirements on messages in queue")
574 ctl.xwriteok()
575 ctl.xwrite(fmt.Sprintf("%d", count))
576
577 case "queuefail":
578 /* protocol:
579 > "queuefail"
580 > queuefilters as json
581 < "ok" or error
582 < count
583 */
584
585 filterline := ctl.xread()
586 var f queue.Filter
587 xparseJSON(ctl, filterline, &f)
588 count, err := queue.Fail(ctx, log, f)
589 ctl.xcheck(err, "marking messages from queue as failed")
590 ctl.xwriteok()
591 ctl.xwrite(fmt.Sprintf("%d", count))
592
593 case "queuedrop":
594 /* protocol:
595 > "queuedrop"
596 > queuefilters as json
597 < "ok" or error
598 < count
599 */
600
601 filterline := ctl.xread()
602 var f queue.Filter
603 xparseJSON(ctl, filterline, &f)
604 count, err := queue.Drop(ctx, log, f)
605 ctl.xcheck(err, "dropping messages from queue")
606 ctl.xwriteok()
607 ctl.xwrite(fmt.Sprintf("%d", count))
608
609 case "queuedump":
610 /* protocol:
611 > "queuedump"
612 > id
613 < "ok" or error
614 < stream
615 */
616
617 idstr := ctl.xread()
618 id, err := strconv.ParseInt(idstr, 10, 64)
619 if err != nil {
620 ctl.xcheck(err, "parsing id")
621 }
622 mr, err := queue.OpenMessage(ctx, id)
623 ctl.xcheck(err, "opening message")
624 defer func() {
625 err := mr.Close()
626 log.Check(err, "closing message from queue")
627 }()
628 ctl.xwriteok()
629 ctl.xstreamfrom(mr)
630
631 case "queueretiredlist":
632 /* protocol:
633 > "queueretiredlist"
634 > filters as json
635 > sort as json
636 < "ok"
637 < stream
638 */
639 filterline := ctl.xread()
640 sortline := ctl.xread()
641 var f queue.RetiredFilter
642 xparseJSON(ctl, filterline, &f)
643 var s queue.RetiredSort
644 xparseJSON(ctl, sortline, &s)
645 qmsgs, err := queue.RetiredList(ctx, f, s)
646 ctl.xcheck(err, "listing retired queue")
647 ctl.xwriteok()
648
649 xw := ctl.writer()
650 fmt.Fprintln(xw, "retired messages:")
651 for _, qm := range qmsgs {
652 var lastAttempt string
653 if qm.LastAttempt != nil {
654 lastAttempt = time.Since(*qm.LastAttempt).Round(time.Second).String()
655 }
656 result := "failure"
657 if qm.Success {
658 result = "success"
659 }
660 sender, err := qm.Sender()
661 xcheckf(err, "parsing sender")
662 fmt.Fprintf(xw, "%5d %s %s from:%s to:%s last %s error %q\n", qm.ID, qm.Queued.Format(time.RFC3339), result, sender.LogString(), qm.Recipient().LogString(), lastAttempt, qm.LastResult().Error)
663 }
664 if len(qmsgs) == 0 {
665 fmt.Fprint(xw, "(none)\n")
666 }
667 xw.xclose()
668
669 case "queueretiredprint":
670 /* protocol:
671 > "queueretiredprint"
672 > id
673 < "ok"
674 < stream
675 */
676 idstr := ctl.xread()
677 id, err := strconv.ParseInt(idstr, 10, 64)
678 if err != nil {
679 ctl.xcheck(err, "parsing id")
680 }
681 l, err := queue.RetiredList(ctx, queue.RetiredFilter{IDs: []int64{id}}, queue.RetiredSort{})
682 ctl.xcheck(err, "getting retired messages")
683 if len(l) == 0 {
684 ctl.xcheck(errors.New("not found"), "getting retired message")
685 }
686 m := l[0]
687 ctl.xwriteok()
688 xw := ctl.writer()
689 enc := json.NewEncoder(xw)
690 enc.SetIndent("", "\t")
691 err = enc.Encode(m)
692 ctl.xcheck(err, "encode retired message")
693 xw.xclose()
694
695 case "queuehooklist":
696 /* protocol:
697 > "queuehooklist"
698 > filters as json
699 > sort as json
700 < "ok"
701 < stream
702 */
703 filterline := ctl.xread()
704 sortline := ctl.xread()
705 var f queue.HookFilter
706 xparseJSON(ctl, filterline, &f)
707 var s queue.HookSort
708 xparseJSON(ctl, sortline, &s)
709 hooks, err := queue.HookList(ctx, f, s)
710 ctl.xcheck(err, "listing webhooks")
711 ctl.xwriteok()
712
713 xw := ctl.writer()
714 fmt.Fprintln(xw, "webhooks:")
715 for _, h := range hooks {
716 var lastAttempt string
717 if len(h.Results) > 0 {
718 lastAttempt = time.Since(h.LastResult().Start).Round(time.Second).String()
719 }
720 fmt.Fprintf(xw, "%5d %s account:%s next %s last %s error %q url %s\n", h.ID, h.Submitted.Format(time.RFC3339), h.Account, time.Until(h.NextAttempt).Round(time.Second), lastAttempt, h.LastResult().Error, h.URL)
721 }
722 if len(hooks) == 0 {
723 fmt.Fprint(xw, "(none)\n")
724 }
725 xw.xclose()
726
727 case "queuehookschedule":
728 /* protocol:
729 > "queuehookschedule"
730 > hookfilters as json
731 > relative to now
732 > duration
733 < "ok" or error
734 < count
735 */
736
737 filterline := ctl.xread()
738 relnow := ctl.xread()
739 duration := ctl.xread()
740 var f queue.HookFilter
741 xparseJSON(ctl, filterline, &f)
742 d, err := time.ParseDuration(duration)
743 ctl.xcheck(err, "parsing duration for next delivery attempt")
744 var count int
745 if relnow == "" {
746 count, err = queue.HookNextAttemptAdd(ctx, f, d)
747 } else {
748 count, err = queue.HookNextAttemptSet(ctx, f, time.Now().Add(d))
749 }
750 ctl.xcheck(err, "setting next delivery attempts in queue")
751 ctl.xwriteok()
752 ctl.xwrite(fmt.Sprintf("%d", count))
753
754 case "queuehookcancel":
755 /* protocol:
756 > "queuehookcancel"
757 > hookfilters as json
758 < "ok" or error
759 < count
760 */
761
762 filterline := ctl.xread()
763 var f queue.HookFilter
764 xparseJSON(ctl, filterline, &f)
765 count, err := queue.HookCancel(ctx, log, f)
766 ctl.xcheck(err, "canceling webhooks in queue")
767 ctl.xwriteok()
768 ctl.xwrite(fmt.Sprintf("%d", count))
769
770 case "queuehookprint":
771 /* protocol:
772 > "queuehookprint"
773 > id
774 < "ok"
775 < stream
776 */
777 idstr := ctl.xread()
778 id, err := strconv.ParseInt(idstr, 10, 64)
779 if err != nil {
780 ctl.xcheck(err, "parsing id")
781 }
782 l, err := queue.HookList(ctx, queue.HookFilter{IDs: []int64{id}}, queue.HookSort{})
783 ctl.xcheck(err, "getting webhooks")
784 if len(l) == 0 {
785 ctl.xcheck(errors.New("not found"), "getting webhook")
786 }
787 h := l[0]
788 ctl.xwriteok()
789 xw := ctl.writer()
790 enc := json.NewEncoder(xw)
791 enc.SetIndent("", "\t")
792 err = enc.Encode(h)
793 ctl.xcheck(err, "encode webhook")
794 xw.xclose()
795
796 case "queuehookretiredlist":
797 /* protocol:
798 > "queuehookretiredlist"
799 > filters as json
800 > sort as json
801 < "ok"
802 < stream
803 */
804 filterline := ctl.xread()
805 sortline := ctl.xread()
806 var f queue.HookRetiredFilter
807 xparseJSON(ctl, filterline, &f)
808 var s queue.HookRetiredSort
809 xparseJSON(ctl, sortline, &s)
810 l, err := queue.HookRetiredList(ctx, f, s)
811 ctl.xcheck(err, "listing retired webhooks")
812 ctl.xwriteok()
813
814 xw := ctl.writer()
815 fmt.Fprintln(xw, "retired webhooks:")
816 for _, h := range l {
817 var lastAttempt string
818 if len(h.Results) > 0 {
819 lastAttempt = time.Since(h.LastResult().Start).Round(time.Second).String()
820 }
821 result := "success"
822 if !h.Success {
823 result = "failure"
824 }
825 fmt.Fprintf(xw, "%5d %s %s account:%s last %s error %q url %s\n", h.ID, h.Submitted.Format(time.RFC3339), result, h.Account, lastAttempt, h.LastResult().Error, h.URL)
826 }
827 if len(l) == 0 {
828 fmt.Fprint(xw, "(none)\n")
829 }
830 xw.xclose()
831
832 case "queuehookretiredprint":
833 /* protocol:
834 > "queuehookretiredprint"
835 > id
836 < "ok"
837 < stream
838 */
839 idstr := ctl.xread()
840 id, err := strconv.ParseInt(idstr, 10, 64)
841 if err != nil {
842 ctl.xcheck(err, "parsing id")
843 }
844 l, err := queue.HookRetiredList(ctx, queue.HookRetiredFilter{IDs: []int64{id}}, queue.HookRetiredSort{})
845 ctl.xcheck(err, "getting retired webhooks")
846 if len(l) == 0 {
847 ctl.xcheck(errors.New("not found"), "getting retired webhook")
848 }
849 h := l[0]
850 ctl.xwriteok()
851 xw := ctl.writer()
852 enc := json.NewEncoder(xw)
853 enc.SetIndent("", "\t")
854 err = enc.Encode(h)
855 ctl.xcheck(err, "encode retired webhook")
856 xw.xclose()
857
858 case "queuesuppresslist":
859 /* protocol:
860 > "queuesuppresslist"
861 > account (or empty)
862 < "ok" or error
863 < stream
864 */
865
866 account := ctl.xread()
867 l, err := queue.SuppressionList(ctx, account)
868 ctl.xcheck(err, "listing suppressions")
869 ctl.xwriteok()
870 xw := ctl.writer()
871 fmt.Fprintln(xw, "suppressions (account, address, manual, time added, base adddress, reason):")
872 for _, sup := range l {
873 manual := "No"
874 if sup.Manual {
875 manual = "Yes"
876 }
877 fmt.Fprintf(xw, "%q\t%q\t%s\t%s\t%q\t%q\n", sup.Account, sup.OriginalAddress, manual, sup.Created.Round(time.Second), sup.BaseAddress, sup.Reason)
878 }
879 if len(l) == 0 {
880 fmt.Fprintln(xw, "(none)")
881 }
882 xw.xclose()
883
884 case "queuesuppressadd":
885 /* protocol:
886 > "queuesuppressadd"
887 > account
888 > address
889 < "ok" or error
890 */
891
892 account := ctl.xread()
893 address := ctl.xread()
894 _, ok := mox.Conf.Account(account)
895 if !ok {
896 ctl.xcheck(errors.New("unknown account"), "looking up account")
897 }
898 addr, err := smtp.ParseAddress(address)
899 ctl.xcheck(err, "parsing address")
900 sup := webapi.Suppression{
901 Account: account,
902 Manual: true,
903 Reason: "added through mox cli",
904 }
905 err = queue.SuppressionAdd(ctx, addr.Path(), &sup)
906 ctl.xcheck(err, "adding suppression")
907 ctl.xwriteok()
908
909 case "queuesuppressremove":
910 /* protocol:
911 > "queuesuppressremove"
912 > account
913 > address
914 < "ok" or error
915 */
916
917 account := ctl.xread()
918 address := ctl.xread()
919 addr, err := smtp.ParseAddress(address)
920 ctl.xcheck(err, "parsing address")
921 err = queue.SuppressionRemove(ctx, account, addr.Path())
922 ctl.xcheck(err, "removing suppression")
923 ctl.xwriteok()
924
925 case "queuesuppresslookup":
926 /* protocol:
927 > "queuesuppresslookup"
928 > account or empty
929 > address
930 < "ok" or error
931 < stream
932 */
933
934 account := ctl.xread()
935 address := ctl.xread()
936 if account != "" {
937 _, ok := mox.Conf.Account(account)
938 if !ok {
939 ctl.xcheck(errors.New("unknown account"), "looking up account")
940 }
941 }
942 addr, err := smtp.ParseAddress(address)
943 ctl.xcheck(err, "parsing address")
944 sup, err := queue.SuppressionLookup(ctx, account, addr.Path())
945 ctl.xcheck(err, "looking up suppression")
946 ctl.xwriteok()
947 xw := ctl.writer()
948 if sup == nil {
949 fmt.Fprintln(xw, "not present")
950 } else {
951 manual := "no"
952 if sup.Manual {
953 manual = "yes"
954 }
955 fmt.Fprintf(xw, "present\nadded: %s\nmanual: %s\nbase address: %s\nreason: %q\n", sup.Created.Round(time.Second), manual, sup.BaseAddress, sup.Reason)
956 }
957 xw.xclose()
958
959 case "importmaildir", "importmbox":
960 mbox := cmd == "importmbox"
961 importctl(ctx, ctl, mbox)
962
963 case "domainadd":
964 /* protocol:
965 > "domainadd"
966 > domain
967 > account
968 > localpart
969 < "ok" or error
970 */
971 domain := ctl.xread()
972 account := ctl.xread()
973 localpart := ctl.xread()
974 d, err := dns.ParseDomain(domain)
975 ctl.xcheck(err, "parsing domain")
976 err = mox.DomainAdd(ctx, d, account, smtp.Localpart(localpart))
977 ctl.xcheck(err, "adding domain")
978 ctl.xwriteok()
979
980 case "domainrm":
981 /* protocol:
982 > "domainrm"
983 > domain
984 < "ok" or error
985 */
986 domain := ctl.xread()
987 d, err := dns.ParseDomain(domain)
988 ctl.xcheck(err, "parsing domain")
989 err = mox.DomainRemove(ctx, d)
990 ctl.xcheck(err, "removing domain")
991 ctl.xwriteok()
992
993 case "accountadd":
994 /* protocol:
995 > "accountadd"
996 > account
997 > address
998 < "ok" or error
999 */
1000 account := ctl.xread()
1001 address := ctl.xread()
1002 err := mox.AccountAdd(ctx, account, address)
1003 ctl.xcheck(err, "adding account")
1004 ctl.xwriteok()
1005
1006 case "accountrm":
1007 /* protocol:
1008 > "accountrm"
1009 > account
1010 < "ok" or error
1011 */
1012 account := ctl.xread()
1013 err := mox.AccountRemove(ctx, account)
1014 ctl.xcheck(err, "removing account")
1015 ctl.xwriteok()
1016
1017 case "addressadd":
1018 /* protocol:
1019 > "addressadd"
1020 > address
1021 > account
1022 < "ok" or error
1023 */
1024 address := ctl.xread()
1025 account := ctl.xread()
1026 err := mox.AddressAdd(ctx, address, account)
1027 ctl.xcheck(err, "adding address")
1028 ctl.xwriteok()
1029
1030 case "addressrm":
1031 /* protocol:
1032 > "addressrm"
1033 > address
1034 < "ok" or error
1035 */
1036 address := ctl.xread()
1037 err := mox.AddressRemove(ctx, address)
1038 ctl.xcheck(err, "removing address")
1039 ctl.xwriteok()
1040
1041 case "aliaslist":
1042 /* protocol:
1043 > "aliaslist"
1044 > domain
1045 < "ok" or error
1046 < stream
1047 */
1048 domain := ctl.xread()
1049 d, err := dns.ParseDomain(domain)
1050 ctl.xcheck(err, "parsing domain")
1051 dc, ok := mox.Conf.Domain(d)
1052 if !ok {
1053 ctl.xcheck(errors.New("no such domain"), "listing aliases")
1054 }
1055 ctl.xwriteok()
1056 w := ctl.writer()
1057 for _, a := range dc.Aliases {
1058 lp, err := smtp.ParseLocalpart(a.LocalpartStr)
1059 ctl.xcheck(err, "parsing alias localpart")
1060 fmt.Fprintln(w, smtp.NewAddress(lp, a.Domain).Pack(true))
1061 }
1062 w.xclose()
1063
1064 case "aliasprint":
1065 /* protocol:
1066 > "aliasprint"
1067 > address
1068 < "ok" or error
1069 < stream
1070 */
1071 address := ctl.xread()
1072 _, alias, ok := mox.Conf.AccountDestination(address)
1073 if !ok {
1074 ctl.xcheck(errors.New("no such address"), "looking up alias")
1075 } else if alias == nil {
1076 ctl.xcheck(errors.New("address not an alias"), "looking up alias")
1077 }
1078 ctl.xwriteok()
1079 w := ctl.writer()
1080 fmt.Fprintf(w, "# postpublic %v\n", alias.PostPublic)
1081 fmt.Fprintf(w, "# listmembers %v\n", alias.ListMembers)
1082 fmt.Fprintf(w, "# allowmsgfrom %v\n", alias.AllowMsgFrom)
1083 fmt.Fprintln(w, "# members:")
1084 for _, a := range alias.Addresses {
1085 fmt.Fprintln(w, a)
1086 }
1087 w.xclose()
1088
1089 case "aliasadd":
1090 /* protocol:
1091 > "aliasadd"
1092 > address
1093 > json alias
1094 < "ok" or error
1095 */
1096 address := ctl.xread()
1097 line := ctl.xread()
1098 addr, err := smtp.ParseAddress(address)
1099 ctl.xcheck(err, "parsing address")
1100 var alias config.Alias
1101 xparseJSON(ctl, line, &alias)
1102 err = mox.AliasAdd(ctx, addr, alias)
1103 ctl.xcheck(err, "adding alias")
1104 ctl.xwriteok()
1105
1106 case "aliasupdate":
1107 /* protocol:
1108 > "aliasupdate"
1109 > alias
1110 > "true" or "false" for postpublic
1111 > "true" or "false" for listmembers
1112 > "true" or "false" for allowmsgfrom
1113 < "ok" or error
1114 */
1115 address := ctl.xread()
1116 postpublic := ctl.xread()
1117 listmembers := ctl.xread()
1118 allowmsgfrom := ctl.xread()
1119 addr, err := smtp.ParseAddress(address)
1120 ctl.xcheck(err, "parsing address")
1121 err = mox.DomainSave(ctx, addr.Domain.Name(), func(d *config.Domain) error {
1122 a, ok := d.Aliases[addr.Localpart.String()]
1123 if !ok {
1124 return fmt.Errorf("alias does not exist")
1125 }
1126
1127 switch postpublic {
1128 case "false":
1129 a.PostPublic = false
1130 case "true":
1131 a.PostPublic = true
1132 }
1133 switch listmembers {
1134 case "false":
1135 a.ListMembers = false
1136 case "true":
1137 a.ListMembers = true
1138 }
1139 switch allowmsgfrom {
1140 case "false":
1141 a.AllowMsgFrom = false
1142 case "true":
1143 a.AllowMsgFrom = true
1144 }
1145
1146 d.Aliases = maps.Clone(d.Aliases)
1147 d.Aliases[addr.Localpart.String()] = a
1148 return nil
1149 })
1150 ctl.xcheck(err, "saving alias")
1151 ctl.xwriteok()
1152
1153 case "aliasrm":
1154 /* protocol:
1155 > "aliasrm"
1156 > alias
1157 < "ok" or error
1158 */
1159 address := ctl.xread()
1160 addr, err := smtp.ParseAddress(address)
1161 ctl.xcheck(err, "parsing address")
1162 err = mox.AliasRemove(ctx, addr)
1163 ctl.xcheck(err, "removing alias")
1164 ctl.xwriteok()
1165
1166 case "aliasaddaddr":
1167 /* protocol:
1168 > "aliasaddaddr"
1169 > alias
1170 > addresses as json
1171 < "ok" or error
1172 */
1173 address := ctl.xread()
1174 line := ctl.xread()
1175 addr, err := smtp.ParseAddress(address)
1176 ctl.xcheck(err, "parsing address")
1177 var addresses []string
1178 xparseJSON(ctl, line, &addresses)
1179 err = mox.AliasAddressesAdd(ctx, addr, addresses)
1180 ctl.xcheck(err, "adding addresses to alias")
1181 ctl.xwriteok()
1182
1183 case "aliasrmaddr":
1184 /* protocol:
1185 > "aliasrmaddr"
1186 > alias
1187 > addresses as json
1188 < "ok" or error
1189 */
1190 address := ctl.xread()
1191 line := ctl.xread()
1192 addr, err := smtp.ParseAddress(address)
1193 ctl.xcheck(err, "parsing address")
1194 var addresses []string
1195 xparseJSON(ctl, line, &addresses)
1196 err = mox.AliasAddressesRemove(ctx, addr, addresses)
1197 ctl.xcheck(err, "removing addresses to alias")
1198 ctl.xwriteok()
1199
1200 case "loglevels":
1201 /* protocol:
1202 > "loglevels"
1203 < "ok"
1204 < stream
1205 */
1206 ctl.xwriteok()
1207 l := mox.Conf.LogLevels()
1208 keys := []string{}
1209 for k := range l {
1210 keys = append(keys, k)
1211 }
1212 sort.Slice(keys, func(i, j int) bool {
1213 return keys[i] < keys[j]
1214 })
1215 s := ""
1216 for _, k := range keys {
1217 ks := k
1218 if ks == "" {
1219 ks = "(default)"
1220 }
1221 s += ks + ": " + mlog.LevelStrings[l[k]] + "\n"
1222 }
1223 ctl.xstreamfrom(strings.NewReader(s))
1224
1225 case "setloglevels":
1226 /* protocol:
1227 > "setloglevels"
1228 > pkg
1229 > level (if empty, log level for pkg will be unset)
1230 < "ok" or error
1231 */
1232 pkg := ctl.xread()
1233 levelstr := ctl.xread()
1234 if levelstr == "" {
1235 mox.Conf.LogLevelRemove(log, pkg)
1236 } else {
1237 level, ok := mlog.Levels[levelstr]
1238 if !ok {
1239 ctl.xerror("bad level")
1240 }
1241 mox.Conf.LogLevelSet(log, pkg, level)
1242 }
1243 ctl.xwriteok()
1244
1245 case "retrain":
1246 /* protocol:
1247 > "retrain"
1248 > account
1249 < "ok" or error
1250 */
1251 account := ctl.xread()
1252 acc, err := store.OpenAccount(log, account)
1253 ctl.xcheck(err, "open account")
1254 defer func() {
1255 if acc != nil {
1256 err := acc.Close()
1257 log.Check(err, "closing account after retraining")
1258 }
1259 }()
1260
1261 acc.WithWLock(func() {
1262 conf, _ := acc.Conf()
1263 if conf.JunkFilter == nil {
1264 ctl.xcheck(store.ErrNoJunkFilter, "looking for junk filter")
1265 }
1266
1267 // Remove existing junk filter files.
1268 basePath := mox.DataDirPath("accounts")
1269 dbPath := filepath.Join(basePath, acc.Name, "junkfilter.db")
1270 bloomPath := filepath.Join(basePath, acc.Name, "junkfilter.bloom")
1271 err := os.Remove(dbPath)
1272 log.Check(err, "removing old junkfilter database file", slog.String("path", dbPath))
1273 err = os.Remove(bloomPath)
1274 log.Check(err, "removing old junkfilter bloom filter file", slog.String("path", bloomPath))
1275
1276 // Open junk filter, this creates new files.
1277 jf, _, err := acc.OpenJunkFilter(ctx, log)
1278 ctl.xcheck(err, "open new junk filter")
1279 defer func() {
1280 if jf == nil {
1281 return
1282 }
1283 err := jf.Close()
1284 log.Check(err, "closing junk filter during cleanup")
1285 }()
1286
1287 // Read through messages with junk or nonjunk flag set, and train them.
1288 var total, trained int
1289 q := bstore.QueryDB[store.Message](ctx, acc.DB)
1290 q.FilterEqual("Expunged", false)
1291 err = q.ForEach(func(m store.Message) error {
1292 total++
1293 ok, err := acc.TrainMessage(ctx, log, jf, m)
1294 if ok {
1295 trained++
1296 }
1297 return err
1298 })
1299 ctl.xcheck(err, "training messages")
1300 log.Info("retrained messages", slog.Int("total", total), slog.Int("trained", trained))
1301
1302 // Close junk filter, marking success.
1303 err = jf.Close()
1304 jf = nil
1305 ctl.xcheck(err, "closing junk filter")
1306 })
1307 ctl.xwriteok()
1308
1309 case "recalculatemailboxcounts":
1310 /* protocol:
1311 > "recalculatemailboxcounts"
1312 > account
1313 < "ok" or error
1314 < stream
1315 */
1316 account := ctl.xread()
1317 acc, err := store.OpenAccount(log, account)
1318 ctl.xcheck(err, "open account")
1319 defer func() {
1320 if acc != nil {
1321 err := acc.Close()
1322 log.Check(err, "closing account after recalculating mailbox counts")
1323 }
1324 }()
1325 ctl.xwriteok()
1326
1327 w := ctl.writer()
1328
1329 acc.WithWLock(func() {
1330 var changes []store.Change
1331 err = acc.DB.Write(ctx, func(tx *bstore.Tx) error {
1332 var totalSize int64
1333 err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
1334 mc, err := mb.CalculateCounts(tx)
1335 if err != nil {
1336 return fmt.Errorf("calculating counts for mailbox %q: %w", mb.Name, err)
1337 }
1338 totalSize += mc.Size
1339
1340 if !mb.HaveCounts || mc != mb.MailboxCounts {
1341 _, err := fmt.Fprintf(w, "for %s setting new counts %s (was %s)\n", mb.Name, mc, mb.MailboxCounts)
1342 ctl.xcheck(err, "write")
1343 mb.HaveCounts = true
1344 mb.MailboxCounts = mc
1345 if err := tx.Update(&mb); err != nil {
1346 return fmt.Errorf("storing new counts for %q: %v", mb.Name, err)
1347 }
1348 changes = append(changes, mb.ChangeCounts())
1349 }
1350 return nil
1351 })
1352 if err != nil {
1353 return err
1354 }
1355
1356 du := store.DiskUsage{ID: 1}
1357 if err := tx.Get(&du); err != nil {
1358 return fmt.Errorf("get disk usage: %v", err)
1359 }
1360 if du.MessageSize != totalSize {
1361 _, err := fmt.Fprintf(w, "setting new total message size %d (was %d)\n", totalSize, du.MessageSize)
1362 ctl.xcheck(err, "write")
1363 du.MessageSize = totalSize
1364 if err := tx.Update(&du); err != nil {
1365 return fmt.Errorf("update disk usage: %v", err)
1366 }
1367 }
1368 return nil
1369 })
1370 ctl.xcheck(err, "write transaction for mailbox counts")
1371
1372 store.BroadcastChanges(acc, changes)
1373 })
1374 w.xclose()
1375
1376 case "fixmsgsize":
1377 /* protocol:
1378 > "fixmsgsize"
1379 > account or empty
1380 < "ok" or error
1381 < stream
1382 */
1383
1384 accountOpt := ctl.xread()
1385 ctl.xwriteok()
1386 w := ctl.writer()
1387
1388 var foundProblem bool
1389 const batchSize = 10000
1390
1391 xfixmsgsize := func(accName string) {
1392 acc, err := store.OpenAccount(log, accName)
1393 ctl.xcheck(err, "open account")
1394 defer func() {
1395 err := acc.Close()
1396 log.Check(err, "closing account after fixing message sizes")
1397 }()
1398
1399 total := 0
1400 var lastID int64
1401 for {
1402 var n int
1403
1404 acc.WithRLock(func() {
1405 mailboxCounts := map[int64]store.Mailbox{} // For broadcasting.
1406
1407 // Don't process all message in one transaction, we could block the account for too long.
1408 err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
1409 q := bstore.QueryTx[store.Message](tx)
1410 q.FilterEqual("Expunged", false)
1411 q.FilterGreater("ID", lastID)
1412 q.Limit(batchSize)
1413 q.SortAsc("ID")
1414 return q.ForEach(func(m store.Message) error {
1415 lastID = m.ID
1416 n++
1417
1418 p := acc.MessagePath(m.ID)
1419 st, err := os.Stat(p)
1420 if err != nil {
1421 mb := store.Mailbox{ID: m.MailboxID}
1422 if xerr := tx.Get(&mb); xerr != nil {
1423 _, werr := fmt.Fprintf(w, "get mailbox id %d for message with file error: %v\n", mb.ID, xerr)
1424 ctl.xcheck(werr, "write")
1425 }
1426 _, werr := fmt.Fprintf(w, "checking file %s for message %d in mailbox %q (id %d): %v (continuing)\n", p, m.ID, mb.Name, mb.ID, err)
1427 ctl.xcheck(werr, "write")
1428 return nil
1429 }
1430 filesize := st.Size()
1431 correctSize := int64(len(m.MsgPrefix)) + filesize
1432 if m.Size == correctSize {
1433 return nil
1434 }
1435
1436 foundProblem = true
1437
1438 mb := store.Mailbox{ID: m.MailboxID}
1439 if err := tx.Get(&mb); err != nil {
1440 _, werr := fmt.Fprintf(w, "get mailbox id %d for message with file size mismatch: %v\n", mb.ID, err)
1441 ctl.xcheck(werr, "write")
1442 }
1443 _, err = fmt.Fprintf(w, "fixing message %d in mailbox %q (id %d) with incorrect size %d, should be %d (len msg prefix %d + on-disk file %s size %d)\n", m.ID, mb.Name, mb.ID, m.Size, correctSize, len(m.MsgPrefix), p, filesize)
1444 ctl.xcheck(err, "write")
1445
1446 // We assume that the original message size was accounted as stored in the mailbox
1447 // total size. If this isn't correct, the user can always run
1448 // recalculatemailboxcounts.
1449 mb.Size -= m.Size
1450 mb.Size += correctSize
1451 if err := tx.Update(&mb); err != nil {
1452 return fmt.Errorf("update mailbox counts: %v", err)
1453 }
1454 mailboxCounts[mb.ID] = mb
1455
1456 m.Size = correctSize
1457
1458 mr := acc.MessageReader(m)
1459 part, err := message.EnsurePart(log.Logger, false, mr, m.Size)
1460 if err != nil {
1461 _, werr := fmt.Fprintf(w, "parsing message %d again: %v (continuing)\n", m.ID, err)
1462 ctl.xcheck(werr, "write")
1463 }
1464 m.ParsedBuf, err = json.Marshal(part)
1465 if err != nil {
1466 return fmt.Errorf("marshal parsed message: %v", err)
1467 }
1468 total++
1469 if err := tx.Update(&m); err != nil {
1470 return fmt.Errorf("update message: %v", err)
1471 }
1472 return nil
1473 })
1474
1475 })
1476 ctl.xcheck(err, "find and fix wrong message sizes")
1477
1478 var changes []store.Change
1479 for _, mb := range mailboxCounts {
1480 changes = append(changes, mb.ChangeCounts())
1481 }
1482 store.BroadcastChanges(acc, changes)
1483 })
1484 if n < batchSize {
1485 break
1486 }
1487 }
1488 _, err = fmt.Fprintf(w, "%d message size(s) fixed for account %s\n", total, accName)
1489 ctl.xcheck(err, "write")
1490 }
1491
1492 if accountOpt != "" {
1493 xfixmsgsize(accountOpt)
1494 } else {
1495 for i, accName := range mox.Conf.Accounts() {
1496 var line string
1497 if i > 0 {
1498 line = "\n"
1499 }
1500 _, err := fmt.Fprintf(w, "%sFixing message sizes in account %s...\n", line, accName)
1501 ctl.xcheck(err, "write")
1502 xfixmsgsize(accName)
1503 }
1504 }
1505 if foundProblem {
1506 _, err := fmt.Fprintf(w, "\nProblems were found and fixed. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n")
1507 ctl.xcheck(err, "write")
1508 }
1509
1510 w.xclose()
1511
1512 case "reparse":
1513 /* protocol:
1514 > "reparse"
1515 > account or empty
1516 < "ok" or error
1517 < stream
1518 */
1519
1520 accountOpt := ctl.xread()
1521 ctl.xwriteok()
1522 w := ctl.writer()
1523
1524 const batchSize = 100
1525
1526 xreparseAccount := func(accName string) {
1527 acc, err := store.OpenAccount(log, accName)
1528 ctl.xcheck(err, "open account")
1529 defer func() {
1530 err := acc.Close()
1531 log.Check(err, "closing account after reparsing messages")
1532 }()
1533
1534 total := 0
1535 var lastID int64
1536 for {
1537 var n int
1538 // Don't process all message in one transaction, we could block the account for too long.
1539 err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
1540 q := bstore.QueryTx[store.Message](tx)
1541 q.FilterEqual("Expunged", false)
1542 q.FilterGreater("ID", lastID)
1543 q.Limit(batchSize)
1544 q.SortAsc("ID")
1545 return q.ForEach(func(m store.Message) error {
1546 lastID = m.ID
1547 mr := acc.MessageReader(m)
1548 p, err := message.EnsurePart(log.Logger, false, mr, m.Size)
1549 if err != nil {
1550 _, err := fmt.Fprintf(w, "parsing message %d: %v (continuing)\n", m.ID, err)
1551 ctl.xcheck(err, "write")
1552 }
1553 m.ParsedBuf, err = json.Marshal(p)
1554 if err != nil {
1555 return fmt.Errorf("marshal parsed message: %v", err)
1556 }
1557 total++
1558 n++
1559 if err := tx.Update(&m); err != nil {
1560 return fmt.Errorf("update message: %v", err)
1561 }
1562 return nil
1563 })
1564
1565 })
1566 ctl.xcheck(err, "update messages with parsed mime structure")
1567 if n < batchSize {
1568 break
1569 }
1570 }
1571 _, err = fmt.Fprintf(w, "%d message(s) reparsed for account %s\n", total, accName)
1572 ctl.xcheck(err, "write")
1573 }
1574
1575 if accountOpt != "" {
1576 xreparseAccount(accountOpt)
1577 } else {
1578 for i, accName := range mox.Conf.Accounts() {
1579 var line string
1580 if i > 0 {
1581 line = "\n"
1582 }
1583 _, err := fmt.Fprintf(w, "%sReparsing account %s...\n", line, accName)
1584 ctl.xcheck(err, "write")
1585 xreparseAccount(accName)
1586 }
1587 }
1588 w.xclose()
1589
1590 case "reassignthreads":
1591 /* protocol:
1592 > "reassignthreads"
1593 > account or empty
1594 < "ok" or error
1595 < stream
1596 */
1597
1598 accountOpt := ctl.xread()
1599 ctl.xwriteok()
1600 w := ctl.writer()
1601
1602 xreassignThreads := func(accName string) {
1603 acc, err := store.OpenAccount(log, accName)
1604 ctl.xcheck(err, "open account")
1605 defer func() {
1606 err := acc.Close()
1607 log.Check(err, "closing account after reassigning threads")
1608 }()
1609
1610 // We don't want to step on an existing upgrade process.
1611 err = acc.ThreadingWait(log)
1612 ctl.xcheck(err, "waiting for threading upgrade to finish")
1613 // todo: should we try to continue if the threading upgrade failed? only if there is a chance it will succeed this time...
1614
1615 // todo: reassigning isn't atomic (in a single transaction), ideally it would be (bstore would need to be able to handle large updates).
1616 const batchSize = 50000
1617 total, err := acc.ResetThreading(ctx, log, batchSize, true)
1618 ctl.xcheck(err, "resetting threading fields")
1619 _, err = fmt.Fprintf(w, "New thread base subject assigned to %d message(s), starting to reassign threads...\n", total)
1620 ctl.xcheck(err, "write")
1621
1622 // Assign threads again. Ideally we would do this in a single transaction, but
1623 // bstore/boltdb cannot handle so many pending changes, so we set a high batchsize.
1624 err = acc.AssignThreads(ctx, log, nil, 0, 50000, w)
1625 ctl.xcheck(err, "reassign threads")
1626
1627 _, err = fmt.Fprintf(w, "Threads reassigned. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n")
1628 ctl.xcheck(err, "write")
1629 }
1630
1631 if accountOpt != "" {
1632 xreassignThreads(accountOpt)
1633 } else {
1634 for i, accName := range mox.Conf.Accounts() {
1635 var line string
1636 if i > 0 {
1637 line = "\n"
1638 }
1639 _, err := fmt.Fprintf(w, "%sReassigning threads for account %s...\n", line, accName)
1640 ctl.xcheck(err, "write")
1641 xreassignThreads(accName)
1642 }
1643 }
1644 w.xclose()
1645
1646 case "backup":
1647 backupctl(ctx, ctl)
1648
1649 default:
1650 log.Info("unrecognized command", slog.String("cmd", cmd))
1651 ctl.xwrite("unrecognized command")
1652 return
1653 }
1654}
1655