3// todo: may want to add some json omitempty tags to MessageItem, or Message to reduce json size, or just have smaller types that send only the fields that are needed.
8 cryptrand "crypto/rand"
24 "github.com/mjl-/bstore"
25 "github.com/mjl-/sherpa"
27 "github.com/mjl-/mox/dns"
28 "github.com/mjl-/mox/message"
29 "github.com/mjl-/mox/metrics"
30 "github.com/mjl-/mox/mlog"
31 "github.com/mjl-/mox/mox-"
32 "github.com/mjl-/mox/moxio"
33 "github.com/mjl-/mox/moxvar"
34 "github.com/mjl-/mox/smtp"
35 "github.com/mjl-/mox/store"
38// Request is a request to an SSE connection to send messages, either for a new
39// view, to continue with an existing view, or to a cancel an ongoing request.
43 SSEID int64 // SSE connection.
45 // To indicate a request is a continuation (more results) of the previous view.
46 // Echoed in events, client checks if it is getting results for the latest request.
49 // If set, this request and its view are canceled. A new view must be started.
59 ThreadOff ThreadMode = "off"
60 ThreadOn ThreadMode = "on"
61 ThreadUnread ThreadMode = "unread"
64// Query is a request for messages that match filters, in a given order.
66 OrderAsc bool // Order by received ascending or desending.
72// AttachmentType is for filtering by attachment type.
73type AttachmentType string
76 AttachmentIndifferent AttachmentType = ""
77 AttachmentNone AttachmentType = "none"
78 AttachmentAny AttachmentType = "any"
79 AttachmentImage AttachmentType = "image" // png, jpg, gif, ...
80 AttachmentPDF AttachmentType = "pdf"
81 AttachmentArchive AttachmentType = "archive" // zip files, tgz, ...
82 AttachmentSpreadsheet AttachmentType = "spreadsheet" // ods, xlsx, ...
83 AttachmentDocument AttachmentType = "document" // odt, docx, ...
84 AttachmentPresentation AttachmentType = "presentation" // odp, pptx, ...
87// Filter selects the messages to return. Fields that are set must all match,
88// for slices each element by match ("and").
90 // If -1, then all mailboxes except Trash/Junk/Rejects. Otherwise, only active if > 0.
93 // If true, also submailboxes are included in the search.
94 MailboxChildrenIncluded bool
96 // In case client doesn't know mailboxes and their IDs yet. Only used during sse
97 // connection setup, where it is turned into a MailboxID. Filtering only looks at
101 Words []string // Case insensitive substring match for each string.
103 To []string // Including Cc and Bcc.
107 Attachments AttachmentType
109 Headers [][2]string // Header values can be empty, it's a check if the header is present, regardless of value.
114// NotFilter matches messages that don't match these fields.
115type NotFilter struct {
120 Attachments AttachmentType
124// Page holds pagination parameters for a request.
126 // Start returning messages after this ID, if > 0. For pagination, fetching the
127 // next set of messages.
128 AnchorMessageID int64
130 // Number of messages to return, must be >= 1, we never return more than 10000 for
134 // If > 0, return messages until DestMessageID is found. More than Count messages
135 // can be returned. For long-running searches, it may take a while before this
140// todo: MessageAddress and MessageEnvelope into message.Address and message.Envelope.
142// MessageAddress is like message.Address, but with a dns.Domain, with unicode name
144type MessageAddress struct {
145 Name string // Free-form name for display in mail applications.
146 User string // Localpart, encoded.
150// MessageEnvelope is like message.Envelope, as used in message.Part, but including
151// unicode host names for IDNA names.
152type MessageEnvelope struct {
153 // todo: should get sherpadoc to understand type embeds and embed the non-MessageAddress fields from message.Envelope.
156 From []MessageAddress
157 Sender []MessageAddress
158 ReplyTo []MessageAddress
166// MessageItem is sent by queries, it has derived information analyzed from
167// message.Part, made for the needs of the message items in the message list.
169type MessageItem struct {
170 Message store.Message // Without ParsedBuf and MsgPrefix, for size.
171 Envelope MessageEnvelope
172 Attachments []Attachment
175 FirstLine string // Of message body, for showing as preview.
176 MatchQuery bool // If message does not match query, it can still be included because of threading.
177 MoreHeaders [][2]string // All headers from store.Settings.ShowHeaders that are present.
180// ParsedMessage has more parsed/derived information about a message, intended
181// for rendering the (contents of the) message. Information from MessageItem is
183type ParsedMessage struct {
186 Headers map[string][]string
187 ViewMode store.ViewMode
189 Texts []string // Contents of text parts, can be empty.
191 // Whether there is an HTML part. The webclient renders HTML message parts through
192 // an iframe and a separate request with strict CSP headers to prevent script
193 // execution and loading of external resources, which isn't possible when loading
194 // in iframe with inline HTML because not all browsers support the iframe csp
198 ListReplyAddress *MessageAddress // From List-Post.
200 TextPaths [][]int // Paths to text parts.
201 HTMLPath []int // Path to HTML part.
203 // Information used by MessageItem, not exported in this type.
204 envelope MessageEnvelope
205 attachments []Attachment
211// EventStart is the first message sent on an SSE connection, giving the client
212// basic data to populate its UI. After this event, messages will follow quickly in
213// an EventViewMsgs event.
214type EventStart struct {
216 LoginAddress MessageAddress
217 Addresses []MessageAddress
218 DomainAddressConfigs map[string]DomainAddressConfig // ASCII domain to address config.
220 Mailboxes []store.Mailbox
221 RejectsMailbox string
222 Settings store.Settings
223 AccountPath string // If nonempty, the path on same host to webaccount interface.
227// DomainAddressConfig has the address (localpart) configuration for a domain, so
228// the webmail client can decide if an address matches the addresses of the
230type DomainAddressConfig struct {
231 LocalpartCatchallSeparator string // Can be empty.
232 LocalpartCaseSensitive bool
235// EventViewMsgs contains messages for a view, possibly a continuation of an
236// earlier list of messages.
237type EventViewMsgs struct {
241 // If empty, this was the last message for the request. If non-empty, a list of
242 // thread messages. Each with the first message being the reason this thread is
243 // included and can be used as AnchorID in followup requests. If the threading mode
244 // is "off" in the query, there will always be only a single message. If a thread
245 // is sent, all messages in the thread are sent, including those that don't match
246 // the query (e.g. from another mailbox). Threads can be displayed based on the
247 // ThreadParentIDs field, with possibly slightly different display based on field
248 // ThreadMissingLink.
249 MessageItems [][]MessageItem
251 // If set, will match the target page.DestMessageID from the request.
252 ParsedMessage *ParsedMessage
254 // If set, there are no more messages in this view at this moment. Messages can be
255 // added, typically via Change messages, e.g. for new deliveries.
259// EventViewErr indicates an error during a query for messages. The request is
260// aborted, no more request-related messages will be sent until the next request.
261type EventViewErr struct {
264 Err string // To be displayed in client.
265 err error // Original message, for checking against context.Canceled.
268// EventViewReset indicates that a request for the next set of messages in a few
269// could not be fulfilled, e.g. because the anchor message does not exist anymore.
270// The client should clear its list of messages. This can happen before
271// EventViewMsgs events are sent.
272type EventViewReset struct {
277// EventViewChanges contain one or more changes relevant for the client, either
278// with new mailbox total/unseen message counts, or messages added/removed/modified
279// (flags) for the current view.
280type EventViewChanges struct {
282 Changes [][2]any // The first field of [2]any is a string, the second of the Change types below.
285// ChangeMsgAdd adds a new message and possibly its thread to the view.
286type ChangeMsgAdd struct {
288 MessageItems []MessageItem
291// ChangeMsgRemove removes one or more messages from the view.
292type ChangeMsgRemove struct {
293 store.ChangeRemoveUIDs
296// ChangeMsgFlags updates flags for one message.
297type ChangeMsgFlags struct {
301// ChangeMsgThread updates muted/collapsed fields for one message.
302type ChangeMsgThread struct {
306// ChangeMailboxRemove indicates a mailbox was removed, including all its messages.
307type ChangeMailboxRemove struct {
308 store.ChangeRemoveMailbox
311// ChangeMailboxAdd indicates a new mailbox was added, initially without any messages.
312type ChangeMailboxAdd struct {
313 Mailbox store.Mailbox
316// ChangeMailboxRename indicates a mailbox was renamed. Its ID stays the same.
317// It could be under a new parent.
318type ChangeMailboxRename struct {
319 store.ChangeRenameMailbox
322// ChangeMailboxCounts set new total and unseen message counts for a mailbox.
323type ChangeMailboxCounts struct {
324 store.ChangeMailboxCounts
327// ChangeMailboxSpecialUse has updated special-use flags for a mailbox.
328type ChangeMailboxSpecialUse struct {
329 store.ChangeMailboxSpecialUse
332// ChangeMailboxKeywords has an updated list of keywords for a mailbox, e.g. after
333// a message was added with a keyword that wasn't in the mailbox yet.
334type ChangeMailboxKeywords struct {
335 store.ChangeMailboxKeywords
338// View holds the information about the returned data for a query. It is used to
339// determine whether mailbox changes should be sent to the client, we only send
340// addition/removal/flag-changes of messages that are in view, or would extend it
341// if the view is at the end of the results.
345 // Received of last message we sent to the client. We use it to decide if a newly
346 // delivered message is within the view and the client should get a notification.
347 LastMessageReceived time.Time
349 // If set, the last message in the query view has been sent. There is no need to do
350 // another query, it will not return more data. Used to decide if an event for a
351 // new message should be sent.
354 // Whether message must or must not match mailboxIDs.
356 // Mailboxes to match, can be multiple, for matching children. If empty, there is
357 // no filter on mailboxes.
358 mailboxIDs map[int64]bool
360 // Threads sent to client. New messages for this thread are also sent, regardless
361 // of regular query matching, so also for other mailboxes. If the user (re)moved
362 // all messages of a thread, they may still receive events for the thread. Only
363 // filled when query with threading not off.
364 threadIDs map[int64]struct{}
367// sses tracks all sse connections, and access to them.
374// sse represents an sse connection.
376 ID int64 // Also returned in EventStart and used in Request to identify the request.
377 AccountName string // Used to check the authenticated user has access to the SSE connection.
378 Request chan Request // Goroutine will receive requests from here, coming from API calls.
381// called by the goroutine when the connection is closed or breaks.
382func (sse sse) unregister() {
385 delete(sses.m, sse.ID)
387 // Drain any pending requests, preventing blocked goroutines from API calls.
397func sseRegister(accountName string) sse {
401 v := sse{sses.gen, accountName, make(chan Request, 1)}
406// sseGet returns a reference to an existing connection if it exists and user
408func sseGet(id int64, accountName string) (sse, bool) {
412 if s.AccountName != accountName {
418// ssetoken is a temporary token that has not yet been used to start an SSE
419// connection. Created by Token, consumed by a new SSE connection.
420type ssetoken struct {
421 token string // Uniquely generated.
423 address string // Address used to authenticate in call that created the token.
424 sessionToken store.SessionToken // SessionToken that created this token, checked before sending updates.
428// ssetokens maintains unused tokens. We have just one, but it's a type so we
429// can define methods.
430type ssetokens struct {
432 accountTokens map[string][]ssetoken // Account to max 10 most recent tokens, from old to new.
433 tokens map[string]ssetoken // Token to details, for finding account for a token.
436var sseTokens = ssetokens{
437 accountTokens: map[string][]ssetoken{},
438 tokens: map[string]ssetoken{},
441// xgenerate creates and saves a new token. It ensures no more than 10 tokens
442// per account exist, removing old ones if needed.
443func (x *ssetokens) xgenerate(ctx context.Context, accName, address string, sessionToken store.SessionToken) string {
444 buf := make([]byte, 16)
445 _, err := cryptrand.Read(buf)
446 xcheckf(ctx, err, "generating token")
447 st := ssetoken{base64.RawURLEncoding.EncodeToString(buf), accName, address, sessionToken, time.Now().Add(time.Minute)}
451 n := len(x.accountTokens[accName])
453 for _, ost := range x.accountTokens[accName][:n-9] {
454 delete(x.tokens, ost.token)
456 copy(x.accountTokens[accName], x.accountTokens[accName][n-9:])
457 x.accountTokens[accName] = x.accountTokens[accName][:9]
459 x.accountTokens[accName] = append(x.accountTokens[accName], st)
460 x.tokens[st.token] = st
464// check verifies a token, and consumes it if valid.
465func (x *ssetokens) check(token string) (string, string, store.SessionToken, bool, error) {
469 st, ok := x.tokens[token]
471 return "", "", "", false, nil
473 delete(x.tokens, token)
474 if i := slices.Index(x.accountTokens[st.accName], st); i < 0 {
475 return "", "", "", false, errors.New("internal error, could not find token in account")
477 copy(x.accountTokens[st.accName][i:], x.accountTokens[st.accName][i+1:])
478 x.accountTokens[st.accName] = x.accountTokens[st.accName][:len(x.accountTokens[st.accName])-1]
479 if len(x.accountTokens[st.accName]) == 0 {
480 delete(x.accountTokens, st.accName)
483 if time.Now().After(st.validUntil) {
484 return "", "", "", false, nil
486 return st.accName, st.address, st.sessionToken, true, nil
489// ioErr is panicked on i/o errors in serveEvents and handled in a defer.
494// ensure we have a non-nil moreHeaders, taking it from Settings.
495func ensureMoreHeaders(tx *bstore.Tx, moreHeaders []string) ([]string, error) {
496 if moreHeaders != nil {
497 return moreHeaders, nil
500 s := store.Settings{ID: 1}
501 if err := tx.Get(&s); err != nil {
502 return nil, fmt.Errorf("get settings: %v", err)
504 moreHeaders = s.ShowHeaders
505 if moreHeaders == nil {
506 moreHeaders = []string{} // Ensure we won't get Settings again next call.
508 return moreHeaders, nil
511// serveEvents serves an SSE connection. Authentication is done through a query
512// string parameter "singleUseToken", a one-time-use token returned by the Token
514func serveEvents(ctx context.Context, log mlog.Log, accountPath string, w http.ResponseWriter, r *http.Request) {
515 if r.Method != "GET" {
516 http.Error(w, "405 - method not allowed - use get", http.StatusMethodNotAllowed)
520 flusher, ok := w.(http.Flusher)
522 log.Error("internal error: ResponseWriter not a http.Flusher")
523 http.Error(w, "500 - internal error - cannot sync to http connection", 500)
528 token := q.Get("singleUseToken")
530 http.Error(w, "400 - bad request - missing credentials", http.StatusBadRequest)
533 accName, address, sessionToken, ok, err := sseTokens.check(token)
535 http.Error(w, "500 - internal server error - "+err.Error(), http.StatusInternalServerError)
539 http.Error(w, "400 - bad request - bad token", http.StatusBadRequest)
542 if _, err := store.SessionUse(ctx, log, accName, sessionToken, ""); err != nil {
543 http.Error(w, "400 - bad request - bad session token", http.StatusBadRequest)
547 // We can simulate a slow SSE connection. It seems firefox doesn't slow down
548 // incoming responses with its slow-network similation.
549 var waitMin, waitMax time.Duration
550 waitMinMsec := q.Get("waitMinMsec")
551 waitMaxMsec := q.Get("waitMaxMsec")
552 if waitMinMsec != "" && waitMaxMsec != "" {
553 if v, err := strconv.ParseInt(waitMinMsec, 10, 64); err != nil {
554 http.Error(w, "400 - bad request - parsing waitMinMsec: "+err.Error(), http.StatusBadRequest)
557 waitMin = time.Duration(v) * time.Millisecond
560 if v, err := strconv.ParseInt(waitMaxMsec, 10, 64); err != nil {
561 http.Error(w, "400 - bad request - parsing waitMaxMsec: "+err.Error(), http.StatusBadRequest)
564 waitMax = time.Duration(v) * time.Millisecond
568 // Parse the request with initial mailbox/search criteria.
570 dec := json.NewDecoder(strings.NewReader(q.Get("request")))
571 dec.DisallowUnknownFields()
572 if err := dec.Decode(&req); err != nil {
573 http.Error(w, "400 - bad request - bad request query string parameter: "+err.Error(), http.StatusBadRequest)
575 } else if req.Page.Count <= 0 {
576 http.Error(w, "400 - bad request - request cannot have Page.Count 0", http.StatusBadRequest)
579 if req.Query.Threading == "" {
580 req.Query.Threading = ThreadOff
583 var writer *eventWriter
585 metricSSEConnections.Inc()
586 defer metricSSEConnections.Dec()
588 // Below here, error handling cause through xcheckf, which panics with
589 // *sherpa.Error, after which we send an error event to the client. We can also get
590 // an *ioErr when the connection is broken.
596 if err, ok := x.(*sherpa.Error); ok {
597 writer.xsendEvent(ctx, log, "fatalErr", err.Message)
598 } else if _, ok := x.(ioErr); ok {
601 log.WithContext(ctx).Error("serveEvents panic", slog.Any("err", x))
603 metrics.PanicInc(metrics.Webmail)
609 h.Set("Content-Type", "text/event-stream")
610 h.Set("Cache-Control", "no-cache")
612 // We'll be sending quite a bit of message data (text) in JSON (plenty duplicate
613 // keys), so should be quite compressible.
615 gz := mox.AcceptsGzip(r)
617 h.Set("Content-Encoding", "gzip")
618 out, _ = gzip.NewWriterLevel(w, gzip.BestSpeed)
622 out = httpFlusher{out, flusher}
624 // We'll be writing outgoing SSE events through writer.
625 writer = newEventWriter(out, waitMin, waitMax, accName, sessionToken)
628 // Fetch initial data.
629 acc, err := store.OpenAccount(log, accName, true)
630 xcheckf(ctx, err, "open account")
633 log.Check(err, "closing account")
635 comm := store.RegisterComm(acc)
636 defer comm.Unregister()
638 // List addresses that the client can use to send email from.
639 accConf, _ := acc.Conf()
640 loginAddr, err := smtp.ParseAddress(address)
641 xcheckf(ctx, err, "parsing login address")
642 _, _, _, dest, err := mox.LookupAddress(loginAddr.Localpart, loginAddr.Domain, false, false, false)
643 xcheckf(ctx, err, "looking up destination for login address")
644 loginName := accConf.FullName
645 if dest.FullName != "" {
646 loginName = dest.FullName
648 loginAddress := MessageAddress{Name: loginName, User: loginAddr.Localpart.String(), Domain: loginAddr.Domain}
649 var addresses []MessageAddress
650 for a, dest := range accConf.Destinations {
651 name := dest.FullName
653 name = accConf.FullName
655 var ma MessageAddress
656 if strings.HasPrefix(a, "@") {
657 dom, err := dns.ParseDomain(a[1:])
658 xcheckf(ctx, err, "parsing destination address for account")
659 ma = MessageAddress{Domain: dom}
661 addr, err := smtp.ParseAddress(a)
662 xcheckf(ctx, err, "parsing destination address for account")
663 ma = MessageAddress{Name: name, User: addr.Localpart.String(), Domain: addr.Domain}
665 addresses = append(addresses, ma)
667 // User is allowed to send using alias address as message From address. Webmail
668 // will choose it when replying to a message sent to that address.
669 aliasAddrs := map[MessageAddress]bool{}
670 for _, a := range accConf.Aliases {
671 if a.Alias.AllowMsgFrom {
672 ma := MessageAddress{User: a.Alias.LocalpartStr, Domain: a.Alias.Domain}
674 addresses = append(addresses, ma)
676 aliasAddrs[ma] = true
680 // We implicitly start a query. We use the reqctx for the transaction, because the
681 // transaction is passed to the query, which can be canceled.
682 reqctx, reqctxcancel := context.WithCancel(ctx)
684 // We also cancel in cancelDrain later on, but there is a brief window where the
685 // context wouldn't be canceled.
686 if reqctxcancel != nil {
692 // qtx is kept around during connection initialization, until we pass it off to the
693 // goroutine that starts querying for messages.
697 err := qtx.Rollback()
698 log.Check(err, "rolling back")
702 var mbl []store.Mailbox
703 settings := store.Settings{ID: 1}
705 // We only take the rlock when getting the tx.
706 acc.WithRLock(func() {
707 // Now a read-only transaction we'll use during the query.
708 qtx, err = acc.DB.Begin(reqctx, false)
709 xcheckf(ctx, err, "begin transaction")
711 mbl, err = bstore.QueryTx[store.Mailbox](qtx).List()
712 xcheckf(ctx, err, "list mailboxes")
714 err = qtx.Get(&settings)
715 xcheckf(ctx, err, "get settings")
718 // Find the designated mailbox if a mailbox name is set, or there are no filters at all.
719 var zerofilter Filter
720 var zeronotfilter NotFilter
721 var mailbox store.Mailbox
722 var mailboxPrefixes []string
723 var matchMailboxes bool
724 mailboxIDs := map[int64]bool{}
725 mailboxName := req.Query.Filter.MailboxName
726 if mailboxName != "" || reflect.DeepEqual(req.Query.Filter, zerofilter) && reflect.DeepEqual(req.Query.NotFilter, zeronotfilter) {
727 if mailboxName == "" {
728 mailboxName = "Inbox"
731 var inbox store.Mailbox
732 for _, e := range mbl {
733 if e.Name == mailboxName {
736 if e.Name == "Inbox" {
744 xcheckf(ctx, errors.New("inbox not found"), "setting initial mailbox")
746 req.Query.Filter.MailboxID = mailbox.ID
747 req.Query.Filter.MailboxName = ""
748 mailboxPrefixes = []string{mailbox.Name + "/"}
749 matchMailboxes = true
750 mailboxIDs[mailbox.ID] = true
752 matchMailboxes, mailboxIDs, mailboxPrefixes = xprepareMailboxIDs(ctx, qtx, req.Query.Filter, accConf.RejectsMailbox)
754 if req.Query.Filter.MailboxChildrenIncluded {
755 xgatherMailboxIDs(ctx, qtx, mailboxIDs, mailboxPrefixes)
758 // todo: write a last-event-id based on modseq? if last-event-id is present, we would have to send changes to mailboxes, messages, hopefully reducing the amount of data sent.
760 sse := sseRegister(acc.Name)
761 defer sse.unregister()
763 // Per-domain localpart config so webclient can decide if an address belongs to the account.
764 domainAddressConfigs := map[string]DomainAddressConfig{}
765 for _, a := range addresses {
766 dom, _ := mox.Conf.Domain(a.Domain)
767 domainAddressConfigs[a.Domain.ASCII] = DomainAddressConfig{dom.LocalpartCatchallSeparator, dom.LocalpartCaseSensitive}
770 // Write first event, allowing client to fill its UI with mailboxes.
771 start := EventStart{sse.ID, loginAddress, addresses, domainAddressConfigs, mailbox.Name, mbl, accConf.RejectsMailbox, settings, accountPath, moxvar.Version}
772 writer.xsendEvent(ctx, log, "start", start)
774 // The goroutine doing the querying will send messages on these channels, which
775 // result in an event being written on the SSE connection.
776 viewMsgsc := make(chan EventViewMsgs)
777 viewErrc := make(chan EventViewErr)
778 viewResetc := make(chan EventViewReset)
779 donec := make(chan int64) // When request is done.
781 // Start a view, it determines if we send a change to the client. And start an
782 // implicit query for messages, we'll send the messages to the client which can
783 // fill its ui with messages.
784 v := view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
785 go viewRequestTx(reqctx, log, acc, qtx, v, viewMsgsc, viewErrc, viewResetc, donec)
786 qtx = nil // viewRequestTx closes qtx
788 // When canceling a query, we must drain its messages until it says it is done.
789 // Otherwise the sending goroutine would hang indefinitely on a channel send.
790 cancelDrain := func() {
791 if reqctxcancel != nil {
792 // Cancel the goroutine doing the querying.
800 // Drain events until done.
812 // If we stop and a query is in progress, we must drain the channel it will send on.
815 // Changes broadcasted by other connections on this account. If applicable for the
816 // connection/view, we send events.
817 xprocessChanges := func(changes []store.Change) {
818 taggedChanges := [][2]any{}
820 // We get a transaction first time we need it.
824 err := xtx.Rollback()
825 log.Check(err, "rolling back transaction")
828 ensureTx := func() error {
835 xtx, err = acc.DB.Begin(ctx, false)
838 // This getmsg will now only be called mailboxID+UID, not with messageID set.
839 // todo jmap: change store.Change* to include MessageID's? would mean duplication of information resulting in possible mismatch.
840 getmsg := func(messageID int64, mailboxID int64, uid store.UID) (store.Message, error) {
841 if err := ensureTx(); err != nil {
842 return store.Message{}, fmt.Errorf("transaction: %v", err)
844 return bstore.QueryTx[store.Message](xtx).FilterEqual("Expunged", false).FilterNonzero(store.Message{MailboxID: mailboxID, UID: uid}).Get()
847 // Additional headers from settings to add to MessageItems.
848 var moreHeaders []string
849 xmoreHeaders := func() []string {
851 xcheckf(ctx, err, "transaction")
853 moreHeaders, err = ensureMoreHeaders(xtx, moreHeaders)
854 xcheckf(ctx, err, "ensuring more headers")
858 // Return uids that are within range in view. Because the end has been reached, or
859 // because the UID is not after the last message.
860 xchangedUIDs := func(mailboxID int64, uids []store.UID, isRemove bool) (changedUIDs []store.UID) {
861 uidsAny := make([]any, len(uids))
862 for i, uid := range uids {
866 xcheckf(ctx, err, "transaction")
867 q := bstore.QueryTx[store.Message](xtx)
868 q.FilterNonzero(store.Message{MailboxID: mailboxID})
869 q.FilterEqual("UID", uidsAny...)
870 mbOK := v.matchesMailbox(mailboxID)
871 err = q.ForEach(func(m store.Message) error {
872 _, thread := v.threadIDs[m.ThreadID]
873 if thread || mbOK && (v.inRange(m) || isRemove && m.Expunged) {
874 changedUIDs = append(changedUIDs, m.UID)
878 xcheckf(ctx, err, "fetching messages for change")
882 // Forward changes that are relevant to the current view.
883 for _, change := range changes {
884 switch c := change.(type) {
885 case store.ChangeAddUID:
886 ok, err := v.matches(log, acc, true, 0, c.MailboxID, c.UID, c.Flags, c.Keywords, getmsg)
887 xcheckf(ctx, err, "matching new message against view")
888 m, err := getmsg(0, c.MailboxID, c.UID)
889 xcheckf(ctx, err, "get message")
890 _, thread := v.threadIDs[m.ThreadID]
895 state := msgState{acc: acc}
896 mi, err := messageItem(log, m, &state, xmoreHeaders())
898 xcheckf(ctx, err, "make messageitem")
901 mil := []MessageItem{mi}
902 if !thread && req.Query.Threading != ThreadOff {
904 xcheckf(ctx, err, "transaction")
905 more, _, err := gatherThread(log, xtx, acc, v, m, 0, false, xmoreHeaders())
906 xcheckf(ctx, err, "gathering thread messages for id %d, thread %d", m.ID, m.ThreadID)
907 mil = append(mil, more...)
908 v.threadIDs[m.ThreadID] = struct{}{}
911 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgAdd", ChangeMsgAdd{c, mil}})
913 // If message extends the view, store it as such.
914 if !v.Request.Query.OrderAsc && m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && m.Received.After(v.LastMessageReceived) {
915 v.LastMessageReceived = m.Received
918 case store.ChangeRemoveUIDs:
919 // We may send changes for uids the client doesn't know, that's fine.
920 changedUIDs := xchangedUIDs(c.MailboxID, c.UIDs, true)
921 if len(changedUIDs) == 0 {
924 ch := ChangeMsgRemove{c}
925 ch.UIDs = changedUIDs
926 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgRemove", ch})
928 case store.ChangeFlags:
929 // We may send changes for uids the client doesn't know, that's fine.
930 changedUIDs := xchangedUIDs(c.MailboxID, []store.UID{c.UID}, false)
931 if len(changedUIDs) == 0 {
934 ch := ChangeMsgFlags{c}
935 ch.UID = changedUIDs[0]
936 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgFlags", ch})
938 case store.ChangeThread:
939 // Change in muted/collaped state, just always ship it.
940 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgThread", ChangeMsgThread{c}})
942 case store.ChangeRemoveMailbox:
943 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRemove", ChangeMailboxRemove{c}})
945 case store.ChangeAddMailbox:
946 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxAdd", ChangeMailboxAdd{c.Mailbox}})
948 case store.ChangeRenameMailbox:
949 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRename", ChangeMailboxRename{c}})
951 case store.ChangeMailboxCounts:
952 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxCounts", ChangeMailboxCounts{c}})
954 case store.ChangeMailboxSpecialUse:
955 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxSpecialUse", ChangeMailboxSpecialUse{c}})
957 case store.ChangeMailboxKeywords:
958 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxKeywords", ChangeMailboxKeywords{c}})
960 case store.ChangeAddSubscription:
961 // Webmail does not care about subscriptions.
964 panic(fmt.Sprintf("missing case for change %T", c))
968 if len(taggedChanges) > 0 {
969 viewChanges := EventViewChanges{v.Request.ViewID, taggedChanges}
970 writer.xsendEvent(ctx, log, "viewChanges", viewChanges)
974 timer := time.NewTimer(5 * time.Minute) // For keepalives.
978 timer.Reset(5 * time.Minute)
982 pending := comm.Pending
988 case <-mox.Shutdown.Done():
989 writer.xsendEvent(ctx, log, "fatalErr", "server is shutting down")
990 // Work around go vet, it doesn't see defer cancelDrain.
991 if reqctxcancel != nil {
997 _, err := fmt.Fprintf(out, ": keepalive\n\n")
999 log.Errorx("write keepalive", err)
1000 // Work around go vet, it doesn't see defer cancelDrain.
1001 if reqctxcancel != nil {
1009 case vm := <-viewMsgsc:
1010 if vm.RequestID != v.Request.ID || vm.ViewID != v.Request.ViewID {
1011 panic(fmt.Sprintf("received msgs for view,request id %d,%d instead of %d,%d", vm.ViewID, vm.RequestID, v.Request.ViewID, v.Request.ID))
1016 if len(vm.MessageItems) > 0 {
1017 v.LastMessageReceived = vm.MessageItems[len(vm.MessageItems)-1][0].Message.Received
1019 writer.xsendEvent(ctx, log, "viewMsgs", vm)
1021 case ve := <-viewErrc:
1022 if ve.RequestID != v.Request.ID || ve.ViewID != v.Request.ViewID {
1023 panic(fmt.Sprintf("received err for view,request id %d,%d instead of %d,%d", ve.ViewID, ve.RequestID, v.Request.ViewID, v.Request.ID))
1025 if errors.Is(ve.err, context.Canceled) || moxio.IsClosed(ve.err) {
1026 // Work around go vet, it doesn't see defer cancelDrain.
1027 if reqctxcancel != nil {
1032 writer.xsendEvent(ctx, log, "viewErr", ve)
1034 case vr := <-viewResetc:
1035 if vr.RequestID != v.Request.ID || vr.ViewID != v.Request.ViewID {
1036 panic(fmt.Sprintf("received reset for view,request id %d,%d instead of %d,%d", vr.ViewID, vr.RequestID, v.Request.ViewID, v.Request.ID))
1038 writer.xsendEvent(ctx, log, "viewReset", vr)
1041 if id != v.Request.ID {
1042 panic(fmt.Sprintf("received done for request id %d instead of %d", id, v.Request.ID))
1044 if reqctxcancel != nil {
1050 case req := <-sse.Request:
1055 v = view{req, time.Time{}, false, false, nil, nil}
1059 reqctx, reqctxcancel = context.WithCancel(ctx)
1061 stop := func() (stop bool) {
1062 // rtx is handed off viewRequestTx below, but we must clean it up in case of errors.
1067 err = rtx.Rollback()
1068 log.Check(err, "rolling back transaction")
1071 acc.WithRLock(func() {
1072 rtx, err = acc.DB.Begin(reqctx, false)
1079 if errors.Is(err, context.Canceled) {
1082 err := fmt.Errorf("begin transaction: %v", err)
1083 viewErr := EventViewErr{v.Request.ViewID, v.Request.ID, err.Error(), err}
1084 writer.xsendEvent(ctx, log, "viewErr", viewErr)
1088 // Reset view state for new query.
1089 if req.ViewID != v.Request.ViewID {
1090 matchMailboxes, mailboxIDs, mailboxPrefixes := xprepareMailboxIDs(ctx, rtx, req.Query.Filter, accConf.RejectsMailbox)
1091 if req.Query.Filter.MailboxChildrenIncluded {
1092 xgatherMailboxIDs(ctx, rtx, mailboxIDs, mailboxPrefixes)
1094 v = view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
1098 go viewRequestTx(reqctx, log, acc, rtx, v, viewMsgsc, viewErrc, viewResetc, donec)
1107 xprocessChanges(comm.Get())
1110 // Work around go vet, it doesn't see defer cancelDrain.
1111 if reqctxcancel != nil {
1119// xprepareMailboxIDs prepare the first half of filters for mailboxes, based on
1120// f.MailboxID (-1 is special). matchMailboxes indicates whether the IDs in
1121// mailboxIDs must or must not match. mailboxPrefixes is for use with
1122// xgatherMailboxIDs to gather children of the mailboxIDs.
1123func xprepareMailboxIDs(ctx context.Context, tx *bstore.Tx, f Filter, rejectsMailbox string) (matchMailboxes bool, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1124 matchMailboxes = true
1125 mailboxIDs = map[int64]bool{}
1126 if f.MailboxID == -1 {
1127 matchMailboxes = false
1128 // Add the trash, junk and account rejects mailbox.
1129 err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
1130 if mb.Trash || mb.Junk || mb.Name == rejectsMailbox {
1131 mailboxPrefixes = append(mailboxPrefixes, mb.Name+"/")
1132 mailboxIDs[mb.ID] = true
1136 xcheckf(ctx, err, "finding trash/junk/rejects mailbox")
1137 } else if f.MailboxID > 0 {
1138 mb := store.Mailbox{ID: f.MailboxID}
1140 xcheckf(ctx, err, "get mailbox")
1141 mailboxIDs[f.MailboxID] = true
1142 mailboxPrefixes = []string{mb.Name + "/"}
1147// xgatherMailboxIDs adds all mailboxes with a prefix matching any of
1148// mailboxPrefixes to mailboxIDs, to expand filtering to children of mailboxes.
1149func xgatherMailboxIDs(ctx context.Context, tx *bstore.Tx, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1150 // Gather more mailboxes to filter on, based on mailboxPrefixes.
1151 if len(mailboxPrefixes) == 0 {
1154 err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
1155 for _, p := range mailboxPrefixes {
1156 if strings.HasPrefix(mb.Name, p) {
1157 mailboxIDs[mb.ID] = true
1163 xcheckf(ctx, err, "gathering mailboxes")
1166// matchesMailbox returns whether a mailbox matches the view.
1167func (v view) matchesMailbox(mailboxID int64) bool {
1168 return len(v.mailboxIDs) == 0 || v.matchMailboxIDs && v.mailboxIDs[mailboxID] || !v.matchMailboxIDs && !v.mailboxIDs[mailboxID]
1171// inRange returns whether m is within the range for the view, whether a change for
1172// this message should be sent to the client so it can update its state.
1173func (v view) inRange(m store.Message) bool {
1174 return v.End || !v.Request.Query.OrderAsc && !m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && !m.Received.After(v.LastMessageReceived)
1177// matches checks if the message, identified by either messageID or mailboxID+UID,
1178// is in the current "view" (i.e. passing the filters, and if checkRange is set
1179// also if within the range of sent messages based on sort order and the last seen
1180// message). getmsg retrieves the message, which may be necessary depending on the
1181// active filters. Used to determine if a store.Change with a new message should be
1182// sent, and for the destination and anchor messages in view requests.
1183func (v view) matches(log mlog.Log, acc *store.Account, checkRange bool, messageID int64, mailboxID int64, uid store.UID, flags store.Flags, keywords []string, getmsg func(int64, int64, store.UID) (store.Message, error)) (match bool, rerr error) {
1185 ensureMessage := func() bool {
1186 if m.ID == 0 && rerr == nil {
1187 m, rerr = getmsg(messageID, mailboxID, uid)
1192 q := v.Request.Query
1194 // Warning: Filters must be kept in sync between queryMessage and view.matches.
1197 if len(v.mailboxIDs) > 0 && (!ensureMessage() || v.matchMailboxIDs && !v.mailboxIDs[m.MailboxID] || !v.matchMailboxIDs && v.mailboxIDs[m.MailboxID]) {
1200 // note: anchorMessageID is not relevant for matching.
1201 flagfilter := q.flagFilterFn()
1202 if flagfilter != nil && !flagfilter(flags, keywords) {
1206 if q.Filter.Oldest != nil && (!ensureMessage() || m.Received.Before(*q.Filter.Oldest)) {
1209 if q.Filter.Newest != nil && (!ensureMessage() || !m.Received.Before(*q.Filter.Newest)) {
1213 if q.Filter.SizeMin > 0 && (!ensureMessage() || m.Size < q.Filter.SizeMin) {
1216 if q.Filter.SizeMax > 0 && (!ensureMessage() || m.Size > q.Filter.SizeMax) {
1220 state := msgState{acc: acc}
1222 if rerr == nil && state.err != nil {
1228 attachmentFilter := q.attachmentFilterFn(log, acc, &state)
1229 if attachmentFilter != nil && (!ensureMessage() || !attachmentFilter(m)) {
1233 envFilter := q.envFilterFn(log, &state)
1234 if envFilter != nil && (!ensureMessage() || !envFilter(m)) {
1238 headerFilter := q.headerFilterFn(log, &state)
1239 if headerFilter != nil && (!ensureMessage() || !headerFilter(m)) {
1243 wordsFilter := q.wordsFilterFn(log, &state)
1244 if wordsFilter != nil && (!ensureMessage() || !wordsFilter(m)) {
1248 // Now check that we are either within the sorting order, or "last" was sent.
1249 if !checkRange || v.End || ensureMessage() && v.inRange(m) {
1255type msgResp struct {
1256 err error // If set, an error happened and fields below are not set.
1257 reset bool // If set, the anchor message does not exist (anymore?) and we are sending messages from the start, fields below not set.
1258 viewEnd bool // If set, the last message for the view was seen, no more should be requested, fields below not set.
1259 mil []MessageItem // If none of the cases above apply, the messages that was found matching the query. First message was reason the thread is returned, for use as AnchorID in followup request.
1260 pm *ParsedMessage // If m was the target page.DestMessageID, or this is the first match, this is the parsed message of mi.
1263// viewRequestTx executes a request (query with filters, pagination) by
1264// launching a new goroutine with queryMessages, receiving results as msgResp,
1265// and sending Event* to the SSE connection.
1267// It always closes tx.
1268func viewRequestTx(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, msgc chan EventViewMsgs, errc chan EventViewErr, resetc chan EventViewReset, donec chan int64) {
1270 err := tx.Rollback()
1271 log.Check(err, "rolling back query transaction")
1273 donec <- v.Request.ID
1275 x := recover() // Should not happen, but don't take program down if it does.
1277 log.WithContext(ctx).Error("viewRequestTx panic", slog.Any("err", x))
1279 metrics.PanicInc(metrics.Webmailrequest)
1283 var msgitems [][]MessageItem // Gathering for 300ms, then flushing.
1284 var parsedMessage *ParsedMessage
1287 var immediate bool // No waiting, flush immediate.
1288 t := time.NewTimer(300 * time.Millisecond)
1291 sendViewMsgs := func(force bool) {
1292 if len(msgitems) == 0 && !force {
1297 msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, msgitems, parsedMessage, viewEnd}
1300 t.Reset(300 * time.Millisecond)
1303 // todo: should probably rewrite code so we don't start yet another goroutine, but instead handle the query responses directly (through a struct that keeps state?) in the sse connection goroutine.
1305 mrc := make(chan msgResp, 1)
1306 go queryMessages(ctx, log, acc, tx, v, mrc)
1310 case mr, ok := <-mrc:
1313 // Empty message list signals this query is done.
1314 msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, nil, nil, false}
1319 errc <- EventViewErr{v.Request.ViewID, v.Request.ID, mr.err.Error(), mr.err}
1323 resetc <- EventViewReset{v.Request.ViewID, v.Request.ID}
1332 msgitems = append(msgitems, mr.mil)
1334 parsedMessage = mr.pm
1341 if len(msgitems) == 0 {
1342 // Nothing to send yet. We'll send immediately when the next message comes in.
1351// queryMessages executes a query, with filter, pagination, destination message id
1352// to fetch (the message that the client had in view and wants to display again).
1353// It sends on msgc, with several types of messages: errors, whether the view is
1354// reset due to missing AnchorMessageID, and when the end of the view was reached
1355// and/or for a message.
1356func queryMessages(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, mrc chan msgResp) {
1358 x := recover() // Should not happen, but don't take program down if it does.
1360 log.WithContext(ctx).Error("queryMessages panic", slog.Any("err", x))
1362 mrc <- msgResp{err: fmt.Errorf("query failed")}
1363 metrics.PanicInc(metrics.Webmailquery)
1369 query := v.Request.Query
1370 page := v.Request.Page
1372 // Warning: Filters must be kept in sync between queryMessage and view.matches.
1374 checkMessage := func(id int64) (valid bool, rerr error) {
1375 m := store.Message{ID: id}
1377 if err == bstore.ErrAbsent || err == nil && m.Expunged {
1379 } else if err != nil {
1382 return v.matches(log, acc, false, m.ID, m.MailboxID, m.UID, m.Flags, m.Keywords, func(int64, int64, store.UID) (store.Message, error) {
1388 // Check if AnchorMessageID exists and matches filter. If not, we will reset the view.
1389 if page.AnchorMessageID > 0 {
1390 // Check if message exists and (still) matches the filter.
1391 // todo: if AnchorMessageID exists but no longer matches the filter, we are resetting the view, but could handle it more gracefully in the future. if the message is in a different mailbox, we cannot query as efficiently, we'll have to read through more messages.
1392 if valid, err := checkMessage(page.AnchorMessageID); err != nil {
1393 mrc <- msgResp{err: fmt.Errorf("querying AnchorMessageID: %v", err)}
1396 mrc <- msgResp{reset: true}
1397 page.AnchorMessageID = 0
1401 // Check if page.DestMessageID exists and matches filter. If not, we will ignore
1402 // it instead of continuing to send message till the end of the view.
1403 if page.DestMessageID > 0 {
1404 if valid, err := checkMessage(page.DestMessageID); err != nil {
1405 mrc <- msgResp{err: fmt.Errorf("querying requested message: %v", err)}
1408 page.DestMessageID = 0
1412 // todo optimize: we would like to have more filters directly on the database if they can use an index. eg if there is a keyword filter and no mailbox filter.
1414 q := bstore.QueryTx[store.Message](tx)
1415 q.FilterEqual("Expunged", false)
1416 if len(v.mailboxIDs) > 0 {
1417 if len(v.mailboxIDs) == 1 && v.matchMailboxIDs {
1418 // Should result in fast indexed query.
1419 for mbID := range v.mailboxIDs {
1420 q.FilterNonzero(store.Message{MailboxID: mbID})
1423 idsAny := make([]any, 0, len(v.mailboxIDs))
1424 for mbID := range v.mailboxIDs {
1425 idsAny = append(idsAny, mbID)
1427 if v.matchMailboxIDs {
1428 q.FilterEqual("MailboxID", idsAny...)
1430 q.FilterNotEqual("MailboxID", idsAny...)
1435 // If we are looking for an anchor, keep skipping message early (cheaply) until we've seen it.
1436 if page.AnchorMessageID > 0 {
1438 q.FilterFn(func(m store.Message) bool {
1442 seen = m.ID == page.AnchorMessageID
1447 // We may be added filters the the query below. The FilterFn signature does not
1448 // implement reporting errors, or anything else, just a bool. So when making the
1449 // filter functions, we give them a place to store parsed message state, and an
1450 // error. We check the error during and after query execution.
1451 state := msgState{acc: acc}
1454 flagfilter := query.flagFilterFn()
1455 if flagfilter != nil {
1456 q.FilterFn(func(m store.Message) bool {
1457 return flagfilter(m.Flags, m.Keywords)
1461 if query.Filter.Oldest != nil {
1462 q.FilterGreaterEqual("Received", *query.Filter.Oldest)
1464 if query.Filter.Newest != nil {
1465 q.FilterLessEqual("Received", *query.Filter.Newest)
1468 if query.Filter.SizeMin > 0 {
1469 q.FilterGreaterEqual("Size", query.Filter.SizeMin)
1471 if query.Filter.SizeMax > 0 {
1472 q.FilterLessEqual("Size", query.Filter.SizeMax)
1475 attachmentFilter := query.attachmentFilterFn(log, acc, &state)
1476 if attachmentFilter != nil {
1477 q.FilterFn(attachmentFilter)
1480 envFilter := query.envFilterFn(log, &state)
1481 if envFilter != nil {
1482 q.FilterFn(envFilter)
1485 headerFilter := query.headerFilterFn(log, &state)
1486 if headerFilter != nil {
1487 q.FilterFn(headerFilter)
1490 wordsFilter := query.wordsFilterFn(log, &state)
1491 if wordsFilter != nil {
1492 q.FilterFn(wordsFilter)
1495 var moreHeaders []string // From store.Settings.ShowHeaders
1498 q.SortAsc("Received")
1500 q.SortDesc("Received")
1502 found := page.DestMessageID <= 0
1505 err := q.ForEach(func(m store.Message) error {
1506 // Check for an error in one of the filters, propagate it.
1507 if state.err != nil {
1511 if have >= page.Count && found || have > 10000 {
1513 return bstore.StopForEach
1516 if _, ok := v.threadIDs[m.ThreadID]; ok {
1517 // Message was already returned as part of a thread.
1521 var pm *ParsedMessage
1522 if m.ID == page.DestMessageID || page.DestMessageID == 0 && have == 0 && page.AnchorMessageID == 0 {
1523 // For threads, if there was no DestMessageID, we may be getting the newest
1524 // message. For an initial view, this isn't necessarily the first the user is
1525 // expected to read first, that would be the first unread, which we'll get below
1526 // when gathering the thread.
1528 xpm, err := parsedMessage(log, m, &state, true, false, false)
1529 if err != nil && errors.Is(err, message.ErrHeader) {
1530 log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
1531 } else if err != nil {
1532 return fmt.Errorf("parsing message %d: %v", m.ID, err)
1539 moreHeaders, err = ensureMoreHeaders(tx, moreHeaders)
1541 return fmt.Errorf("ensuring more headers: %v", err)
1544 mi, err := messageItem(log, m, &state, moreHeaders)
1546 return fmt.Errorf("making messageitem for message %d: %v", m.ID, err)
1548 mil := []MessageItem{mi}
1549 if query.Threading != ThreadOff {
1550 more, xpm, err := gatherThread(log, tx, acc, v, m, page.DestMessageID, page.AnchorMessageID == 0 && have == 0, moreHeaders)
1552 return fmt.Errorf("gathering thread messages for id %d, thread %d: %v", m.ID, m.ThreadID, err)
1558 mil = append(mil, more...)
1559 v.threadIDs[m.ThreadID] = struct{}{}
1561 // Calculate how many messages the frontend is going to show, and only count those as returned.
1562 collapsed := map[int64]bool{}
1563 for _, mi := range mil {
1564 collapsed[mi.Message.ID] = mi.Message.ThreadCollapsed
1566 unread := map[int64]bool{} // Propagated to thread root.
1567 if query.Threading == ThreadUnread {
1568 for _, mi := range mil {
1573 unread[mm.ID] = true
1574 for _, id := range mm.ThreadParentIDs {
1579 for _, mi := range mil {
1583 for _, id := range mm.ThreadParentIDs {
1584 if _, ok := collapsed[id]; ok {
1589 if threadRoot || (query.Threading == ThreadOn && !collapsed[rootID] || query.Threading == ThreadUnread && unread[rootID]) {
1596 if pm != nil && len(pm.envelope.From) == 1 {
1597 pm.ViewMode, err = fromAddrViewMode(tx, pm.envelope.From[0])
1599 return fmt.Errorf("gathering view mode for id %d: %v", m.ID, err)
1602 mrc <- msgResp{mil: mil, pm: pm}
1605 // Check for an error in one of the filters again. Check in ForEach would not
1606 // trigger if the last message has the error.
1607 if err == nil && state.err != nil {
1611 mrc <- msgResp{err: fmt.Errorf("querying messages: %v", err)}
1615 mrc <- msgResp{viewEnd: true}
1619func gatherThread(log mlog.Log, tx *bstore.Tx, acc *store.Account, v view, m store.Message, destMessageID int64, first bool, moreHeaders []string) ([]MessageItem, *ParsedMessage, error) {
1620 if m.ThreadID == 0 {
1621 // If we would continue, FilterNonzero would fail because there are no non-zero fields.
1622 return nil, nil, fmt.Errorf("message has threadid 0, account is probably still being upgraded, try turning threading off until the upgrade is done")
1625 // Fetch other messages for this thread.
1626 qt := bstore.QueryTx[store.Message](tx)
1627 qt.FilterNonzero(store.Message{ThreadID: m.ThreadID})
1628 qt.FilterEqual("Expunged", false)
1629 qt.FilterNotEqual("ID", m.ID)
1631 tml, err := qt.List()
1633 return nil, nil, fmt.Errorf("listing other messages in thread for message %d, thread %d: %v", m.ID, m.ThreadID, err)
1636 var mil []MessageItem
1637 var pm *ParsedMessage
1638 var firstUnread bool
1639 for _, tm := range tml {
1640 err := func() error {
1641 xstate := msgState{acc: acc}
1642 defer xstate.clear()
1644 mi, err := messageItem(log, tm, &xstate, moreHeaders)
1646 return fmt.Errorf("making messageitem for message %d, for thread %d: %v", tm.ID, m.ThreadID, err)
1648 mi.MatchQuery, err = v.matches(log, acc, false, tm.ID, tm.MailboxID, tm.UID, tm.Flags, tm.Keywords, func(int64, int64, store.UID) (store.Message, error) {
1652 return fmt.Errorf("matching thread message %d against view query: %v", tm.ID, err)
1654 mil = append(mil, mi)
1656 if tm.ID == destMessageID || destMessageID == 0 && first && (pm == nil || !firstUnread && !tm.Seen) {
1657 firstUnread = !tm.Seen
1658 xpm, err := parsedMessage(log, tm, &xstate, true, false, false)
1659 if err != nil && errors.Is(err, message.ErrHeader) {
1660 log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
1661 } else if err != nil {
1662 return fmt.Errorf("parsing thread message %d: %v", tm.ID, err)
1670 return nil, nil, err
1674 // Finally, the message that caused us to gather this thread (which is likely the
1675 // most recent message in the thread) could be the only unread message.
1676 if destMessageID == 0 && first && !m.Seen && !firstUnread {
1677 xstate := msgState{acc: acc}
1678 defer xstate.clear()
1679 xpm, err := parsedMessage(log, m, &xstate, true, false, false)
1680 if err != nil && errors.Is(err, message.ErrHeader) {
1681 log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
1682 } else if err != nil {
1683 return nil, nil, fmt.Errorf("parsing thread message %d: %v", m.ID, err)
1692// While checking the filters on a message, we may need to get more message
1693// details as each filter passes. We check the filters that need the basic
1694// information first, and load and cache more details for the next filters.
1695// msgState holds parsed details for a message, it is updated while filtering,
1696// with more information or reset for a next message.
1697type msgState struct {
1698 acc *store.Account // Never changes during lifetime.
1699 err error // Once set, doesn't get cleared.
1701 part *message.Part // Will be without Reader when msgr is nil.
1702 msgr *store.MsgReader
1705func (ms *msgState) clear() {
1710 *ms = msgState{acc: ms.acc, err: ms.err}
1713func (ms *msgState) ensureMsg(m store.Message) {
1714 if m.ID != ms.m.ID {
1720func (ms *msgState) ensurePart(m store.Message, withMsgReader bool) bool {
1725 if m.ParsedBuf == nil {
1726 ms.err = fmt.Errorf("message %d not parsed", m.ID)
1730 if err := json.Unmarshal(m.ParsedBuf, &p); err != nil {
1731 ms.err = fmt.Errorf("load part for message %d: %w", m.ID, err)
1736 if withMsgReader && ms.msgr == nil {
1737 ms.msgr = ms.acc.MessageReader(m)
1738 ms.part.SetReaderAt(ms.msgr)
1741 return ms.part != nil
1744// flagFilterFn returns a function that applies the flag/keyword/"label"-related
1745// filters for a query. A nil function is returned if there are no flags to filter
1747func (q Query) flagFilterFn() func(store.Flags, []string) bool {
1748 labels := map[string]bool{}
1749 for _, k := range q.Filter.Labels {
1752 for _, k := range q.NotFilter.Labels {
1756 if len(labels) == 0 {
1760 var mask, flags store.Flags
1761 systemflags := map[string][]*bool{
1762 `\answered`: {&mask.Answered, &flags.Answered},
1763 `\flagged`: {&mask.Flagged, &flags.Flagged},
1764 `\deleted`: {&mask.Deleted, &flags.Deleted},
1765 `\seen`: {&mask.Seen, &flags.Seen},
1766 `\draft`: {&mask.Draft, &flags.Draft},
1767 `$junk`: {&mask.Junk, &flags.Junk},
1768 `$notjunk`: {&mask.Notjunk, &flags.Notjunk},
1769 `$forwarded`: {&mask.Forwarded, &flags.Forwarded},
1770 `$phishing`: {&mask.Phishing, &flags.Phishing},
1771 `$mdnsent`: {&mask.MDNSent, &flags.MDNSent},
1773 keywords := map[string]bool{}
1774 for k, v := range labels {
1775 k = strings.ToLower(k)
1776 if mf, ok := systemflags[k]; ok {
1783 return func(msgFlags store.Flags, msgKeywords []string) bool {
1785 if f.Set(mask, msgFlags) != flags {
1788 for k, v := range keywords {
1789 if slices.Contains(msgKeywords, k) != v {
1797// attachmentFilterFn returns a function that filters for the attachment-related
1798// filter from the query. A nil function is returned if there are attachment
1800func (q Query) attachmentFilterFn(log mlog.Log, acc *store.Account, state *msgState) func(m store.Message) bool {
1801 if q.Filter.Attachments == AttachmentIndifferent && q.NotFilter.Attachments == AttachmentIndifferent {
1805 return func(m store.Message) bool {
1806 if !state.ensurePart(m, true) {
1809 types, err := attachmentTypes(log, m, state)
1814 return (q.Filter.Attachments == AttachmentIndifferent || types[q.Filter.Attachments]) && (q.NotFilter.Attachments == AttachmentIndifferent || !types[q.NotFilter.Attachments])
1818var attachmentMimetypes = map[string]AttachmentType{
1819 "application/pdf": AttachmentPDF,
1820 "application/zip": AttachmentArchive,
1821 "application/x-rar-compressed": AttachmentArchive,
1822 "application/vnd.oasis.opendocument.spreadsheet": AttachmentSpreadsheet,
1823 "application/vnd.ms-excel": AttachmentSpreadsheet,
1824 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": AttachmentSpreadsheet,
1825 "application/vnd.oasis.opendocument.text": AttachmentDocument,
1826 "application/vnd.oasis.opendocument.presentation": AttachmentPresentation,
1827 "application/vnd.ms-powerpoint": AttachmentPresentation,
1828 "application/vnd.openxmlformats-officedocument.presentationml.presentation": AttachmentPresentation,
1830var attachmentExtensions = map[string]AttachmentType{
1831 ".pdf": AttachmentPDF,
1832 ".zip": AttachmentArchive,
1833 ".tar": AttachmentArchive,
1834 ".tgz": AttachmentArchive,
1835 ".tar.gz": AttachmentArchive,
1836 ".tbz2": AttachmentArchive,
1837 ".tar.bz2": AttachmentArchive,
1838 ".tar.lz": AttachmentArchive,
1839 ".tlz": AttachmentArchive,
1840 ".tar.xz": AttachmentArchive,
1841 ".txz": AttachmentArchive,
1842 ".tar.zst": AttachmentArchive,
1843 ".tar.lz4": AttachmentArchive,
1844 ".7z": AttachmentArchive,
1845 ".rar": AttachmentArchive,
1846 ".ods": AttachmentSpreadsheet,
1847 ".xls": AttachmentSpreadsheet,
1848 ".xlsx": AttachmentSpreadsheet,
1849 ".odt": AttachmentDocument,
1850 ".doc": AttachmentDocument,
1851 ".docx": AttachmentDocument,
1852 ".odp": AttachmentPresentation,
1853 ".ppt": AttachmentPresentation,
1854 ".pptx": AttachmentPresentation,
1857func attachmentTypes(log mlog.Log, m store.Message, state *msgState) (map[AttachmentType]bool, error) {
1858 types := map[AttachmentType]bool{}
1860 pm, err := parsedMessage(log, m, state, false, false, false)
1862 return nil, fmt.Errorf("parsing message for attachments: %w", err)
1864 for _, a := range pm.attachments {
1865 if a.Part.MediaType == "IMAGE" {
1866 types[AttachmentImage] = true
1869 mt := strings.ToLower(a.Part.MediaType + "/" + a.Part.MediaSubType)
1870 if t, ok := attachmentMimetypes[mt]; ok {
1874 _, filename, err := a.Part.DispositionFilename()
1875 if err != nil && (errors.Is(err, message.ErrParamEncoding) || errors.Is(err, message.ErrHeader)) {
1876 log.Debugx("parsing disposition/filename", err)
1877 } else if err != nil {
1878 return nil, fmt.Errorf("reading disposition/filename: %v", err)
1880 if ext := filepath.Ext(filename); ext != "" {
1881 if t, ok := attachmentExtensions[strings.ToLower(ext)]; ok {
1887 if len(types) == 0 {
1888 types[AttachmentNone] = true
1890 types[AttachmentAny] = true
1895// envFilterFn returns a filter function for the "envelope" headers ("envelope" as
1896// used by IMAP, i.e. basic message headers from/to/subject, an unfortunate name
1897// clash with SMTP envelope) for the query. A nil function is returned if no
1898// filtering is needed.
1899func (q Query) envFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1900 if len(q.Filter.From) == 0 && len(q.Filter.To) == 0 && len(q.Filter.Subject) == 0 && len(q.NotFilter.From) == 0 && len(q.NotFilter.To) == 0 && len(q.NotFilter.Subject) == 0 {
1904 lower := func(l []string) []string {
1908 r := make([]string, len(l))
1909 for i, s := range l {
1910 r[i] = strings.ToLower(s)
1915 filterSubject := lower(q.Filter.Subject)
1916 notFilterSubject := lower(q.NotFilter.Subject)
1917 filterFrom := lower(q.Filter.From)
1918 notFilterFrom := lower(q.NotFilter.From)
1919 filterTo := lower(q.Filter.To)
1920 notFilterTo := lower(q.NotFilter.To)
1922 return func(m store.Message) bool {
1923 if !state.ensurePart(m, false) {
1927 var env message.Envelope
1928 if state.part.Envelope != nil {
1929 env = *state.part.Envelope
1932 if len(filterSubject) > 0 || len(notFilterSubject) > 0 {
1933 subject := strings.ToLower(env.Subject)
1934 for _, s := range filterSubject {
1935 if !strings.Contains(subject, s) {
1939 for _, s := range notFilterSubject {
1940 if strings.Contains(subject, s) {
1946 contains := func(textLower []string, l []message.Address, all bool) bool {
1948 for _, s := range textLower {
1949 for _, a := range l {
1950 name := strings.ToLower(a.Name)
1951 addr := strings.ToLower(fmt.Sprintf("<%s@%s>", a.User, a.Host))
1952 if strings.Contains(name, s) || strings.Contains(addr, s) {
1966 if len(filterFrom) > 0 && !contains(filterFrom, env.From, true) {
1969 if len(notFilterFrom) > 0 && contains(notFilterFrom, env.From, false) {
1972 if len(filterTo) > 0 || len(notFilterTo) > 0 {
1973 to := append(append(append([]message.Address{}, env.To...), env.CC...), env.BCC...)
1974 if len(filterTo) > 0 && !contains(filterTo, to, true) {
1977 if len(notFilterTo) > 0 && contains(notFilterTo, to, false) {
1985// headerFilterFn returns a function that filters for the header filters in the
1986// query. A nil function is returned if there are no header filters.
1987func (q Query) headerFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1988 if len(q.Filter.Headers) == 0 {
1992 lowerValues := make([]string, len(q.Filter.Headers))
1993 for i, t := range q.Filter.Headers {
1994 lowerValues[i] = strings.ToLower(t[1])
1997 return func(m store.Message) bool {
1998 if !state.ensurePart(m, true) {
2001 hdr, err := state.part.Header()
2003 state.err = fmt.Errorf("reading header for message %d: %w", m.ID, err)
2008 for i, t := range q.Filter.Headers {
2012 if v == "" && len(l) > 0 {
2015 for _, e := range l {
2016 if strings.Contains(strings.ToLower(e), v) {
2026// wordFiltersFn returns a function that applies the word filters of the query. A
2027// nil function is returned when query does not contain a word filter.
2028func (q Query) wordsFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
2029 if len(q.Filter.Words) == 0 && len(q.NotFilter.Words) == 0 {
2033 ws := store.PrepareWordSearch(q.Filter.Words, q.NotFilter.Words)
2035 return func(m store.Message) bool {
2036 if !state.ensurePart(m, true) {
2040 if ok, err := ws.MatchPart(log, state.part, true); err != nil {
2041 state.err = fmt.Errorf("searching for words in message %d: %w", m.ID, err)