3// todo: may want to add some json omitempty tags to MessageItem, or Message to reduce json size, or just have smaller types that send only the fields that are needed.
8 cryptrand "crypto/rand"
24 "github.com/mjl-/bstore"
25 "github.com/mjl-/sherpa"
27 "github.com/mjl-/mox/dns"
28 "github.com/mjl-/mox/message"
29 "github.com/mjl-/mox/metrics"
30 "github.com/mjl-/mox/mlog"
31 "github.com/mjl-/mox/mox-"
32 "github.com/mjl-/mox/moxio"
33 "github.com/mjl-/mox/moxvar"
34 "github.com/mjl-/mox/smtp"
35 "github.com/mjl-/mox/store"
38// Request is a request to an SSE connection to send messages, either for a new
39// view, to continue with an existing view, or to a cancel an ongoing request.
43 SSEID int64 // SSE connection.
45 // To indicate a request is a continuation (more results) of the previous view.
46 // Echoed in events, client checks if it is getting results for the latest request.
49 // If set, this request and its view are canceled. A new view must be started.
59 ThreadOff ThreadMode = "off"
60 ThreadOn ThreadMode = "on"
61 ThreadUnread ThreadMode = "unread"
64// Query is a request for messages that match filters, in a given order.
66 OrderAsc bool // Order by received ascending or desending.
72// AttachmentType is for filtering by attachment type.
73type AttachmentType string
76 AttachmentIndifferent AttachmentType = ""
77 AttachmentNone AttachmentType = "none"
78 AttachmentAny AttachmentType = "any"
79 AttachmentImage AttachmentType = "image" // png, jpg, gif, ...
80 AttachmentPDF AttachmentType = "pdf"
81 AttachmentArchive AttachmentType = "archive" // zip files, tgz, ...
82 AttachmentSpreadsheet AttachmentType = "spreadsheet" // ods, xlsx, ...
83 AttachmentDocument AttachmentType = "document" // odt, docx, ...
84 AttachmentPresentation AttachmentType = "presentation" // odp, pptx, ...
87// Filter selects the messages to return. Fields that are set must all match,
88// for slices each element by match ("and").
90 // If -1, then all mailboxes except Trash/Junk/Rejects. Otherwise, only active if > 0.
93 // If true, also submailboxes are included in the search.
94 MailboxChildrenIncluded bool
96 // In case client doesn't know mailboxes and their IDs yet. Only used during sse
97 // connection setup, where it is turned into a MailboxID. Filtering only looks at
101 Words []string // Case insensitive substring match for each string.
103 To []string // Including Cc and Bcc.
107 Attachments AttachmentType
109 Headers [][2]string // Header values can be empty, it's a check if the header is present, regardless of value.
114// NotFilter matches messages that don't match these fields.
115type NotFilter struct {
120 Attachments AttachmentType
124// Page holds pagination parameters for a request.
126 // Start returning messages after this ID, if > 0. For pagination, fetching the
127 // next set of messages.
128 AnchorMessageID int64
130 // Number of messages to return, must be >= 1, we never return more than 10000 for
134 // If > 0, return messages until DestMessageID is found. More than Count messages
135 // can be returned. For long-running searches, it may take a while before this
140// todo: MessageAddress and MessageEnvelope into message.Address and message.Envelope.
142// MessageAddress is like message.Address, but with a dns.Domain, with unicode name
144type MessageAddress struct {
145 Name string // Free-form name for display in mail applications.
146 User string // Localpart, encoded.
150// MessageEnvelope is like message.Envelope, as used in message.Part, but including
151// unicode host names for IDNA names.
152type MessageEnvelope struct {
153 // todo: should get sherpadoc to understand type embeds and embed the non-MessageAddress fields from message.Envelope.
156 From []MessageAddress
157 Sender []MessageAddress
158 ReplyTo []MessageAddress
166// MessageItem is sent by queries, it has derived information analyzed from
167// message.Part, made for the needs of the message items in the message list.
169type MessageItem struct {
170 Message store.Message // Without ParsedBuf and MsgPrefix, for size.
171 Envelope MessageEnvelope
172 Attachments []Attachment
175 FirstLine string // Of message body, for showing as preview.
176 MatchQuery bool // If message does not match query, it can still be included because of threading.
179// ParsedMessage has more parsed/derived information about a message, intended
180// for rendering the (contents of the) message. Information from MessageItem is
182type ParsedMessage struct {
185 Headers map[string][]string
186 ViewMode store.ViewMode
188 // Text parts, can be empty.
191 // Whether there is an HTML part. The webclient renders HTML message parts through
192 // an iframe and a separate request with strict CSP headers to prevent script
193 // execution and loading of external resources, which isn't possible when loading
194 // in iframe with inline HTML because not all browsers support the iframe csp
198 ListReplyAddress *MessageAddress // From List-Post.
200 // Information used by MessageItem, not exported in this type.
201 envelope MessageEnvelope
202 attachments []Attachment
208// EventStart is the first message sent on an SSE connection, giving the client
209// basic data to populate its UI. After this event, messages will follow quickly in
210// an EventViewMsgs event.
211type EventStart struct {
213 LoginAddress MessageAddress
214 Addresses []MessageAddress
215 DomainAddressConfigs map[string]DomainAddressConfig // ASCII domain to address config.
217 Mailboxes []store.Mailbox
218 RejectsMailbox string
219 Settings store.Settings
220 AccountPath string // If nonempty, the path on same host to webaccount interface.
224// DomainAddressConfig has the address (localpart) configuration for a domain, so
225// the webmail client can decide if an address matches the addresses of the
227type DomainAddressConfig struct {
228 LocalpartCatchallSeparator string // Can be empty.
229 LocalpartCaseSensitive bool
232// EventViewMsgs contains messages for a view, possibly a continuation of an
233// earlier list of messages.
234type EventViewMsgs struct {
238 // If empty, this was the last message for the request. If non-empty, a list of
239 // thread messages. Each with the first message being the reason this thread is
240 // included and can be used as AnchorID in followup requests. If the threading mode
241 // is "off" in the query, there will always be only a single message. If a thread
242 // is sent, all messages in the thread are sent, including those that don't match
243 // the query (e.g. from another mailbox). Threads can be displayed based on the
244 // ThreadParentIDs field, with possibly slightly different display based on field
245 // ThreadMissingLink.
246 MessageItems [][]MessageItem
248 // If set, will match the target page.DestMessageID from the request.
249 ParsedMessage *ParsedMessage
251 // If set, there are no more messages in this view at this moment. Messages can be
252 // added, typically via Change messages, e.g. for new deliveries.
256// EventViewErr indicates an error during a query for messages. The request is
257// aborted, no more request-related messages will be sent until the next request.
258type EventViewErr struct {
261 Err string // To be displayed in client.
262 err error // Original message, for checking against context.Canceled.
265// EventViewReset indicates that a request for the next set of messages in a few
266// could not be fulfilled, e.g. because the anchor message does not exist anymore.
267// The client should clear its list of messages. This can happen before
268// EventViewMsgs events are sent.
269type EventViewReset struct {
274// EventViewChanges contain one or more changes relevant for the client, either
275// with new mailbox total/unseen message counts, or messages added/removed/modified
276// (flags) for the current view.
277type EventViewChanges struct {
279 Changes [][2]any // The first field of [2]any is a string, the second of the Change types below.
282// ChangeMsgAdd adds a new message and possibly its thread to the view.
283type ChangeMsgAdd struct {
285 MessageItems []MessageItem
288// ChangeMsgRemove removes one or more messages from the view.
289type ChangeMsgRemove struct {
290 store.ChangeRemoveUIDs
293// ChangeMsgFlags updates flags for one message.
294type ChangeMsgFlags struct {
298// ChangeMsgThread updates muted/collapsed fields for one message.
299type ChangeMsgThread struct {
303// ChangeMailboxRemove indicates a mailbox was removed, including all its messages.
304type ChangeMailboxRemove struct {
305 store.ChangeRemoveMailbox
308// ChangeMailboxAdd indicates a new mailbox was added, initially without any messages.
309type ChangeMailboxAdd struct {
310 Mailbox store.Mailbox
313// ChangeMailboxRename indicates a mailbox was renamed. Its ID stays the same.
314// It could be under a new parent.
315type ChangeMailboxRename struct {
316 store.ChangeRenameMailbox
319// ChangeMailboxCounts set new total and unseen message counts for a mailbox.
320type ChangeMailboxCounts struct {
321 store.ChangeMailboxCounts
324// ChangeMailboxSpecialUse has updated special-use flags for a mailbox.
325type ChangeMailboxSpecialUse struct {
326 store.ChangeMailboxSpecialUse
329// ChangeMailboxKeywords has an updated list of keywords for a mailbox, e.g. after
330// a message was added with a keyword that wasn't in the mailbox yet.
331type ChangeMailboxKeywords struct {
332 store.ChangeMailboxKeywords
335// View holds the information about the returned data for a query. It is used to
336// determine whether mailbox changes should be sent to the client, we only send
337// addition/removal/flag-changes of messages that are in view, or would extend it
338// if the view is at the end of the results.
342 // Received of last message we sent to the client. We use it to decide if a newly
343 // delivered message is within the view and the client should get a notification.
344 LastMessageReceived time.Time
346 // If set, the last message in the query view has been sent. There is no need to do
347 // another query, it will not return more data. Used to decide if an event for a
348 // new message should be sent.
351 // Whether message must or must not match mailboxIDs.
353 // Mailboxes to match, can be multiple, for matching children. If empty, there is
354 // no filter on mailboxes.
355 mailboxIDs map[int64]bool
357 // Threads sent to client. New messages for this thread are also sent, regardless
358 // of regular query matching, so also for other mailboxes. If the user (re)moved
359 // all messages of a thread, they may still receive events for the thread. Only
360 // filled when query with threading not off.
361 threadIDs map[int64]struct{}
364// sses tracks all sse connections, and access to them.
371// sse represents an sse connection.
373 ID int64 // Also returned in EventStart and used in Request to identify the request.
374 AccountName string // Used to check the authenticated user has access to the SSE connection.
375 Request chan Request // Goroutine will receive requests from here, coming from API calls.
378// called by the goroutine when the connection is closed or breaks.
379func (sse sse) unregister() {
382 delete(sses.m, sse.ID)
384 // Drain any pending requests, preventing blocked goroutines from API calls.
394func sseRegister(accountName string) sse {
398 v := sse{sses.gen, accountName, make(chan Request, 1)}
403// sseGet returns a reference to an existing connection if it exists and user
405func sseGet(id int64, accountName string) (sse, bool) {
409 if s.AccountName != accountName {
415// ssetoken is a temporary token that has not yet been used to start an SSE
416// connection. Created by Token, consumed by a new SSE connection.
417type ssetoken struct {
418 token string // Uniquely generated.
420 address string // Address used to authenticate in call that created the token.
421 sessionToken store.SessionToken // SessionToken that created this token, checked before sending updates.
425// ssetokens maintains unused tokens. We have just one, but it's a type so we
426// can define methods.
427type ssetokens struct {
429 accountTokens map[string][]ssetoken // Account to max 10 most recent tokens, from old to new.
430 tokens map[string]ssetoken // Token to details, for finding account for a token.
433var sseTokens = ssetokens{
434 accountTokens: map[string][]ssetoken{},
435 tokens: map[string]ssetoken{},
438// xgenerate creates and saves a new token. It ensures no more than 10 tokens
439// per account exist, removing old ones if needed.
440func (x *ssetokens) xgenerate(ctx context.Context, accName, address string, sessionToken store.SessionToken) string {
441 buf := make([]byte, 16)
442 _, err := cryptrand.Read(buf)
443 xcheckf(ctx, err, "generating token")
444 st := ssetoken{base64.RawURLEncoding.EncodeToString(buf), accName, address, sessionToken, time.Now().Add(time.Minute)}
448 n := len(x.accountTokens[accName])
450 for _, ost := range x.accountTokens[accName][:n-9] {
451 delete(x.tokens, ost.token)
453 copy(x.accountTokens[accName], x.accountTokens[accName][n-9:])
454 x.accountTokens[accName] = x.accountTokens[accName][:9]
456 x.accountTokens[accName] = append(x.accountTokens[accName], st)
457 x.tokens[st.token] = st
461// check verifies a token, and consumes it if valid.
462func (x *ssetokens) check(token string) (string, string, store.SessionToken, bool, error) {
466 st, ok := x.tokens[token]
468 return "", "", "", false, nil
470 delete(x.tokens, token)
471 if i := slices.Index(x.accountTokens[st.accName], st); i < 0 {
472 return "", "", "", false, errors.New("internal error, could not find token in account")
474 copy(x.accountTokens[st.accName][i:], x.accountTokens[st.accName][i+1:])
475 x.accountTokens[st.accName] = x.accountTokens[st.accName][:len(x.accountTokens[st.accName])-1]
476 if len(x.accountTokens[st.accName]) == 0 {
477 delete(x.accountTokens, st.accName)
480 if time.Now().After(st.validUntil) {
481 return "", "", "", false, nil
483 return st.accName, st.address, st.sessionToken, true, nil
486// ioErr is panicked on i/o errors in serveEvents and handled in a defer.
491// serveEvents serves an SSE connection. Authentication is done through a query
492// string parameter "singleUseToken", a one-time-use token returned by the Token
494func serveEvents(ctx context.Context, log mlog.Log, accountPath string, w http.ResponseWriter, r *http.Request) {
495 if r.Method != "GET" {
496 http.Error(w, "405 - method not allowed - use get", http.StatusMethodNotAllowed)
500 flusher, ok := w.(http.Flusher)
502 log.Error("internal error: ResponseWriter not a http.Flusher")
503 http.Error(w, "500 - internal error - cannot sync to http connection", 500)
508 token := q.Get("singleUseToken")
510 http.Error(w, "400 - bad request - missing credentials", http.StatusBadRequest)
513 accName, address, sessionToken, ok, err := sseTokens.check(token)
515 http.Error(w, "500 - internal server error - "+err.Error(), http.StatusInternalServerError)
519 http.Error(w, "400 - bad request - bad token", http.StatusBadRequest)
522 if _, err := store.SessionUse(ctx, log, accName, sessionToken, ""); err != nil {
523 http.Error(w, "400 - bad request - bad session token", http.StatusBadRequest)
527 // We can simulate a slow SSE connection. It seems firefox doesn't slow down
528 // incoming responses with its slow-network similation.
529 var waitMin, waitMax time.Duration
530 waitMinMsec := q.Get("waitMinMsec")
531 waitMaxMsec := q.Get("waitMaxMsec")
532 if waitMinMsec != "" && waitMaxMsec != "" {
533 if v, err := strconv.ParseInt(waitMinMsec, 10, 64); err != nil {
534 http.Error(w, "400 - bad request - parsing waitMinMsec: "+err.Error(), http.StatusBadRequest)
537 waitMin = time.Duration(v) * time.Millisecond
540 if v, err := strconv.ParseInt(waitMaxMsec, 10, 64); err != nil {
541 http.Error(w, "400 - bad request - parsing waitMaxMsec: "+err.Error(), http.StatusBadRequest)
544 waitMax = time.Duration(v) * time.Millisecond
548 // Parse the request with initial mailbox/search criteria.
550 dec := json.NewDecoder(strings.NewReader(q.Get("request")))
551 dec.DisallowUnknownFields()
552 if err := dec.Decode(&req); err != nil {
553 http.Error(w, "400 - bad request - bad request query string parameter: "+err.Error(), http.StatusBadRequest)
555 } else if req.Page.Count <= 0 {
556 http.Error(w, "400 - bad request - request cannot have Page.Count 0", http.StatusBadRequest)
559 if req.Query.Threading == "" {
560 req.Query.Threading = ThreadOff
563 var writer *eventWriter
565 metricSSEConnections.Inc()
566 defer metricSSEConnections.Dec()
568 // Below here, error handling cause through xcheckf, which panics with
569 // *sherpa.Error, after which we send an error event to the client. We can also get
570 // an *ioErr when the connection is broken.
576 if err, ok := x.(*sherpa.Error); ok {
577 writer.xsendEvent(ctx, log, "fatalErr", err.Message)
578 } else if _, ok := x.(ioErr); ok {
581 log.WithContext(ctx).Error("serveEvents panic", slog.Any("err", x))
583 metrics.PanicInc(metrics.Webmail)
589 h.Set("Content-Type", "text/event-stream")
590 h.Set("Cache-Control", "no-cache")
592 // We'll be sending quite a bit of message data (text) in JSON (plenty duplicate
593 // keys), so should be quite compressible.
595 gz := mox.AcceptsGzip(r)
597 h.Set("Content-Encoding", "gzip")
598 out, _ = gzip.NewWriterLevel(w, gzip.BestSpeed)
602 out = httpFlusher{out, flusher}
604 // We'll be writing outgoing SSE events through writer.
605 writer = newEventWriter(out, waitMin, waitMax, accName, sessionToken)
608 // Fetch initial data.
609 acc, err := store.OpenAccount(log, accName)
610 xcheckf(ctx, err, "open account")
613 log.Check(err, "closing account")
615 comm := store.RegisterComm(acc)
616 defer comm.Unregister()
618 // List addresses that the client can use to send email from.
619 accConf, _ := acc.Conf()
620 loginAddr, err := smtp.ParseAddress(address)
621 xcheckf(ctx, err, "parsing login address")
622 _, _, _, dest, err := mox.LookupAddress(loginAddr.Localpart, loginAddr.Domain, false, false)
623 xcheckf(ctx, err, "looking up destination for login address")
624 loginName := accConf.FullName
625 if dest.FullName != "" {
626 loginName = dest.FullName
628 loginAddress := MessageAddress{Name: loginName, User: loginAddr.Localpart.String(), Domain: loginAddr.Domain}
629 var addresses []MessageAddress
630 for a, dest := range accConf.Destinations {
631 name := dest.FullName
633 name = accConf.FullName
635 var ma MessageAddress
636 if strings.HasPrefix(a, "@") {
637 dom, err := dns.ParseDomain(a[1:])
638 xcheckf(ctx, err, "parsing destination address for account")
639 ma = MessageAddress{Domain: dom}
641 addr, err := smtp.ParseAddress(a)
642 xcheckf(ctx, err, "parsing destination address for account")
643 ma = MessageAddress{Name: name, User: addr.Localpart.String(), Domain: addr.Domain}
645 addresses = append(addresses, ma)
647 // User is allowed to send using alias address as message From address. Webmail
648 // will choose it when replying to a message sent to that address.
649 aliasAddrs := map[MessageAddress]bool{}
650 for _, a := range accConf.Aliases {
651 if a.Alias.AllowMsgFrom {
652 ma := MessageAddress{User: a.Alias.LocalpartStr, Domain: a.Alias.Domain}
654 addresses = append(addresses, ma)
656 aliasAddrs[ma] = true
660 // We implicitly start a query. We use the reqctx for the transaction, because the
661 // transaction is passed to the query, which can be canceled.
662 reqctx, reqctxcancel := context.WithCancel(ctx)
664 // We also cancel in cancelDrain later on, but there is a brief window where the
665 // context wouldn't be canceled.
666 if reqctxcancel != nil {
672 // qtx is kept around during connection initialization, until we pass it off to the
673 // goroutine that starts querying for messages.
677 err := qtx.Rollback()
678 log.Check(err, "rolling back")
682 var mbl []store.Mailbox
683 settings := store.Settings{ID: 1}
685 // We only take the rlock when getting the tx.
686 acc.WithRLock(func() {
687 // Now a read-only transaction we'll use during the query.
688 qtx, err = acc.DB.Begin(reqctx, false)
689 xcheckf(ctx, err, "begin transaction")
691 mbl, err = bstore.QueryTx[store.Mailbox](qtx).List()
692 xcheckf(ctx, err, "list mailboxes")
694 err = qtx.Get(&settings)
695 xcheckf(ctx, err, "get settings")
698 // Find the designated mailbox if a mailbox name is set, or there are no filters at all.
699 var zerofilter Filter
700 var zeronotfilter NotFilter
701 var mailbox store.Mailbox
702 var mailboxPrefixes []string
703 var matchMailboxes bool
704 mailboxIDs := map[int64]bool{}
705 mailboxName := req.Query.Filter.MailboxName
706 if mailboxName != "" || reflect.DeepEqual(req.Query.Filter, zerofilter) && reflect.DeepEqual(req.Query.NotFilter, zeronotfilter) {
707 if mailboxName == "" {
708 mailboxName = "Inbox"
711 var inbox store.Mailbox
712 for _, e := range mbl {
713 if e.Name == mailboxName {
716 if e.Name == "Inbox" {
724 xcheckf(ctx, errors.New("inbox not found"), "setting initial mailbox")
726 req.Query.Filter.MailboxID = mailbox.ID
727 req.Query.Filter.MailboxName = ""
728 mailboxPrefixes = []string{mailbox.Name + "/"}
729 matchMailboxes = true
730 mailboxIDs[mailbox.ID] = true
732 matchMailboxes, mailboxIDs, mailboxPrefixes = xprepareMailboxIDs(ctx, qtx, req.Query.Filter, accConf.RejectsMailbox)
734 if req.Query.Filter.MailboxChildrenIncluded {
735 xgatherMailboxIDs(ctx, qtx, mailboxIDs, mailboxPrefixes)
738 // todo: write a last-event-id based on modseq? if last-event-id is present, we would have to send changes to mailboxes, messages, hopefully reducing the amount of data sent.
740 sse := sseRegister(acc.Name)
741 defer sse.unregister()
743 // Per-domain localpart config so webclient can decide if an address belongs to the account.
744 domainAddressConfigs := map[string]DomainAddressConfig{}
745 for _, a := range addresses {
746 dom, _ := mox.Conf.Domain(a.Domain)
747 domainAddressConfigs[a.Domain.ASCII] = DomainAddressConfig{dom.LocalpartCatchallSeparator, dom.LocalpartCaseSensitive}
750 // Write first event, allowing client to fill its UI with mailboxes.
751 start := EventStart{sse.ID, loginAddress, addresses, domainAddressConfigs, mailbox.Name, mbl, accConf.RejectsMailbox, settings, accountPath, moxvar.Version}
752 writer.xsendEvent(ctx, log, "start", start)
754 // The goroutine doing the querying will send messages on these channels, which
755 // result in an event being written on the SSE connection.
756 viewMsgsc := make(chan EventViewMsgs)
757 viewErrc := make(chan EventViewErr)
758 viewResetc := make(chan EventViewReset)
759 donec := make(chan int64) // When request is done.
761 // Start a view, it determines if we send a change to the client. And start an
762 // implicit query for messages, we'll send the messages to the client which can
763 // fill its ui with messages.
764 v := view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
765 go viewRequestTx(reqctx, log, acc, qtx, v, viewMsgsc, viewErrc, viewResetc, donec)
766 qtx = nil // viewRequestTx closes qtx
768 // When canceling a query, we must drain its messages until it says it is done.
769 // Otherwise the sending goroutine would hang indefinitely on a channel send.
770 cancelDrain := func() {
771 if reqctxcancel != nil {
772 // Cancel the goroutine doing the querying.
780 // Drain events until done.
792 // If we stop and a query is in progress, we must drain the channel it will send on.
795 // Changes broadcasted by other connections on this account. If applicable for the
796 // connection/view, we send events.
797 xprocessChanges := func(changes []store.Change) {
798 taggedChanges := [][2]any{}
800 // We get a transaction first time we need it.
804 err := xtx.Rollback()
805 log.Check(err, "rolling back transaction")
808 ensureTx := func() error {
815 xtx, err = acc.DB.Begin(ctx, false)
818 // This getmsg will now only be called mailboxID+UID, not with messageID set.
819 // todo jmap: change store.Change* to include MessageID's? would mean duplication of information resulting in possible mismatch.
820 getmsg := func(messageID int64, mailboxID int64, uid store.UID) (store.Message, error) {
821 if err := ensureTx(); err != nil {
822 return store.Message{}, fmt.Errorf("transaction: %v", err)
824 return bstore.QueryTx[store.Message](xtx).FilterEqual("Expunged", false).FilterNonzero(store.Message{MailboxID: mailboxID, UID: uid}).Get()
827 // Return uids that are within range in view. Because the end has been reached, or
828 // because the UID is not after the last message.
829 xchangedUIDs := func(mailboxID int64, uids []store.UID, isRemove bool) (changedUIDs []store.UID) {
830 uidsAny := make([]any, len(uids))
831 for i, uid := range uids {
835 xcheckf(ctx, err, "transaction")
836 q := bstore.QueryTx[store.Message](xtx)
837 q.FilterNonzero(store.Message{MailboxID: mailboxID})
838 q.FilterEqual("UID", uidsAny...)
839 mbOK := v.matchesMailbox(mailboxID)
840 err = q.ForEach(func(m store.Message) error {
841 _, thread := v.threadIDs[m.ThreadID]
842 if thread || mbOK && (v.inRange(m) || isRemove && m.Expunged) {
843 changedUIDs = append(changedUIDs, m.UID)
847 xcheckf(ctx, err, "fetching messages for change")
851 // Forward changes that are relevant to the current view.
852 for _, change := range changes {
853 switch c := change.(type) {
854 case store.ChangeAddUID:
855 ok, err := v.matches(log, acc, true, 0, c.MailboxID, c.UID, c.Flags, c.Keywords, getmsg)
856 xcheckf(ctx, err, "matching new message against view")
857 m, err := getmsg(0, c.MailboxID, c.UID)
858 xcheckf(ctx, err, "get message")
859 _, thread := v.threadIDs[m.ThreadID]
863 state := msgState{acc: acc}
864 mi, err := messageItem(log, m, &state)
866 xcheckf(ctx, err, "make messageitem")
869 mil := []MessageItem{mi}
870 if !thread && req.Query.Threading != ThreadOff {
872 xcheckf(ctx, err, "transaction")
873 more, _, err := gatherThread(log, xtx, acc, v, m, 0, false)
874 xcheckf(ctx, err, "gathering thread messages for id %d, thread %d", m.ID, m.ThreadID)
875 mil = append(mil, more...)
876 v.threadIDs[m.ThreadID] = struct{}{}
879 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgAdd", ChangeMsgAdd{c, mil}})
881 // If message extends the view, store it as such.
882 if !v.Request.Query.OrderAsc && m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && m.Received.After(v.LastMessageReceived) {
883 v.LastMessageReceived = m.Received
886 case store.ChangeRemoveUIDs:
887 // We may send changes for uids the client doesn't know, that's fine.
888 changedUIDs := xchangedUIDs(c.MailboxID, c.UIDs, true)
889 if len(changedUIDs) == 0 {
892 ch := ChangeMsgRemove{c}
893 ch.UIDs = changedUIDs
894 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgRemove", ch})
896 case store.ChangeFlags:
897 // We may send changes for uids the client doesn't know, that's fine.
898 changedUIDs := xchangedUIDs(c.MailboxID, []store.UID{c.UID}, false)
899 if len(changedUIDs) == 0 {
902 ch := ChangeMsgFlags{c}
903 ch.UID = changedUIDs[0]
904 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgFlags", ch})
906 case store.ChangeThread:
907 // Change in muted/collaped state, just always ship it.
908 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgThread", ChangeMsgThread{c}})
910 case store.ChangeRemoveMailbox:
911 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRemove", ChangeMailboxRemove{c}})
913 case store.ChangeAddMailbox:
914 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxAdd", ChangeMailboxAdd{c.Mailbox}})
916 case store.ChangeRenameMailbox:
917 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRename", ChangeMailboxRename{c}})
919 case store.ChangeMailboxCounts:
920 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxCounts", ChangeMailboxCounts{c}})
922 case store.ChangeMailboxSpecialUse:
923 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxSpecialUse", ChangeMailboxSpecialUse{c}})
925 case store.ChangeMailboxKeywords:
926 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxKeywords", ChangeMailboxKeywords{c}})
928 case store.ChangeAddSubscription:
929 // Webmail does not care about subscriptions.
932 panic(fmt.Sprintf("missing case for change %T", c))
936 if len(taggedChanges) > 0 {
937 viewChanges := EventViewChanges{v.Request.ViewID, taggedChanges}
938 writer.xsendEvent(ctx, log, "viewChanges", viewChanges)
942 timer := time.NewTimer(5 * time.Minute) // For keepalives.
946 timer.Reset(5 * time.Minute)
950 pending := comm.Pending
956 case <-mox.Shutdown.Done():
957 writer.xsendEvent(ctx, log, "fatalErr", "server is shutting down")
958 // Work around go vet, it doesn't see defer cancelDrain.
959 if reqctxcancel != nil {
965 _, err := fmt.Fprintf(out, ": keepalive\n\n")
967 log.Errorx("write keepalive", err)
968 // Work around go vet, it doesn't see defer cancelDrain.
969 if reqctxcancel != nil {
977 case vm := <-viewMsgsc:
978 if vm.RequestID != v.Request.ID || vm.ViewID != v.Request.ViewID {
979 panic(fmt.Sprintf("received msgs for view,request id %d,%d instead of %d,%d", vm.ViewID, vm.RequestID, v.Request.ViewID, v.Request.ID))
984 if len(vm.MessageItems) > 0 {
985 v.LastMessageReceived = vm.MessageItems[len(vm.MessageItems)-1][0].Message.Received
987 writer.xsendEvent(ctx, log, "viewMsgs", vm)
989 case ve := <-viewErrc:
990 if ve.RequestID != v.Request.ID || ve.ViewID != v.Request.ViewID {
991 panic(fmt.Sprintf("received err for view,request id %d,%d instead of %d,%d", ve.ViewID, ve.RequestID, v.Request.ViewID, v.Request.ID))
993 if errors.Is(ve.err, context.Canceled) || moxio.IsClosed(ve.err) {
994 // Work around go vet, it doesn't see defer cancelDrain.
995 if reqctxcancel != nil {
1000 writer.xsendEvent(ctx, log, "viewErr", ve)
1002 case vr := <-viewResetc:
1003 if vr.RequestID != v.Request.ID || vr.ViewID != v.Request.ViewID {
1004 panic(fmt.Sprintf("received reset for view,request id %d,%d instead of %d,%d", vr.ViewID, vr.RequestID, v.Request.ViewID, v.Request.ID))
1006 writer.xsendEvent(ctx, log, "viewReset", vr)
1009 if id != v.Request.ID {
1010 panic(fmt.Sprintf("received done for request id %d instead of %d", id, v.Request.ID))
1012 if reqctxcancel != nil {
1018 case req := <-sse.Request:
1023 v = view{req, time.Time{}, false, false, nil, nil}
1027 reqctx, reqctxcancel = context.WithCancel(ctx)
1029 stop := func() (stop bool) {
1030 // rtx is handed off viewRequestTx below, but we must clean it up in case of errors.
1035 err = rtx.Rollback()
1036 log.Check(err, "rolling back transaction")
1039 acc.WithRLock(func() {
1040 rtx, err = acc.DB.Begin(reqctx, false)
1047 if errors.Is(err, context.Canceled) {
1050 err := fmt.Errorf("begin transaction: %v", err)
1051 viewErr := EventViewErr{v.Request.ViewID, v.Request.ID, err.Error(), err}
1052 writer.xsendEvent(ctx, log, "viewErr", viewErr)
1056 // Reset view state for new query.
1057 if req.ViewID != v.Request.ViewID {
1058 matchMailboxes, mailboxIDs, mailboxPrefixes := xprepareMailboxIDs(ctx, rtx, req.Query.Filter, accConf.RejectsMailbox)
1059 if req.Query.Filter.MailboxChildrenIncluded {
1060 xgatherMailboxIDs(ctx, rtx, mailboxIDs, mailboxPrefixes)
1062 v = view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
1066 go viewRequestTx(reqctx, log, acc, rtx, v, viewMsgsc, viewErrc, viewResetc, donec)
1075 xprocessChanges(comm.Get())
1078 // Work around go vet, it doesn't see defer cancelDrain.
1079 if reqctxcancel != nil {
1087// xprepareMailboxIDs prepare the first half of filters for mailboxes, based on
1088// f.MailboxID (-1 is special). matchMailboxes indicates whether the IDs in
1089// mailboxIDs must or must not match. mailboxPrefixes is for use with
1090// xgatherMailboxIDs to gather children of the mailboxIDs.
1091func xprepareMailboxIDs(ctx context.Context, tx *bstore.Tx, f Filter, rejectsMailbox string) (matchMailboxes bool, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1092 matchMailboxes = true
1093 mailboxIDs = map[int64]bool{}
1094 if f.MailboxID == -1 {
1095 matchMailboxes = false
1096 // Add the trash, junk and account rejects mailbox.
1097 err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
1098 if mb.Trash || mb.Junk || mb.Name == rejectsMailbox {
1099 mailboxPrefixes = append(mailboxPrefixes, mb.Name+"/")
1100 mailboxIDs[mb.ID] = true
1104 xcheckf(ctx, err, "finding trash/junk/rejects mailbox")
1105 } else if f.MailboxID > 0 {
1106 mb := store.Mailbox{ID: f.MailboxID}
1108 xcheckf(ctx, err, "get mailbox")
1109 mailboxIDs[f.MailboxID] = true
1110 mailboxPrefixes = []string{mb.Name + "/"}
1115// xgatherMailboxIDs adds all mailboxes with a prefix matching any of
1116// mailboxPrefixes to mailboxIDs, to expand filtering to children of mailboxes.
1117func xgatherMailboxIDs(ctx context.Context, tx *bstore.Tx, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1118 // Gather more mailboxes to filter on, based on mailboxPrefixes.
1119 if len(mailboxPrefixes) == 0 {
1122 err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
1123 for _, p := range mailboxPrefixes {
1124 if strings.HasPrefix(mb.Name, p) {
1125 mailboxIDs[mb.ID] = true
1131 xcheckf(ctx, err, "gathering mailboxes")
1134// matchesMailbox returns whether a mailbox matches the view.
1135func (v view) matchesMailbox(mailboxID int64) bool {
1136 return len(v.mailboxIDs) == 0 || v.matchMailboxIDs && v.mailboxIDs[mailboxID] || !v.matchMailboxIDs && !v.mailboxIDs[mailboxID]
1139// inRange returns whether m is within the range for the view, whether a change for
1140// this message should be sent to the client so it can update its state.
1141func (v view) inRange(m store.Message) bool {
1142 return v.End || !v.Request.Query.OrderAsc && !m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && !m.Received.After(v.LastMessageReceived)
1145// matches checks if the message, identified by either messageID or mailboxID+UID,
1146// is in the current "view" (i.e. passing the filters, and if checkRange is set
1147// also if within the range of sent messages based on sort order and the last seen
1148// message). getmsg retrieves the message, which may be necessary depending on the
1149// active filters. Used to determine if a store.Change with a new message should be
1150// sent, and for the destination and anchor messages in view requests.
1151func (v view) matches(log mlog.Log, acc *store.Account, checkRange bool, messageID int64, mailboxID int64, uid store.UID, flags store.Flags, keywords []string, getmsg func(int64, int64, store.UID) (store.Message, error)) (match bool, rerr error) {
1153 ensureMessage := func() bool {
1154 if m.ID == 0 && rerr == nil {
1155 m, rerr = getmsg(messageID, mailboxID, uid)
1160 q := v.Request.Query
1162 // Warning: Filters must be kept in sync between queryMessage and view.matches.
1165 if len(v.mailboxIDs) > 0 && (!ensureMessage() || v.matchMailboxIDs && !v.mailboxIDs[m.MailboxID] || !v.matchMailboxIDs && v.mailboxIDs[m.MailboxID]) {
1168 // note: anchorMessageID is not relevant for matching.
1169 flagfilter := q.flagFilterFn()
1170 if flagfilter != nil && !flagfilter(flags, keywords) {
1174 if q.Filter.Oldest != nil && (!ensureMessage() || m.Received.Before(*q.Filter.Oldest)) {
1177 if q.Filter.Newest != nil && (!ensureMessage() || !m.Received.Before(*q.Filter.Newest)) {
1181 if q.Filter.SizeMin > 0 && (!ensureMessage() || m.Size < q.Filter.SizeMin) {
1184 if q.Filter.SizeMax > 0 && (!ensureMessage() || m.Size > q.Filter.SizeMax) {
1188 state := msgState{acc: acc}
1190 if rerr == nil && state.err != nil {
1196 attachmentFilter := q.attachmentFilterFn(log, acc, &state)
1197 if attachmentFilter != nil && (!ensureMessage() || !attachmentFilter(m)) {
1201 envFilter := q.envFilterFn(log, &state)
1202 if envFilter != nil && (!ensureMessage() || !envFilter(m)) {
1206 headerFilter := q.headerFilterFn(log, &state)
1207 if headerFilter != nil && (!ensureMessage() || !headerFilter(m)) {
1211 wordsFilter := q.wordsFilterFn(log, &state)
1212 if wordsFilter != nil && (!ensureMessage() || !wordsFilter(m)) {
1216 // Now check that we are either within the sorting order, or "last" was sent.
1217 if !checkRange || v.End || ensureMessage() && v.inRange(m) {
1223type msgResp struct {
1224 err error // If set, an error happened and fields below are not set.
1225 reset bool // If set, the anchor message does not exist (anymore?) and we are sending messages from the start, fields below not set.
1226 viewEnd bool // If set, the last message for the view was seen, no more should be requested, fields below not set.
1227 mil []MessageItem // If none of the cases above apply, the messages that was found matching the query. First message was reason the thread is returned, for use as AnchorID in followup request.
1228 pm *ParsedMessage // If m was the target page.DestMessageID, or this is the first match, this is the parsed message of mi.
1231// viewRequestTx executes a request (query with filters, pagination) by
1232// launching a new goroutine with queryMessages, receiving results as msgResp,
1233// and sending Event* to the SSE connection.
1235// It always closes tx.
1236func viewRequestTx(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, msgc chan EventViewMsgs, errc chan EventViewErr, resetc chan EventViewReset, donec chan int64) {
1238 err := tx.Rollback()
1239 log.Check(err, "rolling back query transaction")
1241 donec <- v.Request.ID
1243 x := recover() // Should not happen, but don't take program down if it does.
1245 log.WithContext(ctx).Error("viewRequestTx panic", slog.Any("err", x))
1247 metrics.PanicInc(metrics.Webmailrequest)
1251 var msgitems [][]MessageItem // Gathering for 300ms, then flushing.
1252 var parsedMessage *ParsedMessage
1255 var immediate bool // No waiting, flush immediate.
1256 t := time.NewTimer(300 * time.Millisecond)
1259 sendViewMsgs := func(force bool) {
1260 if len(msgitems) == 0 && !force {
1265 msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, msgitems, parsedMessage, viewEnd}
1268 t.Reset(300 * time.Millisecond)
1271 // todo: should probably rewrite code so we don't start yet another goroutine, but instead handle the query responses directly (through a struct that keeps state?) in the sse connection goroutine.
1273 mrc := make(chan msgResp, 1)
1274 go queryMessages(ctx, log, acc, tx, v, mrc)
1278 case mr, ok := <-mrc:
1281 // Empty message list signals this query is done.
1282 msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, nil, nil, false}
1287 errc <- EventViewErr{v.Request.ViewID, v.Request.ID, mr.err.Error(), mr.err}
1291 resetc <- EventViewReset{v.Request.ViewID, v.Request.ID}
1300 msgitems = append(msgitems, mr.mil)
1302 parsedMessage = mr.pm
1309 if len(msgitems) == 0 {
1310 // Nothing to send yet. We'll send immediately when the next message comes in.
1319// queryMessages executes a query, with filter, pagination, destination message id
1320// to fetch (the message that the client had in view and wants to display again).
1321// It sends on msgc, with several types of messages: errors, whether the view is
1322// reset due to missing AnchorMessageID, and when the end of the view was reached
1323// and/or for a message.
1324func queryMessages(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, mrc chan msgResp) {
1326 x := recover() // Should not happen, but don't take program down if it does.
1328 log.WithContext(ctx).Error("queryMessages panic", slog.Any("err", x))
1330 mrc <- msgResp{err: fmt.Errorf("query failed")}
1331 metrics.PanicInc(metrics.Webmailquery)
1337 query := v.Request.Query
1338 page := v.Request.Page
1340 // Warning: Filters must be kept in sync between queryMessage and view.matches.
1342 checkMessage := func(id int64) (valid bool, rerr error) {
1343 m := store.Message{ID: id}
1345 if err == bstore.ErrAbsent || err == nil && m.Expunged {
1347 } else if err != nil {
1350 return v.matches(log, acc, false, m.ID, m.MailboxID, m.UID, m.Flags, m.Keywords, func(int64, int64, store.UID) (store.Message, error) {
1356 // Check if AnchorMessageID exists and matches filter. If not, we will reset the view.
1357 if page.AnchorMessageID > 0 {
1358 // Check if message exists and (still) matches the filter.
1359 // todo: if AnchorMessageID exists but no longer matches the filter, we are resetting the view, but could handle it more gracefully in the future. if the message is in a different mailbox, we cannot query as efficiently, we'll have to read through more messages.
1360 if valid, err := checkMessage(page.AnchorMessageID); err != nil {
1361 mrc <- msgResp{err: fmt.Errorf("querying AnchorMessageID: %v", err)}
1364 mrc <- msgResp{reset: true}
1365 page.AnchorMessageID = 0
1369 // Check if page.DestMessageID exists and matches filter. If not, we will ignore
1370 // it instead of continuing to send message till the end of the view.
1371 if page.DestMessageID > 0 {
1372 if valid, err := checkMessage(page.DestMessageID); err != nil {
1373 mrc <- msgResp{err: fmt.Errorf("querying requested message: %v", err)}
1376 page.DestMessageID = 0
1380 // todo optimize: we would like to have more filters directly on the database if they can use an index. eg if there is a keyword filter and no mailbox filter.
1382 q := bstore.QueryTx[store.Message](tx)
1383 q.FilterEqual("Expunged", false)
1384 if len(v.mailboxIDs) > 0 {
1385 if len(v.mailboxIDs) == 1 && v.matchMailboxIDs {
1386 // Should result in fast indexed query.
1387 for mbID := range v.mailboxIDs {
1388 q.FilterNonzero(store.Message{MailboxID: mbID})
1391 idsAny := make([]any, 0, len(v.mailboxIDs))
1392 for mbID := range v.mailboxIDs {
1393 idsAny = append(idsAny, mbID)
1395 if v.matchMailboxIDs {
1396 q.FilterEqual("MailboxID", idsAny...)
1398 q.FilterNotEqual("MailboxID", idsAny...)
1403 // If we are looking for an anchor, keep skipping message early (cheaply) until we've seen it.
1404 if page.AnchorMessageID > 0 {
1406 q.FilterFn(func(m store.Message) bool {
1410 seen = m.ID == page.AnchorMessageID
1415 // We may be added filters the the query below. The FilterFn signature does not
1416 // implement reporting errors, or anything else, just a bool. So when making the
1417 // filter functions, we give them a place to store parsed message state, and an
1418 // error. We check the error during and after query execution.
1419 state := msgState{acc: acc}
1422 flagfilter := query.flagFilterFn()
1423 if flagfilter != nil {
1424 q.FilterFn(func(m store.Message) bool {
1425 return flagfilter(m.Flags, m.Keywords)
1429 if query.Filter.Oldest != nil {
1430 q.FilterGreaterEqual("Received", *query.Filter.Oldest)
1432 if query.Filter.Newest != nil {
1433 q.FilterLessEqual("Received", *query.Filter.Newest)
1436 if query.Filter.SizeMin > 0 {
1437 q.FilterGreaterEqual("Size", query.Filter.SizeMin)
1439 if query.Filter.SizeMax > 0 {
1440 q.FilterLessEqual("Size", query.Filter.SizeMax)
1443 attachmentFilter := query.attachmentFilterFn(log, acc, &state)
1444 if attachmentFilter != nil {
1445 q.FilterFn(attachmentFilter)
1448 envFilter := query.envFilterFn(log, &state)
1449 if envFilter != nil {
1450 q.FilterFn(envFilter)
1453 headerFilter := query.headerFilterFn(log, &state)
1454 if headerFilter != nil {
1455 q.FilterFn(headerFilter)
1458 wordsFilter := query.wordsFilterFn(log, &state)
1459 if wordsFilter != nil {
1460 q.FilterFn(wordsFilter)
1464 q.SortAsc("Received")
1466 q.SortDesc("Received")
1468 found := page.DestMessageID <= 0
1471 err := q.ForEach(func(m store.Message) error {
1472 // Check for an error in one of the filters, propagate it.
1473 if state.err != nil {
1477 if have >= page.Count && found || have > 10000 {
1479 return bstore.StopForEach
1482 if _, ok := v.threadIDs[m.ThreadID]; ok {
1483 // Message was already returned as part of a thread.
1487 var pm *ParsedMessage
1488 if m.ID == page.DestMessageID || page.DestMessageID == 0 && have == 0 && page.AnchorMessageID == 0 {
1489 // For threads, if there was no DestMessageID, we may be getting the newest
1490 // message. For an initial view, this isn't necessarily the first the user is
1491 // expected to read first, that would be the first unread, which we'll get below
1492 // when gathering the thread.
1494 xpm, err := parsedMessage(log, m, &state, true, false)
1495 if err != nil && errors.Is(err, message.ErrHeader) {
1496 log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
1497 } else if err != nil {
1498 return fmt.Errorf("parsing message %d: %v", m.ID, err)
1504 mi, err := messageItem(log, m, &state)
1506 return fmt.Errorf("making messageitem for message %d: %v", m.ID, err)
1508 mil := []MessageItem{mi}
1509 if query.Threading != ThreadOff {
1510 more, xpm, err := gatherThread(log, tx, acc, v, m, page.DestMessageID, page.AnchorMessageID == 0 && have == 0)
1512 return fmt.Errorf("gathering thread messages for id %d, thread %d: %v", m.ID, m.ThreadID, err)
1518 mil = append(mil, more...)
1519 v.threadIDs[m.ThreadID] = struct{}{}
1521 // Calculate how many messages the frontend is going to show, and only count those as returned.
1522 collapsed := map[int64]bool{}
1523 for _, mi := range mil {
1524 collapsed[mi.Message.ID] = mi.Message.ThreadCollapsed
1526 unread := map[int64]bool{} // Propagated to thread root.
1527 if query.Threading == ThreadUnread {
1528 for _, mi := range mil {
1533 unread[mm.ID] = true
1534 for _, id := range mm.ThreadParentIDs {
1539 for _, mi := range mil {
1543 for _, id := range mm.ThreadParentIDs {
1544 if _, ok := collapsed[id]; ok {
1549 if threadRoot || (query.Threading == ThreadOn && !collapsed[rootID] || query.Threading == ThreadUnread && unread[rootID]) {
1556 if pm != nil && len(pm.envelope.From) == 1 {
1557 pm.ViewMode, err = fromAddrViewMode(tx, pm.envelope.From[0])
1559 return fmt.Errorf("gathering view mode for id %d: %v", m.ID, err)
1562 mrc <- msgResp{mil: mil, pm: pm}
1565 // Check for an error in one of the filters again. Check in ForEach would not
1566 // trigger if the last message has the error.
1567 if err == nil && state.err != nil {
1571 mrc <- msgResp{err: fmt.Errorf("querying messages: %v", err)}
1575 mrc <- msgResp{viewEnd: true}
1579func gatherThread(log mlog.Log, tx *bstore.Tx, acc *store.Account, v view, m store.Message, destMessageID int64, first bool) ([]MessageItem, *ParsedMessage, error) {
1580 if m.ThreadID == 0 {
1581 // If we would continue, FilterNonzero would fail because there are no non-zero fields.
1582 return nil, nil, fmt.Errorf("message has threadid 0, account is probably still being upgraded, try turning threading off until the upgrade is done")
1585 // Fetch other messages for this thread.
1586 qt := bstore.QueryTx[store.Message](tx)
1587 qt.FilterNonzero(store.Message{ThreadID: m.ThreadID})
1588 qt.FilterEqual("Expunged", false)
1589 qt.FilterNotEqual("ID", m.ID)
1591 tml, err := qt.List()
1593 return nil, nil, fmt.Errorf("listing other messages in thread for message %d, thread %d: %v", m.ID, m.ThreadID, err)
1596 var mil []MessageItem
1597 var pm *ParsedMessage
1598 var firstUnread bool
1599 for _, tm := range tml {
1600 err := func() error {
1601 xstate := msgState{acc: acc}
1602 defer xstate.clear()
1604 mi, err := messageItem(log, tm, &xstate)
1606 return fmt.Errorf("making messageitem for message %d, for thread %d: %v", tm.ID, m.ThreadID, err)
1608 mi.MatchQuery, err = v.matches(log, acc, false, tm.ID, tm.MailboxID, tm.UID, tm.Flags, tm.Keywords, func(int64, int64, store.UID) (store.Message, error) {
1612 return fmt.Errorf("matching thread message %d against view query: %v", tm.ID, err)
1614 mil = append(mil, mi)
1616 if tm.ID == destMessageID || destMessageID == 0 && first && (pm == nil || !firstUnread && !tm.Seen) {
1617 firstUnread = !tm.Seen
1618 xpm, err := parsedMessage(log, tm, &xstate, true, false)
1619 if err != nil && errors.Is(err, message.ErrHeader) {
1620 log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
1621 } else if err != nil {
1622 return fmt.Errorf("parsing thread message %d: %v", tm.ID, err)
1630 return nil, nil, err
1634 // Finally, the message that caused us to gather this thread (which is likely the
1635 // most recent message in the thread) could be the only unread message.
1636 if destMessageID == 0 && first && !m.Seen && !firstUnread {
1637 xstate := msgState{acc: acc}
1638 defer xstate.clear()
1639 xpm, err := parsedMessage(log, m, &xstate, true, false)
1640 if err != nil && errors.Is(err, message.ErrHeader) {
1641 log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
1642 } else if err != nil {
1643 return nil, nil, fmt.Errorf("parsing thread message %d: %v", m.ID, err)
1652// While checking the filters on a message, we may need to get more message
1653// details as each filter passes. We check the filters that need the basic
1654// information first, and load and cache more details for the next filters.
1655// msgState holds parsed details for a message, it is updated while filtering,
1656// with more information or reset for a next message.
1657type msgState struct {
1658 acc *store.Account // Never changes during lifetime.
1659 err error // Once set, doesn't get cleared.
1661 part *message.Part // Will be without Reader when msgr is nil.
1662 msgr *store.MsgReader
1665func (ms *msgState) clear() {
1670 *ms = msgState{acc: ms.acc, err: ms.err}
1673func (ms *msgState) ensureMsg(m store.Message) {
1674 if m.ID != ms.m.ID {
1680func (ms *msgState) ensurePart(m store.Message, withMsgReader bool) bool {
1685 if m.ParsedBuf == nil {
1686 ms.err = fmt.Errorf("message %d not parsed", m.ID)
1690 if err := json.Unmarshal(m.ParsedBuf, &p); err != nil {
1691 ms.err = fmt.Errorf("load part for message %d: %w", m.ID, err)
1696 if withMsgReader && ms.msgr == nil {
1697 ms.msgr = ms.acc.MessageReader(m)
1698 ms.part.SetReaderAt(ms.msgr)
1701 return ms.part != nil
1704// flagFilterFn returns a function that applies the flag/keyword/"label"-related
1705// filters for a query. A nil function is returned if there are no flags to filter
1707func (q Query) flagFilterFn() func(store.Flags, []string) bool {
1708 labels := map[string]bool{}
1709 for _, k := range q.Filter.Labels {
1712 for _, k := range q.NotFilter.Labels {
1716 if len(labels) == 0 {
1720 var mask, flags store.Flags
1721 systemflags := map[string][]*bool{
1722 `\answered`: {&mask.Answered, &flags.Answered},
1723 `\flagged`: {&mask.Flagged, &flags.Flagged},
1724 `\deleted`: {&mask.Deleted, &flags.Deleted},
1725 `\seen`: {&mask.Seen, &flags.Seen},
1726 `\draft`: {&mask.Draft, &flags.Draft},
1727 `$junk`: {&mask.Junk, &flags.Junk},
1728 `$notjunk`: {&mask.Notjunk, &flags.Notjunk},
1729 `$forwarded`: {&mask.Forwarded, &flags.Forwarded},
1730 `$phishing`: {&mask.Phishing, &flags.Phishing},
1731 `$mdnsent`: {&mask.MDNSent, &flags.MDNSent},
1733 keywords := map[string]bool{}
1734 for k, v := range labels {
1735 k = strings.ToLower(k)
1736 if mf, ok := systemflags[k]; ok {
1743 return func(msgFlags store.Flags, msgKeywords []string) bool {
1745 if f.Set(mask, msgFlags) != flags {
1748 for k, v := range keywords {
1749 if slices.Contains(msgKeywords, k) != v {
1757// attachmentFilterFn returns a function that filters for the attachment-related
1758// filter from the query. A nil function is returned if there are attachment
1760func (q Query) attachmentFilterFn(log mlog.Log, acc *store.Account, state *msgState) func(m store.Message) bool {
1761 if q.Filter.Attachments == AttachmentIndifferent && q.NotFilter.Attachments == AttachmentIndifferent {
1765 return func(m store.Message) bool {
1766 if !state.ensurePart(m, false) {
1769 types, err := attachmentTypes(log, m, state)
1774 return (q.Filter.Attachments == AttachmentIndifferent || types[q.Filter.Attachments]) && (q.NotFilter.Attachments == AttachmentIndifferent || !types[q.NotFilter.Attachments])
1778var attachmentMimetypes = map[string]AttachmentType{
1779 "application/pdf": AttachmentPDF,
1780 "application/zip": AttachmentArchive,
1781 "application/x-rar-compressed": AttachmentArchive,
1782 "application/vnd.oasis.opendocument.spreadsheet": AttachmentSpreadsheet,
1783 "application/vnd.ms-excel": AttachmentSpreadsheet,
1784 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": AttachmentSpreadsheet,
1785 "application/vnd.oasis.opendocument.text": AttachmentDocument,
1786 "application/vnd.oasis.opendocument.presentation": AttachmentPresentation,
1787 "application/vnd.ms-powerpoint": AttachmentPresentation,
1788 "application/vnd.openxmlformats-officedocument.presentationml.presentation": AttachmentPresentation,
1790var attachmentExtensions = map[string]AttachmentType{
1791 ".pdf": AttachmentPDF,
1792 ".zip": AttachmentArchive,
1793 ".tar": AttachmentArchive,
1794 ".tgz": AttachmentArchive,
1795 ".tar.gz": AttachmentArchive,
1796 ".tbz2": AttachmentArchive,
1797 ".tar.bz2": AttachmentArchive,
1798 ".tar.lz": AttachmentArchive,
1799 ".tlz": AttachmentArchive,
1800 ".tar.xz": AttachmentArchive,
1801 ".txz": AttachmentArchive,
1802 ".tar.zst": AttachmentArchive,
1803 ".tar.lz4": AttachmentArchive,
1804 ".7z": AttachmentArchive,
1805 ".rar": AttachmentArchive,
1806 ".ods": AttachmentSpreadsheet,
1807 ".xls": AttachmentSpreadsheet,
1808 ".xlsx": AttachmentSpreadsheet,
1809 ".odt": AttachmentDocument,
1810 ".doc": AttachmentDocument,
1811 ".docx": AttachmentDocument,
1812 ".odp": AttachmentPresentation,
1813 ".ppt": AttachmentPresentation,
1814 ".pptx": AttachmentPresentation,
1817func attachmentTypes(log mlog.Log, m store.Message, state *msgState) (map[AttachmentType]bool, error) {
1818 types := map[AttachmentType]bool{}
1820 pm, err := parsedMessage(log, m, state, false, false)
1822 return nil, fmt.Errorf("parsing message for attachments: %w", err)
1824 for _, a := range pm.attachments {
1825 if a.Part.MediaType == "IMAGE" {
1826 types[AttachmentImage] = true
1829 mt := strings.ToLower(a.Part.MediaType + "/" + a.Part.MediaSubType)
1830 if t, ok := attachmentMimetypes[mt]; ok {
1832 } else if ext := filepath.Ext(tryDecodeParam(log, a.Part.ContentTypeParams["name"])); ext != "" {
1833 if t, ok := attachmentExtensions[strings.ToLower(ext)]; ok {
1841 if len(types) == 0 {
1842 types[AttachmentNone] = true
1844 types[AttachmentAny] = true
1849// envFilterFn returns a filter function for the "envelope" headers ("envelope" as
1850// used by IMAP, i.e. basic message headers from/to/subject, an unfortunate name
1851// clash with SMTP envelope) for the query. A nil function is returned if no
1852// filtering is needed.
1853func (q Query) envFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1854 if len(q.Filter.From) == 0 && len(q.Filter.To) == 0 && len(q.Filter.Subject) == 0 && len(q.NotFilter.From) == 0 && len(q.NotFilter.To) == 0 && len(q.NotFilter.Subject) == 0 {
1858 lower := func(l []string) []string {
1862 r := make([]string, len(l))
1863 for i, s := range l {
1864 r[i] = strings.ToLower(s)
1869 filterSubject := lower(q.Filter.Subject)
1870 notFilterSubject := lower(q.NotFilter.Subject)
1871 filterFrom := lower(q.Filter.From)
1872 notFilterFrom := lower(q.NotFilter.From)
1873 filterTo := lower(q.Filter.To)
1874 notFilterTo := lower(q.NotFilter.To)
1876 return func(m store.Message) bool {
1877 if !state.ensurePart(m, false) {
1881 var env message.Envelope
1882 if state.part.Envelope != nil {
1883 env = *state.part.Envelope
1886 if len(filterSubject) > 0 || len(notFilterSubject) > 0 {
1887 subject := strings.ToLower(env.Subject)
1888 for _, s := range filterSubject {
1889 if !strings.Contains(subject, s) {
1893 for _, s := range notFilterSubject {
1894 if strings.Contains(subject, s) {
1900 contains := func(textLower []string, l []message.Address, all bool) bool {
1902 for _, s := range textLower {
1903 for _, a := range l {
1904 name := strings.ToLower(a.Name)
1905 addr := strings.ToLower(fmt.Sprintf("<%s@%s>", a.User, a.Host))
1906 if strings.Contains(name, s) || strings.Contains(addr, s) {
1920 if len(filterFrom) > 0 && !contains(filterFrom, env.From, true) {
1923 if len(notFilterFrom) > 0 && contains(notFilterFrom, env.From, false) {
1926 if len(filterTo) > 0 || len(notFilterTo) > 0 {
1927 to := append(append(append([]message.Address{}, env.To...), env.CC...), env.BCC...)
1928 if len(filterTo) > 0 && !contains(filterTo, to, true) {
1931 if len(notFilterTo) > 0 && contains(notFilterTo, to, false) {
1939// headerFilterFn returns a function that filters for the header filters in the
1940// query. A nil function is returned if there are no header filters.
1941func (q Query) headerFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1942 if len(q.Filter.Headers) == 0 {
1946 lowerValues := make([]string, len(q.Filter.Headers))
1947 for i, t := range q.Filter.Headers {
1948 lowerValues[i] = strings.ToLower(t[1])
1951 return func(m store.Message) bool {
1952 if !state.ensurePart(m, true) {
1955 hdr, err := state.part.Header()
1957 state.err = fmt.Errorf("reading header for message %d: %w", m.ID, err)
1962 for i, t := range q.Filter.Headers {
1966 if v == "" && len(l) > 0 {
1969 for _, e := range l {
1970 if strings.Contains(strings.ToLower(e), v) {
1980// wordFiltersFn returns a function that applies the word filters of the query. A
1981// nil function is returned when query does not contain a word filter.
1982func (q Query) wordsFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1983 if len(q.Filter.Words) == 0 && len(q.NotFilter.Words) == 0 {
1987 ws := store.PrepareWordSearch(q.Filter.Words, q.NotFilter.Words)
1989 return func(m store.Message) bool {
1990 if !state.ensurePart(m, true) {
1994 if ok, err := ws.MatchPart(log, state.part, true); err != nil {
1995 state.err = fmt.Errorf("searching for words in message %d: %w", m.ID, err)