3// todo: may want to add some json omitempty tags to MessageItem, or Message to reduce json size, or just have smaller types that send only the fields that are needed.
8 cryptrand "crypto/rand"
24 "github.com/mjl-/bstore"
25 "github.com/mjl-/sherpa"
27 "github.com/mjl-/mox/dns"
28 "github.com/mjl-/mox/message"
29 "github.com/mjl-/mox/metrics"
30 "github.com/mjl-/mox/mlog"
31 "github.com/mjl-/mox/mox-"
32 "github.com/mjl-/mox/moxvar"
33 "github.com/mjl-/mox/smtp"
34 "github.com/mjl-/mox/store"
37// Request is a request to an SSE connection to send messages, either for a new
38// view, to continue with an existing view, or to cancel an ongoing request.
42 SSEID int64 // SSE connection.
44 // To indicate a request is a continuation (more results) of the previous view.
45 // Echoed in events, client checks if it is getting results for the latest request.
48 // If set, this request and its view are canceled. A new view must be started.
58 ThreadOff ThreadMode = "off"
59 ThreadOn ThreadMode = "on"
60 ThreadUnread ThreadMode = "unread"
63// Query is a request for messages that match filters, in a given order.
65 OrderAsc bool // Order by received ascending or descending.
71// AttachmentType is for filtering by attachment type.
72type AttachmentType string
75 AttachmentIndifferent AttachmentType = ""
76 AttachmentNone AttachmentType = "none"
77 AttachmentAny AttachmentType = "any"
78 AttachmentImage AttachmentType = "image" // png, jpg, gif, ...
79 AttachmentPDF AttachmentType = "pdf"
80 AttachmentArchive AttachmentType = "archive" // zip files, tgz, ...
81 AttachmentSpreadsheet AttachmentType = "spreadsheet" // ods, xlsx, ...
82 AttachmentDocument AttachmentType = "document" // odt, docx, ...
83 AttachmentPresentation AttachmentType = "presentation" // odp, pptx, ...
86// Filter selects the messages to return. Fields that are set must all match,
87// for slices each element must match ("and").
89 // If -1, then all mailboxes except Trash/Junk/Rejects. Otherwise, only active if > 0.
92 // If true, also submailboxes are included in the search.
93 MailboxChildrenIncluded bool
95 // In case the client doesn't know mailboxes and their IDs yet. Only used during sse
96 // connection setup, where it is turned into a MailboxID. Filtering only looks at
100 Words []string // Case insensitive substring match for each string.
102 To []string // Including Cc and Bcc.
106 Attachments AttachmentType
108 Headers [][2]string // Header values can be empty; then it's a check that the header is present, regardless of value.
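// For illustration, a filter such as Filter{MailboxID: 1, Words: []string{"invoice"}, Attachments: AttachmentPDF}
// would match only messages in mailbox 1 whose text contains "invoice" and that have a PDF attachment.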
113// NotFilter matches messages that don't match these fields.
114type NotFilter struct {
119 Attachments AttachmentType
123// Page holds pagination parameters for a request.
125 // Start returning messages after this ID, if > 0. For pagination, fetching the
126 // next set of messages.
127 AnchorMessageID int64
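// For example, to fetch the next page a client can set AnchorMessageID to the ID of the
// last message it received in the previous EventViewMsgs and repeat the request for the same view.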
129 // Number of messages to return, must be >= 1, we never return more than 10000 for
133 // If > 0, return messages until DestMessageID is found. More than Count messages
134 // can be returned. For long-running searches, it may take a while before this
139// todo: merge MessageAddress and MessageEnvelope into message.Address and message.Envelope.
141// MessageAddress is like message.Address, but with a dns.Domain, with unicode name
143type MessageAddress struct {
144 Name string // Free-form name for display in mail applications.
145 User string // Localpart, encoded.
149// MessageEnvelope is like message.Envelope, as used in message.Part, but including
150// unicode host names for IDNA names.
151type MessageEnvelope struct {
152 // todo: should get sherpadoc to understand type embeds and embed the non-MessageAddress fields from message.Envelope.
155 From []MessageAddress
156 Sender []MessageAddress
157 ReplyTo []MessageAddress
165// MessageItem is sent by queries. It has derived information analyzed from
166// message.Part, made for the needs of the message items in the message list.
168type MessageItem struct {
169 Message store.Message // Without ParsedBuf and MsgPrefix, for size. With Preview, even if it isn't stored yet in the database.
170 Envelope MessageEnvelope
171 Attachments []Attachment
174 MatchQuery bool // If message does not match query, it can still be included because of threading.
175 MoreHeaders [][2]string // All headers from store.Settings.ShowHeaders that are present.
178// ParsedMessage has more parsed/derived information about a message, intended
179// for rendering the (contents of the) message. Information from MessageItem is
181type ParsedMessage struct {
184 Headers map[string][]string
185 ViewMode store.ViewMode
187 Texts []string // Contents of text parts, can be empty.
189 // Whether there is an HTML part. The webclient renders HTML message parts through
190 // an iframe and a separate request with strict CSP headers to prevent script
191 // execution and loading of external resources, which isn't possible when loading
192// inline HTML in an iframe because not all browsers support the iframe csp
196 ListReplyAddress *MessageAddress // From List-Post.
198 TextPaths [][]int // Paths to text parts.
199 HTMLPath []int // Path to HTML part.
201 // Information used by MessageItem, not exported in this type.
202 envelope MessageEnvelope
203 attachments []Attachment
208// EventStart is the first message sent on an SSE connection, giving the client
209// basic data to populate its UI. After this event, messages will follow quickly in
210// an EventViewMsgs event.
211type EventStart struct {
213 LoginAddress MessageAddress
214 Addresses []MessageAddress
215 DomainAddressConfigs map[string]DomainAddressConfig // ASCII domain to address config.
217 Mailboxes []store.Mailbox
218 RejectsMailbox string
219 Settings store.Settings
220 AccountPath string // If nonempty, the path on same host to webaccount interface.
224// DomainAddressConfig has the address (localpart) configuration for a domain, so
225// the webmail client can decide if an address matches the addresses of the
227type DomainAddressConfig struct {
228 LocalpartCatchallSeparators []string // Can be empty.
229 LocalpartCaseSensitive bool
232// EventViewMsgs contains messages for a view, possibly a continuation of an
233// earlier list of messages.
234type EventViewMsgs struct {
238 // If empty, this was the last message for the request. If non-empty, a list of
239 // thread messages. Each with the first message being the reason this thread is
240 // included, and it can be used as AnchorMessageID in followup requests. If the threading mode
241 // is "off" in the query, there will always be only a single message. If a thread
242 // is sent, all messages in the thread are sent, including those that don't match
243 // the query (e.g. from another mailbox). Threads can be displayed based on the
244 // ThreadParentIDs field, with possibly slightly different display based on field
245 // ThreadMissingLink.
246 MessageItems [][]MessageItem
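// For example, with threading enabled a value could look like
// [][]MessageItem{{matchedMsg, threadMsg1, threadMsg2}, {otherMsg}}, where the first element
// of each inner slice is the message that caused its thread to be included. With threading
// "off", each inner slice holds exactly one message.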
248 // If set, will match the target page.DestMessageID from the request.
249 ParsedMessage *ParsedMessage
251 // If set, there are no more messages in this view at this moment. Messages can be
252 // added, typically via Change messages, e.g. for new deliveries.
256// EventViewErr indicates an error during a query for messages. The request is
257// aborted, no more request-related messages will be sent until the next request.
258type EventViewErr struct {
261 Err string // To be displayed in client.
262 err error // Original error, for checking against context.Canceled.
265// EventViewReset indicates that a request for the next set of messages in a view
266// could not be fulfilled, e.g. because the anchor message does not exist anymore.
267// The client should clear its list of messages. This can happen before
268// EventViewMsgs events are sent.
269type EventViewReset struct {
274// EventViewChanges contains one or more changes relevant for the client, either
275// with new mailbox total/unseen message counts, or messages added/removed/modified
276// (flags) for the current view.
277type EventViewChanges struct {
279 Changes [][2]any // The first field of [2]any is a string, the second is one of the Change types below.
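// E.g. [2]any{"ChangeMsgAdd", ChangeMsgAdd{...}} or [2]any{"ChangeMailboxCounts", ChangeMailboxCounts{...}},
// matching how xprocessChanges tags changes below.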
282// ChangeMsgAdd adds a new message and possibly its thread to the view.
283type ChangeMsgAdd struct {
285 MessageItems []MessageItem
288// ChangeMsgRemove removes one or more messages from the view.
289type ChangeMsgRemove struct {
290 store.ChangeRemoveUIDs
293// ChangeMsgFlags updates flags for one message.
294type ChangeMsgFlags struct {
298// ChangeMsgThread updates muted/collapsed fields for one message.
299type ChangeMsgThread struct {
303// ChangeMailboxRemove indicates a mailbox was removed, including all its messages.
304type ChangeMailboxRemove struct {
305 store.ChangeRemoveMailbox
308// ChangeMailboxAdd indicates a new mailbox was added, initially without any messages.
309type ChangeMailboxAdd struct {
310 Mailbox store.Mailbox
313// ChangeMailboxRename indicates a mailbox was renamed. Its ID stays the same.
314// It could be under a new parent.
315type ChangeMailboxRename struct {
316 store.ChangeRenameMailbox
319// ChangeMailboxCounts sets new total and unseen message counts for a mailbox.
320type ChangeMailboxCounts struct {
321 store.ChangeMailboxCounts
324// ChangeMailboxSpecialUse has updated special-use flags for a mailbox.
325type ChangeMailboxSpecialUse struct {
326 store.ChangeMailboxSpecialUse
329// ChangeMailboxKeywords has an updated list of keywords for a mailbox, e.g. after
330// a message was added with a keyword that wasn't in the mailbox yet.
331type ChangeMailboxKeywords struct {
332 store.ChangeMailboxKeywords
335// View holds the information about the returned data for a query. It is used to
336// determine whether mailbox changes should be sent to the client: we only send
337// addition/removal/flag-changes of messages that are in view, or would extend it
338// if the view is at the end of the results.
342 // Received time of the last message we sent to the client. We use it to decide if a newly
343 // delivered message is within the view and the client should get a notification.
344 LastMessageReceived time.Time
346 // If set, the last message in the query view has been sent. There is no need to do
347 // another query; it will not return more data. Used to decide if an event for a
348 // new message should be sent.
351 // Whether messages must or must not match mailboxIDs.
353 // Mailboxes to match, can be multiple, for matching children. If empty, there is
354 // no filter on mailboxes.
355 mailboxIDs map[int64]bool
357 // Threads sent to client. New messages for this thread are also sent, regardless
358 // of regular query matching, so also for other mailboxes. If the user (re)moved
359 // all messages of a thread, they may still receive events for the thread. Only
360 // filled when the query has threading enabled (not "off").
361 threadIDs map[int64]struct{}
364// sses tracks all sse connections, and access to them.
371// sse represents an sse connection.
373 ID int64 // Also returned in EventStart and used in Request to identify the request.
374 AccountName string // Used to check the authenticated user has access to the SSE connection.
375 Request chan Request // Goroutine will receive requests from here, coming from API calls.
378// unregister is called by the goroutine when the connection is closed or breaks.
379func (sse sse) unregister() {
382 delete(sses.m, sse.ID)
384 // Drain any pending requests, preventing blocked goroutines from API calls.
394func sseRegister(accountName string) sse {
398 v := sse{sses.gen, accountName, make(chan Request, 1)}
403// sseGet returns a reference to an existing connection if it exists and user
405func sseGet(id int64, accountName string) (sse, bool) {
409 if s.AccountName != accountName {
415// ssetoken is a temporary token that has not yet been used to start an SSE
416// connection. Created by Token, consumed by a new SSE connection.
417type ssetoken struct {
418 token string // Uniquely generated.
420 address string // Address used to authenticate in call that created the token.
421 sessionToken store.SessionToken // SessionToken that created this token, checked before sending updates.
425// ssetokens maintains unused tokens. We have just one, but it's a type so we
426// can define methods.
427type ssetokens struct {
429 accountTokens map[string][]ssetoken // Account to max 10 most recent tokens, from old to new.
430 tokens map[string]ssetoken // Token to details, for finding account for a token.
433var sseTokens = ssetokens{
434 accountTokens: map[string][]ssetoken{},
435 tokens: map[string]ssetoken{},
438// xgenerate creates and saves a new token. It ensures no more than 10 tokens
439// per account exist, removing old ones if needed.
440func (x *ssetokens) xgenerate(ctx context.Context, accName, address string, sessionToken store.SessionToken) string {
441 buf := make([]byte, 16)
442 _, err := cryptrand.Read(buf)
443 xcheckf(ctx, err, "generating token")
444 st := ssetoken{base64.RawURLEncoding.EncodeToString(buf), accName, address, sessionToken, time.Now().Add(time.Minute)}
448 n := len(x.accountTokens[accName])
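// If the account already has 10 or more tokens, drop the oldest so at most 9 remain before
// the new token is appended below.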
450 for _, ost := range x.accountTokens[accName][:n-9] {
451 delete(x.tokens, ost.token)
453 copy(x.accountTokens[accName], x.accountTokens[accName][n-9:])
454 x.accountTokens[accName] = x.accountTokens[accName][:9]
456 x.accountTokens[accName] = append(x.accountTokens[accName], st)
457 x.tokens[st.token] = st
461// check verifies a token, and consumes it if valid.
462func (x *ssetokens) check(token string) (string, string, store.SessionToken, bool, error) {
466 st, ok := x.tokens[token]
468 return "", "", "", false, nil
470 delete(x.tokens, token)
471 if i := slices.Index(x.accountTokens[st.accName], st); i < 0 {
472 return "", "", "", false, errors.New("internal error, could not find token in account")
474 copy(x.accountTokens[st.accName][i:], x.accountTokens[st.accName][i+1:])
475 x.accountTokens[st.accName] = x.accountTokens[st.accName][:len(x.accountTokens[st.accName])-1]
476 if len(x.accountTokens[st.accName]) == 0 {
477 delete(x.accountTokens, st.accName)
480 if time.Now().After(st.validUntil) {
481 return "", "", "", false, nil
483 return st.accName, st.address, st.sessionToken, true, nil
486// ioErr is panicked on i/o errors in serveEvents and handled in a defer.
491// ensureMoreHeaders ensures we have a non-nil moreHeaders, taking it from Settings.
492func ensureMoreHeaders(tx *bstore.Tx, moreHeaders []string) ([]string, error) {
493 if moreHeaders != nil {
494 return moreHeaders, nil
497 s := store.Settings{ID: 1}
498 if err := tx.Get(&s); err != nil {
499 return nil, fmt.Errorf("get settings: %v", err)
501 moreHeaders = s.ShowHeaders
502 if moreHeaders == nil {
503 moreHeaders = []string{} // Ensure we won't get Settings again next call.
505 return moreHeaders, nil
508// serveEvents serves an SSE connection. Authentication is done through a query
509// string parameter "singleUseToken", a one-time-use token returned by the Token
511func serveEvents(ctx context.Context, log mlog.Log, accountPath string, w http.ResponseWriter, r *http.Request) {
512 if r.Method != "GET" {
513 http.Error(w, "405 - method not allowed - use get", http.StatusMethodNotAllowed)
517 flusher, ok := w.(http.Flusher)
519 log.Error("internal error: ResponseWriter not a http.Flusher")
520 http.Error(w, "500 - internal error - cannot sync to http connection", 500)
525 token := q.Get("singleUseToken")
527 http.Error(w, "400 - bad request - missing credentials", http.StatusBadRequest)
530 accName, address, sessionToken, ok, err := sseTokens.check(token)
532 http.Error(w, "500 - internal server error - "+err.Error(), http.StatusInternalServerError)
536 http.Error(w, "400 - bad request - bad token", http.StatusBadRequest)
539 if _, err := store.SessionUse(ctx, log, accName, sessionToken, ""); err != nil {
540 http.Error(w, "400 - bad request - bad session token", http.StatusBadRequest)
544 // We can simulate a slow SSE connection. It seems firefox doesn't slow down
545 // incoming responses with its slow-network simulation.
546 var waitMin, waitMax time.Duration
547 waitMinMsec := q.Get("waitMinMsec")
548 waitMaxMsec := q.Get("waitMaxMsec")
549 if waitMinMsec != "" && waitMaxMsec != "" {
550 if v, err := strconv.ParseInt(waitMinMsec, 10, 64); err != nil {
551 http.Error(w, "400 - bad request - parsing waitMinMsec: "+err.Error(), http.StatusBadRequest)
554 waitMin = time.Duration(v) * time.Millisecond
557 if v, err := strconv.ParseInt(waitMaxMsec, 10, 64); err != nil {
558 http.Error(w, "400 - bad request - parsing waitMaxMsec: "+err.Error(), http.StatusBadRequest)
561 waitMax = time.Duration(v) * time.Millisecond
565 // Parse the request with initial mailbox/search criteria.
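// For illustration, the "request" query string parameter could carry JSON such as
// {"Query":{"Threading":"off","Filter":{"MailboxName":"Inbox"}},"Page":{"Count":50}}
// (field names from the Request/Query/Filter/Page types above, values only examples).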
567 dec := json.NewDecoder(strings.NewReader(q.Get("request")))
568 dec.DisallowUnknownFields()
569 if err := dec.Decode(&req); err != nil {
570 http.Error(w, "400 - bad request - bad request query string parameter: "+err.Error(), http.StatusBadRequest)
572 } else if req.Page.Count <= 0 {
573 http.Error(w, "400 - bad request - request cannot have Page.Count 0", http.StatusBadRequest)
576 if req.Query.Threading == "" {
577 req.Query.Threading = ThreadOff
580 var writer *eventWriter
582 metricSSEConnections.Inc()
583 defer metricSSEConnections.Dec()
585 // Below here, errors are handled through xcheckf, which panics with
586 // *sherpa.Error, after which we send an error event to the client. We can also get
587 // an *ioErr when the connection is broken.
593 if err, ok := x.(*sherpa.Error); ok {
594 writer.xsendEvent(ctx, log, "fatalErr", err.Message)
595 } else if _, ok := x.(ioErr); ok {
598 log.WithContext(ctx).Error("serveEvents panic", slog.Any("err", x))
600 metrics.PanicInc(metrics.Webmail)
606 h.Set("Content-Type", "text/event-stream")
607 h.Set("Cache-Control", "no-cache")
609 // We'll be sending quite a bit of message data (text) in JSON (plenty of duplicate
610 // keys), so it should be quite compressible.
612 gz := mox.AcceptsGzip(r)
614 h.Set("Content-Encoding", "gzip")
615 out, _ = gzip.NewWriterLevel(w, gzip.BestSpeed)
619 out = httpFlusher{out, flusher}
621 // We'll be writing outgoing SSE events through writer.
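// Each event conventionally goes out as a named SSE event, "event: <name>" followed by
// "data: <json>" and a blank line (e.g. "event: start", "event: viewMsgs"), with
// ": keepalive" comment lines to keep the connection open.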
622 writer = newEventWriter(out, waitMin, waitMax, accName, sessionToken)
625 // Fetch initial data.
626 acc, err := store.OpenAccount(log, accName, true)
627 xcheckf(ctx, err, "open account")
630 log.Check(err, "closing account")
632 comm := store.RegisterComm(acc)
633 defer comm.Unregister()
635 // List addresses that the client can use to send email from.
636 accConf, _ := acc.Conf()
637 loginAddr, err := smtp.ParseAddress(address)
638 xcheckf(ctx, err, "parsing login address")
639 _, _, _, dest, err := mox.LookupAddress(loginAddr.Localpart, loginAddr.Domain, false, false, false)
640 xcheckf(ctx, err, "looking up destination for login address")
641 loginName := accConf.FullName
642 if dest.FullName != "" {
643 loginName = dest.FullName
645 loginAddress := MessageAddress{Name: loginName, User: loginAddr.Localpart.String(), Domain: loginAddr.Domain}
646 var addresses []MessageAddress
647 for a, dest := range accConf.Destinations {
648 name := dest.FullName
650 name = accConf.FullName
652 var ma MessageAddress
653 if strings.HasPrefix(a, "@") {
654 dom, err := dns.ParseDomain(a[1:])
655 xcheckf(ctx, err, "parsing destination address for account")
656 ma = MessageAddress{Domain: dom}
658 addr, err := smtp.ParseAddress(a)
659 xcheckf(ctx, err, "parsing destination address for account")
660 ma = MessageAddress{Name: name, User: addr.Localpart.String(), Domain: addr.Domain}
662 addresses = append(addresses, ma)
664 // The user is allowed to send using an alias address as the message From address. Webmail
665 // will choose it when replying to a message sent to that address.
666 aliasAddrs := map[MessageAddress]bool{}
667 for _, a := range accConf.Aliases {
668 if a.Alias.AllowMsgFrom {
669 ma := MessageAddress{User: a.Alias.LocalpartStr, Domain: a.Alias.Domain}
671 addresses = append(addresses, ma)
673 aliasAddrs[ma] = true
677 // We implicitly start a query. We use the reqctx for the transaction, because the
678 // transaction is passed to the query, which can be canceled.
679 reqctx, reqctxcancel := context.WithCancel(ctx)
681 // We also cancel in cancelDrain later on, but there is a brief window where the
682 // context wouldn't be canceled.
683 if reqctxcancel != nil {
689 // qtx is kept around during connection initialization, until we pass it off to the
690 // goroutine that starts querying for messages.
694 err := qtx.Rollback()
695 log.Check(err, "rolling back")
699 var mbl []store.Mailbox
700 settings := store.Settings{ID: 1}
702 // We only take the rlock when getting the tx.
703 acc.WithRLock(func() {
704 // Start a read-only transaction we'll use during the query.
705 qtx, err = acc.DB.Begin(reqctx, false)
706 xcheckf(ctx, err, "begin transaction")
708 mbl, err = bstore.QueryTx[store.Mailbox](qtx).FilterEqual("Expunged", false).List()
709 xcheckf(ctx, err, "list mailboxes")
711 err = qtx.Get(&settings)
712 xcheckf(ctx, err, "get settings")
715 // Find the designated mailbox if a mailbox name is set, or there are no filters at all.
716 var zerofilter Filter
717 var zeronotfilter NotFilter
718 var mailbox store.Mailbox
719 var mailboxPrefixes []string
720 var matchMailboxes bool
721 mailboxIDs := map[int64]bool{}
722 mailboxName := req.Query.Filter.MailboxName
723 if mailboxName != "" || reflect.DeepEqual(req.Query.Filter, zerofilter) && reflect.DeepEqual(req.Query.NotFilter, zeronotfilter) {
724 if mailboxName == "" {
725 mailboxName = "Inbox"
728 var inbox store.Mailbox
729 for _, e := range mbl {
730 if e.Name == mailboxName {
733 if e.Name == "Inbox" {
741 xcheckf(ctx, errors.New("inbox not found"), "setting initial mailbox")
743 req.Query.Filter.MailboxID = mailbox.ID
744 req.Query.Filter.MailboxName = ""
745 mailboxPrefixes = []string{mailbox.Name + "/"}
746 matchMailboxes = true
747 mailboxIDs[mailbox.ID] = true
749 matchMailboxes, mailboxIDs, mailboxPrefixes = xprepareMailboxIDs(ctx, qtx, req.Query.Filter, accConf.RejectsMailbox)
751 if req.Query.Filter.MailboxChildrenIncluded {
752 xgatherMailboxIDs(ctx, qtx, mailboxIDs, mailboxPrefixes)
755 // todo: write a last-event-id based on modseq? if last-event-id is present, we would have to send changes to mailboxes, messages, hopefully reducing the amount of data sent.
757 sse := sseRegister(acc.Name)
758 defer sse.unregister()
760 // Per-domain localpart config so webclient can decide if an address belongs to the account.
761 domainAddressConfigs := map[string]DomainAddressConfig{}
762 for _, a := range addresses {
763 dom, _ := mox.Conf.Domain(a.Domain)
764 domainAddressConfigs[a.Domain.ASCII] = DomainAddressConfig{dom.LocalpartCatchallSeparatorsEffective, dom.LocalpartCaseSensitive}
767 // Write first event, allowing client to fill its UI with mailboxes.
768 start := EventStart{sse.ID, loginAddress, addresses, domainAddressConfigs, mailbox.Name, mbl, accConf.RejectsMailbox, settings, accountPath, moxvar.Version}
769 writer.xsendEvent(ctx, log, "start", start)
771 // The goroutine doing the querying will send messages on these channels, which
772 // result in an event being written on the SSE connection.
773 viewMsgsc := make(chan EventViewMsgs)
774 viewErrc := make(chan EventViewErr)
775 viewResetc := make(chan EventViewReset)
776 donec := make(chan int64) // When request is done.
778 // Start a view; it determines if we send a change to the client. Also start an
779 // implicit query for messages; we'll send the messages to the client, which can
780 // fill its UI with messages.
781 v := view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
782 go viewRequestTx(reqctx, log, acc, qtx, v, viewMsgsc, viewErrc, viewResetc, donec)
783 qtx = nil // viewRequestTx closes qtx
785 // When canceling a query, we must drain its messages until it says it is done.
786 // Otherwise the sending goroutine would hang indefinitely on a channel send.
787 cancelDrain := func() {
788 if reqctxcancel != nil {
789 // Cancel the goroutine doing the querying.
797 // Drain events until done.
809 // If we stop and a query is in progress, we must drain the channel it will send on.
812 // Changes broadcast by other connections on this account. If applicable for the
813 // connection/view, we send events.
814 xprocessChanges := func(changes []store.Change) {
815 taggedChanges := [][2]any{}
817 newPreviews := map[int64]string{}
818 defer storeNewPreviews(ctx, log, acc, newPreviews)
820 // We get a transaction the first time we need it.
824 err := xtx.Rollback()
825 log.Check(err, "rolling back transaction")
828 ensureTx := func() error {
835 xtx, err = acc.DB.Begin(ctx, false)
838 // This getmsg will now only be called with mailboxID+UID, not with messageID set.
839 // todo jmap: change store.Change* to include MessageID's? would mean duplication of information resulting in possible mismatch.
840 getmsg := func(messageID int64, mailboxID int64, uid store.UID) (store.Message, error) {
841 if err := ensureTx(); err != nil {
842 return store.Message{}, fmt.Errorf("transaction: %v", err)
844 return bstore.QueryTx[store.Message](xtx).FilterEqual("Expunged", false).FilterNonzero(store.Message{MailboxID: mailboxID, UID: uid}).Get()
847 // Additional headers from settings to add to MessageItems.
848 var moreHeaders []string
849 xmoreHeaders := func() []string {
851 xcheckf(ctx, err, "transaction")
853 moreHeaders, err = ensureMoreHeaders(xtx, moreHeaders)
854 xcheckf(ctx, err, "ensuring more headers")
858 // Return UIDs that are within the range of the view: either the end has been reached, or
859 // the UID is not after the last message.
860 xchangedUIDs := func(mailboxID int64, uids []store.UID, isRemove bool) (changedUIDs []store.UID) {
861 uidsAny := make([]any, len(uids))
862 for i, uid := range uids {
866 xcheckf(ctx, err, "transaction")
867 q := bstore.QueryTx[store.Message](xtx)
868 q.FilterNonzero(store.Message{MailboxID: mailboxID})
869 q.FilterEqual("UID", uidsAny...)
870 mbOK := v.matchesMailbox(mailboxID)
871 err = q.ForEach(func(m store.Message) error {
872 _, thread := v.threadIDs[m.ThreadID]
873 if thread || mbOK && (v.inRange(m) || isRemove && m.Expunged) {
874 changedUIDs = append(changedUIDs, m.UID)
878 xcheckf(ctx, err, "fetching messages for change")
882 // Forward changes that are relevant to the current view.
883 for _, change := range changes {
884 switch c := change.(type) {
885 case store.ChangeAddUID:
886 ok, err := v.matches(log, acc, true, 0, c.MailboxID, c.UID, c.Flags, c.Keywords, getmsg)
887 xcheckf(ctx, err, "matching new message against view")
888 m, err := getmsg(0, c.MailboxID, c.UID)
889 xcheckf(ctx, err, "get message")
890 _, thread := v.threadIDs[m.ThreadID]
895 state := msgState{acc: acc, log: log, newPreviews: newPreviews}
896 mi, err := messageItem(log, m, &state, xmoreHeaders())
898 xcheckf(ctx, err, "make messageitem")
901 mil := []MessageItem{mi}
902 if !thread && req.Query.Threading != ThreadOff {
904 xcheckf(ctx, err, "transaction")
905 more, _, err := gatherThread(log, xtx, acc, v, m, 0, false, xmoreHeaders(), newPreviews)
906 xcheckf(ctx, err, "gathering thread messages for id %d, thread %d", m.ID, m.ThreadID)
907 mil = append(mil, more...)
908 v.threadIDs[m.ThreadID] = struct{}{}
911 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgAdd", ChangeMsgAdd{c, mil}})
913 // If message extends the view, store it as such.
914 if !v.Request.Query.OrderAsc && m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && m.Received.After(v.LastMessageReceived) {
915 v.LastMessageReceived = m.Received
918 case store.ChangeRemoveUIDs:
921 // We may send changes for uids the client doesn't know, that's fine.
922 changedUIDs := xchangedUIDs(c.MailboxID, c.UIDs, true)
923 if len(changedUIDs) == 0 {
926 ch := ChangeMsgRemove{c}
927 ch.UIDs = changedUIDs
928 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgRemove", ch})
930 case store.ChangeFlags:
931 // We may send changes for uids the client doesn't know, that's fine.
932 changedUIDs := xchangedUIDs(c.MailboxID, []store.UID{c.UID}, false)
933 if len(changedUIDs) == 0 {
936 ch := ChangeMsgFlags{c}
937 ch.UID = changedUIDs[0]
938 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgFlags", ch})
940 case store.ChangeThread:
941 // Change in muted/collapsed state, just always ship it.
942 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgThread", ChangeMsgThread{c}})
944 case store.ChangeRemoveMailbox:
945 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRemove", ChangeMailboxRemove{c}})
947 case store.ChangeAddMailbox:
948 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxAdd", ChangeMailboxAdd{c.Mailbox}})
950 case store.ChangeRenameMailbox:
951 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRename", ChangeMailboxRename{c}})
953 case store.ChangeMailboxCounts:
954 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxCounts", ChangeMailboxCounts{c}})
956 case store.ChangeMailboxSpecialUse:
957 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxSpecialUse", ChangeMailboxSpecialUse{c}})
959 case store.ChangeMailboxKeywords:
960 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxKeywords", ChangeMailboxKeywords{c}})
962 case store.ChangeAddSubscription:
963 // Webmail does not care about subscriptions.
965 case store.ChangeAnnotation:
969 panic(fmt.Sprintf("missing case for change %T", c))
973 if len(taggedChanges) > 0 {
974 viewChanges := EventViewChanges{v.Request.ViewID, taggedChanges}
975 writer.xsendEvent(ctx, log, "viewChanges", viewChanges)
979 timer := time.NewTimer(5 * time.Minute) // For keepalives.
983 timer.Reset(5 * time.Minute)
987 pending := comm.Pending
993 case <-mox.Shutdown.Done():
994 writer.xsendEvent(ctx, log, "fatalErr", "server is shutting down")
995 // Work around go vet, it doesn't see defer cancelDrain.
996 if reqctxcancel != nil {
1002 _, err := fmt.Fprintf(out, ": keepalive\n\n")
1007 log.Errorx("write keepalive", err)
1008 // Work around go vet, it doesn't see defer cancelDrain.
1009 if reqctxcancel != nil {
1016 case vm := <-viewMsgsc:
1017 if vm.RequestID != v.Request.ID || vm.ViewID != v.Request.ViewID {
1018 panic(fmt.Sprintf("received msgs for view,request id %d,%d instead of %d,%d", vm.ViewID, vm.RequestID, v.Request.ViewID, v.Request.ID))
1023 if len(vm.MessageItems) > 0 {
1024 v.LastMessageReceived = vm.MessageItems[len(vm.MessageItems)-1][0].Message.Received
1026 writer.xsendEvent(ctx, log, "viewMsgs", vm)
1028 case ve := <-viewErrc:
1029 if ve.RequestID != v.Request.ID || ve.ViewID != v.Request.ViewID {
1030 panic(fmt.Sprintf("received err for view,request id %d,%d instead of %d,%d", ve.ViewID, ve.RequestID, v.Request.ViewID, v.Request.ID))
1032 if errors.Is(ve.err, context.Canceled) || mlog.IsClosed(ve.err) {
1033 // Work around go vet, it doesn't see defer cancelDrain.
1034 if reqctxcancel != nil {
1039 writer.xsendEvent(ctx, log, "viewErr", ve)
1041 case vr := <-viewResetc:
1042 if vr.RequestID != v.Request.ID || vr.ViewID != v.Request.ViewID {
1043 panic(fmt.Sprintf("received reset for view,request id %d,%d instead of %d,%d", vr.ViewID, vr.RequestID, v.Request.ViewID, v.Request.ID))
1045 writer.xsendEvent(ctx, log, "viewReset", vr)
1048 if id != v.Request.ID {
1049 panic(fmt.Sprintf("received done for request id %d instead of %d", id, v.Request.ID))
1051 if reqctxcancel != nil {
1057 case req := <-sse.Request:
1062 v = view{req, time.Time{}, false, false, nil, nil}
1066 reqctx, reqctxcancel = context.WithCancel(ctx)
1068 stop := func() (stop bool) {
1069 // rtx is handed off to viewRequestTx below, but we must clean it up in case of errors.
1074 err = rtx.Rollback()
1075 log.Check(err, "rolling back transaction")
1078 acc.WithRLock(func() {
1079 rtx, err = acc.DB.Begin(reqctx, false)
1086 if errors.Is(err, context.Canceled) {
1089 err := fmt.Errorf("begin transaction: %v", err)
1090 viewErr := EventViewErr{v.Request.ViewID, v.Request.ID, err.Error(), err}
1091 writer.xsendEvent(ctx, log, "viewErr", viewErr)
1095 // Reset view state for new query.
1096 if req.ViewID != v.Request.ViewID {
1097 matchMailboxes, mailboxIDs, mailboxPrefixes := xprepareMailboxIDs(ctx, rtx, req.Query.Filter, accConf.RejectsMailbox)
1098 if req.Query.Filter.MailboxChildrenIncluded {
1099 xgatherMailboxIDs(ctx, rtx, mailboxIDs, mailboxPrefixes)
1101 v = view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
1105 go viewRequestTx(reqctx, log, acc, rtx, v, viewMsgsc, viewErrc, viewResetc, donec)
1114 xprocessChanges(comm.Get())
1117 // Work around go vet, it doesn't see defer cancelDrain.
1118 if reqctxcancel != nil {
1126// xprepareMailboxIDs prepares the first half of filters for mailboxes, based on
1127// f.MailboxID (-1 is special). matchMailboxes indicates whether the IDs in
1128// mailboxIDs must or must not match. mailboxPrefixes is for use with
1129// xgatherMailboxIDs to gather children of the mailboxIDs.
1130func xprepareMailboxIDs(ctx context.Context, tx *bstore.Tx, f Filter, rejectsMailbox string) (matchMailboxes bool, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1131 matchMailboxes = true
1132 mailboxIDs = map[int64]bool{}
1133 if f.MailboxID == -1 {
1134 matchMailboxes = false
1135 // Add the trash, junk and account rejects mailboxes.
1136 err := bstore.QueryTx[store.Mailbox](tx).FilterEqual("Expunged", false).ForEach(func(mb store.Mailbox) error {
1137 if mb.Trash || mb.Junk || mb.Name == rejectsMailbox {
1138 mailboxPrefixes = append(mailboxPrefixes, mb.Name+"/")
1139 mailboxIDs[mb.ID] = true
1143 xcheckf(ctx, err, "finding trash/junk/rejects mailbox")
1144 } else if f.MailboxID > 0 {
1145 mb, err := store.MailboxID(tx, f.MailboxID)
1146 xcheckf(ctx, err, "get mailbox")
1147 mailboxIDs[f.MailboxID] = true
1148 mailboxPrefixes = []string{mb.Name + "/"}
1153// xgatherMailboxIDs adds all mailboxes with a prefix matching any of
1154// mailboxPrefixes to mailboxIDs, to expand filtering to children of mailboxes.
1155func xgatherMailboxIDs(ctx context.Context, tx *bstore.Tx, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1156 // Gather more mailboxes to filter on, based on mailboxPrefixes.
1157 if len(mailboxPrefixes) == 0 {
1160 err := bstore.QueryTx[store.Mailbox](tx).FilterEqual("Expunged", false).ForEach(func(mb store.Mailbox) error {
1161 for _, p := range mailboxPrefixes {
1162 if strings.HasPrefix(mb.Name, p) {
1163 mailboxIDs[mb.ID] = true
1169 xcheckf(ctx, err, "gathering mailboxes")
1172// matchesMailbox returns whether a mailbox matches the view.
1173func (v view) matchesMailbox(mailboxID int64) bool {
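// Either there is no mailbox filter at all, or the mailbox must be in mailboxIDs
// (matchMailboxIDs true), or must not be in it (matchMailboxIDs false, e.g. for
// MailboxID -1, which excludes Trash/Junk/Rejects).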
1174 return len(v.mailboxIDs) == 0 || v.matchMailboxIDs && v.mailboxIDs[mailboxID] || !v.matchMailboxIDs && !v.mailboxIDs[mailboxID]
1177// inRange returns whether m is within the range for the view, i.e. whether a change for
1178// this message should be sent to the client so it can update its state.
1179func (v view) inRange(m store.Message) bool {
1180 return v.End || !v.Request.Query.OrderAsc && !m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && !m.Received.After(v.LastMessageReceived)
1183// matches checks if the message, identified by either messageID or mailboxID+UID,
1184// is in the current "view" (i.e. passing the filters, and if checkRange is set
1185// also if within the range of sent messages based on sort order and the last seen
1186// message). getmsg retrieves the message, which may be necessary depending on the
1187// active filters. Used to determine if a store.Change with a new message should be
1188// sent, and for the destination and anchor messages in view requests.
1189func (v view) matches(log mlog.Log, acc *store.Account, checkRange bool, messageID int64, mailboxID int64, uid store.UID, flags store.Flags, keywords []string, getmsg func(int64, int64, store.UID) (store.Message, error)) (match bool, rerr error) {
1191 ensureMessage := func() bool {
1192 if m.ID == 0 && rerr == nil {
1193 m, rerr = getmsg(messageID, mailboxID, uid)
1198 q := v.Request.Query
1200 // Warning: Filters must be kept in sync between queryMessages and view.matches.
1203 if len(v.mailboxIDs) > 0 && (!ensureMessage() || v.matchMailboxIDs && !v.mailboxIDs[m.MailboxID] || !v.matchMailboxIDs && v.mailboxIDs[m.MailboxID]) {
1206 // note: anchorMessageID is not relevant for matching.
1207 flagfilter := q.flagFilterFn()
1208 if flagfilter != nil && !flagfilter(flags, keywords) {
1212 if q.Filter.Oldest != nil && (!ensureMessage() || m.Received.Before(*q.Filter.Oldest)) {
1215 if q.Filter.Newest != nil && (!ensureMessage() || !m.Received.Before(*q.Filter.Newest)) {
1219 if q.Filter.SizeMin > 0 && (!ensureMessage() || m.Size < q.Filter.SizeMin) {
1222 if q.Filter.SizeMax > 0 && (!ensureMessage() || m.Size > q.Filter.SizeMax) {
1226 state := msgState{acc: acc, log: log}
1228 if rerr == nil && state.err != nil {
1234 attachmentFilter := q.attachmentFilterFn(log, acc, &state)
1235 if attachmentFilter != nil && (!ensureMessage() || !attachmentFilter(m)) {
1239 envFilter := q.envFilterFn(log, &state)
1240 if envFilter != nil && (!ensureMessage() || !envFilter(m)) {
1244 headerFilter := q.headerFilterFn(log, &state)
1245 if headerFilter != nil && (!ensureMessage() || !headerFilter(m)) {
1249 wordsFilter := q.wordsFilterFn(log, &state)
1250 if wordsFilter != nil && (!ensureMessage() || !wordsFilter(m)) {
1254 // Now check that we are either within the sorting order, or "last" was sent.
1255 if !checkRange || v.End || ensureMessage() && v.inRange(m) {
1261type msgResp struct {
1262 err error // If set, an error happened and fields below are not set.
1263 reset bool // If set, the anchor message does not exist (anymore?) and we are sending messages from the start, fields below not set.
1264 viewEnd bool // If set, the last message for the view was seen, no more should be requested, fields below not set.
1265 mil []MessageItem // If none of the cases above apply, the messages that were found matching the query. The first message is the reason the thread is returned, for use as AnchorMessageID in a followup request.
1266 pm *ParsedMessage // If m was the target page.DestMessageID, or this is the first match, this is the parsed message of mi.
1269func storeNewPreviews(ctx context.Context, log mlog.Log, acc *store.Account, newPreviews map[int64]string) {
1270 if len(newPreviews) == 0 {
1277 log.Error("unhandled panic in storeNewPreviews", slog.Any("err", x))
1279 metrics.PanicInc(metrics.Store)
1283 err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
1284 for id, preview := range newPreviews {
1285 m := store.Message{ID: id}
1286 if err := tx.Get(&m); err != nil {
1287 return fmt.Errorf("get message with id %d to store preview: %w", id, err)
1288 } else if !m.Expunged {
1289 m.Preview = &preview
1290 if err := tx.Update(&m); err != nil {
1291 return fmt.Errorf("updating message with id %d: %v", m.ID, err)
1297 log.Check(err, "saving new previews with messages")
1300// viewRequestTx executes a request (query with filters, pagination) by
1301// launching a new goroutine with queryMessages, receiving results as msgResp,
1302// and sending Event* to the SSE connection.
1304// It always closes tx.
1305func viewRequestTx(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, msgc chan EventViewMsgs, errc chan EventViewErr, resetc chan EventViewReset, donec chan int64) {
1306 // Newly generated previews which we'll save when the operation is done.
1307 newPreviews := map[int64]string{}
1310 err := tx.Rollback()
1311 log.Check(err, "rolling back query transaction")
1313 donec <- v.Request.ID
1315 // ctx can be canceled, but we still want to store the previews.
1316 storeNewPreviews(context.Background(), log, acc, newPreviews)
1318 x := recover() // Should not happen, but don't take program down if it does.
1320 log.WithContext(ctx).Error("viewRequestTx panic", slog.Any("err", x))
1322 metrics.PanicInc(metrics.Webmailrequest)
1326 var msgitems [][]MessageItem // Gathering for 300ms, then flushing.
1327 var parsedMessage *ParsedMessage
1330 var immediate bool // No waiting, flush immediately.
1331 t := time.NewTimer(300 * time.Millisecond)
1334 sendViewMsgs := func(force bool) {
1335 if len(msgitems) == 0 && !force {
1340 msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, msgitems, parsedMessage, viewEnd}
1343 t.Reset(300 * time.Millisecond)
1346 // todo: should probably rewrite code so we don't start yet another goroutine, but instead handle the query responses directly (through a struct that keeps state?) in the sse connection goroutine.
1348 mrc := make(chan msgResp, 1)
1349 go queryMessages(ctx, log, acc, tx, v, mrc, newPreviews)
1353 case mr, ok := <-mrc:
1356 // Empty message list signals this query is done.
1357 msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, nil, nil, false}
1362 errc <- EventViewErr{v.Request.ViewID, v.Request.ID, mr.err.Error(), mr.err}
1366 resetc <- EventViewReset{v.Request.ViewID, v.Request.ID}
1375 msgitems = append(msgitems, mr.mil)
1377 parsedMessage = mr.pm
1384 if len(msgitems) == 0 {
1385 // Nothing to send yet. We'll send immediately when the next message comes in.
1394// queryMessages executes a query, with filter, pagination, destination message id
1395// to fetch (the message that the client had in view and wants to display again).
1396// It sends on mrc, with several types of messages: errors, whether the view is
1397// reset due to missing AnchorMessageID, and when the end of the view was reached
1398// and/or for a message.
1399// newPreviews is filled with previews; the caller must save them.
1400func queryMessages(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, mrc chan msgResp, newPreviews map[int64]string) {
1402 x := recover() // Should not happen, but don't take program down if it does.
1404 log.WithContext(ctx).Error("queryMessages panic", slog.Any("err", x))
1406 mrc <- msgResp{err: fmt.Errorf("query failed")}
1407 metrics.PanicInc(metrics.Webmailquery)
1413 query := v.Request.Query
1414 page := v.Request.Page
1416 // Warning: Filters must be kept in sync between queryMessages and view.matches.
1418 checkMessage := func(id int64) (valid bool, rerr error) {
1419 m := store.Message{ID: id}
1421 if err == bstore.ErrAbsent || err == nil && m.Expunged {
1423 } else if err != nil {
1426 return v.matches(log, acc, false, m.ID, m.MailboxID, m.UID, m.Flags, m.Keywords, func(int64, int64, store.UID) (store.Message, error) {
1432 // Check if AnchorMessageID exists and matches filter. If not, we will reset the view.
1433 if page.AnchorMessageID > 0 {
1434 // Check if message exists and (still) matches the filter.
1435 // todo: if AnchorMessageID exists but no longer matches the filter, we are resetting the view, but could handle it more gracefully in the future. if the message is in a different mailbox, we cannot query as efficiently, we'll have to read through more messages.
1436 if valid, err := checkMessage(page.AnchorMessageID); err != nil {
1437 mrc <- msgResp{err: fmt.Errorf("querying AnchorMessageID: %v", err)}
1440 mrc <- msgResp{reset: true}
1441 page.AnchorMessageID = 0
1445 // Check if page.DestMessageID exists and matches filter. If not, we will ignore
1446 // it instead of continuing to send messages until the end of the view.
1447 if page.DestMessageID > 0 {
1448 if valid, err := checkMessage(page.DestMessageID); err != nil {
1449 mrc <- msgResp{err: fmt.Errorf("querying requested message: %v", err)}
1452 page.DestMessageID = 0
1456 // todo optimize: we would like to have more filters directly on the database if they can use an index. eg if there is a keyword filter and no mailbox filter.
1458 q := bstore.QueryTx[store.Message](tx)
1459 q.FilterEqual("Expunged", false)
1460 if len(v.mailboxIDs) > 0 {
1461 if len(v.mailboxIDs) == 1 && v.matchMailboxIDs {
1462 // Should result in fast indexed query.
1463 for mbID := range v.mailboxIDs {
1464 q.FilterNonzero(store.Message{MailboxID: mbID})
1467 idsAny := make([]any, 0, len(v.mailboxIDs))
1468 for mbID := range v.mailboxIDs {
1469 idsAny = append(idsAny, mbID)
1471 if v.matchMailboxIDs {
1472 q.FilterEqual("MailboxID", idsAny...)
1474 q.FilterNotEqual("MailboxID", idsAny...)
1479 // If we are looking for an anchor, keep skipping messages early (cheaply) until we've seen it.
1480 if page.AnchorMessageID > 0 {
1482 q.FilterFn(func(m store.Message) bool {
1486 seen = m.ID == page.AnchorMessageID
1491 // We may add filters to the query below. The FilterFn signature does not
1492 // allow reporting errors, or anything else, just a bool. So when making the
1493 // filter functions, we give them a place to store parsed message state, and an
1494 // error. We check the error during and after query execution.
1495 state := msgState{acc: acc, log: log, newPreviews: newPreviews}
1498 flagfilter := query.flagFilterFn()
1499 if flagfilter != nil {
1500 q.FilterFn(func(m store.Message) bool {
1501 return flagfilter(m.Flags, m.Keywords)
1505 if query.Filter.Oldest != nil {
1506 q.FilterGreaterEqual("Received", *query.Filter.Oldest)
1508 if query.Filter.Newest != nil {
1509 q.FilterLessEqual("Received", *query.Filter.Newest)
1512 if query.Filter.SizeMin > 0 {
1513 q.FilterGreaterEqual("Size", query.Filter.SizeMin)
1515 if query.Filter.SizeMax > 0 {
1516 q.FilterLessEqual("Size", query.Filter.SizeMax)
1519 attachmentFilter := query.attachmentFilterFn(log, acc, &state)
1520 if attachmentFilter != nil {
1521 q.FilterFn(attachmentFilter)
1524 envFilter := query.envFilterFn(log, &state)
1525 if envFilter != nil {
1526 q.FilterFn(envFilter)
1529 headerFilter := query.headerFilterFn(log, &state)
1530 if headerFilter != nil {
1531 q.FilterFn(headerFilter)
1534 wordsFilter := query.wordsFilterFn(log, &state)
1535 if wordsFilter != nil {
1536 q.FilterFn(wordsFilter)
1539 var moreHeaders []string // From store.Settings.ShowHeaders
1542 q.SortAsc("Received")
1544 q.SortDesc("Received")
1546 found := page.DestMessageID <= 0
1549 err := q.ForEach(func(m store.Message) error {
1550 // Check for an error in one of the filters, propagate it.
1551 if state.err != nil {
1555 if have >= page.Count && found || have > 10000 {
1557 return bstore.StopForEach
1560 if _, ok := v.threadIDs[m.ThreadID]; ok {
1561 // Message was already returned as part of a thread.
1565 var pm *ParsedMessage
1566 if m.ID == page.DestMessageID || page.DestMessageID == 0 && have == 0 && page.AnchorMessageID == 0 {
1567 // For threads, if there was no DestMessageID, we may be getting the newest
1568 // message. For an initial view, this isn't necessarily the message the user is
1569 // expected to read first; that would be the first unread, which we'll get below
1570 // when gathering the thread.
1572 xpm, err := parsedMessage(log, &m, &state, true, false, false)
1573 if err != nil && errors.Is(err, message.ErrHeader) {
1574 log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
1575 } else if err != nil {
1576 return fmt.Errorf("parsing message %d: %v", m.ID, err)
1583 moreHeaders, err = ensureMoreHeaders(tx, moreHeaders)
1585 return fmt.Errorf("ensuring more headers: %v", err)
1588 mi, err := messageItem(log, m, &state, moreHeaders)
1590 return fmt.Errorf("making messageitem for message %d: %v", m.ID, err)
1592 mil := []MessageItem{mi}
1593 if query.Threading != ThreadOff {
1594 more, xpm, err := gatherThread(log, tx, acc, v, m, page.DestMessageID, page.AnchorMessageID == 0 && have == 0, moreHeaders, state.newPreviews)
1596 return fmt.Errorf("gathering thread messages for id %d, thread %d: %v", m.ID, m.ThreadID, err)
1602 mil = append(mil, more...)
1603 v.threadIDs[m.ThreadID] = struct{}{}
1605 // Calculate how many messages the frontend is going to show, and only count those as returned.
1606 collapsed := map[int64]bool{}
1607 for _, mi := range mil {
1608 collapsed[mi.Message.ID] = mi.Message.ThreadCollapsed
1610 unread := map[int64]bool{} // Propagated to thread root.
1611 if query.Threading == ThreadUnread {
1612 for _, mi := range mil {
1617 unread[mm.ID] = true
1618 for _, id := range mm.ThreadParentIDs {
1623 for _, mi := range mil {
1627 for _, id := range mm.ThreadParentIDs {
1628 if _, ok := collapsed[id]; ok {
1633 if threadRoot || (query.Threading == ThreadOn && !collapsed[rootID] || query.Threading == ThreadUnread && unread[rootID]) {
1640 if pm != nil && len(pm.envelope.From) == 1 {
1641 pm.ViewMode, err = fromAddrViewMode(tx, pm.envelope.From[0])
1643 return fmt.Errorf("gathering view mode for id %d: %v", m.ID, err)
1646 mrc <- msgResp{mil: mil, pm: pm}
1649 // Check for an error in one of the filters again. The check in ForEach would not
1650 // trigger if the last message has the error.
1651 if err == nil && state.err != nil {
1655 mrc <- msgResp{err: fmt.Errorf("querying messages: %v", err)}
1659 mrc <- msgResp{viewEnd: true}
1663func gatherThread(log mlog.Log, tx *bstore.Tx, acc *store.Account, v view, m store.Message, destMessageID int64, first bool, moreHeaders []string, newPreviews map[int64]string) ([]MessageItem, *ParsedMessage, error) {
1664 if m.ThreadID == 0 {
1665 // If we would continue, FilterNonzero would fail because there are no non-zero fields.
1666 return nil, nil, fmt.Errorf("message has threadid 0, account is probably still being upgraded, try turning threading off until the upgrade is done")
1669 // Fetch other messages for this thread.
1670 qt := bstore.QueryTx[store.Message](tx)
1671 qt.FilterNonzero(store.Message{ThreadID: m.ThreadID})
1672 qt.FilterEqual("Expunged", false)
1673 qt.FilterNotEqual("ID", m.ID)
1675 tml, err := qt.List()
1677 return nil, nil, fmt.Errorf("listing other messages in thread for message %d, thread %d: %v", m.ID, m.ThreadID, err)
1680 var mil []MessageItem
1681 var pm *ParsedMessage
1682 var firstUnread bool
1683 for _, tm := range tml {
1684 err := func() error {
1685 xstate := msgState{acc: acc, log: log, newPreviews: newPreviews}
1686 defer xstate.clear()
1688 mi, err := messageItem(log, tm, &xstate, moreHeaders)
1690 return fmt.Errorf("making messageitem for message %d, for thread %d: %v", tm.ID, m.ThreadID, err)
1692 mi.MatchQuery, err = v.matches(log, acc, false, tm.ID, tm.MailboxID, tm.UID, tm.Flags, tm.Keywords, func(int64, int64, store.UID) (store.Message, error) {
1696 return fmt.Errorf("matching thread message %d against view query: %v", tm.ID, err)
1698 mil = append(mil, mi)
1700 if tm.ID == destMessageID || destMessageID == 0 && first && (pm == nil || !firstUnread && !tm.Seen) {
1701 firstUnread = !tm.Seen
1702 xpm, err := parsedMessage(log, &tm, &xstate, true, false, false)
1703 if err != nil && errors.Is(err, message.ErrHeader) {
1704 log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
1705 } else if err != nil {
1706 return fmt.Errorf("parsing thread message %d: %v", tm.ID, err)
1714 return nil, nil, err
1718 // Finally, the message that caused us to gather this thread (which is likely the
1719 // most recent message in the thread) could be the only unread message.
1720 if destMessageID == 0 && first && !m.Seen && !firstUnread {
1721 xstate := msgState{acc: acc, log: log}
1722 defer xstate.clear()
1723 xpm, err := parsedMessage(log, &m, &xstate, true, false, false)
1724 if err != nil && errors.Is(err, message.ErrHeader) {
1725 log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
1726 } else if err != nil {
1727 return nil, nil, fmt.Errorf("parsing thread message %d: %v", m.ID, err)
1736// While checking the filters on a message, we may need to get more message
1737// details as each filter passes. We check the filters that need the basic
1738// information first, and load and cache more details for the next filters.
1739// msgState holds parsed details for a message. It is updated while filtering
1740// with more information, or reset for the next message.
1741type msgState struct {
1742 acc *store.Account // Never changes during lifetime.
1743 err error // Once set, doesn't get cleared.
1745 part *message.Part // Will be without Reader when msgr is nil.
1746 msgr *store.MsgReader
1749 // If not nil, messages will get their Preview field filled if it is nil, with the message
1750 // ID and preview added to newPreviews, to be saved in a separate write transaction
1751 // when the operation is done.
1752 newPreviews map[int64]string
1755func (ms *msgState) clear() {
1757 err := ms.msgr.Close()
1758 ms.log.Check(err, "closing message reader from state")
1761 *ms = msgState{acc: ms.acc, err: ms.err, log: ms.log, newPreviews: ms.newPreviews}
1764func (ms *msgState) ensureMsg(m store.Message) {
1765 if m.ID != ms.m.ID {
1771func (ms *msgState) ensurePart(m store.Message, withMsgReader bool) bool {
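// ensurePart loads the message's pre-parsed part structure from ParsedBuf, and with
// withMsgReader also attaches a MsgReader so the part contents can be read.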
1776 if m.ParsedBuf == nil {
1777 ms.err = fmt.Errorf("message %d not parsed", m.ID)
1781 if err := json.Unmarshal(m.ParsedBuf, &p); err != nil {
1782 ms.err = fmt.Errorf("load part for message %d: %w", m.ID, err)
1787 if withMsgReader && ms.msgr == nil {
1788 ms.msgr = ms.acc.MessageReader(m)
1789 ms.part.SetReaderAt(ms.msgr)
1792 return ms.part != nil
1795// flagFilterFn returns a function that applies the flag/keyword/"label"-related
1796// filters for a query. A nil function is returned if there are no flags to filter
1798func (q Query) flagFilterFn() func(store.Flags, []string) bool {
1799 labels := map[string]bool{}
1800 for _, k := range q.Filter.Labels {
1803 for _, k := range q.NotFilter.Labels {
1807 if len(labels) == 0 {
1811 var mask, flags store.Flags
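// systemflags maps each label to a pair of pointers: the first marks in mask that the
// flag is being filtered on, the second holds the required value in flags.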
1812 systemflags := map[string][]*bool{
1813 `\answered`: {&mask.Answered, &flags.Answered},
1814 `\flagged`: {&mask.Flagged, &flags.Flagged},
1815 `\deleted`: {&mask.Deleted, &flags.Deleted},
1816 `\seen`: {&mask.Seen, &flags.Seen},
1817 `\draft`: {&mask.Draft, &flags.Draft},
1818 `$junk`: {&mask.Junk, &flags.Junk},
1819 `$notjunk`: {&mask.Notjunk, &flags.Notjunk},
1820 `$forwarded`: {&mask.Forwarded, &flags.Forwarded},
1821 `$phishing`: {&mask.Phishing, &flags.Phishing},
1822 `$mdnsent`: {&mask.MDNSent, &flags.MDNSent},
1824 keywords := map[string]bool{}
1825 for k, v := range labels {
1826 k = strings.ToLower(k)
1827 if mf, ok := systemflags[k]; ok {
1834 return func(msgFlags store.Flags, msgKeywords []string) bool {
1836 if f.Set(mask, msgFlags) != flags {
1839 for k, v := range keywords {
1840 if slices.Contains(msgKeywords, k) != v {
1848// attachmentFilterFn returns a function that filters for the attachment-related
1849// filter from the query. A nil function is returned if there are attachment
1851func (q Query) attachmentFilterFn(log mlog.Log, acc *store.Account, state *msgState) func(m store.Message) bool {
1852 if q.Filter.Attachments == AttachmentIndifferent && q.NotFilter.Attachments == AttachmentIndifferent {
1856 return func(m store.Message) bool {
1857 if !state.ensurePart(m, true) {
1860 types, err := attachmentTypes(log, m, state)
1865 return (q.Filter.Attachments == AttachmentIndifferent || types[q.Filter.Attachments]) && (q.NotFilter.Attachments == AttachmentIndifferent || !types[q.NotFilter.Attachments])
1869var attachmentMimetypes = map[string]AttachmentType{
1870 "application/pdf": AttachmentPDF,
1871 "application/zip": AttachmentArchive,
1872 "application/x-rar-compressed": AttachmentArchive,
1873 "application/vnd.oasis.opendocument.spreadsheet": AttachmentSpreadsheet,
1874 "application/vnd.ms-excel": AttachmentSpreadsheet,
1875 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": AttachmentSpreadsheet,
1876 "application/vnd.oasis.opendocument.text": AttachmentDocument,
1877 "application/vnd.oasis.opendocument.presentation": AttachmentPresentation,
1878 "application/vnd.ms-powerpoint": AttachmentPresentation,
1879 "application/vnd.openxmlformats-officedocument.presentationml.presentation": AttachmentPresentation,
1881var attachmentExtensions = map[string]AttachmentType{
1882 ".pdf": AttachmentPDF,
1883 ".zip": AttachmentArchive,
1884 ".tar": AttachmentArchive,
1885 ".tgz": AttachmentArchive,
1886 ".tar.gz": AttachmentArchive,
1887 ".tbz2": AttachmentArchive,
1888 ".tar.bz2": AttachmentArchive,
1889 ".tar.lz": AttachmentArchive,
1890 ".tlz": AttachmentArchive,
1891 ".tar.xz": AttachmentArchive,
1892 ".txz": AttachmentArchive,
1893 ".tar.zst": AttachmentArchive,
1894 ".tar.lz4": AttachmentArchive,
1895 ".7z": AttachmentArchive,
1896 ".rar": AttachmentArchive,
1897 ".ods": AttachmentSpreadsheet,
1898 ".xls": AttachmentSpreadsheet,
1899 ".xlsx": AttachmentSpreadsheet,
1900 ".odt": AttachmentDocument,
1901 ".doc": AttachmentDocument,
1902 ".docx": AttachmentDocument,
1903 ".odp": AttachmentPresentation,
1904 ".ppt": AttachmentPresentation,
1905 ".pptx": AttachmentPresentation,
1908func attachmentTypes(log mlog.Log, m store.Message, state *msgState) (map[AttachmentType]bool, error) {
1909 types := map[AttachmentType]bool{}
1911 pm, err := parsedMessage(log, &m, state, false, false, false)
1913 return nil, fmt.Errorf("parsing message for attachments: %w", err)
1915 for _, a := range pm.attachments {
1916 if a.Part.MediaType == "IMAGE" {
1917 types[AttachmentImage] = true
1920 mt := strings.ToLower(a.Part.MediaType + "/" + a.Part.MediaSubType)
1921 if t, ok := attachmentMimetypes[mt]; ok {
1925 _, filename, err := a.Part.DispositionFilename()
1926 if err != nil && (errors.Is(err, message.ErrParamEncoding) || errors.Is(err, message.ErrHeader)) {
1927 log.Debugx("parsing disposition/filename", err)
1928 } else if err != nil {
1929 return nil, fmt.Errorf("reading disposition/filename: %v", err)
1931 if ext := filepath.Ext(filename); ext != "" {
1932 if t, ok := attachmentExtensions[strings.ToLower(ext)]; ok {
1938 if len(types) == 0 {
1939 types[AttachmentNone] = true
1941 types[AttachmentAny] = true
1946// envFilterFn returns a filter function for the "envelope" headers ("envelope" as
1947// used by IMAP, i.e. basic message headers from/to/subject, an unfortunate name
1948// clash with SMTP envelope) for the query. A nil function is returned if no
1949// filtering is needed.
1950func (q Query) envFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1951 if len(q.Filter.From) == 0 && len(q.Filter.To) == 0 && len(q.Filter.Subject) == 0 && len(q.NotFilter.From) == 0 && len(q.NotFilter.To) == 0 && len(q.NotFilter.Subject) == 0 {
1955 lower := func(l []string) []string {
1959 r := make([]string, len(l))
1960 for i, s := range l {
1961 r[i] = strings.ToLower(s)
1966 filterSubject := lower(q.Filter.Subject)
1967 notFilterSubject := lower(q.NotFilter.Subject)
1968 filterFrom := lower(q.Filter.From)
1969 notFilterFrom := lower(q.NotFilter.From)
1970 filterTo := lower(q.Filter.To)
1971 notFilterTo := lower(q.NotFilter.To)
1973 return func(m store.Message) bool {
1974 if !state.ensurePart(m, false) {
1978 var env message.Envelope
1979 if state.part.Envelope != nil {
1980 env = *state.part.Envelope
1983 if len(filterSubject) > 0 || len(notFilterSubject) > 0 {
1984 subject := strings.ToLower(env.Subject)
1985 for _, s := range filterSubject {
1986 if !strings.Contains(subject, s) {
1990 for _, s := range notFilterSubject {
1991 if strings.Contains(subject, s) {
1997 contains := func(textLower []string, l []message.Address, all bool) bool {
1999 for _, s := range textLower {
2000 for _, a := range l {
2001 name := strings.ToLower(a.Name)
2002 addr := strings.ToLower(fmt.Sprintf("<%s@%s>", a.User, a.Host))
2003 if strings.Contains(name, s) || strings.Contains(addr, s) {
2017 if len(filterFrom) > 0 && !contains(filterFrom, env.From, true) {
2020 if len(notFilterFrom) > 0 && contains(notFilterFrom, env.From, false) {
2023 if len(filterTo) > 0 || len(notFilterTo) > 0 {
2024 to := slices.Concat(env.To, env.CC, env.BCC)
2025 if len(filterTo) > 0 && !contains(filterTo, to, true) {
2028 if len(notFilterTo) > 0 && contains(notFilterTo, to, false) {
2036// headerFilterFn returns a function that filters for the header filters in the
2037// query. A nil function is returned if there are no header filters.
2038func (q Query) headerFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
2039 if len(q.Filter.Headers) == 0 {
2043 lowerValues := make([]string, len(q.Filter.Headers))
2044 for i, t := range q.Filter.Headers {
2045 lowerValues[i] = strings.ToLower(t[1])
2048 return func(m store.Message) bool {
2049 if !state.ensurePart(m, true) {
2052 hdr, err := state.part.Header()
2054 state.err = fmt.Errorf("reading header for message %d: %w", m.ID, err)
2059 for i, t := range q.Filter.Headers {
2063 if v == "" && len(l) > 0 {
2066 for _, e := range l {
2067 if strings.Contains(strings.ToLower(e), v) {
2077// wordsFilterFn returns a function that applies the word filters of the query. A
2078// nil function is returned when the query does not contain a word filter.
2079func (q Query) wordsFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
2080 if len(q.Filter.Words) == 0 && len(q.NotFilter.Words) == 0 {
2084 ws := store.PrepareWordSearch(q.Filter.Words, q.NotFilter.Words)
2086 return func(m store.Message) bool {
2087 if !state.ensurePart(m, true) {
2091 if ok, err := ws.MatchPart(log, state.part, true); err != nil {
2092 state.err = fmt.Errorf("searching for words in message %d: %w", m.ID, err)