3// todo: may want to add some json omitempty tags to MessageItem, or Message to reduce json size, or just have smaller types that send only the fields that are needed.
8 cryptrand "crypto/rand"
24 "github.com/mjl-/bstore"
25 "github.com/mjl-/sherpa"
27 "github.com/mjl-/mox/dns"
28 "github.com/mjl-/mox/message"
29 "github.com/mjl-/mox/metrics"
30 "github.com/mjl-/mox/mlog"
31 "github.com/mjl-/mox/mox-"
32 "github.com/mjl-/mox/moxio"
33 "github.com/mjl-/mox/moxvar"
34 "github.com/mjl-/mox/smtp"
35 "github.com/mjl-/mox/store"
38// Request is a request to an SSE connection to send messages, either for a new
39// view, to continue with an existing view, or to a cancel an ongoing request.
43 SSEID int64 // SSE connection.
45 // To indicate a request is a continuation (more results) of the previous view.
46 // Echoed in events, client checks if it is getting results for the latest request.
49 // If set, this request and its view are canceled. A new view must be started.
59 ThreadOff ThreadMode = "off"
60 ThreadOn ThreadMode = "on"
61 ThreadUnread ThreadMode = "unread"
64// Query is a request for messages that match filters, in a given order.
66 OrderAsc bool // Order by received ascending or desending.
72// AttachmentType is for filtering by attachment type.
73type AttachmentType string
76 AttachmentIndifferent AttachmentType = ""
77 AttachmentNone AttachmentType = "none"
78 AttachmentAny AttachmentType = "any"
79 AttachmentImage AttachmentType = "image" // png, jpg, gif, ...
80 AttachmentPDF AttachmentType = "pdf"
81 AttachmentArchive AttachmentType = "archive" // zip files, tgz, ...
82 AttachmentSpreadsheet AttachmentType = "spreadsheet" // ods, xlsx, ...
83 AttachmentDocument AttachmentType = "document" // odt, docx, ...
84 AttachmentPresentation AttachmentType = "presentation" // odp, pptx, ...
87// Filter selects the messages to return. Fields that are set must all match,
88// for slices each element by match ("and").
90 // If -1, then all mailboxes except Trash/Junk/Rejects. Otherwise, only active if > 0.
93 // If true, also submailboxes are included in the search.
94 MailboxChildrenIncluded bool
96 // In case client doesn't know mailboxes and their IDs yet. Only used during sse
97 // connection setup, where it is turned into a MailboxID. Filtering only looks at
101 Words []string // Case insensitive substring match for each string.
103 To []string // Including Cc and Bcc.
107 Attachments AttachmentType
109 Headers [][2]string // Header values can be empty, it's a check if the header is present, regardless of value.
114// NotFilter matches messages that don't match these fields.
115type NotFilter struct {
120 Attachments AttachmentType
124// Page holds pagination parameters for a request.
126 // Start returning messages after this ID, if > 0. For pagination, fetching the
127 // next set of messages.
128 AnchorMessageID int64
130 // Number of messages to return, must be >= 1, we never return more than 10000 for
134 // If > 0, return messages until DestMessageID is found. More than Count messages
135 // can be returned. For long-running searches, it may take a while before this
140// todo: MessageAddress and MessageEnvelope into message.Address and message.Envelope.
142// MessageAddress is like message.Address, but with a dns.Domain, with unicode name
144type MessageAddress struct {
145 Name string // Free-form name for display in mail applications.
146 User string // Localpart, encoded.
150// MessageEnvelope is like message.Envelope, as used in message.Part, but including
151// unicode host names for IDNA names.
152type MessageEnvelope struct {
153 // todo: should get sherpadoc to understand type embeds and embed the non-MessageAddress fields from message.Envelope.
156 From []MessageAddress
157 Sender []MessageAddress
158 ReplyTo []MessageAddress
166// MessageItem is sent by queries, it has derived information analyzed from
167// message.Part, made for the needs of the message items in the message list.
169type MessageItem struct {
170 Message store.Message // Without ParsedBuf and MsgPrefix, for size.
171 Envelope MessageEnvelope
172 Attachments []Attachment
175 FirstLine string // Of message body, for showing as preview.
176 MatchQuery bool // If message does not match query, it can still be included because of threading.
177 MoreHeaders [][2]string // All headers from store.Settings.ShowHeaders that are present.
180// ParsedMessage has more parsed/derived information about a message, intended
181// for rendering the (contents of the) message. Information from MessageItem is
183type ParsedMessage struct {
186 Headers map[string][]string
187 ViewMode store.ViewMode
189 // Text parts, can be empty.
192 // Whether there is an HTML part. The webclient renders HTML message parts through
193 // an iframe and a separate request with strict CSP headers to prevent script
194 // execution and loading of external resources, which isn't possible when loading
195 // in iframe with inline HTML because not all browsers support the iframe csp
199 ListReplyAddress *MessageAddress // From List-Post.
201 // Information used by MessageItem, not exported in this type.
202 envelope MessageEnvelope
203 attachments []Attachment
209// EventStart is the first message sent on an SSE connection, giving the client
210// basic data to populate its UI. After this event, messages will follow quickly in
211// an EventViewMsgs event.
212type EventStart struct {
214 LoginAddress MessageAddress
215 Addresses []MessageAddress
216 DomainAddressConfigs map[string]DomainAddressConfig // ASCII domain to address config.
218 Mailboxes []store.Mailbox
219 RejectsMailbox string
220 Settings store.Settings
221 AccountPath string // If nonempty, the path on same host to webaccount interface.
225// DomainAddressConfig has the address (localpart) configuration for a domain, so
226// the webmail client can decide if an address matches the addresses of the
228type DomainAddressConfig struct {
229 LocalpartCatchallSeparator string // Can be empty.
230 LocalpartCaseSensitive bool
233// EventViewMsgs contains messages for a view, possibly a continuation of an
234// earlier list of messages.
235type EventViewMsgs struct {
239 // If empty, this was the last message for the request. If non-empty, a list of
240 // thread messages. Each with the first message being the reason this thread is
241 // included and can be used as AnchorID in followup requests. If the threading mode
242 // is "off" in the query, there will always be only a single message. If a thread
243 // is sent, all messages in the thread are sent, including those that don't match
244 // the query (e.g. from another mailbox). Threads can be displayed based on the
245 // ThreadParentIDs field, with possibly slightly different display based on field
246 // ThreadMissingLink.
247 MessageItems [][]MessageItem
249 // If set, will match the target page.DestMessageID from the request.
250 ParsedMessage *ParsedMessage
252 // If set, there are no more messages in this view at this moment. Messages can be
253 // added, typically via Change messages, e.g. for new deliveries.
257// EventViewErr indicates an error during a query for messages. The request is
258// aborted, no more request-related messages will be sent until the next request.
259type EventViewErr struct {
262 Err string // To be displayed in client.
263 err error // Original message, for checking against context.Canceled.
266// EventViewReset indicates that a request for the next set of messages in a few
267// could not be fulfilled, e.g. because the anchor message does not exist anymore.
268// The client should clear its list of messages. This can happen before
269// EventViewMsgs events are sent.
270type EventViewReset struct {
275// EventViewChanges contain one or more changes relevant for the client, either
276// with new mailbox total/unseen message counts, or messages added/removed/modified
277// (flags) for the current view.
278type EventViewChanges struct {
280 Changes [][2]any // The first field of [2]any is a string, the second of the Change types below.
283// ChangeMsgAdd adds a new message and possibly its thread to the view.
284type ChangeMsgAdd struct {
286 MessageItems []MessageItem
289// ChangeMsgRemove removes one or more messages from the view.
290type ChangeMsgRemove struct {
291 store.ChangeRemoveUIDs
294// ChangeMsgFlags updates flags for one message.
295type ChangeMsgFlags struct {
299// ChangeMsgThread updates muted/collapsed fields for one message.
300type ChangeMsgThread struct {
304// ChangeMailboxRemove indicates a mailbox was removed, including all its messages.
305type ChangeMailboxRemove struct {
306 store.ChangeRemoveMailbox
309// ChangeMailboxAdd indicates a new mailbox was added, initially without any messages.
310type ChangeMailboxAdd struct {
311 Mailbox store.Mailbox
314// ChangeMailboxRename indicates a mailbox was renamed. Its ID stays the same.
315// It could be under a new parent.
316type ChangeMailboxRename struct {
317 store.ChangeRenameMailbox
320// ChangeMailboxCounts set new total and unseen message counts for a mailbox.
321type ChangeMailboxCounts struct {
322 store.ChangeMailboxCounts
325// ChangeMailboxSpecialUse has updated special-use flags for a mailbox.
326type ChangeMailboxSpecialUse struct {
327 store.ChangeMailboxSpecialUse
330// ChangeMailboxKeywords has an updated list of keywords for a mailbox, e.g. after
331// a message was added with a keyword that wasn't in the mailbox yet.
332type ChangeMailboxKeywords struct {
333 store.ChangeMailboxKeywords
336// View holds the information about the returned data for a query. It is used to
337// determine whether mailbox changes should be sent to the client, we only send
338// addition/removal/flag-changes of messages that are in view, or would extend it
339// if the view is at the end of the results.
343 // Received of last message we sent to the client. We use it to decide if a newly
344 // delivered message is within the view and the client should get a notification.
345 LastMessageReceived time.Time
347 // If set, the last message in the query view has been sent. There is no need to do
348 // another query, it will not return more data. Used to decide if an event for a
349 // new message should be sent.
352 // Whether message must or must not match mailboxIDs.
354 // Mailboxes to match, can be multiple, for matching children. If empty, there is
355 // no filter on mailboxes.
356 mailboxIDs map[int64]bool
358 // Threads sent to client. New messages for this thread are also sent, regardless
359 // of regular query matching, so also for other mailboxes. If the user (re)moved
360 // all messages of a thread, they may still receive events for the thread. Only
361 // filled when query with threading not off.
362 threadIDs map[int64]struct{}
365// sses tracks all sse connections, and access to them.
372// sse represents an sse connection.
374 ID int64 // Also returned in EventStart and used in Request to identify the request.
375 AccountName string // Used to check the authenticated user has access to the SSE connection.
376 Request chan Request // Goroutine will receive requests from here, coming from API calls.
379// called by the goroutine when the connection is closed or breaks.
380func (sse sse) unregister() {
383 delete(sses.m, sse.ID)
385 // Drain any pending requests, preventing blocked goroutines from API calls.
395func sseRegister(accountName string) sse {
399 v := sse{sses.gen, accountName, make(chan Request, 1)}
404// sseGet returns a reference to an existing connection if it exists and user
406func sseGet(id int64, accountName string) (sse, bool) {
410 if s.AccountName != accountName {
416// ssetoken is a temporary token that has not yet been used to start an SSE
417// connection. Created by Token, consumed by a new SSE connection.
418type ssetoken struct {
419 token string // Uniquely generated.
421 address string // Address used to authenticate in call that created the token.
422 sessionToken store.SessionToken // SessionToken that created this token, checked before sending updates.
426// ssetokens maintains unused tokens. We have just one, but it's a type so we
427// can define methods.
428type ssetokens struct {
430 accountTokens map[string][]ssetoken // Account to max 10 most recent tokens, from old to new.
431 tokens map[string]ssetoken // Token to details, for finding account for a token.
434var sseTokens = ssetokens{
435 accountTokens: map[string][]ssetoken{},
436 tokens: map[string]ssetoken{},
439// xgenerate creates and saves a new token. It ensures no more than 10 tokens
440// per account exist, removing old ones if needed.
441func (x *ssetokens) xgenerate(ctx context.Context, accName, address string, sessionToken store.SessionToken) string {
442 buf := make([]byte, 16)
443 _, err := cryptrand.Read(buf)
444 xcheckf(ctx, err, "generating token")
445 st := ssetoken{base64.RawURLEncoding.EncodeToString(buf), accName, address, sessionToken, time.Now().Add(time.Minute)}
449 n := len(x.accountTokens[accName])
451 for _, ost := range x.accountTokens[accName][:n-9] {
452 delete(x.tokens, ost.token)
454 copy(x.accountTokens[accName], x.accountTokens[accName][n-9:])
455 x.accountTokens[accName] = x.accountTokens[accName][:9]
457 x.accountTokens[accName] = append(x.accountTokens[accName], st)
458 x.tokens[st.token] = st
462// check verifies a token, and consumes it if valid.
463func (x *ssetokens) check(token string) (string, string, store.SessionToken, bool, error) {
467 st, ok := x.tokens[token]
469 return "", "", "", false, nil
471 delete(x.tokens, token)
472 if i := slices.Index(x.accountTokens[st.accName], st); i < 0 {
473 return "", "", "", false, errors.New("internal error, could not find token in account")
475 copy(x.accountTokens[st.accName][i:], x.accountTokens[st.accName][i+1:])
476 x.accountTokens[st.accName] = x.accountTokens[st.accName][:len(x.accountTokens[st.accName])-1]
477 if len(x.accountTokens[st.accName]) == 0 {
478 delete(x.accountTokens, st.accName)
481 if time.Now().After(st.validUntil) {
482 return "", "", "", false, nil
484 return st.accName, st.address, st.sessionToken, true, nil
487// ioErr is panicked on i/o errors in serveEvents and handled in a defer.
492// ensure we have a non-nil moreHeaders, taking it from Settings.
493func ensureMoreHeaders(tx *bstore.Tx, moreHeaders []string) ([]string, error) {
494 if moreHeaders != nil {
495 return moreHeaders, nil
498 s := store.Settings{ID: 1}
499 if err := tx.Get(&s); err != nil {
500 return nil, fmt.Errorf("get settings: %v", err)
502 moreHeaders = s.ShowHeaders
503 if moreHeaders == nil {
504 moreHeaders = []string{} // Ensure we won't get Settings again next call.
506 return moreHeaders, nil
509// serveEvents serves an SSE connection. Authentication is done through a query
510// string parameter "singleUseToken", a one-time-use token returned by the Token
512func serveEvents(ctx context.Context, log mlog.Log, accountPath string, w http.ResponseWriter, r *http.Request) {
513 if r.Method != "GET" {
514 http.Error(w, "405 - method not allowed - use get", http.StatusMethodNotAllowed)
518 flusher, ok := w.(http.Flusher)
520 log.Error("internal error: ResponseWriter not a http.Flusher")
521 http.Error(w, "500 - internal error - cannot sync to http connection", 500)
526 token := q.Get("singleUseToken")
528 http.Error(w, "400 - bad request - missing credentials", http.StatusBadRequest)
531 accName, address, sessionToken, ok, err := sseTokens.check(token)
533 http.Error(w, "500 - internal server error - "+err.Error(), http.StatusInternalServerError)
537 http.Error(w, "400 - bad request - bad token", http.StatusBadRequest)
540 if _, err := store.SessionUse(ctx, log, accName, sessionToken, ""); err != nil {
541 http.Error(w, "400 - bad request - bad session token", http.StatusBadRequest)
545 // We can simulate a slow SSE connection. It seems firefox doesn't slow down
546 // incoming responses with its slow-network similation.
547 var waitMin, waitMax time.Duration
548 waitMinMsec := q.Get("waitMinMsec")
549 waitMaxMsec := q.Get("waitMaxMsec")
550 if waitMinMsec != "" && waitMaxMsec != "" {
551 if v, err := strconv.ParseInt(waitMinMsec, 10, 64); err != nil {
552 http.Error(w, "400 - bad request - parsing waitMinMsec: "+err.Error(), http.StatusBadRequest)
555 waitMin = time.Duration(v) * time.Millisecond
558 if v, err := strconv.ParseInt(waitMaxMsec, 10, 64); err != nil {
559 http.Error(w, "400 - bad request - parsing waitMaxMsec: "+err.Error(), http.StatusBadRequest)
562 waitMax = time.Duration(v) * time.Millisecond
566 // Parse the request with initial mailbox/search criteria.
568 dec := json.NewDecoder(strings.NewReader(q.Get("request")))
569 dec.DisallowUnknownFields()
570 if err := dec.Decode(&req); err != nil {
571 http.Error(w, "400 - bad request - bad request query string parameter: "+err.Error(), http.StatusBadRequest)
573 } else if req.Page.Count <= 0 {
574 http.Error(w, "400 - bad request - request cannot have Page.Count 0", http.StatusBadRequest)
577 if req.Query.Threading == "" {
578 req.Query.Threading = ThreadOff
581 var writer *eventWriter
583 metricSSEConnections.Inc()
584 defer metricSSEConnections.Dec()
586 // Below here, error handling cause through xcheckf, which panics with
587 // *sherpa.Error, after which we send an error event to the client. We can also get
588 // an *ioErr when the connection is broken.
594 if err, ok := x.(*sherpa.Error); ok {
595 writer.xsendEvent(ctx, log, "fatalErr", err.Message)
596 } else if _, ok := x.(ioErr); ok {
599 log.WithContext(ctx).Error("serveEvents panic", slog.Any("err", x))
601 metrics.PanicInc(metrics.Webmail)
607 h.Set("Content-Type", "text/event-stream")
608 h.Set("Cache-Control", "no-cache")
610 // We'll be sending quite a bit of message data (text) in JSON (plenty duplicate
611 // keys), so should be quite compressible.
613 gz := mox.AcceptsGzip(r)
615 h.Set("Content-Encoding", "gzip")
616 out, _ = gzip.NewWriterLevel(w, gzip.BestSpeed)
620 out = httpFlusher{out, flusher}
622 // We'll be writing outgoing SSE events through writer.
623 writer = newEventWriter(out, waitMin, waitMax, accName, sessionToken)
626 // Fetch initial data.
627 acc, err := store.OpenAccount(log, accName)
628 xcheckf(ctx, err, "open account")
631 log.Check(err, "closing account")
633 comm := store.RegisterComm(acc)
634 defer comm.Unregister()
636 // List addresses that the client can use to send email from.
637 accConf, _ := acc.Conf()
638 loginAddr, err := smtp.ParseAddress(address)
639 xcheckf(ctx, err, "parsing login address")
640 _, _, _, dest, err := mox.LookupAddress(loginAddr.Localpart, loginAddr.Domain, false, false)
641 xcheckf(ctx, err, "looking up destination for login address")
642 loginName := accConf.FullName
643 if dest.FullName != "" {
644 loginName = dest.FullName
646 loginAddress := MessageAddress{Name: loginName, User: loginAddr.Localpart.String(), Domain: loginAddr.Domain}
647 var addresses []MessageAddress
648 for a, dest := range accConf.Destinations {
649 name := dest.FullName
651 name = accConf.FullName
653 var ma MessageAddress
654 if strings.HasPrefix(a, "@") {
655 dom, err := dns.ParseDomain(a[1:])
656 xcheckf(ctx, err, "parsing destination address for account")
657 ma = MessageAddress{Domain: dom}
659 addr, err := smtp.ParseAddress(a)
660 xcheckf(ctx, err, "parsing destination address for account")
661 ma = MessageAddress{Name: name, User: addr.Localpart.String(), Domain: addr.Domain}
663 addresses = append(addresses, ma)
665 // User is allowed to send using alias address as message From address. Webmail
666 // will choose it when replying to a message sent to that address.
667 aliasAddrs := map[MessageAddress]bool{}
668 for _, a := range accConf.Aliases {
669 if a.Alias.AllowMsgFrom {
670 ma := MessageAddress{User: a.Alias.LocalpartStr, Domain: a.Alias.Domain}
672 addresses = append(addresses, ma)
674 aliasAddrs[ma] = true
678 // We implicitly start a query. We use the reqctx for the transaction, because the
679 // transaction is passed to the query, which can be canceled.
680 reqctx, reqctxcancel := context.WithCancel(ctx)
682 // We also cancel in cancelDrain later on, but there is a brief window where the
683 // context wouldn't be canceled.
684 if reqctxcancel != nil {
690 // qtx is kept around during connection initialization, until we pass it off to the
691 // goroutine that starts querying for messages.
695 err := qtx.Rollback()
696 log.Check(err, "rolling back")
700 var mbl []store.Mailbox
701 settings := store.Settings{ID: 1}
703 // We only take the rlock when getting the tx.
704 acc.WithRLock(func() {
705 // Now a read-only transaction we'll use during the query.
706 qtx, err = acc.DB.Begin(reqctx, false)
707 xcheckf(ctx, err, "begin transaction")
709 mbl, err = bstore.QueryTx[store.Mailbox](qtx).List()
710 xcheckf(ctx, err, "list mailboxes")
712 err = qtx.Get(&settings)
713 xcheckf(ctx, err, "get settings")
716 // Find the designated mailbox if a mailbox name is set, or there are no filters at all.
717 var zerofilter Filter
718 var zeronotfilter NotFilter
719 var mailbox store.Mailbox
720 var mailboxPrefixes []string
721 var matchMailboxes bool
722 mailboxIDs := map[int64]bool{}
723 mailboxName := req.Query.Filter.MailboxName
724 if mailboxName != "" || reflect.DeepEqual(req.Query.Filter, zerofilter) && reflect.DeepEqual(req.Query.NotFilter, zeronotfilter) {
725 if mailboxName == "" {
726 mailboxName = "Inbox"
729 var inbox store.Mailbox
730 for _, e := range mbl {
731 if e.Name == mailboxName {
734 if e.Name == "Inbox" {
742 xcheckf(ctx, errors.New("inbox not found"), "setting initial mailbox")
744 req.Query.Filter.MailboxID = mailbox.ID
745 req.Query.Filter.MailboxName = ""
746 mailboxPrefixes = []string{mailbox.Name + "/"}
747 matchMailboxes = true
748 mailboxIDs[mailbox.ID] = true
750 matchMailboxes, mailboxIDs, mailboxPrefixes = xprepareMailboxIDs(ctx, qtx, req.Query.Filter, accConf.RejectsMailbox)
752 if req.Query.Filter.MailboxChildrenIncluded {
753 xgatherMailboxIDs(ctx, qtx, mailboxIDs, mailboxPrefixes)
756 // todo: write a last-event-id based on modseq? if last-event-id is present, we would have to send changes to mailboxes, messages, hopefully reducing the amount of data sent.
758 sse := sseRegister(acc.Name)
759 defer sse.unregister()
761 // Per-domain localpart config so webclient can decide if an address belongs to the account.
762 domainAddressConfigs := map[string]DomainAddressConfig{}
763 for _, a := range addresses {
764 dom, _ := mox.Conf.Domain(a.Domain)
765 domainAddressConfigs[a.Domain.ASCII] = DomainAddressConfig{dom.LocalpartCatchallSeparator, dom.LocalpartCaseSensitive}
768 // Write first event, allowing client to fill its UI with mailboxes.
769 start := EventStart{sse.ID, loginAddress, addresses, domainAddressConfigs, mailbox.Name, mbl, accConf.RejectsMailbox, settings, accountPath, moxvar.Version}
770 writer.xsendEvent(ctx, log, "start", start)
772 // The goroutine doing the querying will send messages on these channels, which
773 // result in an event being written on the SSE connection.
774 viewMsgsc := make(chan EventViewMsgs)
775 viewErrc := make(chan EventViewErr)
776 viewResetc := make(chan EventViewReset)
777 donec := make(chan int64) // When request is done.
779 // Start a view, it determines if we send a change to the client. And start an
780 // implicit query for messages, we'll send the messages to the client which can
781 // fill its ui with messages.
782 v := view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
783 go viewRequestTx(reqctx, log, acc, qtx, v, viewMsgsc, viewErrc, viewResetc, donec)
784 qtx = nil // viewRequestTx closes qtx
786 // When canceling a query, we must drain its messages until it says it is done.
787 // Otherwise the sending goroutine would hang indefinitely on a channel send.
788 cancelDrain := func() {
789 if reqctxcancel != nil {
790 // Cancel the goroutine doing the querying.
798 // Drain events until done.
810 // If we stop and a query is in progress, we must drain the channel it will send on.
813 // Changes broadcasted by other connections on this account. If applicable for the
814 // connection/view, we send events.
815 xprocessChanges := func(changes []store.Change) {
816 taggedChanges := [][2]any{}
818 // We get a transaction first time we need it.
822 err := xtx.Rollback()
823 log.Check(err, "rolling back transaction")
826 ensureTx := func() error {
833 xtx, err = acc.DB.Begin(ctx, false)
836 // This getmsg will now only be called mailboxID+UID, not with messageID set.
837 // todo jmap: change store.Change* to include MessageID's? would mean duplication of information resulting in possible mismatch.
838 getmsg := func(messageID int64, mailboxID int64, uid store.UID) (store.Message, error) {
839 if err := ensureTx(); err != nil {
840 return store.Message{}, fmt.Errorf("transaction: %v", err)
842 return bstore.QueryTx[store.Message](xtx).FilterEqual("Expunged", false).FilterNonzero(store.Message{MailboxID: mailboxID, UID: uid}).Get()
845 // Additional headers from settings to add to MessageItems.
846 var moreHeaders []string
847 xmoreHeaders := func() []string {
849 xcheckf(ctx, err, "transaction")
851 moreHeaders, err = ensureMoreHeaders(xtx, moreHeaders)
852 xcheckf(ctx, err, "ensuring more headers")
856 // Return uids that are within range in view. Because the end has been reached, or
857 // because the UID is not after the last message.
858 xchangedUIDs := func(mailboxID int64, uids []store.UID, isRemove bool) (changedUIDs []store.UID) {
859 uidsAny := make([]any, len(uids))
860 for i, uid := range uids {
864 xcheckf(ctx, err, "transaction")
865 q := bstore.QueryTx[store.Message](xtx)
866 q.FilterNonzero(store.Message{MailboxID: mailboxID})
867 q.FilterEqual("UID", uidsAny...)
868 mbOK := v.matchesMailbox(mailboxID)
869 err = q.ForEach(func(m store.Message) error {
870 _, thread := v.threadIDs[m.ThreadID]
871 if thread || mbOK && (v.inRange(m) || isRemove && m.Expunged) {
872 changedUIDs = append(changedUIDs, m.UID)
876 xcheckf(ctx, err, "fetching messages for change")
880 // Forward changes that are relevant to the current view.
881 for _, change := range changes {
882 switch c := change.(type) {
883 case store.ChangeAddUID:
884 ok, err := v.matches(log, acc, true, 0, c.MailboxID, c.UID, c.Flags, c.Keywords, getmsg)
885 xcheckf(ctx, err, "matching new message against view")
886 m, err := getmsg(0, c.MailboxID, c.UID)
887 xcheckf(ctx, err, "get message")
888 _, thread := v.threadIDs[m.ThreadID]
893 state := msgState{acc: acc}
894 mi, err := messageItem(log, m, &state, xmoreHeaders())
896 xcheckf(ctx, err, "make messageitem")
899 mil := []MessageItem{mi}
900 if !thread && req.Query.Threading != ThreadOff {
902 xcheckf(ctx, err, "transaction")
903 more, _, err := gatherThread(log, xtx, acc, v, m, 0, false, xmoreHeaders())
904 xcheckf(ctx, err, "gathering thread messages for id %d, thread %d", m.ID, m.ThreadID)
905 mil = append(mil, more...)
906 v.threadIDs[m.ThreadID] = struct{}{}
909 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgAdd", ChangeMsgAdd{c, mil}})
911 // If message extends the view, store it as such.
912 if !v.Request.Query.OrderAsc && m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && m.Received.After(v.LastMessageReceived) {
913 v.LastMessageReceived = m.Received
916 case store.ChangeRemoveUIDs:
917 // We may send changes for uids the client doesn't know, that's fine.
918 changedUIDs := xchangedUIDs(c.MailboxID, c.UIDs, true)
919 if len(changedUIDs) == 0 {
922 ch := ChangeMsgRemove{c}
923 ch.UIDs = changedUIDs
924 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgRemove", ch})
926 case store.ChangeFlags:
927 // We may send changes for uids the client doesn't know, that's fine.
928 changedUIDs := xchangedUIDs(c.MailboxID, []store.UID{c.UID}, false)
929 if len(changedUIDs) == 0 {
932 ch := ChangeMsgFlags{c}
933 ch.UID = changedUIDs[0]
934 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgFlags", ch})
936 case store.ChangeThread:
937 // Change in muted/collaped state, just always ship it.
938 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgThread", ChangeMsgThread{c}})
940 case store.ChangeRemoveMailbox:
941 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRemove", ChangeMailboxRemove{c}})
943 case store.ChangeAddMailbox:
944 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxAdd", ChangeMailboxAdd{c.Mailbox}})
946 case store.ChangeRenameMailbox:
947 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRename", ChangeMailboxRename{c}})
949 case store.ChangeMailboxCounts:
950 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxCounts", ChangeMailboxCounts{c}})
952 case store.ChangeMailboxSpecialUse:
953 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxSpecialUse", ChangeMailboxSpecialUse{c}})
955 case store.ChangeMailboxKeywords:
956 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxKeywords", ChangeMailboxKeywords{c}})
958 case store.ChangeAddSubscription:
959 // Webmail does not care about subscriptions.
962 panic(fmt.Sprintf("missing case for change %T", c))
966 if len(taggedChanges) > 0 {
967 viewChanges := EventViewChanges{v.Request.ViewID, taggedChanges}
968 writer.xsendEvent(ctx, log, "viewChanges", viewChanges)
972 timer := time.NewTimer(5 * time.Minute) // For keepalives.
976 timer.Reset(5 * time.Minute)
980 pending := comm.Pending
986 case <-mox.Shutdown.Done():
987 writer.xsendEvent(ctx, log, "fatalErr", "server is shutting down")
988 // Work around go vet, it doesn't see defer cancelDrain.
989 if reqctxcancel != nil {
995 _, err := fmt.Fprintf(out, ": keepalive\n\n")
997 log.Errorx("write keepalive", err)
998 // Work around go vet, it doesn't see defer cancelDrain.
999 if reqctxcancel != nil {
1007 case vm := <-viewMsgsc:
1008 if vm.RequestID != v.Request.ID || vm.ViewID != v.Request.ViewID {
1009 panic(fmt.Sprintf("received msgs for view,request id %d,%d instead of %d,%d", vm.ViewID, vm.RequestID, v.Request.ViewID, v.Request.ID))
1014 if len(vm.MessageItems) > 0 {
1015 v.LastMessageReceived = vm.MessageItems[len(vm.MessageItems)-1][0].Message.Received
1017 writer.xsendEvent(ctx, log, "viewMsgs", vm)
1019 case ve := <-viewErrc:
1020 if ve.RequestID != v.Request.ID || ve.ViewID != v.Request.ViewID {
1021 panic(fmt.Sprintf("received err for view,request id %d,%d instead of %d,%d", ve.ViewID, ve.RequestID, v.Request.ViewID, v.Request.ID))
1023 if errors.Is(ve.err, context.Canceled) || moxio.IsClosed(ve.err) {
1024 // Work around go vet, it doesn't see defer cancelDrain.
1025 if reqctxcancel != nil {
1030 writer.xsendEvent(ctx, log, "viewErr", ve)
1032 case vr := <-viewResetc:
1033 if vr.RequestID != v.Request.ID || vr.ViewID != v.Request.ViewID {
1034 panic(fmt.Sprintf("received reset for view,request id %d,%d instead of %d,%d", vr.ViewID, vr.RequestID, v.Request.ViewID, v.Request.ID))
1036 writer.xsendEvent(ctx, log, "viewReset", vr)
1039 if id != v.Request.ID {
1040 panic(fmt.Sprintf("received done for request id %d instead of %d", id, v.Request.ID))
1042 if reqctxcancel != nil {
1048 case req := <-sse.Request:
1053 v = view{req, time.Time{}, false, false, nil, nil}
1057 reqctx, reqctxcancel = context.WithCancel(ctx)
1059 stop := func() (stop bool) {
1060 // rtx is handed off viewRequestTx below, but we must clean it up in case of errors.
1065 err = rtx.Rollback()
1066 log.Check(err, "rolling back transaction")
1069 acc.WithRLock(func() {
1070 rtx, err = acc.DB.Begin(reqctx, false)
1077 if errors.Is(err, context.Canceled) {
1080 err := fmt.Errorf("begin transaction: %v", err)
1081 viewErr := EventViewErr{v.Request.ViewID, v.Request.ID, err.Error(), err}
1082 writer.xsendEvent(ctx, log, "viewErr", viewErr)
1086 // Reset view state for new query.
1087 if req.ViewID != v.Request.ViewID {
1088 matchMailboxes, mailboxIDs, mailboxPrefixes := xprepareMailboxIDs(ctx, rtx, req.Query.Filter, accConf.RejectsMailbox)
1089 if req.Query.Filter.MailboxChildrenIncluded {
1090 xgatherMailboxIDs(ctx, rtx, mailboxIDs, mailboxPrefixes)
1092 v = view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
1096 go viewRequestTx(reqctx, log, acc, rtx, v, viewMsgsc, viewErrc, viewResetc, donec)
1105 xprocessChanges(comm.Get())
1108 // Work around go vet, it doesn't see defer cancelDrain.
1109 if reqctxcancel != nil {
1117// xprepareMailboxIDs prepare the first half of filters for mailboxes, based on
1118// f.MailboxID (-1 is special). matchMailboxes indicates whether the IDs in
1119// mailboxIDs must or must not match. mailboxPrefixes is for use with
1120// xgatherMailboxIDs to gather children of the mailboxIDs.
1121func xprepareMailboxIDs(ctx context.Context, tx *bstore.Tx, f Filter, rejectsMailbox string) (matchMailboxes bool, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1122 matchMailboxes = true
1123 mailboxIDs = map[int64]bool{}
1124 if f.MailboxID == -1 {
1125 matchMailboxes = false
1126 // Add the trash, junk and account rejects mailbox.
1127 err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
1128 if mb.Trash || mb.Junk || mb.Name == rejectsMailbox {
1129 mailboxPrefixes = append(mailboxPrefixes, mb.Name+"/")
1130 mailboxIDs[mb.ID] = true
1134 xcheckf(ctx, err, "finding trash/junk/rejects mailbox")
1135 } else if f.MailboxID > 0 {
1136 mb := store.Mailbox{ID: f.MailboxID}
1138 xcheckf(ctx, err, "get mailbox")
1139 mailboxIDs[f.MailboxID] = true
1140 mailboxPrefixes = []string{mb.Name + "/"}
1145// xgatherMailboxIDs adds all mailboxes with a prefix matching any of
1146// mailboxPrefixes to mailboxIDs, to expand filtering to children of mailboxes.
1147func xgatherMailboxIDs(ctx context.Context, tx *bstore.Tx, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1148 // Gather more mailboxes to filter on, based on mailboxPrefixes.
1149 if len(mailboxPrefixes) == 0 {
1152 err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
1153 for _, p := range mailboxPrefixes {
1154 if strings.HasPrefix(mb.Name, p) {
1155 mailboxIDs[mb.ID] = true
1161 xcheckf(ctx, err, "gathering mailboxes")
1164// matchesMailbox returns whether a mailbox matches the view.
1165func (v view) matchesMailbox(mailboxID int64) bool {
1166 return len(v.mailboxIDs) == 0 || v.matchMailboxIDs && v.mailboxIDs[mailboxID] || !v.matchMailboxIDs && !v.mailboxIDs[mailboxID]
1169// inRange returns whether m is within the range for the view, whether a change for
1170// this message should be sent to the client so it can update its state.
1171func (v view) inRange(m store.Message) bool {
1172 return v.End || !v.Request.Query.OrderAsc && !m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && !m.Received.After(v.LastMessageReceived)
1175// matches checks if the message, identified by either messageID or mailboxID+UID,
1176// is in the current "view" (i.e. passing the filters, and if checkRange is set
1177// also if within the range of sent messages based on sort order and the last seen
1178// message). getmsg retrieves the message, which may be necessary depending on the
1179// active filters. Used to determine if a store.Change with a new message should be
1180// sent, and for the destination and anchor messages in view requests.
1181func (v view) matches(log mlog.Log, acc *store.Account, checkRange bool, messageID int64, mailboxID int64, uid store.UID, flags store.Flags, keywords []string, getmsg func(int64, int64, store.UID) (store.Message, error)) (match bool, rerr error) {
1183 ensureMessage := func() bool {
1184 if m.ID == 0 && rerr == nil {
1185 m, rerr = getmsg(messageID, mailboxID, uid)
1190 q := v.Request.Query
1192 // Warning: Filters must be kept in sync between queryMessage and view.matches.
1195 if len(v.mailboxIDs) > 0 && (!ensureMessage() || v.matchMailboxIDs && !v.mailboxIDs[m.MailboxID] || !v.matchMailboxIDs && v.mailboxIDs[m.MailboxID]) {
1198 // note: anchorMessageID is not relevant for matching.
1199 flagfilter := q.flagFilterFn()
1200 if flagfilter != nil && !flagfilter(flags, keywords) {
1204 if q.Filter.Oldest != nil && (!ensureMessage() || m.Received.Before(*q.Filter.Oldest)) {
1207 if q.Filter.Newest != nil && (!ensureMessage() || !m.Received.Before(*q.Filter.Newest)) {
1211 if q.Filter.SizeMin > 0 && (!ensureMessage() || m.Size < q.Filter.SizeMin) {
1214 if q.Filter.SizeMax > 0 && (!ensureMessage() || m.Size > q.Filter.SizeMax) {
1218 state := msgState{acc: acc}
1220 if rerr == nil && state.err != nil {
1226 attachmentFilter := q.attachmentFilterFn(log, acc, &state)
1227 if attachmentFilter != nil && (!ensureMessage() || !attachmentFilter(m)) {
1231 envFilter := q.envFilterFn(log, &state)
1232 if envFilter != nil && (!ensureMessage() || !envFilter(m)) {
1236 headerFilter := q.headerFilterFn(log, &state)
1237 if headerFilter != nil && (!ensureMessage() || !headerFilter(m)) {
1241 wordsFilter := q.wordsFilterFn(log, &state)
1242 if wordsFilter != nil && (!ensureMessage() || !wordsFilter(m)) {
1246 // Now check that we are either within the sorting order, or "last" was sent.
1247 if !checkRange || v.End || ensureMessage() && v.inRange(m) {
1253type msgResp struct {
1254 err error // If set, an error happened and fields below are not set.
1255 reset bool // If set, the anchor message does not exist (anymore?) and we are sending messages from the start, fields below not set.
1256 viewEnd bool // If set, the last message for the view was seen, no more should be requested, fields below not set.
1257 mil []MessageItem // If none of the cases above apply, the messages that was found matching the query. First message was reason the thread is returned, for use as AnchorID in followup request.
1258 pm *ParsedMessage // If m was the target page.DestMessageID, or this is the first match, this is the parsed message of mi.
1261// viewRequestTx executes a request (query with filters, pagination) by
1262// launching a new goroutine with queryMessages, receiving results as msgResp,
1263// and sending Event* to the SSE connection.
1265// It always closes tx.
1266func viewRequestTx(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, msgc chan EventViewMsgs, errc chan EventViewErr, resetc chan EventViewReset, donec chan int64) {
1268 err := tx.Rollback()
1269 log.Check(err, "rolling back query transaction")
1271 donec <- v.Request.ID
1273 x := recover() // Should not happen, but don't take program down if it does.
1275 log.WithContext(ctx).Error("viewRequestTx panic", slog.Any("err", x))
1277 metrics.PanicInc(metrics.Webmailrequest)
1281 var msgitems [][]MessageItem // Gathering for 300ms, then flushing.
1282 var parsedMessage *ParsedMessage
1285 var immediate bool // No waiting, flush immediate.
1286 t := time.NewTimer(300 * time.Millisecond)
1289 sendViewMsgs := func(force bool) {
1290 if len(msgitems) == 0 && !force {
1295 msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, msgitems, parsedMessage, viewEnd}
1298 t.Reset(300 * time.Millisecond)
1301 // todo: should probably rewrite code so we don't start yet another goroutine, but instead handle the query responses directly (through a struct that keeps state?) in the sse connection goroutine.
1303 mrc := make(chan msgResp, 1)
1304 go queryMessages(ctx, log, acc, tx, v, mrc)
1308 case mr, ok := <-mrc:
1311 // Empty message list signals this query is done.
1312 msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, nil, nil, false}
1317 errc <- EventViewErr{v.Request.ViewID, v.Request.ID, mr.err.Error(), mr.err}
1321 resetc <- EventViewReset{v.Request.ViewID, v.Request.ID}
1330 msgitems = append(msgitems, mr.mil)
1332 parsedMessage = mr.pm
1339 if len(msgitems) == 0 {
1340 // Nothing to send yet. We'll send immediately when the next message comes in.
1349// queryMessages executes a query, with filter, pagination, destination message id
1350// to fetch (the message that the client had in view and wants to display again).
1351// It sends on msgc, with several types of messages: errors, whether the view is
1352// reset due to missing AnchorMessageID, and when the end of the view was reached
1353// and/or for a message.
1354func queryMessages(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, mrc chan msgResp) {
1356 x := recover() // Should not happen, but don't take program down if it does.
1358 log.WithContext(ctx).Error("queryMessages panic", slog.Any("err", x))
1360 mrc <- msgResp{err: fmt.Errorf("query failed")}
1361 metrics.PanicInc(metrics.Webmailquery)
1367 query := v.Request.Query
1368 page := v.Request.Page
1370 // Warning: Filters must be kept in sync between queryMessage and view.matches.
1372 checkMessage := func(id int64) (valid bool, rerr error) {
1373 m := store.Message{ID: id}
1375 if err == bstore.ErrAbsent || err == nil && m.Expunged {
1377 } else if err != nil {
1380 return v.matches(log, acc, false, m.ID, m.MailboxID, m.UID, m.Flags, m.Keywords, func(int64, int64, store.UID) (store.Message, error) {
1386 // Check if AnchorMessageID exists and matches filter. If not, we will reset the view.
1387 if page.AnchorMessageID > 0 {
1388 // Check if message exists and (still) matches the filter.
1389 // todo: if AnchorMessageID exists but no longer matches the filter, we are resetting the view, but could handle it more gracefully in the future. if the message is in a different mailbox, we cannot query as efficiently, we'll have to read through more messages.
1390 if valid, err := checkMessage(page.AnchorMessageID); err != nil {
1391 mrc <- msgResp{err: fmt.Errorf("querying AnchorMessageID: %v", err)}
1394 mrc <- msgResp{reset: true}
1395 page.AnchorMessageID = 0
1399 // Check if page.DestMessageID exists and matches filter. If not, we will ignore
1400 // it instead of continuing to send message till the end of the view.
1401 if page.DestMessageID > 0 {
1402 if valid, err := checkMessage(page.DestMessageID); err != nil {
1403 mrc <- msgResp{err: fmt.Errorf("querying requested message: %v", err)}
1406 page.DestMessageID = 0
1410 // todo optimize: we would like to have more filters directly on the database if they can use an index. eg if there is a keyword filter and no mailbox filter.
1412 q := bstore.QueryTx[store.Message](tx)
1413 q.FilterEqual("Expunged", false)
1414 if len(v.mailboxIDs) > 0 {
1415 if len(v.mailboxIDs) == 1 && v.matchMailboxIDs {
1416 // Should result in fast indexed query.
1417 for mbID := range v.mailboxIDs {
1418 q.FilterNonzero(store.Message{MailboxID: mbID})
1421 idsAny := make([]any, 0, len(v.mailboxIDs))
1422 for mbID := range v.mailboxIDs {
1423 idsAny = append(idsAny, mbID)
1425 if v.matchMailboxIDs {
1426 q.FilterEqual("MailboxID", idsAny...)
1428 q.FilterNotEqual("MailboxID", idsAny...)
1433 // If we are looking for an anchor, keep skipping message early (cheaply) until we've seen it.
1434 if page.AnchorMessageID > 0 {
1436 q.FilterFn(func(m store.Message) bool {
1440 seen = m.ID == page.AnchorMessageID
1445 // We may be added filters the the query below. The FilterFn signature does not
1446 // implement reporting errors, or anything else, just a bool. So when making the
1447 // filter functions, we give them a place to store parsed message state, and an
1448 // error. We check the error during and after query execution.
1449 state := msgState{acc: acc}
1452 flagfilter := query.flagFilterFn()
1453 if flagfilter != nil {
1454 q.FilterFn(func(m store.Message) bool {
1455 return flagfilter(m.Flags, m.Keywords)
1459 if query.Filter.Oldest != nil {
1460 q.FilterGreaterEqual("Received", *query.Filter.Oldest)
1462 if query.Filter.Newest != nil {
1463 q.FilterLessEqual("Received", *query.Filter.Newest)
1466 if query.Filter.SizeMin > 0 {
1467 q.FilterGreaterEqual("Size", query.Filter.SizeMin)
1469 if query.Filter.SizeMax > 0 {
1470 q.FilterLessEqual("Size", query.Filter.SizeMax)
1473 attachmentFilter := query.attachmentFilterFn(log, acc, &state)
1474 if attachmentFilter != nil {
1475 q.FilterFn(attachmentFilter)
1478 envFilter := query.envFilterFn(log, &state)
1479 if envFilter != nil {
1480 q.FilterFn(envFilter)
1483 headerFilter := query.headerFilterFn(log, &state)
1484 if headerFilter != nil {
1485 q.FilterFn(headerFilter)
1488 wordsFilter := query.wordsFilterFn(log, &state)
1489 if wordsFilter != nil {
1490 q.FilterFn(wordsFilter)
1493 var moreHeaders []string // From store.Settings.ShowHeaders
1496 q.SortAsc("Received")
1498 q.SortDesc("Received")
1500 found := page.DestMessageID <= 0
1503 err := q.ForEach(func(m store.Message) error {
1504 // Check for an error in one of the filters, propagate it.
1505 if state.err != nil {
1509 if have >= page.Count && found || have > 10000 {
1511 return bstore.StopForEach
1514 if _, ok := v.threadIDs[m.ThreadID]; ok {
1515 // Message was already returned as part of a thread.
1519 var pm *ParsedMessage
1520 if m.ID == page.DestMessageID || page.DestMessageID == 0 && have == 0 && page.AnchorMessageID == 0 {
1521 // For threads, if there was no DestMessageID, we may be getting the newest
1522 // message. For an initial view, this isn't necessarily the first the user is
1523 // expected to read first, that would be the first unread, which we'll get below
1524 // when gathering the thread.
1526 xpm, err := parsedMessage(log, m, &state, true, false, false)
1527 if err != nil && errors.Is(err, message.ErrHeader) {
1528 log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
1529 } else if err != nil {
1530 return fmt.Errorf("parsing message %d: %v", m.ID, err)
1537 moreHeaders, err = ensureMoreHeaders(tx, moreHeaders)
1539 return fmt.Errorf("ensuring more headers: %v", err)
1542 mi, err := messageItem(log, m, &state, moreHeaders)
1544 return fmt.Errorf("making messageitem for message %d: %v", m.ID, err)
1546 mil := []MessageItem{mi}
1547 if query.Threading != ThreadOff {
1548 more, xpm, err := gatherThread(log, tx, acc, v, m, page.DestMessageID, page.AnchorMessageID == 0 && have == 0, moreHeaders)
1550 return fmt.Errorf("gathering thread messages for id %d, thread %d: %v", m.ID, m.ThreadID, err)
1556 mil = append(mil, more...)
1557 v.threadIDs[m.ThreadID] = struct{}{}
1559 // Calculate how many messages the frontend is going to show, and only count those as returned.
1560 collapsed := map[int64]bool{}
1561 for _, mi := range mil {
1562 collapsed[mi.Message.ID] = mi.Message.ThreadCollapsed
1564 unread := map[int64]bool{} // Propagated to thread root.
1565 if query.Threading == ThreadUnread {
1566 for _, mi := range mil {
1571 unread[mm.ID] = true
1572 for _, id := range mm.ThreadParentIDs {
1577 for _, mi := range mil {
1581 for _, id := range mm.ThreadParentIDs {
1582 if _, ok := collapsed[id]; ok {
1587 if threadRoot || (query.Threading == ThreadOn && !collapsed[rootID] || query.Threading == ThreadUnread && unread[rootID]) {
1594 if pm != nil && len(pm.envelope.From) == 1 {
1595 pm.ViewMode, err = fromAddrViewMode(tx, pm.envelope.From[0])
1597 return fmt.Errorf("gathering view mode for id %d: %v", m.ID, err)
1600 mrc <- msgResp{mil: mil, pm: pm}
1603 // Check for an error in one of the filters again. Check in ForEach would not
1604 // trigger if the last message has the error.
1605 if err == nil && state.err != nil {
1609 mrc <- msgResp{err: fmt.Errorf("querying messages: %v", err)}
1613 mrc <- msgResp{viewEnd: true}
1617func gatherThread(log mlog.Log, tx *bstore.Tx, acc *store.Account, v view, m store.Message, destMessageID int64, first bool, moreHeaders []string) ([]MessageItem, *ParsedMessage, error) {
1618 if m.ThreadID == 0 {
1619 // If we would continue, FilterNonzero would fail because there are no non-zero fields.
1620 return nil, nil, fmt.Errorf("message has threadid 0, account is probably still being upgraded, try turning threading off until the upgrade is done")
1623 // Fetch other messages for this thread.
1624 qt := bstore.QueryTx[store.Message](tx)
1625 qt.FilterNonzero(store.Message{ThreadID: m.ThreadID})
1626 qt.FilterEqual("Expunged", false)
1627 qt.FilterNotEqual("ID", m.ID)
1629 tml, err := qt.List()
1631 return nil, nil, fmt.Errorf("listing other messages in thread for message %d, thread %d: %v", m.ID, m.ThreadID, err)
1634 var mil []MessageItem
1635 var pm *ParsedMessage
1636 var firstUnread bool
1637 for _, tm := range tml {
1638 err := func() error {
1639 xstate := msgState{acc: acc}
1640 defer xstate.clear()
1642 mi, err := messageItem(log, tm, &xstate, moreHeaders)
1644 return fmt.Errorf("making messageitem for message %d, for thread %d: %v", tm.ID, m.ThreadID, err)
1646 mi.MatchQuery, err = v.matches(log, acc, false, tm.ID, tm.MailboxID, tm.UID, tm.Flags, tm.Keywords, func(int64, int64, store.UID) (store.Message, error) {
1650 return fmt.Errorf("matching thread message %d against view query: %v", tm.ID, err)
1652 mil = append(mil, mi)
1654 if tm.ID == destMessageID || destMessageID == 0 && first && (pm == nil || !firstUnread && !tm.Seen) {
1655 firstUnread = !tm.Seen
1656 xpm, err := parsedMessage(log, tm, &xstate, true, false, false)
1657 if err != nil && errors.Is(err, message.ErrHeader) {
1658 log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
1659 } else if err != nil {
1660 return fmt.Errorf("parsing thread message %d: %v", tm.ID, err)
1668 return nil, nil, err
1672 // Finally, the message that caused us to gather this thread (which is likely the
1673 // most recent message in the thread) could be the only unread message.
1674 if destMessageID == 0 && first && !m.Seen && !firstUnread {
1675 xstate := msgState{acc: acc}
1676 defer xstate.clear()
1677 xpm, err := parsedMessage(log, m, &xstate, true, false, false)
1678 if err != nil && errors.Is(err, message.ErrHeader) {
1679 log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
1680 } else if err != nil {
1681 return nil, nil, fmt.Errorf("parsing thread message %d: %v", m.ID, err)
1690// While checking the filters on a message, we may need to get more message
1691// details as each filter passes. We check the filters that need the basic
1692// information first, and load and cache more details for the next filters.
1693// msgState holds parsed details for a message, it is updated while filtering,
1694// with more information or reset for a next message.
1695type msgState struct {
1696 acc *store.Account // Never changes during lifetime.
1697 err error // Once set, doesn't get cleared.
1699 part *message.Part // Will be without Reader when msgr is nil.
1700 msgr *store.MsgReader
1703func (ms *msgState) clear() {
1708 *ms = msgState{acc: ms.acc, err: ms.err}
1711func (ms *msgState) ensureMsg(m store.Message) {
1712 if m.ID != ms.m.ID {
1718func (ms *msgState) ensurePart(m store.Message, withMsgReader bool) bool {
1723 if m.ParsedBuf == nil {
1724 ms.err = fmt.Errorf("message %d not parsed", m.ID)
1728 if err := json.Unmarshal(m.ParsedBuf, &p); err != nil {
1729 ms.err = fmt.Errorf("load part for message %d: %w", m.ID, err)
1734 if withMsgReader && ms.msgr == nil {
1735 ms.msgr = ms.acc.MessageReader(m)
1736 ms.part.SetReaderAt(ms.msgr)
1739 return ms.part != nil
1742// flagFilterFn returns a function that applies the flag/keyword/"label"-related
1743// filters for a query. A nil function is returned if there are no flags to filter
1745func (q Query) flagFilterFn() func(store.Flags, []string) bool {
1746 labels := map[string]bool{}
1747 for _, k := range q.Filter.Labels {
1750 for _, k := range q.NotFilter.Labels {
1754 if len(labels) == 0 {
1758 var mask, flags store.Flags
1759 systemflags := map[string][]*bool{
1760 `\answered`: {&mask.Answered, &flags.Answered},
1761 `\flagged`: {&mask.Flagged, &flags.Flagged},
1762 `\deleted`: {&mask.Deleted, &flags.Deleted},
1763 `\seen`: {&mask.Seen, &flags.Seen},
1764 `\draft`: {&mask.Draft, &flags.Draft},
1765 `$junk`: {&mask.Junk, &flags.Junk},
1766 `$notjunk`: {&mask.Notjunk, &flags.Notjunk},
1767 `$forwarded`: {&mask.Forwarded, &flags.Forwarded},
1768 `$phishing`: {&mask.Phishing, &flags.Phishing},
1769 `$mdnsent`: {&mask.MDNSent, &flags.MDNSent},
1771 keywords := map[string]bool{}
1772 for k, v := range labels {
1773 k = strings.ToLower(k)
1774 if mf, ok := systemflags[k]; ok {
1781 return func(msgFlags store.Flags, msgKeywords []string) bool {
1783 if f.Set(mask, msgFlags) != flags {
1786 for k, v := range keywords {
1787 if slices.Contains(msgKeywords, k) != v {
1795// attachmentFilterFn returns a function that filters for the attachment-related
1796// filter from the query. A nil function is returned if there are attachment
1798func (q Query) attachmentFilterFn(log mlog.Log, acc *store.Account, state *msgState) func(m store.Message) bool {
1799 if q.Filter.Attachments == AttachmentIndifferent && q.NotFilter.Attachments == AttachmentIndifferent {
1803 return func(m store.Message) bool {
1804 if !state.ensurePart(m, false) {
1807 types, err := attachmentTypes(log, m, state)
1812 return (q.Filter.Attachments == AttachmentIndifferent || types[q.Filter.Attachments]) && (q.NotFilter.Attachments == AttachmentIndifferent || !types[q.NotFilter.Attachments])
1816var attachmentMimetypes = map[string]AttachmentType{
1817 "application/pdf": AttachmentPDF,
1818 "application/zip": AttachmentArchive,
1819 "application/x-rar-compressed": AttachmentArchive,
1820 "application/vnd.oasis.opendocument.spreadsheet": AttachmentSpreadsheet,
1821 "application/vnd.ms-excel": AttachmentSpreadsheet,
1822 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": AttachmentSpreadsheet,
1823 "application/vnd.oasis.opendocument.text": AttachmentDocument,
1824 "application/vnd.oasis.opendocument.presentation": AttachmentPresentation,
1825 "application/vnd.ms-powerpoint": AttachmentPresentation,
1826 "application/vnd.openxmlformats-officedocument.presentationml.presentation": AttachmentPresentation,
1828var attachmentExtensions = map[string]AttachmentType{
1829 ".pdf": AttachmentPDF,
1830 ".zip": AttachmentArchive,
1831 ".tar": AttachmentArchive,
1832 ".tgz": AttachmentArchive,
1833 ".tar.gz": AttachmentArchive,
1834 ".tbz2": AttachmentArchive,
1835 ".tar.bz2": AttachmentArchive,
1836 ".tar.lz": AttachmentArchive,
1837 ".tlz": AttachmentArchive,
1838 ".tar.xz": AttachmentArchive,
1839 ".txz": AttachmentArchive,
1840 ".tar.zst": AttachmentArchive,
1841 ".tar.lz4": AttachmentArchive,
1842 ".7z": AttachmentArchive,
1843 ".rar": AttachmentArchive,
1844 ".ods": AttachmentSpreadsheet,
1845 ".xls": AttachmentSpreadsheet,
1846 ".xlsx": AttachmentSpreadsheet,
1847 ".odt": AttachmentDocument,
1848 ".doc": AttachmentDocument,
1849 ".docx": AttachmentDocument,
1850 ".odp": AttachmentPresentation,
1851 ".ppt": AttachmentPresentation,
1852 ".pptx": AttachmentPresentation,
1855func attachmentTypes(log mlog.Log, m store.Message, state *msgState) (map[AttachmentType]bool, error) {
1856 types := map[AttachmentType]bool{}
1858 pm, err := parsedMessage(log, m, state, false, false, false)
1860 return nil, fmt.Errorf("parsing message for attachments: %w", err)
1862 for _, a := range pm.attachments {
1863 if a.Part.MediaType == "IMAGE" {
1864 types[AttachmentImage] = true
1867 mt := strings.ToLower(a.Part.MediaType + "/" + a.Part.MediaSubType)
1868 if t, ok := attachmentMimetypes[mt]; ok {
1872 _, filename, err := a.Part.DispositionFilename()
1873 if err != nil && errors.Is(err, message.ErrParamEncoding) {
1874 log.Debugx("parsing disposition/filename", err)
1875 } else if err != nil {
1876 return nil, fmt.Errorf("reading disposition/filename: %v", err)
1878 if ext := filepath.Ext(filename); ext != "" {
1879 if t, ok := attachmentExtensions[strings.ToLower(ext)]; ok {
1885 if len(types) == 0 {
1886 types[AttachmentNone] = true
1888 types[AttachmentAny] = true
1893// envFilterFn returns a filter function for the "envelope" headers ("envelope" as
1894// used by IMAP, i.e. basic message headers from/to/subject, an unfortunate name
1895// clash with SMTP envelope) for the query. A nil function is returned if no
1896// filtering is needed.
1897func (q Query) envFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1898 if len(q.Filter.From) == 0 && len(q.Filter.To) == 0 && len(q.Filter.Subject) == 0 && len(q.NotFilter.From) == 0 && len(q.NotFilter.To) == 0 && len(q.NotFilter.Subject) == 0 {
1902 lower := func(l []string) []string {
1906 r := make([]string, len(l))
1907 for i, s := range l {
1908 r[i] = strings.ToLower(s)
1913 filterSubject := lower(q.Filter.Subject)
1914 notFilterSubject := lower(q.NotFilter.Subject)
1915 filterFrom := lower(q.Filter.From)
1916 notFilterFrom := lower(q.NotFilter.From)
1917 filterTo := lower(q.Filter.To)
1918 notFilterTo := lower(q.NotFilter.To)
1920 return func(m store.Message) bool {
1921 if !state.ensurePart(m, false) {
1925 var env message.Envelope
1926 if state.part.Envelope != nil {
1927 env = *state.part.Envelope
1930 if len(filterSubject) > 0 || len(notFilterSubject) > 0 {
1931 subject := strings.ToLower(env.Subject)
1932 for _, s := range filterSubject {
1933 if !strings.Contains(subject, s) {
1937 for _, s := range notFilterSubject {
1938 if strings.Contains(subject, s) {
1944 contains := func(textLower []string, l []message.Address, all bool) bool {
1946 for _, s := range textLower {
1947 for _, a := range l {
1948 name := strings.ToLower(a.Name)
1949 addr := strings.ToLower(fmt.Sprintf("<%s@%s>", a.User, a.Host))
1950 if strings.Contains(name, s) || strings.Contains(addr, s) {
1964 if len(filterFrom) > 0 && !contains(filterFrom, env.From, true) {
1967 if len(notFilterFrom) > 0 && contains(notFilterFrom, env.From, false) {
1970 if len(filterTo) > 0 || len(notFilterTo) > 0 {
1971 to := append(append(append([]message.Address{}, env.To...), env.CC...), env.BCC...)
1972 if len(filterTo) > 0 && !contains(filterTo, to, true) {
1975 if len(notFilterTo) > 0 && contains(notFilterTo, to, false) {
1983// headerFilterFn returns a function that filters for the header filters in the
1984// query. A nil function is returned if there are no header filters.
1985func (q Query) headerFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1986 if len(q.Filter.Headers) == 0 {
1990 lowerValues := make([]string, len(q.Filter.Headers))
1991 for i, t := range q.Filter.Headers {
1992 lowerValues[i] = strings.ToLower(t[1])
1995 return func(m store.Message) bool {
1996 if !state.ensurePart(m, true) {
1999 hdr, err := state.part.Header()
2001 state.err = fmt.Errorf("reading header for message %d: %w", m.ID, err)
2006 for i, t := range q.Filter.Headers {
2010 if v == "" && len(l) > 0 {
2013 for _, e := range l {
2014 if strings.Contains(strings.ToLower(e), v) {
2024// wordFiltersFn returns a function that applies the word filters of the query. A
2025// nil function is returned when query does not contain a word filter.
2026func (q Query) wordsFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
2027 if len(q.Filter.Words) == 0 && len(q.NotFilter.Words) == 0 {
2031 ws := store.PrepareWordSearch(q.Filter.Words, q.NotFilter.Words)
2033 return func(m store.Message) bool {
2034 if !state.ensurePart(m, true) {
2038 if ok, err := ws.MatchPart(log, state.part, true); err != nil {
2039 state.err = fmt.Errorf("searching for words in message %d: %w", m.ID, err)