3// todo: may want to add some json omitempty tags to MessageItem, or Message to reduce json size, or just have smaller types that send only the fields that are needed.
 
8	cryptrand "crypto/rand"
 
24	"github.com/mjl-/bstore"
 
25	"github.com/mjl-/sherpa"
 
27	"github.com/mjl-/mox/dns"
 
28	"github.com/mjl-/mox/message"
 
29	"github.com/mjl-/mox/metrics"
 
30	"github.com/mjl-/mox/mlog"
 
31	"github.com/mjl-/mox/mox-"
 
32	"github.com/mjl-/mox/moxvar"
 
33	"github.com/mjl-/mox/smtp"
 
34	"github.com/mjl-/mox/store"
 
37// Request is a request to an SSE connection to send messages, either for a new
 
38// view, to continue with an existing view, or to cancel an ongoing request.
 
42	SSEID int64 // SSE connection.
 
44	// To indicate a request is a continuation (more results) of the previous view.
 
45	// Echoed in events, client checks if it is getting results for the latest request.
 
48	// If set, this request and its view are canceled. A new view must be started.
 
58	ThreadOff    ThreadMode = "off"
 
59	ThreadOn     ThreadMode = "on"
 
60	ThreadUnread ThreadMode = "unread"
 
63// Query is a request for messages that match filters, in a given order.
 
65	OrderAsc  bool // Order by received ascending or descending.
 
71// AttachmentType is for filtering by attachment type.
 
72type AttachmentType string
 
75	AttachmentIndifferent  AttachmentType = ""
 
76	AttachmentNone         AttachmentType = "none"
 
77	AttachmentAny          AttachmentType = "any"
 
78	AttachmentImage        AttachmentType = "image" // png, jpg, gif, ...
 
79	AttachmentPDF          AttachmentType = "pdf"
 
80	AttachmentArchive      AttachmentType = "archive"      // zip files, tgz, ...
 
81	AttachmentSpreadsheet  AttachmentType = "spreadsheet"  // ods, xlsx, ...
 
82	AttachmentDocument     AttachmentType = "document"     // odt, docx, ...
 
83	AttachmentPresentation AttachmentType = "presentation" // odp, pptx, ...
 
86// Filter selects the messages to return. Fields that are set must all match,
 
87// for slices each element must match ("and").
 
89	// If -1, then all mailboxes except Trash/Junk/Rejects. Otherwise, only active if > 0.
 
92	// If true, also submailboxes are included in the search.
 
93	MailboxChildrenIncluded bool
 
95	// In case client doesn't know mailboxes and their IDs yet. Only used during sse
 
96	// connection setup, where it is turned into a MailboxID. Filtering only looks at
 
100	Words       []string // Case insensitive substring match for each string.
 
102	To          []string // Including Cc and Bcc.
 
106	Attachments AttachmentType
 
108	Headers     [][2]string // Header values can be empty, it's a check if the header is present, regardless of value.
 
113// NotFilter matches messages that don't match these fields.
 
