3// todo: may want to add some json omitempty tags to MessageItem, or Message to reduce json size, or just have smaller types that send only the fields that are needed.
8 cryptrand "crypto/rand"
24 "github.com/mjl-/bstore"
25 "github.com/mjl-/sherpa"
27 "github.com/mjl-/mox/dns"
28 "github.com/mjl-/mox/message"
29 "github.com/mjl-/mox/metrics"
30 "github.com/mjl-/mox/mlog"
31 "github.com/mjl-/mox/mox-"
32 "github.com/mjl-/mox/moxio"
33 "github.com/mjl-/mox/moxvar"
34 "github.com/mjl-/mox/smtp"
35 "github.com/mjl-/mox/store"
38// Request is a request to an SSE connection to send messages, either for a new
39// view, to continue with an existing view, or to cancel an ongoing request.
43 SSEID int64 // SSE connection.
45 // To indicate a request is a continuation (more results) of the previous view.
46 // Echoed in events, client checks if it is getting results for the latest request.
49 // If set, this request and its view are canceled. A new view must be started.
59 ThreadOff ThreadMode = "off"
60 ThreadOn ThreadMode = "on"
61 ThreadUnread ThreadMode = "unread"
64// Query is a request for messages that match filters, in a given order.
66 OrderAsc bool // Order by received ascending or descending.
72// AttachmentType is for filtering by attachment type.
73type AttachmentType string
76 AttachmentIndifferent AttachmentType = ""
77 AttachmentNone AttachmentType = "none"
78 AttachmentAny AttachmentType = "any"
79 AttachmentImage AttachmentType = "image" // png, jpg, gif, ...
80 AttachmentPDF AttachmentType = "pdf"
81 AttachmentArchive AttachmentType = "archive" // zip files, tgz, ...
82 AttachmentSpreadsheet AttachmentType = "spreadsheet" // ods, xlsx, ...
83 AttachmentDocument AttachmentType = "document" // odt, docx, ...
84 AttachmentPresentation AttachmentType = "presentation" // odp, pptx, ...
87// Filter selects the messages to return. Fields that are set must all match,
88// for slices each element must match ("and").
90 // If -1, then all mailboxes except Trash/Junk/Rejects. Otherwise, only active if > 0.
93 // If true, also submailboxes are included in the search.
94 MailboxChildrenIncluded bool
96 // In case client doesn't know mailboxes and their IDs yet. Only used during sse
97 // connection setup, where it is turned into a MailboxID. Filtering only looks at
101 Words []string // Case insensitive substring match for each string.
103 To []string // Including Cc and Bcc.
107 Attachments AttachmentType
109 Headers [][2]string // Header values can be empty, it's a check if the header is present, regardless of value.
114// NotFilter matches messages that don't match these fields.
115type NotFilter struct {
120 Attachments AttachmentType
124// Page holds pagination parameters for a request.
126 // Start returning messages after this ID, if > 0. For pagination, fetching the
127 // next set of messages.
128 AnchorMessageID int64
130 // Number of messages to return, must be >= 1, we never return more than 10000 for
134 // If > 0, return messages until DestMessageID is found. More than Count messages
135 // can be returned. For long-running searches, it may take a while before this
140// todo: MessageAddress and MessageEnvelope into message.Address and message.Envelope.
142// MessageAddress is like message.Address, but with a dns.Domain, with unicode name
144type MessageAddress struct {
145 Name string // Free-form name for display in mail applications.
146 User string // Localpart, encoded.
150// MessageEnvelope is like message.Envelope, as used in message.Part, but including
151// unicode host names for IDNA names.
152type MessageEnvelope struct {
153 // todo: should get sherpadoc to understand type embeds and embed the non-MessageAddress fields from message.Envelope.
156 From []MessageAddress
157 Sender []MessageAddress
158 ReplyTo []MessageAddress
166// MessageItem is sent by queries, it has derived information analyzed from
167// message.Part, made for the needs of the message items in the message list.
169type MessageItem struct {
170 Message store.Message // Without ParsedBuf and MsgPrefix, for size.
171 Envelope MessageEnvelope
172 Attachments []Attachment
175 FirstLine string // Of message body, for showing as preview.
176 MatchQuery bool // If message does not match query, it can still be included because of threading.
179// ParsedMessage has more parsed/derived information about a message, intended
180// for rendering the (contents of the) message. Information from MessageItem is
182type ParsedMessage struct {
185 Headers map[string][]string
186 ViewMode store.ViewMode
188 // Text parts, can be empty.
191 // Whether there is an HTML part. The webclient renders HTML message parts through
192 // an iframe and a separate request with strict CSP headers to prevent script
193 // execution and loading of external resources, which isn't possible when loading
194 // in iframe with inline HTML because not all browsers support the iframe csp
198 ListReplyAddress *MessageAddress // From List-Post.
200 // Information used by MessageItem, not exported in this type.
201 envelope MessageEnvelope
202 attachments []Attachment
208// EventStart is the first message sent on an SSE connection, giving the client
209// basic data to populate its UI. After this event, messages will follow quickly in
210// an EventViewMsgs event.
211type EventStart struct {
213 LoginAddress MessageAddress
214 Addresses []MessageAddress
215 DomainAddressConfigs map[string]DomainAddressConfig // ASCII domain to address config.
217 Mailboxes []store.Mailbox
218 RejectsMailbox string
219 Settings store.Settings
220 AccountPath string // If nonempty, the path on same host to webaccount interface.
224// DomainAddressConfig has the address (localpart) configuration for a domain, so
225// the webmail client can decide if an address matches the addresses of the
227type DomainAddressConfig struct {
228 LocalpartCatchallSeparator string // Can be empty.
229 LocalpartCaseSensitive bool
232// EventViewMsgs contains messages for a view, possibly a continuation of an
233// earlier list of messages.
234type EventViewMsgs struct {
238 // If empty, this was the last message for the request. If non-empty, a list of
239 // thread messages. Each with the first message being the reason this thread is
240 // included and can be used as AnchorID in followup requests. If the threading mode
241 // is "off" in the query, there will always be only a single message. If a thread
242 // is sent, all messages in the thread are sent, including those that don't match
243 // the query (e.g. from another mailbox). Threads can be displayed based on the
244 // ThreadParentIDs field, with possibly slightly different display based on field
245 // ThreadMissingLink.
246 MessageItems [][]MessageItem
248 // If set, will match the target page.DestMessageID from the request.
249 ParsedMessage *ParsedMessage
251 // If set, there are no more messages in this view at this moment. Messages can be
252 // added, typically via Change messages, e.g. for new deliveries.
256// EventViewErr indicates an error during a query for messages. The request is
257// aborted, no more request-related messages will be sent until the next request.
258type EventViewErr struct {
261 Err string // To be displayed in client.
262 err error // Original message, for checking against context.Canceled.
265// EventViewReset indicates that a request for the next set of messages in a view
266// could not be fulfilled, e.g. because the anchor message does not exist anymore.
267// The client should clear its list of messages. This can happen before
268// EventViewMsgs events are sent.
269type EventViewReset struct {
274// EventViewChanges contains one or more changes relevant for the client, either
275// with new mailbox total/unseen message counts, or messages added/removed/modified
276// (flags) for the current view.
277type EventViewChanges struct {
279 Changes [][2]any // The first field of [2]any is a string, the second of the Change types below.
282// ChangeMsgAdd adds a new message and possibly its thread to the view.
283type ChangeMsgAdd struct {
285 MessageItems []MessageItem
288// ChangeMsgRemove removes one or more messages from the view.
289type ChangeMsgRemove struct {
290 store.ChangeRemoveUIDs
293// ChangeMsgFlags updates flags for one message.
294type ChangeMsgFlags struct {
298// ChangeMsgThread updates muted/collapsed fields for one message.
299type ChangeMsgThread struct {
303// ChangeMailboxRemove indicates a mailbox was removed, including all its messages.
304type ChangeMailboxRemove struct {
305 store.ChangeRemoveMailbox
308// ChangeMailboxAdd indicates a new mailbox was added, initially without any messages.
309type ChangeMailboxAdd struct {
310 Mailbox store.Mailbox
313// ChangeMailboxRename indicates a mailbox was renamed. Its ID stays the same.
314// It could be under a new parent.
315type ChangeMailboxRename struct {
316 store.ChangeRenameMailbox
319// ChangeMailboxCounts sets new total and unseen message counts for a mailbox.
320type ChangeMailboxCounts struct {
321 store.ChangeMailboxCounts
324// ChangeMailboxSpecialUse has updated special-use flags for a mailbox.
325type ChangeMailboxSpecialUse struct {
326 store.ChangeMailboxSpecialUse
329// ChangeMailboxKeywords has an updated list of keywords for a mailbox, e.g. after
330// a message was added with a keyword that wasn't in the mailbox yet.
331type ChangeMailboxKeywords struct {
332 store.ChangeMailboxKeywords
335// View holds the information about the returned data for a query. It is used to
336// determine whether mailbox changes should be sent to the client, we only send
337// addition/removal/flag-changes of messages that are in view, or would extend it
338// if the view is at the end of the results.
342 // Received of last message we sent to the client. We use it to decide if a newly
343 // delivered message is within the view and the client should get a notification.
344 LastMessageReceived time.Time
346 // If set, the last message in the query view has been sent. There is no need to do
347 // another query, it will not return more data. Used to decide if an event for a
348 // new message should be sent.
351 // Whether message must or must not match mailboxIDs.
353 // Mailboxes to match, can be multiple, for matching children. If empty, there is
354 // no filter on mailboxes.
355 mailboxIDs map[int64]bool
357 // Threads sent to client. New messages for this thread are also sent, regardless
358 // of regular query matching, so also for other mailboxes. If the user (re)moved
359 // all messages of a thread, they may still receive events for the thread. Only
360 // filled when query with threading not off.
361 threadIDs map[int64]struct{}
364// sses tracks all sse connections, and access to them.
371// sse represents an sse connection.
373 ID int64 // Also returned in EventStart and used in Request to identify the request.
374 AccountName string // Used to check the authenticated user has access to the SSE connection.
375 Request chan Request // Goroutine will receive requests from here, coming from API calls.
378// called by the goroutine when the connection is closed or breaks.
379func (sse sse) unregister() {
382 delete(sses.m, sse.ID)
384 // Drain any pending requests, preventing blocked goroutines from API calls.
394func sseRegister(accountName string) sse {
398 v := sse{sses.gen, accountName, make(chan Request, 1)}
403// sseGet returns a reference to an existing connection if it exists and user
405func sseGet(id int64, accountName string) (sse, bool) {
409 if s.AccountName != accountName {
415// ssetoken is a temporary token that has not yet been used to start an SSE
416// connection. Created by Token, consumed by a new SSE connection.
417type ssetoken struct {
418 token string // Uniquely generated.
420 address string // Address used to authenticate in call that created the token.
421 sessionToken store.SessionToken // SessionToken that created this token, checked before sending updates.
425// ssetokens maintains unused tokens. We have just one, but it's a type so we
426// can define methods.
427type ssetokens struct {
429 accountTokens map[string][]ssetoken // Account to max 10 most recent tokens, from old to new.
430 tokens map[string]ssetoken // Token to details, for finding account for a token.
433var sseTokens = ssetokens{
434 accountTokens: map[string][]ssetoken{},
435 tokens: map[string]ssetoken{},
438// xgenerate creates and saves a new token. It ensures no more than 10 tokens
439// per account exist, removing old ones if needed.
440func (x *ssetokens) xgenerate(ctx context.Context, accName, address string, sessionToken store.SessionToken) string {
441 buf := make([]byte, 16)
442 _, err := cryptrand.Read(buf)
443 xcheckf(ctx, err, "generating token")
444 st := ssetoken{base64.RawURLEncoding.EncodeToString(buf), accName, address, sessionToken, time.Now().Add(time.Minute)}
448 n := len(x.accountTokens[accName])
450 for _, ost := range x.accountTokens[accName][:n-9] {
451 delete(x.tokens, ost.token)
453 copy(x.accountTokens[accName], x.accountTokens[accName][n-9:])
454 x.accountTokens[accName] = x.accountTokens[accName][:9]
456 x.accountTokens[accName] = append(x.accountTokens[accName], st)
457 x.tokens[st.token] = st
461// check verifies a token, and consumes it if valid.
462func (x *ssetokens) check(token string) (string, string, store.SessionToken, bool, error) {
466 st, ok := x.tokens[token]
468 return "", "", "", false, nil
470 delete(x.tokens, token)
471 if i := slices.Index(x.accountTokens[st.accName], st); i < 0 {
472 return "", "", "", false, errors.New("internal error, could not find token in account")
474 copy(x.accountTokens[st.accName][i:], x.accountTokens[st.accName][i+1:])
475 x.accountTokens[st.accName] = x.accountTokens[st.accName][:len(x.accountTokens[st.accName])-1]
476 if len(x.accountTokens[st.accName]) == 0 {
477 delete(x.accountTokens, st.accName)
480 if time.Now().After(st.validUntil) {
481 return "", "", "", false, nil
483 return st.accName, st.address, st.sessionToken, true, nil
486// ioErr is panicked on i/o errors in serveEvents and handled in a defer.
491// serveEvents serves an SSE connection. Authentication is done through a query
492// string parameter "token", a one-time-use token returned by the Token API call.
493func serveEvents(ctx context.Context, log mlog.Log, accountPath string, w http.ResponseWriter, r *http.Request) {
494 if r.Method != "GET" {
495 http.Error(w, "405 - method not allowed - use get", http.StatusMethodNotAllowed)
499 flusher, ok := w.(http.Flusher)
501 log.Error("internal error: ResponseWriter not a http.Flusher")
502 http.Error(w, "500 - internal error - cannot sync to http connection", 500)
507 token := q.Get("token")
509 http.Error(w, "400 - bad request - missing credentials", http.StatusBadRequest)
512 accName, address, sessionToken, ok, err := sseTokens.check(token)
514 http.Error(w, "500 - internal server error - "+err.Error(), http.StatusInternalServerError)
518 http.Error(w, "400 - bad request - bad token", http.StatusBadRequest)
521 if _, err := store.SessionUse(ctx, log, accName, sessionToken, ""); err != nil {
522 http.Error(w, "400 - bad request - bad session token", http.StatusBadRequest)
526 // We can simulate a slow SSE connection. It seems firefox doesn't slow down
527 // incoming responses with its slow-network simulation.
528 var waitMin, waitMax time.Duration
529 waitMinMsec := q.Get("waitMinMsec")
530 waitMaxMsec := q.Get("waitMaxMsec")
531 if waitMinMsec != "" && waitMaxMsec != "" {
532 if v, err := strconv.ParseInt(waitMinMsec, 10, 64); err != nil {
533 http.Error(w, "400 - bad request - parsing waitMinMsec: "+err.Error(), http.StatusBadRequest)
536 waitMin = time.Duration(v) * time.Millisecond
539 if v, err := strconv.ParseInt(waitMaxMsec, 10, 64); err != nil {
540 http.Error(w, "400 - bad request - parsing waitMaxMsec: "+err.Error(), http.StatusBadRequest)
543 waitMax = time.Duration(v) * time.Millisecond
547 // Parse the request with initial mailbox/search criteria.
549 dec := json.NewDecoder(strings.NewReader(q.Get("request")))
550 dec.DisallowUnknownFields()
551 if err := dec.Decode(&req); err != nil {
552 http.Error(w, "400 - bad request - bad request query string parameter: "+err.Error(), http.StatusBadRequest)
554 } else if req.Page.Count <= 0 {
555 http.Error(w, "400 - bad request - request cannot have Page.Count 0", http.StatusBadRequest)
558 if req.Query.Threading == "" {
559 req.Query.Threading = ThreadOff
562 var writer *eventWriter
564 metricSSEConnections.Inc()
565 defer metricSSEConnections.Dec()
567 // Below here, error handling goes through xcheckf, which panics with
568 // *sherpa.Error, after which we send an error event to the client. We can also get
569 // an *ioErr when the connection is broken.
575 if err, ok := x.(*sherpa.Error); ok {
576 writer.xsendEvent(ctx, log, "fatalErr", err.Message)
577 } else if _, ok := x.(ioErr); ok {
580 log.WithContext(ctx).Error("serveEvents panic", slog.Any("err", x))
582 metrics.PanicInc(metrics.Webmail)
588 h.Set("Content-Type", "text/event-stream")
589 h.Set("Cache-Control", "no-cache")
591 // We'll be sending quite a bit of message data (text) in JSON (plenty duplicate
592 // keys), so should be quite compressible.
594 gz := mox.AcceptsGzip(r)
596 h.Set("Content-Encoding", "gzip")
597 out, _ = gzip.NewWriterLevel(w, gzip.BestSpeed)
601 out = httpFlusher{out, flusher}
603 // We'll be writing outgoing SSE events through writer.
604 writer = newEventWriter(out, waitMin, waitMax, accName, sessionToken)
607 // Fetch initial data.
608 acc, err := store.OpenAccount(log, accName)
609 xcheckf(ctx, err, "open account")
612 log.Check(err, "closing account")
614 comm := store.RegisterComm(acc)
615 defer comm.Unregister()
617 // List addresses that the client can use to send email from.
618 accConf, _ := acc.Conf()
619 loginAddr, err := smtp.ParseAddress(address)
620 xcheckf(ctx, err, "parsing login address")
621 _, _, _, dest, err := mox.LookupAddress(loginAddr.Localpart, loginAddr.Domain, false, false)
622 xcheckf(ctx, err, "looking up destination for login address")
623 loginName := accConf.FullName
624 if dest.FullName != "" {
625 loginName = dest.FullName
627 loginAddress := MessageAddress{Name: loginName, User: loginAddr.Localpart.String(), Domain: loginAddr.Domain}
628 var addresses []MessageAddress
629 for a, dest := range accConf.Destinations {
630 name := dest.FullName
632 name = accConf.FullName
634 var ma MessageAddress
635 if strings.HasPrefix(a, "@") {
636 dom, err := dns.ParseDomain(a[1:])
637 xcheckf(ctx, err, "parsing destination address for account")
638 ma = MessageAddress{Domain: dom}
640 addr, err := smtp.ParseAddress(a)
641 xcheckf(ctx, err, "parsing destination address for account")
642 ma = MessageAddress{Name: name, User: addr.Localpart.String(), Domain: addr.Domain}
644 addresses = append(addresses, ma)
646 // User is allowed to send using alias address as message From address. Webmail
647 // will choose it when replying to a message sent to that address.
648 aliasAddrs := map[MessageAddress]bool{}
649 for _, a := range accConf.Aliases {
650 if a.Alias.AllowMsgFrom {
651 ma := MessageAddress{User: a.Alias.LocalpartStr, Domain: a.Alias.Domain}
653 addresses = append(addresses, ma)
655 aliasAddrs[ma] = true
659 // We implicitly start a query. We use the reqctx for the transaction, because the
660 // transaction is passed to the query, which can be canceled.
661 reqctx, reqctxcancel := context.WithCancel(ctx)
663 // We also cancel in cancelDrain later on, but there is a brief window where the
664 // context wouldn't be canceled.
665 if reqctxcancel != nil {
671 // qtx is kept around during connection initialization, until we pass it off to the
672 // goroutine that starts querying for messages.
676 err := qtx.Rollback()
677 log.Check(err, "rolling back")
681 var mbl []store.Mailbox
682 settings := store.Settings{ID: 1}
684 // We only take the rlock when getting the tx.
685 acc.WithRLock(func() {
686 // Now a read-only transaction we'll use during the query.
687 qtx, err = acc.DB.Begin(reqctx, false)
688 xcheckf(ctx, err, "begin transaction")
690 mbl, err = bstore.QueryTx[store.Mailbox](qtx).List()
691 xcheckf(ctx, err, "list mailboxes")
693 err = qtx.Get(&settings)
694 xcheckf(ctx, err, "get settings")
697 // Find the designated mailbox if a mailbox name is set, or there are no filters at all.
698 var zerofilter Filter
699 var zeronotfilter NotFilter
700 var mailbox store.Mailbox
701 var mailboxPrefixes []string
702 var matchMailboxes bool
703 mailboxIDs := map[int64]bool{}
704 mailboxName := req.Query.Filter.MailboxName
705 if mailboxName != "" || reflect.DeepEqual(req.Query.Filter, zerofilter) && reflect.DeepEqual(req.Query.NotFilter, zeronotfilter) {
706 if mailboxName == "" {
707 mailboxName = "Inbox"
710 var inbox store.Mailbox
711 for _, e := range mbl {
712 if e.Name == mailboxName {
715 if e.Name == "Inbox" {
723 xcheckf(ctx, errors.New("inbox not found"), "setting initial mailbox")
725 req.Query.Filter.MailboxID = mailbox.ID
726 req.Query.Filter.MailboxName = ""
727 mailboxPrefixes = []string{mailbox.Name + "/"}
728 matchMailboxes = true
729 mailboxIDs[mailbox.ID] = true
731 matchMailboxes, mailboxIDs, mailboxPrefixes = xprepareMailboxIDs(ctx, qtx, req.Query.Filter, accConf.RejectsMailbox)
733 if req.Query.Filter.MailboxChildrenIncluded {
734 xgatherMailboxIDs(ctx, qtx, mailboxIDs, mailboxPrefixes)
737 // todo: write a last-event-id based on modseq? if last-event-id is present, we would have to send changes to mailboxes, messages, hopefully reducing the amount of data sent.
739 sse := sseRegister(acc.Name)
740 defer sse.unregister()
742 // Per-domain localpart config so webclient can decide if an address belongs to the account.
743 domainAddressConfigs := map[string]DomainAddressConfig{}
744 for _, a := range addresses {
745 dom, _ := mox.Conf.Domain(a.Domain)
746 domainAddressConfigs[a.Domain.ASCII] = DomainAddressConfig{dom.LocalpartCatchallSeparator, dom.LocalpartCaseSensitive}
749 // Write first event, allowing client to fill its UI with mailboxes.
750 start := EventStart{sse.ID, loginAddress, addresses, domainAddressConfigs, mailbox.Name, mbl, accConf.RejectsMailbox, settings, accountPath, moxvar.Version}
751 writer.xsendEvent(ctx, log, "start", start)
753 // The goroutine doing the querying will send messages on these channels, which
754 // result in an event being written on the SSE connection.
755 viewMsgsc := make(chan EventViewMsgs)
756 viewErrc := make(chan EventViewErr)
757 viewResetc := make(chan EventViewReset)
758 donec := make(chan int64) // When request is done.
760 // Start a view, it determines if we send a change to the client. And start an
761 // implicit query for messages, we'll send the messages to the client which can
762 // fill its ui with messages.
763 v := view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
764 go viewRequestTx(reqctx, log, acc, qtx, v, viewMsgsc, viewErrc, viewResetc, donec)
765 qtx = nil // viewRequestTx closes qtx
767 // When canceling a query, we must drain its messages until it says it is done.
768 // Otherwise the sending goroutine would hang indefinitely on a channel send.
769 cancelDrain := func() {
770 if reqctxcancel != nil {
771 // Cancel the goroutine doing the querying.
779 // Drain events until done.
791 // If we stop and a query is in progress, we must drain the channel it will send on.
794 // Changes broadcasted by other connections on this account. If applicable for the
795 // connection/view, we send events.
796 xprocessChanges := func(changes []store.Change) {
797 taggedChanges := [][2]any{}
799 // We get a transaction the first time we need it.
803 err := xtx.Rollback()
804 log.Check(err, "rolling back transaction")
807 ensureTx := func() error {
814 xtx, err = acc.DB.Begin(ctx, false)
817 // This getmsg will now only be called with mailboxID+UID, not with messageID set.
818 // todo jmap: change store.Change* to include MessageID's? would mean duplication of information resulting in possible mismatch.
819 getmsg := func(messageID int64, mailboxID int64, uid store.UID) (store.Message, error) {
820 if err := ensureTx(); err != nil {
821 return store.Message{}, fmt.Errorf("transaction: %v", err)
823 return bstore.QueryTx[store.Message](xtx).FilterEqual("Expunged", false).FilterNonzero(store.Message{MailboxID: mailboxID, UID: uid}).Get()
826 // Return uids that are within range in view. Because the end has been reached, or
827 // because the UID is not after the last message.
828 xchangedUIDs := func(mailboxID int64, uids []store.UID, isRemove bool) (changedUIDs []store.UID) {
829 uidsAny := make([]any, len(uids))
830 for i, uid := range uids {
834 xcheckf(ctx, err, "transaction")
835 q := bstore.QueryTx[store.Message](xtx)
836 q.FilterNonzero(store.Message{MailboxID: mailboxID})
837 q.FilterEqual("UID", uidsAny...)
838 mbOK := v.matchesMailbox(mailboxID)
839 err = q.ForEach(func(m store.Message) error {
840 _, thread := v.threadIDs[m.ThreadID]
841 if thread || mbOK && (v.inRange(m) || isRemove && m.Expunged) {
842 changedUIDs = append(changedUIDs, m.UID)
846 xcheckf(ctx, err, "fetching messages for change")
850 // Forward changes that are relevant to the current view.
851 for _, change := range changes {
852 switch c := change.(type) {
853 case store.ChangeAddUID:
854 ok, err := v.matches(log, acc, true, 0, c.MailboxID, c.UID, c.Flags, c.Keywords, getmsg)
855 xcheckf(ctx, err, "matching new message against view")
856 m, err := getmsg(0, c.MailboxID, c.UID)
857 xcheckf(ctx, err, "get message")
858 _, thread := v.threadIDs[m.ThreadID]
862 state := msgState{acc: acc}
863 mi, err := messageItem(log, m, &state)
865 xcheckf(ctx, err, "make messageitem")
868 mil := []MessageItem{mi}
869 if !thread && req.Query.Threading != ThreadOff {
871 xcheckf(ctx, err, "transaction")
872 more, _, err := gatherThread(log, xtx, acc, v, m, 0, false)
873 xcheckf(ctx, err, "gathering thread messages for id %d, thread %d", m.ID, m.ThreadID)
874 mil = append(mil, more...)
875 v.threadIDs[m.ThreadID] = struct{}{}
878 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgAdd", ChangeMsgAdd{c, mil}})
880 // If message extends the view, store it as such.
881 if !v.Request.Query.OrderAsc && m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && m.Received.After(v.LastMessageReceived) {
882 v.LastMessageReceived = m.Received
885 case store.ChangeRemoveUIDs:
886 // We may send changes for uids the client doesn't know, that's fine.
887 changedUIDs := xchangedUIDs(c.MailboxID, c.UIDs, true)
888 if len(changedUIDs) == 0 {
891 ch := ChangeMsgRemove{c}
892 ch.UIDs = changedUIDs
893 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgRemove", ch})
895 case store.ChangeFlags:
896 // We may send changes for uids the client doesn't know, that's fine.
897 changedUIDs := xchangedUIDs(c.MailboxID, []store.UID{c.UID}, false)
898 if len(changedUIDs) == 0 {
901 ch := ChangeMsgFlags{c}
902 ch.UID = changedUIDs[0]
903 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgFlags", ch})
905 case store.ChangeThread:
906 // Change in muted/collapsed state, just always ship it.
907 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgThread", ChangeMsgThread{c}})
909 case store.ChangeRemoveMailbox:
910 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRemove", ChangeMailboxRemove{c}})
912 case store.ChangeAddMailbox:
913 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxAdd", ChangeMailboxAdd{c.Mailbox}})
915 case store.ChangeRenameMailbox:
916 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRename", ChangeMailboxRename{c}})
918 case store.ChangeMailboxCounts:
919 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxCounts", ChangeMailboxCounts{c}})
921 case store.ChangeMailboxSpecialUse:
922 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxSpecialUse", ChangeMailboxSpecialUse{c}})
924 case store.ChangeMailboxKeywords:
925 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxKeywords", ChangeMailboxKeywords{c}})
927 case store.ChangeAddSubscription:
928 // Webmail does not care about subscriptions.
931 panic(fmt.Sprintf("missing case for change %T", c))
935 if len(taggedChanges) > 0 {
936 viewChanges := EventViewChanges{v.Request.ViewID, taggedChanges}
937 writer.xsendEvent(ctx, log, "viewChanges", viewChanges)
941 timer := time.NewTimer(5 * time.Minute) // For keepalives.
945 timer.Reset(5 * time.Minute)
949 pending := comm.Pending
955 case <-mox.Shutdown.Done():
956 writer.xsendEvent(ctx, log, "fatalErr", "server is shutting down")
957 // Work around go vet, it doesn't see defer cancelDrain.
958 if reqctxcancel != nil {
964 _, err := fmt.Fprintf(out, ": keepalive\n\n")
966 log.Errorx("write keepalive", err)
967 // Work around go vet, it doesn't see defer cancelDrain.
968 if reqctxcancel != nil {
976 case vm := <-viewMsgsc:
977 if vm.RequestID != v.Request.ID || vm.ViewID != v.Request.ViewID {
978 panic(fmt.Sprintf("received msgs for view,request id %d,%d instead of %d,%d", vm.ViewID, vm.RequestID, v.Request.ViewID, v.Request.ID))
983 if len(vm.MessageItems) > 0 {
984 v.LastMessageReceived = vm.MessageItems[len(vm.MessageItems)-1][0].Message.Received
986 writer.xsendEvent(ctx, log, "viewMsgs", vm)
988 case ve := <-viewErrc:
989 if ve.RequestID != v.Request.ID || ve.ViewID != v.Request.ViewID {
990 panic(fmt.Sprintf("received err for view,request id %d,%d instead of %d,%d", ve.ViewID, ve.RequestID, v.Request.ViewID, v.Request.ID))
992 if errors.Is(ve.err, context.Canceled) || moxio.IsClosed(ve.err) {
993 // Work around go vet, it doesn't see defer cancelDrain.
994 if reqctxcancel != nil {
999 writer.xsendEvent(ctx, log, "viewErr", ve)
1001 case vr := <-viewResetc:
1002 if vr.RequestID != v.Request.ID || vr.ViewID != v.Request.ViewID {
1003 panic(fmt.Sprintf("received reset for view,request id %d,%d instead of %d,%d", vr.ViewID, vr.RequestID, v.Request.ViewID, v.Request.ID))
1005 writer.xsendEvent(ctx, log, "viewReset", vr)
1008 if id != v.Request.ID {
1009 panic(fmt.Sprintf("received done for request id %d instead of %d", id, v.Request.ID))
1011 if reqctxcancel != nil {
1017 case req := <-sse.Request:
1022 v = view{req, time.Time{}, false, false, nil, nil}
1026 reqctx, reqctxcancel = context.WithCancel(ctx)
1028 stop := func() (stop bool) {
1029 // rtx is handed off to viewRequestTx below, but we must clean it up in case of errors.
1034 err = rtx.Rollback()
1035 log.Check(err, "rolling back transaction")
1038 acc.WithRLock(func() {
1039 rtx, err = acc.DB.Begin(reqctx, false)
1046 if errors.Is(err, context.Canceled) {
1049 err := fmt.Errorf("begin transaction: %v", err)
1050 viewErr := EventViewErr{v.Request.ViewID, v.Request.ID, err.Error(), err}
1051 writer.xsendEvent(ctx, log, "viewErr", viewErr)
1055 // Reset view state for new query.
1056 if req.ViewID != v.Request.ViewID {
1057 matchMailboxes, mailboxIDs, mailboxPrefixes := xprepareMailboxIDs(ctx, rtx, req.Query.Filter, accConf.RejectsMailbox)
1058 if req.Query.Filter.MailboxChildrenIncluded {
1059 xgatherMailboxIDs(ctx, rtx, mailboxIDs, mailboxPrefixes)
1061 v = view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
1065 go viewRequestTx(reqctx, log, acc, rtx, v, viewMsgsc, viewErrc, viewResetc, donec)
1074 xprocessChanges(comm.Get())
1077 // Work around go vet, it doesn't see defer cancelDrain.
1078 if reqctxcancel != nil {
1086// xprepareMailboxIDs prepares the first half of filters for mailboxes, based on
1087// f.MailboxID (-1 is special). matchMailboxes indicates whether the IDs in
1088// mailboxIDs must or must not match. mailboxPrefixes is for use with
1089// xgatherMailboxIDs to gather children of the mailboxIDs.
1090func xprepareMailboxIDs(ctx context.Context, tx *bstore.Tx, f Filter, rejectsMailbox string) (matchMailboxes bool, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1091 matchMailboxes = true
1092 mailboxIDs = map[int64]bool{}
1093 if f.MailboxID == -1 {
1094 matchMailboxes = false
1095 // Add the trash, junk and account rejects mailbox.
1096 err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
1097 if mb.Trash || mb.Junk || mb.Name == rejectsMailbox {
1098 mailboxPrefixes = append(mailboxPrefixes, mb.Name+"/")
1099 mailboxIDs[mb.ID] = true
1103 xcheckf(ctx, err, "finding trash/junk/rejects mailbox")
1104 } else if f.MailboxID > 0 {
1105 mb := store.Mailbox{ID: f.MailboxID}
1107 xcheckf(ctx, err, "get mailbox")
1108 mailboxIDs[f.MailboxID] = true
1109 mailboxPrefixes = []string{mb.Name + "/"}
1114// xgatherMailboxIDs adds all mailboxes with a prefix matching any of
1115// mailboxPrefixes to mailboxIDs, to expand filtering to children of mailboxes.
1116func xgatherMailboxIDs(ctx context.Context, tx *bstore.Tx, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1117 // Gather more mailboxes to filter on, based on mailboxPrefixes.
1118 if len(mailboxPrefixes) == 0 {
1121 err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
1122 for _, p := range mailboxPrefixes {
1123 if strings.HasPrefix(mb.Name, p) {
1124 mailboxIDs[mb.ID] = true
1130 xcheckf(ctx, err, "gathering mailboxes")
1133// matchesMailbox returns whether a mailbox matches the view.
1134func (v view) matchesMailbox(mailboxID int64) bool {
1135 return len(v.mailboxIDs) == 0 || v.matchMailboxIDs && v.mailboxIDs[mailboxID] || !v.matchMailboxIDs && !v.mailboxIDs[mailboxID]
1138// inRange returns whether m is within the range for the view, whether a change for
1139// this message should be sent to the client so it can update its state.
1140func (v view) inRange(m store.Message) bool {
1141 return v.End || !v.Request.Query.OrderAsc && !m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && !m.Received.After(v.LastMessageReceived)
1144// matches checks if the message, identified by either messageID or mailboxID+UID,
1145// is in the current "view" (i.e. passing the filters, and if checkRange is set
1146// also if within the range of sent messages based on sort order and the last seen
1147// message). getmsg retrieves the message, which may be necessary depending on the
1148// active filters. Used to determine if a store.Change with a new message should be
1149// sent, and for the destination and anchor messages in view requests.
1150func (v view) matches(log mlog.Log, acc *store.Account, checkRange bool, messageID int64, mailboxID int64, uid store.UID, flags store.Flags, keywords []string, getmsg func(int64, int64, store.UID) (store.Message, error)) (match bool, rerr error) {
1152 ensureMessage := func() bool {
1153 if m.ID == 0 && rerr == nil {
1154 m, rerr = getmsg(messageID, mailboxID, uid)
1159 q := v.Request.Query
1161 // Warning: Filters must be kept in sync between queryMessage and view.matches.
1164 if len(v.mailboxIDs) > 0 && (!ensureMessage() || v.matchMailboxIDs && !v.mailboxIDs[m.MailboxID] || !v.matchMailboxIDs && v.mailboxIDs[m.MailboxID]) {
1167 // note: anchorMessageID is not relevant for matching.
1168 flagfilter := q.flagFilterFn()
1169 if flagfilter != nil && !flagfilter(flags, keywords) {
1173 if q.Filter.Oldest != nil && (!ensureMessage() || m.Received.Before(*q.Filter.Oldest)) {
1176 if q.Filter.Newest != nil && (!ensureMessage() || !m.Received.Before(*q.Filter.Newest)) {
1180 if q.Filter.SizeMin > 0 && (!ensureMessage() || m.Size < q.Filter.SizeMin) {
1183 if q.Filter.SizeMax > 0 && (!ensureMessage() || m.Size > q.Filter.SizeMax) {
1187 state := msgState{acc: acc}
1189 if rerr == nil && state.err != nil {
1195 attachmentFilter := q.attachmentFilterFn(log, acc, &state)
1196 if attachmentFilter != nil && (!ensureMessage() || !attachmentFilter(m)) {
1200 envFilter := q.envFilterFn(log, &state)
1201 if envFilter != nil && (!ensureMessage() || !envFilter(m)) {
1205 headerFilter := q.headerFilterFn(log, &state)
1206 if headerFilter != nil && (!ensureMessage() || !headerFilter(m)) {
1210 wordsFilter := q.wordsFilterFn(log, &state)
1211 if wordsFilter != nil && (!ensureMessage() || !wordsFilter(m)) {
1215 // Now check that we are either within the sorting order, or "last" was sent.
1216 if !checkRange || v.End || ensureMessage() && v.inRange(m) {
1222type msgResp struct {
1223 err error // If set, an error happened and fields below are not set.
1224 reset bool // If set, the anchor message does not exist (anymore?) and we are sending messages from the start, fields below not set.
1225 viewEnd bool // If set, the last message for the view was seen, no more should be requested, fields below not set.
1226 mil []MessageItem // If none of the cases above apply, the messages that was found matching the query. First message was reason the thread is returned, for use as AnchorID in followup request.
1227 pm *ParsedMessage // If m was the target page.DestMessageID, or this is the first match, this is the parsed message of mi.
1230// viewRequestTx executes a request (query with filters, pagination) by
1231// launching a new goroutine with queryMessages, receiving results as msgResp,
1232// and sending Event* to the SSE connection.
1234// It always closes tx.
1235func viewRequestTx(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, msgc chan EventViewMsgs, errc chan EventViewErr, resetc chan EventViewReset, donec chan int64) {
1237 err := tx.Rollback()
1238 log.Check(err, "rolling back query transaction")
1240 donec <- v.Request.ID
1242 x := recover() // Should not happen, but don't take program down if it does.
1244 log.WithContext(ctx).Error("viewRequestTx panic", slog.Any("err", x))
1246 metrics.PanicInc(metrics.Webmailrequest)
1250 var msgitems [][]MessageItem // Gathering for 300ms, then flushing.
1251 var parsedMessage *ParsedMessage
1254 var immediate bool // No waiting, flush immediate.
1255 t := time.NewTimer(300 * time.Millisecond)
1258 sendViewMsgs := func(force bool) {
1259 if len(msgitems) == 0 && !force {
1264 msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, msgitems, parsedMessage, viewEnd}
1267 t.Reset(300 * time.Millisecond)
1270 // todo: should probably rewrite code so we don't start yet another goroutine, but instead handle the query responses directly (through a struct that keeps state?) in the sse connection goroutine.
1272 mrc := make(chan msgResp, 1)
1273 go queryMessages(ctx, log, acc, tx, v, mrc)
1277 case mr, ok := <-mrc:
1280 // Empty message list signals this query is done.
1281 msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, nil, nil, false}
1286 errc <- EventViewErr{v.Request.ViewID, v.Request.ID, mr.err.Error(), mr.err}
1290 resetc <- EventViewReset{v.Request.ViewID, v.Request.ID}
1299 msgitems = append(msgitems, mr.mil)
1301 parsedMessage = mr.pm
1308 if len(msgitems) == 0 {
1309 // Nothing to send yet. We'll send immediately when the next message comes in.
1318// queryMessages executes a query, with filter, pagination, destination message id
1319// to fetch (the message that the client had in view and wants to display again).
1320// It sends on msgc, with several types of messages: errors, whether the view is
1321// reset due to missing AnchorMessageID, and when the end of the view was reached
1322// and/or for a message.
1323func queryMessages(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, mrc chan msgResp) {
1325 x := recover() // Should not happen, but don't take program down if it does.
1327 log.WithContext(ctx).Error("queryMessages panic", slog.Any("err", x))
1329 mrc <- msgResp{err: fmt.Errorf("query failed")}
1330 metrics.PanicInc(metrics.Webmailquery)
1336 query := v.Request.Query
1337 page := v.Request.Page
1339 // Warning: Filters must be kept in sync between queryMessage and view.matches.
1341 checkMessage := func(id int64) (valid bool, rerr error) {
1342 m := store.Message{ID: id}
1344 if err == bstore.ErrAbsent || err == nil && m.Expunged {
1346 } else if err != nil {
1349 return v.matches(log, acc, false, m.ID, m.MailboxID, m.UID, m.Flags, m.Keywords, func(int64, int64, store.UID) (store.Message, error) {
1355 // Check if AnchorMessageID exists and matches filter. If not, we will reset the view.
1356 if page.AnchorMessageID > 0 {
1357 // Check if message exists and (still) matches the filter.
1358 // todo: if AnchorMessageID exists but no longer matches the filter, we are resetting the view, but could handle it more gracefully in the future. if the message is in a different mailbox, we cannot query as efficiently, we'll have to read through more messages.
1359 if valid, err := checkMessage(page.AnchorMessageID); err != nil {
1360 mrc <- msgResp{err: fmt.Errorf("querying AnchorMessageID: %v", err)}
1363 mrc <- msgResp{reset: true}
1364 page.AnchorMessageID = 0
1368 // Check if page.DestMessageID exists and matches filter. If not, we will ignore
1369 // it instead of continuing to send message till the end of the view.
1370 if page.DestMessageID > 0 {
1371 if valid, err := checkMessage(page.DestMessageID); err != nil {
1372 mrc <- msgResp{err: fmt.Errorf("querying requested message: %v", err)}
1375 page.DestMessageID = 0
1379 // todo optimize: we would like to have more filters directly on the database if they can use an index. eg if there is a keyword filter and no mailbox filter.
1381 q := bstore.QueryTx[store.Message](tx)
1382 q.FilterEqual("Expunged", false)
1383 if len(v.mailboxIDs) > 0 {
1384 if len(v.mailboxIDs) == 1 && v.matchMailboxIDs {
1385 // Should result in fast indexed query.
1386 for mbID := range v.mailboxIDs {
1387 q.FilterNonzero(store.Message{MailboxID: mbID})
1390 idsAny := make([]any, 0, len(v.mailboxIDs))
1391 for mbID := range v.mailboxIDs {
1392 idsAny = append(idsAny, mbID)
1394 if v.matchMailboxIDs {
1395 q.FilterEqual("MailboxID", idsAny...)
1397 q.FilterNotEqual("MailboxID", idsAny...)
1402 // If we are looking for an anchor, keep skipping message early (cheaply) until we've seen it.
1403 if page.AnchorMessageID > 0 {
1405 q.FilterFn(func(m store.Message) bool {
1409 seen = m.ID == page.AnchorMessageID
1414 // We may be added filters the the query below. The FilterFn signature does not
1415 // implement reporting errors, or anything else, just a bool. So when making the
1416 // filter functions, we give them a place to store parsed message state, and an
1417 // error. We check the error during and after query execution.
1418 state := msgState{acc: acc}
1421 flagfilter := query.flagFilterFn()
1422 if flagfilter != nil {
1423 q.FilterFn(func(m store.Message) bool {
1424 return flagfilter(m.Flags, m.Keywords)
1428 if query.Filter.Oldest != nil {
1429 q.FilterGreaterEqual("Received", *query.Filter.Oldest)
1431 if query.Filter.Newest != nil {
1432 q.FilterLessEqual("Received", *query.Filter.Newest)
1435 if query.Filter.SizeMin > 0 {
1436 q.FilterGreaterEqual("Size", query.Filter.SizeMin)
1438 if query.Filter.SizeMax > 0 {
1439 q.FilterLessEqual("Size", query.Filter.SizeMax)
1442 attachmentFilter := query.attachmentFilterFn(log, acc, &state)
1443 if attachmentFilter != nil {
1444 q.FilterFn(attachmentFilter)
1447 envFilter := query.envFilterFn(log, &state)
1448 if envFilter != nil {
1449 q.FilterFn(envFilter)
1452 headerFilter := query.headerFilterFn(log, &state)
1453 if headerFilter != nil {
1454 q.FilterFn(headerFilter)
1457 wordsFilter := query.wordsFilterFn(log, &state)
1458 if wordsFilter != nil {
1459 q.FilterFn(wordsFilter)
1463 q.SortAsc("Received")
1465 q.SortDesc("Received")
1467 found := page.DestMessageID <= 0
1470 err := q.ForEach(func(m store.Message) error {
1471 // Check for an error in one of the filters, propagate it.
1472 if state.err != nil {
1476 if have >= page.Count && found || have > 10000 {
1478 return bstore.StopForEach
1481 if _, ok := v.threadIDs[m.ThreadID]; ok {
1482 // Message was already returned as part of a thread.
1486 var pm *ParsedMessage
1487 if m.ID == page.DestMessageID || page.DestMessageID == 0 && have == 0 && page.AnchorMessageID == 0 {
1488 // For threads, if there was not DestMessageID, we may be getting the newest
1489 // message. For an initial view, this isn't necessarily the first the user is
1490 // expected to read first, that would be the first unread, which we'll get below
1491 // when gathering the thread.
1493 xpm, err := parsedMessage(log, m, &state, true, false)
1495 return fmt.Errorf("parsing message %d: %v", m.ID, err)
1500 mi, err := messageItem(log, m, &state)
1502 return fmt.Errorf("making messageitem for message %d: %v", m.ID, err)
1504 mil := []MessageItem{mi}
1505 if query.Threading != ThreadOff {
1506 more, xpm, err := gatherThread(log, tx, acc, v, m, page.DestMessageID, page.AnchorMessageID == 0 && have == 0)
1508 return fmt.Errorf("gathering thread messages for id %d, thread %d: %v", m.ID, m.ThreadID, err)
1514 mil = append(mil, more...)
1515 v.threadIDs[m.ThreadID] = struct{}{}
1517 // Calculate how many messages the frontend is going to show, and only count those as returned.
1518 collapsed := map[int64]bool{}
1519 for _, mi := range mil {
1520 collapsed[mi.Message.ID] = mi.Message.ThreadCollapsed
1522 unread := map[int64]bool{} // Propagated to thread root.
1523 if query.Threading == ThreadUnread {
1524 for _, mi := range mil {
1529 unread[mm.ID] = true
1530 for _, id := range mm.ThreadParentIDs {
1535 for _, mi := range mil {
1539 for _, id := range mm.ThreadParentIDs {
1540 if _, ok := collapsed[id]; ok {
1545 if threadRoot || (query.Threading == ThreadOn && !collapsed[rootID] || query.Threading == ThreadUnread && unread[rootID]) {
1552 if pm != nil && len(pm.envelope.From) == 1 {
1553 pm.ViewMode, err = fromAddrViewMode(tx, pm.envelope.From[0])
1555 return fmt.Errorf("gathering view mode for id %d: %v", m.ID, err)
1558 mrc <- msgResp{mil: mil, pm: pm}
1561 // Check for an error in one of the filters again. Check in ForEach would not
1562 // trigger if the last message has the error.
1563 if err == nil && state.err != nil {
1567 mrc <- msgResp{err: fmt.Errorf("querying messages: %v", err)}
1571 mrc <- msgResp{viewEnd: true}
1575func gatherThread(log mlog.Log, tx *bstore.Tx, acc *store.Account, v view, m store.Message, destMessageID int64, first bool) ([]MessageItem, *ParsedMessage, error) {
1576 if m.ThreadID == 0 {
1577 // If we would continue, FilterNonzero would fail because there are no non-zero fields.
1578 return nil, nil, fmt.Errorf("message has threadid 0, account is probably still being upgraded, try turning threading off until the upgrade is done")
1581 // Fetch other messages for this thread.
1582 qt := bstore.QueryTx[store.Message](tx)
1583 qt.FilterNonzero(store.Message{ThreadID: m.ThreadID})
1584 qt.FilterEqual("Expunged", false)
1585 qt.FilterNotEqual("ID", m.ID)
1587 tml, err := qt.List()
1589 return nil, nil, fmt.Errorf("listing other messages in thread for message %d, thread %d: %v", m.ID, m.ThreadID, err)
1592 var mil []MessageItem
1593 var pm *ParsedMessage
1594 var firstUnread bool
1595 for _, tm := range tml {
1596 err := func() error {
1597 xstate := msgState{acc: acc}
1598 defer xstate.clear()
1600 mi, err := messageItem(log, tm, &xstate)
1602 return fmt.Errorf("making messageitem for message %d, for thread %d: %v", tm.ID, m.ThreadID, err)
1604 mi.MatchQuery, err = v.matches(log, acc, false, tm.ID, tm.MailboxID, tm.UID, tm.Flags, tm.Keywords, func(int64, int64, store.UID) (store.Message, error) {
1608 return fmt.Errorf("matching thread message %d against view query: %v", tm.ID, err)
1610 mil = append(mil, mi)
1612 if tm.ID == destMessageID || destMessageID == 0 && first && (pm == nil || !firstUnread && !tm.Seen) {
1613 firstUnread = !tm.Seen
1614 xpm, err := parsedMessage(log, tm, &xstate, true, false)
1616 return fmt.Errorf("parsing thread message %d: %v", tm.ID, err)
1623 return nil, nil, err
1627 // Finally, the message that caused us to gather this thread (which is likely the
1628 // most recent message in the thread) could be the only unread message.
1629 if destMessageID == 0 && first && !m.Seen && !firstUnread {
1630 xstate := msgState{acc: acc}
1631 defer xstate.clear()
1632 xpm, err := parsedMessage(log, m, &xstate, true, false)
1634 return nil, nil, fmt.Errorf("parsing thread message %d: %v", m.ID, err)
1642// While checking the filters on a message, we may need to get more message
1643// details as each filter passes. We check the filters that need the basic
1644// information first, and load and cache more details for the next filters.
1645// msgState holds parsed details for a message, it is updated while filtering,
1646// with more information or reset for a next message.
1647type msgState struct {
1648 acc *store.Account // Never changes during lifetime.
1649 err error // Once set, doesn't get cleared.
1651 part *message.Part // Will be without Reader when msgr is nil.
1652 msgr *store.MsgReader
1655func (ms *msgState) clear() {
1660 *ms = msgState{acc: ms.acc, err: ms.err}
1663func (ms *msgState) ensureMsg(m store.Message) {
1664 if m.ID != ms.m.ID {
1670func (ms *msgState) ensurePart(m store.Message, withMsgReader bool) bool {
1675 if m.ParsedBuf == nil {
1676 ms.err = fmt.Errorf("message %d not parsed", m.ID)
1680 if err := json.Unmarshal(m.ParsedBuf, &p); err != nil {
1681 ms.err = fmt.Errorf("load part for message %d: %w", m.ID, err)
1686 if withMsgReader && ms.msgr == nil {
1687 ms.msgr = ms.acc.MessageReader(m)
1688 ms.part.SetReaderAt(ms.msgr)
1691 return ms.part != nil
1694// flagFilterFn returns a function that applies the flag/keyword/"label"-related
1695// filters for a query. A nil function is returned if there are no flags to filter
1697func (q Query) flagFilterFn() func(store.Flags, []string) bool {
1698 labels := map[string]bool{}
1699 for _, k := range q.Filter.Labels {
1702 for _, k := range q.NotFilter.Labels {
1706 if len(labels) == 0 {
1710 var mask, flags store.Flags
1711 systemflags := map[string][]*bool{
1712 `\answered`: {&mask.Answered, &flags.Answered},
1713 `\flagged`: {&mask.Flagged, &flags.Flagged},
1714 `\deleted`: {&mask.Deleted, &flags.Deleted},
1715 `\seen`: {&mask.Seen, &flags.Seen},
1716 `\draft`: {&mask.Draft, &flags.Draft},
1717 `$junk`: {&mask.Junk, &flags.Junk},
1718 `$notjunk`: {&mask.Notjunk, &flags.Notjunk},
1719 `$forwarded`: {&mask.Forwarded, &flags.Forwarded},
1720 `$phishing`: {&mask.Phishing, &flags.Phishing},
1721 `$mdnsent`: {&mask.MDNSent, &flags.MDNSent},
1723 keywords := map[string]bool{}
1724 for k, v := range labels {
1725 k = strings.ToLower(k)
1726 if mf, ok := systemflags[k]; ok {
1733 return func(msgFlags store.Flags, msgKeywords []string) bool {
1735 if f.Set(mask, msgFlags) != flags {
1738 for k, v := range keywords {
1739 if slices.Contains(msgKeywords, k) != v {
1747// attachmentFilterFn returns a function that filters for the attachment-related
1748// filter from the query. A nil function is returned if there are attachment
1750func (q Query) attachmentFilterFn(log mlog.Log, acc *store.Account, state *msgState) func(m store.Message) bool {
1751 if q.Filter.Attachments == AttachmentIndifferent && q.NotFilter.Attachments == AttachmentIndifferent {
1755 return func(m store.Message) bool {
1756 if !state.ensurePart(m, false) {
1759 types, err := attachmentTypes(log, m, state)
1764 return (q.Filter.Attachments == AttachmentIndifferent || types[q.Filter.Attachments]) && (q.NotFilter.Attachments == AttachmentIndifferent || !types[q.NotFilter.Attachments])
1768var attachmentMimetypes = map[string]AttachmentType{
1769 "application/pdf": AttachmentPDF,
1770 "application/zip": AttachmentArchive,
1771 "application/x-rar-compressed": AttachmentArchive,
1772 "application/vnd.oasis.opendocument.spreadsheet": AttachmentSpreadsheet,
1773 "application/vnd.ms-excel": AttachmentSpreadsheet,
1774 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": AttachmentSpreadsheet,
1775 "application/vnd.oasis.opendocument.text": AttachmentDocument,
1776 "application/vnd.oasis.opendocument.presentation": AttachmentPresentation,
1777 "application/vnd.ms-powerpoint": AttachmentPresentation,
1778 "application/vnd.openxmlformats-officedocument.presentationml.presentation": AttachmentPresentation,
1780var attachmentExtensions = map[string]AttachmentType{
1781 ".pdf": AttachmentPDF,
1782 ".zip": AttachmentArchive,
1783 ".tar": AttachmentArchive,
1784 ".tgz": AttachmentArchive,
1785 ".tar.gz": AttachmentArchive,
1786 ".tbz2": AttachmentArchive,
1787 ".tar.bz2": AttachmentArchive,
1788 ".tar.lz": AttachmentArchive,
1789 ".tlz": AttachmentArchive,
1790 ".tar.xz": AttachmentArchive,
1791 ".txz": AttachmentArchive,
1792 ".tar.zst": AttachmentArchive,
1793 ".tar.lz4": AttachmentArchive,
1794 ".7z": AttachmentArchive,
1795 ".rar": AttachmentArchive,
1796 ".ods": AttachmentSpreadsheet,
1797 ".xls": AttachmentSpreadsheet,
1798 ".xlsx": AttachmentSpreadsheet,
1799 ".odt": AttachmentDocument,
1800 ".doc": AttachmentDocument,
1801 ".docx": AttachmentDocument,
1802 ".odp": AttachmentPresentation,
1803 ".ppt": AttachmentPresentation,
1804 ".pptx": AttachmentPresentation,
1807func attachmentTypes(log mlog.Log, m store.Message, state *msgState) (map[AttachmentType]bool, error) {
1808 types := map[AttachmentType]bool{}
1810 pm, err := parsedMessage(log, m, state, false, false)
1812 return nil, fmt.Errorf("parsing message for attachments: %w", err)
1814 for _, a := range pm.attachments {
1815 if a.Part.MediaType == "IMAGE" {
1816 types[AttachmentImage] = true
1819 mt := strings.ToLower(a.Part.MediaType + "/" + a.Part.MediaSubType)
1820 if t, ok := attachmentMimetypes[mt]; ok {
1822 } else if ext := filepath.Ext(tryDecodeParam(log, a.Part.ContentTypeParams["name"])); ext != "" {
1823 if t, ok := attachmentExtensions[strings.ToLower(ext)]; ok {
1831 if len(types) == 0 {
1832 types[AttachmentNone] = true
1834 types[AttachmentAny] = true
1839// envFilterFn returns a filter function for the "envelope" headers ("envelope" as
1840// used by IMAP, i.e. basic message headers from/to/subject, an unfortunate name
1841// clash with SMTP envelope) for the query. A nil function is returned if no
1842// filtering is needed.
1843func (q Query) envFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1844 if len(q.Filter.From) == 0 && len(q.Filter.To) == 0 && len(q.Filter.Subject) == 0 && len(q.NotFilter.From) == 0 && len(q.NotFilter.To) == 0 && len(q.NotFilter.Subject) == 0 {
1848 lower := func(l []string) []string {
1852 r := make([]string, len(l))
1853 for i, s := range l {
1854 r[i] = strings.ToLower(s)
1859 filterSubject := lower(q.Filter.Subject)
1860 notFilterSubject := lower(q.NotFilter.Subject)
1861 filterFrom := lower(q.Filter.From)
1862 notFilterFrom := lower(q.NotFilter.From)
1863 filterTo := lower(q.Filter.To)
1864 notFilterTo := lower(q.NotFilter.To)
1866 return func(m store.Message) bool {
1867 if !state.ensurePart(m, false) {
1871 var env message.Envelope
1872 if state.part.Envelope != nil {
1873 env = *state.part.Envelope
1876 if len(filterSubject) > 0 || len(notFilterSubject) > 0 {
1877 subject := strings.ToLower(env.Subject)
1878 for _, s := range filterSubject {
1879 if !strings.Contains(subject, s) {
1883 for _, s := range notFilterSubject {
1884 if strings.Contains(subject, s) {
1890 contains := func(textLower []string, l []message.Address, all bool) bool {
1892 for _, s := range textLower {
1893 for _, a := range l {
1894 name := strings.ToLower(a.Name)
1895 addr := strings.ToLower(fmt.Sprintf("<%s@%s>", a.User, a.Host))
1896 if strings.Contains(name, s) || strings.Contains(addr, s) {
1910 if len(filterFrom) > 0 && !contains(filterFrom, env.From, true) {
1913 if len(notFilterFrom) > 0 && contains(notFilterFrom, env.From, false) {
1916 if len(filterTo) > 0 || len(notFilterTo) > 0 {
1917 to := append(append(append([]message.Address{}, env.To...), env.CC...), env.BCC...)
1918 if len(filterTo) > 0 && !contains(filterTo, to, true) {
1921 if len(notFilterTo) > 0 && contains(notFilterTo, to, false) {
1929// headerFilterFn returns a function that filters for the header filters in the
1930// query. A nil function is returned if there are no header filters.
1931func (q Query) headerFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1932 if len(q.Filter.Headers) == 0 {
1936 lowerValues := make([]string, len(q.Filter.Headers))
1937 for i, t := range q.Filter.Headers {
1938 lowerValues[i] = strings.ToLower(t[1])
1941 return func(m store.Message) bool {
1942 if !state.ensurePart(m, true) {
1945 hdr, err := state.part.Header()
1947 state.err = fmt.Errorf("reading header for message %d: %w", m.ID, err)
1952 for i, t := range q.Filter.Headers {
1956 if v == "" && len(l) > 0 {
1959 for _, e := range l {
1960 if strings.Contains(strings.ToLower(e), v) {
1970// wordsFilterFn returns a function that applies the word filters of the query. A
1971// nil function is returned when query does not contain a word filter.
1972func (q Query) wordsFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1973 if len(q.Filter.Words) == 0 && len(q.NotFilter.Words) == 0 {
1977 ws := store.PrepareWordSearch(q.Filter.Words, q.NotFilter.Words)
1979 return func(m store.Message) bool {
1980 if !state.ensurePart(m, true) {
1984 if ok, err := ws.MatchPart(log, state.part, true); err != nil {
1985 state.err = fmt.Errorf("searching for words in message %d: %w", m.ID, err)