3// todo: may want to add some json omitempty tags to MessageItem, or Message to reduce json size, or just have smaller types that send only the fields that are needed.
8 cryptrand "crypto/rand"
24 "github.com/mjl-/bstore"
25 "github.com/mjl-/sherpa"
27 "github.com/mjl-/mox/dns"
28 "github.com/mjl-/mox/message"
29 "github.com/mjl-/mox/metrics"
30 "github.com/mjl-/mox/mlog"
31 "github.com/mjl-/mox/mox-"
32 "github.com/mjl-/mox/moxvar"
33 "github.com/mjl-/mox/smtp"
34 "github.com/mjl-/mox/store"
37// Request is a request to an SSE connection to send messages, either for a new
38// view, to continue with an existing view, or to cancel an ongoing request.
42 SSEID int64 // SSE connection.
44 // To indicate a request is a continuation (more results) of the previous view.
45 // Echoed in events, client checks if it is getting results for the latest request.
48 // If set, this request and its view are canceled. A new view must be started.
58 ThreadOff ThreadMode = "off"
59 ThreadOn ThreadMode = "on"
60 ThreadUnread ThreadMode = "unread"
63// Query is a request for messages that match filters, in a given order.
65 OrderAsc bool // Order by received ascending or descending.
71// AttachmentType is for filtering by attachment type.
72type AttachmentType string
75 AttachmentIndifferent AttachmentType = ""
76 AttachmentNone AttachmentType = "none"
77 AttachmentAny AttachmentType = "any"
78 AttachmentImage AttachmentType = "image" // png, jpg, gif, ...
79 AttachmentPDF AttachmentType = "pdf"
80 AttachmentArchive AttachmentType = "archive" // zip files, tgz, ...
81 AttachmentSpreadsheet AttachmentType = "spreadsheet" // ods, xlsx, ...
82 AttachmentDocument AttachmentType = "document" // odt, docx, ...
83 AttachmentPresentation AttachmentType = "presentation" // odp, pptx, ...
86// Filter selects the messages to return. Fields that are set must all match,
87// for slices each element by match ("and").
89 // If -1, then all mailboxes except Trash/Junk/Rejects. Otherwise, only active if > 0.
92 // If true, also submailboxes are included in the search.
93 MailboxChildrenIncluded bool
95 // In case client doesn't know mailboxes and their IDs yet. Only used during sse
96 // connection setup, where it is turned into a MailboxID. Filtering only looks at
100 Words []string // Case insensitive substring match for each string.
102 To []string // Including Cc and Bcc.
106 Attachments AttachmentType
108 Headers [][2]string // Header values can be empty, it's a check if the header is present, regardless of value.
113// NotFilter matches messages that don't match these fields.
114type NotFilter struct {
119 Attachments AttachmentType
123// Page holds pagination parameters for a request.
125 // Start returning messages after this ID, if > 0. For pagination, fetching the
126 // next set of messages.
127 AnchorMessageID int64
129 // Number of messages to return, must be >= 1, we never return more than 10000 for
133 // If > 0, return messages until DestMessageID is found. More than Count messages
134 // can be returned. For long-running searches, it may take a while before this
139// todo: MessageAddress and MessageEnvelope into message.Address and message.Envelope.
141// MessageAddress is like message.Address, but with a dns.Domain, with unicode name
143type MessageAddress struct {
144 Name string // Free-form name for display in mail applications.
145 User string // Localpart, encoded.
149// MessageEnvelope is like message.Envelope, as used in message.Part, but including
150// unicode host names for IDNA names.
151type MessageEnvelope struct {
152 // todo: should get sherpadoc to understand type embeds and embed the non-MessageAddress fields from message.Envelope.
155 From []MessageAddress
156 Sender []MessageAddress
157 ReplyTo []MessageAddress
165// MessageItem is sent by queries, it has derived information analyzed from
166// message.Part, made for the needs of the message items in the message list.
168type MessageItem struct {
169 Message store.Message // Without ParsedBuf and MsgPrefix, for size. With Preview, even if it isn't stored yet in the database.
170 Envelope MessageEnvelope
171 Attachments []Attachment
174 MatchQuery bool // If message does not match query, it can still be included because of threading.
175 MoreHeaders [][2]string // All headers from store.Settings.ShowHeaders that are present.
178// ParsedMessage has more parsed/derived information about a message, intended
179// for rendering the (contents of the) message. Information from MessageItem is
181type ParsedMessage struct {
184 Headers map[string][]string
185 ViewMode store.ViewMode
187 Texts []string // Contents of text parts, can be empty.
189 // Whether there is an HTML part. The webclient renders HTML message parts through
190 // an iframe and a separate request with strict CSP headers to prevent script
191 // execution and loading of external resources, which isn't possible when loading
192 // in iframe with inline HTML because not all browsers support the iframe csp
196 ListReplyAddress *MessageAddress // From List-Post.
198 TextPaths [][]int // Paths to text parts.
199 HTMLPath []int // Path to HTML part.
201 // Information used by MessageItem, not exported in this type.
202 envelope MessageEnvelope
203 attachments []Attachment
208// EventStart is the first message sent on an SSE connection, giving the client
209// basic data to populate its UI. After this event, messages will follow quickly in
210// an EventViewMsgs event.
211type EventStart struct {
213 LoginAddress MessageAddress
214 Addresses []MessageAddress
215 DomainAddressConfigs map[string]DomainAddressConfig // ASCII domain to address config.
217 Mailboxes []store.Mailbox
218 RejectsMailbox string
219 Settings store.Settings
220 AccountPath string // If nonempty, the path on same host to webaccount interface.
224// DomainAddressConfig has the address (localpart) configuration for a domain, so
225// the webmail client can decide if an address matches the addresses of the
227type DomainAddressConfig struct {
228 LocalpartCatchallSeparators []string // Can be empty.
229 LocalpartCaseSensitive bool
232// EventViewMsgs contains messages for a view, possibly a continuation of an
233// earlier list of messages.
234type EventViewMsgs struct {
238 // If empty, this was the last message for the request. If non-empty, a list of
239 // thread messages. Each with the first message being the reason this thread is
240 // included and can be used as AnchorID in followup requests. If the threading mode
241 // is "off" in the query, there will always be only a single message. If a thread
242 // is sent, all messages in the thread are sent, including those that don't match
243 // the query (e.g. from another mailbox). Threads can be displayed based on the
244 // ThreadParentIDs field, with possibly slightly different display based on field
245 // ThreadMissingLink.
246 MessageItems [][]MessageItem
248 // If set, will match the target page.DestMessageID from the request.
249 ParsedMessage *ParsedMessage
251 // If set, there are no more messages in this view at this moment. Messages can be
252 // added, typically via Change messages, e.g. for new deliveries.
256// EventViewErr indicates an error during a query for messages. The request is
257// aborted, no more request-related messages will be sent until the next request.
258type EventViewErr struct {
261 Err string // To be displayed in client.
262 err error // Original message, for checking against context.Canceled.
265// EventViewReset indicates that a request for the next set of messages in a view
266// could not be fulfilled, e.g. because the anchor message does not exist anymore.
267// The client should clear its list of messages. This can happen before
268// EventViewMsgs events are sent.
269type EventViewReset struct {
274// EventViewChanges contain one or more changes relevant for the client, either
275// with new mailbox total/unseen message counts, or messages added/removed/modified
276// (flags) for the current view.
277type EventViewChanges struct {
279 Changes [][2]any // The first field of [2]any is a string, the second of the Change types below.
282// ChangeMsgAdd adds a new message and possibly its thread to the view.
283type ChangeMsgAdd struct {
285 MessageItems []MessageItem
288// ChangeMsgRemove removes one or more messages from the view.
289type ChangeMsgRemove struct {
290 store.ChangeRemoveUIDs
293// ChangeMsgFlags updates flags for one message.
294type ChangeMsgFlags struct {
298// ChangeMsgThread updates muted/collapsed fields for one message.
299type ChangeMsgThread struct {
303// ChangeMailboxRemove indicates a mailbox was removed, including all its messages.
304type ChangeMailboxRemove struct {
305 store.ChangeRemoveMailbox
308// ChangeMailboxAdd indicates a new mailbox was added, initially without any messages.
309type ChangeMailboxAdd struct {
310 Mailbox store.Mailbox
313// ChangeMailboxRename indicates a mailbox was renamed. Its ID stays the same.
314// It could be under a new parent.
315type ChangeMailboxRename struct {
316 store.ChangeRenameMailbox
319// ChangeMailboxCounts set new total and unseen message counts for a mailbox.
320type ChangeMailboxCounts struct {
321 store.ChangeMailboxCounts
324// ChangeMailboxSpecialUse has updated special-use flags for a mailbox.
325type ChangeMailboxSpecialUse struct {
326 store.ChangeMailboxSpecialUse
329// ChangeMailboxKeywords has an updated list of keywords for a mailbox, e.g. after
330// a message was added with a keyword that wasn't in the mailbox yet.
331type ChangeMailboxKeywords struct {
332 store.ChangeMailboxKeywords
335// View holds the information about the returned data for a query. It is used to
336// determine whether mailbox changes should be sent to the client, we only send
337// addition/removal/flag-changes of messages that are in view, or would extend it
338// if the view is at the end of the results.
342 // Received of last message we sent to the client. We use it to decide if a newly
343 // delivered message is within the view and the client should get a notification.
344 LastMessageReceived time.Time
346 // If set, the last message in the query view has been sent. There is no need to do
347 // another query, it will not return more data. Used to decide if an event for a
348 // new message should be sent.
351 // Whether message must or must not match mailboxIDs.
353 // Mailboxes to match, can be multiple, for matching children. If empty, there is
354 // no filter on mailboxes.
355 mailboxIDs map[int64]bool
357 // Threads sent to client. New messages for this thread are also sent, regardless
358 // of regular query matching, so also for other mailboxes. If the user (re)moved
359 // all messages of a thread, they may still receive events for the thread. Only
360 // filled when query with threading not off.
361 threadIDs map[int64]struct{}
364// sses tracks all sse connections, and access to them.
371// sse represents an sse connection.
373 ID int64 // Also returned in EventStart and used in Request to identify the request.
374 AccountName string // Used to check the authenticated user has access to the SSE connection.
375 Request chan Request // Goroutine will receive requests from here, coming from API calls.
378// called by the goroutine when the connection is closed or breaks.
379func (sse sse) unregister() {
382 delete(sses.m, sse.ID)
384 // Drain any pending requests, preventing blocked goroutines from API calls.
394func sseRegister(accountName string) sse {
398 v := sse{sses.gen, accountName, make(chan Request, 1)}
403// sseGet returns a reference to an existing connection if it exists and user
405func sseGet(id int64, accountName string) (sse, bool) {
409 if s.AccountName != accountName {
415// ssetoken is a temporary token that has not yet been used to start an SSE
416// connection. Created by Token, consumed by a new SSE connection.
417type ssetoken struct {
418 token string // Uniquely generated.
420 address string // Address used to authenticate in call that created the token.
421 sessionToken store.SessionToken // SessionToken that created this token, checked before sending updates.
425// ssetokens maintains unused tokens. We have just one, but it's a type so we
426// can define methods.
427type ssetokens struct {
429 accountTokens map[string][]ssetoken // Account to max 10 most recent tokens, from old to new.
430 tokens map[string]ssetoken // Token to details, for finding account for a token.
433var sseTokens = ssetokens{
434 accountTokens: map[string][]ssetoken{},
435 tokens: map[string]ssetoken{},
438// xgenerate creates and saves a new token. It ensures no more than 10 tokens
439// per account exist, removing old ones if needed.
440func (x *ssetokens) xgenerate(ctx context.Context, accName, address string, sessionToken store.SessionToken) string {
442 cryptrand.Read(buf[:])
443 st := ssetoken{base64.RawURLEncoding.EncodeToString(buf[:]), accName, address, sessionToken, time.Now().Add(time.Minute)}
447 n := len(x.accountTokens[accName])
449 for _, ost := range x.accountTokens[accName][:n-9] {
450 delete(x.tokens, ost.token)
452 copy(x.accountTokens[accName], x.accountTokens[accName][n-9:])
453 x.accountTokens[accName] = x.accountTokens[accName][:9]
455 x.accountTokens[accName] = append(x.accountTokens[accName], st)
456 x.tokens[st.token] = st
460// check verifies a token, and consumes it if valid.
461func (x *ssetokens) check(token string) (string, string, store.SessionToken, bool, error) {
465 st, ok := x.tokens[token]
467 return "", "", "", false, nil
469 delete(x.tokens, token)
470 if i := slices.Index(x.accountTokens[st.accName], st); i < 0 {
471 return "", "", "", false, errors.New("internal error, could not find token in account")
473 copy(x.accountTokens[st.accName][i:], x.accountTokens[st.accName][i+1:])
474 x.accountTokens[st.accName] = x.accountTokens[st.accName][:len(x.accountTokens[st.accName])-1]
475 if len(x.accountTokens[st.accName]) == 0 {
476 delete(x.accountTokens, st.accName)
479 if time.Now().After(st.validUntil) {
480 return "", "", "", false, nil
482 return st.accName, st.address, st.sessionToken, true, nil
485// ioErr is panicked on i/o errors in serveEvents and handled in a defer.
490// ensure we have a non-nil moreHeaders, taking it from Settings.
491func ensureMoreHeaders(tx *bstore.Tx, moreHeaders []string) ([]string, error) {
492 if moreHeaders != nil {
493 return moreHeaders, nil
496 s := store.Settings{ID: 1}
497 if err := tx.Get(&s); err != nil {
498 return nil, fmt.Errorf("get settings: %v", err)
500 moreHeaders = s.ShowHeaders
501 if moreHeaders == nil {
502 moreHeaders = []string{} // Ensure we won't get Settings again next call.
504 return moreHeaders, nil
507// serveEvents serves an SSE connection. Authentication is done through a query
508// string parameter "singleUseToken", a one-time-use token returned by the Token
510func serveEvents(ctx context.Context, log mlog.Log, accountPath string, w http.ResponseWriter, r *http.Request) {
511 if r.Method != "GET" {
512 http.Error(w, "405 - method not allowed - use get", http.StatusMethodNotAllowed)
516 flusher, ok := w.(http.Flusher)
518 log.Error("internal error: ResponseWriter not a http.Flusher")
519 http.Error(w, "500 - internal error - cannot sync to http connection", 500)
524 token := q.Get("singleUseToken")
526 http.Error(w, "400 - bad request - missing credentials", http.StatusBadRequest)
529 accName, address, sessionToken, ok, err := sseTokens.check(token)
531 http.Error(w, "500 - internal server error - "+err.Error(), http.StatusInternalServerError)
535 http.Error(w, "400 - bad request - bad token", http.StatusBadRequest)
538 if _, err := store.SessionUse(ctx, log, accName, sessionToken, ""); err != nil {
539 http.Error(w, "400 - bad request - bad session token", http.StatusBadRequest)
543 // We can simulate a slow SSE connection. It seems firefox doesn't slow down
544 // incoming responses with its slow-network simulation.
545 var waitMin, waitMax time.Duration
546 waitMinMsec := q.Get("waitMinMsec")
547 waitMaxMsec := q.Get("waitMaxMsec")
548 if waitMinMsec != "" && waitMaxMsec != "" {
549 if v, err := strconv.ParseInt(waitMinMsec, 10, 64); err != nil {
550 http.Error(w, "400 - bad request - parsing waitMinMsec: "+err.Error(), http.StatusBadRequest)
553 waitMin = time.Duration(v) * time.Millisecond
556 if v, err := strconv.ParseInt(waitMaxMsec, 10, 64); err != nil {
557 http.Error(w, "400 - bad request - parsing waitMaxMsec: "+err.Error(), http.StatusBadRequest)
560 waitMax = time.Duration(v) * time.Millisecond
564 // Parse the request with initial mailbox/search criteria.
566 dec := json.NewDecoder(strings.NewReader(q.Get("request")))
567 dec.DisallowUnknownFields()
568 if err := dec.Decode(&req); err != nil {
569 http.Error(w, "400 - bad request - bad request query string parameter: "+err.Error(), http.StatusBadRequest)
571 } else if req.Page.Count <= 0 {
572 http.Error(w, "400 - bad request - request cannot have Page.Count 0", http.StatusBadRequest)
575 if req.Query.Threading == "" {
576 req.Query.Threading = ThreadOff
579 var writer *eventWriter
581 metricSSEConnections.Inc()
582 defer metricSSEConnections.Dec()
584 // Below here, error handling happens through xcheckf, which panics with
585 // *sherpa.Error, after which we send an error event to the client. We can also get
586 // an *ioErr when the connection is broken.
592 if err, ok := x.(*sherpa.Error); ok {
593 writer.xsendEvent(ctx, log, "fatalErr", err.Message)
594 } else if _, ok := x.(ioErr); ok {
597 log.WithContext(ctx).Error("serveEvents panic", slog.Any("err", x))
599 metrics.PanicInc(metrics.Webmail)
605 h.Set("Content-Type", "text/event-stream")
606 h.Set("Cache-Control", "no-cache")
608 // We'll be sending quite a bit of message data (text) in JSON (plenty duplicate
609 // keys), so should be quite compressible.
611 gz := mox.AcceptsGzip(r)
613 h.Set("Content-Encoding", "gzip")
614 out, _ = gzip.NewWriterLevel(w, gzip.BestSpeed)
618 out = httpFlusher{out, flusher}
620 // We'll be writing outgoing SSE events through writer.
621 writer = newEventWriter(out, waitMin, waitMax, accName, sessionToken)
624 // Fetch initial data.
625 acc, err := store.OpenAccount(log, accName, true)
626 xcheckf(ctx, err, "open account")
629 log.Check(err, "closing account")
631 comm := store.RegisterComm(acc)
632 defer comm.Unregister()
634 // List addresses that the client can use to send email from.
635 accConf, _ := acc.Conf()
636 loginAddr, err := smtp.ParseAddress(address)
637 xcheckf(ctx, err, "parsing login address")
638 _, _, _, dest, err := mox.LookupAddress(loginAddr.Localpart, loginAddr.Domain, false, false, false)
639 xcheckf(ctx, err, "looking up destination for login address")
640 loginName := accConf.FullName
641 if dest.FullName != "" {
642 loginName = dest.FullName
644 loginAddress := MessageAddress{Name: loginName, User: loginAddr.Localpart.String(), Domain: loginAddr.Domain}
645 var addresses []MessageAddress
646 for a, dest := range accConf.Destinations {
647 name := dest.FullName
649 name = accConf.FullName
651 var ma MessageAddress
652 if strings.HasPrefix(a, "@") {
653 dom, err := dns.ParseDomain(a[1:])
654 xcheckf(ctx, err, "parsing destination address for account")
655 ma = MessageAddress{Domain: dom}
657 addr, err := smtp.ParseAddress(a)
658 xcheckf(ctx, err, "parsing destination address for account")
659 ma = MessageAddress{Name: name, User: addr.Localpart.String(), Domain: addr.Domain}
661 addresses = append(addresses, ma)
663 // User is allowed to send using alias address as message From address. Webmail
664 // will choose it when replying to a message sent to that address.
665 aliasAddrs := map[MessageAddress]bool{}
666 for _, a := range accConf.Aliases {
667 if a.Alias.AllowMsgFrom {
668 ma := MessageAddress{User: a.Alias.LocalpartStr, Domain: a.Alias.Domain}
670 addresses = append(addresses, ma)
672 aliasAddrs[ma] = true
676 // We implicitly start a query. We use the reqctx for the transaction, because the
677 // transaction is passed to the query, which can be canceled.
678 reqctx, reqctxcancel := context.WithCancel(ctx)
680 // We also cancel in cancelDrain later on, but there is a brief window where the
681 // context wouldn't be canceled.
682 if reqctxcancel != nil {
688 // qtx is kept around during connection initialization, until we pass it off to the
689 // goroutine that starts querying for messages.
693 err := qtx.Rollback()
694 log.Check(err, "rolling back")
698 var mbl []store.Mailbox
699 settings := store.Settings{ID: 1}
701 // We only take the rlock when getting the tx.
702 acc.WithRLock(func() {
703 // Now a read-only transaction we'll use during the query.
704 qtx, err = acc.DB.Begin(reqctx, false)
705 xcheckf(ctx, err, "begin transaction")
707 mbl, err = bstore.QueryTx[store.Mailbox](qtx).FilterEqual("Expunged", false).List()
708 xcheckf(ctx, err, "list mailboxes")
710 err = qtx.Get(&settings)
711 xcheckf(ctx, err, "get settings")
714 // Find the designated mailbox if a mailbox name is set, or there are no filters at all.
715 var zerofilter Filter
716 var zeronotfilter NotFilter
717 var mailbox store.Mailbox
718 var mailboxPrefixes []string
719 var matchMailboxes bool
720 mailboxIDs := map[int64]bool{}
721 mailboxName := req.Query.Filter.MailboxName
722 if mailboxName != "" || reflect.DeepEqual(req.Query.Filter, zerofilter) && reflect.DeepEqual(req.Query.NotFilter, zeronotfilter) {
723 if mailboxName == "" {
724 mailboxName = "Inbox"
727 var inbox store.Mailbox
728 for _, e := range mbl {
729 if e.Name == mailboxName {
732 if e.Name == "Inbox" {
740 xcheckf(ctx, errors.New("inbox not found"), "setting initial mailbox")
742 req.Query.Filter.MailboxID = mailbox.ID
743 req.Query.Filter.MailboxName = ""
744 mailboxPrefixes = []string{mailbox.Name + "/"}
745 matchMailboxes = true
746 mailboxIDs[mailbox.ID] = true
748 matchMailboxes, mailboxIDs, mailboxPrefixes = xprepareMailboxIDs(ctx, qtx, req.Query.Filter, accConf.RejectsMailbox)
750 if req.Query.Filter.MailboxChildrenIncluded {
751 xgatherMailboxIDs(ctx, qtx, mailboxIDs, mailboxPrefixes)
754 // todo: write a last-event-id based on modseq? if last-event-id is present, we would have to send changes to mailboxes, messages, hopefully reducing the amount of data sent.
756 sse := sseRegister(acc.Name)
757 defer sse.unregister()
759 // Per-domain localpart config so webclient can decide if an address belongs to the account.
760 domainAddressConfigs := map[string]DomainAddressConfig{}
761 for _, a := range addresses {
762 dom, _ := mox.Conf.Domain(a.Domain)
763 domainAddressConfigs[a.Domain.ASCII] = DomainAddressConfig{dom.LocalpartCatchallSeparatorsEffective, dom.LocalpartCaseSensitive}
766 // Write first event, allowing client to fill its UI with mailboxes.
767 start := EventStart{sse.ID, loginAddress, addresses, domainAddressConfigs, mailbox.Name, mbl, accConf.RejectsMailbox, settings, accountPath, moxvar.Version}
768 writer.xsendEvent(ctx, log, "start", start)
770 // The goroutine doing the querying will send messages on these channels, which
771 // result in an event being written on the SSE connection.
772 viewMsgsc := make(chan EventViewMsgs)
773 viewErrc := make(chan EventViewErr)
774 viewResetc := make(chan EventViewReset)
775 donec := make(chan int64) // When request is done.
777 // Start a view, it determines if we send a change to the client. And start an
778 // implicit query for messages, we'll send the messages to the client which can
779 // fill its ui with messages.
780 v := view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
781 go viewRequestTx(reqctx, log, acc, qtx, v, viewMsgsc, viewErrc, viewResetc, donec)
782 qtx = nil // viewRequestTx closes qtx
784 // When canceling a query, we must drain its messages until it says it is done.
785 // Otherwise the sending goroutine would hang indefinitely on a channel send.
786 cancelDrain := func() {
787 if reqctxcancel != nil {
788 // Cancel the goroutine doing the querying.
796 // Drain events until done.
808 // If we stop and a query is in progress, we must drain the channel it will send on.
811 // Changes broadcasted by other connections on this account. If applicable for the
812 // connection/view, we send events.
813 xprocessChanges := func(changes []store.Change) {
814 taggedChanges := [][2]any{}
816 newPreviews := map[int64]string{}
817 defer storeNewPreviews(ctx, log, acc, newPreviews)
819 // We get a transaction first time we need it.
823 err := xtx.Rollback()
824 log.Check(err, "rolling back transaction")
827 ensureTx := func() error {
834 xtx, err = acc.DB.Begin(ctx, false)
837 // This getmsg will now only be called with mailboxID+UID, not with messageID set.
838 // todo jmap: change store.Change* to include MessageID's? would mean duplication of information resulting in possible mismatch.
839 getmsg := func(messageID int64, mailboxID int64, uid store.UID) (store.Message, error) {
840 if err := ensureTx(); err != nil {
841 return store.Message{}, fmt.Errorf("transaction: %v", err)
843 return bstore.QueryTx[store.Message](xtx).FilterEqual("Expunged", false).FilterNonzero(store.Message{MailboxID: mailboxID, UID: uid}).Get()
846 // Additional headers from settings to add to MessageItems.
847 var moreHeaders []string
848 xmoreHeaders := func() []string {
850 xcheckf(ctx, err, "transaction")
852 moreHeaders, err = ensureMoreHeaders(xtx, moreHeaders)
853 xcheckf(ctx, err, "ensuring more headers")
857 // Return uids that are within range in view. Because the end has been reached, or
858 // because the UID is not after the last message.
859 xchangedUIDs := func(mailboxID int64, uids []store.UID, isRemove bool) (changedUIDs []store.UID) {
860 uidsAny := make([]any, len(uids))
861 for i, uid := range uids {
865 xcheckf(ctx, err, "transaction")
866 q := bstore.QueryTx[store.Message](xtx)
867 q.FilterNonzero(store.Message{MailboxID: mailboxID})
868 q.FilterEqual("UID", uidsAny...)
869 mbOK := v.matchesMailbox(mailboxID)
870 err = q.ForEach(func(m store.Message) error {
871 _, thread := v.threadIDs[m.ThreadID]
872 if thread || mbOK && (v.inRange(m) || isRemove && m.Expunged) {
873 changedUIDs = append(changedUIDs, m.UID)
877 xcheckf(ctx, err, "fetching messages for change")
881 // Forward changes that are relevant to the current view.
882 for _, change := range changes {
883 switch c := change.(type) {
884 case store.ChangeAddUID:
885 ok, err := v.matches(log, acc, true, 0, c.MailboxID, c.UID, c.Flags, c.Keywords, getmsg)
886 xcheckf(ctx, err, "matching new message against view")
887 m, err := getmsg(0, c.MailboxID, c.UID)
888 xcheckf(ctx, err, "get message")
889 _, thread := v.threadIDs[m.ThreadID]
894 state := msgState{acc: acc, log: log, newPreviews: newPreviews}
895 mi, err := messageItem(log, m, &state, xmoreHeaders())
897 xcheckf(ctx, err, "make messageitem")
900 mil := []MessageItem{mi}
901 if !thread && req.Query.Threading != ThreadOff {
903 xcheckf(ctx, err, "transaction")
904 more, _, err := gatherThread(log, xtx, acc, v, m, 0, false, xmoreHeaders(), newPreviews)
905 xcheckf(ctx, err, "gathering thread messages for id %d, thread %d", m.ID, m.ThreadID)
906 mil = append(mil, more...)
907 v.threadIDs[m.ThreadID] = struct{}{}
910 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgAdd", ChangeMsgAdd{c, mil}})
912 // If message extends the view, store it as such.
913 if !v.Request.Query.OrderAsc && m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && m.Received.After(v.LastMessageReceived) {
914 v.LastMessageReceived = m.Received
917 case store.ChangeRemoveUIDs:
920 // We may send changes for uids the client doesn't know, that's fine.
921 changedUIDs := xchangedUIDs(c.MailboxID, c.UIDs, true)
922 if len(changedUIDs) == 0 {
925 ch := ChangeMsgRemove{c}
926 ch.UIDs = changedUIDs
927 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgRemove", ch})
929 case store.ChangeFlags:
930 // We may send changes for uids the client doesn't know, that's fine.
931 changedUIDs := xchangedUIDs(c.MailboxID, []store.UID{c.UID}, false)
932 if len(changedUIDs) == 0 {
935 ch := ChangeMsgFlags{c}
936 ch.UID = changedUIDs[0]
937 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgFlags", ch})
939 case store.ChangeThread:
940 // Change in muted/collapsed state, just always ship it.
941 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgThread", ChangeMsgThread{c}})
943 case store.ChangeRemoveMailbox:
944 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRemove", ChangeMailboxRemove{c}})
946 case store.ChangeAddMailbox:
947 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxAdd", ChangeMailboxAdd{c.Mailbox}})
949 case store.ChangeRenameMailbox:
950 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRename", ChangeMailboxRename{c}})
952 case store.ChangeMailboxCounts:
953 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxCounts", ChangeMailboxCounts{c}})
955 case store.ChangeMailboxSpecialUse:
956 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxSpecialUse", ChangeMailboxSpecialUse{c}})
958 case store.ChangeMailboxKeywords:
959 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxKeywords", ChangeMailboxKeywords{c}})
961 case store.ChangeAddSubscription, store.ChangeRemoveSubscription:
962 // Webmail does not care about subscriptions.
964 case store.ChangeAnnotation:
968 panic(fmt.Sprintf("missing case for change %T", c))
972 if len(taggedChanges) > 0 {
973 viewChanges := EventViewChanges{v.Request.ViewID, taggedChanges}
974 writer.xsendEvent(ctx, log, "viewChanges", viewChanges)
978 timer := time.NewTimer(5 * time.Minute) // For keepalives.
982 timer.Reset(5 * time.Minute)
986 pending := comm.Pending
992 case <-mox.Shutdown.Done():
993 writer.xsendEvent(ctx, log, "fatalErr", "server is shutting down")
994 // Work around go vet, it doesn't see defer cancelDrain.
995 if reqctxcancel != nil {
1001 _, err := fmt.Fprintf(out, ": keepalive\n\n")
1006 log.Errorx("write keepalive", err)
1007 // Work around go vet, it doesn't see defer cancelDrain.
1008 if reqctxcancel != nil {
1015 case vm := <-viewMsgsc:
1016 if vm.RequestID != v.Request.ID || vm.ViewID != v.Request.ViewID {
1017 panic(fmt.Sprintf("received msgs for view,request id %d,%d instead of %d,%d", vm.ViewID, vm.RequestID, v.Request.ViewID, v.Request.ID))
1022 if len(vm.MessageItems) > 0 {
1023 v.LastMessageReceived = vm.MessageItems[len(vm.MessageItems)-1][0].Message.Received
1025 writer.xsendEvent(ctx, log, "viewMsgs", vm)
1027 case ve := <-viewErrc:
1028 if ve.RequestID != v.Request.ID || ve.ViewID != v.Request.ViewID {
1029 panic(fmt.Sprintf("received err for view,request id %d,%d instead of %d,%d", ve.ViewID, ve.RequestID, v.Request.ViewID, v.Request.ID))
1031 if errors.Is(ve.err, context.Canceled) || mlog.IsClosed(ve.err) {
1032 // Work around go vet, it doesn't see defer cancelDrain.
1033 if reqctxcancel != nil {
1038 writer.xsendEvent(ctx, log, "viewErr", ve)
1040 case vr := <-viewResetc:
1041 if vr.RequestID != v.Request.ID || vr.ViewID != v.Request.ViewID {
1042 panic(fmt.Sprintf("received reset for view,request id %d,%d instead of %d,%d", vr.ViewID, vr.RequestID, v.Request.ViewID, v.Request.ID))
1044 writer.xsendEvent(ctx, log, "viewReset", vr)
1047 if id != v.Request.ID {
1048 panic(fmt.Sprintf("received done for request id %d instead of %d", id, v.Request.ID))
1050 if reqctxcancel != nil {
1056 case req := <-sse.Request:
1061 v = view{req, time.Time{}, false, false, nil, nil}
1065 reqctx, reqctxcancel = context.WithCancel(ctx)
1067 stop := func() (stop bool) {
1068 // rtx is handed off viewRequestTx below, but we must clean it up in case of errors.
1073 err = rtx.Rollback()
1074 log.Check(err, "rolling back transaction")
1077 acc.WithRLock(func() {
1078 rtx, err = acc.DB.Begin(reqctx, false)
1085 if errors.Is(err, context.Canceled) {
1088 err := fmt.Errorf("begin transaction: %v", err)
1089 viewErr := EventViewErr{v.Request.ViewID, v.Request.ID, err.Error(), err}
1090 writer.xsendEvent(ctx, log, "viewErr", viewErr)
1094 // Reset view state for new query.
1095 if req.ViewID != v.Request.ViewID {
1096 matchMailboxes, mailboxIDs, mailboxPrefixes := xprepareMailboxIDs(ctx, rtx, req.Query.Filter, accConf.RejectsMailbox)
1097 if req.Query.Filter.MailboxChildrenIncluded {
1098 xgatherMailboxIDs(ctx, rtx, mailboxIDs, mailboxPrefixes)
1100 v = view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
1104 go viewRequestTx(reqctx, log, acc, rtx, v, viewMsgsc, viewErrc, viewResetc, donec)
1113 overflow, changes := comm.Get()
1115 writer.xsendEvent(ctx, log, "fatalErr", "out of sync, too many pending changes")
1118 xprocessChanges(changes)
1121 // Work around go vet, it doesn't see defer cancelDrain.
1122 if reqctxcancel != nil {
// xprepareMailboxIDs prepares the first half of filters for mailboxes, based on
// f.MailboxID (-1 is special). matchMailboxes indicates whether the IDs in
// mailboxIDs must or must not match. mailboxPrefixes is for use with
// xgatherMailboxIDs to gather children of the mailboxIDs.
func xprepareMailboxIDs(ctx context.Context, tx *bstore.Tx, f Filter, rejectsMailbox string) (matchMailboxes bool, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
	matchMailboxes = true
	mailboxIDs = map[int64]bool{}
	if f.MailboxID == -1 {
		// -1 means "all mailboxes except Trash/Junk/Rejects": gather those IDs and
		// return matchMailboxes=false so they are excluded instead of required.
		matchMailboxes = false
		// Add the trash, junk and account rejects mailbox.
		err := bstore.QueryTx[store.Mailbox](tx).FilterEqual("Expunged", false).ForEach(func(mb store.Mailbox) error {
			if mb.Trash || mb.Junk || mb.Name == rejectsMailbox {
				// Remember the name prefix so children of these mailboxes can be excluded too.
				mailboxPrefixes = append(mailboxPrefixes, mb.Name+"/")
				mailboxIDs[mb.ID] = true
		// NOTE(review): xcheckf presumably aborts the request on error — confirm against the webmail helpers.
		xcheckf(ctx, err, "finding trash/junk/rejects mailbox")
	} else if f.MailboxID > 0 {
		mb, err := store.MailboxID(tx, f.MailboxID)
		xcheckf(ctx, err, "get mailbox")
		mailboxIDs[f.MailboxID] = true
		mailboxPrefixes = []string{mb.Name + "/"}
// xgatherMailboxIDs adds all mailboxes with a prefix matching any of
// mailboxPrefixes to mailboxIDs, to expand filtering to children of mailboxes.
func xgatherMailboxIDs(ctx context.Context, tx *bstore.Tx, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
	// Gather more mailboxes to filter on, based on mailboxPrefixes.
	if len(mailboxPrefixes) == 0 {
	// A mailbox named "parent/child" is a child of "parent", hence the prefix match.
	err := bstore.QueryTx[store.Mailbox](tx).FilterEqual("Expunged", false).ForEach(func(mb store.Mailbox) error {
		for _, p := range mailboxPrefixes {
			if strings.HasPrefix(mb.Name, p) {
				mailboxIDs[mb.ID] = true
	xcheckf(ctx, err, "gathering mailboxes")
1176// matchesMailbox returns whether a mailbox matches the view.
1177func (v view) matchesMailbox(mailboxID int64) bool {
1178 return len(v.mailboxIDs) == 0 || v.matchMailboxIDs && v.mailboxIDs[mailboxID] || !v.matchMailboxIDs && !v.mailboxIDs[mailboxID]
1181// inRange returns whether m is within the range for the view, whether a change for
1182// this message should be sent to the client so it can update its state.
1183func (v view) inRange(m store.Message) bool {
1184 return v.End || !v.Request.Query.OrderAsc && !m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && !m.Received.After(v.LastMessageReceived)
// matches checks if the message, identified by either messageID or mailboxID+UID,
// is in the current "view" (i.e. passing the filters, and if checkRange is set
// also if within the range of sent messages based on sort order and the last seen
// message). getmsg retrieves the message, which may be necessary depending on the
// active filters. Used to determine if a store.Change with a new message should be
// sent, and for the destination and anchor messages in view requests.
func (v view) matches(log mlog.Log, acc *store.Account, checkRange bool, messageID int64, mailboxID int64, uid store.UID, flags store.Flags, keywords []string, getmsg func(int64, int64, store.UID) (store.Message, error)) (match bool, rerr error) {
	// Lazily fetch the full message record: only filters that need more than the
	// parameters we were given pay the cost of getmsg.
	ensureMessage := func() bool {
		if m.ID == 0 && rerr == nil {
			m, rerr = getmsg(messageID, mailboxID, uid)

	q := v.Request.Query

	// Warning: Filters must be kept in sync between queryMessages and view.matches.

	// Mailbox filter: mailboxIDs is an allow-list or deny-list depending on matchMailboxIDs.
	if len(v.mailboxIDs) > 0 && (!ensureMessage() || v.matchMailboxIDs && !v.mailboxIDs[m.MailboxID] || !v.matchMailboxIDs && v.mailboxIDs[m.MailboxID]) {
	// note: anchorMessageID is not relevant for matching.
	flagfilter := q.flagFilterFn()
	if flagfilter != nil && !flagfilter(flags, keywords) {
	if q.Filter.Oldest != nil && (!ensureMessage() || m.Received.Before(*q.Filter.Oldest)) {
	if q.Filter.Newest != nil && (!ensureMessage() || !m.Received.Before(*q.Filter.Newest)) {
	if q.Filter.SizeMin > 0 && (!ensureMessage() || m.Size < q.Filter.SizeMin) {
	if q.Filter.SizeMax > 0 && (!ensureMessage() || m.Size > q.Filter.SizeMax) {
	state := msgState{acc: acc, log: log}
	// Filter functions record failures in state.err; propagate unless an earlier error won.
	if rerr == nil && state.err != nil {
	attachmentFilter := q.attachmentFilterFn(log, acc, &state)
	if attachmentFilter != nil && (!ensureMessage() || !attachmentFilter(m)) {
	envFilter := q.envFilterFn(log, &state)
	if envFilter != nil && (!ensureMessage() || !envFilter(m)) {
	headerFilter := q.headerFilterFn(log, &state)
	if headerFilter != nil && (!ensureMessage() || !headerFilter(m)) {
	wordsFilter := q.wordsFilterFn(log, &state)
	if wordsFilter != nil && (!ensureMessage() || !wordsFilter(m)) {
	// Now check that we are either within the sorting order, or "last" was sent.
	if !checkRange || v.End || ensureMessage() && v.inRange(m) {
// msgResp is one response from queryMessages to viewRequestTx over the mrc
// channel. Exactly one of the cases below applies per response.
type msgResp struct {
	err error // If set, an error happened and fields below are not set.
	reset bool // If set, the anchor message does not exist (anymore?) and we are sending messages from the start, fields below not set.
	viewEnd bool // If set, the last message for the view was seen, no more should be requested, fields below not set.
	mil []MessageItem // If none of the cases above apply, the messages that were found matching the query. First message was reason the thread is returned, for use as AnchorID in followup request.
	pm *ParsedMessage // If m was the target page.DestMessageID, or this is the first match, this is the parsed message of mi.
// storeNewPreviews saves newly generated message previews in a write
// transaction: for each message id in newPreviews, the Preview field is set,
// unless the message was expunged in the meantime.
func storeNewPreviews(ctx context.Context, log mlog.Log, acc *store.Account, newPreviews map[int64]string) {
	if len(newPreviews) == 0 {
			// Should not happen; log and count instead of taking the program down.
			log.Error("unhandled panic in storeNewPreviews", slog.Any("err", x))
		metrics.PanicInc(metrics.Store)
	err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
		for id, preview := range newPreviews {
			m := store.Message{ID: id}
			if err := tx.Get(&m); err != nil {
				return fmt.Errorf("get message with id %d to store preview: %w", id, err)
			} else if !m.Expunged {
				m.Preview = &preview
				if err := tx.Update(&m); err != nil {
					return fmt.Errorf("updating message with id %d: %v", m.ID, err)
	log.Check(err, "saving new previews with messages")
// viewRequestTx executes a request (query with filters, pagination) by
// launching a new goroutine with queryMessages, receiving results as msgResp,
// and sending Event* to the SSE connection.
//
// It always closes tx.
func viewRequestTx(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, msgc chan EventViewMsgs, errc chan EventViewErr, resetc chan EventViewReset, donec chan int64) {
	// Newly generated previews which we'll save when the operation is done.
	newPreviews := map[int64]string{}
		err := tx.Rollback()
		log.Check(err, "rolling back query transaction")
		// Tell the SSE connection this request is finished.
		donec <- v.Request.ID
		// ctx can be canceled, we still want to store the previews.
		storeNewPreviews(context.Background(), log, acc, newPreviews)
		x := recover() // Should not happen, but don't take program down if it does.
			log.WithContext(ctx).Error("viewRequestTx panic", slog.Any("err", x))
		metrics.PanicInc(metrics.Webmailrequest)

	var msgitems [][]MessageItem // Gathering for 300ms, then flushing.
	var parsedMessage *ParsedMessage
	var immediate bool // No waiting, flush immediate.
	// Single reused timer for batching sends to the client.
	t := time.NewTimer(300 * time.Millisecond)

	sendViewMsgs := func(force bool) {
		if len(msgitems) == 0 && !force {
		msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, msgitems, parsedMessage, viewEnd}
		t.Reset(300 * time.Millisecond)

	// todo: should probably rewrite code so we don't start yet another goroutine, but instead handle the query responses directly (through a struct that keeps state?) in the sse connection goroutine.
	mrc := make(chan msgResp, 1)
	go queryMessages(ctx, log, acc, tx, v, mrc, newPreviews)
		case mr, ok := <-mrc:
			// Empty message list signals this query is done.
			msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, nil, nil, false}
			errc <- EventViewErr{v.Request.ViewID, v.Request.ID, mr.err.Error(), mr.err}
			resetc <- EventViewReset{v.Request.ViewID, v.Request.ID}
			// One thread (or single message) gathered; batch it for the next flush.
			msgitems = append(msgitems, mr.mil)
			parsedMessage = mr.pm
		if len(msgitems) == 0 {
			// Nothing to send yet. We'll send immediately when the next message comes in.
// queryMessages executes a query, with filter, pagination, destination message id
// to fetch (the message that the client had in view and wants to display again).
// It sends on msgc, with several types of messages: errors, whether the view is
// reset due to missing AnchorMessageID, and when the end of the view was reached
// and/or for a message.
// newPreviews is filled with previews, the caller must save them.
func queryMessages(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, mrc chan msgResp, newPreviews map[int64]string) {
	x := recover() // Should not happen, but don't take program down if it does.
	log.WithContext(ctx).Error("queryMessages panic", slog.Any("err", x))
	mrc <- msgResp{err: fmt.Errorf("query failed")}
	metrics.PanicInc(metrics.Webmailquery)

	query := v.Request.Query
	page := v.Request.Page

	// Warning: Filters must be kept in sync between queryMessages and view.matches.

	// checkMessage reports whether the message with the given id still exists
	// (not expunged) and matches the view's filters.
	checkMessage := func(id int64) (valid bool, rerr error) {
		m := store.Message{ID: id}
		if err == bstore.ErrAbsent || err == nil && m.Expunged {
		} else if err != nil {
		return v.matches(log, acc, false, m.ID, m.MailboxID, m.UID, m.Flags, m.Keywords, func(int64, int64, store.UID) (store.Message, error) {

	// Check if AnchorMessageID exists and matches filter. If not, we will reset the view.
	if page.AnchorMessageID > 0 {
		// Check if message exists and (still) matches the filter.
		// todo: if AnchorMessageID exists but no longer matches the filter, we are resetting the view, but could handle it more gracefully in the future. if the message is in a different mailbox, we cannot query as efficiently, we'll have to read through more messages.
		if valid, err := checkMessage(page.AnchorMessageID); err != nil {
			mrc <- msgResp{err: fmt.Errorf("querying AnchorMessageID: %v", err)}
			mrc <- msgResp{reset: true}
			page.AnchorMessageID = 0

	// Check if page.DestMessageID exists and matches filter. If not, we will ignore
	// it instead of continuing to send message till the end of the view.
	if page.DestMessageID > 0 {
		if valid, err := checkMessage(page.DestMessageID); err != nil {
			mrc <- msgResp{err: fmt.Errorf("querying requested message: %v", err)}
			page.DestMessageID = 0

	// todo optimize: we would like to have more filters directly on the database if they can use an index. eg if there is a keyword filter and no mailbox filter.

	q := bstore.QueryTx[store.Message](tx)
	q.FilterEqual("Expunged", false)
	if len(v.mailboxIDs) > 0 {
		if len(v.mailboxIDs) == 1 && v.matchMailboxIDs {
			// Should result in fast indexed query.
			for mbID := range v.mailboxIDs {
				q.FilterNonzero(store.Message{MailboxID: mbID})
			idsAny := make([]any, 0, len(v.mailboxIDs))
			for mbID := range v.mailboxIDs {
				idsAny = append(idsAny, mbID)
			if v.matchMailboxIDs {
				q.FilterEqual("MailboxID", idsAny...)
				q.FilterNotEqual("MailboxID", idsAny...)

	// If we are looking for an anchor, keep skipping messages early (cheaply) until we've seen it.
	if page.AnchorMessageID > 0 {
		q.FilterFn(func(m store.Message) bool {
			seen = m.ID == page.AnchorMessageID

	// We may be adding filters to the query below. The FilterFn signature does not
	// implement reporting errors, or anything else, just a bool. So when making the
	// filter functions, we give them a place to store parsed message state, and an
	// error. We check the error during and after query execution.
	state := msgState{acc: acc, log: log, newPreviews: newPreviews}

	flagfilter := query.flagFilterFn()
	if flagfilter != nil {
		q.FilterFn(func(m store.Message) bool {
			return flagfilter(m.Flags, m.Keywords)

	if query.Filter.Oldest != nil {
		q.FilterGreaterEqual("Received", *query.Filter.Oldest)
	if query.Filter.Newest != nil {
		q.FilterLessEqual("Received", *query.Filter.Newest)

	if query.Filter.SizeMin > 0 {
		q.FilterGreaterEqual("Size", query.Filter.SizeMin)
	if query.Filter.SizeMax > 0 {
		q.FilterLessEqual("Size", query.Filter.SizeMax)

	attachmentFilter := query.attachmentFilterFn(log, acc, &state)
	if attachmentFilter != nil {
		q.FilterFn(attachmentFilter)

	envFilter := query.envFilterFn(log, &state)
	if envFilter != nil {
		q.FilterFn(envFilter)

	headerFilter := query.headerFilterFn(log, &state)
	if headerFilter != nil {
		q.FilterFn(headerFilter)

	wordsFilter := query.wordsFilterFn(log, &state)
	if wordsFilter != nil {
		q.FilterFn(wordsFilter)

	var moreHeaders []string // From store.Settings.ShowHeaders
	q.SortAsc("Received")
	q.SortDesc("Received")
	// found tracks whether DestMessageID (if any) has been encountered yet; until
	// then we keep sending past page.Count.
	found := page.DestMessageID <= 0
	err := q.ForEach(func(m store.Message) error {
		// Check for an error in one of the filters, propagate it.
		if state.err != nil {
		// Stop after the requested page size (once the destination was found), with
		// a hard safety cap.
		if have >= page.Count && found || have > 10000 {
			return bstore.StopForEach
		if _, ok := v.threadIDs[m.ThreadID]; ok {
			// Message was already returned as part of a thread.
		var pm *ParsedMessage
		if m.ID == page.DestMessageID || page.DestMessageID == 0 && have == 0 && page.AnchorMessageID == 0 {
			// For threads, if there was no DestMessageID, we may be getting the newest
			// message. For an initial view, this isn't necessarily the first the user is
			// expected to read first, that would be the first unread, which we'll get below
			// when gathering the thread.
			xpm, err := parsedMessage(log, &m, &state, true, false, false)
			if err != nil && errors.Is(err, message.ErrHeader) {
				log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
			} else if err != nil {
				return fmt.Errorf("parsing message %d: %v", m.ID, err)
		moreHeaders, err = ensureMoreHeaders(tx, moreHeaders)
			return fmt.Errorf("ensuring more headers: %v", err)
		mi, err := messageItem(log, m, &state, moreHeaders)
			return fmt.Errorf("making messageitem for message %d: %v", m.ID, err)
		mil := []MessageItem{mi}
		if query.Threading != ThreadOff {
			more, xpm, err := gatherThread(log, tx, acc, v, m, page.DestMessageID, page.AnchorMessageID == 0 && have == 0, moreHeaders, state.newPreviews)
				return fmt.Errorf("gathering thread messages for id %d, thread %d: %v", m.ID, m.ThreadID, err)
			mil = append(mil, more...)
			// Mark the thread as returned so later messages of it are skipped.
			v.threadIDs[m.ThreadID] = struct{}{}
		// Calculate how many messages the frontend is going to show, and only count those as returned.
		collapsed := map[int64]bool{}
		for _, mi := range mil {
			collapsed[mi.Message.ID] = mi.Message.ThreadCollapsed
		unread := map[int64]bool{} // Propagated to thread root.
		if query.Threading == ThreadUnread {
			for _, mi := range mil {
				unread[mm.ID] = true
				for _, id := range mm.ThreadParentIDs {
		for _, mi := range mil {
			for _, id := range mm.ThreadParentIDs {
				if _, ok := collapsed[id]; ok {
			if threadRoot || (query.Threading == ThreadOn && !collapsed[rootID] || query.Threading == ThreadUnread && unread[rootID]) {
		if pm != nil && len(pm.envelope.From) == 1 {
			pm.ViewMode, err = fromAddrViewMode(tx, pm.envelope.From[0])
				return fmt.Errorf("gathering view mode for id %d: %v", m.ID, err)
		mrc <- msgResp{mil: mil, pm: pm}
	// Check for an error in one of the filters again. Check in ForEach would not
	// trigger if the last message has the error.
	if err == nil && state.err != nil {
	mrc <- msgResp{err: fmt.Errorf("querying messages: %v", err)}
	mrc <- msgResp{viewEnd: true}
// gatherThread fetches the other (non-expunged) messages of m's thread,
// returning their MessageItems and, when appropriate, a ParsedMessage for the
// message the frontend should open (the destination message, or the first
// unread message for an initial view).
func gatherThread(log mlog.Log, tx *bstore.Tx, acc *store.Account, v view, m store.Message, destMessageID int64, first bool, moreHeaders []string, newPreviews map[int64]string) ([]MessageItem, *ParsedMessage, error) {
	if m.ThreadID == 0 {
		// If we would continue, FilterNonzero would fail because there are no non-zero fields.
		return nil, nil, fmt.Errorf("message has threadid 0, account is probably still being upgraded, try turning threading off until the upgrade is done")

	// Fetch other messages for this thread.
	qt := bstore.QueryTx[store.Message](tx)
	qt.FilterNonzero(store.Message{ThreadID: m.ThreadID})
	qt.FilterEqual("Expunged", false)
	qt.FilterNotEqual("ID", m.ID)
	tml, err := qt.List()
		return nil, nil, fmt.Errorf("listing other messages in thread for message %d, thread %d: %v", m.ID, m.ThreadID, err)

	var mil []MessageItem
	var pm *ParsedMessage
	var firstUnread bool
	for _, tm := range tml {
		// Per-message closure so each xstate is cleaned up promptly via defer.
		err := func() error {
			xstate := msgState{acc: acc, log: log, newPreviews: newPreviews}
			defer xstate.clear()
			mi, err := messageItem(log, tm, &xstate, moreHeaders)
				return fmt.Errorf("making messageitem for message %d, for thread %d: %v", tm.ID, m.ThreadID, err)
			mi.MatchQuery, err = v.matches(log, acc, false, tm.ID, tm.MailboxID, tm.UID, tm.Flags, tm.Keywords, func(int64, int64, store.UID) (store.Message, error) {
				return fmt.Errorf("matching thread message %d against view query: %v", tm.ID, err)
			mil = append(mil, mi)

			if tm.ID == destMessageID || destMessageID == 0 && first && (pm == nil || !firstUnread && !tm.Seen) {
				firstUnread = !tm.Seen
				xpm, err := parsedMessage(log, &tm, &xstate, true, false, false)
				if err != nil && errors.Is(err, message.ErrHeader) {
					// NOTE(review): this logs m.ID while parsing tm — likely should be tm.ID; confirm.
					log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
				} else if err != nil {
					return fmt.Errorf("parsing thread message %d: %v", tm.ID, err)
		return nil, nil, err

	// Finally, the message that caused us to gather this thread (which is likely the
	// most recent message in the thread) could be the only unread message.
	if destMessageID == 0 && first && !m.Seen && !firstUnread {
		xstate := msgState{acc: acc, log: log}
		defer xstate.clear()
		xpm, err := parsedMessage(log, &m, &xstate, true, false, false)
		if err != nil && errors.Is(err, message.ErrHeader) {
			log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
		} else if err != nil {
			return nil, nil, fmt.Errorf("parsing thread message %d: %v", m.ID, err)
// While checking the filters on a message, we may need to get more message
// details as each filter passes. We check the filters that need the basic
// information first, and load and cache more details for the next filters.
// msgState holds parsed details for a message, it is updated while filtering,
// with more information or reset for a next message.
type msgState struct {
	acc *store.Account // Never changes during lifetime.
	err error // Once set, doesn't get cleared.
	part *message.Part // Parsed form of the message. Will be without Reader when msgr is nil.
	msgr *store.MsgReader // Reader for the raw message file; closed by clear.

	// If not nil, messages will get their Preview field filled when nil, and message
	// id and preview added to newPreviews, and saved in a separate write transaction
	// when the operation is done.
	newPreviews map[int64]string
// clear closes the open message reader (if any) and resets the per-message
// state, keeping acc, err, log and newPreviews for the next message.
func (ms *msgState) clear() {
		err := ms.msgr.Close()
		ms.log.Check(err, "closing message reader from state")
	*ms = msgState{acc: ms.acc, err: ms.err, log: ms.log, newPreviews: ms.newPreviews}
// ensureMsg caches m as the current message when a different message (by ID)
// was cached before.
func (ms *msgState) ensureMsg(m store.Message) {
	if m.ID != ms.m.ID {
// ensurePart ensures the parsed message part — and, if withMsgReader is set, a
// reader for the raw message attached to it — is available, reporting whether it
// is. Failures are recorded in ms.err.
func (ms *msgState) ensurePart(m store.Message, withMsgReader bool) bool {
	if m.ParsedBuf == nil {
		ms.err = fmt.Errorf("message %d not parsed", m.ID)
	// ParsedBuf is the JSON-encoded part structure; decode it.
	if err := json.Unmarshal(m.ParsedBuf, &p); err != nil {
		ms.err = fmt.Errorf("load part for message %d: %w", m.ID, err)
	if withMsgReader && ms.msgr == nil {
		ms.msgr = ms.acc.MessageReader(m)
		ms.part.SetReaderAt(ms.msgr)
	return ms.part != nil
// flagFilterFn returns a function that applies the flag/keyword/"label"-related
// filters for a query. A nil function is returned if there are no flags to filter
// on.
func (q Query) flagFilterFn() func(store.Flags, []string) bool {
	// Gather labels from Filter and NotFilter into one map.
	// NOTE(review): the value presumably marks wanted (true) vs unwanted (false) — confirm.
	labels := map[string]bool{}
	for _, k := range q.Filter.Labels {
	for _, k := range q.NotFilter.Labels {
	if len(labels) == 0 {
	// Well-known system flags are matched through a mask/flags pair; anything
	// else is treated as a free-form keyword below.
	var mask, flags store.Flags
	systemflags := map[string][]*bool{
		`\answered`: {&mask.Answered, &flags.Answered},
		`\flagged`: {&mask.Flagged, &flags.Flagged},
		`\deleted`: {&mask.Deleted, &flags.Deleted},
		`\seen`: {&mask.Seen, &flags.Seen},
		`\draft`: {&mask.Draft, &flags.Draft},
		`$junk`: {&mask.Junk, &flags.Junk},
		`$notjunk`: {&mask.Notjunk, &flags.Notjunk},
		`$forwarded`: {&mask.Forwarded, &flags.Forwarded},
		`$phishing`: {&mask.Phishing, &flags.Phishing},
		`$mdnsent`: {&mask.MDNSent, &flags.MDNSent},
	keywords := map[string]bool{}
	for k, v := range labels {
		// Labels are matched case-insensitively.
		k = strings.ToLower(k)
		if mf, ok := systemflags[k]; ok {
	return func(msgFlags store.Flags, msgKeywords []string) bool {
		// The masked system flags of the message must equal the required values.
		if f.Set(mask, msgFlags) != flags {
		for k, v := range keywords {
			// Keyword presence must agree with whether it was required or forbidden.
			if slices.Contains(msgKeywords, k) != v {
// attachmentFilterFn returns a function that filters for the attachment-related
// filter from the query. A nil function is returned if there are no attachment
// filters.
func (q Query) attachmentFilterFn(log mlog.Log, acc *store.Account, state *msgState) func(m store.Message) bool {
	if q.Filter.Attachments == AttachmentIndifferent && q.NotFilter.Attachments == AttachmentIndifferent {
	return func(m store.Message) bool {
		if !state.ensurePart(m, true) {
		types, err := attachmentTypes(log, m, state)
		// The positive filter type must be present and the negative type absent.
		return (q.Filter.Attachments == AttachmentIndifferent || types[q.Filter.Attachments]) && (q.NotFilter.Attachments == AttachmentIndifferent || !types[q.NotFilter.Attachments])
1873var attachmentMimetypes = map[string]AttachmentType{
1874 "application/pdf": AttachmentPDF,
1875 "application/zip": AttachmentArchive,
1876 "application/x-rar-compressed": AttachmentArchive,
1877 "application/vnd.oasis.opendocument.spreadsheet": AttachmentSpreadsheet,
1878 "application/vnd.ms-excel": AttachmentSpreadsheet,
1879 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": AttachmentSpreadsheet,
1880 "application/vnd.oasis.opendocument.text": AttachmentDocument,
1881 "application/vnd.oasis.opendocument.presentation": AttachmentPresentation,
1882 "application/vnd.ms-powerpoint": AttachmentPresentation,
1883 "application/vnd.openxmlformats-officedocument.presentationml.presentation": AttachmentPresentation,
1885var attachmentExtensions = map[string]AttachmentType{
1886 ".pdf": AttachmentPDF,
1887 ".zip": AttachmentArchive,
1888 ".tar": AttachmentArchive,
1889 ".tgz": AttachmentArchive,
1890 ".tar.gz": AttachmentArchive,
1891 ".tbz2": AttachmentArchive,
1892 ".tar.bz2": AttachmentArchive,
1893 ".tar.lz": AttachmentArchive,
1894 ".tlz": AttachmentArchive,
1895 ".tar.xz": AttachmentArchive,
1896 ".txz": AttachmentArchive,
1897 ".tar.zst": AttachmentArchive,
1898 ".tar.lz4": AttachmentArchive,
1899 ".7z": AttachmentArchive,
1900 ".rar": AttachmentArchive,
1901 ".ods": AttachmentSpreadsheet,
1902 ".xls": AttachmentSpreadsheet,
1903 ".xlsx": AttachmentSpreadsheet,
1904 ".odt": AttachmentDocument,
1905 ".doc": AttachmentDocument,
1906 ".docx": AttachmentDocument,
1907 ".odp": AttachmentPresentation,
1908 ".ppt": AttachmentPresentation,
1909 ".pptx": AttachmentPresentation,
// attachmentTypes returns the set of attachment types present in the message,
// classified first by MIME content-type, then by filename extension. The
// pseudo-types AttachmentAny and AttachmentNone indicate whether any
// attachments were present at all.
func attachmentTypes(log mlog.Log, m store.Message, state *msgState) (map[AttachmentType]bool, error) {
	types := map[AttachmentType]bool{}
	pm, err := parsedMessage(log, &m, state, false, false, false)
		return nil, fmt.Errorf("parsing message for attachments: %w", err)
	for _, a := range pm.attachments {
		if a.Part.MediaType == "IMAGE" {
			types[AttachmentImage] = true
		// Classify by lowercased "type/subtype" content-type.
		mt := strings.ToLower(a.Part.MediaType + "/" + a.Part.MediaSubType)
		if t, ok := attachmentMimetypes[mt]; ok {
		// Fall back to the filename extension.
		_, filename, err := a.Part.DispositionFilename()
		if err != nil && (errors.Is(err, message.ErrParamEncoding) || errors.Is(err, message.ErrHeader)) {
			log.Debugx("parsing disposition/filename", err)
		} else if err != nil {
			return nil, fmt.Errorf("reading disposition/filename: %v", err)
		// NOTE(review): filepath.Ext returns only the suffix after the final dot,
		// e.g. ".gz" for "x.tar.gz".
		if ext := filepath.Ext(filename); ext != "" {
			if t, ok := attachmentExtensions[strings.ToLower(ext)]; ok {
	if len(types) == 0 {
		types[AttachmentNone] = true
	types[AttachmentAny] = true
// envFilterFn returns a filter function for the "envelope" headers ("envelope" as
// used by IMAP, i.e. basic message headers from/to/subject, an unfortunate name
// clash with SMTP envelope) for the query. A nil function is returned if no
// filtering is needed.
func (q Query) envFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
	if len(q.Filter.From) == 0 && len(q.Filter.To) == 0 && len(q.Filter.Subject) == 0 && len(q.NotFilter.From) == 0 && len(q.NotFilter.To) == 0 && len(q.NotFilter.Subject) == 0 {

	// Precompute lowercased needles once; matching below is case-insensitive.
	lower := func(l []string) []string {
		r := make([]string, len(l))
		for i, s := range l {
			r[i] = strings.ToLower(s)

	filterSubject := lower(q.Filter.Subject)
	notFilterSubject := lower(q.NotFilter.Subject)
	filterFrom := lower(q.Filter.From)
	notFilterFrom := lower(q.NotFilter.From)
	filterTo := lower(q.Filter.To)
	notFilterTo := lower(q.NotFilter.To)

	return func(m store.Message) bool {
		if !state.ensurePart(m, false) {

		var env message.Envelope
		if state.part.Envelope != nil {
			env = *state.part.Envelope

		// Every positive subject needle must be present, no negative needle may be.
		if len(filterSubject) > 0 || len(notFilterSubject) > 0 {
			subject := strings.ToLower(env.Subject)
			for _, s := range filterSubject {
				if !strings.Contains(subject, s) {
			for _, s := range notFilterSubject {
				if strings.Contains(subject, s) {

		// contains matches needles against display name and "<user@host>" form of
		// each address. NOTE(review): "all" presumably switches between
		// every-needle-must-match and any-needle-matches — confirm, the closing
		// logic is elided here.
		contains := func(textLower []string, l []message.Address, all bool) bool {
			for _, s := range textLower {
				for _, a := range l {
					name := strings.ToLower(a.Name)
					addr := strings.ToLower(fmt.Sprintf("<%s@%s>", a.User, a.Host))
					if strings.Contains(name, s) || strings.Contains(addr, s) {

		if len(filterFrom) > 0 && !contains(filterFrom, env.From, true) {
		if len(notFilterFrom) > 0 && contains(notFilterFrom, env.From, false) {
		if len(filterTo) > 0 || len(notFilterTo) > 0 {
			// "To" filtering considers To, CC and BCC addressees.
			to := slices.Concat(env.To, env.CC, env.BCC)
			if len(filterTo) > 0 && !contains(filterTo, to, true) {
			if len(notFilterTo) > 0 && contains(notFilterTo, to, false) {
// headerFilterFn returns a function that filters for the header filters in the
// query. A nil function is returned if there are no header filters.
func (q Query) headerFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
	if len(q.Filter.Headers) == 0 {

	// Precompute lowercased expected values; header values match case-insensitively.
	lowerValues := make([]string, len(q.Filter.Headers))
	for i, t := range q.Filter.Headers {
		lowerValues[i] = strings.ToLower(t[1])

	return func(m store.Message) bool {
		if !state.ensurePart(m, true) {
		hdr, err := state.part.Header()
			state.err = fmt.Errorf("reading header for message %d: %w", m.ID, err)
		for i, t := range q.Filter.Headers {
			// An empty filter value means "header present" is enough.
			if v == "" && len(l) > 0 {
			for _, e := range l {
				if strings.Contains(strings.ToLower(e), v) {
2081// wordFiltersFn returns a function that applies the word filters of the query. A
2082// nil function is returned when query does not contain a word filter.
2083func (q Query) wordsFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
2084 if len(q.Filter.Words) == 0 && len(q.NotFilter.Words) == 0 {
2088 ws := store.PrepareWordSearch(q.Filter.Words, q.NotFilter.Words)
2090 return func(m store.Message) bool {
2091 if !state.ensurePart(m, true) {
2095 if ok, err := ws.MatchPart(log, state.part, true); err != nil {
2096 state.err = fmt.Errorf("searching for words in message %d: %w", m.ID, err)