114type NotFilter struct {
 
119	Attachments AttachmentType
 
123// Page holds pagination parameters for a request.
 
125	// Start returning messages after this ID, if > 0. For pagination, fetching the
 
126	// next set of messages.
 
127	AnchorMessageID int64
 
129	// Number of messages to return, must be >= 1, we never return more than 10000 for
 
133	// If > 0, return messages until DestMessageID is found. More than Count messages
 
134	// can be returned. For long-running searches, it may take a while before this
 
139// todo: MessageAddress and MessageEnvelope into message.Address and message.Envelope.
 
141// MessageAddress is like message.Address, but with a dns.Domain, with unicode name
 
143type MessageAddress struct {
 
144	Name   string // Free-form name for display in mail applications.
 
145	User   string // Localpart, encoded.
 
149// MessageEnvelope is like message.Envelope, as used in message.Part, but including
 
150// unicode host names for IDNA names.
 
151type MessageEnvelope struct {
 
152	// todo: should get sherpadoc to understand type embeds and embed the non-MessageAddress fields from message.Envelope.
 
155	From      []MessageAddress
 
156	Sender    []MessageAddress
 
157	ReplyTo   []MessageAddress
 
165// MessageItem is sent by queries, it has derived information analyzed from
 
166// message.Part, made for the needs of the message items in the message list.
 
168type MessageItem struct {
 
169	Message     store.Message // Without ParsedBuf and MsgPrefix, for size. With Preview, even if it isn't stored yet in the database.
 
170	Envelope    MessageEnvelope
 
171	Attachments []Attachment
 
174	MatchQuery  bool        // If message does not match query, it can still be included because of threading.
 
175	MoreHeaders [][2]string // All headers from store.Settings.ShowHeaders that are present.
 
178// ParsedMessage has more parsed/derived information about a message, intended
 
179// for rendering the (contents of the) message. Information from MessageItem is
 
181type ParsedMessage struct {
 
184	Headers  map[string][]string
 
185	ViewMode store.ViewMode
 
187	Texts []string // Contents of text parts, can be empty.
 
189	// Whether there is an HTML part. The webclient renders HTML message parts through
 
190	// an iframe and a separate request with strict CSP headers to prevent script
 
191	// execution and loading of external resources, which isn't possible when loading
 
192	// in iframe with inline HTML because not all browsers support the iframe csp
 
196	ListReplyAddress *MessageAddress // From List-Post.
 
198	TextPaths [][]int // Paths to text parts.
 
199	HTMLPath  []int   // Path to HTML part.
 
201	// Information used by MessageItem, not exported in this type.
 
202	envelope    MessageEnvelope
 
203	attachments []Attachment
 
208// EventStart is the first message sent on an SSE connection, giving the client
 
209// basic data to populate its UI. After this event, messages will follow quickly in
 
210// an EventViewMsgs event.
 
211type EventStart struct {
 
213	LoginAddress         MessageAddress
 
214	Addresses            []MessageAddress
 
215	DomainAddressConfigs map[string]DomainAddressConfig // ASCII domain to address config.
 
217	Mailboxes            []store.Mailbox
 
218	RejectsMailbox       string
 
219	Settings             store.Settings
 
220	AccountPath          string // If nonempty, the path on same host to webaccount interface.
 
224// DomainAddressConfig has the address (localpart) configuration for a domain, so
 
225// the webmail client can decide if an address matches the addresses of the
 
227type DomainAddressConfig struct {
 
228	LocalpartCatchallSeparators []string // Can be empty.
 
229	LocalpartCaseSensitive      bool
 
232// EventViewMsgs contains messages for a view, possibly a continuation of an
 
233// earlier list of messages.
 
234type EventViewMsgs struct {
 
238	// If empty, this was the last message for the request. If non-empty, a list of
 
239	// thread messages. Each with the first message being the reason this thread is
 
240	// included and can be used as AnchorID in followup requests. If the threading mode
 
241	// is "off" in the query, there will always be only a single message. If a thread
 
242	// is sent, all messages in the thread are sent, including those that don't match
 
243	// the query (e.g. from another mailbox). Threads can be displayed based on the
 
244	// ThreadParentIDs field, with possibly slightly different display based on field
 
245	// ThreadMissingLink.
 
246	MessageItems [][]MessageItem
 
248	// If set, will match the target page.DestMessageID from the request.
 
249	ParsedMessage *ParsedMessage
 
251	// If set, there are no more messages in this view at this moment. Messages can be
 
252	// added, typically via Change messages, e.g. for new deliveries.
 
256// EventViewErr indicates an error during a query for messages. The request is
 
257// aborted, no more request-related messages will be sent until the next request.
 
258type EventViewErr struct {
 
261	Err       string // To be displayed in client.
 
262	err       error  // Original message, for checking against context.Canceled.
 
265// EventViewReset indicates that a request for the next set of messages in a view
 
266// could not be fulfilled, e.g. because the anchor message does not exist anymore.
 
267// The client should clear its list of messages. This can happen before
 
268// EventViewMsgs events are sent.
 
269type EventViewReset struct {
 
274// EventViewChanges contain one or more changes relevant for the client, either
 
275// with new mailbox total/unseen message counts, or messages added/removed/modified
 
276// (flags) for the current view.
 
277type EventViewChanges struct {
 
279	Changes [][2]any // The first field of [2]any is a string, the second of the Change types below.
 
282// ChangeMsgAdd adds a new message and possibly its thread to the view.
 
283type ChangeMsgAdd struct {
 
285	MessageItems []MessageItem
 
288// ChangeMsgRemove removes one or more messages from the view.
 
289type ChangeMsgRemove struct {
 
290	store.ChangeRemoveUIDs
 
293// ChangeMsgFlags updates flags for one message.
 
294type ChangeMsgFlags struct {
 
298// ChangeMsgThread updates muted/collapsed fields for one message.
 
299type ChangeMsgThread struct {
 
303// ChangeMailboxRemove indicates a mailbox was removed, including all its messages.
 
304type ChangeMailboxRemove struct {
 
305	store.ChangeRemoveMailbox
 
308// ChangeMailboxAdd indicates a new mailbox was added, initially without any messages.
 
309type ChangeMailboxAdd struct {
 
310	Mailbox store.Mailbox
 
313// ChangeMailboxRename indicates a mailbox was renamed. Its ID stays the same.
 
314// It could be under a new parent.
 
315type ChangeMailboxRename struct {
 
316	store.ChangeRenameMailbox
 
319// ChangeMailboxCounts set new total and unseen message counts for a mailbox.
 
320type ChangeMailboxCounts struct {
 
321	store.ChangeMailboxCounts
 
324// ChangeMailboxSpecialUse has updated special-use flags for a mailbox.
 
325type ChangeMailboxSpecialUse struct {
 
326	store.ChangeMailboxSpecialUse
 
329// ChangeMailboxKeywords has an updated list of keywords for a mailbox, e.g. after
 
330// a message was added with a keyword that wasn't in the mailbox yet.
 
331type ChangeMailboxKeywords struct {
 
332	store.ChangeMailboxKeywords
 
335// View holds the information about the returned data for a query. It is used to
 
336// determine whether mailbox changes should be sent to the client, we only send
 
337// addition/removal/flag-changes of messages that are in view, or would extend it
 
338// if the view is at the end of the results.
 
342	// Received of last message we sent to the client. We use it to decide if a newly
 
343	// delivered message is within the view and the client should get a notification.
 
344	LastMessageReceived time.Time
 
346	// If set, the last message in the query view has been sent. There is no need to do
 
347	// another query, it will not return more data. Used to decide if an event for a
 
348	// new message should be sent.
 
351	// Whether message must or must not match mailboxIDs.
 
353	// Mailboxes to match, can be multiple, for matching children. If empty, there is
 
354	// no filter on mailboxes.
 
355	mailboxIDs map[int64]bool
 
357	// Threads sent to client. New messages for this thread are also sent, regardless
 
358	// of regular query matching, so also for other mailboxes. If the user (re)moved
 
359	// all messages of a thread, they may still receive events for the thread. Only
 
360	// filled when query with threading not off.
 
361	threadIDs map[int64]struct{}
 
364// sses tracks all sse connections, and access to them.
 
371// sse represents an sse connection.
 
373	ID          int64        // Also returned in EventStart and used in Request to identify the request.
 
374	AccountName string       // Used to check the authenticated user has access to the SSE connection.
 
375	Request     chan Request // Goroutine will receive requests from here, coming from API calls.
 
378// called by the goroutine when the connection is closed or breaks.
 
379func (sse sse) unregister() {
 
382	delete(sses.m, sse.ID)
 
384	// Drain any pending requests, preventing blocked goroutines from API calls.
 
394func sseRegister(accountName string) sse {
 
398	v := sse{sses.gen, accountName, make(chan Request, 1)}
 
403// sseGet returns a reference to an existing connection if it exists and user
 
405func sseGet(id int64, accountName string) (sse, bool) {
 
409	if s.AccountName != accountName {
 
415// ssetoken is a temporary token that has not yet been used to start an SSE
 
416// connection. Created by Token, consumed by a new SSE connection.
 
417type ssetoken struct {
 
418	token        string // Uniquely generated.
 
420	address      string             // Address used to authenticate in call that created the token.
 
421	sessionToken store.SessionToken // SessionToken that created this token, checked before sending updates.
 
425// ssetokens maintains unused tokens. We have just one, but it's a type so we
 
426// can define methods.
 
427type ssetokens struct {
 
429	accountTokens map[string][]ssetoken // Account to max 10 most recent tokens, from old to new.
 
430	tokens        map[string]ssetoken   // Token to details, for finding account for a token.
 
433var sseTokens = ssetokens{
 
434	accountTokens: map[string][]ssetoken{},
 
435	tokens:        map[string]ssetoken{},
 
438// xgenerate creates and saves a new token. It ensures no more than 10 tokens
 
439// per account exist, removing old ones if needed.
 
440func (x *ssetokens) xgenerate(ctx context.Context, accName, address string, sessionToken store.SessionToken) string {
 
441	buf := make([]byte, 16)
 
442	_, err := cryptrand.Read(buf)
 
443	xcheckf(ctx, err, "generating token")
 
444	st := ssetoken{base64.RawURLEncoding.EncodeToString(buf), accName, address, sessionToken, time.Now().Add(time.Minute)}
 
448	n := len(x.accountTokens[accName])
 
450		for _, ost := range x.accountTokens[accName][:n-9] {
 
451			delete(x.tokens, ost.token)
 
453		copy(x.accountTokens[accName], x.accountTokens[accName][n-9:])
 
454		x.accountTokens[accName] = x.accountTokens[accName][:9]
 
456	x.accountTokens[accName] = append(x.accountTokens[accName], st)
 
457	x.tokens[st.token] = st
 
461// check verifies a token, and consumes it if valid.
 
462func (x *ssetokens) check(token string) (string, string, store.SessionToken, bool, error) {
 
466	st, ok := x.tokens[token]
 
468		return "", "", "", false, nil
 
470	delete(x.tokens, token)
 
471	if i := slices.Index(x.accountTokens[st.accName], st); i < 0 {
 
472		return "", "", "", false, errors.New("internal error, could not find token in account")
 
474		copy(x.accountTokens[st.accName][i:], x.accountTokens[st.accName][i+1:])
 
475		x.accountTokens[st.accName] = x.accountTokens[st.accName][:len(x.accountTokens[st.accName])-1]
 
476		if len(x.accountTokens[st.accName]) == 0 {
 
477			delete(x.accountTokens, st.accName)
 
480	if time.Now().After(st.validUntil) {
 
481		return "", "", "", false, nil
 
483	return st.accName, st.address, st.sessionToken, true, nil
 
486// ioErr is panicked on i/o errors in serveEvents and handled in a defer.
 
491// ensure we have a non-nil moreHeaders, taking it from Settings.
 
492func ensureMoreHeaders(tx *bstore.Tx, moreHeaders []string) ([]string, error) {
 
493	if moreHeaders != nil {
 
494		return moreHeaders, nil
 
497	s := store.Settings{ID: 1}
 
498	if err := tx.Get(&s); err != nil {
 
499		return nil, fmt.Errorf("get settings: %v", err)
 
501	moreHeaders = s.ShowHeaders
 
502	if moreHeaders == nil {
 
503		moreHeaders = []string{} // Ensure we won't get Settings again next call.
 
505	return moreHeaders, nil
 
508// serveEvents serves an SSE connection. Authentication is done through a query
 
509// string parameter "singleUseToken", a one-time-use token returned by the Token
 
511func serveEvents(ctx context.Context, log mlog.Log, accountPath string, w http.ResponseWriter, r *http.Request) {
 
512	if r.Method != "GET" {
 
513		http.Error(w, "405 - method not allowed - use get", http.StatusMethodNotAllowed)
 
517	flusher, ok := w.(http.Flusher)
 
519		log.Error("internal error: ResponseWriter not a http.Flusher")
 
520		http.Error(w, "500 - internal error - cannot sync to http connection", 500)
 
525	token := q.Get("singleUseToken")
 
527		http.Error(w, "400 - bad request - missing credentials", http.StatusBadRequest)
 
530	accName, address, sessionToken, ok, err := sseTokens.check(token)
 
532		http.Error(w, "500 - internal server error - "+err.Error(), http.StatusInternalServerError)
 
536		http.Error(w, "400 - bad request - bad token", http.StatusBadRequest)
 
539	if _, err := store.SessionUse(ctx, log, accName, sessionToken, ""); err != nil {
 
540		http.Error(w, "400 - bad request - bad session token", http.StatusBadRequest)
 
544	// We can simulate a slow SSE connection. It seems firefox doesn't slow down
 
545	// incoming responses with its slow-network simulation.
 
546	var waitMin, waitMax time.Duration
 
547	waitMinMsec := q.Get("waitMinMsec")
 
548	waitMaxMsec := q.Get("waitMaxMsec")
 
549	if waitMinMsec != "" && waitMaxMsec != "" {
 
550		if v, err := strconv.ParseInt(waitMinMsec, 10, 64); err != nil {
 
551			http.Error(w, "400 - bad request - parsing waitMinMsec: "+err.Error(), http.StatusBadRequest)
 
554			waitMin = time.Duration(v) * time.Millisecond
 
557		if v, err := strconv.ParseInt(waitMaxMsec, 10, 64); err != nil {
 
558			http.Error(w, "400 - bad request - parsing waitMaxMsec: "+err.Error(), http.StatusBadRequest)
 
561			waitMax = time.Duration(v) * time.Millisecond
 
565	// Parse the request with initial mailbox/search criteria.
 
567	dec := json.NewDecoder(strings.NewReader(q.Get("request")))
 
568	dec.DisallowUnknownFields()
 
569	if err := dec.Decode(&req); err != nil {
 
570		http.Error(w, "400 - bad request - bad request query string parameter: "+err.Error(), http.StatusBadRequest)
 
572	} else if req.Page.Count <= 0 {
 
573		http.Error(w, "400 - bad request - request cannot have Page.Count 0", http.StatusBadRequest)
 
576	if req.Query.Threading == "" {
 
577		req.Query.Threading = ThreadOff
 
580	var writer *eventWriter
 
582	metricSSEConnections.Inc()
 
583	defer metricSSEConnections.Dec()
 
585	// Below here, error handling is done through xcheckf, which panics with
 
586	// *sherpa.Error, after which we send an error event to the client. We can also get
 
587	// an *ioErr when the connection is broken.
 
593		if err, ok := x.(*sherpa.Error); ok {
 
594			writer.xsendEvent(ctx, log, "fatalErr", err.Message)
 
595		} else if _, ok := x.(ioErr); ok {
 
598			log.WithContext(ctx).Error("serveEvents panic", slog.Any("err", x))
 
600			metrics.PanicInc(metrics.Webmail)
 
606	h.Set("Content-Type", "text/event-stream")
 
607	h.Set("Cache-Control", "no-cache")
 
609	// We'll be sending quite a bit of message data (text) in JSON (plenty duplicate
 
610	// keys), so should be quite compressible.
 
612	gz := mox.AcceptsGzip(r)
 
614		h.Set("Content-Encoding", "gzip")
 
615		out, _ = gzip.NewWriterLevel(w, gzip.BestSpeed)
 
619	out = httpFlusher{out, flusher}
 
621	// We'll be writing outgoing SSE events through writer.
 
622	writer = newEventWriter(out, waitMin, waitMax, accName, sessionToken)
 
625	// Fetch initial data.
 
626	acc, err := store.OpenAccount(log, accName, true)
 
627	xcheckf(ctx, err, "open account")
 
630		log.Check(err, "closing account")
 
632	comm := store.RegisterComm(acc)
 
633	defer comm.Unregister()
 
635	// List addresses that the client can use to send email from.
 
636	accConf, _ := acc.Conf()
 
637	loginAddr, err := smtp.ParseAddress(address)
 
638	xcheckf(ctx, err, "parsing login address")
 
639	_, _, _, dest, err := mox.LookupAddress(loginAddr.Localpart, loginAddr.Domain, false, false, false)
 
640	xcheckf(ctx, err, "looking up destination for login address")
 
641	loginName := accConf.FullName
 
642	if dest.FullName != "" {
 
643		loginName = dest.FullName
 
645	loginAddress := MessageAddress{Name: loginName, User: loginAddr.Localpart.String(), Domain: loginAddr.Domain}
 
646	var addresses []MessageAddress
 
647	for a, dest := range accConf.Destinations {
 
648		name := dest.FullName
 
650			name = accConf.FullName
 
652		var ma MessageAddress
 
653		if strings.HasPrefix(a, "@") {
 
654			dom, err := dns.ParseDomain(a[1:])
 
655			xcheckf(ctx, err, "parsing destination address for account")
 
656			ma = MessageAddress{Domain: dom}
 
658			addr, err := smtp.ParseAddress(a)
 
659			xcheckf(ctx, err, "parsing destination address for account")
 
660			ma = MessageAddress{Name: name, User: addr.Localpart.String(), Domain: addr.Domain}
 
662		addresses = append(addresses, ma)
 
664	// User is allowed to send using alias address as message From address. Webmail
 
665	// will choose it when replying to a message sent to that address.
 
666	aliasAddrs := map[MessageAddress]bool{}
 
667	for _, a := range accConf.Aliases {
 
668		if a.Alias.AllowMsgFrom {
 
669			ma := MessageAddress{User: a.Alias.LocalpartStr, Domain: a.Alias.Domain}
 
671				addresses = append(addresses, ma)
 
673			aliasAddrs[ma] = true
 
677	// We implicitly start a query. We use the reqctx for the transaction, because the
 
678	// transaction is passed to the query, which can be canceled.
 
679	reqctx, reqctxcancel := context.WithCancel(ctx)
 
681		// We also cancel in cancelDrain later on, but there is a brief window where the
 
682		// context wouldn't be canceled.
 
683		if reqctxcancel != nil {
 
689	// qtx is kept around during connection initialization, until we pass it off to the
 
690	// goroutine that starts querying for messages.
 
694			err := qtx.Rollback()
 
695			log.Check(err, "rolling back")
 
699	var mbl []store.Mailbox
 
700	settings := store.Settings{ID: 1}
 
702	// We only take the rlock when getting the tx.
 
703	acc.WithRLock(func() {
 
704		// Now a read-only transaction we'll use during the query.
 
705		qtx, err = acc.DB.Begin(reqctx, false)
 
706		xcheckf(ctx, err, "begin transaction")
 
708		mbl, err = bstore.QueryTx[store.Mailbox](qtx).FilterEqual("Expunged", false).List()
 
709		xcheckf(ctx, err, "list mailboxes")
 
711		err = qtx.Get(&settings)
 
712		xcheckf(ctx, err, "get settings")
 
715	// Find the designated mailbox if a mailbox name is set, or there are no filters at all.
 
716	var zerofilter Filter
 
717	var zeronotfilter NotFilter
 
718	var mailbox store.Mailbox
 
719	var mailboxPrefixes []string
 
720	var matchMailboxes bool
 
721	mailboxIDs := map[int64]bool{}
 
722	mailboxName := req.Query.Filter.MailboxName
 
723	if mailboxName != "" || reflect.DeepEqual(req.Query.Filter, zerofilter) && reflect.DeepEqual(req.Query.NotFilter, zeronotfilter) {
 
724		if mailboxName == "" {
 
725			mailboxName = "Inbox"
 
728		var inbox store.Mailbox
 
729		for _, e := range mbl {
 
730			if e.Name == mailboxName {
 
733			if e.Name == "Inbox" {
 
741			xcheckf(ctx, errors.New("inbox not found"), "setting initial mailbox")
 
743		req.Query.Filter.MailboxID = mailbox.ID
 
744		req.Query.Filter.MailboxName = ""
 
745		mailboxPrefixes = []string{mailbox.Name + "/"}
 
746		matchMailboxes = true
 
747		mailboxIDs[mailbox.ID] = true
 
749		matchMailboxes, mailboxIDs, mailboxPrefixes = xprepareMailboxIDs(ctx, qtx, req.Query.Filter, accConf.RejectsMailbox)
 
751	if req.Query.Filter.MailboxChildrenIncluded {
 
752		xgatherMailboxIDs(ctx, qtx, mailboxIDs, mailboxPrefixes)
 
755	// todo: write a last-event-id based on modseq? if last-event-id is present, we would have to send changes to mailboxes, messages, hopefully reducing the amount of data sent.
 
757	sse := sseRegister(acc.Name)
 
758	defer sse.unregister()
 
760	// Per-domain localpart config so webclient can decide if an address belongs to the account.
 
761	domainAddressConfigs := map[string]DomainAddressConfig{}
 
762	for _, a := range addresses {
 
763		dom, _ := mox.Conf.Domain(a.Domain)
 
764		domainAddressConfigs[a.Domain.ASCII] = DomainAddressConfig{dom.LocalpartCatchallSeparatorsEffective, dom.LocalpartCaseSensitive}
 
767	// Write first event, allowing client to fill its UI with mailboxes.
 
768	start := EventStart{sse.ID, loginAddress, addresses, domainAddressConfigs, mailbox.Name, mbl, accConf.RejectsMailbox, settings, accountPath, moxvar.Version}
 
769	writer.xsendEvent(ctx, log, "start", start)
 
771	// The goroutine doing the querying will send messages on these channels, which
 
772	// result in an event being written on the SSE connection.
 
773	viewMsgsc := make(chan EventViewMsgs)
 
774	viewErrc := make(chan EventViewErr)
 
775	viewResetc := make(chan EventViewReset)
 
776	donec := make(chan int64) // When request is done.
 
778	// Start a view, it determines if we send a change to the client. And start an
 
779	// implicit query for messages, we'll send the messages to the client which can
 
780	// fill its ui with messages.
 
781	v := view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
 
782	go viewRequestTx(reqctx, log, acc, qtx, v, viewMsgsc, viewErrc, viewResetc, donec)
 
783	qtx = nil // viewRequestTx closes qtx
 
785	// When canceling a query, we must drain its messages until it says it is done.
 
786	// Otherwise the sending goroutine would hang indefinitely on a channel send.
 
787	cancelDrain := func() {
 
788		if reqctxcancel != nil {
 
789			// Cancel the goroutine doing the querying.
 
797		// Drain events until done.
 
809	// If we stop and a query is in progress, we must drain the channel it will send on.
 
812	// Changes broadcasted by other connections on this account. If applicable for the
 
813	// connection/view, we send events.
 
814	xprocessChanges := func(changes []store.Change) {
 
815		taggedChanges := [][2]any{}
 
817		newPreviews := map[int64]string{}
 
818		defer storeNewPreviews(ctx, log, acc, newPreviews)
 
820		// We get a transaction first time we need it.
 
824				err := xtx.Rollback()
 
825				log.Check(err, "rolling back transaction")
 
828		ensureTx := func() error {
 
835			xtx, err = acc.DB.Begin(ctx, false)
 
838		// This getmsg will now only be called with mailboxID+UID, not with messageID set.
 
839		// todo jmap: change store.Change* to include MessageID's? would mean duplication of information resulting in possible mismatch.
 
840		getmsg := func(messageID int64, mailboxID int64, uid store.UID) (store.Message, error) {
 
841			if err := ensureTx(); err != nil {
 
842				return store.Message{}, fmt.Errorf("transaction: %v", err)
 
844			return bstore.QueryTx[store.Message](xtx).FilterEqual("Expunged", false).FilterNonzero(store.Message{MailboxID: mailboxID, UID: uid}).Get()
 
847		// Additional headers from settings to add to MessageItems.
 
848		var moreHeaders []string
 
849		xmoreHeaders := func() []string {
 
851			xcheckf(ctx, err, "transaction")
 
853			moreHeaders, err = ensureMoreHeaders(xtx, moreHeaders)
 
854			xcheckf(ctx, err, "ensuring more headers")
 
858		// Return uids that are within range in view. Because the end has been reached, or
 
859		// because the UID is not after the last message.
 
860		xchangedUIDs := func(mailboxID int64, uids []store.UID, isRemove bool) (changedUIDs []store.UID) {
 
861			uidsAny := make([]any, len(uids))
 
862			for i, uid := range uids {
 
866			xcheckf(ctx, err, "transaction")
 
867			q := bstore.QueryTx[store.Message](xtx)
 
868			q.FilterNonzero(store.Message{MailboxID: mailboxID})
 
869			q.FilterEqual("UID", uidsAny...)
 
870			mbOK := v.matchesMailbox(mailboxID)
 
871			err = q.ForEach(func(m store.Message) error {
 
872				_, thread := v.threadIDs[m.ThreadID]
 
873				if thread || mbOK && (v.inRange(m) || isRemove && m.Expunged) {
 
874					changedUIDs = append(changedUIDs, m.UID)
 
878			xcheckf(ctx, err, "fetching messages for change")
 
882		// Forward changes that are relevant to the current view.
 
883		for _, change := range changes {
 
884			switch c := change.(type) {
 
885			case store.ChangeAddUID:
 
886				ok, err := v.matches(log, acc, true, 0, c.MailboxID, c.UID, c.Flags, c.Keywords, getmsg)
 
887				xcheckf(ctx, err, "matching new message against view")
 
888				m, err := getmsg(0, c.MailboxID, c.UID)
 
889				xcheckf(ctx, err, "get message")
 
890				_, thread := v.threadIDs[m.ThreadID]
 
895				state := msgState{acc: acc, log: log, newPreviews: newPreviews}
 
896				mi, err := messageItem(log, m, &state, xmoreHeaders())
 
898				xcheckf(ctx, err, "make messageitem")
 
901				mil := []MessageItem{mi}
 
902				if !thread && req.Query.Threading != ThreadOff {
 
904					xcheckf(ctx, err, "transaction")
 
905					more, _, err := gatherThread(log, xtx, acc, v, m, 0, false, xmoreHeaders(), newPreviews)
 
906					xcheckf(ctx, err, "gathering thread messages for id %d, thread %d", m.ID, m.ThreadID)
 
907					mil = append(mil, more...)
 
908					v.threadIDs[m.ThreadID] = struct{}{}
 
911				taggedChanges = append(taggedChanges, [2]any{"ChangeMsgAdd", ChangeMsgAdd{c, mil}})
 
913				// If message extends the view, store it as such.
 
914				if !v.Request.Query.OrderAsc && m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && m.Received.After(v.LastMessageReceived) {
 
915					v.LastMessageReceived = m.Received
 
918			case store.ChangeRemoveUIDs:
 
921				// We may send changes for uids the client doesn't know, that's fine.
 
922				changedUIDs := xchangedUIDs(c.MailboxID, c.UIDs, true)
 
923				if len(changedUIDs) == 0 {
 
926				ch := ChangeMsgRemove{c}
 
927				ch.UIDs = changedUIDs
 
928				taggedChanges = append(taggedChanges, [2]any{"ChangeMsgRemove", ch})
 
930			case store.ChangeFlags:
 
931				// We may send changes for uids the client doesn't know, that's fine.
 
932				changedUIDs := xchangedUIDs(c.MailboxID, []store.UID{c.UID}, false)
 
933				if len(changedUIDs) == 0 {
 
936				ch := ChangeMsgFlags{c}
 
937				ch.UID = changedUIDs[0]
 
938				taggedChanges = append(taggedChanges, [2]any{"ChangeMsgFlags", ch})
 
940			case store.ChangeThread:
 
941				// Change in muted/collapsed state, just always ship it.
 
942				taggedChanges = append(taggedChanges, [2]any{"ChangeMsgThread", ChangeMsgThread{c}})
 
944			case store.ChangeRemoveMailbox:
 
945				taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRemove", ChangeMailboxRemove{c}})
 
947			case store.ChangeAddMailbox:
 
948				taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxAdd", ChangeMailboxAdd{c.Mailbox}})
 
950			case store.ChangeRenameMailbox:
 
951				taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRename", ChangeMailboxRename{c}})
 
953			case store.ChangeMailboxCounts:
 
954				taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxCounts", ChangeMailboxCounts{c}})
 
956			case store.ChangeMailboxSpecialUse:
 
957				taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxSpecialUse", ChangeMailboxSpecialUse{c}})
 
959			case store.ChangeMailboxKeywords:
 
960				taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxKeywords", ChangeMailboxKeywords{c}})
 
962			case store.ChangeAddSubscription, store.ChangeRemoveSubscription:
 
963				// Webmail does not care about subscriptions.
 
965			case store.ChangeAnnotation:
 
969				panic(fmt.Sprintf("missing case for change %T", c))
 
973		if len(taggedChanges) > 0 {
 
974			viewChanges := EventViewChanges{v.Request.ViewID, taggedChanges}
 
975			writer.xsendEvent(ctx, log, "viewChanges", viewChanges)
 
979	timer := time.NewTimer(5 * time.Minute) // For keepalives.
 
983			timer.Reset(5 * time.Minute)
 
987		pending := comm.Pending
 
993		case <-mox.Shutdown.Done():
 
994			writer.xsendEvent(ctx, log, "fatalErr", "server is shutting down")
 
995			// Work around go vet, it doesn't see defer cancelDrain.
 
996			if reqctxcancel != nil {
 
1002			_, err := fmt.Fprintf(out, ": keepalive\n\n")
 
1007				log.Errorx("write keepalive", err)
 
1008				// Work around go vet, it doesn't see defer cancelDrain.
 
1009				if reqctxcancel != nil {
 
1016		case vm := <-viewMsgsc:
 
1017			if vm.RequestID != v.Request.ID || vm.ViewID != v.Request.ViewID {
 
1018				panic(fmt.Sprintf("received msgs for view,request id %d,%d instead of %d,%d", vm.ViewID, vm.RequestID, v.Request.ViewID, v.Request.ID))
 
1023			if len(vm.MessageItems) > 0 {
 
1024				v.LastMessageReceived = vm.MessageItems[len(vm.MessageItems)-1][0].Message.Received
 
1026			writer.xsendEvent(ctx, log, "viewMsgs", vm)
 
1028		case ve := <-viewErrc:
 
1029			if ve.RequestID != v.Request.ID || ve.ViewID != v.Request.ViewID {
 
1030				panic(fmt.Sprintf("received err for view,request id %d,%d instead of %d,%d", ve.ViewID, ve.RequestID, v.Request.ViewID, v.Request.ID))
 
1032			if errors.Is(ve.err, context.Canceled) || mlog.IsClosed(ve.err) {
 
1033				// Work around go vet, it doesn't see defer cancelDrain.
 
1034				if reqctxcancel != nil {
 
1039			writer.xsendEvent(ctx, log, "viewErr", ve)
 
1041		case vr := <-viewResetc:
 
1042			if vr.RequestID != v.Request.ID || vr.ViewID != v.Request.ViewID {
 
1043				panic(fmt.Sprintf("received reset for view,request id %d,%d instead of %d,%d", vr.ViewID, vr.RequestID, v.Request.ViewID, v.Request.ID))
 
1045			writer.xsendEvent(ctx, log, "viewReset", vr)
 
1048			if id != v.Request.ID {
 
1049				panic(fmt.Sprintf("received done for request id %d instead of %d", id, v.Request.ID))
 
1051			if reqctxcancel != nil {
 
1057		case req := <-sse.Request:
 
1062				v = view{req, time.Time{}, false, false, nil, nil}
 
1066			reqctx, reqctxcancel = context.WithCancel(ctx)
 
1068			stop := func() (stop bool) {
 
1069				// rtx is handed off to viewRequestTx below, but we must clean it up in case of errors.
 
1074						err = rtx.Rollback()
 
1075						log.Check(err, "rolling back transaction")
 
1078				acc.WithRLock(func() {
 
1079					rtx, err = acc.DB.Begin(reqctx, false)
 
1086					if errors.Is(err, context.Canceled) {
 
1089					err := fmt.Errorf("begin transaction: %v", err)
 
1090					viewErr := EventViewErr{v.Request.ViewID, v.Request.ID, err.Error(), err}
 
1091					writer.xsendEvent(ctx, log, "viewErr", viewErr)
 
1095				// Reset view state for new query.
 
1096				if req.ViewID != v.Request.ViewID {
 
1097					matchMailboxes, mailboxIDs, mailboxPrefixes := xprepareMailboxIDs(ctx, rtx, req.Query.Filter, accConf.RejectsMailbox)
 
1098					if req.Query.Filter.MailboxChildrenIncluded {
 
1099						xgatherMailboxIDs(ctx, rtx, mailboxIDs, mailboxPrefixes)
 
1101					v = view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
 
1105				go viewRequestTx(reqctx, log, acc, rtx, v, viewMsgsc, viewErrc, viewResetc, donec)
 
1114			overflow, changes := comm.Get()
 
1116				writer.xsendEvent(ctx, log, "fatalErr", "out of sync, too many pending changes")
 
1119			xprocessChanges(changes)
 
1122			// Work around go vet, it doesn't see defer cancelDrain.
 
1123			if reqctxcancel != nil {
 
1131// xprepareMailboxIDs prepare the first half of filters for mailboxes, based on
 
1132// f.MailboxID (-1 is special). matchMailboxes indicates whether the IDs in
 
1133// mailboxIDs must or must not match. mailboxPrefixes is for use with
 
1134// xgatherMailboxIDs to gather children of the mailboxIDs.
 
1135func xprepareMailboxIDs(ctx context.Context, tx *bstore.Tx, f Filter, rejectsMailbox string) (matchMailboxes bool, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
 
1136	matchMailboxes = true
 
1137	mailboxIDs = map[int64]bool{}
 
1138	if f.MailboxID == -1 {
 
1139		matchMailboxes = false
 
1140		// Add the trash, junk and account rejects mailbox.
 
1141		err := bstore.QueryTx[store.Mailbox](tx).FilterEqual("Expunged", false).ForEach(func(mb store.Mailbox) error {
 
1142			if mb.Trash || mb.Junk || mb.Name == rejectsMailbox {
 
1143				mailboxPrefixes = append(mailboxPrefixes, mb.Name+"/")
 
1144				mailboxIDs[mb.ID] = true
 
1148		xcheckf(ctx, err, "finding trash/junk/rejects mailbox")
 
1149	} else if f.MailboxID > 0 {
 
1150		mb, err := store.MailboxID(tx, f.MailboxID)
 
1151		xcheckf(ctx, err, "get mailbox")
 
1152		mailboxIDs[f.MailboxID] = true
 
1153		mailboxPrefixes = []string{mb.Name + "/"}
 
1158// xgatherMailboxIDs adds all mailboxes with a prefix matching any of
 
1159// mailboxPrefixes to mailboxIDs, to expand filtering to children of mailboxes.
 
1160func xgatherMailboxIDs(ctx context.Context, tx *bstore.Tx, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
 
1161	// Gather more mailboxes to filter on, based on mailboxPrefixes.
 
1162	if len(mailboxPrefixes) == 0 {
 
1165	err := bstore.QueryTx[store.Mailbox](tx).FilterEqual("Expunged", false).ForEach(func(mb store.Mailbox) error {
 
1166		for _, p := range mailboxPrefixes {
 
1167			if strings.HasPrefix(mb.Name, p) {
 
1168				mailboxIDs[mb.ID] = true
 
1174	xcheckf(ctx, err, "gathering mailboxes")
 
1177// matchesMailbox returns whether a mailbox matches the view.
 
1178func (v view) matchesMailbox(mailboxID int64) bool {
 
1179	return len(v.mailboxIDs) == 0 || v.matchMailboxIDs && v.mailboxIDs[mailboxID] || !v.matchMailboxIDs && !v.mailboxIDs[mailboxID]
 
1182// inRange returns whether m is within the range for the view, whether a change for
 
1183// this message should be sent to the client so it can update its state.
 
1184func (v view) inRange(m store.Message) bool {
 
1185	return v.End || !v.Request.Query.OrderAsc && !m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && !m.Received.After(v.LastMessageReceived)
 
1188// matches checks if the message, identified by either messageID or mailboxID+UID,
 
1189// is in the current "view" (i.e. passing the filters, and if checkRange is set
 
1190// also if within the range of sent messages based on sort order and the last seen
 
1191// message). getmsg retrieves the message, which may be necessary depending on the
 
1192// active filters. Used to determine if a store.Change with a new message should be
 
1193// sent, and for the destination and anchor messages in view requests.
 
1194func (v view) matches(log mlog.Log, acc *store.Account, checkRange bool, messageID int64, mailboxID int64, uid store.UID, flags store.Flags, keywords []string, getmsg func(int64, int64, store.UID) (store.Message, error)) (match bool, rerr error) {
 
1196	ensureMessage := func() bool {
 
1197		if m.ID == 0 && rerr == nil {
 
1198			m, rerr = getmsg(messageID, mailboxID, uid)
 
1203	q := v.Request.Query
 
1205	// Warning: Filters must be kept in sync between queryMessages and view.matches.
 
1208	if len(v.mailboxIDs) > 0 && (!ensureMessage() || v.matchMailboxIDs && !v.mailboxIDs[m.MailboxID] || !v.matchMailboxIDs && v.mailboxIDs[m.MailboxID]) {
 
1211	// note: anchorMessageID is not relevant for matching.
 
1212	flagfilter := q.flagFilterFn()
 
1213	if flagfilter != nil && !flagfilter(flags, keywords) {
 
1217	if q.Filter.Oldest != nil && (!ensureMessage() || m.Received.Before(*q.Filter.Oldest)) {
 
1220	if q.Filter.Newest != nil && (!ensureMessage() || !m.Received.Before(*q.Filter.Newest)) {
 
1224	if q.Filter.SizeMin > 0 && (!ensureMessage() || m.Size < q.Filter.SizeMin) {
 
1227	if q.Filter.SizeMax > 0 && (!ensureMessage() || m.Size > q.Filter.SizeMax) {
 
1231	state := msgState{acc: acc, log: log}
 
1233		if rerr == nil && state.err != nil {
 
1239	attachmentFilter := q.attachmentFilterFn(log, acc, &state)
 
1240	if attachmentFilter != nil && (!ensureMessage() || !attachmentFilter(m)) {
 
1244	envFilter := q.envFilterFn(log, &state)
 
1245	if envFilter != nil && (!ensureMessage() || !envFilter(m)) {
 
1249	headerFilter := q.headerFilterFn(log, &state)
 
1250	if headerFilter != nil && (!ensureMessage() || !headerFilter(m)) {
 
1254	wordsFilter := q.wordsFilterFn(log, &state)
 
1255	if wordsFilter != nil && (!ensureMessage() || !wordsFilter(m)) {
 
1259	// Now check that we are either within the sorting order, or "last" was sent.
 
1260	if !checkRange || v.End || ensureMessage() && v.inRange(m) {
 
// msgResp is a single response sent by queryMessages on its response channel:
// an error, a view reset, the end of the view, or a list of message items.
1266type msgResp struct {

1267	err     error          // If set, an error happened and fields below are not set.

1268	reset   bool           // If set, the anchor message does not exist (anymore?) and we are sending messages from the start, fields below not set.

1269	viewEnd bool           // If set, the last message for the view was seen, no more should be requested, fields below not set.

1270	mil     []MessageItem  // If none of the cases above apply, the messages that were found matching the query. The first message is the reason the thread is returned, for use as AnchorID in a followup request.

1271	pm      *ParsedMessage // If m was the target page.DestMessageID, or this is the first match, this is the parsed message of mi.
 
1274func storeNewPreviews(ctx context.Context, log mlog.Log, acc *store.Account, newPreviews map[int64]string) {
 
1275	if len(newPreviews) == 0 {
 
1282			log.Error("unhandled panic in storeNewPreviews", slog.Any("err", x))
 
1284			metrics.PanicInc(metrics.Store)
 
1288	err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
 
1289		for id, preview := range newPreviews {
 
1290			m := store.Message{ID: id}
 
1291			if err := tx.Get(&m); err != nil {
 
1292				return fmt.Errorf("get message with id %d to store preview: %w", id, err)
 
1293			} else if !m.Expunged {
 
1294				m.Preview = &preview
 
1295				if err := tx.Update(&m); err != nil {
 
1296					return fmt.Errorf("updating message with id %d: %v", m.ID, err)
 
1302	log.Check(err, "saving new previews with messages")
 
1305// viewRequestTx executes a request (query with filters, pagination) by
 
1306// launching a new goroutine with queryMessages, receiving results as msgResp,
 
1307// and sending Event* to the SSE connection.
 
1309// It always closes tx.
 
1310func viewRequestTx(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, msgc chan EventViewMsgs, errc chan EventViewErr, resetc chan EventViewReset, donec chan int64) {
 
1311	// Newly generated previews which we'll save when the operation is done.
 
1312	newPreviews := map[int64]string{}
 
1315		err := tx.Rollback()
 
1316		log.Check(err, "rolling back query transaction")
 
1318		donec <- v.Request.ID
 
1320		// ctx can be canceled, we still want to store the previews.
 
1321		storeNewPreviews(context.Background(), log, acc, newPreviews)
 
1323		x := recover() // Should not happen, but don't take program down if it does.
 
1325			log.WithContext(ctx).Error("viewRequestTx panic", slog.Any("err", x))
 
1327			metrics.PanicInc(metrics.Webmailrequest)
 
1331	var msgitems [][]MessageItem // Gathering for 300ms, then flushing.
 
1332	var parsedMessage *ParsedMessage
 
1335	var immediate bool // No waiting, flush immediate.
 
1336	t := time.NewTimer(300 * time.Millisecond)
 
1339	sendViewMsgs := func(force bool) {
 
1340		if len(msgitems) == 0 && !force {
 
1345		msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, msgitems, parsedMessage, viewEnd}
 
1348		t.Reset(300 * time.Millisecond)
 
1351	// todo: should probably rewrite code so we don't start yet another goroutine, but instead handle the query responses directly (through a struct that keeps state?) in the sse connection goroutine.
 
1353	mrc := make(chan msgResp, 1)
 
1354	go queryMessages(ctx, log, acc, tx, v, mrc, newPreviews)
 
1358		case mr, ok := <-mrc:
 
1361				// Empty message list signals this query is done.
 
1362				msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, nil, nil, false}
 
1367				errc <- EventViewErr{v.Request.ViewID, v.Request.ID, mr.err.Error(), mr.err}
 
1371				resetc <- EventViewReset{v.Request.ViewID, v.Request.ID}
 
1380			msgitems = append(msgitems, mr.mil)
 
1382				parsedMessage = mr.pm
 
1389			if len(msgitems) == 0 {
 
1390				// Nothing to send yet. We'll send immediately when the next message comes in.
 
1399// queryMessages executes a query, with filter, pagination, destination message id
 
1400// to fetch (the message that the client had in view and wants to display again).
 
1401// It sends on mrc, with several types of messages: errors, whether the view is
 
1402// reset due to missing AnchorMessageID, and when the end of the view was reached
 
1403// and/or for a message.
 
1404// newPreviews is filled with previews, the caller must save them.
 
1405func queryMessages(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, mrc chan msgResp, newPreviews map[int64]string) {
 
1407		x := recover() // Should not happen, but don't take program down if it does.
 
1409			log.WithContext(ctx).Error("queryMessages panic", slog.Any("err", x))
 
1411			mrc <- msgResp{err: fmt.Errorf("query failed")}
 
1412			metrics.PanicInc(metrics.Webmailquery)
 
1418	query := v.Request.Query
 
1419	page := v.Request.Page
 
1421	// Warning: Filters must be kept in sync between queryMessages and view.matches.
 
1423	checkMessage := func(id int64) (valid bool, rerr error) {
 
1424		m := store.Message{ID: id}
 
1426		if err == bstore.ErrAbsent || err == nil && m.Expunged {
 
1428		} else if err != nil {
 
1431			return v.matches(log, acc, false, m.ID, m.MailboxID, m.UID, m.Flags, m.Keywords, func(int64, int64, store.UID) (store.Message, error) {
 
1437	// Check if AnchorMessageID exists and matches filter. If not, we will reset the view.
 
1438	if page.AnchorMessageID > 0 {
 
1439		// Check if message exists and (still) matches the filter.
 
1440		// todo: if AnchorMessageID exists but no longer matches the filter, we are resetting the view, but could handle it more gracefully in the future. if the message is in a different mailbox, we cannot query as efficiently, we'll have to read through more messages.
 
1441		if valid, err := checkMessage(page.AnchorMessageID); err != nil {
 
1442			mrc <- msgResp{err: fmt.Errorf("querying AnchorMessageID: %v", err)}
 
1445			mrc <- msgResp{reset: true}
 
1446			page.AnchorMessageID = 0
 
1450	// Check if page.DestMessageID exists and matches filter. If not, we will ignore
 
1451	// it instead of continuing to send message till the end of the view.
 
1452	if page.DestMessageID > 0 {
 
1453		if valid, err := checkMessage(page.DestMessageID); err != nil {
 
1454			mrc <- msgResp{err: fmt.Errorf("querying requested message: %v", err)}
 
1457			page.DestMessageID = 0
 
1461	// todo optimize: we would like to have more filters directly on the database if they can use an index. eg if there is a keyword filter and no mailbox filter.
 
1463	q := bstore.QueryTx[store.Message](tx)
 
1464	q.FilterEqual("Expunged", false)
 
1465	if len(v.mailboxIDs) > 0 {
 
1466		if len(v.mailboxIDs) == 1 && v.matchMailboxIDs {
 
1467			// Should result in fast indexed query.
 
1468			for mbID := range v.mailboxIDs {
 
1469				q.FilterNonzero(store.Message{MailboxID: mbID})
 
1472			idsAny := make([]any, 0, len(v.mailboxIDs))
 
1473			for mbID := range v.mailboxIDs {
 
1474				idsAny = append(idsAny, mbID)
 
1476			if v.matchMailboxIDs {
 
1477				q.FilterEqual("MailboxID", idsAny...)
 
1479				q.FilterNotEqual("MailboxID", idsAny...)
 
1484	// If we are looking for an anchor, keep skipping message early (cheaply) until we've seen it.
 
1485	if page.AnchorMessageID > 0 {
 
1487		q.FilterFn(func(m store.Message) bool {
 
1491			seen = m.ID == page.AnchorMessageID
 
1496	// We may be adding filters to the query below. The FilterFn signature does not
 
1497	// implement reporting errors, or anything else, just a bool. So when making the
 
1498	// filter functions, we give them a place to store parsed message state, and an
 
1499	// error. We check the error during and after query execution.
 
1500	state := msgState{acc: acc, log: log, newPreviews: newPreviews}
 
1503	flagfilter := query.flagFilterFn()
 
1504	if flagfilter != nil {
 
1505		q.FilterFn(func(m store.Message) bool {
 
1506			return flagfilter(m.Flags, m.Keywords)
 
1510	if query.Filter.Oldest != nil {
 
1511		q.FilterGreaterEqual("Received", *query.Filter.Oldest)
 
1513	if query.Filter.Newest != nil {
 
1514		q.FilterLessEqual("Received", *query.Filter.Newest)
 
1517	if query.Filter.SizeMin > 0 {
 
1518		q.FilterGreaterEqual("Size", query.Filter.SizeMin)
 
1520	if query.Filter.SizeMax > 0 {
 
1521		q.FilterLessEqual("Size", query.Filter.SizeMax)
 
1524	attachmentFilter := query.attachmentFilterFn(log, acc, &state)
 
1525	if attachmentFilter != nil {
 
1526		q.FilterFn(attachmentFilter)
 
1529	envFilter := query.envFilterFn(log, &state)
 
1530	if envFilter != nil {
 
1531		q.FilterFn(envFilter)
 
1534	headerFilter := query.headerFilterFn(log, &state)
 
1535	if headerFilter != nil {
 
1536		q.FilterFn(headerFilter)
 
1539	wordsFilter := query.wordsFilterFn(log, &state)
 
1540	if wordsFilter != nil {
 
1541		q.FilterFn(wordsFilter)
 
1544	var moreHeaders []string // From store.Settings.ShowHeaders
 
1547		q.SortAsc("Received")
 
1549		q.SortDesc("Received")
 
1551	found := page.DestMessageID <= 0
 
1554	err := q.ForEach(func(m store.Message) error {
 
1555		// Check for an error in one of the filters, propagate it.
 
1556		if state.err != nil {
 
1560		if have >= page.Count && found || have > 10000 {
 
1562			return bstore.StopForEach
 
1565		if _, ok := v.threadIDs[m.ThreadID]; ok {
 
1566			// Message was already returned as part of a thread.
 
1570		var pm *ParsedMessage
 
1571		if m.ID == page.DestMessageID || page.DestMessageID == 0 && have == 0 && page.AnchorMessageID == 0 {
 
1572			// For threads, if there was no DestMessageID, we may be getting the newest

1573			// message. For an initial view, this isn't necessarily the message the user is

1574			// expected to read first; that would be the first unread, which we'll get below

1575			// when gathering the thread.
 
1577			xpm, err := parsedMessage(log, &m, &state, true, false, false)
 
1578			if err != nil && errors.Is(err, message.ErrHeader) {
 
1579				log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
 
1580			} else if err != nil {
 
1581				return fmt.Errorf("parsing message %d: %v", m.ID, err)
 
1588		moreHeaders, err = ensureMoreHeaders(tx, moreHeaders)
 
1590			return fmt.Errorf("ensuring more headers: %v", err)
 
1593		mi, err := messageItem(log, m, &state, moreHeaders)
 
1595			return fmt.Errorf("making messageitem for message %d: %v", m.ID, err)
 
1597		mil := []MessageItem{mi}
 
1598		if query.Threading != ThreadOff {
 
1599			more, xpm, err := gatherThread(log, tx, acc, v, m, page.DestMessageID, page.AnchorMessageID == 0 && have == 0, moreHeaders, state.newPreviews)
 
1601				return fmt.Errorf("gathering thread messages for id %d, thread %d: %v", m.ID, m.ThreadID, err)
 
1607			mil = append(mil, more...)
 
1608			v.threadIDs[m.ThreadID] = struct{}{}
 
1610			// Calculate how many messages the frontend is going to show, and only count those as returned.
 
1611			collapsed := map[int64]bool{}
 
1612			for _, mi := range mil {
 
1613				collapsed[mi.Message.ID] = mi.Message.ThreadCollapsed
 
1615			unread := map[int64]bool{} // Propagated to thread root.
 
1616			if query.Threading == ThreadUnread {
 
1617				for _, mi := range mil {
 
1622					unread[mm.ID] = true
 
1623					for _, id := range mm.ThreadParentIDs {
 
1628			for _, mi := range mil {
 
1632				for _, id := range mm.ThreadParentIDs {
 
1633					if _, ok := collapsed[id]; ok {
 
1638				if threadRoot || (query.Threading == ThreadOn && !collapsed[rootID] || query.Threading == ThreadUnread && unread[rootID]) {
 
1645		if pm != nil && len(pm.envelope.From) == 1 {
 
1646			pm.ViewMode, err = fromAddrViewMode(tx, pm.envelope.From[0])
 
1648				return fmt.Errorf("gathering view mode for id %d: %v", m.ID, err)
 
1651		mrc <- msgResp{mil: mil, pm: pm}
 
1654	// Check for an error in one of the filters again. Check in ForEach would not
 
1655	// trigger if the last message has the error.
 
1656	if err == nil && state.err != nil {
 
1660		mrc <- msgResp{err: fmt.Errorf("querying messages: %v", err)}
 
1664		mrc <- msgResp{viewEnd: true}
 
1668func gatherThread(log mlog.Log, tx *bstore.Tx, acc *store.Account, v view, m store.Message, destMessageID int64, first bool, moreHeaders []string, newPreviews map[int64]string) ([]MessageItem, *ParsedMessage, error) {
 
1669	if m.ThreadID == 0 {
 
1670		// If we would continue, FilterNonzero would fail because there are no non-zero fields.
 
1671		return nil, nil, fmt.Errorf("message has threadid 0, account is probably still being upgraded, try turning threading off until the upgrade is done")
 
1674	// Fetch other messages for this thread.
 
1675	qt := bstore.QueryTx[store.Message](tx)
 
1676	qt.FilterNonzero(store.Message{ThreadID: m.ThreadID})
 
1677	qt.FilterEqual("Expunged", false)
 
1678	qt.FilterNotEqual("ID", m.ID)
 
1680	tml, err := qt.List()
 
1682		return nil, nil, fmt.Errorf("listing other messages in thread for message %d, thread %d: %v", m.ID, m.ThreadID, err)
 
1685	var mil []MessageItem
 
1686	var pm *ParsedMessage
 
1687	var firstUnread bool
 
1688	for _, tm := range tml {
 
1689		err := func() error {
 
1690			xstate := msgState{acc: acc, log: log, newPreviews: newPreviews}
 
1691			defer xstate.clear()
 
1693			mi, err := messageItem(log, tm, &xstate, moreHeaders)
 
1695				return fmt.Errorf("making messageitem for message %d, for thread %d: %v", tm.ID, m.ThreadID, err)
 
1697			mi.MatchQuery, err = v.matches(log, acc, false, tm.ID, tm.MailboxID, tm.UID, tm.Flags, tm.Keywords, func(int64, int64, store.UID) (store.Message, error) {
 
1701				return fmt.Errorf("matching thread message %d against view query: %v", tm.ID, err)
 
1703			mil = append(mil, mi)
 
1705			if tm.ID == destMessageID || destMessageID == 0 && first && (pm == nil || !firstUnread && !tm.Seen) {
 
1706				firstUnread = !tm.Seen
 
1707				xpm, err := parsedMessage(log, &tm, &xstate, true, false, false)
 
1708				if err != nil && errors.Is(err, message.ErrHeader) {
 
1709					log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
 
1710				} else if err != nil {
 
1711					return fmt.Errorf("parsing thread message %d: %v", tm.ID, err)
 
1719			return nil, nil, err
 
1723	// Finally, the message that caused us to gather this thread (which is likely the
 
1724	// most recent message in the thread) could be the only unread message.
 
1725	if destMessageID == 0 && first && !m.Seen && !firstUnread {
 
1726		xstate := msgState{acc: acc, log: log}
 
1727		defer xstate.clear()
 
1728		xpm, err := parsedMessage(log, &m, &xstate, true, false, false)
 
1729		if err != nil && errors.Is(err, message.ErrHeader) {
 
1730			log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
 
1731		} else if err != nil {
 
1732			return nil, nil, fmt.Errorf("parsing thread message %d: %v", m.ID, err)
 
1741// While checking the filters on a message, we may need to get more message
 
1742// details as each filter passes. We check the filters that need the basic
 
1743// information first, and load and cache more details for the next filters.
 
1744// msgState holds parsed details for a message, it is updated while filtering,
 
1745// with more information or reset for a next message.
 
1746type msgState struct {
 
1747	acc  *store.Account // Never changes during lifetime.
 
1748	err  error          // Once set, doesn't get cleared.
 
1750	part *message.Part // Will be without Reader when msgr is nil.
 
1751	msgr *store.MsgReader
 
1754	// If not nil, messages will get their Preview field filled when nil, and message
 
1755	// id and preview added to newPreviews, and saved in a separate write transaction
 
1756	// when the operation is done.
 
1757	newPreviews map[int64]string
 
1760func (ms *msgState) clear() {
 
1762		err := ms.msgr.Close()
 
1763		ms.log.Check(err, "closing message reader from state")
 
1766	*ms = msgState{acc: ms.acc, err: ms.err, log: ms.log, newPreviews: ms.newPreviews}
 
1769func (ms *msgState) ensureMsg(m store.Message) {
 
1770	if m.ID != ms.m.ID {
 
1776func (ms *msgState) ensurePart(m store.Message, withMsgReader bool) bool {
 
1781			if m.ParsedBuf == nil {
 
1782				ms.err = fmt.Errorf("message %d not parsed", m.ID)
 
1786			if err := json.Unmarshal(m.ParsedBuf, &p); err != nil {
 
1787				ms.err = fmt.Errorf("load part for message %d: %w", m.ID, err)
 
1792		if withMsgReader && ms.msgr == nil {
 
1793			ms.msgr = ms.acc.MessageReader(m)
 
1794			ms.part.SetReaderAt(ms.msgr)
 
1797	return ms.part != nil
 
1800// flagFilterFn returns a function that applies the flag/keyword/"label"-related
 
1801// filters for a query. A nil function is returned if there are no flags to filter
 
1803func (q Query) flagFilterFn() func(store.Flags, []string) bool {
 
1804	labels := map[string]bool{}
 
1805	for _, k := range q.Filter.Labels {
 
1808	for _, k := range q.NotFilter.Labels {
 
1812	if len(labels) == 0 {
 
1816	var mask, flags store.Flags
 
1817	systemflags := map[string][]*bool{
 
1818		`\answered`:  {&mask.Answered, &flags.Answered},
 
1819		`\flagged`:   {&mask.Flagged, &flags.Flagged},
 
1820		`\deleted`:   {&mask.Deleted, &flags.Deleted},
 
1821		`\seen`:      {&mask.Seen, &flags.Seen},
 
1822		`\draft`:     {&mask.Draft, &flags.Draft},
 
1823		`$junk`:      {&mask.Junk, &flags.Junk},
 
1824		`$notjunk`:   {&mask.Notjunk, &flags.Notjunk},
 
1825		`$forwarded`: {&mask.Forwarded, &flags.Forwarded},
 
1826		`$phishing`:  {&mask.Phishing, &flags.Phishing},
 
1827		`$mdnsent`:   {&mask.MDNSent, &flags.MDNSent},
 
1829	keywords := map[string]bool{}
 
1830	for k, v := range labels {
 
1831		k = strings.ToLower(k)
 
1832		if mf, ok := systemflags[k]; ok {
 
1839	return func(msgFlags store.Flags, msgKeywords []string) bool {
 
1841		if f.Set(mask, msgFlags) != flags {
 
1844		for k, v := range keywords {
 
1845			if slices.Contains(msgKeywords, k) != v {
 
1853// attachmentFilterFn returns a function that filters for the attachment-related
 
1854// filter from the query. A nil function is returned if there are attachment
 
1856func (q Query) attachmentFilterFn(log mlog.Log, acc *store.Account, state *msgState) func(m store.Message) bool {
 
1857	if q.Filter.Attachments == AttachmentIndifferent && q.NotFilter.Attachments == AttachmentIndifferent {
 
1861	return func(m store.Message) bool {
 
1862		if !state.ensurePart(m, true) {
 
1865		types, err := attachmentTypes(log, m, state)
 
1870		return (q.Filter.Attachments == AttachmentIndifferent || types[q.Filter.Attachments]) && (q.NotFilter.Attachments == AttachmentIndifferent || !types[q.NotFilter.Attachments])
 
1874var attachmentMimetypes = map[string]AttachmentType{
 
1875	"application/pdf":                                AttachmentPDF,
 
1876	"application/zip":                                AttachmentArchive,
 
1877	"application/x-rar-compressed":                   AttachmentArchive,
 
1878	"application/vnd.oasis.opendocument.spreadsheet": AttachmentSpreadsheet,
 
1879	"application/vnd.ms-excel":                       AttachmentSpreadsheet,
 
1880	"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet":         AttachmentSpreadsheet,
 
1881	"application/vnd.oasis.opendocument.text":                                   AttachmentDocument,
 
1882	"application/vnd.oasis.opendocument.presentation":                           AttachmentPresentation,
 
1883	"application/vnd.ms-powerpoint":                                             AttachmentPresentation,
 
1884	"application/vnd.openxmlformats-officedocument.presentationml.presentation": AttachmentPresentation,
 
1886var attachmentExtensions = map[string]AttachmentType{
 
1887	".pdf":     AttachmentPDF,
 
1888	".zip":     AttachmentArchive,
 
1889	".tar":     AttachmentArchive,
 
1890	".tgz":     AttachmentArchive,
 
1891	".tar.gz":  AttachmentArchive,
 
1892	".tbz2":    AttachmentArchive,
 
1893	".tar.bz2": AttachmentArchive,
 
1894	".tar.lz":  AttachmentArchive,
 
1895	".tlz":     AttachmentArchive,
 
1896	".tar.xz":  AttachmentArchive,
 
1897	".txz":     AttachmentArchive,
 
1898	".tar.zst": AttachmentArchive,
 
1899	".tar.lz4": AttachmentArchive,
 
1900	".7z":      AttachmentArchive,
 
1901	".rar":     AttachmentArchive,
 
1902	".ods":     AttachmentSpreadsheet,
 
1903	".xls":     AttachmentSpreadsheet,
 
1904	".xlsx":    AttachmentSpreadsheet,
 
1905	".odt":     AttachmentDocument,
 
1906	".doc":     AttachmentDocument,
 
1907	".docx":    AttachmentDocument,
 
1908	".odp":     AttachmentPresentation,
 
1909	".ppt":     AttachmentPresentation,
 
1910	".pptx":    AttachmentPresentation,
 
1913func attachmentTypes(log mlog.Log, m store.Message, state *msgState) (map[AttachmentType]bool, error) {
 
1914	types := map[AttachmentType]bool{}
 
1916	pm, err := parsedMessage(log, &m, state, false, false, false)
 
1918		return nil, fmt.Errorf("parsing message for attachments: %w", err)
 
1920	for _, a := range pm.attachments {
 
1921		if a.Part.MediaType == "IMAGE" {
 
1922			types[AttachmentImage] = true
 
1925		mt := strings.ToLower(a.Part.MediaType + "/" + a.Part.MediaSubType)
 
1926		if t, ok := attachmentMimetypes[mt]; ok {
 
1930		_, filename, err := a.Part.DispositionFilename()
 
1931		if err != nil && (errors.Is(err, message.ErrParamEncoding) || errors.Is(err, message.ErrHeader)) {
 
1932			log.Debugx("parsing disposition/filename", err)
 
1933		} else if err != nil {
 
1934			return nil, fmt.Errorf("reading disposition/filename: %v", err)
 
1936		if ext := filepath.Ext(filename); ext != "" {
 
1937			if t, ok := attachmentExtensions[strings.ToLower(ext)]; ok {
 
1943	if len(types) == 0 {
 
1944		types[AttachmentNone] = true
 
1946		types[AttachmentAny] = true
 
1951// envFilterFn returns a filter function for the "envelope" headers ("envelope" as
 
1952// used by IMAP, i.e. basic message headers from/to/subject, an unfortunate name
 
1953// clash with SMTP envelope) for the query. A nil function is returned if no
 
1954// filtering is needed.
 
1955func (q Query) envFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
 
1956	if len(q.Filter.From) == 0 && len(q.Filter.To) == 0 && len(q.Filter.Subject) == 0 && len(q.NotFilter.From) == 0 && len(q.NotFilter.To) == 0 && len(q.NotFilter.Subject) == 0 {
 
1960	lower := func(l []string) []string {
 
1964		r := make([]string, len(l))
 
1965		for i, s := range l {
 
1966			r[i] = strings.ToLower(s)
 
1971	filterSubject := lower(q.Filter.Subject)
 
1972	notFilterSubject := lower(q.NotFilter.Subject)
 
1973	filterFrom := lower(q.Filter.From)
 
1974	notFilterFrom := lower(q.NotFilter.From)
 
1975	filterTo := lower(q.Filter.To)
 
1976	notFilterTo := lower(q.NotFilter.To)
 
1978	return func(m store.Message) bool {
 
1979		if !state.ensurePart(m, false) {
 
1983		var env message.Envelope
 
1984		if state.part.Envelope != nil {
 
1985			env = *state.part.Envelope
 
1988		if len(filterSubject) > 0 || len(notFilterSubject) > 0 {
 
1989			subject := strings.ToLower(env.Subject)
 
1990			for _, s := range filterSubject {
 
1991				if !strings.Contains(subject, s) {
 
1995			for _, s := range notFilterSubject {
 
1996				if strings.Contains(subject, s) {
 
2002		contains := func(textLower []string, l []message.Address, all bool) bool {
 
2004			for _, s := range textLower {
 
2005				for _, a := range l {
 
2006					name := strings.ToLower(a.Name)
 
2007					addr := strings.ToLower(fmt.Sprintf("<%s@%s>", a.User, a.Host))
 
2008					if strings.Contains(name, s) || strings.Contains(addr, s) {
 
2022		if len(filterFrom) > 0 && !contains(filterFrom, env.From, true) {
 
2025		if len(notFilterFrom) > 0 && contains(notFilterFrom, env.From, false) {
 
2028		if len(filterTo) > 0 || len(notFilterTo) > 0 {
 
2029			to := slices.Concat(env.To, env.CC, env.BCC)
 
2030			if len(filterTo) > 0 && !contains(filterTo, to, true) {
 
2033			if len(notFilterTo) > 0 && contains(notFilterTo, to, false) {
 
2041// headerFilterFn returns a function that filters for the header filters in the
 
2042// query. A nil function is returned if there are no header filters.
 
2043func (q Query) headerFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
 
2044	if len(q.Filter.Headers) == 0 {
 
2048	lowerValues := make([]string, len(q.Filter.Headers))
 
2049	for i, t := range q.Filter.Headers {
 
2050		lowerValues[i] = strings.ToLower(t[1])
 
2053	return func(m store.Message) bool {
 
2054		if !state.ensurePart(m, true) {
 
2057		hdr, err := state.part.Header()
 
2059			state.err = fmt.Errorf("reading header for message %d: %w", m.ID, err)
 
2064		for i, t := range q.Filter.Headers {
 
2068			if v == "" && len(l) > 0 {
 
2071			for _, e := range l {
 
2072				if strings.Contains(strings.ToLower(e), v) {
 
2082// wordsFilterFn returns a function that applies the word filters of the query. A
 
2083// nil function is returned when query does not contain a word filter.
 
2084func (q Query) wordsFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
 
2085	if len(q.Filter.Words) == 0 && len(q.NotFilter.Words) == 0 {
 
2089	ws := store.PrepareWordSearch(q.Filter.Words, q.NotFilter.Words)
 
2091	return func(m store.Message) bool {
 
2092		if !state.ensurePart(m, true) {
 
2096		if ok, err := ws.MatchPart(log, state.part, true); err != nil {
 
2097			state.err = fmt.Errorf("searching for words in message %d: %w", m.ID, err)