1package main
2
3import (
4 "bufio"
5 "bytes"
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "io"
11 "log"
12 "log/slog"
13 "maps"
14 "net"
15 "os"
16 "path/filepath"
17 "runtime/debug"
18 "sort"
19 "strconv"
20 "strings"
21 "time"
22
23 "github.com/mjl-/bstore"
24
25 "github.com/mjl-/mox/admin"
26 "github.com/mjl-/mox/config"
27 "github.com/mjl-/mox/dns"
28 "github.com/mjl-/mox/imapserver"
29 "github.com/mjl-/mox/message"
30 "github.com/mjl-/mox/metrics"
31 "github.com/mjl-/mox/mlog"
32 "github.com/mjl-/mox/mox-"
33 "github.com/mjl-/mox/queue"
34 "github.com/mjl-/mox/smtp"
35 "github.com/mjl-/mox/store"
36 "github.com/mjl-/mox/webapi"
37)
38
39// ctl represents a connection to the ctl unix domain socket of a running mox instance.
40// ctl provides functions to read/write commands/responses/data streams.
41type ctl struct {
42 cmd string // Set for server-side of commands.
43 conn net.Conn
44 r *bufio.Reader // Set for first reader.
45 x any // If set, errors are handled by calling panic(x) instead of log.Fatal.
46 log mlog.Log // If set, along with x, logging is done here.
47}
48
49// xctl opens a ctl connection.
50func xctl() *ctl {
51 p := mox.DataDirPath("ctl")
52 conn, err := net.Dial("unix", p)
53 if err != nil {
54 log.Fatalf("connecting to control socket at %q: %v", p, err)
55 }
56 ctl := &ctl{conn: conn}
57 version := ctl.xread()
58 if version != "ctlv0" {
59 log.Fatalf("ctl protocol mismatch, got %q, expected ctlv0", version)
60 }
61 return ctl
62}
63
64// Interpret msg as an error.
65// If ctl.x is set, the string is also written to the ctl to be interpreted as error by the other party.
66func (c *ctl) xerror(msg string) {
67 if c.x == nil {
68 log.Fatalln(msg)
69 }
70 c.log.Debugx("ctl error", fmt.Errorf("%s", msg), slog.String("cmd", c.cmd))
71 c.xwrite(msg)
72 panic(c.x)
73}
74
75// Check if err is not nil. If so, handle error through ctl.x or log.Fatal. If
76// ctl.x is set, the error string is written to ctl, to be interpreted as an error
77// by the command reading from ctl.
78func (c *ctl) xcheck(err error, msg string) {
79 if err == nil {
80 return
81 }
82 if c.x == nil {
83 log.Fatalf("%s: %s", msg, err)
84 }
85 c.log.Debugx(msg, err, slog.String("cmd", c.cmd))
86 fmt.Fprintf(c.conn, "%s: %s\n", msg, err)
87 panic(c.x)
88}
89
90// Read a line and return it without trailing newline.
91func (c *ctl) xread() string {
92 if c.r == nil {
93 c.r = bufio.NewReader(c.conn)
94 }
95 line, err := c.r.ReadString('\n')
96 c.xcheck(err, "read from ctl")
97 return strings.TrimSuffix(line, "\n")
98}
99
100// Read a line. If not "ok", the string is interpreted as an error.
101func (c *ctl) xreadok() {
102 line := c.xread()
103 if line != "ok" {
104 c.xerror(line)
105 }
106}
107
108// Write a string, typically a command or parameter.
109func (c *ctl) xwrite(text string) {
110 _, err := fmt.Fprintln(c.conn, text)
111 c.xcheck(err, "write")
112}
113
114// Write "ok" to indicate success.
115func (c *ctl) xwriteok() {
116 c.xwrite("ok")
117}
118
119// Copy data from a stream from ctl to dst.
120func (c *ctl) xstreamto(dst io.Writer) {
121 _, err := io.Copy(dst, c.reader())
122 c.xcheck(err, "reading message")
123}
124
125// Copy data from src to a stream to ctl.
126func (c *ctl) xstreamfrom(src io.Reader) {
127 w := c.writer()
128 _, err := io.Copy(w, src)
129 c.xcheck(err, "copying")
130 w.xclose()
131}
132
133// Writer returns an io.Writer for a data stream to ctl.
134// When done writing, caller must call xclose to signal the end of the stream.
135// Behaviour of "x" is copied from ctl.
136func (c *ctl) writer() *ctlwriter {
137 return &ctlwriter{cmd: c.cmd, conn: c.conn, x: c.x, log: c.log}
138}
139
140// Reader returns an io.Reader for a data stream from ctl.
141// Behaviour of "x" is copied from ctl.
142func (c *ctl) reader() *ctlreader {
143 if c.r == nil {
144 c.r = bufio.NewReader(c.conn)
145 }
146 return &ctlreader{cmd: c.cmd, conn: c.conn, r: c.r, x: c.x, log: c.log}
147}
148
149/*
150Ctlwriter and ctlreader implement the writing and reading a data stream. They
151implement the io.Writer and io.Reader interface. In the protocol below each
152non-data message ends with a newline that is typically stripped when
153interpreting.
154
155Zero or more data transactions:
156
157 > "123" (for data size) or an error message
158 > data, 123 bytes
159 < "ok" or an error message
160
161Followed by a end of stream indicated by zero data bytes message:
162
163 > "0"
164*/
165
166type ctlwriter struct {
167 cmd string // Set for server-side of commands.
168 conn net.Conn // Ctl socket from which messages are read.
169 buf []byte // Scratch buffer, for reading response.
170 x any // If not nil, errors in Write and xcheckf are handled with panic(x), otherwise with a log.Fatal.
171 log mlog.Log
172}
173
174func (s *ctlwriter) Write(buf []byte) (int, error) {
175 _, err := fmt.Fprintf(s.conn, "%d\n", len(buf))
176 s.xcheck(err, "write count")
177 _, err = s.conn.Write(buf)
178 s.xcheck(err, "write data")
179 if s.buf == nil {
180 s.buf = make([]byte, 512)
181 }
182 n, err := s.conn.Read(s.buf)
183 s.xcheck(err, "reading response to write")
184 line := strings.TrimSuffix(string(s.buf[:n]), "\n")
185 if line != "ok" {
186 s.xerror(line)
187 }
188 return len(buf), nil
189}
190
191func (s *ctlwriter) xerror(msg string) {
192 if s.x == nil {
193 log.Fatalln(msg)
194 } else {
195 s.log.Debugx("error", fmt.Errorf("%s", msg), slog.String("cmd", s.cmd))
196 panic(s.x)
197 }
198}
199
200func (s *ctlwriter) xcheck(err error, msg string) {
201 if err == nil {
202 return
203 }
204 if s.x == nil {
205 log.Fatalf("%s: %s", msg, err)
206 } else {
207 s.log.Debugx(msg, err, slog.String("cmd", s.cmd))
208 panic(s.x)
209 }
210}
211
212func (s *ctlwriter) xclose() {
213 _, err := fmt.Fprintf(s.conn, "0\n")
214 s.xcheck(err, "write eof")
215}
216
217type ctlreader struct {
218 cmd string // Set for server-side of command.
219 conn net.Conn // For writing "ok" after reading.
220 r *bufio.Reader // Buffered ctl socket.
221 err error // If set, returned for each read. can also be io.EOF.
222 npending int // Number of bytes that can still be read until a new count line must be read.
223 x any // If set, errors are handled with panic(x) instead of log.Fatal.
224 log mlog.Log // If x is set, logging goes to log.
225}
226
227func (s *ctlreader) Read(buf []byte) (N int, Err error) {
228 if s.err != nil {
229 return 0, s.err
230 }
231 if s.npending == 0 {
232 line, err := s.r.ReadString('\n')
233 s.xcheck(err, "reading count")
234 line = strings.TrimSuffix(line, "\n")
235 n, err := strconv.ParseInt(line, 10, 32)
236 if err != nil {
237 s.xerror(line)
238 }
239 if n == 0 {
240 s.err = io.EOF
241 return 0, s.err
242 }
243 s.npending = int(n)
244 }
245 rn := len(buf)
246 if rn > s.npending {
247 rn = s.npending
248 }
249 n, err := s.r.Read(buf[:rn])
250 s.xcheck(err, "read from ctl")
251 s.npending -= n
252 if s.npending == 0 {
253 _, err = fmt.Fprintln(s.conn, "ok")
254 s.xcheck(err, "writing ok after reading")
255 }
256 return n, err
257}
258
259func (s *ctlreader) xerror(msg string) {
260 if s.x == nil {
261 log.Fatalln(msg)
262 } else {
263 s.log.Debugx("error", fmt.Errorf("%s", msg), slog.String("cmd", s.cmd))
264 panic(s.x)
265 }
266}
267
268func (s *ctlreader) xcheck(err error, msg string) {
269 if err == nil {
270 return
271 }
272 if s.x == nil {
273 log.Fatalf("%s: %s", msg, err)
274 } else {
275 s.log.Debugx(msg, err, slog.String("cmd", s.cmd))
276 panic(s.x)
277 }
278}
279
280// servectl handles requests on the unix domain socket "ctl", e.g. for graceful shutdown, local mail delivery.
281func servectl(ctx context.Context, cid int64, log mlog.Log, conn net.Conn, shutdown func()) {
282 log.Debug("ctl connection")
283
284 var stop = struct{}{} // Sentinel value for panic and recover.
285 ctl := &ctl{conn: conn, x: stop, log: log}
286 defer func() {
287 x := recover()
288 if x == nil || x == stop {
289 return
290 }
291 log.Error("servectl panic", slog.Any("err", x), slog.String("cmd", ctl.cmd))
292 debug.PrintStack()
293 metrics.PanicInc(metrics.Ctl)
294 }()
295
296 defer conn.Close()
297
298 ctl.xwrite("ctlv0")
299 for {
300 servectlcmd(ctx, ctl, cid, shutdown)
301 }
302}
303
304func xparseJSON(ctl *ctl, s string, v any) {
305 dec := json.NewDecoder(strings.NewReader(s))
306 dec.DisallowUnknownFields()
307 err := dec.Decode(v)
308 ctl.xcheck(err, "parsing from ctl as json")
309}
310
311func servectlcmd(ctx context.Context, ctl *ctl, cid int64, shutdown func()) {
312 log := ctl.log
313 cmd := ctl.xread()
314 ctl.cmd = cmd
315 log.Info("ctl command", slog.String("cmd", cmd))
316 switch cmd {
317 case "stop":
318 shutdown()
319 os.Exit(0)
320
321 case "deliver":
322 /* The protocol, double quoted are literals.
323
324 > "deliver"
325 > address
326 < "ok"
327 > stream
328 < "ok"
329 */
330
331 to := ctl.xread()
332 a, _, addr, err := store.OpenEmail(log, to, false)
333 ctl.xcheck(err, "lookup destination address")
334
335 msgFile, err := store.CreateMessageTemp(log, "ctl-deliver")
336 ctl.xcheck(err, "creating temporary message file")
337 defer store.CloseRemoveTempFile(log, msgFile, "deliver message")
338 mw := message.NewWriter(msgFile)
339 ctl.xwriteok()
340
341 ctl.xstreamto(mw)
342 err = msgFile.Sync()
343 ctl.xcheck(err, "syncing message to storage")
344
345 m := store.Message{
346 Received: time.Now(),
347 Size: mw.Size,
348 }
349
350 a.WithWLock(func() {
351 err := a.DeliverDestination(log, addr, &m, msgFile)
352 ctl.xcheck(err, "delivering message")
353 log.Info("message delivered through ctl", slog.Any("to", to))
354 })
355
356 err = a.Close()
357 ctl.xcheck(err, "closing account")
358 ctl.xwriteok()
359
360 case "setaccountpassword":
361 /* protocol:
362 > "setaccountpassword"
363 > account
364 > password
365 < "ok" or error
366 */
367
368 account := ctl.xread()
369 pw := ctl.xread()
370
371 acc, err := store.OpenAccount(log, account, false)
372 ctl.xcheck(err, "open account")
373 defer func() {
374 if acc != nil {
375 err := acc.Close()
376 log.Check(err, "closing account after setting password")
377 }
378 }()
379
380 err = acc.SetPassword(log, pw)
381 ctl.xcheck(err, "setting password")
382 err = acc.Close()
383 ctl.xcheck(err, "closing account")
384 acc = nil
385 ctl.xwriteok()
386
387 case "queueholdruleslist":
388 /* protocol:
389 > "queueholdruleslist"
390 < "ok"
391 < stream
392 */
393 l, err := queue.HoldRuleList(ctx)
394 ctl.xcheck(err, "listing hold rules")
395 ctl.xwriteok()
396 xw := ctl.writer()
397 fmt.Fprintln(xw, "hold rules:")
398 for _, hr := range l {
399 var elems []string
400 if hr.Account != "" {
401 elems = append(elems, fmt.Sprintf("account %q", hr.Account))
402 }
403 var zerodom dns.Domain
404 if hr.SenderDomain != zerodom {
405 elems = append(elems, fmt.Sprintf("sender domain %q", hr.SenderDomain.Name()))
406 }
407 if hr.RecipientDomain != zerodom {
408 elems = append(elems, fmt.Sprintf("sender domain %q", hr.RecipientDomain.Name()))
409 }
410 if len(elems) == 0 {
411 fmt.Fprintf(xw, "id %d: all messages\n", hr.ID)
412 } else {
413 fmt.Fprintf(xw, "id %d: %s\n", hr.ID, strings.Join(elems, ", "))
414 }
415 }
416 if len(l) == 0 {
417 fmt.Fprint(xw, "(none)\n")
418 }
419 xw.xclose()
420
421 case "queueholdrulesadd":
422 /* protocol:
423 > "queueholdrulesadd"
424 > account
425 > senderdomainstr
426 > recipientdomainstr
427 < "ok" or error
428 */
429 var hr queue.HoldRule
430 hr.Account = ctl.xread()
431 senderdomstr := ctl.xread()
432 rcptdomstr := ctl.xread()
433 var err error
434 hr.SenderDomain, err = dns.ParseDomain(senderdomstr)
435 ctl.xcheck(err, "parsing sender domain")
436 hr.RecipientDomain, err = dns.ParseDomain(rcptdomstr)
437 ctl.xcheck(err, "parsing recipient domain")
438 hr, err = queue.HoldRuleAdd(ctx, log, hr)
439 ctl.xcheck(err, "add hold rule")
440 ctl.xwriteok()
441
442 case "queueholdrulesremove":
443 /* protocol:
444 > "queueholdrulesremove"
445 > id
446 < "ok" or error
447 */
448 idstr := ctl.xread()
449 id, err := strconv.ParseInt(idstr, 10, 64)
450 ctl.xcheck(err, "parsing id")
451 err = queue.HoldRuleRemove(ctx, log, id)
452 ctl.xcheck(err, "remove hold rule")
453 ctl.xwriteok()
454
455 case "queuelist":
456 /* protocol:
457 > "queuelist"
458 > filters as json
459 > sort as json
460 < "ok"
461 < stream
462 */
463 filterline := ctl.xread()
464 sortline := ctl.xread()
465 var f queue.Filter
466 xparseJSON(ctl, filterline, &f)
467 var s queue.Sort
468 xparseJSON(ctl, sortline, &s)
469 qmsgs, err := queue.List(ctx, f, s)
470 ctl.xcheck(err, "listing queue")
471 ctl.xwriteok()
472
473 xw := ctl.writer()
474 fmt.Fprintln(xw, "messages:")
475 for _, qm := range qmsgs {
476 var lastAttempt string
477 if qm.LastAttempt != nil {
478 lastAttempt = time.Since(*qm.LastAttempt).Round(time.Second).String()
479 }
480 fmt.Fprintf(xw, "%5d %s from:%s to:%s next %s last %s error %q\n", qm.ID, qm.Queued.Format(time.RFC3339), qm.Sender().LogString(), qm.Recipient().LogString(), -time.Since(qm.NextAttempt).Round(time.Second), lastAttempt, qm.LastResult().Error)
481 }
482 if len(qmsgs) == 0 {
483 fmt.Fprint(xw, "(none)\n")
484 }
485 xw.xclose()
486
487 case "queueholdset":
488 /* protocol:
489 > "queueholdset"
490 > queuefilters as json
491 > "true" or "false"
492 < "ok" or error
493 < count
494 */
495
496 filterline := ctl.xread()
497 hold := ctl.xread() == "true"
498 var f queue.Filter
499 xparseJSON(ctl, filterline, &f)
500 count, err := queue.HoldSet(ctx, f, hold)
501 ctl.xcheck(err, "setting on hold status for messages")
502 ctl.xwriteok()
503 ctl.xwrite(fmt.Sprintf("%d", count))
504
505 case "queueschedule":
506 /* protocol:
507 > "queueschedule"
508 > queuefilters as json
509 > relative to now
510 > duration
511 < "ok" or error
512 < count
513 */
514
515 filterline := ctl.xread()
516 relnow := ctl.xread()
517 duration := ctl.xread()
518 var f queue.Filter
519 xparseJSON(ctl, filterline, &f)
520 d, err := time.ParseDuration(duration)
521 ctl.xcheck(err, "parsing duration for next delivery attempt")
522 var count int
523 if relnow == "" {
524 count, err = queue.NextAttemptAdd(ctx, f, d)
525 } else {
526 count, err = queue.NextAttemptSet(ctx, f, time.Now().Add(d))
527 }
528 ctl.xcheck(err, "setting next delivery attempts in queue")
529 ctl.xwriteok()
530 ctl.xwrite(fmt.Sprintf("%d", count))
531
532 case "queuetransport":
533 /* protocol:
534 > "queuetransport"
535 > queuefilters as json
536 > transport
537 < "ok" or error
538 < count
539 */
540
541 filterline := ctl.xread()
542 transport := ctl.xread()
543 var f queue.Filter
544 xparseJSON(ctl, filterline, &f)
545 count, err := queue.TransportSet(ctx, f, transport)
546 ctl.xcheck(err, "adding to next delivery attempts in queue")
547 ctl.xwriteok()
548 ctl.xwrite(fmt.Sprintf("%d", count))
549
550 case "queuerequiretls":
551 /* protocol:
552 > "queuerequiretls"
553 > queuefilters as json
554 > reqtls (empty string, "true" or "false")
555 < "ok" or error
556 < count
557 */
558
559 filterline := ctl.xread()
560 reqtls := ctl.xread()
561 var req *bool
562 switch reqtls {
563 case "":
564 case "true":
565 v := true
566 req = &v
567 case "false":
568 v := false
569 req = &v
570 default:
571 ctl.xcheck(fmt.Errorf("unknown value %q", reqtls), "parsing value")
572 }
573 var f queue.Filter
574 xparseJSON(ctl, filterline, &f)
575 count, err := queue.RequireTLSSet(ctx, f, req)
576 ctl.xcheck(err, "setting tls requirements on messages in queue")
577 ctl.xwriteok()
578 ctl.xwrite(fmt.Sprintf("%d", count))
579
580 case "queuefail":
581 /* protocol:
582 > "queuefail"
583 > queuefilters as json
584 < "ok" or error
585 < count
586 */
587
588 filterline := ctl.xread()
589 var f queue.Filter
590 xparseJSON(ctl, filterline, &f)
591 count, err := queue.Fail(ctx, log, f)
592 ctl.xcheck(err, "marking messages from queue as failed")
593 ctl.xwriteok()
594 ctl.xwrite(fmt.Sprintf("%d", count))
595
596 case "queuedrop":
597 /* protocol:
598 > "queuedrop"
599 > queuefilters as json
600 < "ok" or error
601 < count
602 */
603
604 filterline := ctl.xread()
605 var f queue.Filter
606 xparseJSON(ctl, filterline, &f)
607 count, err := queue.Drop(ctx, log, f)
608 ctl.xcheck(err, "dropping messages from queue")
609 ctl.xwriteok()
610 ctl.xwrite(fmt.Sprintf("%d", count))
611
612 case "queuedump":
613 /* protocol:
614 > "queuedump"
615 > id
616 < "ok" or error
617 < stream
618 */
619
620 idstr := ctl.xread()
621 id, err := strconv.ParseInt(idstr, 10, 64)
622 if err != nil {
623 ctl.xcheck(err, "parsing id")
624 }
625 mr, err := queue.OpenMessage(ctx, id)
626 ctl.xcheck(err, "opening message")
627 defer func() {
628 err := mr.Close()
629 log.Check(err, "closing message from queue")
630 }()
631 ctl.xwriteok()
632 ctl.xstreamfrom(mr)
633
634 case "queueretiredlist":
635 /* protocol:
636 > "queueretiredlist"
637 > filters as json
638 > sort as json
639 < "ok"
640 < stream
641 */
642 filterline := ctl.xread()
643 sortline := ctl.xread()
644 var f queue.RetiredFilter
645 xparseJSON(ctl, filterline, &f)
646 var s queue.RetiredSort
647 xparseJSON(ctl, sortline, &s)
648 qmsgs, err := queue.RetiredList(ctx, f, s)
649 ctl.xcheck(err, "listing retired queue")
650 ctl.xwriteok()
651
652 xw := ctl.writer()
653 fmt.Fprintln(xw, "retired messages:")
654 for _, qm := range qmsgs {
655 var lastAttempt string
656 if qm.LastAttempt != nil {
657 lastAttempt = time.Since(*qm.LastAttempt).Round(time.Second).String()
658 }
659 result := "failure"
660 if qm.Success {
661 result = "success"
662 }
663 sender, err := qm.Sender()
664 xcheckf(err, "parsing sender")
665 fmt.Fprintf(xw, "%5d %s %s from:%s to:%s last %s error %q\n", qm.ID, qm.Queued.Format(time.RFC3339), result, sender.LogString(), qm.Recipient().LogString(), lastAttempt, qm.LastResult().Error)
666 }
667 if len(qmsgs) == 0 {
668 fmt.Fprint(xw, "(none)\n")
669 }
670 xw.xclose()
671
672 case "queueretiredprint":
673 /* protocol:
674 > "queueretiredprint"
675 > id
676 < "ok"
677 < stream
678 */
679 idstr := ctl.xread()
680 id, err := strconv.ParseInt(idstr, 10, 64)
681 if err != nil {
682 ctl.xcheck(err, "parsing id")
683 }
684 l, err := queue.RetiredList(ctx, queue.RetiredFilter{IDs: []int64{id}}, queue.RetiredSort{})
685 ctl.xcheck(err, "getting retired messages")
686 if len(l) == 0 {
687 ctl.xcheck(errors.New("not found"), "getting retired message")
688 }
689 m := l[0]
690 ctl.xwriteok()
691 xw := ctl.writer()
692 enc := json.NewEncoder(xw)
693 enc.SetIndent("", "\t")
694 err = enc.Encode(m)
695 ctl.xcheck(err, "encode retired message")
696 xw.xclose()
697
698 case "queuehooklist":
699 /* protocol:
700 > "queuehooklist"
701 > filters as json
702 > sort as json
703 < "ok"
704 < stream
705 */
706 filterline := ctl.xread()
707 sortline := ctl.xread()
708 var f queue.HookFilter
709 xparseJSON(ctl, filterline, &f)
710 var s queue.HookSort
711 xparseJSON(ctl, sortline, &s)
712 hooks, err := queue.HookList(ctx, f, s)
713 ctl.xcheck(err, "listing webhooks")
714 ctl.xwriteok()
715
716 xw := ctl.writer()
717 fmt.Fprintln(xw, "webhooks:")
718 for _, h := range hooks {
719 var lastAttempt string
720 if len(h.Results) > 0 {
721 lastAttempt = time.Since(h.LastResult().Start).Round(time.Second).String()
722 }
723 fmt.Fprintf(xw, "%5d %s account:%s next %s last %s error %q url %s\n", h.ID, h.Submitted.Format(time.RFC3339), h.Account, time.Until(h.NextAttempt).Round(time.Second), lastAttempt, h.LastResult().Error, h.URL)
724 }
725 if len(hooks) == 0 {
726 fmt.Fprint(xw, "(none)\n")
727 }
728 xw.xclose()
729
730 case "queuehookschedule":
731 /* protocol:
732 > "queuehookschedule"
733 > hookfilters as json
734 > relative to now
735 > duration
736 < "ok" or error
737 < count
738 */
739
740 filterline := ctl.xread()
741 relnow := ctl.xread()
742 duration := ctl.xread()
743 var f queue.HookFilter
744 xparseJSON(ctl, filterline, &f)
745 d, err := time.ParseDuration(duration)
746 ctl.xcheck(err, "parsing duration for next delivery attempt")
747 var count int
748 if relnow == "" {
749 count, err = queue.HookNextAttemptAdd(ctx, f, d)
750 } else {
751 count, err = queue.HookNextAttemptSet(ctx, f, time.Now().Add(d))
752 }
753 ctl.xcheck(err, "setting next delivery attempts in queue")
754 ctl.xwriteok()
755 ctl.xwrite(fmt.Sprintf("%d", count))
756
757 case "queuehookcancel":
758 /* protocol:
759 > "queuehookcancel"
760 > hookfilters as json
761 < "ok" or error
762 < count
763 */
764
765 filterline := ctl.xread()
766 var f queue.HookFilter
767 xparseJSON(ctl, filterline, &f)
768 count, err := queue.HookCancel(ctx, log, f)
769 ctl.xcheck(err, "canceling webhooks in queue")
770 ctl.xwriteok()
771 ctl.xwrite(fmt.Sprintf("%d", count))
772
773 case "queuehookprint":
774 /* protocol:
775 > "queuehookprint"
776 > id
777 < "ok"
778 < stream
779 */
780 idstr := ctl.xread()
781 id, err := strconv.ParseInt(idstr, 10, 64)
782 if err != nil {
783 ctl.xcheck(err, "parsing id")
784 }
785 l, err := queue.HookList(ctx, queue.HookFilter{IDs: []int64{id}}, queue.HookSort{})
786 ctl.xcheck(err, "getting webhooks")
787 if len(l) == 0 {
788 ctl.xcheck(errors.New("not found"), "getting webhook")
789 }
790 h := l[0]
791 ctl.xwriteok()
792 xw := ctl.writer()
793 enc := json.NewEncoder(xw)
794 enc.SetIndent("", "\t")
795 err = enc.Encode(h)
796 ctl.xcheck(err, "encode webhook")
797 xw.xclose()
798
799 case "queuehookretiredlist":
800 /* protocol:
801 > "queuehookretiredlist"
802 > filters as json
803 > sort as json
804 < "ok"
805 < stream
806 */
807 filterline := ctl.xread()
808 sortline := ctl.xread()
809 var f queue.HookRetiredFilter
810 xparseJSON(ctl, filterline, &f)
811 var s queue.HookRetiredSort
812 xparseJSON(ctl, sortline, &s)
813 l, err := queue.HookRetiredList(ctx, f, s)
814 ctl.xcheck(err, "listing retired webhooks")
815 ctl.xwriteok()
816
817 xw := ctl.writer()
818 fmt.Fprintln(xw, "retired webhooks:")
819 for _, h := range l {
820 var lastAttempt string
821 if len(h.Results) > 0 {
822 lastAttempt = time.Since(h.LastResult().Start).Round(time.Second).String()
823 }
824 result := "success"
825 if !h.Success {
826 result = "failure"
827 }
828 fmt.Fprintf(xw, "%5d %s %s account:%s last %s error %q url %s\n", h.ID, h.Submitted.Format(time.RFC3339), result, h.Account, lastAttempt, h.LastResult().Error, h.URL)
829 }
830 if len(l) == 0 {
831 fmt.Fprint(xw, "(none)\n")
832 }
833 xw.xclose()
834
835 case "queuehookretiredprint":
836 /* protocol:
837 > "queuehookretiredprint"
838 > id
839 < "ok"
840 < stream
841 */
842 idstr := ctl.xread()
843 id, err := strconv.ParseInt(idstr, 10, 64)
844 if err != nil {
845 ctl.xcheck(err, "parsing id")
846 }
847 l, err := queue.HookRetiredList(ctx, queue.HookRetiredFilter{IDs: []int64{id}}, queue.HookRetiredSort{})
848 ctl.xcheck(err, "getting retired webhooks")
849 if len(l) == 0 {
850 ctl.xcheck(errors.New("not found"), "getting retired webhook")
851 }
852 h := l[0]
853 ctl.xwriteok()
854 xw := ctl.writer()
855 enc := json.NewEncoder(xw)
856 enc.SetIndent("", "\t")
857 err = enc.Encode(h)
858 ctl.xcheck(err, "encode retired webhook")
859 xw.xclose()
860
861 case "queuesuppresslist":
862 /* protocol:
863 > "queuesuppresslist"
864 > account (or empty)
865 < "ok" or error
866 < stream
867 */
868
869 account := ctl.xread()
870 l, err := queue.SuppressionList(ctx, account)
871 ctl.xcheck(err, "listing suppressions")
872 ctl.xwriteok()
873 xw := ctl.writer()
874 fmt.Fprintln(xw, "suppressions (account, address, manual, time added, base adddress, reason):")
875 for _, sup := range l {
876 manual := "No"
877 if sup.Manual {
878 manual = "Yes"
879 }
880 fmt.Fprintf(xw, "%q\t%q\t%s\t%s\t%q\t%q\n", sup.Account, sup.OriginalAddress, manual, sup.Created.Round(time.Second), sup.BaseAddress, sup.Reason)
881 }
882 if len(l) == 0 {
883 fmt.Fprintln(xw, "(none)")
884 }
885 xw.xclose()
886
887 case "queuesuppressadd":
888 /* protocol:
889 > "queuesuppressadd"
890 > account
891 > address
892 < "ok" or error
893 */
894
895 account := ctl.xread()
896 address := ctl.xread()
897 _, ok := mox.Conf.Account(account)
898 if !ok {
899 ctl.xcheck(errors.New("unknown account"), "looking up account")
900 }
901 addr, err := smtp.ParseAddress(address)
902 ctl.xcheck(err, "parsing address")
903 sup := webapi.Suppression{
904 Account: account,
905 Manual: true,
906 Reason: "added through mox cli",
907 }
908 err = queue.SuppressionAdd(ctx, addr.Path(), &sup)
909 ctl.xcheck(err, "adding suppression")
910 ctl.xwriteok()
911
912 case "queuesuppressremove":
913 /* protocol:
914 > "queuesuppressremove"
915 > account
916 > address
917 < "ok" or error
918 */
919
920 account := ctl.xread()
921 address := ctl.xread()
922 addr, err := smtp.ParseAddress(address)
923 ctl.xcheck(err, "parsing address")
924 err = queue.SuppressionRemove(ctx, account, addr.Path())
925 ctl.xcheck(err, "removing suppression")
926 ctl.xwriteok()
927
928 case "queuesuppresslookup":
929 /* protocol:
930 > "queuesuppresslookup"
931 > account or empty
932 > address
933 < "ok" or error
934 < stream
935 */
936
937 account := ctl.xread()
938 address := ctl.xread()
939 if account != "" {
940 _, ok := mox.Conf.Account(account)
941 if !ok {
942 ctl.xcheck(errors.New("unknown account"), "looking up account")
943 }
944 }
945 addr, err := smtp.ParseAddress(address)
946 ctl.xcheck(err, "parsing address")
947 sup, err := queue.SuppressionLookup(ctx, account, addr.Path())
948 ctl.xcheck(err, "looking up suppression")
949 ctl.xwriteok()
950 xw := ctl.writer()
951 if sup == nil {
952 fmt.Fprintln(xw, "not present")
953 } else {
954 manual := "no"
955 if sup.Manual {
956 manual = "yes"
957 }
958 fmt.Fprintf(xw, "present\nadded: %s\nmanual: %s\nbase address: %s\nreason: %q\n", sup.Created.Round(time.Second), manual, sup.BaseAddress, sup.Reason)
959 }
960 xw.xclose()
961
962 case "importmaildir", "importmbox":
963 mbox := cmd == "importmbox"
964 importctl(ctx, ctl, mbox)
965
966 case "domainadd":
967 /* protocol:
968 > "domainadd"
969 > disabled as "true" or "false"
970 > domain
971 > account
972 > localpart
973 < "ok" or error
974 */
975 var disabled bool
976 switch s := ctl.xread(); s {
977 case "true":
978 disabled = true
979 case "false":
980 disabled = false
981 default:
982 ctl.xcheck(fmt.Errorf("invalid value %q", s), "parsing disabled boolean")
983 }
984
985 domain := ctl.xread()
986 account := ctl.xread()
987 localpart := ctl.xread()
988 d, err := dns.ParseDomain(domain)
989 ctl.xcheck(err, "parsing domain")
990 err = admin.DomainAdd(ctx, disabled, d, account, smtp.Localpart(localpart))
991 ctl.xcheck(err, "adding domain")
992 ctl.xwriteok()
993
994 case "domainrm":
995 /* protocol:
996 > "domainrm"
997 > domain
998 < "ok" or error
999 */
1000 domain := ctl.xread()
1001 d, err := dns.ParseDomain(domain)
1002 ctl.xcheck(err, "parsing domain")
1003 err = admin.DomainRemove(ctx, d)
1004 ctl.xcheck(err, "removing domain")
1005 ctl.xwriteok()
1006
1007 case "domaindisabled":
1008 /* protocol:
1009 > "domaindisabled"
1010 > "true" or "false"
1011 > domain
1012 < "ok" or error
1013 */
1014 domain := ctl.xread()
1015 var disabled bool
1016 switch s := ctl.xread(); s {
1017 case "true":
1018 disabled = true
1019 case "false":
1020 disabled = false
1021 default:
1022 ctl.xerror("bad boolean value")
1023 }
1024 err := admin.DomainSave(ctx, domain, func(d *config.Domain) error {
1025 d.Disabled = disabled
1026 return nil
1027 })
1028 ctl.xcheck(err, "saving domain")
1029 ctl.xwriteok()
1030
1031 case "accountadd":
1032 /* protocol:
1033 > "accountadd"
1034 > account
1035 > address
1036 < "ok" or error
1037 */
1038 account := ctl.xread()
1039 address := ctl.xread()
1040 err := admin.AccountAdd(ctx, account, address)
1041 ctl.xcheck(err, "adding account")
1042 ctl.xwriteok()
1043
1044 case "accountrm":
1045 /* protocol:
1046 > "accountrm"
1047 > account
1048 < "ok" or error
1049 */
1050 account := ctl.xread()
1051 err := admin.AccountRemove(ctx, account)
1052 ctl.xcheck(err, "removing account")
1053 ctl.xwriteok()
1054
1055 case "accountdisabled":
1056 /* protocol:
1057 > "accountdisabled"
1058 > account
1059 > message (if empty, then enabled)
1060 < "ok" or error
1061 */
1062 account := ctl.xread()
1063 message := ctl.xread()
1064
1065 acc, err := store.OpenAccount(log, account, false)
1066 ctl.xcheck(err, "open account")
1067 defer func() {
1068 err := acc.Close()
1069 log.Check(err, "closing account")
1070 }()
1071
1072 err = admin.AccountSave(ctx, account, func(acc *config.Account) {
1073 acc.LoginDisabled = message
1074 })
1075 ctl.xcheck(err, "saving account")
1076
1077 err = acc.SessionsClear(ctx, ctl.log)
1078 ctl.xcheck(err, "clearing active web sessions")
1079
1080 ctl.xwriteok()
1081
1082 case "accountenable":
1083 /* protocol:
1084 > "accountenable"
1085 > account
1086 < "ok" or error
1087 */
1088 account := ctl.xread()
1089 err := admin.AccountSave(ctx, account, func(acc *config.Account) {
1090 acc.LoginDisabled = ""
1091 })
1092 ctl.xcheck(err, "enabling account")
1093 ctl.xwriteok()
1094
1095 case "tlspubkeylist":
1096 /* protocol:
1097 > "tlspubkeylist"
1098 > account (or empty)
1099 < "ok" or error
1100 < stream
1101 */
1102 accountOpt := ctl.xread()
1103 tlspubkeys, err := store.TLSPublicKeyList(ctx, accountOpt)
1104 ctl.xcheck(err, "list tls public keys")
1105 ctl.xwriteok()
1106 xw := ctl.writer()
1107 fmt.Fprintf(xw, "# fingerprint, type, name, account, login address, no imap preauth (%d)\n", len(tlspubkeys))
1108 for _, k := range tlspubkeys {
1109 fmt.Fprintf(xw, "%s\t%s\t%q\t%s\t%s\t%v\n", k.Fingerprint, k.Type, k.Name, k.Account, k.LoginAddress, k.NoIMAPPreauth)
1110 }
1111 xw.xclose()
1112
1113 case "tlspubkeyget":
1114 /* protocol:
1115 > "tlspubkeyget"
1116 > fingerprint
1117 < "ok" or error
1118 < type
1119 < name
1120 < account
1121 < address
1122 < noimappreauth (true/false)
1123 < stream (certder)
1124 */
1125 fp := ctl.xread()
1126 tlspubkey, err := store.TLSPublicKeyGet(ctx, fp)
1127 ctl.xcheck(err, "looking tls public key")
1128 ctl.xwriteok()
1129 ctl.xwrite(tlspubkey.Type)
1130 ctl.xwrite(tlspubkey.Name)
1131 ctl.xwrite(tlspubkey.Account)
1132 ctl.xwrite(tlspubkey.LoginAddress)
1133 ctl.xwrite(fmt.Sprintf("%v", tlspubkey.NoIMAPPreauth))
1134 ctl.xstreamfrom(bytes.NewReader(tlspubkey.CertDER))
1135
1136 case "tlspubkeyadd":
1137 /* protocol:
1138 > "tlspubkeyadd"
1139 > loginaddress
1140 > name (or empty)
1141 > noimappreauth (true/false)
1142 > stream (certder)
1143 < "ok" or error
1144 */
1145 loginAddress := ctl.xread()
1146 name := ctl.xread()
1147 noimappreauth := ctl.xread()
1148 if noimappreauth != "true" && noimappreauth != "false" {
1149 ctl.xcheck(fmt.Errorf("bad value %q", noimappreauth), "parsing noimappreauth")
1150 }
1151 var b bytes.Buffer
1152 ctl.xstreamto(&b)
1153 tlspubkey, err := store.ParseTLSPublicKeyCert(b.Bytes())
1154 ctl.xcheck(err, "parsing certificate")
1155 if name != "" {
1156 tlspubkey.Name = name
1157 }
1158 acc, _, _, err := store.OpenEmail(ctl.log, loginAddress, false)
1159 ctl.xcheck(err, "open account for address")
1160 defer func() {
1161 err := acc.Close()
1162 ctl.log.Check(err, "close account")
1163 }()
1164 tlspubkey.Account = acc.Name
1165 tlspubkey.LoginAddress = loginAddress
1166 tlspubkey.NoIMAPPreauth = noimappreauth == "true"
1167
1168 err = store.TLSPublicKeyAdd(ctx, &tlspubkey)
1169 ctl.xcheck(err, "adding tls public key")
1170 ctl.xwriteok()
1171
1172 case "tlspubkeyrm":
1173 /* protocol:
1174 > "tlspubkeyadd"
1175 > fingerprint
1176 < "ok" or error
1177 */
1178 fp := ctl.xread()
1179 err := store.TLSPublicKeyRemove(ctx, fp)
1180 ctl.xcheck(err, "removing tls public key")
1181 ctl.xwriteok()
1182
1183 case "addressadd":
1184 /* protocol:
1185 > "addressadd"
1186 > address
1187 > account
1188 < "ok" or error
1189 */
1190 address := ctl.xread()
1191 account := ctl.xread()
1192 err := admin.AddressAdd(ctx, address, account)
1193 ctl.xcheck(err, "adding address")
1194 ctl.xwriteok()
1195
1196 case "addressrm":
1197 /* protocol:
1198 > "addressrm"
1199 > address
1200 < "ok" or error
1201 */
1202 address := ctl.xread()
1203 err := admin.AddressRemove(ctx, address)
1204 ctl.xcheck(err, "removing address")
1205 ctl.xwriteok()
1206
1207 case "aliaslist":
1208 /* protocol:
1209 > "aliaslist"
1210 > domain
1211 < "ok" or error
1212 < stream
1213 */
1214 domain := ctl.xread()
1215 d, err := dns.ParseDomain(domain)
1216 ctl.xcheck(err, "parsing domain")
1217 dc, ok := mox.Conf.Domain(d)
1218 if !ok {
1219 ctl.xcheck(errors.New("no such domain"), "listing aliases")
1220 }
1221 ctl.xwriteok()
1222 w := ctl.writer()
1223 for _, a := range dc.Aliases {
1224 lp, err := smtp.ParseLocalpart(a.LocalpartStr)
1225 ctl.xcheck(err, "parsing alias localpart")
1226 fmt.Fprintln(w, smtp.NewAddress(lp, a.Domain).Pack(true))
1227 }
1228 w.xclose()
1229
1230 case "aliasprint":
1231 /* protocol:
1232 > "aliasprint"
1233 > address
1234 < "ok" or error
1235 < stream
1236 */
1237 address := ctl.xread()
1238 _, alias, ok := mox.Conf.AccountDestination(address)
1239 if !ok {
1240 ctl.xcheck(errors.New("no such address"), "looking up alias")
1241 } else if alias == nil {
1242 ctl.xcheck(errors.New("address not an alias"), "looking up alias")
1243 }
1244 ctl.xwriteok()
1245 w := ctl.writer()
1246 fmt.Fprintf(w, "# postpublic %v\n", alias.PostPublic)
1247 fmt.Fprintf(w, "# listmembers %v\n", alias.ListMembers)
1248 fmt.Fprintf(w, "# allowmsgfrom %v\n", alias.AllowMsgFrom)
1249 fmt.Fprintln(w, "# members:")
1250 for _, a := range alias.Addresses {
1251 fmt.Fprintln(w, a)
1252 }
1253 w.xclose()
1254
1255 case "aliasadd":
1256 /* protocol:
1257 > "aliasadd"
1258 > address
1259 > json alias
1260 < "ok" or error
1261 */
1262 address := ctl.xread()
1263 line := ctl.xread()
1264 addr, err := smtp.ParseAddress(address)
1265 ctl.xcheck(err, "parsing address")
1266 var alias config.Alias
1267 xparseJSON(ctl, line, &alias)
1268 err = admin.AliasAdd(ctx, addr, alias)
1269 ctl.xcheck(err, "adding alias")
1270 ctl.xwriteok()
1271
1272 case "aliasupdate":
1273 /* protocol:
1274 > "aliasupdate"
1275 > alias
1276 > "true" or "false" for postpublic
1277 > "true" or "false" for listmembers
1278 > "true" or "false" for allowmsgfrom
1279 < "ok" or error
1280 */
1281 address := ctl.xread()
1282 postpublic := ctl.xread()
1283 listmembers := ctl.xread()
1284 allowmsgfrom := ctl.xread()
1285 addr, err := smtp.ParseAddress(address)
1286 ctl.xcheck(err, "parsing address")
1287 err = admin.DomainSave(ctx, addr.Domain.Name(), func(d *config.Domain) error {
1288 a, ok := d.Aliases[addr.Localpart.String()]
1289 if !ok {
1290 return fmt.Errorf("alias does not exist")
1291 }
1292
1293 switch postpublic {
1294 case "false":
1295 a.PostPublic = false
1296 case "true":
1297 a.PostPublic = true
1298 }
1299 switch listmembers {
1300 case "false":
1301 a.ListMembers = false
1302 case "true":
1303 a.ListMembers = true
1304 }
1305 switch allowmsgfrom {
1306 case "false":
1307 a.AllowMsgFrom = false
1308 case "true":
1309 a.AllowMsgFrom = true
1310 }
1311
1312 d.Aliases = maps.Clone(d.Aliases)
1313 d.Aliases[addr.Localpart.String()] = a
1314 return nil
1315 })
1316 ctl.xcheck(err, "saving alias")
1317 ctl.xwriteok()
1318
1319 case "aliasrm":
1320 /* protocol:
1321 > "aliasrm"
1322 > alias
1323 < "ok" or error
1324 */
1325 address := ctl.xread()
1326 addr, err := smtp.ParseAddress(address)
1327 ctl.xcheck(err, "parsing address")
1328 err = admin.AliasRemove(ctx, addr)
1329 ctl.xcheck(err, "removing alias")
1330 ctl.xwriteok()
1331
1332 case "aliasaddaddr":
1333 /* protocol:
1334 > "aliasaddaddr"
1335 > alias
1336 > addresses as json
1337 < "ok" or error
1338 */
1339 address := ctl.xread()
1340 line := ctl.xread()
1341 addr, err := smtp.ParseAddress(address)
1342 ctl.xcheck(err, "parsing address")
1343 var addresses []string
1344 xparseJSON(ctl, line, &addresses)
1345 err = admin.AliasAddressesAdd(ctx, addr, addresses)
1346 ctl.xcheck(err, "adding addresses to alias")
1347 ctl.xwriteok()
1348
1349 case "aliasrmaddr":
1350 /* protocol:
1351 > "aliasrmaddr"
1352 > alias
1353 > addresses as json
1354 < "ok" or error
1355 */
1356 address := ctl.xread()
1357 line := ctl.xread()
1358 addr, err := smtp.ParseAddress(address)
1359 ctl.xcheck(err, "parsing address")
1360 var addresses []string
1361 xparseJSON(ctl, line, &addresses)
1362 err = admin.AliasAddressesRemove(ctx, addr, addresses)
1363 ctl.xcheck(err, "removing addresses to alias")
1364 ctl.xwriteok()
1365
1366 case "loglevels":
1367 /* protocol:
1368 > "loglevels"
1369 < "ok"
1370 < stream
1371 */
1372 ctl.xwriteok()
1373 l := mox.Conf.LogLevels()
1374 keys := []string{}
1375 for k := range l {
1376 keys = append(keys, k)
1377 }
1378 sort.Slice(keys, func(i, j int) bool {
1379 return keys[i] < keys[j]
1380 })
1381 s := ""
1382 for _, k := range keys {
1383 ks := k
1384 if ks == "" {
1385 ks = "(default)"
1386 }
1387 s += ks + ": " + mlog.LevelStrings[l[k]] + "\n"
1388 }
1389 ctl.xstreamfrom(strings.NewReader(s))
1390
1391 case "setloglevels":
1392 /* protocol:
1393 > "setloglevels"
1394 > pkg
1395 > level (if empty, log level for pkg will be unset)
1396 < "ok" or error
1397 */
1398 pkg := ctl.xread()
1399 levelstr := ctl.xread()
1400 if levelstr == "" {
1401 mox.Conf.LogLevelRemove(log, pkg)
1402 } else {
1403 level, ok := mlog.Levels[levelstr]
1404 if !ok {
1405 ctl.xerror("bad level")
1406 }
1407 mox.Conf.LogLevelSet(log, pkg, level)
1408 }
1409 ctl.xwriteok()
1410
1411 case "retrain":
1412 /* protocol:
1413 > "retrain"
1414 > account or empty
1415 < "ok" or error
1416 */
1417 account := ctl.xread()
1418
1419 xretrain := func(name string) {
1420 acc, err := store.OpenAccount(log, name, false)
1421 ctl.xcheck(err, "open account")
1422 defer func() {
1423 if acc != nil {
1424 err := acc.Close()
1425 log.Check(err, "closing account after retraining")
1426 }
1427 }()
1428
1429 // todo: can we retrain an account without holding a write lock? perhaps by writing a junkfilter to a new location, and staying informed of message changes while we go through all messages in the account?
1430
1431 acc.WithWLock(func() {
1432 conf, _ := acc.Conf()
1433 if conf.JunkFilter == nil {
1434 ctl.xcheck(store.ErrNoJunkFilter, "looking for junk filter")
1435 }
1436
1437 // Remove existing junk filter files.
1438 basePath := mox.DataDirPath("accounts")
1439 dbPath := filepath.Join(basePath, acc.Name, "junkfilter.db")
1440 bloomPath := filepath.Join(basePath, acc.Name, "junkfilter.bloom")
1441 err := os.Remove(dbPath)
1442 log.Check(err, "removing old junkfilter database file", slog.String("path", dbPath))
1443 err = os.Remove(bloomPath)
1444 log.Check(err, "removing old junkfilter bloom filter file", slog.String("path", bloomPath))
1445
1446 // Open junk filter, this creates new files.
1447 jf, _, err := acc.OpenJunkFilter(ctx, log)
1448 ctl.xcheck(err, "open new junk filter")
1449 defer func() {
1450 if jf == nil {
1451 return
1452 }
1453 err := jf.Close()
1454 log.Check(err, "closing junk filter during cleanup")
1455 }()
1456
1457 // Read through messages with junk or nonjunk flag set, and train them.
1458 var total, trained int
1459 q := bstore.QueryDB[store.Message](ctx, acc.DB)
1460 q.FilterEqual("Expunged", false)
1461 err = q.ForEach(func(m store.Message) error {
1462 total++
1463 ok, err := acc.TrainMessage(ctx, log, jf, m)
1464 if ok {
1465 trained++
1466 }
1467 return err
1468 })
1469 ctl.xcheck(err, "training messages")
1470 log.Info("retrained messages", slog.Int("total", total), slog.Int("trained", trained))
1471
1472 // Close junk filter, marking success.
1473 err = jf.Close()
1474 jf = nil
1475 ctl.xcheck(err, "closing junk filter")
1476 })
1477 }
1478
1479 if account == "" {
1480 for _, name := range mox.Conf.Accounts() {
1481 xretrain(name)
1482 }
1483 } else {
1484 xretrain(account)
1485 }
1486 ctl.xwriteok()
1487
1488 case "recalculatemailboxcounts":
1489 /* protocol:
1490 > "recalculatemailboxcounts"
1491 > account
1492 < "ok" or error
1493 < stream
1494 */
1495 account := ctl.xread()
1496 acc, err := store.OpenAccount(log, account, false)
1497 ctl.xcheck(err, "open account")
1498 defer func() {
1499 if acc != nil {
1500 err := acc.Close()
1501 log.Check(err, "closing account after recalculating mailbox counts")
1502 }
1503 }()
1504 ctl.xwriteok()
1505
1506 w := ctl.writer()
1507
1508 acc.WithWLock(func() {
1509 var changes []store.Change
1510 err = acc.DB.Write(ctx, func(tx *bstore.Tx) error {
1511 var totalSize int64
1512 err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
1513 mc, err := mb.CalculateCounts(tx)
1514 if err != nil {
1515 return fmt.Errorf("calculating counts for mailbox %q: %w", mb.Name, err)
1516 }
1517 totalSize += mc.Size
1518
1519 if !mb.HaveCounts || mc != mb.MailboxCounts {
1520 _, err := fmt.Fprintf(w, "for %s setting new counts %s (was %s)\n", mb.Name, mc, mb.MailboxCounts)
1521 ctl.xcheck(err, "write")
1522 mb.HaveCounts = true
1523 mb.MailboxCounts = mc
1524 if err := tx.Update(&mb); err != nil {
1525 return fmt.Errorf("storing new counts for %q: %v", mb.Name, err)
1526 }
1527 changes = append(changes, mb.ChangeCounts())
1528 }
1529 return nil
1530 })
1531 if err != nil {
1532 return err
1533 }
1534
1535 du := store.DiskUsage{ID: 1}
1536 if err := tx.Get(&du); err != nil {
1537 return fmt.Errorf("get disk usage: %v", err)
1538 }
1539 if du.MessageSize != totalSize {
1540 _, err := fmt.Fprintf(w, "setting new total message size %d (was %d)\n", totalSize, du.MessageSize)
1541 ctl.xcheck(err, "write")
1542 du.MessageSize = totalSize
1543 if err := tx.Update(&du); err != nil {
1544 return fmt.Errorf("update disk usage: %v", err)
1545 }
1546 }
1547 return nil
1548 })
1549 ctl.xcheck(err, "write transaction for mailbox counts")
1550
1551 store.BroadcastChanges(acc, changes)
1552 })
1553 w.xclose()
1554
1555 case "fixmsgsize":
1556 /* protocol:
1557 > "fixmsgsize"
1558 > account or empty
1559 < "ok" or error
1560 < stream
1561 */
1562
1563 accountOpt := ctl.xread()
1564 ctl.xwriteok()
1565 w := ctl.writer()
1566
1567 var foundProblem bool
1568 const batchSize = 10000
1569
1570 xfixmsgsize := func(accName string) {
1571 acc, err := store.OpenAccount(log, accName, false)
1572 ctl.xcheck(err, "open account")
1573 defer func() {
1574 err := acc.Close()
1575 log.Check(err, "closing account after fixing message sizes")
1576 }()
1577
1578 total := 0
1579 var lastID int64
1580 for {
1581 var n int
1582
1583 acc.WithRLock(func() {
1584 mailboxCounts := map[int64]store.Mailbox{} // For broadcasting.
1585
1586 // Don't process all message in one transaction, we could block the account for too long.
1587 err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
1588 q := bstore.QueryTx[store.Message](tx)
1589 q.FilterEqual("Expunged", false)
1590 q.FilterGreater("ID", lastID)
1591 q.Limit(batchSize)
1592 q.SortAsc("ID")
1593 return q.ForEach(func(m store.Message) error {
1594 lastID = m.ID
1595 n++
1596
1597 p := acc.MessagePath(m.ID)
1598 st, err := os.Stat(p)
1599 if err != nil {
1600 mb := store.Mailbox{ID: m.MailboxID}
1601 if xerr := tx.Get(&mb); xerr != nil {
1602 _, werr := fmt.Fprintf(w, "get mailbox id %d for message with file error: %v\n", mb.ID, xerr)
1603 ctl.xcheck(werr, "write")
1604 }
1605 _, werr := fmt.Fprintf(w, "checking file %s for message %d in mailbox %q (id %d): %v (continuing)\n", p, m.ID, mb.Name, mb.ID, err)
1606 ctl.xcheck(werr, "write")
1607 return nil
1608 }
1609 filesize := st.Size()
1610 correctSize := int64(len(m.MsgPrefix)) + filesize
1611 if m.Size == correctSize {
1612 return nil
1613 }
1614
1615 foundProblem = true
1616
1617 mb := store.Mailbox{ID: m.MailboxID}
1618 if err := tx.Get(&mb); err != nil {
1619 _, werr := fmt.Fprintf(w, "get mailbox id %d for message with file size mismatch: %v\n", mb.ID, err)
1620 ctl.xcheck(werr, "write")
1621 }
1622 _, err = fmt.Fprintf(w, "fixing message %d in mailbox %q (id %d) with incorrect size %d, should be %d (len msg prefix %d + on-disk file %s size %d)\n", m.ID, mb.Name, mb.ID, m.Size, correctSize, len(m.MsgPrefix), p, filesize)
1623 ctl.xcheck(err, "write")
1624
1625 // We assume that the original message size was accounted as stored in the mailbox
1626 // total size. If this isn't correct, the user can always run
1627 // recalculatemailboxcounts.
1628 mb.Size -= m.Size
1629 mb.Size += correctSize
1630 if err := tx.Update(&mb); err != nil {
1631 return fmt.Errorf("update mailbox counts: %v", err)
1632 }
1633 mailboxCounts[mb.ID] = mb
1634
1635 m.Size = correctSize
1636
1637 mr := acc.MessageReader(m)
1638 part, err := message.EnsurePart(log.Logger, false, mr, m.Size)
1639 if err != nil {
1640 _, werr := fmt.Fprintf(w, "parsing message %d again: %v (continuing)\n", m.ID, err)
1641 ctl.xcheck(werr, "write")
1642 }
1643 m.ParsedBuf, err = json.Marshal(part)
1644 if err != nil {
1645 return fmt.Errorf("marshal parsed message: %v", err)
1646 }
1647 total++
1648 if err := tx.Update(&m); err != nil {
1649 return fmt.Errorf("update message: %v", err)
1650 }
1651 return nil
1652 })
1653
1654 })
1655 ctl.xcheck(err, "find and fix wrong message sizes")
1656
1657 var changes []store.Change
1658 for _, mb := range mailboxCounts {
1659 changes = append(changes, mb.ChangeCounts())
1660 }
1661 store.BroadcastChanges(acc, changes)
1662 })
1663 if n < batchSize {
1664 break
1665 }
1666 }
1667 _, err = fmt.Fprintf(w, "%d message size(s) fixed for account %s\n", total, accName)
1668 ctl.xcheck(err, "write")
1669 }
1670
1671 if accountOpt != "" {
1672 xfixmsgsize(accountOpt)
1673 } else {
1674 for i, accName := range mox.Conf.Accounts() {
1675 var line string
1676 if i > 0 {
1677 line = "\n"
1678 }
1679 _, err := fmt.Fprintf(w, "%sFixing message sizes in account %s...\n", line, accName)
1680 ctl.xcheck(err, "write")
1681 xfixmsgsize(accName)
1682 }
1683 }
1684 if foundProblem {
1685 _, err := fmt.Fprintf(w, "\nProblems were found and fixed. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n")
1686 ctl.xcheck(err, "write")
1687 }
1688
1689 w.xclose()
1690
1691 case "reparse":
1692 /* protocol:
1693 > "reparse"
1694 > account or empty
1695 < "ok" or error
1696 < stream
1697 */
1698
1699 accountOpt := ctl.xread()
1700 ctl.xwriteok()
1701 w := ctl.writer()
1702
1703 const batchSize = 100
1704
1705 xreparseAccount := func(accName string) {
1706 acc, err := store.OpenAccount(log, accName, false)
1707 ctl.xcheck(err, "open account")
1708 defer func() {
1709 err := acc.Close()
1710 log.Check(err, "closing account after reparsing messages")
1711 }()
1712
1713 total := 0
1714 var lastID int64
1715 for {
1716 var n int
1717 // Don't process all message in one transaction, we could block the account for too long.
1718 err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
1719 q := bstore.QueryTx[store.Message](tx)
1720 q.FilterEqual("Expunged", false)
1721 q.FilterGreater("ID", lastID)
1722 q.Limit(batchSize)
1723 q.SortAsc("ID")
1724 return q.ForEach(func(m store.Message) error {
1725 lastID = m.ID
1726 mr := acc.MessageReader(m)
1727 p, err := message.EnsurePart(log.Logger, false, mr, m.Size)
1728 if err != nil {
1729 _, err := fmt.Fprintf(w, "parsing message %d: %v (continuing)\n", m.ID, err)
1730 ctl.xcheck(err, "write")
1731 }
1732 m.ParsedBuf, err = json.Marshal(p)
1733 if err != nil {
1734 return fmt.Errorf("marshal parsed message: %v", err)
1735 }
1736 total++
1737 n++
1738 if err := tx.Update(&m); err != nil {
1739 return fmt.Errorf("update message: %v", err)
1740 }
1741 return nil
1742 })
1743
1744 })
1745 ctl.xcheck(err, "update messages with parsed mime structure")
1746 if n < batchSize {
1747 break
1748 }
1749 }
1750 _, err = fmt.Fprintf(w, "%d message(s) reparsed for account %s\n", total, accName)
1751 ctl.xcheck(err, "write")
1752 }
1753
1754 if accountOpt != "" {
1755 xreparseAccount(accountOpt)
1756 } else {
1757 for i, accName := range mox.Conf.Accounts() {
1758 var line string
1759 if i > 0 {
1760 line = "\n"
1761 }
1762 _, err := fmt.Fprintf(w, "%sReparsing account %s...\n", line, accName)
1763 ctl.xcheck(err, "write")
1764 xreparseAccount(accName)
1765 }
1766 }
1767 w.xclose()
1768
1769 case "reassignthreads":
1770 /* protocol:
1771 > "reassignthreads"
1772 > account or empty
1773 < "ok" or error
1774 < stream
1775 */
1776
1777 accountOpt := ctl.xread()
1778 ctl.xwriteok()
1779 w := ctl.writer()
1780
1781 xreassignThreads := func(accName string) {
1782 acc, err := store.OpenAccount(log, accName, false)
1783 ctl.xcheck(err, "open account")
1784 defer func() {
1785 err := acc.Close()
1786 log.Check(err, "closing account after reassigning threads")
1787 }()
1788
1789 // We don't want to step on an existing upgrade process.
1790 err = acc.ThreadingWait(log)
1791 ctl.xcheck(err, "waiting for threading upgrade to finish")
1792 // todo: should we try to continue if the threading upgrade failed? only if there is a chance it will succeed this time...
1793
1794 // todo: reassigning isn't atomic (in a single transaction), ideally it would be (bstore would need to be able to handle large updates).
1795 const batchSize = 50000
1796 total, err := acc.ResetThreading(ctx, log, batchSize, true)
1797 ctl.xcheck(err, "resetting threading fields")
1798 _, err = fmt.Fprintf(w, "New thread base subject assigned to %d message(s), starting to reassign threads...\n", total)
1799 ctl.xcheck(err, "write")
1800
1801 // Assign threads again. Ideally we would do this in a single transaction, but
1802 // bstore/boltdb cannot handle so many pending changes, so we set a high batchsize.
1803 err = acc.AssignThreads(ctx, log, nil, 0, 50000, w)
1804 ctl.xcheck(err, "reassign threads")
1805
1806 _, err = fmt.Fprintf(w, "Threads reassigned. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n")
1807 ctl.xcheck(err, "write")
1808 }
1809
1810 if accountOpt != "" {
1811 xreassignThreads(accountOpt)
1812 } else {
1813 for i, accName := range mox.Conf.Accounts() {
1814 var line string
1815 if i > 0 {
1816 line = "\n"
1817 }
1818 _, err := fmt.Fprintf(w, "%sReassigning threads for account %s...\n", line, accName)
1819 ctl.xcheck(err, "write")
1820 xreassignThreads(accName)
1821 }
1822 }
1823 w.xclose()
1824
1825 case "backup":
1826 backupctl(ctx, ctl)
1827
1828 case "imapserve":
1829 /* protocol:
1830 > "imapserve"
1831 > address
1832 < "ok or error"
1833 imap protocol
1834 */
1835 address := ctl.xread()
1836 ctl.xwriteok()
1837 imapserver.ServeConnPreauth("(imapserve)", cid, ctl.conn, address)
1838 ctl.log.Debug("imap connection finished")
1839
1840 default:
1841 log.Info("unrecognized command", slog.String("cmd", cmd))
1842 ctl.xwrite("unrecognized command")
1843 return
1844 }
1845}
1846