1package main
2
3import (
4 "bufio"
5 "bytes"
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "io"
11 "log"
12 "log/slog"
13 "maps"
14 "net"
15 "os"
16 "path/filepath"
17 "runtime/debug"
18 "sort"
19 "strconv"
20 "strings"
21 "time"
22
23 "github.com/mjl-/bstore"
24
25 "github.com/mjl-/mox/admin"
26 "github.com/mjl-/mox/config"
27 "github.com/mjl-/mox/dns"
28 "github.com/mjl-/mox/message"
29 "github.com/mjl-/mox/metrics"
30 "github.com/mjl-/mox/mlog"
31 "github.com/mjl-/mox/mox-"
32 "github.com/mjl-/mox/queue"
33 "github.com/mjl-/mox/smtp"
34 "github.com/mjl-/mox/store"
35 "github.com/mjl-/mox/webapi"
36)
37
38// ctl represents a connection to the ctl unix domain socket of a running mox instance.
39// ctl provides functions to read/write commands/responses/data streams.
40type ctl struct {
41 cmd string // Set for server-side of commands.
42 conn net.Conn
43 r *bufio.Reader // Set for first reader.
44 x any // If set, errors are handled by calling panic(x) instead of log.Fatal.
45 log mlog.Log // If set, along with x, logging is done here.
46}
47
48// xctl opens a ctl connection.
49func xctl() *ctl {
50 p := mox.DataDirPath("ctl")
51 conn, err := net.Dial("unix", p)
52 if err != nil {
53 log.Fatalf("connecting to control socket at %q: %v", p, err)
54 }
55 ctl := &ctl{conn: conn}
56 version := ctl.xread()
57 if version != "ctlv0" {
58 log.Fatalf("ctl protocol mismatch, got %q, expected ctlv0", version)
59 }
60 return ctl
61}
62
63// Interpret msg as an error.
64// If ctl.x is set, the string is also written to the ctl to be interpreted as error by the other party.
65func (c *ctl) xerror(msg string) {
66 if c.x == nil {
67 log.Fatalln(msg)
68 }
69 c.log.Debugx("ctl error", fmt.Errorf("%s", msg), slog.String("cmd", c.cmd))
70 c.xwrite(msg)
71 panic(c.x)
72}
73
74// Check if err is not nil. If so, handle error through ctl.x or log.Fatal. If
75// ctl.x is set, the error string is written to ctl, to be interpreted as an error
76// by the command reading from ctl.
77func (c *ctl) xcheck(err error, msg string) {
78 if err == nil {
79 return
80 }
81 if c.x == nil {
82 log.Fatalf("%s: %s", msg, err)
83 }
84 c.log.Debugx(msg, err, slog.String("cmd", c.cmd))
85 fmt.Fprintf(c.conn, "%s: %s\n", msg, err)
86 panic(c.x)
87}
88
89// Read a line and return it without trailing newline.
90func (c *ctl) xread() string {
91 if c.r == nil {
92 c.r = bufio.NewReader(c.conn)
93 }
94 line, err := c.r.ReadString('\n')
95 c.xcheck(err, "read from ctl")
96 return strings.TrimSuffix(line, "\n")
97}
98
99// Read a line. If not "ok", the string is interpreted as an error.
100func (c *ctl) xreadok() {
101 line := c.xread()
102 if line != "ok" {
103 c.xerror(line)
104 }
105}
106
107// Write a string, typically a command or parameter.
108func (c *ctl) xwrite(text string) {
109 _, err := fmt.Fprintln(c.conn, text)
110 c.xcheck(err, "write")
111}
112
113// Write "ok" to indicate success.
114func (c *ctl) xwriteok() {
115 c.xwrite("ok")
116}
117
118// Copy data from a stream from ctl to dst.
119func (c *ctl) xstreamto(dst io.Writer) {
120 _, err := io.Copy(dst, c.reader())
121 c.xcheck(err, "reading message")
122}
123
124// Copy data from src to a stream to ctl.
125func (c *ctl) xstreamfrom(src io.Reader) {
126 w := c.writer()
127 _, err := io.Copy(w, src)
128 c.xcheck(err, "copying")
129 w.xclose()
130}
131
132// Writer returns an io.Writer for a data stream to ctl.
133// When done writing, caller must call xclose to signal the end of the stream.
134// Behaviour of "x" is copied from ctl.
135func (c *ctl) writer() *ctlwriter {
136 return &ctlwriter{cmd: c.cmd, conn: c.conn, x: c.x, log: c.log}
137}
138
139// Reader returns an io.Reader for a data stream from ctl.
140// Behaviour of "x" is copied from ctl.
141func (c *ctl) reader() *ctlreader {
142 if c.r == nil {
143 c.r = bufio.NewReader(c.conn)
144 }
145 return &ctlreader{cmd: c.cmd, conn: c.conn, r: c.r, x: c.x, log: c.log}
146}
147
148/*
149Ctlwriter and ctlreader implement the writing and reading a data stream. They
150implement the io.Writer and io.Reader interface. In the protocol below each
151non-data message ends with a newline that is typically stripped when
152interpreting.
153
154Zero or more data transactions:
155
156 > "123" (for data size) or an error message
157 > data, 123 bytes
158 < "ok" or an error message
159
160Followed by a end of stream indicated by zero data bytes message:
161
162 > "0"
163*/
164
165type ctlwriter struct {
166 cmd string // Set for server-side of commands.
167 conn net.Conn // Ctl socket from which messages are read.
168 buf []byte // Scratch buffer, for reading response.
169 x any // If not nil, errors in Write and xcheckf are handled with panic(x), otherwise with a log.Fatal.
170 log mlog.Log
171}
172
173func (s *ctlwriter) Write(buf []byte) (int, error) {
174 _, err := fmt.Fprintf(s.conn, "%d\n", len(buf))
175 s.xcheck(err, "write count")
176 _, err = s.conn.Write(buf)
177 s.xcheck(err, "write data")
178 if s.buf == nil {
179 s.buf = make([]byte, 512)
180 }
181 n, err := s.conn.Read(s.buf)
182 s.xcheck(err, "reading response to write")
183 line := strings.TrimSuffix(string(s.buf[:n]), "\n")
184 if line != "ok" {
185 s.xerror(line)
186 }
187 return len(buf), nil
188}
189
190func (s *ctlwriter) xerror(msg string) {
191 if s.x == nil {
192 log.Fatalln(msg)
193 } else {
194 s.log.Debugx("error", fmt.Errorf("%s", msg), slog.String("cmd", s.cmd))
195 panic(s.x)
196 }
197}
198
199func (s *ctlwriter) xcheck(err error, msg string) {
200 if err == nil {
201 return
202 }
203 if s.x == nil {
204 log.Fatalf("%s: %s", msg, err)
205 } else {
206 s.log.Debugx(msg, err, slog.String("cmd", s.cmd))
207 panic(s.x)
208 }
209}
210
211func (s *ctlwriter) xclose() {
212 _, err := fmt.Fprintf(s.conn, "0\n")
213 s.xcheck(err, "write eof")
214}
215
216type ctlreader struct {
217 cmd string // Set for server-side of command.
218 conn net.Conn // For writing "ok" after reading.
219 r *bufio.Reader // Buffered ctl socket.
220 err error // If set, returned for each read. can also be io.EOF.
221 npending int // Number of bytes that can still be read until a new count line must be read.
222 x any // If set, errors are handled with panic(x) instead of log.Fatal.
223 log mlog.Log // If x is set, logging goes to log.
224}
225
226func (s *ctlreader) Read(buf []byte) (N int, Err error) {
227 if s.err != nil {
228 return 0, s.err
229 }
230 if s.npending == 0 {
231 line, err := s.r.ReadString('\n')
232 s.xcheck(err, "reading count")
233 line = strings.TrimSuffix(line, "\n")
234 n, err := strconv.ParseInt(line, 10, 32)
235 if err != nil {
236 s.xerror(line)
237 }
238 if n == 0 {
239 s.err = io.EOF
240 return 0, s.err
241 }
242 s.npending = int(n)
243 }
244 rn := len(buf)
245 if rn > s.npending {
246 rn = s.npending
247 }
248 n, err := s.r.Read(buf[:rn])
249 s.xcheck(err, "read from ctl")
250 s.npending -= n
251 if s.npending == 0 {
252 _, err = fmt.Fprintln(s.conn, "ok")
253 s.xcheck(err, "writing ok after reading")
254 }
255 return n, err
256}
257
258func (s *ctlreader) xerror(msg string) {
259 if s.x == nil {
260 log.Fatalln(msg)
261 } else {
262 s.log.Debugx("error", fmt.Errorf("%s", msg), slog.String("cmd", s.cmd))
263 panic(s.x)
264 }
265}
266
267func (s *ctlreader) xcheck(err error, msg string) {
268 if err == nil {
269 return
270 }
271 if s.x == nil {
272 log.Fatalf("%s: %s", msg, err)
273 } else {
274 s.log.Debugx(msg, err, slog.String("cmd", s.cmd))
275 panic(s.x)
276 }
277}
278
279// servectl handles requests on the unix domain socket "ctl", e.g. for graceful shutdown, local mail delivery.
280func servectl(ctx context.Context, log mlog.Log, conn net.Conn, shutdown func()) {
281 log.Debug("ctl connection")
282
283 var stop = struct{}{} // Sentinel value for panic and recover.
284 ctl := &ctl{conn: conn, x: stop, log: log}
285 defer func() {
286 x := recover()
287 if x == nil || x == stop {
288 return
289 }
290 log.Error("servectl panic", slog.Any("err", x), slog.String("cmd", ctl.cmd))
291 debug.PrintStack()
292 metrics.PanicInc(metrics.Ctl)
293 }()
294
295 defer conn.Close()
296
297 ctl.xwrite("ctlv0")
298 for {
299 servectlcmd(ctx, ctl, shutdown)
300 }
301}
302
303func xparseJSON(ctl *ctl, s string, v any) {
304 dec := json.NewDecoder(strings.NewReader(s))
305 dec.DisallowUnknownFields()
306 err := dec.Decode(v)
307 ctl.xcheck(err, "parsing from ctl as json")
308}
309
310func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
311 log := ctl.log
312 cmd := ctl.xread()
313 ctl.cmd = cmd
314 log.Info("ctl command", slog.String("cmd", cmd))
315 switch cmd {
316 case "stop":
317 shutdown()
318 os.Exit(0)
319
320 case "deliver":
321 /* The protocol, double quoted are literals.
322
323 > "deliver"
324 > address
325 < "ok"
326 > stream
327 < "ok"
328 */
329
330 to := ctl.xread()
331 a, addr, err := store.OpenEmail(log, to)
332 ctl.xcheck(err, "lookup destination address")
333
334 msgFile, err := store.CreateMessageTemp(log, "ctl-deliver")
335 ctl.xcheck(err, "creating temporary message file")
336 defer store.CloseRemoveTempFile(log, msgFile, "deliver message")
337 mw := message.NewWriter(msgFile)
338 ctl.xwriteok()
339
340 ctl.xstreamto(mw)
341 err = msgFile.Sync()
342 ctl.xcheck(err, "syncing message to storage")
343
344 m := store.Message{
345 Received: time.Now(),
346 Size: mw.Size,
347 }
348
349 a.WithWLock(func() {
350 err := a.DeliverDestination(log, addr, &m, msgFile)
351 ctl.xcheck(err, "delivering message")
352 log.Info("message delivered through ctl", slog.Any("to", to))
353 })
354
355 err = a.Close()
356 ctl.xcheck(err, "closing account")
357 ctl.xwriteok()
358
359 case "setaccountpassword":
360 /* protocol:
361 > "setaccountpassword"
362 > account
363 > password
364 < "ok" or error
365 */
366
367 account := ctl.xread()
368 pw := ctl.xread()
369
370 acc, err := store.OpenAccount(log, account)
371 ctl.xcheck(err, "open account")
372 defer func() {
373 if acc != nil {
374 err := acc.Close()
375 log.Check(err, "closing account after setting password")
376 }
377 }()
378
379 err = acc.SetPassword(log, pw)
380 ctl.xcheck(err, "setting password")
381 err = acc.Close()
382 ctl.xcheck(err, "closing account")
383 acc = nil
384 ctl.xwriteok()
385
386 case "queueholdruleslist":
387 /* protocol:
388 > "queueholdruleslist"
389 < "ok"
390 < stream
391 */
392 l, err := queue.HoldRuleList(ctx)
393 ctl.xcheck(err, "listing hold rules")
394 ctl.xwriteok()
395 xw := ctl.writer()
396 fmt.Fprintln(xw, "hold rules:")
397 for _, hr := range l {
398 var elems []string
399 if hr.Account != "" {
400 elems = append(elems, fmt.Sprintf("account %q", hr.Account))
401 }
402 var zerodom dns.Domain
403 if hr.SenderDomain != zerodom {
404 elems = append(elems, fmt.Sprintf("sender domain %q", hr.SenderDomain.Name()))
405 }
406 if hr.RecipientDomain != zerodom {
407 elems = append(elems, fmt.Sprintf("sender domain %q", hr.RecipientDomain.Name()))
408 }
409 if len(elems) == 0 {
410 fmt.Fprintf(xw, "id %d: all messages\n", hr.ID)
411 } else {
412 fmt.Fprintf(xw, "id %d: %s\n", hr.ID, strings.Join(elems, ", "))
413 }
414 }
415 if len(l) == 0 {
416 fmt.Fprint(xw, "(none)\n")
417 }
418 xw.xclose()
419
420 case "queueholdrulesadd":
421 /* protocol:
422 > "queueholdrulesadd"
423 > account
424 > senderdomainstr
425 > recipientdomainstr
426 < "ok" or error
427 */
428 var hr queue.HoldRule
429 hr.Account = ctl.xread()
430 senderdomstr := ctl.xread()
431 rcptdomstr := ctl.xread()
432 var err error
433 hr.SenderDomain, err = dns.ParseDomain(senderdomstr)
434 ctl.xcheck(err, "parsing sender domain")
435 hr.RecipientDomain, err = dns.ParseDomain(rcptdomstr)
436 ctl.xcheck(err, "parsing recipient domain")
437 hr, err = queue.HoldRuleAdd(ctx, log, hr)
438 ctl.xcheck(err, "add hold rule")
439 ctl.xwriteok()
440
441 case "queueholdrulesremove":
442 /* protocol:
443 > "queueholdrulesremove"
444 > id
445 < "ok" or error
446 */
447 idstr := ctl.xread()
448 id, err := strconv.ParseInt(idstr, 10, 64)
449 ctl.xcheck(err, "parsing id")
450 err = queue.HoldRuleRemove(ctx, log, id)
451 ctl.xcheck(err, "remove hold rule")
452 ctl.xwriteok()
453
454 case "queuelist":
455 /* protocol:
456 > "queuelist"
457 > filters as json
458 > sort as json
459 < "ok"
460 < stream
461 */
462 filterline := ctl.xread()
463 sortline := ctl.xread()
464 var f queue.Filter
465 xparseJSON(ctl, filterline, &f)
466 var s queue.Sort
467 xparseJSON(ctl, sortline, &s)
468 qmsgs, err := queue.List(ctx, f, s)
469 ctl.xcheck(err, "listing queue")
470 ctl.xwriteok()
471
472 xw := ctl.writer()
473 fmt.Fprintln(xw, "messages:")
474 for _, qm := range qmsgs {
475 var lastAttempt string
476 if qm.LastAttempt != nil {
477 lastAttempt = time.Since(*qm.LastAttempt).Round(time.Second).String()
478 }
479 fmt.Fprintf(xw, "%5d %s from:%s to:%s next %s last %s error %q\n", qm.ID, qm.Queued.Format(time.RFC3339), qm.Sender().LogString(), qm.Recipient().LogString(), -time.Since(qm.NextAttempt).Round(time.Second), lastAttempt, qm.LastResult().Error)
480 }
481 if len(qmsgs) == 0 {
482 fmt.Fprint(xw, "(none)\n")
483 }
484 xw.xclose()
485
486 case "queueholdset":
487 /* protocol:
488 > "queueholdset"
489 > queuefilters as json
490 > "true" or "false"
491 < "ok" or error
492 < count
493 */
494
495 filterline := ctl.xread()
496 hold := ctl.xread() == "true"
497 var f queue.Filter
498 xparseJSON(ctl, filterline, &f)
499 count, err := queue.HoldSet(ctx, f, hold)
500 ctl.xcheck(err, "setting on hold status for messages")
501 ctl.xwriteok()
502 ctl.xwrite(fmt.Sprintf("%d", count))
503
504 case "queueschedule":
505 /* protocol:
506 > "queueschedule"
507 > queuefilters as json
508 > relative to now
509 > duration
510 < "ok" or error
511 < count
512 */
513
514 filterline := ctl.xread()
515 relnow := ctl.xread()
516 duration := ctl.xread()
517 var f queue.Filter
518 xparseJSON(ctl, filterline, &f)
519 d, err := time.ParseDuration(duration)
520 ctl.xcheck(err, "parsing duration for next delivery attempt")
521 var count int
522 if relnow == "" {
523 count, err = queue.NextAttemptAdd(ctx, f, d)
524 } else {
525 count, err = queue.NextAttemptSet(ctx, f, time.Now().Add(d))
526 }
527 ctl.xcheck(err, "setting next delivery attempts in queue")
528 ctl.xwriteok()
529 ctl.xwrite(fmt.Sprintf("%d", count))
530
531 case "queuetransport":
532 /* protocol:
533 > "queuetransport"
534 > queuefilters as json
535 > transport
536 < "ok" or error
537 < count
538 */
539
540 filterline := ctl.xread()
541 transport := ctl.xread()
542 var f queue.Filter
543 xparseJSON(ctl, filterline, &f)
544 count, err := queue.TransportSet(ctx, f, transport)
545 ctl.xcheck(err, "adding to next delivery attempts in queue")
546 ctl.xwriteok()
547 ctl.xwrite(fmt.Sprintf("%d", count))
548
549 case "queuerequiretls":
550 /* protocol:
551 > "queuerequiretls"
552 > queuefilters as json
553 > reqtls (empty string, "true" or "false")
554 < "ok" or error
555 < count
556 */
557
558 filterline := ctl.xread()
559 reqtls := ctl.xread()
560 var req *bool
561 switch reqtls {
562 case "":
563 case "true":
564 v := true
565 req = &v
566 case "false":
567 v := false
568 req = &v
569 default:
570 ctl.xcheck(fmt.Errorf("unknown value %q", reqtls), "parsing value")
571 }
572 var f queue.Filter
573 xparseJSON(ctl, filterline, &f)
574 count, err := queue.RequireTLSSet(ctx, f, req)
575 ctl.xcheck(err, "setting tls requirements on messages in queue")
576 ctl.xwriteok()
577 ctl.xwrite(fmt.Sprintf("%d", count))
578
579 case "queuefail":
580 /* protocol:
581 > "queuefail"
582 > queuefilters as json
583 < "ok" or error
584 < count
585 */
586
587 filterline := ctl.xread()
588 var f queue.Filter
589 xparseJSON(ctl, filterline, &f)
590 count, err := queue.Fail(ctx, log, f)
591 ctl.xcheck(err, "marking messages from queue as failed")
592 ctl.xwriteok()
593 ctl.xwrite(fmt.Sprintf("%d", count))
594
595 case "queuedrop":
596 /* protocol:
597 > "queuedrop"
598 > queuefilters as json
599 < "ok" or error
600 < count
601 */
602
603 filterline := ctl.xread()
604 var f queue.Filter
605 xparseJSON(ctl, filterline, &f)
606 count, err := queue.Drop(ctx, log, f)
607 ctl.xcheck(err, "dropping messages from queue")
608 ctl.xwriteok()
609 ctl.xwrite(fmt.Sprintf("%d", count))
610
611 case "queuedump":
612 /* protocol:
613 > "queuedump"
614 > id
615 < "ok" or error
616 < stream
617 */
618
619 idstr := ctl.xread()
620 id, err := strconv.ParseInt(idstr, 10, 64)
621 if err != nil {
622 ctl.xcheck(err, "parsing id")
623 }
624 mr, err := queue.OpenMessage(ctx, id)
625 ctl.xcheck(err, "opening message")
626 defer func() {
627 err := mr.Close()
628 log.Check(err, "closing message from queue")
629 }()
630 ctl.xwriteok()
631 ctl.xstreamfrom(mr)
632
633 case "queueretiredlist":
634 /* protocol:
635 > "queueretiredlist"
636 > filters as json
637 > sort as json
638 < "ok"
639 < stream
640 */
641 filterline := ctl.xread()
642 sortline := ctl.xread()
643 var f queue.RetiredFilter
644 xparseJSON(ctl, filterline, &f)
645 var s queue.RetiredSort
646 xparseJSON(ctl, sortline, &s)
647 qmsgs, err := queue.RetiredList(ctx, f, s)
648 ctl.xcheck(err, "listing retired queue")
649 ctl.xwriteok()
650
651 xw := ctl.writer()
652 fmt.Fprintln(xw, "retired messages:")
653 for _, qm := range qmsgs {
654 var lastAttempt string
655 if qm.LastAttempt != nil {
656 lastAttempt = time.Since(*qm.LastAttempt).Round(time.Second).String()
657 }
658 result := "failure"
659 if qm.Success {
660 result = "success"
661 }
662 sender, err := qm.Sender()
663 xcheckf(err, "parsing sender")
664 fmt.Fprintf(xw, "%5d %s %s from:%s to:%s last %s error %q\n", qm.ID, qm.Queued.Format(time.RFC3339), result, sender.LogString(), qm.Recipient().LogString(), lastAttempt, qm.LastResult().Error)
665 }
666 if len(qmsgs) == 0 {
667 fmt.Fprint(xw, "(none)\n")
668 }
669 xw.xclose()
670
671 case "queueretiredprint":
672 /* protocol:
673 > "queueretiredprint"
674 > id
675 < "ok"
676 < stream
677 */
678 idstr := ctl.xread()
679 id, err := strconv.ParseInt(idstr, 10, 64)
680 if err != nil {
681 ctl.xcheck(err, "parsing id")
682 }
683 l, err := queue.RetiredList(ctx, queue.RetiredFilter{IDs: []int64{id}}, queue.RetiredSort{})
684 ctl.xcheck(err, "getting retired messages")
685 if len(l) == 0 {
686 ctl.xcheck(errors.New("not found"), "getting retired message")
687 }
688 m := l[0]
689 ctl.xwriteok()
690 xw := ctl.writer()
691 enc := json.NewEncoder(xw)
692 enc.SetIndent("", "\t")
693 err = enc.Encode(m)
694 ctl.xcheck(err, "encode retired message")
695 xw.xclose()
696
697 case "queuehooklist":
698 /* protocol:
699 > "queuehooklist"
700 > filters as json
701 > sort as json
702 < "ok"
703 < stream
704 */
705 filterline := ctl.xread()
706 sortline := ctl.xread()
707 var f queue.HookFilter
708 xparseJSON(ctl, filterline, &f)
709 var s queue.HookSort
710 xparseJSON(ctl, sortline, &s)
711 hooks, err := queue.HookList(ctx, f, s)
712 ctl.xcheck(err, "listing webhooks")
713 ctl.xwriteok()
714
715 xw := ctl.writer()
716 fmt.Fprintln(xw, "webhooks:")
717 for _, h := range hooks {
718 var lastAttempt string
719 if len(h.Results) > 0 {
720 lastAttempt = time.Since(h.LastResult().Start).Round(time.Second).String()
721 }
722 fmt.Fprintf(xw, "%5d %s account:%s next %s last %s error %q url %s\n", h.ID, h.Submitted.Format(time.RFC3339), h.Account, time.Until(h.NextAttempt).Round(time.Second), lastAttempt, h.LastResult().Error, h.URL)
723 }
724 if len(hooks) == 0 {
725 fmt.Fprint(xw, "(none)\n")
726 }
727 xw.xclose()
728
729 case "queuehookschedule":
730 /* protocol:
731 > "queuehookschedule"
732 > hookfilters as json
733 > relative to now
734 > duration
735 < "ok" or error
736 < count
737 */
738
739 filterline := ctl.xread()
740 relnow := ctl.xread()
741 duration := ctl.xread()
742 var f queue.HookFilter
743 xparseJSON(ctl, filterline, &f)
744 d, err := time.ParseDuration(duration)
745 ctl.xcheck(err, "parsing duration for next delivery attempt")
746 var count int
747 if relnow == "" {
748 count, err = queue.HookNextAttemptAdd(ctx, f, d)
749 } else {
750 count, err = queue.HookNextAttemptSet(ctx, f, time.Now().Add(d))
751 }
752 ctl.xcheck(err, "setting next delivery attempts in queue")
753 ctl.xwriteok()
754 ctl.xwrite(fmt.Sprintf("%d", count))
755
756 case "queuehookcancel":
757 /* protocol:
758 > "queuehookcancel"
759 > hookfilters as json
760 < "ok" or error
761 < count
762 */
763
764 filterline := ctl.xread()
765 var f queue.HookFilter
766 xparseJSON(ctl, filterline, &f)
767 count, err := queue.HookCancel(ctx, log, f)
768 ctl.xcheck(err, "canceling webhooks in queue")
769 ctl.xwriteok()
770 ctl.xwrite(fmt.Sprintf("%d", count))
771
772 case "queuehookprint":
773 /* protocol:
774 > "queuehookprint"
775 > id
776 < "ok"
777 < stream
778 */
779 idstr := ctl.xread()
780 id, err := strconv.ParseInt(idstr, 10, 64)
781 if err != nil {
782 ctl.xcheck(err, "parsing id")
783 }
784 l, err := queue.HookList(ctx, queue.HookFilter{IDs: []int64{id}}, queue.HookSort{})
785 ctl.xcheck(err, "getting webhooks")
786 if len(l) == 0 {
787 ctl.xcheck(errors.New("not found"), "getting webhook")
788 }
789 h := l[0]
790 ctl.xwriteok()
791 xw := ctl.writer()
792 enc := json.NewEncoder(xw)
793 enc.SetIndent("", "\t")
794 err = enc.Encode(h)
795 ctl.xcheck(err, "encode webhook")
796 xw.xclose()
797
798 case "queuehookretiredlist":
799 /* protocol:
800 > "queuehookretiredlist"
801 > filters as json
802 > sort as json
803 < "ok"
804 < stream
805 */
806 filterline := ctl.xread()
807 sortline := ctl.xread()
808 var f queue.HookRetiredFilter
809 xparseJSON(ctl, filterline, &f)
810 var s queue.HookRetiredSort
811 xparseJSON(ctl, sortline, &s)
812 l, err := queue.HookRetiredList(ctx, f, s)
813 ctl.xcheck(err, "listing retired webhooks")
814 ctl.xwriteok()
815
816 xw := ctl.writer()
817 fmt.Fprintln(xw, "retired webhooks:")
818 for _, h := range l {
819 var lastAttempt string
820 if len(h.Results) > 0 {
821 lastAttempt = time.Since(h.LastResult().Start).Round(time.Second).String()
822 }
823 result := "success"
824 if !h.Success {
825 result = "failure"
826 }
827 fmt.Fprintf(xw, "%5d %s %s account:%s last %s error %q url %s\n", h.ID, h.Submitted.Format(time.RFC3339), result, h.Account, lastAttempt, h.LastResult().Error, h.URL)
828 }
829 if len(l) == 0 {
830 fmt.Fprint(xw, "(none)\n")
831 }
832 xw.xclose()
833
834 case "queuehookretiredprint":
835 /* protocol:
836 > "queuehookretiredprint"
837 > id
838 < "ok"
839 < stream
840 */
841 idstr := ctl.xread()
842 id, err := strconv.ParseInt(idstr, 10, 64)
843 if err != nil {
844 ctl.xcheck(err, "parsing id")
845 }
846 l, err := queue.HookRetiredList(ctx, queue.HookRetiredFilter{IDs: []int64{id}}, queue.HookRetiredSort{})
847 ctl.xcheck(err, "getting retired webhooks")
848 if len(l) == 0 {
849 ctl.xcheck(errors.New("not found"), "getting retired webhook")
850 }
851 h := l[0]
852 ctl.xwriteok()
853 xw := ctl.writer()
854 enc := json.NewEncoder(xw)
855 enc.SetIndent("", "\t")
856 err = enc.Encode(h)
857 ctl.xcheck(err, "encode retired webhook")
858 xw.xclose()
859
860 case "queuesuppresslist":
861 /* protocol:
862 > "queuesuppresslist"
863 > account (or empty)
864 < "ok" or error
865 < stream
866 */
867
868 account := ctl.xread()
869 l, err := queue.SuppressionList(ctx, account)
870 ctl.xcheck(err, "listing suppressions")
871 ctl.xwriteok()
872 xw := ctl.writer()
873 fmt.Fprintln(xw, "suppressions (account, address, manual, time added, base adddress, reason):")
874 for _, sup := range l {
875 manual := "No"
876 if sup.Manual {
877 manual = "Yes"
878 }
879 fmt.Fprintf(xw, "%q\t%q\t%s\t%s\t%q\t%q\n", sup.Account, sup.OriginalAddress, manual, sup.Created.Round(time.Second), sup.BaseAddress, sup.Reason)
880 }
881 if len(l) == 0 {
882 fmt.Fprintln(xw, "(none)")
883 }
884 xw.xclose()
885
886 case "queuesuppressadd":
887 /* protocol:
888 > "queuesuppressadd"
889 > account
890 > address
891 < "ok" or error
892 */
893
894 account := ctl.xread()
895 address := ctl.xread()
896 _, ok := mox.Conf.Account(account)
897 if !ok {
898 ctl.xcheck(errors.New("unknown account"), "looking up account")
899 }
900 addr, err := smtp.ParseAddress(address)
901 ctl.xcheck(err, "parsing address")
902 sup := webapi.Suppression{
903 Account: account,
904 Manual: true,
905 Reason: "added through mox cli",
906 }
907 err = queue.SuppressionAdd(ctx, addr.Path(), &sup)
908 ctl.xcheck(err, "adding suppression")
909 ctl.xwriteok()
910
911 case "queuesuppressremove":
912 /* protocol:
913 > "queuesuppressremove"
914 > account
915 > address
916 < "ok" or error
917 */
918
919 account := ctl.xread()
920 address := ctl.xread()
921 addr, err := smtp.ParseAddress(address)
922 ctl.xcheck(err, "parsing address")
923 err = queue.SuppressionRemove(ctx, account, addr.Path())
924 ctl.xcheck(err, "removing suppression")
925 ctl.xwriteok()
926
927 case "queuesuppresslookup":
928 /* protocol:
929 > "queuesuppresslookup"
930 > account or empty
931 > address
932 < "ok" or error
933 < stream
934 */
935
936 account := ctl.xread()
937 address := ctl.xread()
938 if account != "" {
939 _, ok := mox.Conf.Account(account)
940 if !ok {
941 ctl.xcheck(errors.New("unknown account"), "looking up account")
942 }
943 }
944 addr, err := smtp.ParseAddress(address)
945 ctl.xcheck(err, "parsing address")
946 sup, err := queue.SuppressionLookup(ctx, account, addr.Path())
947 ctl.xcheck(err, "looking up suppression")
948 ctl.xwriteok()
949 xw := ctl.writer()
950 if sup == nil {
951 fmt.Fprintln(xw, "not present")
952 } else {
953 manual := "no"
954 if sup.Manual {
955 manual = "yes"
956 }
957 fmt.Fprintf(xw, "present\nadded: %s\nmanual: %s\nbase address: %s\nreason: %q\n", sup.Created.Round(time.Second), manual, sup.BaseAddress, sup.Reason)
958 }
959 xw.xclose()
960
961 case "importmaildir", "importmbox":
962 mbox := cmd == "importmbox"
963 importctl(ctx, ctl, mbox)
964
965 case "domainadd":
966 /* protocol:
967 > "domainadd"
968 > domain
969 > account
970 > localpart
971 < "ok" or error
972 */
973 domain := ctl.xread()
974 account := ctl.xread()
975 localpart := ctl.xread()
976 d, err := dns.ParseDomain(domain)
977 ctl.xcheck(err, "parsing domain")
978 err = admin.DomainAdd(ctx, d, account, smtp.Localpart(localpart))
979 ctl.xcheck(err, "adding domain")
980 ctl.xwriteok()
981
982 case "domainrm":
983 /* protocol:
984 > "domainrm"
985 > domain
986 < "ok" or error
987 */
988 domain := ctl.xread()
989 d, err := dns.ParseDomain(domain)
990 ctl.xcheck(err, "parsing domain")
991 err = admin.DomainRemove(ctx, d)
992 ctl.xcheck(err, "removing domain")
993 ctl.xwriteok()
994
995 case "accountadd":
996 /* protocol:
997 > "accountadd"
998 > account
999 > address
1000 < "ok" or error
1001 */
1002 account := ctl.xread()
1003 address := ctl.xread()
1004 err := admin.AccountAdd(ctx, account, address)
1005 ctl.xcheck(err, "adding account")
1006 ctl.xwriteok()
1007
1008 case "accountrm":
1009 /* protocol:
1010 > "accountrm"
1011 > account
1012 < "ok" or error
1013 */
1014 account := ctl.xread()
1015 err := admin.AccountRemove(ctx, account)
1016 ctl.xcheck(err, "removing account")
1017 ctl.xwriteok()
1018
1019 case "tlspubkeylist":
1020 /* protocol:
1021 > "tlspubkeylist"
1022 > account (or empty)
1023 < "ok" or error
1024 < stream
1025 */
1026 accountOpt := ctl.xread()
1027 tlspubkeys, err := store.TLSPublicKeyList(ctx, accountOpt)
1028 ctl.xcheck(err, "list tls public keys")
1029 ctl.xwriteok()
1030 xw := ctl.writer()
1031 fmt.Fprintf(xw, "# fingerprint, type, name, account, login address, no imap preauth (%d)\n", len(tlspubkeys))
1032 for _, k := range tlspubkeys {
1033 fmt.Fprintf(xw, "%s\t%s\t%q\t%s\t%s\t%v\n", k.Fingerprint, k.Type, k.Name, k.Account, k.LoginAddress, k.NoIMAPPreauth)
1034 }
1035 xw.xclose()
1036
1037 case "tlspubkeyget":
1038 /* protocol:
1039 > "tlspubkeyget"
1040 > fingerprint
1041 < "ok" or error
1042 < type
1043 < name
1044 < account
1045 < address
1046 < noimappreauth (true/false)
1047 < stream (certder)
1048 */
1049 fp := ctl.xread()
1050 tlspubkey, err := store.TLSPublicKeyGet(ctx, fp)
1051 ctl.xcheck(err, "looking tls public key")
1052 ctl.xwriteok()
1053 ctl.xwrite(tlspubkey.Type)
1054 ctl.xwrite(tlspubkey.Name)
1055 ctl.xwrite(tlspubkey.Account)
1056 ctl.xwrite(tlspubkey.LoginAddress)
1057 ctl.xwrite(fmt.Sprintf("%v", tlspubkey.NoIMAPPreauth))
1058 ctl.xstreamfrom(bytes.NewReader(tlspubkey.CertDER))
1059
1060 case "tlspubkeyadd":
1061 /* protocol:
1062 > "tlspubkeyadd"
1063 > loginaddress
1064 > name (or empty)
1065 > noimappreauth (true/false)
1066 > stream (certder)
1067 < "ok" or error
1068 */
1069 loginAddress := ctl.xread()
1070 name := ctl.xread()
1071 noimappreauth := ctl.xread()
1072 if noimappreauth != "true" && noimappreauth != "false" {
1073 ctl.xcheck(fmt.Errorf("bad value %q", noimappreauth), "parsing noimappreauth")
1074 }
1075 var b bytes.Buffer
1076 ctl.xstreamto(&b)
1077 tlspubkey, err := store.ParseTLSPublicKeyCert(b.Bytes())
1078 ctl.xcheck(err, "parsing certificate")
1079 if name != "" {
1080 tlspubkey.Name = name
1081 }
1082 acc, _, err := store.OpenEmail(ctl.log, loginAddress)
1083 ctl.xcheck(err, "open account for address")
1084 defer func() {
1085 err := acc.Close()
1086 ctl.log.Check(err, "close account")
1087 }()
1088 tlspubkey.Account = acc.Name
1089 tlspubkey.LoginAddress = loginAddress
1090 tlspubkey.NoIMAPPreauth = noimappreauth == "true"
1091
1092 err = store.TLSPublicKeyAdd(ctx, &tlspubkey)
1093 ctl.xcheck(err, "adding tls public key")
1094 ctl.xwriteok()
1095
1096 case "tlspubkeyrm":
1097 /* protocol:
1098 > "tlspubkeyadd"
1099 > fingerprint
1100 < "ok" or error
1101 */
1102 fp := ctl.xread()
1103 err := store.TLSPublicKeyRemove(ctx, fp)
1104 ctl.xcheck(err, "removing tls public key")
1105 ctl.xwriteok()
1106
1107 case "addressadd":
1108 /* protocol:
1109 > "addressadd"
1110 > address
1111 > account
1112 < "ok" or error
1113 */
1114 address := ctl.xread()
1115 account := ctl.xread()
1116 err := admin.AddressAdd(ctx, address, account)
1117 ctl.xcheck(err, "adding address")
1118 ctl.xwriteok()
1119
1120 case "addressrm":
1121 /* protocol:
1122 > "addressrm"
1123 > address
1124 < "ok" or error
1125 */
1126 address := ctl.xread()
1127 err := admin.AddressRemove(ctx, address)
1128 ctl.xcheck(err, "removing address")
1129 ctl.xwriteok()
1130
1131 case "aliaslist":
1132 /* protocol:
1133 > "aliaslist"
1134 > domain
1135 < "ok" or error
1136 < stream
1137 */
1138 domain := ctl.xread()
1139 d, err := dns.ParseDomain(domain)
1140 ctl.xcheck(err, "parsing domain")
1141 dc, ok := mox.Conf.Domain(d)
1142 if !ok {
1143 ctl.xcheck(errors.New("no such domain"), "listing aliases")
1144 }
1145 ctl.xwriteok()
1146 w := ctl.writer()
1147 for _, a := range dc.Aliases {
1148 lp, err := smtp.ParseLocalpart(a.LocalpartStr)
1149 ctl.xcheck(err, "parsing alias localpart")
1150 fmt.Fprintln(w, smtp.NewAddress(lp, a.Domain).Pack(true))
1151 }
1152 w.xclose()
1153
1154 case "aliasprint":
1155 /* protocol:
1156 > "aliasprint"
1157 > address
1158 < "ok" or error
1159 < stream
1160 */
1161 address := ctl.xread()
1162 _, alias, ok := mox.Conf.AccountDestination(address)
1163 if !ok {
1164 ctl.xcheck(errors.New("no such address"), "looking up alias")
1165 } else if alias == nil {
1166 ctl.xcheck(errors.New("address not an alias"), "looking up alias")
1167 }
1168 ctl.xwriteok()
1169 w := ctl.writer()
1170 fmt.Fprintf(w, "# postpublic %v\n", alias.PostPublic)
1171 fmt.Fprintf(w, "# listmembers %v\n", alias.ListMembers)
1172 fmt.Fprintf(w, "# allowmsgfrom %v\n", alias.AllowMsgFrom)
1173 fmt.Fprintln(w, "# members:")
1174 for _, a := range alias.Addresses {
1175 fmt.Fprintln(w, a)
1176 }
1177 w.xclose()
1178
1179 case "aliasadd":
1180 /* protocol:
1181 > "aliasadd"
1182 > address
1183 > json alias
1184 < "ok" or error
1185 */
1186 address := ctl.xread()
1187 line := ctl.xread()
1188 addr, err := smtp.ParseAddress(address)
1189 ctl.xcheck(err, "parsing address")
1190 var alias config.Alias
1191 xparseJSON(ctl, line, &alias)
1192 err = admin.AliasAdd(ctx, addr, alias)
1193 ctl.xcheck(err, "adding alias")
1194 ctl.xwriteok()
1195
1196 case "aliasupdate":
1197 /* protocol:
1198 > "aliasupdate"
1199 > alias
1200 > "true" or "false" for postpublic
1201 > "true" or "false" for listmembers
1202 > "true" or "false" for allowmsgfrom
1203 < "ok" or error
1204 */
1205 address := ctl.xread()
1206 postpublic := ctl.xread()
1207 listmembers := ctl.xread()
1208 allowmsgfrom := ctl.xread()
1209 addr, err := smtp.ParseAddress(address)
1210 ctl.xcheck(err, "parsing address")
1211 err = admin.DomainSave(ctx, addr.Domain.Name(), func(d *config.Domain) error {
1212 a, ok := d.Aliases[addr.Localpart.String()]
1213 if !ok {
1214 return fmt.Errorf("alias does not exist")
1215 }
1216
1217 switch postpublic {
1218 case "false":
1219 a.PostPublic = false
1220 case "true":
1221 a.PostPublic = true
1222 }
1223 switch listmembers {
1224 case "false":
1225 a.ListMembers = false
1226 case "true":
1227 a.ListMembers = true
1228 }
1229 switch allowmsgfrom {
1230 case "false":
1231 a.AllowMsgFrom = false
1232 case "true":
1233 a.AllowMsgFrom = true
1234 }
1235
1236 d.Aliases = maps.Clone(d.Aliases)
1237 d.Aliases[addr.Localpart.String()] = a
1238 return nil
1239 })
1240 ctl.xcheck(err, "saving alias")
1241 ctl.xwriteok()
1242
1243 case "aliasrm":
1244 /* protocol:
1245 > "aliasrm"
1246 > alias
1247 < "ok" or error
1248 */
1249 address := ctl.xread()
1250 addr, err := smtp.ParseAddress(address)
1251 ctl.xcheck(err, "parsing address")
1252 err = admin.AliasRemove(ctx, addr)
1253 ctl.xcheck(err, "removing alias")
1254 ctl.xwriteok()
1255
1256 case "aliasaddaddr":
1257 /* protocol:
1258 > "aliasaddaddr"
1259 > alias
1260 > addresses as json
1261 < "ok" or error
1262 */
1263 address := ctl.xread()
1264 line := ctl.xread()
1265 addr, err := smtp.ParseAddress(address)
1266 ctl.xcheck(err, "parsing address")
1267 var addresses []string
1268 xparseJSON(ctl, line, &addresses)
1269 err = admin.AliasAddressesAdd(ctx, addr, addresses)
1270 ctl.xcheck(err, "adding addresses to alias")
1271 ctl.xwriteok()
1272
1273 case "aliasrmaddr":
1274 /* protocol:
1275 > "aliasrmaddr"
1276 > alias
1277 > addresses as json
1278 < "ok" or error
1279 */
1280 address := ctl.xread()
1281 line := ctl.xread()
1282 addr, err := smtp.ParseAddress(address)
1283 ctl.xcheck(err, "parsing address")
1284 var addresses []string
1285 xparseJSON(ctl, line, &addresses)
1286 err = admin.AliasAddressesRemove(ctx, addr, addresses)
1287 ctl.xcheck(err, "removing addresses to alias")
1288 ctl.xwriteok()
1289
1290 case "loglevels":
1291 /* protocol:
1292 > "loglevels"
1293 < "ok"
1294 < stream
1295 */
1296 ctl.xwriteok()
1297 l := mox.Conf.LogLevels()
1298 keys := []string{}
1299 for k := range l {
1300 keys = append(keys, k)
1301 }
1302 sort.Slice(keys, func(i, j int) bool {
1303 return keys[i] < keys[j]
1304 })
1305 s := ""
1306 for _, k := range keys {
1307 ks := k
1308 if ks == "" {
1309 ks = "(default)"
1310 }
1311 s += ks + ": " + mlog.LevelStrings[l[k]] + "\n"
1312 }
1313 ctl.xstreamfrom(strings.NewReader(s))
1314
1315 case "setloglevels":
1316 /* protocol:
1317 > "setloglevels"
1318 > pkg
1319 > level (if empty, log level for pkg will be unset)
1320 < "ok" or error
1321 */
1322 pkg := ctl.xread()
1323 levelstr := ctl.xread()
1324 if levelstr == "" {
1325 mox.Conf.LogLevelRemove(log, pkg)
1326 } else {
1327 level, ok := mlog.Levels[levelstr]
1328 if !ok {
1329 ctl.xerror("bad level")
1330 }
1331 mox.Conf.LogLevelSet(log, pkg, level)
1332 }
1333 ctl.xwriteok()
1334
1335 case "retrain":
1336 /* protocol:
1337 > "retrain"
1338 > account or empty
1339 < "ok" or error
1340 */
1341 account := ctl.xread()
1342
1343 xretrain := func(name string) {
1344 acc, err := store.OpenAccount(log, name)
1345 ctl.xcheck(err, "open account")
1346 defer func() {
1347 if acc != nil {
1348 err := acc.Close()
1349 log.Check(err, "closing account after retraining")
1350 }
1351 }()
1352
1353 // todo: can we retrain an account without holding a write lock? perhaps by writing a junkfilter to a new location, and staying informed of message changes while we go through all messages in the account?
1354
1355 acc.WithWLock(func() {
1356 conf, _ := acc.Conf()
1357 if conf.JunkFilter == nil {
1358 ctl.xcheck(store.ErrNoJunkFilter, "looking for junk filter")
1359 }
1360
1361 // Remove existing junk filter files.
1362 basePath := mox.DataDirPath("accounts")
1363 dbPath := filepath.Join(basePath, acc.Name, "junkfilter.db")
1364 bloomPath := filepath.Join(basePath, acc.Name, "junkfilter.bloom")
1365 err := os.Remove(dbPath)
1366 log.Check(err, "removing old junkfilter database file", slog.String("path", dbPath))
1367 err = os.Remove(bloomPath)
1368 log.Check(err, "removing old junkfilter bloom filter file", slog.String("path", bloomPath))
1369
1370 // Open junk filter, this creates new files.
1371 jf, _, err := acc.OpenJunkFilter(ctx, log)
1372 ctl.xcheck(err, "open new junk filter")
1373 defer func() {
1374 if jf == nil {
1375 return
1376 }
1377 err := jf.Close()
1378 log.Check(err, "closing junk filter during cleanup")
1379 }()
1380
1381 // Read through messages with junk or nonjunk flag set, and train them.
1382 var total, trained int
1383 q := bstore.QueryDB[store.Message](ctx, acc.DB)
1384 q.FilterEqual("Expunged", false)
1385 err = q.ForEach(func(m store.Message) error {
1386 total++
1387 ok, err := acc.TrainMessage(ctx, log, jf, m)
1388 if ok {
1389 trained++
1390 }
1391 return err
1392 })
1393 ctl.xcheck(err, "training messages")
1394 log.Info("retrained messages", slog.Int("total", total), slog.Int("trained", trained))
1395
1396 // Close junk filter, marking success.
1397 err = jf.Close()
1398 jf = nil
1399 ctl.xcheck(err, "closing junk filter")
1400 })
1401 }
1402
1403 if account == "" {
1404 for _, name := range mox.Conf.Accounts() {
1405 xretrain(name)
1406 }
1407 } else {
1408 xretrain(account)
1409 }
1410 ctl.xwriteok()
1411
1412 case "recalculatemailboxcounts":
1413 /* protocol:
1414 > "recalculatemailboxcounts"
1415 > account
1416 < "ok" or error
1417 < stream
1418 */
1419 account := ctl.xread()
1420 acc, err := store.OpenAccount(log, account)
1421 ctl.xcheck(err, "open account")
1422 defer func() {
1423 if acc != nil {
1424 err := acc.Close()
1425 log.Check(err, "closing account after recalculating mailbox counts")
1426 }
1427 }()
1428 ctl.xwriteok()
1429
1430 w := ctl.writer()
1431
1432 acc.WithWLock(func() {
1433 var changes []store.Change
1434 err = acc.DB.Write(ctx, func(tx *bstore.Tx) error {
1435 var totalSize int64
1436 err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
1437 mc, err := mb.CalculateCounts(tx)
1438 if err != nil {
1439 return fmt.Errorf("calculating counts for mailbox %q: %w", mb.Name, err)
1440 }
1441 totalSize += mc.Size
1442
1443 if !mb.HaveCounts || mc != mb.MailboxCounts {
1444 _, err := fmt.Fprintf(w, "for %s setting new counts %s (was %s)\n", mb.Name, mc, mb.MailboxCounts)
1445 ctl.xcheck(err, "write")
1446 mb.HaveCounts = true
1447 mb.MailboxCounts = mc
1448 if err := tx.Update(&mb); err != nil {
1449 return fmt.Errorf("storing new counts for %q: %v", mb.Name, err)
1450 }
1451 changes = append(changes, mb.ChangeCounts())
1452 }
1453 return nil
1454 })
1455 if err != nil {
1456 return err
1457 }
1458
1459 du := store.DiskUsage{ID: 1}
1460 if err := tx.Get(&du); err != nil {
1461 return fmt.Errorf("get disk usage: %v", err)
1462 }
1463 if du.MessageSize != totalSize {
1464 _, err := fmt.Fprintf(w, "setting new total message size %d (was %d)\n", totalSize, du.MessageSize)
1465 ctl.xcheck(err, "write")
1466 du.MessageSize = totalSize
1467 if err := tx.Update(&du); err != nil {
1468 return fmt.Errorf("update disk usage: %v", err)
1469 }
1470 }
1471 return nil
1472 })
1473 ctl.xcheck(err, "write transaction for mailbox counts")
1474
1475 store.BroadcastChanges(acc, changes)
1476 })
1477 w.xclose()
1478
1479 case "fixmsgsize":
1480 /* protocol:
1481 > "fixmsgsize"
1482 > account or empty
1483 < "ok" or error
1484 < stream
1485 */
1486
1487 accountOpt := ctl.xread()
1488 ctl.xwriteok()
1489 w := ctl.writer()
1490
1491 var foundProblem bool
1492 const batchSize = 10000
1493
1494 xfixmsgsize := func(accName string) {
1495 acc, err := store.OpenAccount(log, accName)
1496 ctl.xcheck(err, "open account")
1497 defer func() {
1498 err := acc.Close()
1499 log.Check(err, "closing account after fixing message sizes")
1500 }()
1501
1502 total := 0
1503 var lastID int64
1504 for {
1505 var n int
1506
1507 acc.WithRLock(func() {
1508 mailboxCounts := map[int64]store.Mailbox{} // For broadcasting.
1509
1510 // Don't process all message in one transaction, we could block the account for too long.
1511 err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
1512 q := bstore.QueryTx[store.Message](tx)
1513 q.FilterEqual("Expunged", false)
1514 q.FilterGreater("ID", lastID)
1515 q.Limit(batchSize)
1516 q.SortAsc("ID")
1517 return q.ForEach(func(m store.Message) error {
1518 lastID = m.ID
1519 n++
1520
1521 p := acc.MessagePath(m.ID)
1522 st, err := os.Stat(p)
1523 if err != nil {
1524 mb := store.Mailbox{ID: m.MailboxID}
1525 if xerr := tx.Get(&mb); xerr != nil {
1526 _, werr := fmt.Fprintf(w, "get mailbox id %d for message with file error: %v\n", mb.ID, xerr)
1527 ctl.xcheck(werr, "write")
1528 }
1529 _, werr := fmt.Fprintf(w, "checking file %s for message %d in mailbox %q (id %d): %v (continuing)\n", p, m.ID, mb.Name, mb.ID, err)
1530 ctl.xcheck(werr, "write")
1531 return nil
1532 }
1533 filesize := st.Size()
1534 correctSize := int64(len(m.MsgPrefix)) + filesize
1535 if m.Size == correctSize {
1536 return nil
1537 }
1538
1539 foundProblem = true
1540
1541 mb := store.Mailbox{ID: m.MailboxID}
1542 if err := tx.Get(&mb); err != nil {
1543 _, werr := fmt.Fprintf(w, "get mailbox id %d for message with file size mismatch: %v\n", mb.ID, err)
1544 ctl.xcheck(werr, "write")
1545 }
1546 _, err = fmt.Fprintf(w, "fixing message %d in mailbox %q (id %d) with incorrect size %d, should be %d (len msg prefix %d + on-disk file %s size %d)\n", m.ID, mb.Name, mb.ID, m.Size, correctSize, len(m.MsgPrefix), p, filesize)
1547 ctl.xcheck(err, "write")
1548
1549 // We assume that the original message size was accounted as stored in the mailbox
1550 // total size. If this isn't correct, the user can always run
1551 // recalculatemailboxcounts.
1552 mb.Size -= m.Size
1553 mb.Size += correctSize
1554 if err := tx.Update(&mb); err != nil {
1555 return fmt.Errorf("update mailbox counts: %v", err)
1556 }
1557 mailboxCounts[mb.ID] = mb
1558
1559 m.Size = correctSize
1560
1561 mr := acc.MessageReader(m)
1562 part, err := message.EnsurePart(log.Logger, false, mr, m.Size)
1563 if err != nil {
1564 _, werr := fmt.Fprintf(w, "parsing message %d again: %v (continuing)\n", m.ID, err)
1565 ctl.xcheck(werr, "write")
1566 }
1567 m.ParsedBuf, err = json.Marshal(part)
1568 if err != nil {
1569 return fmt.Errorf("marshal parsed message: %v", err)
1570 }
1571 total++
1572 if err := tx.Update(&m); err != nil {
1573 return fmt.Errorf("update message: %v", err)
1574 }
1575 return nil
1576 })
1577
1578 })
1579 ctl.xcheck(err, "find and fix wrong message sizes")
1580
1581 var changes []store.Change
1582 for _, mb := range mailboxCounts {
1583 changes = append(changes, mb.ChangeCounts())
1584 }
1585 store.BroadcastChanges(acc, changes)
1586 })
1587 if n < batchSize {
1588 break
1589 }
1590 }
1591 _, err = fmt.Fprintf(w, "%d message size(s) fixed for account %s\n", total, accName)
1592 ctl.xcheck(err, "write")
1593 }
1594
1595 if accountOpt != "" {
1596 xfixmsgsize(accountOpt)
1597 } else {
1598 for i, accName := range mox.Conf.Accounts() {
1599 var line string
1600 if i > 0 {
1601 line = "\n"
1602 }
1603 _, err := fmt.Fprintf(w, "%sFixing message sizes in account %s...\n", line, accName)
1604 ctl.xcheck(err, "write")
1605 xfixmsgsize(accName)
1606 }
1607 }
1608 if foundProblem {
1609 _, err := fmt.Fprintf(w, "\nProblems were found and fixed. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n")
1610 ctl.xcheck(err, "write")
1611 }
1612
1613 w.xclose()
1614
1615 case "reparse":
1616 /* protocol:
1617 > "reparse"
1618 > account or empty
1619 < "ok" or error
1620 < stream
1621 */
1622
1623 accountOpt := ctl.xread()
1624 ctl.xwriteok()
1625 w := ctl.writer()
1626
1627 const batchSize = 100
1628
1629 xreparseAccount := func(accName string) {
1630 acc, err := store.OpenAccount(log, accName)
1631 ctl.xcheck(err, "open account")
1632 defer func() {
1633 err := acc.Close()
1634 log.Check(err, "closing account after reparsing messages")
1635 }()
1636
1637 total := 0
1638 var lastID int64
1639 for {
1640 var n int
1641 // Don't process all message in one transaction, we could block the account for too long.
1642 err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
1643 q := bstore.QueryTx[store.Message](tx)
1644 q.FilterEqual("Expunged", false)
1645 q.FilterGreater("ID", lastID)
1646 q.Limit(batchSize)
1647 q.SortAsc("ID")
1648 return q.ForEach(func(m store.Message) error {
1649 lastID = m.ID
1650 mr := acc.MessageReader(m)
1651 p, err := message.EnsurePart(log.Logger, false, mr, m.Size)
1652 if err != nil {
1653 _, err := fmt.Fprintf(w, "parsing message %d: %v (continuing)\n", m.ID, err)
1654 ctl.xcheck(err, "write")
1655 }
1656 m.ParsedBuf, err = json.Marshal(p)
1657 if err != nil {
1658 return fmt.Errorf("marshal parsed message: %v", err)
1659 }
1660 total++
1661 n++
1662 if err := tx.Update(&m); err != nil {
1663 return fmt.Errorf("update message: %v", err)
1664 }
1665 return nil
1666 })
1667
1668 })
1669 ctl.xcheck(err, "update messages with parsed mime structure")
1670 if n < batchSize {
1671 break
1672 }
1673 }
1674 _, err = fmt.Fprintf(w, "%d message(s) reparsed for account %s\n", total, accName)
1675 ctl.xcheck(err, "write")
1676 }
1677
1678 if accountOpt != "" {
1679 xreparseAccount(accountOpt)
1680 } else {
1681 for i, accName := range mox.Conf.Accounts() {
1682 var line string
1683 if i > 0 {
1684 line = "\n"
1685 }
1686 _, err := fmt.Fprintf(w, "%sReparsing account %s...\n", line, accName)
1687 ctl.xcheck(err, "write")
1688 xreparseAccount(accName)
1689 }
1690 }
1691 w.xclose()
1692
1693 case "reassignthreads":
1694 /* protocol:
1695 > "reassignthreads"
1696 > account or empty
1697 < "ok" or error
1698 < stream
1699 */
1700
1701 accountOpt := ctl.xread()
1702 ctl.xwriteok()
1703 w := ctl.writer()
1704
1705 xreassignThreads := func(accName string) {
1706 acc, err := store.OpenAccount(log, accName)
1707 ctl.xcheck(err, "open account")
1708 defer func() {
1709 err := acc.Close()
1710 log.Check(err, "closing account after reassigning threads")
1711 }()
1712
1713 // We don't want to step on an existing upgrade process.
1714 err = acc.ThreadingWait(log)
1715 ctl.xcheck(err, "waiting for threading upgrade to finish")
1716 // todo: should we try to continue if the threading upgrade failed? only if there is a chance it will succeed this time...
1717
1718 // todo: reassigning isn't atomic (in a single transaction), ideally it would be (bstore would need to be able to handle large updates).
1719 const batchSize = 50000
1720 total, err := acc.ResetThreading(ctx, log, batchSize, true)
1721 ctl.xcheck(err, "resetting threading fields")
1722 _, err = fmt.Fprintf(w, "New thread base subject assigned to %d message(s), starting to reassign threads...\n", total)
1723 ctl.xcheck(err, "write")
1724
1725 // Assign threads again. Ideally we would do this in a single transaction, but
1726 // bstore/boltdb cannot handle so many pending changes, so we set a high batchsize.
1727 err = acc.AssignThreads(ctx, log, nil, 0, 50000, w)
1728 ctl.xcheck(err, "reassign threads")
1729
1730 _, err = fmt.Fprintf(w, "Threads reassigned. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n")
1731 ctl.xcheck(err, "write")
1732 }
1733
1734 if accountOpt != "" {
1735 xreassignThreads(accountOpt)
1736 } else {
1737 for i, accName := range mox.Conf.Accounts() {
1738 var line string
1739 if i > 0 {
1740 line = "\n"
1741 }
1742 _, err := fmt.Fprintf(w, "%sReassigning threads for account %s...\n", line, accName)
1743 ctl.xcheck(err, "write")
1744 xreassignThreads(accName)
1745 }
1746 }
1747 w.xclose()
1748
1749 case "backup":
1750 backupctl(ctx, ctl)
1751
1752 default:
1753 log.Info("unrecognized command", slog.String("cmd", cmd))
1754 ctl.xwrite("unrecognized command")
1755 return
1756 }
1757}
1758