22 "github.com/mjl-/bstore"
24 "github.com/mjl-/mox/admin"
25 "github.com/mjl-/mox/config"
26 "github.com/mjl-/mox/dns"
27 "github.com/mjl-/mox/imapserver"
28 "github.com/mjl-/mox/message"
29 "github.com/mjl-/mox/metrics"
30 "github.com/mjl-/mox/mlog"
31 "github.com/mjl-/mox/mox-"
32 "github.com/mjl-/mox/queue"
33 "github.com/mjl-/mox/smtp"
34 "github.com/mjl-/mox/store"
35 "github.com/mjl-/mox/webapi"
39// ctl represents a connection to the ctl unix domain socket of a running mox instance.
40// ctl provides functions to read/write commands/responses/data streams.
42 cmd string // Set for server-side of commands.
44 r *bufio.Reader // Set for first reader.
45 x any // If set, errors are handled by calling panic(x) instead of log.Fatal.
46 log mlog.Log // If set, along with x, logging is done here.
49// xctl opens a ctl connection.
51 p := mox.DataDirPath("ctl")
52 conn, err := net.Dial("unix", p)
54 log.Fatalf("connecting to control socket at %q: %v", p, err)
56 ctl := &ctl{conn: conn}
57 version := ctl.xread()
58 if version != "ctlv0" {
59 log.Fatalf("ctl protocol mismatch, got %q, expected ctlv0", version)
64// Interpret msg as an error.
65// If ctl.x is set, the string is also written to the ctl to be interpreted as error by the other party.
66func (c *ctl) xerror(msg string) {
70 c.log.Debugx("ctl error", fmt.Errorf("%s", msg), slog.String("cmd", c.cmd))
75// Check if err is not nil. If so, handle error through ctl.x or log.Fatal. If
76// ctl.x is set, the error string is written to ctl, to be interpreted as an error
77// by the command reading from ctl.
78func (c *ctl) xcheck(err error, msg string) {
83 log.Fatalf("%s: %s", msg, err)
85 c.log.Debugx(msg, err, slog.String("cmd", c.cmd))
86 fmt.Fprintf(c.conn, "%s: %s\n", msg, err)
90// Read a line and return it without trailing newline.
91func (c *ctl) xread() string {
93 c.r = bufio.NewReader(c.conn)
95 line, err := c.r.ReadString('\n')
96 c.xcheck(err, "read from ctl")
97 return strings.TrimSuffix(line, "\n")
100// Read a line. If not "ok", the string is interpreted as an error.
101func (c *ctl) xreadok() {
108// Write a string, typically a command or parameter.
109func (c *ctl) xwrite(text string) {
110 _, err := fmt.Fprintln(c.conn, text)
111 c.xcheck(err, "write")
114// Write "ok" to indicate success.
115func (c *ctl) xwriteok() {
119// Copy data from a stream from ctl to dst.
120func (c *ctl) xstreamto(dst io.Writer) {
121 _, err := io.Copy(dst, c.reader())
122 c.xcheck(err, "reading message")
125// Copy data from src to a stream to ctl.
126func (c *ctl) xstreamfrom(src io.Reader) {
128 _, err := io.Copy(xw, src)
129 c.xcheck(err, "copying")
133// Writer returns an io.Writer for a data stream to ctl.
134// When done writing, caller must call xclose to signal the end of the stream.
135// Behaviour of "x" is copied from ctl.
136func (c *ctl) writer() *ctlwriter {
137 return &ctlwriter{cmd: c.cmd, conn: c.conn, x: c.x, log: c.log}
140// Reader returns an io.Reader for a data stream from ctl.
141// Behaviour of "x" is copied from ctl.
142func (c *ctl) reader() *ctlreader {
144 c.r = bufio.NewReader(c.conn)
146 return &ctlreader{cmd: c.cmd, conn: c.conn, r: c.r, x: c.x, log: c.log}
150Ctlwriter and ctlreader implement writing and reading of a data stream. They
151implement the io.Writer and io.Reader interface. In the protocol below each
152non-data message ends with a newline that is typically stripped when
155Zero or more data transactions:
157 > "123" (for data size) or an error message
159 < "ok" or an error message
161Followed by an end of stream, indicated by a message with zero data bytes:
166type ctlwriter struct {
167 cmd string // Set for server-side of commands.
168 conn net.Conn // Ctl socket from which messages are read.
169 buf []byte // Scratch buffer, for reading response.
170 x any // If not nil, errors in Write and xcheckf are handled with panic(x), otherwise with a log.Fatal.
174// Write implements io.Writer. Errors other than EOF are handled through behaviour
175// for s.x, either a panic or log.Fatal.
176func (s *ctlwriter) Write(buf []byte) (int, error) {
177 _, err := fmt.Fprintf(s.conn, "%d\n", len(buf))
178 s.xcheck(err, "write count")
179 _, err = s.conn.Write(buf)
180 s.xcheck(err, "write data")
182 s.buf = make([]byte, 512)
184 n, err := s.conn.Read(s.buf)
185 s.xcheck(err, "reading response to write")
186 line := strings.TrimSuffix(string(s.buf[:n]), "\n")
193func (s *ctlwriter) xerror(msg string) {
197 s.log.Debugx("error", fmt.Errorf("%s", msg), slog.String("cmd", s.cmd))
202func (s *ctlwriter) xcheck(err error, msg string) {
207 log.Fatalf("%s: %s", msg, err)
209 s.log.Debugx(msg, err, slog.String("cmd", s.cmd))
214func (s *ctlwriter) xclose() {
215 _, err := fmt.Fprintf(s.conn, "0\n")
216 s.xcheck(err, "write eof")
219type ctlreader struct {
220 cmd string // Set for server-side of command.
221 conn net.Conn // For writing "ok" after reading.
222 r *bufio.Reader // Buffered ctl socket.
223 err error // If set, returned for each read. can also be io.EOF.
224 npending int // Number of bytes that can still be read until a new count line must be read.
225 x any // If set, errors are handled with panic(x) instead of log.Fatal.
226 log mlog.Log // If x is set, logging goes to log.
229// Read implements io.Reader. Errors other than EOF are handled through behaviour
230// for s.x, either a panic or log.Fatal.
231func (s *ctlreader) Read(buf []byte) (N int, Err error) {
236 line, err := s.r.ReadString('\n')
237 s.xcheck(err, "reading count")
238 line = strings.TrimSuffix(line, "\n")
239 n, err := strconv.ParseInt(line, 10, 32)
249 rn := min(len(buf), s.npending)
250 n, err := s.r.Read(buf[:rn])
251 s.xcheck(err, "read from ctl")
254 _, err = fmt.Fprintln(s.conn, "ok")
255 s.xcheck(err, "writing ok after reading")
260func (s *ctlreader) xerror(msg string) {
264 s.log.Debugx("error", fmt.Errorf("%s", msg), slog.String("cmd", s.cmd))
269func (s *ctlreader) xcheck(err error, msg string) {
274 log.Fatalf("%s: %s", msg, err)
276 s.log.Debugx(msg, err, slog.String("cmd", s.cmd))
281// servectl handles requests on the unix domain socket "ctl", e.g. for graceful shutdown, local mail delivery.
282func servectl(ctx context.Context, cid int64, log mlog.Log, conn net.Conn, shutdown func()) {
283 log.Debug("ctl connection")
285 var stop = struct{}{} // Sentinel value for panic and recover.
286 xctl := &ctl{conn: conn, x: stop, log: log}
289 if x == nil || x == stop {
292 log.Error("servectl panic", slog.Any("err", x), slog.String("cmd", xctl.cmd))
294 metrics.PanicInc(metrics.Ctl)
299 log.Check(err, "close ctl connection")
304 servectlcmd(ctx, xctl, cid, shutdown)
308func xparseJSON(xctl *ctl, s string, v any) {
309 dec := json.NewDecoder(strings.NewReader(s))
310 dec.DisallowUnknownFields()
312 xctl.xcheck(err, "parsing from ctl as json")
315func servectlcmd(ctx context.Context, xctl *ctl, cid int64, shutdown func()) {
319 log.Info("ctl command", slog.String("cmd", cmd))
326 /* The protocol, double quoted are literals.
336 a, _, addr, err := store.OpenEmail(log, to, false)
337 xctl.xcheck(err, "lookup destination address")
339 msgFile, err := store.CreateMessageTemp(log, "ctl-deliver")
340 xctl.xcheck(err, "creating temporary message file")
341 defer store.CloseRemoveTempFile(log, msgFile, "deliver message")
342 mw := message.NewWriter(msgFile)
347 xctl.xcheck(err, "syncing message to storage")
350 Received: time.Now(),
355 err := a.DeliverDestination(log, addr, &m, msgFile)
356 xctl.xcheck(err, "delivering message")
357 log.Info("message delivered through ctl", slog.Any("to", to))
361 xctl.xcheck(err, "closing account")
364 case "setaccountpassword":
366 > "setaccountpassword"
372 account := xctl.xread()
375 acc, err := store.OpenAccount(log, account, false)
376 xctl.xcheck(err, "open account")
380 log.Check(err, "closing account after setting password")
384 err = acc.SetPassword(log, pw)
385 xctl.xcheck(err, "setting password")
387 xctl.xcheck(err, "closing account")
391 case "queueholdruleslist":
393 > "queueholdruleslist"
397 l, err := queue.HoldRuleList(ctx)
398 xctl.xcheck(err, "listing hold rules")
401 fmt.Fprintln(xw, "hold rules:")
402 for _, hr := range l {
404 if hr.Account != "" {
405 elems = append(elems, fmt.Sprintf("account %q", hr.Account))
407 var zerodom dns.Domain
408 if hr.SenderDomain != zerodom {
409 elems = append(elems, fmt.Sprintf("sender domain %q", hr.SenderDomain.Name()))
411 if hr.RecipientDomain != zerodom {
412 elems = append(elems, fmt.Sprintf("sender domain %q", hr.RecipientDomain.Name()))
415 fmt.Fprintf(xw, "id %d: all messages\n", hr.ID)
417 fmt.Fprintf(xw, "id %d: %s\n", hr.ID, strings.Join(elems, ", "))
421 fmt.Fprint(xw, "(none)\n")
425 case "queueholdrulesadd":
427 > "queueholdrulesadd"
433 var hr queue.HoldRule
434 hr.Account = xctl.xread()
435 senderdomstr := xctl.xread()
436 rcptdomstr := xctl.xread()
438 hr.SenderDomain, err = dns.ParseDomain(senderdomstr)
439 xctl.xcheck(err, "parsing sender domain")
440 hr.RecipientDomain, err = dns.ParseDomain(rcptdomstr)
441 xctl.xcheck(err, "parsing recipient domain")
442 hr, err = queue.HoldRuleAdd(ctx, log, hr)
443 xctl.xcheck(err, "add hold rule")
446 case "queueholdrulesremove":
448 > "queueholdrulesremove"
452 idstr := xctl.xread()
453 id, err := strconv.ParseInt(idstr, 10, 64)
454 xctl.xcheck(err, "parsing id")
455 err = queue.HoldRuleRemove(ctx, log, id)
456 xctl.xcheck(err, "remove hold rule")
467 filterline := xctl.xread()
468 sortline := xctl.xread()
470 xparseJSON(xctl, filterline, &f)
472 xparseJSON(xctl, sortline, &s)
473 qmsgs, err := queue.List(ctx, f, s)
474 xctl.xcheck(err, "listing queue")
478 fmt.Fprintln(xw, "messages:")
479 for _, qm := range qmsgs {
480 var lastAttempt string
481 if qm.LastAttempt != nil {
482 lastAttempt = time.Since(*qm.LastAttempt).Round(time.Second).String()
484 fmt.Fprintf(xw, "%5d %s from:%s to:%s next %s last %s error %q\n", qm.ID, qm.Queued.Format(time.RFC3339), qm.Sender().LogString(), qm.Recipient().LogString(), -time.Since(qm.NextAttempt).Round(time.Second), lastAttempt, qm.LastResult().Error)
487 fmt.Fprint(xw, "(none)\n")
494 > queuefilters as json
500 filterline := xctl.xread()
501 hold := xctl.xread() == "true"
503 xparseJSON(xctl, filterline, &f)
504 count, err := queue.HoldSet(ctx, f, hold)
505 xctl.xcheck(err, "setting on hold status for messages")
507 xctl.xwrite(fmt.Sprintf("%d", count))
509 case "queueschedule":
512 > queuefilters as json
519 filterline := xctl.xread()
520 relnow := xctl.xread()
521 duration := xctl.xread()
523 xparseJSON(xctl, filterline, &f)
524 d, err := time.ParseDuration(duration)
525 xctl.xcheck(err, "parsing duration for next delivery attempt")
528 count, err = queue.NextAttemptAdd(ctx, f, d)
530 count, err = queue.NextAttemptSet(ctx, f, time.Now().Add(d))
532 xctl.xcheck(err, "setting next delivery attempts in queue")
534 xctl.xwrite(fmt.Sprintf("%d", count))
536 case "queuetransport":
539 > queuefilters as json
545 filterline := xctl.xread()
546 transport := xctl.xread()
548 xparseJSON(xctl, filterline, &f)
549 count, err := queue.TransportSet(ctx, f, transport)
550 xctl.xcheck(err, "adding to next delivery attempts in queue")
552 xctl.xwrite(fmt.Sprintf("%d", count))
554 case "queuerequiretls":
557 > queuefilters as json
558 > reqtls (empty string, "true" or "false")
563 filterline := xctl.xread()
564 reqtls := xctl.xread()
575 xctl.xcheck(fmt.Errorf("unknown value %q", reqtls), "parsing value")
578 xparseJSON(xctl, filterline, &f)
579 count, err := queue.RequireTLSSet(ctx, f, req)
580 xctl.xcheck(err, "setting tls requirements on messages in queue")
582 xctl.xwrite(fmt.Sprintf("%d", count))
587 > queuefilters as json
592 filterline := xctl.xread()
594 xparseJSON(xctl, filterline, &f)
595 count, err := queue.Fail(ctx, log, f)
596 xctl.xcheck(err, "marking messages from queue as failed")
598 xctl.xwrite(fmt.Sprintf("%d", count))
603 > queuefilters as json
608 filterline := xctl.xread()
610 xparseJSON(xctl, filterline, &f)
611 count, err := queue.Drop(ctx, log, f)
612 xctl.xcheck(err, "dropping messages from queue")
614 xctl.xwrite(fmt.Sprintf("%d", count))
624 idstr := xctl.xread()
625 id, err := strconv.ParseInt(idstr, 10, 64)
627 xctl.xcheck(err, "parsing id")
629 mr, err := queue.OpenMessage(ctx, id)
630 xctl.xcheck(err, "opening message")
633 log.Check(err, "closing message from queue")
638 case "queueretiredlist":
646 filterline := xctl.xread()
647 sortline := xctl.xread()
648 var f queue.RetiredFilter
649 xparseJSON(xctl, filterline, &f)
650 var s queue.RetiredSort
651 xparseJSON(xctl, sortline, &s)
652 qmsgs, err := queue.RetiredList(ctx, f, s)
653 xctl.xcheck(err, "listing retired queue")
657 fmt.Fprintln(xw, "retired messages:")
658 for _, qm := range qmsgs {
659 var lastAttempt string
660 if qm.LastAttempt != nil {
661 lastAttempt = time.Since(*qm.LastAttempt).Round(time.Second).String()
667 sender, err := qm.Sender()
668 xcheckf(err, "parsing sender")
669 fmt.Fprintf(xw, "%5d %s %s from:%s to:%s last %s error %q\n", qm.ID, qm.Queued.Format(time.RFC3339), result, sender.LogString(), qm.Recipient().LogString(), lastAttempt, qm.LastResult().Error)
672 fmt.Fprint(xw, "(none)\n")
676 case "queueretiredprint":
678 > "queueretiredprint"
683 idstr := xctl.xread()
684 id, err := strconv.ParseInt(idstr, 10, 64)
686 xctl.xcheck(err, "parsing id")
688 l, err := queue.RetiredList(ctx, queue.RetiredFilter{IDs: []int64{id}}, queue.RetiredSort{})
689 xctl.xcheck(err, "getting retired messages")
691 xctl.xcheck(errors.New("not found"), "getting retired message")
696 enc := json.NewEncoder(xw)
697 enc.SetIndent("", "\t")
699 xctl.xcheck(err, "encode retired message")
702 case "queuehooklist":
710 filterline := xctl.xread()
711 sortline := xctl.xread()
712 var f queue.HookFilter
713 xparseJSON(xctl, filterline, &f)
715 xparseJSON(xctl, sortline, &s)
716 hooks, err := queue.HookList(ctx, f, s)
717 xctl.xcheck(err, "listing webhooks")
721 fmt.Fprintln(xw, "webhooks:")
722 for _, h := range hooks {
723 var lastAttempt string
724 if len(h.Results) > 0 {
725 lastAttempt = time.Since(h.LastResult().Start).Round(time.Second).String()
727 fmt.Fprintf(xw, "%5d %s account:%s next %s last %s error %q url %s\n", h.ID, h.Submitted.Format(time.RFC3339), h.Account, time.Until(h.NextAttempt).Round(time.Second), lastAttempt, h.LastResult().Error, h.URL)
730 fmt.Fprint(xw, "(none)\n")
734 case "queuehookschedule":
736 > "queuehookschedule"
737 > hookfilters as json
744 filterline := xctl.xread()
745 relnow := xctl.xread()
746 duration := xctl.xread()
747 var f queue.HookFilter
748 xparseJSON(xctl, filterline, &f)
749 d, err := time.ParseDuration(duration)
750 xctl.xcheck(err, "parsing duration for next delivery attempt")
753 count, err = queue.HookNextAttemptAdd(ctx, f, d)
755 count, err = queue.HookNextAttemptSet(ctx, f, time.Now().Add(d))
757 xctl.xcheck(err, "setting next delivery attempts in queue")
759 xctl.xwrite(fmt.Sprintf("%d", count))
761 case "queuehookcancel":
764 > hookfilters as json
769 filterline := xctl.xread()
770 var f queue.HookFilter
771 xparseJSON(xctl, filterline, &f)
772 count, err := queue.HookCancel(ctx, log, f)
773 xctl.xcheck(err, "canceling webhooks in queue")
775 xctl.xwrite(fmt.Sprintf("%d", count))
777 case "queuehookprint":
784 idstr := xctl.xread()
785 id, err := strconv.ParseInt(idstr, 10, 64)
787 xctl.xcheck(err, "parsing id")
789 l, err := queue.HookList(ctx, queue.HookFilter{IDs: []int64{id}}, queue.HookSort{})
790 xctl.xcheck(err, "getting webhooks")
792 xctl.xcheck(errors.New("not found"), "getting webhook")
797 enc := json.NewEncoder(xw)
798 enc.SetIndent("", "\t")
800 xctl.xcheck(err, "encode webhook")
803 case "queuehookretiredlist":
805 > "queuehookretiredlist"
811 filterline := xctl.xread()
812 sortline := xctl.xread()
813 var f queue.HookRetiredFilter
814 xparseJSON(xctl, filterline, &f)
815 var s queue.HookRetiredSort
816 xparseJSON(xctl, sortline, &s)
817 l, err := queue.HookRetiredList(ctx, f, s)
818 xctl.xcheck(err, "listing retired webhooks")
822 fmt.Fprintln(xw, "retired webhooks:")
823 for _, h := range l {
824 var lastAttempt string
825 if len(h.Results) > 0 {
826 lastAttempt = time.Since(h.LastResult().Start).Round(time.Second).String()
832 fmt.Fprintf(xw, "%5d %s %s account:%s last %s error %q url %s\n", h.ID, h.Submitted.Format(time.RFC3339), result, h.Account, lastAttempt, h.LastResult().Error, h.URL)
835 fmt.Fprint(xw, "(none)\n")
839 case "queuehookretiredprint":
841 > "queuehookretiredprint"
846 idstr := xctl.xread()
847 id, err := strconv.ParseInt(idstr, 10, 64)
849 xctl.xcheck(err, "parsing id")
851 l, err := queue.HookRetiredList(ctx, queue.HookRetiredFilter{IDs: []int64{id}}, queue.HookRetiredSort{})
852 xctl.xcheck(err, "getting retired webhooks")
854 xctl.xcheck(errors.New("not found"), "getting retired webhook")
859 enc := json.NewEncoder(xw)
860 enc.SetIndent("", "\t")
862 xctl.xcheck(err, "encode retired webhook")
865 case "queuesuppresslist":
867 > "queuesuppresslist"
873 account := xctl.xread()
874 l, err := queue.SuppressionList(ctx, account)
875 xctl.xcheck(err, "listing suppressions")
878 fmt.Fprintln(xw, "suppressions (account, address, manual, time added, base adddress, reason):")
879 for _, sup := range l {
884 fmt.Fprintf(xw, "%q\t%q\t%s\t%s\t%q\t%q\n", sup.Account, sup.OriginalAddress, manual, sup.Created.Round(time.Second), sup.BaseAddress, sup.Reason)
887 fmt.Fprintln(xw, "(none)")
891 case "queuesuppressadd":
899 account := xctl.xread()
900 address := xctl.xread()
901 _, ok := mox.Conf.Account(account)
903 xctl.xcheck(errors.New("unknown account"), "looking up account")
905 addr, err := smtp.ParseAddress(address)
906 xctl.xcheck(err, "parsing address")
907 sup := webapi.Suppression{
910 Reason: "added through mox cli",
912 err = queue.SuppressionAdd(ctx, addr.Path(), &sup)
913 xctl.xcheck(err, "adding suppression")
916 case "queuesuppressremove":
918 > "queuesuppressremove"
924 account := xctl.xread()
925 address := xctl.xread()
926 addr, err := smtp.ParseAddress(address)
927 xctl.xcheck(err, "parsing address")
928 err = queue.SuppressionRemove(ctx, account, addr.Path())
929 xctl.xcheck(err, "removing suppression")
932 case "queuesuppresslookup":
934 > "queuesuppresslookup"
941 account := xctl.xread()
942 address := xctl.xread()
944 _, ok := mox.Conf.Account(account)
946 xctl.xcheck(errors.New("unknown account"), "looking up account")
949 addr, err := smtp.ParseAddress(address)
950 xctl.xcheck(err, "parsing address")
951 sup, err := queue.SuppressionLookup(ctx, account, addr.Path())
952 xctl.xcheck(err, "looking up suppression")
956 fmt.Fprintln(xw, "not present")
962 fmt.Fprintf(xw, "present\nadded: %s\nmanual: %s\nbase address: %s\nreason: %q\n", sup.Created.Round(time.Second), manual, sup.BaseAddress, sup.Reason)
966 case "importmaildir", "importmbox":
967 mbox := cmd == "importmbox"
968 ximportctl(ctx, xctl, mbox)
973 > disabled as "true" or "false"
980 switch s := xctl.xread(); s {
986 xctl.xcheck(fmt.Errorf("invalid value %q", s), "parsing disabled boolean")
989 domain := xctl.xread()
990 account := xctl.xread()
991 localpart := xctl.xread()
992 d, err := dns.ParseDomain(domain)
993 xctl.xcheck(err, "parsing domain")
994 err = admin.DomainAdd(ctx, disabled, d, account, smtp.Localpart(localpart))
995 xctl.xcheck(err, "adding domain")
1004 domain := xctl.xread()
1005 d, err := dns.ParseDomain(domain)
1006 xctl.xcheck(err, "parsing domain")
1007 err = admin.DomainRemove(ctx, d)
1008 xctl.xcheck(err, "removing domain")
1011 case "domaindisabled":
1018 domain := xctl.xread()
1020 switch s := xctl.xread(); s {
1026 xctl.xerror("bad boolean value")
1028 err := admin.DomainSave(ctx, domain, func(d *config.Domain) error {
1029 d.Disabled = disabled
1032 xctl.xcheck(err, "saving domain")
1042 account := xctl.xread()
1043 address := xctl.xread()
1044 err := admin.AccountAdd(ctx, account, address)
1045 xctl.xcheck(err, "adding account")
1054 account := xctl.xread()
1055 err := admin.AccountRemove(ctx, account)
1056 xctl.xcheck(err, "removing account")
1059 case "accountdisabled":
1063 > message (if empty, then enabled)
1066 account := xctl.xread()
1067 message := xctl.xread()
1069 acc, err := store.OpenAccount(log, account, false)
1070 xctl.xcheck(err, "open account")
1073 log.Check(err, "closing account")
1076 err = admin.AccountSave(ctx, account, func(acc *config.Account) {
1077 acc.LoginDisabled = message
1079 xctl.xcheck(err, "saving account")
1081 err = acc.SessionsClear(ctx, xctl.log)
1082 xctl.xcheck(err, "clearing active web sessions")
1086 case "accountenable":
1092 account := xctl.xread()
1093 err := admin.AccountSave(ctx, account, func(acc *config.Account) {
1094 acc.LoginDisabled = ""
1096 xctl.xcheck(err, "enabling account")
1099 case "tlspubkeylist":
1102 > account (or empty)
1106 accountOpt := xctl.xread()
1107 tlspubkeys, err := store.TLSPublicKeyList(ctx, accountOpt)
1108 xctl.xcheck(err, "list tls public keys")
1111 fmt.Fprintf(xw, "# fingerprint, type, name, account, login address, no imap preauth (%d)\n", len(tlspubkeys))
1112 for _, k := range tlspubkeys {
1113 fmt.Fprintf(xw, "%s\t%s\t%q\t%s\t%s\t%v\n", k.Fingerprint, k.Type, k.Name, k.Account, k.LoginAddress, k.NoIMAPPreauth)
1117 case "tlspubkeyget":
1126 < noimappreauth (true/false)
1130 tlspubkey, err := store.TLSPublicKeyGet(ctx, fp)
1131 xctl.xcheck(err, "looking tls public key")
1133 xctl.xwrite(tlspubkey.Type)
1134 xctl.xwrite(tlspubkey.Name)
1135 xctl.xwrite(tlspubkey.Account)
1136 xctl.xwrite(tlspubkey.LoginAddress)
1137 xctl.xwrite(fmt.Sprintf("%v", tlspubkey.NoIMAPPreauth))
1138 xctl.xstreamfrom(bytes.NewReader(tlspubkey.CertDER))
1140 case "tlspubkeyadd":
1145 > noimappreauth (true/false)
1149 loginAddress := xctl.xread()
1150 name := xctl.xread()
1151 noimappreauth := xctl.xread()
1152 if noimappreauth != "true" && noimappreauth != "false" {
1153 xctl.xcheck(fmt.Errorf("bad value %q", noimappreauth), "parsing noimappreauth")
1157 tlspubkey, err := store.ParseTLSPublicKeyCert(b.Bytes())
1158 xctl.xcheck(err, "parsing certificate")
1160 tlspubkey.Name = name
1162 acc, _, _, err := store.OpenEmail(xctl.log, loginAddress, false)
1163 xctl.xcheck(err, "open account for address")
1166 xctl.log.Check(err, "close account")
1168 tlspubkey.Account = acc.Name
1169 tlspubkey.LoginAddress = loginAddress
1170 tlspubkey.NoIMAPPreauth = noimappreauth == "true"
1172 err = store.TLSPublicKeyAdd(ctx, &tlspubkey)
1173 xctl.xcheck(err, "adding tls public key")
1183 err := store.TLSPublicKeyRemove(ctx, fp)
1184 xctl.xcheck(err, "removing tls public key")
1194 address := xctl.xread()
1195 account := xctl.xread()
1196 err := admin.AddressAdd(ctx, address, account)
1197 xctl.xcheck(err, "adding address")
1206 address := xctl.xread()
1207 err := admin.AddressRemove(ctx, address)
1208 xctl.xcheck(err, "removing address")
1218 domain := xctl.xread()
1219 d, err := dns.ParseDomain(domain)
1220 xctl.xcheck(err, "parsing domain")
1221 dc, ok := mox.Conf.Domain(d)
1223 xctl.xcheck(errors.New("no such domain"), "listing aliases")
1227 for _, a := range dc.Aliases {
1228 lp, err := smtp.ParseLocalpart(a.LocalpartStr)
1229 xctl.xcheck(err, "parsing alias localpart")
1230 fmt.Fprintln(xw, smtp.NewAddress(lp, a.Domain).Pack(true))
1241 address := xctl.xread()
1242 _, alias, ok := mox.Conf.AccountDestination(address)
1244 xctl.xcheck(errors.New("no such address"), "looking up alias")
1245 } else if alias == nil {
1246 xctl.xcheck(errors.New("address not an alias"), "looking up alias")
1250 fmt.Fprintf(xw, "# postpublic %v\n", alias.PostPublic)
1251 fmt.Fprintf(xw, "# listmembers %v\n", alias.ListMembers)
1252 fmt.Fprintf(xw, "# allowmsgfrom %v\n", alias.AllowMsgFrom)
1253 fmt.Fprintln(xw, "# members:")
1254 for _, a := range alias.Addresses {
1266 address := xctl.xread()
1267 line := xctl.xread()
1268 addr, err := smtp.ParseAddress(address)
1269 xctl.xcheck(err, "parsing address")
1270 var alias config.Alias
1271 xparseJSON(xctl, line, &alias)
1272 err = admin.AliasAdd(ctx, addr, alias)
1273 xctl.xcheck(err, "adding alias")
1280 > "true" or "false" for postpublic
1281 > "true" or "false" for listmembers
1282 > "true" or "false" for allowmsgfrom
1285 address := xctl.xread()
1286 postpublic := xctl.xread()
1287 listmembers := xctl.xread()
1288 allowmsgfrom := xctl.xread()
1289 addr, err := smtp.ParseAddress(address)
1290 xctl.xcheck(err, "parsing address")
1291 err = admin.DomainSave(ctx, addr.Domain.Name(), func(d *config.Domain) error {
1292 a, ok := d.Aliases[addr.Localpart.String()]
1294 return fmt.Errorf("alias does not exist")
1299 a.PostPublic = false
1303 switch listmembers {
1305 a.ListMembers = false
1307 a.ListMembers = true
1309 switch allowmsgfrom {
1311 a.AllowMsgFrom = false
1313 a.AllowMsgFrom = true
1316 d.Aliases = maps.Clone(d.Aliases)
1317 d.Aliases[addr.Localpart.String()] = a
1320 xctl.xcheck(err, "saving alias")
1329 address := xctl.xread()
1330 addr, err := smtp.ParseAddress(address)
1331 xctl.xcheck(err, "parsing address")
1332 err = admin.AliasRemove(ctx, addr)
1333 xctl.xcheck(err, "removing alias")
1336 case "aliasaddaddr":
1343 address := xctl.xread()
1344 line := xctl.xread()
1345 addr, err := smtp.ParseAddress(address)
1346 xctl.xcheck(err, "parsing address")
1347 var addresses []string
1348 xparseJSON(xctl, line, &addresses)
1349 err = admin.AliasAddressesAdd(ctx, addr, addresses)
1350 xctl.xcheck(err, "adding addresses to alias")
1360 address := xctl.xread()
1361 line := xctl.xread()
1362 addr, err := smtp.ParseAddress(address)
1363 xctl.xcheck(err, "parsing address")
1364 var addresses []string
1365 xparseJSON(xctl, line, &addresses)
1366 err = admin.AliasAddressesRemove(ctx, addr, addresses)
1367 xctl.xcheck(err, "removing addresses to alias")
1377 l := mox.Conf.LogLevels()
1380 keys = append(keys, k)
1384 for _, k := range keys {
1389 s += ks + ": " + mlog.LevelStrings[l[k]] + "\n"
1391 xctl.xstreamfrom(strings.NewReader(s))
1393 case "setloglevels":
1397 > level (if empty, log level for pkg will be unset)
1401 levelstr := xctl.xread()
1403 mox.Conf.LogLevelRemove(log, pkg)
1405 level, ok := mlog.Levels[levelstr]
1407 xctl.xerror("bad level")
1409 mox.Conf.LogLevelSet(log, pkg, level)
1419 account := xctl.xread()
1421 xretrain := func(name string) {
1422 acc, err := store.OpenAccount(log, name, false)
1423 xctl.xcheck(err, "open account")
1427 log.Check(err, "closing account after retraining")
1431 // todo: can we retrain an account without holding a write lock? perhaps by writing a junkfilter to a new location, and staying informed of message changes while we go through all messages in the account?
1433 acc.WithWLock(func() {
1434 conf, _ := acc.Conf()
1435 if conf.JunkFilter == nil {
1436 xctl.xcheck(store.ErrNoJunkFilter, "looking for junk filter")
1439 // Remove existing junk filter files.
1440 basePath := mox.DataDirPath("accounts")
1441 dbPath := filepath.Join(basePath, acc.Name, "junkfilter.db")
1442 bloomPath := filepath.Join(basePath, acc.Name, "junkfilter.bloom")
1443 err := os.Remove(dbPath)
1444 log.Check(err, "removing old junkfilter database file", slog.String("path", dbPath))
1445 err = os.Remove(bloomPath)
1446 log.Check(err, "removing old junkfilter bloom filter file", slog.String("path", bloomPath))
1448 // Open junk filter, this creates new files.
1449 jf, _, err := acc.OpenJunkFilter(ctx, log)
1450 xctl.xcheck(err, "open new junk filter")
1455 err := jf.CloseDiscard()
1456 log.Check(err, "closing junk filter during cleanup")
1459 // Read through messages with either junk or nonjunk flag set, and train them.
1460 var total, trained int
1461 err = acc.DB.Write(ctx, func(tx *bstore.Tx) error {
1462 q := bstore.QueryTx[store.Message](tx)
1463 q.FilterEqual("Expunged", false)
1464 return q.ForEach(func(m store.Message) error {
1466 if m.Junk == m.Notjunk {
1469 ok, err := acc.TrainMessage(ctx, log, jf, m.Notjunk, m)
1473 if m.TrainedJunk == nil || *m.TrainedJunk != m.Junk {
1474 m.TrainedJunk = &m.Junk
1475 if err := tx.Update(&m); err != nil {
1476 return fmt.Errorf("marking message as trained: %v", err)
1482 xctl.xcheck(err, "training messages")
1483 log.Info("retrained messages", slog.Int("total", total), slog.Int("trained", trained))
1485 // Close junk filter, marking success.
1488 xctl.xcheck(err, "closing junk filter")
1493 for _, name := range mox.Conf.Accounts() {
1501 case "recalculatemailboxcounts":
1503 > "recalculatemailboxcounts"
1508 account := xctl.xread()
1509 acc, err := store.OpenAccount(log, account, false)
1510 xctl.xcheck(err, "open account")
1514 log.Check(err, "closing account after recalculating mailbox counts")
1521 acc.WithWLock(func() {
1522 var changes []store.Change
1523 err = acc.DB.Write(ctx, func(tx *bstore.Tx) error {
1525 err := bstore.QueryTx[store.Mailbox](tx).FilterEqual("Expunged", false).ForEach(func(mb store.Mailbox) error {
1526 mc, err := mb.CalculateCounts(tx)
1528 return fmt.Errorf("calculating counts for mailbox %q: %w", mb.Name, err)
1530 totalSize += mc.Size
1532 if mc != mb.MailboxCounts {
1533 fmt.Fprintf(xw, "for %s setting new counts %s (was %s)\n", mb.Name, mc, mb.MailboxCounts)
1534 mb.HaveCounts = true
1535 mb.MailboxCounts = mc
1536 if err := tx.Update(&mb); err != nil {
1537 return fmt.Errorf("storing new counts for %q: %v", mb.Name, err)
1539 changes = append(changes, mb.ChangeCounts())
1547 du := store.DiskUsage{ID: 1}
1548 if err := tx.Get(&du); err != nil {
1549 return fmt.Errorf("get disk usage: %v", err)
1551 if du.MessageSize != totalSize {
1552 fmt.Fprintf(xw, "setting new total message size %d (was %d)\n", totalSize, du.MessageSize)
1553 du.MessageSize = totalSize
1554 if err := tx.Update(&du); err != nil {
1555 return fmt.Errorf("update disk usage: %v", err)
1560 xctl.xcheck(err, "write transaction for mailbox counts")
1562 store.BroadcastChanges(acc, changes)
1574 accountOpt := xctl.xread()
1578 var foundProblem bool
1579 const batchSize = 10000
1581 xfixmsgsize := func(accName string) {
1582 acc, err := store.OpenAccount(log, accName, false)
1583 xctl.xcheck(err, "open account")
1586 log.Check(err, "closing account after fixing message sizes")
1594 acc.WithRLock(func() {
1595 mailboxCounts := map[int64]store.Mailbox{} // For broadcasting.
1597 // Don't process all message in one transaction, we could block the account for too long.
1598 err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
1599 q := bstore.QueryTx[store.Message](tx)
1600 q.FilterEqual("Expunged", false)
1601 q.FilterGreater("ID", lastID)
1604 return q.ForEach(func(m store.Message) error {
1608 p := acc.MessagePath(m.ID)
1609 st, err := os.Stat(p)
1611 mb := store.Mailbox{ID: m.MailboxID}
1612 if xerr := tx.Get(&mb); xerr != nil {
1613 fmt.Fprintf(xw, "get mailbox id %d for message with file error: %v\n", mb.ID, xerr)
1615 fmt.Fprintf(xw, "checking file %s for message %d in mailbox %q (id %d): %v (continuing)\n", p, m.ID, mb.Name, mb.ID, err)
1618 filesize := st.Size()
1619 correctSize := int64(len(m.MsgPrefix)) + filesize
1620 if m.Size == correctSize {
1626 mb := store.Mailbox{ID: m.MailboxID}
1627 if err := tx.Get(&mb); err != nil {
1628 fmt.Fprintf(xw, "get mailbox id %d for message with file size mismatch: %v\n", mb.ID, err)
1632 fmt.Fprintf(xw, "message %d is in expunged mailbox %q (id %d) (continuing)\n", m.ID, mb.Name, mb.ID)
1634 fmt.Fprintf(xw, "fixing message %d in mailbox %q (id %d) with incorrect size %d, should be %d (len msg prefix %d + on-disk file %s size %d)\n", m.ID, mb.Name, mb.ID, m.Size, correctSize, len(m.MsgPrefix), p, filesize)
1636 // We assume that the original message size was accounted as stored in the mailbox
1637 // total size. If this isn't correct, the user can always run
1638 // recalculatemailboxcounts.
1640 mb.Size += correctSize
1641 if err := tx.Update(&mb); err != nil {
1642 return fmt.Errorf("update mailbox counts: %v", err)
1644 mailboxCounts[mb.ID] = mb
1646 m.Size = correctSize
1648 mr := acc.MessageReader(m)
1649 part, err := message.EnsurePart(log.Logger, false, mr, m.Size)
1651 fmt.Fprintf(xw, "parsing message %d again: %v (continuing)\n", m.ID, err)
1653 m.ParsedBuf, err = json.Marshal(part)
1655 return fmt.Errorf("marshal parsed message: %v", err)
1658 if err := tx.Update(&m); err != nil {
1659 return fmt.Errorf("update message: %v", err)
1664 xctl.xcheck(err, "find and fix wrong message sizes")
1666 var changes []store.Change
1667 for _, mb := range mailboxCounts {
1668 changes = append(changes, mb.ChangeCounts())
1670 store.BroadcastChanges(acc, changes)
1676 fmt.Fprintf(xw, "%d message size(s) fixed for account %s\n", total, accName)
1679 if accountOpt != "" {
1680 xfixmsgsize(accountOpt)
1682 for i, accName := range mox.Conf.Accounts() {
1687 fmt.Fprintf(xw, "%sFixing message sizes in account %s...\n", line, accName)
1688 xfixmsgsize(accName)
1692 fmt.Fprintf(xw, "\nProblems were found and fixed. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n")
1705 accountOpt := xctl.xread()
1709 const batchSize = 100
// xreparseAccount opens the account named accName and parses its non-expunged
// messages again, storing the JSON-encoded result of message.EnsurePart in each
// message's ParsedBuf. Per-message parse problems and a final summary are written
// to xw; failures to open the account or to complete a database write are handled
// through xctl.xcheck.
1711 xreparseAccount := func(accName string) {
1712 acc, err := store.OpenAccount(log, accName, false)
1713 xctl.xcheck(err, "open account")
// An error while closing the account is passed to log.Check with a descriptive
// message (presumably logged only, not reported to the ctl peer).
1716 log.Check(err, "closing account after reparsing messages")
1723 // Don't process all messages in one transaction, since that could block the account for too long.
// Each write transaction only selects non-expunged messages with an ID greater
// than lastID (presumably lastID is advanced as messages are handled, so the next
// transaction resumes after the previous batch).
1724 err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
1725 q := bstore.QueryTx[store.Message](tx)
1726 q.FilterEqual("Expunged", false)
1727 q.FilterGreater("ID", lastID)
1730 return q.ForEach(func(m store.Message) error {
// Parse the message as stored for the account, using the size recorded in the
// database.
1732 mr := acc.MessageReader(m)
1733 p, err := message.EnsurePart(log.Logger, false, mr, m.Size)
// A parse problem is reported to the user on xw; as the message text says,
// processing continues.
1735 fmt.Fprintf(xw, "parsing message %d: %v (continuing)\n", m.ID, err)
// Store the parsed structure as JSON on the message record.
1737 m.ParsedBuf, err = json.Marshal(p)
1739 return fmt.Errorf("marshal parsed message: %v", err)
// Persist the updated message within the current transaction.
1743 if err := tx.Update(&m); err != nil {
1744 return fmt.Errorf("update message: %v", err)
// An error from the write transaction ends the command through xctl.xcheck.
1750 xctl.xcheck(err, "update messages with parsed mime structure")
// Summary line for this account.
1755 fmt.Fprintf(xw, "%d message(s) reparsed for account %s\n", total, accName)
1758 if accountOpt != "" {
1759 xreparseAccount(accountOpt)
1761 for i, accName := range mox.Conf.Accounts() {
1766 fmt.Fprintf(xw, "%sReparsing account %s...\n", line, accName)
1767 xreparseAccount(accName)
1772 case "reassignthreads":
1780 accountOpt := xctl.xread()
// xreassignThreads opens the account named accName, waits for a threading upgrade
// to finish, resets the threading fields of its messages and then assigns threads
// again. Progress is written to xw; an error from any step is handled through
// xctl.xcheck.
1784 xreassignThreads := func(accName string) {
1785 acc, err := store.OpenAccount(log, accName, false)
1786 xctl.xcheck(err, "open account")
// An error while closing the account is passed to log.Check with a descriptive
// message (presumably logged only, not reported to the ctl peer).
1789 log.Check(err, "closing account after reassigning threads")
1792 // We don't want to step on an existing upgrade process.
1793 err = acc.ThreadingWait(log)
1794 xctl.xcheck(err, "waiting for threading upgrade to finish")
1795 // todo: should we try to continue if the threading upgrade failed? only if there is a chance it will succeed this time...
1797 // todo: reassigning isn't atomic (in a single transaction), ideally it would be (bstore would need to be able to handle large updates).
// First step: reset the threading fields. The returned total is reported below as
// the number of messages that got a new thread base subject.
1798 const batchSize = 50000
1799 total, err := acc.ResetThreading(ctx, log, batchSize, true)
1800 xctl.xcheck(err, "resetting threading fields")
1801 fmt.Fprintf(xw, "New thread base subject assigned to %d message(s), starting to reassign threads...\n", total)
1803 // Assign threads again. Ideally we would do this in a single transaction, but
1804 // bstore/boltdb cannot handle so many pending changes, so we set a high batchsize.
// NOTE(review): the literal 50000 below duplicates the batchSize constant declared
// above; presumably this argument is the batch size and could use the constant.
// Confirm against the AssignThreads signature.
1805 err = acc.AssignThreads(ctx, log, nil, 0, 50000, xw)
1806 xctl.xcheck(err, "reassign threads")
// Tell the user that IMAP clients may hold stale data and how to invalidate it.
1808 fmt.Fprintf(xw, "Threads reassigned. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n")
1811 if accountOpt != "" {
1812 xreassignThreads(accountOpt)
1814 for i, accName := range mox.Conf.Accounts() {
1819 fmt.Fprintf(xw, "%sReassigning threads for account %s...\n", line, accName)
1820 xreassignThreads(accName)
1826 xbackupctl(ctx, xctl)
1835 address := xctl.xread()
1837 imapserver.ServeConnPreauth("(imapserve)", cid, xctl.conn, address)
1838 xctl.log.Debug("imap connection finished")
1841 log.Info("unrecognized command", slog.String("cmd", cmd))
1842 xctl.xwrite("unrecognized command")