1package webmail
2
3// todo: may want to add some json omitempty tags to MessageItem, or Message to reduce json size, or just have smaller types that send only the fields that are needed.
4
5import (
6 "compress/gzip"
7 "context"
8 cryptrand "crypto/rand"
9 "encoding/base64"
10 "encoding/json"
11 "errors"
12 "fmt"
13 "log/slog"
14 "net/http"
15 "path/filepath"
16 "reflect"
17 "runtime/debug"
18 "slices"
19 "strconv"
20 "strings"
21 "sync"
22 "time"
23
24 "github.com/mjl-/bstore"
25 "github.com/mjl-/sherpa"
26
27 "github.com/mjl-/mox/dns"
28 "github.com/mjl-/mox/message"
29 "github.com/mjl-/mox/metrics"
30 "github.com/mjl-/mox/mlog"
31 "github.com/mjl-/mox/mox-"
32 "github.com/mjl-/mox/moxio"
33 "github.com/mjl-/mox/moxvar"
34 "github.com/mjl-/mox/smtp"
35 "github.com/mjl-/mox/store"
36)
37
38// Request is a request to an SSE connection to send messages, either for a new
39// view, to continue with an existing view, or to a cancel an ongoing request.
40type Request struct {
41 ID int64
42
43 SSEID int64 // SSE connection.
44
45 // To indicate a request is a continuation (more results) of the previous view.
46 // Echoed in events, client checks if it is getting results for the latest request.
47 ViewID int64
48
49 // If set, this request and its view are canceled. A new view must be started.
50 Cancel bool
51
52 Query Query
53 Page Page
54}
55
56type ThreadMode string
57
58const (
59 ThreadOff ThreadMode = "off"
60 ThreadOn ThreadMode = "on"
61 ThreadUnread ThreadMode = "unread"
62)
63
64// Query is a request for messages that match filters, in a given order.
65type Query struct {
66 OrderAsc bool // Order by received ascending or desending.
67 Threading ThreadMode
68 Filter Filter
69 NotFilter NotFilter
70}
71
72// AttachmentType is for filtering by attachment type.
73type AttachmentType string
74
75const (
76 AttachmentIndifferent AttachmentType = ""
77 AttachmentNone AttachmentType = "none"
78 AttachmentAny AttachmentType = "any"
79 AttachmentImage AttachmentType = "image" // png, jpg, gif, ...
80 AttachmentPDF AttachmentType = "pdf"
81 AttachmentArchive AttachmentType = "archive" // zip files, tgz, ...
82 AttachmentSpreadsheet AttachmentType = "spreadsheet" // ods, xlsx, ...
83 AttachmentDocument AttachmentType = "document" // odt, docx, ...
84 AttachmentPresentation AttachmentType = "presentation" // odp, pptx, ...
85)
86
87// Filter selects the messages to return. Fields that are set must all match,
88// for slices each element by match ("and").
89type Filter struct {
90 // If -1, then all mailboxes except Trash/Junk/Rejects. Otherwise, only active if > 0.
91 MailboxID int64
92
93 // If true, also submailboxes are included in the search.
94 MailboxChildrenIncluded bool
95
96 // In case client doesn't know mailboxes and their IDs yet. Only used during sse
97 // connection setup, where it is turned into a MailboxID. Filtering only looks at
98 // MailboxID.
99 MailboxName string
100
101 Words []string // Case insensitive substring match for each string.
102 From []string
103 To []string // Including Cc and Bcc.
104 Oldest *time.Time
105 Newest *time.Time
106 Subject []string
107 Attachments AttachmentType
108 Labels []string
109 Headers [][2]string // Header values can be empty, it's a check if the header is present, regardless of value.
110 SizeMin int64
111 SizeMax int64
112}
113
114// NotFilter matches messages that don't match these fields.
115type NotFilter struct {
116 Words []string
117 From []string
118 To []string
119 Subject []string
120 Attachments AttachmentType
121 Labels []string
122}
123
124// Page holds pagination parameters for a request.
125type Page struct {
126 // Start returning messages after this ID, if > 0. For pagination, fetching the
127 // next set of messages.
128 AnchorMessageID int64
129
130 // Number of messages to return, must be >= 1, we never return more than 10000 for
131 // one request.
132 Count int
133
134 // If > 0, return messages until DestMessageID is found. More than Count messages
135 // can be returned. For long-running searches, it may take a while before this
136 // message if found.
137 DestMessageID int64
138}
139
140// todo: MessageAddress and MessageEnvelope into message.Address and message.Envelope.
141
142// MessageAddress is like message.Address, but with a dns.Domain, with unicode name
143// included.
144type MessageAddress struct {
145 Name string // Free-form name for display in mail applications.
146 User string // Localpart, encoded.
147 Domain dns.Domain
148}
149
150// MessageEnvelope is like message.Envelope, as used in message.Part, but including
151// unicode host names for IDNA names.
152type MessageEnvelope struct {
153 // todo: should get sherpadoc to understand type embeds and embed the non-MessageAddress fields from message.Envelope.
154 Date time.Time
155 Subject string
156 From []MessageAddress
157 Sender []MessageAddress
158 ReplyTo []MessageAddress
159 To []MessageAddress
160 CC []MessageAddress
161 BCC []MessageAddress
162 InReplyTo string
163 MessageID string
164}
165
166// MessageItem is sent by queries, it has derived information analyzed from
167// message.Part, made for the needs of the message items in the message list.
168// messages.
169type MessageItem struct {
170 Message store.Message // Without ParsedBuf and MsgPrefix, for size.
171 Envelope MessageEnvelope
172 Attachments []Attachment
173 IsSigned bool
174 IsEncrypted bool
175 FirstLine string // Of message body, for showing as preview.
176 MatchQuery bool // If message does not match query, it can still be included because of threading.
177}
178
179// ParsedMessage has more parsed/derived information about a message, intended
180// for rendering the (contents of the) message. Information from MessageItem is
181// not duplicated.
182type ParsedMessage struct {
183 ID int64
184 Part message.Part
185 Headers map[string][]string
186 ViewMode store.ViewMode
187
188 // Text parts, can be empty.
189 Texts []string
190
191 // Whether there is an HTML part. The webclient renders HTML message parts through
192 // an iframe and a separate request with strict CSP headers to prevent script
193 // execution and loading of external resources, which isn't possible when loading
194 // in iframe with inline HTML because not all browsers support the iframe csp
195 // attribute.
196 HasHTML bool
197
198 ListReplyAddress *MessageAddress // From List-Post.
199
200 // Information used by MessageItem, not exported in this type.
201 envelope MessageEnvelope
202 attachments []Attachment
203 isSigned bool
204 isEncrypted bool
205 firstLine string
206}
207
208// EventStart is the first message sent on an SSE connection, giving the client
209// basic data to populate its UI. After this event, messages will follow quickly in
210// an EventViewMsgs event.
211type EventStart struct {
212 SSEID int64
213 LoginAddress MessageAddress
214 Addresses []MessageAddress
215 DomainAddressConfigs map[string]DomainAddressConfig // ASCII domain to address config.
216 MailboxName string
217 Mailboxes []store.Mailbox
218 RejectsMailbox string
219 Settings store.Settings
220 AccountPath string // If nonempty, the path on same host to webaccount interface.
221 Version string
222}
223
224// DomainAddressConfig has the address (localpart) configuration for a domain, so
225// the webmail client can decide if an address matches the addresses of the
226// account.
227type DomainAddressConfig struct {
228 LocalpartCatchallSeparator string // Can be empty.
229 LocalpartCaseSensitive bool
230}
231
232// EventViewMsgs contains messages for a view, possibly a continuation of an
233// earlier list of messages.
234type EventViewMsgs struct {
235 ViewID int64
236 RequestID int64
237
238 // If empty, this was the last message for the request. If non-empty, a list of
239 // thread messages. Each with the first message being the reason this thread is
240 // included and can be used as AnchorID in followup requests. If the threading mode
241 // is "off" in the query, there will always be only a single message. If a thread
242 // is sent, all messages in the thread are sent, including those that don't match
243 // the query (e.g. from another mailbox). Threads can be displayed based on the
244 // ThreadParentIDs field, with possibly slightly different display based on field
245 // ThreadMissingLink.
246 MessageItems [][]MessageItem
247
248 // If set, will match the target page.DestMessageID from the request.
249 ParsedMessage *ParsedMessage
250
251 // If set, there are no more messages in this view at this moment. Messages can be
252 // added, typically via Change messages, e.g. for new deliveries.
253 ViewEnd bool
254}
255
256// EventViewErr indicates an error during a query for messages. The request is
257// aborted, no more request-related messages will be sent until the next request.
258type EventViewErr struct {
259 ViewID int64
260 RequestID int64
261 Err string // To be displayed in client.
262 err error // Original message, for checking against context.Canceled.
263}
264
265// EventViewReset indicates that a request for the next set of messages in a few
266// could not be fulfilled, e.g. because the anchor message does not exist anymore.
267// The client should clear its list of messages. This can happen before
268// EventViewMsgs events are sent.
269type EventViewReset struct {
270 ViewID int64
271 RequestID int64
272}
273
274// EventViewChanges contain one or more changes relevant for the client, either
275// with new mailbox total/unseen message counts, or messages added/removed/modified
276// (flags) for the current view.
277type EventViewChanges struct {
278 ViewID int64
279 Changes [][2]any // The first field of [2]any is a string, the second of the Change types below.
280}
281
282// ChangeMsgAdd adds a new message and possibly its thread to the view.
283type ChangeMsgAdd struct {
284 store.ChangeAddUID
285 MessageItems []MessageItem
286}
287
288// ChangeMsgRemove removes one or more messages from the view.
289type ChangeMsgRemove struct {
290 store.ChangeRemoveUIDs
291}
292
293// ChangeMsgFlags updates flags for one message.
294type ChangeMsgFlags struct {
295 store.ChangeFlags
296}
297
298// ChangeMsgThread updates muted/collapsed fields for one message.
299type ChangeMsgThread struct {
300 store.ChangeThread
301}
302
303// ChangeMailboxRemove indicates a mailbox was removed, including all its messages.
304type ChangeMailboxRemove struct {
305 store.ChangeRemoveMailbox
306}
307
308// ChangeMailboxAdd indicates a new mailbox was added, initially without any messages.
309type ChangeMailboxAdd struct {
310 Mailbox store.Mailbox
311}
312
313// ChangeMailboxRename indicates a mailbox was renamed. Its ID stays the same.
314// It could be under a new parent.
315type ChangeMailboxRename struct {
316 store.ChangeRenameMailbox
317}
318
319// ChangeMailboxCounts set new total and unseen message counts for a mailbox.
320type ChangeMailboxCounts struct {
321 store.ChangeMailboxCounts
322}
323
324// ChangeMailboxSpecialUse has updated special-use flags for a mailbox.
325type ChangeMailboxSpecialUse struct {
326 store.ChangeMailboxSpecialUse
327}
328
329// ChangeMailboxKeywords has an updated list of keywords for a mailbox, e.g. after
330// a message was added with a keyword that wasn't in the mailbox yet.
331type ChangeMailboxKeywords struct {
332 store.ChangeMailboxKeywords
333}
334
335// View holds the information about the returned data for a query. It is used to
336// determine whether mailbox changes should be sent to the client, we only send
337// addition/removal/flag-changes of messages that are in view, or would extend it
338// if the view is at the end of the results.
339type view struct {
340 Request Request
341
342 // Received of last message we sent to the client. We use it to decide if a newly
343 // delivered message is within the view and the client should get a notification.
344 LastMessageReceived time.Time
345
346 // If set, the last message in the query view has been sent. There is no need to do
347 // another query, it will not return more data. Used to decide if an event for a
348 // new message should be sent.
349 End bool
350
351 // Whether message must or must not match mailboxIDs.
352 matchMailboxIDs bool
353 // Mailboxes to match, can be multiple, for matching children. If empty, there is
354 // no filter on mailboxes.
355 mailboxIDs map[int64]bool
356
357 // Threads sent to client. New messages for this thread are also sent, regardless
358 // of regular query matching, so also for other mailboxes. If the user (re)moved
359 // all messages of a thread, they may still receive events for the thread. Only
360 // filled when query with threading not off.
361 threadIDs map[int64]struct{}
362}
363
364// sses tracks all sse connections, and access to them.
365var sses = struct {
366 sync.Mutex
367 gen int64
368 m map[int64]sse
369}{m: map[int64]sse{}}
370
371// sse represents an sse connection.
372type sse struct {
373 ID int64 // Also returned in EventStart and used in Request to identify the request.
374 AccountName string // Used to check the authenticated user has access to the SSE connection.
375 Request chan Request // Goroutine will receive requests from here, coming from API calls.
376}
377
378// called by the goroutine when the connection is closed or breaks.
379func (sse sse) unregister() {
380 sses.Lock()
381 defer sses.Unlock()
382 delete(sses.m, sse.ID)
383
384 // Drain any pending requests, preventing blocked goroutines from API calls.
385 for {
386 select {
387 case <-sse.Request:
388 default:
389 return
390 }
391 }
392}
393
394func sseRegister(accountName string) sse {
395 sses.Lock()
396 defer sses.Unlock()
397 sses.gen++
398 v := sse{sses.gen, accountName, make(chan Request, 1)}
399 sses.m[v.ID] = v
400 return v
401}
402
403// sseGet returns a reference to an existing connection if it exists and user
404// has access.
405func sseGet(id int64, accountName string) (sse, bool) {
406 sses.Lock()
407 defer sses.Unlock()
408 s := sses.m[id]
409 if s.AccountName != accountName {
410 return sse{}, false
411 }
412 return s, true
413}
414
415// ssetoken is a temporary token that has not yet been used to start an SSE
416// connection. Created by Token, consumed by a new SSE connection.
417type ssetoken struct {
418 token string // Uniquely generated.
419 accName string
420 address string // Address used to authenticate in call that created the token.
421 sessionToken store.SessionToken // SessionToken that created this token, checked before sending updates.
422 validUntil time.Time
423}
424
425// ssetokens maintains unused tokens. We have just one, but it's a type so we
426// can define methods.
427type ssetokens struct {
428 sync.Mutex
429 accountTokens map[string][]ssetoken // Account to max 10 most recent tokens, from old to new.
430 tokens map[string]ssetoken // Token to details, for finding account for a token.
431}
432
433var sseTokens = ssetokens{
434 accountTokens: map[string][]ssetoken{},
435 tokens: map[string]ssetoken{},
436}
437
438// xgenerate creates and saves a new token. It ensures no more than 10 tokens
439// per account exist, removing old ones if needed.
440func (x *ssetokens) xgenerate(ctx context.Context, accName, address string, sessionToken store.SessionToken) string {
441 buf := make([]byte, 16)
442 _, err := cryptrand.Read(buf)
443 xcheckf(ctx, err, "generating token")
444 st := ssetoken{base64.RawURLEncoding.EncodeToString(buf), accName, address, sessionToken, time.Now().Add(time.Minute)}
445
446 x.Lock()
447 defer x.Unlock()
448 n := len(x.accountTokens[accName])
449 if n >= 10 {
450 for _, ost := range x.accountTokens[accName][:n-9] {
451 delete(x.tokens, ost.token)
452 }
453 copy(x.accountTokens[accName], x.accountTokens[accName][n-9:])
454 x.accountTokens[accName] = x.accountTokens[accName][:9]
455 }
456 x.accountTokens[accName] = append(x.accountTokens[accName], st)
457 x.tokens[st.token] = st
458 return st.token
459}
460
461// check verifies a token, and consumes it if valid.
462func (x *ssetokens) check(token string) (string, string, store.SessionToken, bool, error) {
463 x.Lock()
464 defer x.Unlock()
465
466 st, ok := x.tokens[token]
467 if !ok {
468 return "", "", "", false, nil
469 }
470 delete(x.tokens, token)
471 if i := slices.Index(x.accountTokens[st.accName], st); i < 0 {
472 return "", "", "", false, errors.New("internal error, could not find token in account")
473 } else {
474 copy(x.accountTokens[st.accName][i:], x.accountTokens[st.accName][i+1:])
475 x.accountTokens[st.accName] = x.accountTokens[st.accName][:len(x.accountTokens[st.accName])-1]
476 if len(x.accountTokens[st.accName]) == 0 {
477 delete(x.accountTokens, st.accName)
478 }
479 }
480 if time.Now().After(st.validUntil) {
481 return "", "", "", false, nil
482 }
483 return st.accName, st.address, st.sessionToken, true, nil
484}
485
486// ioErr is panicked on i/o errors in serveEvents and handled in a defer.
487type ioErr struct {
488 err error
489}
490
491// serveEvents serves an SSE connection. Authentication is done through a query
492// string parameter "singleUseToken", a one-time-use token returned by the Token
493// API call.
494func serveEvents(ctx context.Context, log mlog.Log, accountPath string, w http.ResponseWriter, r *http.Request) {
495 if r.Method != "GET" {
496 http.Error(w, "405 - method not allowed - use get", http.StatusMethodNotAllowed)
497 return
498 }
499
500 flusher, ok := w.(http.Flusher)
501 if !ok {
502 log.Error("internal error: ResponseWriter not a http.Flusher")
503 http.Error(w, "500 - internal error - cannot sync to http connection", 500)
504 return
505 }
506
507 q := r.URL.Query()
508 token := q.Get("singleUseToken")
509 if token == "" {
510 http.Error(w, "400 - bad request - missing credentials", http.StatusBadRequest)
511 return
512 }
513 accName, address, sessionToken, ok, err := sseTokens.check(token)
514 if err != nil {
515 http.Error(w, "500 - internal server error - "+err.Error(), http.StatusInternalServerError)
516 return
517 }
518 if !ok {
519 http.Error(w, "400 - bad request - bad token", http.StatusBadRequest)
520 return
521 }
522 if _, err := store.SessionUse(ctx, log, accName, sessionToken, ""); err != nil {
523 http.Error(w, "400 - bad request - bad session token", http.StatusBadRequest)
524 return
525 }
526
527 // We can simulate a slow SSE connection. It seems firefox doesn't slow down
528 // incoming responses with its slow-network similation.
529 var waitMin, waitMax time.Duration
530 waitMinMsec := q.Get("waitMinMsec")
531 waitMaxMsec := q.Get("waitMaxMsec")
532 if waitMinMsec != "" && waitMaxMsec != "" {
533 if v, err := strconv.ParseInt(waitMinMsec, 10, 64); err != nil {
534 http.Error(w, "400 - bad request - parsing waitMinMsec: "+err.Error(), http.StatusBadRequest)
535 return
536 } else {
537 waitMin = time.Duration(v) * time.Millisecond
538 }
539
540 if v, err := strconv.ParseInt(waitMaxMsec, 10, 64); err != nil {
541 http.Error(w, "400 - bad request - parsing waitMaxMsec: "+err.Error(), http.StatusBadRequest)
542 return
543 } else {
544 waitMax = time.Duration(v) * time.Millisecond
545 }
546 }
547
548 // Parse the request with initial mailbox/search criteria.
549 var req Request
550 dec := json.NewDecoder(strings.NewReader(q.Get("request")))
551 dec.DisallowUnknownFields()
552 if err := dec.Decode(&req); err != nil {
553 http.Error(w, "400 - bad request - bad request query string parameter: "+err.Error(), http.StatusBadRequest)
554 return
555 } else if req.Page.Count <= 0 {
556 http.Error(w, "400 - bad request - request cannot have Page.Count 0", http.StatusBadRequest)
557 return
558 }
559 if req.Query.Threading == "" {
560 req.Query.Threading = ThreadOff
561 }
562
563 var writer *eventWriter
564
565 metricSSEConnections.Inc()
566 defer metricSSEConnections.Dec()
567
568 // Below here, error handling cause through xcheckf, which panics with
569 // *sherpa.Error, after which we send an error event to the client. We can also get
570 // an *ioErr when the connection is broken.
571 defer func() {
572 x := recover()
573 if x == nil {
574 return
575 }
576 if err, ok := x.(*sherpa.Error); ok {
577 writer.xsendEvent(ctx, log, "fatalErr", err.Message)
578 } else if _, ok := x.(ioErr); ok {
579 return
580 } else {
581 log.WithContext(ctx).Error("serveEvents panic", slog.Any("err", x))
582 debug.PrintStack()
583 metrics.PanicInc(metrics.Webmail)
584 panic(x)
585 }
586 }()
587
588 h := w.Header()
589 h.Set("Content-Type", "text/event-stream")
590 h.Set("Cache-Control", "no-cache")
591
592 // We'll be sending quite a bit of message data (text) in JSON (plenty duplicate
593 // keys), so should be quite compressible.
594 var out writeFlusher
595 gz := mox.AcceptsGzip(r)
596 if gz {
597 h.Set("Content-Encoding", "gzip")
598 out, _ = gzip.NewWriterLevel(w, gzip.BestSpeed)
599 } else {
600 out = nopFlusher{w}
601 }
602 out = httpFlusher{out, flusher}
603
604 // We'll be writing outgoing SSE events through writer.
605 writer = newEventWriter(out, waitMin, waitMax, accName, sessionToken)
606 defer writer.close()
607
608 // Fetch initial data.
609 acc, err := store.OpenAccount(log, accName)
610 xcheckf(ctx, err, "open account")
611 defer func() {
612 err := acc.Close()
613 log.Check(err, "closing account")
614 }()
615 comm := store.RegisterComm(acc)
616 defer comm.Unregister()
617
618 // List addresses that the client can use to send email from.
619 accConf, _ := acc.Conf()
620 loginAddr, err := smtp.ParseAddress(address)
621 xcheckf(ctx, err, "parsing login address")
622 _, _, _, dest, err := mox.LookupAddress(loginAddr.Localpart, loginAddr.Domain, false, false)
623 xcheckf(ctx, err, "looking up destination for login address")
624 loginName := accConf.FullName
625 if dest.FullName != "" {
626 loginName = dest.FullName
627 }
628 loginAddress := MessageAddress{Name: loginName, User: loginAddr.Localpart.String(), Domain: loginAddr.Domain}
629 var addresses []MessageAddress
630 for a, dest := range accConf.Destinations {
631 name := dest.FullName
632 if name == "" {
633 name = accConf.FullName
634 }
635 var ma MessageAddress
636 if strings.HasPrefix(a, "@") {
637 dom, err := dns.ParseDomain(a[1:])
638 xcheckf(ctx, err, "parsing destination address for account")
639 ma = MessageAddress{Domain: dom}
640 } else {
641 addr, err := smtp.ParseAddress(a)
642 xcheckf(ctx, err, "parsing destination address for account")
643 ma = MessageAddress{Name: name, User: addr.Localpart.String(), Domain: addr.Domain}
644 }
645 addresses = append(addresses, ma)
646 }
647 // User is allowed to send using alias address as message From address. Webmail
648 // will choose it when replying to a message sent to that address.
649 aliasAddrs := map[MessageAddress]bool{}
650 for _, a := range accConf.Aliases {
651 if a.Alias.AllowMsgFrom {
652 ma := MessageAddress{User: a.Alias.LocalpartStr, Domain: a.Alias.Domain}
653 if !aliasAddrs[ma] {
654 addresses = append(addresses, ma)
655 }
656 aliasAddrs[ma] = true
657 }
658 }
659
660 // We implicitly start a query. We use the reqctx for the transaction, because the
661 // transaction is passed to the query, which can be canceled.
662 reqctx, reqctxcancel := context.WithCancel(ctx)
663 defer func() {
664 // We also cancel in cancelDrain later on, but there is a brief window where the
665 // context wouldn't be canceled.
666 if reqctxcancel != nil {
667 reqctxcancel()
668 reqctxcancel = nil
669 }
670 }()
671
672 // qtx is kept around during connection initialization, until we pass it off to the
673 // goroutine that starts querying for messages.
674 var qtx *bstore.Tx
675 defer func() {
676 if qtx != nil {
677 err := qtx.Rollback()
678 log.Check(err, "rolling back")
679 }
680 }()
681
682 var mbl []store.Mailbox
683 settings := store.Settings{ID: 1}
684
685 // We only take the rlock when getting the tx.
686 acc.WithRLock(func() {
687 // Now a read-only transaction we'll use during the query.
688 qtx, err = acc.DB.Begin(reqctx, false)
689 xcheckf(ctx, err, "begin transaction")
690
691 mbl, err = bstore.QueryTx[store.Mailbox](qtx).List()
692 xcheckf(ctx, err, "list mailboxes")
693
694 err = qtx.Get(&settings)
695 xcheckf(ctx, err, "get settings")
696 })
697
698 // Find the designated mailbox if a mailbox name is set, or there are no filters at all.
699 var zerofilter Filter
700 var zeronotfilter NotFilter
701 var mailbox store.Mailbox
702 var mailboxPrefixes []string
703 var matchMailboxes bool
704 mailboxIDs := map[int64]bool{}
705 mailboxName := req.Query.Filter.MailboxName
706 if mailboxName != "" || reflect.DeepEqual(req.Query.Filter, zerofilter) && reflect.DeepEqual(req.Query.NotFilter, zeronotfilter) {
707 if mailboxName == "" {
708 mailboxName = "Inbox"
709 }
710
711 var inbox store.Mailbox
712 for _, e := range mbl {
713 if e.Name == mailboxName {
714 mailbox = e
715 }
716 if e.Name == "Inbox" {
717 inbox = e
718 }
719 }
720 if mailbox.ID == 0 {
721 mailbox = inbox
722 }
723 if mailbox.ID == 0 {
724 xcheckf(ctx, errors.New("inbox not found"), "setting initial mailbox")
725 }
726 req.Query.Filter.MailboxID = mailbox.ID
727 req.Query.Filter.MailboxName = ""
728 mailboxPrefixes = []string{mailbox.Name + "/"}
729 matchMailboxes = true
730 mailboxIDs[mailbox.ID] = true
731 } else {
732 matchMailboxes, mailboxIDs, mailboxPrefixes = xprepareMailboxIDs(ctx, qtx, req.Query.Filter, accConf.RejectsMailbox)
733 }
734 if req.Query.Filter.MailboxChildrenIncluded {
735 xgatherMailboxIDs(ctx, qtx, mailboxIDs, mailboxPrefixes)
736 }
737
738 // todo: write a last-event-id based on modseq? if last-event-id is present, we would have to send changes to mailboxes, messages, hopefully reducing the amount of data sent.
739
740 sse := sseRegister(acc.Name)
741 defer sse.unregister()
742
743 // Per-domain localpart config so webclient can decide if an address belongs to the account.
744 domainAddressConfigs := map[string]DomainAddressConfig{}
745 for _, a := range addresses {
746 dom, _ := mox.Conf.Domain(a.Domain)
747 domainAddressConfigs[a.Domain.ASCII] = DomainAddressConfig{dom.LocalpartCatchallSeparator, dom.LocalpartCaseSensitive}
748 }
749
750 // Write first event, allowing client to fill its UI with mailboxes.
751 start := EventStart{sse.ID, loginAddress, addresses, domainAddressConfigs, mailbox.Name, mbl, accConf.RejectsMailbox, settings, accountPath, moxvar.Version}
752 writer.xsendEvent(ctx, log, "start", start)
753
754 // The goroutine doing the querying will send messages on these channels, which
755 // result in an event being written on the SSE connection.
756 viewMsgsc := make(chan EventViewMsgs)
757 viewErrc := make(chan EventViewErr)
758 viewResetc := make(chan EventViewReset)
759 donec := make(chan int64) // When request is done.
760
761 // Start a view, it determines if we send a change to the client. And start an
762 // implicit query for messages, we'll send the messages to the client which can
763 // fill its ui with messages.
764 v := view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
765 go viewRequestTx(reqctx, log, acc, qtx, v, viewMsgsc, viewErrc, viewResetc, donec)
766 qtx = nil // viewRequestTx closes qtx
767
768 // When canceling a query, we must drain its messages until it says it is done.
769 // Otherwise the sending goroutine would hang indefinitely on a channel send.
770 cancelDrain := func() {
771 if reqctxcancel != nil {
772 // Cancel the goroutine doing the querying.
773 reqctxcancel()
774 reqctx = nil
775 reqctxcancel = nil
776 } else {
777 return
778 }
779
780 // Drain events until done.
781 for {
782 select {
783 case <-viewMsgsc:
784 case <-viewErrc:
785 case <-viewResetc:
786 case <-donec:
787 return
788 }
789 }
790 }
791
792 // If we stop and a query is in progress, we must drain the channel it will send on.
793 defer cancelDrain()
794
795 // Changes broadcasted by other connections on this account. If applicable for the
796 // connection/view, we send events.
797 xprocessChanges := func(changes []store.Change) {
798 taggedChanges := [][2]any{}
799
800 // We get a transaction first time we need it.
801 var xtx *bstore.Tx
802 defer func() {
803 if xtx != nil {
804 err := xtx.Rollback()
805 log.Check(err, "rolling back transaction")
806 }
807 }()
808 ensureTx := func() error {
809 if xtx != nil {
810 return nil
811 }
812 acc.RLock()
813 defer acc.RUnlock()
814 var err error
815 xtx, err = acc.DB.Begin(ctx, false)
816 return err
817 }
818 // This getmsg will now only be called mailboxID+UID, not with messageID set.
819 // todo jmap: change store.Change* to include MessageID's? would mean duplication of information resulting in possible mismatch.
820 getmsg := func(messageID int64, mailboxID int64, uid store.UID) (store.Message, error) {
821 if err := ensureTx(); err != nil {
822 return store.Message{}, fmt.Errorf("transaction: %v", err)
823 }
824 return bstore.QueryTx[store.Message](xtx).FilterEqual("Expunged", false).FilterNonzero(store.Message{MailboxID: mailboxID, UID: uid}).Get()
825 }
826
827 // Return uids that are within range in view. Because the end has been reached, or
828 // because the UID is not after the last message.
829 xchangedUIDs := func(mailboxID int64, uids []store.UID, isRemove bool) (changedUIDs []store.UID) {
830 uidsAny := make([]any, len(uids))
831 for i, uid := range uids {
832 uidsAny[i] = uid
833 }
834 err := ensureTx()
835 xcheckf(ctx, err, "transaction")
836 q := bstore.QueryTx[store.Message](xtx)
837 q.FilterNonzero(store.Message{MailboxID: mailboxID})
838 q.FilterEqual("UID", uidsAny...)
839 mbOK := v.matchesMailbox(mailboxID)
840 err = q.ForEach(func(m store.Message) error {
841 _, thread := v.threadIDs[m.ThreadID]
842 if thread || mbOK && (v.inRange(m) || isRemove && m.Expunged) {
843 changedUIDs = append(changedUIDs, m.UID)
844 }
845 return nil
846 })
847 xcheckf(ctx, err, "fetching messages for change")
848 return changedUIDs
849 }
850
851 // Forward changes that are relevant to the current view.
852 for _, change := range changes {
853 switch c := change.(type) {
854 case store.ChangeAddUID:
855 ok, err := v.matches(log, acc, true, 0, c.MailboxID, c.UID, c.Flags, c.Keywords, getmsg)
856 xcheckf(ctx, err, "matching new message against view")
857 m, err := getmsg(0, c.MailboxID, c.UID)
858 xcheckf(ctx, err, "get message")
859 _, thread := v.threadIDs[m.ThreadID]
860 if !ok && !thread {
861 continue
862 }
863 state := msgState{acc: acc}
864 mi, err := messageItem(log, m, &state)
865 state.clear()
866 xcheckf(ctx, err, "make messageitem")
867 mi.MatchQuery = ok
868
869 mil := []MessageItem{mi}
870 if !thread && req.Query.Threading != ThreadOff {
871 err := ensureTx()
872 xcheckf(ctx, err, "transaction")
873 more, _, err := gatherThread(log, xtx, acc, v, m, 0, false)
874 xcheckf(ctx, err, "gathering thread messages for id %d, thread %d", m.ID, m.ThreadID)
875 mil = append(mil, more...)
876 v.threadIDs[m.ThreadID] = struct{}{}
877 }
878
879 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgAdd", ChangeMsgAdd{c, mil}})
880
881 // If message extends the view, store it as such.
882 if !v.Request.Query.OrderAsc && m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && m.Received.After(v.LastMessageReceived) {
883 v.LastMessageReceived = m.Received
884 }
885
886 case store.ChangeRemoveUIDs:
887 // We may send changes for uids the client doesn't know, that's fine.
888 changedUIDs := xchangedUIDs(c.MailboxID, c.UIDs, true)
889 if len(changedUIDs) == 0 {
890 continue
891 }
892 ch := ChangeMsgRemove{c}
893 ch.UIDs = changedUIDs
894 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgRemove", ch})
895
896 case store.ChangeFlags:
897 // We may send changes for uids the client doesn't know, that's fine.
898 changedUIDs := xchangedUIDs(c.MailboxID, []store.UID{c.UID}, false)
899 if len(changedUIDs) == 0 {
900 continue
901 }
902 ch := ChangeMsgFlags{c}
903 ch.UID = changedUIDs[0]
904 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgFlags", ch})
905
906 case store.ChangeThread:
907 // Change in muted/collaped state, just always ship it.
908 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgThread", ChangeMsgThread{c}})
909
910 case store.ChangeRemoveMailbox:
911 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRemove", ChangeMailboxRemove{c}})
912
913 case store.ChangeAddMailbox:
914 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxAdd", ChangeMailboxAdd{c.Mailbox}})
915
916 case store.ChangeRenameMailbox:
917 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRename", ChangeMailboxRename{c}})
918
919 case store.ChangeMailboxCounts:
920 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxCounts", ChangeMailboxCounts{c}})
921
922 case store.ChangeMailboxSpecialUse:
923 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxSpecialUse", ChangeMailboxSpecialUse{c}})
924
925 case store.ChangeMailboxKeywords:
926 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxKeywords", ChangeMailboxKeywords{c}})
927
928 case store.ChangeAddSubscription:
929 // Webmail does not care about subscriptions.
930
931 default:
932 panic(fmt.Sprintf("missing case for change %T", c))
933 }
934 }
935
936 if len(taggedChanges) > 0 {
937 viewChanges := EventViewChanges{v.Request.ViewID, taggedChanges}
938 writer.xsendEvent(ctx, log, "viewChanges", viewChanges)
939 }
940 }
941
942 timer := time.NewTimer(5 * time.Minute) // For keepalives.
943 defer timer.Stop()
944 for {
945 if writer.wrote {
946 timer.Reset(5 * time.Minute)
947 writer.wrote = false
948 }
949
950 pending := comm.Pending
951 if reqctx != nil {
952 pending = nil
953 }
954
955 select {
956 case <-mox.Shutdown.Done():
957 writer.xsendEvent(ctx, log, "fatalErr", "server is shutting down")
958 // Work around go vet, it doesn't see defer cancelDrain.
959 if reqctxcancel != nil {
960 reqctxcancel()
961 }
962 return
963
964 case <-timer.C:
965 _, err := fmt.Fprintf(out, ": keepalive\n\n")
966 if err != nil {
967 log.Errorx("write keepalive", err)
968 // Work around go vet, it doesn't see defer cancelDrain.
969 if reqctxcancel != nil {
970 reqctxcancel()
971 }
972 return
973 }
974 out.Flush()
975 writer.wrote = true
976
977 case vm := <-viewMsgsc:
978 if vm.RequestID != v.Request.ID || vm.ViewID != v.Request.ViewID {
979 panic(fmt.Sprintf("received msgs for view,request id %d,%d instead of %d,%d", vm.ViewID, vm.RequestID, v.Request.ViewID, v.Request.ID))
980 }
981 if vm.ViewEnd {
982 v.End = true
983 }
984 if len(vm.MessageItems) > 0 {
985 v.LastMessageReceived = vm.MessageItems[len(vm.MessageItems)-1][0].Message.Received
986 }
987 writer.xsendEvent(ctx, log, "viewMsgs", vm)
988
989 case ve := <-viewErrc:
990 if ve.RequestID != v.Request.ID || ve.ViewID != v.Request.ViewID {
991 panic(fmt.Sprintf("received err for view,request id %d,%d instead of %d,%d", ve.ViewID, ve.RequestID, v.Request.ViewID, v.Request.ID))
992 }
993 if errors.Is(ve.err, context.Canceled) || moxio.IsClosed(ve.err) {
994 // Work around go vet, it doesn't see defer cancelDrain.
995 if reqctxcancel != nil {
996 reqctxcancel()
997 }
998 return
999 }
1000 writer.xsendEvent(ctx, log, "viewErr", ve)
1001
1002 case vr := <-viewResetc:
1003 if vr.RequestID != v.Request.ID || vr.ViewID != v.Request.ViewID {
1004 panic(fmt.Sprintf("received reset for view,request id %d,%d instead of %d,%d", vr.ViewID, vr.RequestID, v.Request.ViewID, v.Request.ID))
1005 }
1006 writer.xsendEvent(ctx, log, "viewReset", vr)
1007
1008 case id := <-donec:
1009 if id != v.Request.ID {
1010 panic(fmt.Sprintf("received done for request id %d instead of %d", id, v.Request.ID))
1011 }
1012 if reqctxcancel != nil {
1013 reqctxcancel()
1014 }
1015 reqctx = nil
1016 reqctxcancel = nil
1017
1018 case req := <-sse.Request:
1019 if reqctx != nil {
1020 cancelDrain()
1021 }
1022 if req.Cancel {
1023 v = view{req, time.Time{}, false, false, nil, nil}
1024 continue
1025 }
1026
1027 reqctx, reqctxcancel = context.WithCancel(ctx)
1028
1029 stop := func() (stop bool) {
1030 // rtx is handed off viewRequestTx below, but we must clean it up in case of errors.
1031 var rtx *bstore.Tx
1032 var err error
1033 defer func() {
1034 if rtx != nil {
1035 err = rtx.Rollback()
1036 log.Check(err, "rolling back transaction")
1037 }
1038 }()
1039 acc.WithRLock(func() {
1040 rtx, err = acc.DB.Begin(reqctx, false)
1041 })
1042 if err != nil {
1043 reqctxcancel()
1044 reqctx = nil
1045 reqctxcancel = nil
1046
1047 if errors.Is(err, context.Canceled) {
1048 return true
1049 }
1050 err := fmt.Errorf("begin transaction: %v", err)
1051 viewErr := EventViewErr{v.Request.ViewID, v.Request.ID, err.Error(), err}
1052 writer.xsendEvent(ctx, log, "viewErr", viewErr)
1053 return false
1054 }
1055
1056 // Reset view state for new query.
1057 if req.ViewID != v.Request.ViewID {
1058 matchMailboxes, mailboxIDs, mailboxPrefixes := xprepareMailboxIDs(ctx, rtx, req.Query.Filter, accConf.RejectsMailbox)
1059 if req.Query.Filter.MailboxChildrenIncluded {
1060 xgatherMailboxIDs(ctx, rtx, mailboxIDs, mailboxPrefixes)
1061 }
1062 v = view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
1063 } else {
1064 v.Request = req
1065 }
1066 go viewRequestTx(reqctx, log, acc, rtx, v, viewMsgsc, viewErrc, viewResetc, donec)
1067 rtx = nil
1068 return false
1069 }()
1070 if stop {
1071 return
1072 }
1073
1074 case <-pending:
1075 xprocessChanges(comm.Get())
1076
1077 case <-ctx.Done():
1078 // Work around go vet, it doesn't see defer cancelDrain.
1079 if reqctxcancel != nil {
1080 reqctxcancel()
1081 }
1082 return
1083 }
1084 }
1085}
1086
1087// xprepareMailboxIDs prepare the first half of filters for mailboxes, based on
1088// f.MailboxID (-1 is special). matchMailboxes indicates whether the IDs in
1089// mailboxIDs must or must not match. mailboxPrefixes is for use with
1090// xgatherMailboxIDs to gather children of the mailboxIDs.
1091func xprepareMailboxIDs(ctx context.Context, tx *bstore.Tx, f Filter, rejectsMailbox string) (matchMailboxes bool, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1092 matchMailboxes = true
1093 mailboxIDs = map[int64]bool{}
1094 if f.MailboxID == -1 {
1095 matchMailboxes = false
1096 // Add the trash, junk and account rejects mailbox.
1097 err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
1098 if mb.Trash || mb.Junk || mb.Name == rejectsMailbox {
1099 mailboxPrefixes = append(mailboxPrefixes, mb.Name+"/")
1100 mailboxIDs[mb.ID] = true
1101 }
1102 return nil
1103 })
1104 xcheckf(ctx, err, "finding trash/junk/rejects mailbox")
1105 } else if f.MailboxID > 0 {
1106 mb := store.Mailbox{ID: f.MailboxID}
1107 err := tx.Get(&mb)
1108 xcheckf(ctx, err, "get mailbox")
1109 mailboxIDs[f.MailboxID] = true
1110 mailboxPrefixes = []string{mb.Name + "/"}
1111 }
1112 return
1113}
1114
1115// xgatherMailboxIDs adds all mailboxes with a prefix matching any of
1116// mailboxPrefixes to mailboxIDs, to expand filtering to children of mailboxes.
1117func xgatherMailboxIDs(ctx context.Context, tx *bstore.Tx, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1118 // Gather more mailboxes to filter on, based on mailboxPrefixes.
1119 if len(mailboxPrefixes) == 0 {
1120 return
1121 }
1122 err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
1123 for _, p := range mailboxPrefixes {
1124 if strings.HasPrefix(mb.Name, p) {
1125 mailboxIDs[mb.ID] = true
1126 break
1127 }
1128 }
1129 return nil
1130 })
1131 xcheckf(ctx, err, "gathering mailboxes")
1132}
1133
1134// matchesMailbox returns whether a mailbox matches the view.
1135func (v view) matchesMailbox(mailboxID int64) bool {
1136 return len(v.mailboxIDs) == 0 || v.matchMailboxIDs && v.mailboxIDs[mailboxID] || !v.matchMailboxIDs && !v.mailboxIDs[mailboxID]
1137}
1138
1139// inRange returns whether m is within the range for the view, whether a change for
1140// this message should be sent to the client so it can update its state.
1141func (v view) inRange(m store.Message) bool {
1142 return v.End || !v.Request.Query.OrderAsc && !m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && !m.Received.After(v.LastMessageReceived)
1143}
1144
1145// matches checks if the message, identified by either messageID or mailboxID+UID,
1146// is in the current "view" (i.e. passing the filters, and if checkRange is set
1147// also if within the range of sent messages based on sort order and the last seen
1148// message). getmsg retrieves the message, which may be necessary depending on the
1149// active filters. Used to determine if a store.Change with a new message should be
1150// sent, and for the destination and anchor messages in view requests.
1151func (v view) matches(log mlog.Log, acc *store.Account, checkRange bool, messageID int64, mailboxID int64, uid store.UID, flags store.Flags, keywords []string, getmsg func(int64, int64, store.UID) (store.Message, error)) (match bool, rerr error) {
1152 var m store.Message
1153 ensureMessage := func() bool {
1154 if m.ID == 0 && rerr == nil {
1155 m, rerr = getmsg(messageID, mailboxID, uid)
1156 }
1157 return rerr == nil
1158 }
1159
1160 q := v.Request.Query
1161
1162 // Warning: Filters must be kept in sync between queryMessage and view.matches.
1163
1164 // Check filters.
1165 if len(v.mailboxIDs) > 0 && (!ensureMessage() || v.matchMailboxIDs && !v.mailboxIDs[m.MailboxID] || !v.matchMailboxIDs && v.mailboxIDs[m.MailboxID]) {
1166 return false, rerr
1167 }
1168 // note: anchorMessageID is not relevant for matching.
1169 flagfilter := q.flagFilterFn()
1170 if flagfilter != nil && !flagfilter(flags, keywords) {
1171 return false, rerr
1172 }
1173
1174 if q.Filter.Oldest != nil && (!ensureMessage() || m.Received.Before(*q.Filter.Oldest)) {
1175 return false, rerr
1176 }
1177 if q.Filter.Newest != nil && (!ensureMessage() || !m.Received.Before(*q.Filter.Newest)) {
1178 return false, rerr
1179 }
1180
1181 if q.Filter.SizeMin > 0 && (!ensureMessage() || m.Size < q.Filter.SizeMin) {
1182 return false, rerr
1183 }
1184 if q.Filter.SizeMax > 0 && (!ensureMessage() || m.Size > q.Filter.SizeMax) {
1185 return false, rerr
1186 }
1187
1188 state := msgState{acc: acc}
1189 defer func() {
1190 if rerr == nil && state.err != nil {
1191 rerr = state.err
1192 }
1193 state.clear()
1194 }()
1195
1196 attachmentFilter := q.attachmentFilterFn(log, acc, &state)
1197 if attachmentFilter != nil && (!ensureMessage() || !attachmentFilter(m)) {
1198 return false, rerr
1199 }
1200
1201 envFilter := q.envFilterFn(log, &state)
1202 if envFilter != nil && (!ensureMessage() || !envFilter(m)) {
1203 return false, rerr
1204 }
1205
1206 headerFilter := q.headerFilterFn(log, &state)
1207 if headerFilter != nil && (!ensureMessage() || !headerFilter(m)) {
1208 return false, rerr
1209 }
1210
1211 wordsFilter := q.wordsFilterFn(log, &state)
1212 if wordsFilter != nil && (!ensureMessage() || !wordsFilter(m)) {
1213 return false, rerr
1214 }
1215
1216 // Now check that we are either within the sorting order, or "last" was sent.
1217 if !checkRange || v.End || ensureMessage() && v.inRange(m) {
1218 return true, rerr
1219 }
1220 return false, rerr
1221}
1222
1223type msgResp struct {
1224 err error // If set, an error happened and fields below are not set.
1225 reset bool // If set, the anchor message does not exist (anymore?) and we are sending messages from the start, fields below not set.
1226 viewEnd bool // If set, the last message for the view was seen, no more should be requested, fields below not set.
1227 mil []MessageItem // If none of the cases above apply, the messages that was found matching the query. First message was reason the thread is returned, for use as AnchorID in followup request.
1228 pm *ParsedMessage // If m was the target page.DestMessageID, or this is the first match, this is the parsed message of mi.
1229}
1230
1231// viewRequestTx executes a request (query with filters, pagination) by
1232// launching a new goroutine with queryMessages, receiving results as msgResp,
1233// and sending Event* to the SSE connection.
1234//
1235// It always closes tx.
1236func viewRequestTx(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, msgc chan EventViewMsgs, errc chan EventViewErr, resetc chan EventViewReset, donec chan int64) {
1237 defer func() {
1238 err := tx.Rollback()
1239 log.Check(err, "rolling back query transaction")
1240
1241 donec <- v.Request.ID
1242
1243 x := recover() // Should not happen, but don't take program down if it does.
1244 if x != nil {
1245 log.WithContext(ctx).Error("viewRequestTx panic", slog.Any("err", x))
1246 debug.PrintStack()
1247 metrics.PanicInc(metrics.Webmailrequest)
1248 }
1249 }()
1250
1251 var msgitems [][]MessageItem // Gathering for 300ms, then flushing.
1252 var parsedMessage *ParsedMessage
1253 var viewEnd bool
1254
1255 var immediate bool // No waiting, flush immediate.
1256 t := time.NewTimer(300 * time.Millisecond)
1257 defer t.Stop()
1258
1259 sendViewMsgs := func(force bool) {
1260 if len(msgitems) == 0 && !force {
1261 return
1262 }
1263
1264 immediate = false
1265 msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, msgitems, parsedMessage, viewEnd}
1266 msgitems = nil
1267 parsedMessage = nil
1268 t.Reset(300 * time.Millisecond)
1269 }
1270
1271 // todo: should probably rewrite code so we don't start yet another goroutine, but instead handle the query responses directly (through a struct that keeps state?) in the sse connection goroutine.
1272
1273 mrc := make(chan msgResp, 1)
1274 go queryMessages(ctx, log, acc, tx, v, mrc)
1275
1276 for {
1277 select {
1278 case mr, ok := <-mrc:
1279 if !ok {
1280 sendViewMsgs(false)
1281 // Empty message list signals this query is done.
1282 msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, nil, nil, false}
1283 return
1284 }
1285 if mr.err != nil {
1286 sendViewMsgs(false)
1287 errc <- EventViewErr{v.Request.ViewID, v.Request.ID, mr.err.Error(), mr.err}
1288 return
1289 }
1290 if mr.reset {
1291 resetc <- EventViewReset{v.Request.ViewID, v.Request.ID}
1292 continue
1293 }
1294 if mr.viewEnd {
1295 viewEnd = true
1296 sendViewMsgs(true)
1297 return
1298 }
1299
1300 msgitems = append(msgitems, mr.mil)
1301 if mr.pm != nil {
1302 parsedMessage = mr.pm
1303 }
1304 if immediate {
1305 sendViewMsgs(true)
1306 }
1307
1308 case <-t.C:
1309 if len(msgitems) == 0 {
1310 // Nothing to send yet. We'll send immediately when the next message comes in.
1311 immediate = true
1312 } else {
1313 sendViewMsgs(false)
1314 }
1315 }
1316 }
1317}
1318
1319// queryMessages executes a query, with filter, pagination, destination message id
1320// to fetch (the message that the client had in view and wants to display again).
1321// It sends on msgc, with several types of messages: errors, whether the view is
1322// reset due to missing AnchorMessageID, and when the end of the view was reached
1323// and/or for a message.
1324func queryMessages(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, mrc chan msgResp) {
1325 defer func() {
1326 x := recover() // Should not happen, but don't take program down if it does.
1327 if x != nil {
1328 log.WithContext(ctx).Error("queryMessages panic", slog.Any("err", x))
1329 debug.PrintStack()
1330 mrc <- msgResp{err: fmt.Errorf("query failed")}
1331 metrics.PanicInc(metrics.Webmailquery)
1332 }
1333
1334 close(mrc)
1335 }()
1336
1337 query := v.Request.Query
1338 page := v.Request.Page
1339
1340 // Warning: Filters must be kept in sync between queryMessage and view.matches.
1341
1342 checkMessage := func(id int64) (valid bool, rerr error) {
1343 m := store.Message{ID: id}
1344 err := tx.Get(&m)
1345 if err == bstore.ErrAbsent || err == nil && m.Expunged {
1346 return false, nil
1347 } else if err != nil {
1348 return false, err
1349 } else {
1350 return v.matches(log, acc, false, m.ID, m.MailboxID, m.UID, m.Flags, m.Keywords, func(int64, int64, store.UID) (store.Message, error) {
1351 return m, nil
1352 })
1353 }
1354 }
1355
1356 // Check if AnchorMessageID exists and matches filter. If not, we will reset the view.
1357 if page.AnchorMessageID > 0 {
1358 // Check if message exists and (still) matches the filter.
1359 // todo: if AnchorMessageID exists but no longer matches the filter, we are resetting the view, but could handle it more gracefully in the future. if the message is in a different mailbox, we cannot query as efficiently, we'll have to read through more messages.
1360 if valid, err := checkMessage(page.AnchorMessageID); err != nil {
1361 mrc <- msgResp{err: fmt.Errorf("querying AnchorMessageID: %v", err)}
1362 return
1363 } else if !valid {
1364 mrc <- msgResp{reset: true}
1365 page.AnchorMessageID = 0
1366 }
1367 }
1368
1369 // Check if page.DestMessageID exists and matches filter. If not, we will ignore
1370 // it instead of continuing to send message till the end of the view.
1371 if page.DestMessageID > 0 {
1372 if valid, err := checkMessage(page.DestMessageID); err != nil {
1373 mrc <- msgResp{err: fmt.Errorf("querying requested message: %v", err)}
1374 return
1375 } else if !valid {
1376 page.DestMessageID = 0
1377 }
1378 }
1379
1380 // todo optimize: we would like to have more filters directly on the database if they can use an index. eg if there is a keyword filter and no mailbox filter.
1381
1382 q := bstore.QueryTx[store.Message](tx)
1383 q.FilterEqual("Expunged", false)
1384 if len(v.mailboxIDs) > 0 {
1385 if len(v.mailboxIDs) == 1 && v.matchMailboxIDs {
1386 // Should result in fast indexed query.
1387 for mbID := range v.mailboxIDs {
1388 q.FilterNonzero(store.Message{MailboxID: mbID})
1389 }
1390 } else {
1391 idsAny := make([]any, 0, len(v.mailboxIDs))
1392 for mbID := range v.mailboxIDs {
1393 idsAny = append(idsAny, mbID)
1394 }
1395 if v.matchMailboxIDs {
1396 q.FilterEqual("MailboxID", idsAny...)
1397 } else {
1398 q.FilterNotEqual("MailboxID", idsAny...)
1399 }
1400 }
1401 }
1402
1403 // If we are looking for an anchor, keep skipping message early (cheaply) until we've seen it.
1404 if page.AnchorMessageID > 0 {
1405 var seen = false
1406 q.FilterFn(func(m store.Message) bool {
1407 if seen {
1408 return true
1409 }
1410 seen = m.ID == page.AnchorMessageID
1411 return false
1412 })
1413 }
1414
1415 // We may be added filters the the query below. The FilterFn signature does not
1416 // implement reporting errors, or anything else, just a bool. So when making the
1417 // filter functions, we give them a place to store parsed message state, and an
1418 // error. We check the error during and after query execution.
1419 state := msgState{acc: acc}
1420 defer state.clear()
1421
1422 flagfilter := query.flagFilterFn()
1423 if flagfilter != nil {
1424 q.FilterFn(func(m store.Message) bool {
1425 return flagfilter(m.Flags, m.Keywords)
1426 })
1427 }
1428
1429 if query.Filter.Oldest != nil {
1430 q.FilterGreaterEqual("Received", *query.Filter.Oldest)
1431 }
1432 if query.Filter.Newest != nil {
1433 q.FilterLessEqual("Received", *query.Filter.Newest)
1434 }
1435
1436 if query.Filter.SizeMin > 0 {
1437 q.FilterGreaterEqual("Size", query.Filter.SizeMin)
1438 }
1439 if query.Filter.SizeMax > 0 {
1440 q.FilterLessEqual("Size", query.Filter.SizeMax)
1441 }
1442
1443 attachmentFilter := query.attachmentFilterFn(log, acc, &state)
1444 if attachmentFilter != nil {
1445 q.FilterFn(attachmentFilter)
1446 }
1447
1448 envFilter := query.envFilterFn(log, &state)
1449 if envFilter != nil {
1450 q.FilterFn(envFilter)
1451 }
1452
1453 headerFilter := query.headerFilterFn(log, &state)
1454 if headerFilter != nil {
1455 q.FilterFn(headerFilter)
1456 }
1457
1458 wordsFilter := query.wordsFilterFn(log, &state)
1459 if wordsFilter != nil {
1460 q.FilterFn(wordsFilter)
1461 }
1462
1463 if query.OrderAsc {
1464 q.SortAsc("Received")
1465 } else {
1466 q.SortDesc("Received")
1467 }
1468 found := page.DestMessageID <= 0
1469 end := true
1470 have := 0
1471 err := q.ForEach(func(m store.Message) error {
1472 // Check for an error in one of the filters, propagate it.
1473 if state.err != nil {
1474 return state.err
1475 }
1476
1477 if have >= page.Count && found || have > 10000 {
1478 end = false
1479 return bstore.StopForEach
1480 }
1481
1482 if _, ok := v.threadIDs[m.ThreadID]; ok {
1483 // Message was already returned as part of a thread.
1484 return nil
1485 }
1486
1487 var pm *ParsedMessage
1488 if m.ID == page.DestMessageID || page.DestMessageID == 0 && have == 0 && page.AnchorMessageID == 0 {
1489 // For threads, if there was not DestMessageID, we may be getting the newest
1490 // message. For an initial view, this isn't necessarily the first the user is
1491 // expected to read first, that would be the first unread, which we'll get below
1492 // when gathering the thread.
1493 found = true
1494 xpm, err := parsedMessage(log, m, &state, true, false)
1495 if err != nil {
1496 return fmt.Errorf("parsing message %d: %v", m.ID, err)
1497 }
1498 pm = &xpm
1499 }
1500
1501 mi, err := messageItem(log, m, &state)
1502 if err != nil {
1503 return fmt.Errorf("making messageitem for message %d: %v", m.ID, err)
1504 }
1505 mil := []MessageItem{mi}
1506 if query.Threading != ThreadOff {
1507 more, xpm, err := gatherThread(log, tx, acc, v, m, page.DestMessageID, page.AnchorMessageID == 0 && have == 0)
1508 if err != nil {
1509 return fmt.Errorf("gathering thread messages for id %d, thread %d: %v", m.ID, m.ThreadID, err)
1510 }
1511 if xpm != nil {
1512 pm = xpm
1513 found = true
1514 }
1515 mil = append(mil, more...)
1516 v.threadIDs[m.ThreadID] = struct{}{}
1517
1518 // Calculate how many messages the frontend is going to show, and only count those as returned.
1519 collapsed := map[int64]bool{}
1520 for _, mi := range mil {
1521 collapsed[mi.Message.ID] = mi.Message.ThreadCollapsed
1522 }
1523 unread := map[int64]bool{} // Propagated to thread root.
1524 if query.Threading == ThreadUnread {
1525 for _, mi := range mil {
1526 mm := mi.Message
1527 if mm.Seen {
1528 continue
1529 }
1530 unread[mm.ID] = true
1531 for _, id := range mm.ThreadParentIDs {
1532 unread[id] = true
1533 }
1534 }
1535 }
1536 for _, mi := range mil {
1537 mm := mi.Message
1538 threadRoot := true
1539 rootID := mm.ID
1540 for _, id := range mm.ThreadParentIDs {
1541 if _, ok := collapsed[id]; ok {
1542 threadRoot = false
1543 rootID = id
1544 }
1545 }
1546 if threadRoot || (query.Threading == ThreadOn && !collapsed[rootID] || query.Threading == ThreadUnread && unread[rootID]) {
1547 have++
1548 }
1549 }
1550 } else {
1551 have++
1552 }
1553 if pm != nil && len(pm.envelope.From) == 1 {
1554 pm.ViewMode, err = fromAddrViewMode(tx, pm.envelope.From[0])
1555 if err != nil {
1556 return fmt.Errorf("gathering view mode for id %d: %v", m.ID, err)
1557 }
1558 }
1559 mrc <- msgResp{mil: mil, pm: pm}
1560 return nil
1561 })
1562 // Check for an error in one of the filters again. Check in ForEach would not
1563 // trigger if the last message has the error.
1564 if err == nil && state.err != nil {
1565 err = state.err
1566 }
1567 if err != nil {
1568 mrc <- msgResp{err: fmt.Errorf("querying messages: %v", err)}
1569 return
1570 }
1571 if end {
1572 mrc <- msgResp{viewEnd: true}
1573 }
1574}
1575
1576func gatherThread(log mlog.Log, tx *bstore.Tx, acc *store.Account, v view, m store.Message, destMessageID int64, first bool) ([]MessageItem, *ParsedMessage, error) {
1577 if m.ThreadID == 0 {
1578 // If we would continue, FilterNonzero would fail because there are no non-zero fields.
1579 return nil, nil, fmt.Errorf("message has threadid 0, account is probably still being upgraded, try turning threading off until the upgrade is done")
1580 }
1581
1582 // Fetch other messages for this thread.
1583 qt := bstore.QueryTx[store.Message](tx)
1584 qt.FilterNonzero(store.Message{ThreadID: m.ThreadID})
1585 qt.FilterEqual("Expunged", false)
1586 qt.FilterNotEqual("ID", m.ID)
1587 qt.SortAsc("ID")
1588 tml, err := qt.List()
1589 if err != nil {
1590 return nil, nil, fmt.Errorf("listing other messages in thread for message %d, thread %d: %v", m.ID, m.ThreadID, err)
1591 }
1592
1593 var mil []MessageItem
1594 var pm *ParsedMessage
1595 var firstUnread bool
1596 for _, tm := range tml {
1597 err := func() error {
1598 xstate := msgState{acc: acc}
1599 defer xstate.clear()
1600
1601 mi, err := messageItem(log, tm, &xstate)
1602 if err != nil {
1603 return fmt.Errorf("making messageitem for message %d, for thread %d: %v", tm.ID, m.ThreadID, err)
1604 }
1605 mi.MatchQuery, err = v.matches(log, acc, false, tm.ID, tm.MailboxID, tm.UID, tm.Flags, tm.Keywords, func(int64, int64, store.UID) (store.Message, error) {
1606 return tm, nil
1607 })
1608 if err != nil {
1609 return fmt.Errorf("matching thread message %d against view query: %v", tm.ID, err)
1610 }
1611 mil = append(mil, mi)
1612
1613 if tm.ID == destMessageID || destMessageID == 0 && first && (pm == nil || !firstUnread && !tm.Seen) {
1614 firstUnread = !tm.Seen
1615 xpm, err := parsedMessage(log, tm, &xstate, true, false)
1616 if err != nil {
1617 return fmt.Errorf("parsing thread message %d: %v", tm.ID, err)
1618 }
1619 pm = &xpm
1620 }
1621 return nil
1622 }()
1623 if err != nil {
1624 return nil, nil, err
1625 }
1626 }
1627
1628 // Finally, the message that caused us to gather this thread (which is likely the
1629 // most recent message in the thread) could be the only unread message.
1630 if destMessageID == 0 && first && !m.Seen && !firstUnread {
1631 xstate := msgState{acc: acc}
1632 defer xstate.clear()
1633 xpm, err := parsedMessage(log, m, &xstate, true, false)
1634 if err != nil {
1635 return nil, nil, fmt.Errorf("parsing thread message %d: %v", m.ID, err)
1636 }
1637 pm = &xpm
1638 }
1639
1640 return mil, pm, nil
1641}
1642
1643// While checking the filters on a message, we may need to get more message
1644// details as each filter passes. We check the filters that need the basic
1645// information first, and load and cache more details for the next filters.
1646// msgState holds parsed details for a message, it is updated while filtering,
1647// with more information or reset for a next message.
1648type msgState struct {
1649 acc *store.Account // Never changes during lifetime.
1650 err error // Once set, doesn't get cleared.
1651 m store.Message
1652 part *message.Part // Will be without Reader when msgr is nil.
1653 msgr *store.MsgReader
1654}
1655
1656func (ms *msgState) clear() {
1657 if ms.msgr != nil {
1658 ms.msgr.Close()
1659 ms.msgr = nil
1660 }
1661 *ms = msgState{acc: ms.acc, err: ms.err}
1662}
1663
1664func (ms *msgState) ensureMsg(m store.Message) {
1665 if m.ID != ms.m.ID {
1666 ms.clear()
1667 }
1668 ms.m = m
1669}
1670
1671func (ms *msgState) ensurePart(m store.Message, withMsgReader bool) bool {
1672 ms.ensureMsg(m)
1673
1674 if ms.err == nil {
1675 if ms.part == nil {
1676 if m.ParsedBuf == nil {
1677 ms.err = fmt.Errorf("message %d not parsed", m.ID)
1678 return false
1679 }
1680 var p message.Part
1681 if err := json.Unmarshal(m.ParsedBuf, &p); err != nil {
1682 ms.err = fmt.Errorf("load part for message %d: %w", m.ID, err)
1683 return false
1684 }
1685 ms.part = &p
1686 }
1687 if withMsgReader && ms.msgr == nil {
1688 ms.msgr = ms.acc.MessageReader(m)
1689 ms.part.SetReaderAt(ms.msgr)
1690 }
1691 }
1692 return ms.part != nil
1693}
1694
1695// flagFilterFn returns a function that applies the flag/keyword/"label"-related
1696// filters for a query. A nil function is returned if there are no flags to filter
1697// on.
1698func (q Query) flagFilterFn() func(store.Flags, []string) bool {
1699 labels := map[string]bool{}
1700 for _, k := range q.Filter.Labels {
1701 labels[k] = true
1702 }
1703 for _, k := range q.NotFilter.Labels {
1704 labels[k] = false
1705 }
1706
1707 if len(labels) == 0 {
1708 return nil
1709 }
1710
1711 var mask, flags store.Flags
1712 systemflags := map[string][]*bool{
1713 `\answered`: {&mask.Answered, &flags.Answered},
1714 `\flagged`: {&mask.Flagged, &flags.Flagged},
1715 `\deleted`: {&mask.Deleted, &flags.Deleted},
1716 `\seen`: {&mask.Seen, &flags.Seen},
1717 `\draft`: {&mask.Draft, &flags.Draft},
1718 `$junk`: {&mask.Junk, &flags.Junk},
1719 `$notjunk`: {&mask.Notjunk, &flags.Notjunk},
1720 `$forwarded`: {&mask.Forwarded, &flags.Forwarded},
1721 `$phishing`: {&mask.Phishing, &flags.Phishing},
1722 `$mdnsent`: {&mask.MDNSent, &flags.MDNSent},
1723 }
1724 keywords := map[string]bool{}
1725 for k, v := range labels {
1726 k = strings.ToLower(k)
1727 if mf, ok := systemflags[k]; ok {
1728 *mf[0] = true
1729 *mf[1] = v
1730 } else {
1731 keywords[k] = v
1732 }
1733 }
1734 return func(msgFlags store.Flags, msgKeywords []string) bool {
1735 var f store.Flags
1736 if f.Set(mask, msgFlags) != flags {
1737 return false
1738 }
1739 for k, v := range keywords {
1740 if slices.Contains(msgKeywords, k) != v {
1741 return false
1742 }
1743 }
1744 return true
1745 }
1746}
1747
1748// attachmentFilterFn returns a function that filters for the attachment-related
1749// filter from the query. A nil function is returned if there are attachment
1750// filters.
1751func (q Query) attachmentFilterFn(log mlog.Log, acc *store.Account, state *msgState) func(m store.Message) bool {
1752 if q.Filter.Attachments == AttachmentIndifferent && q.NotFilter.Attachments == AttachmentIndifferent {
1753 return nil
1754 }
1755
1756 return func(m store.Message) bool {
1757 if !state.ensurePart(m, false) {
1758 return false
1759 }
1760 types, err := attachmentTypes(log, m, state)
1761 if err != nil {
1762 state.err = err
1763 return false
1764 }
1765 return (q.Filter.Attachments == AttachmentIndifferent || types[q.Filter.Attachments]) && (q.NotFilter.Attachments == AttachmentIndifferent || !types[q.NotFilter.Attachments])
1766 }
1767}
1768
1769var attachmentMimetypes = map[string]AttachmentType{
1770 "application/pdf": AttachmentPDF,
1771 "application/zip": AttachmentArchive,
1772 "application/x-rar-compressed": AttachmentArchive,
1773 "application/vnd.oasis.opendocument.spreadsheet": AttachmentSpreadsheet,
1774 "application/vnd.ms-excel": AttachmentSpreadsheet,
1775 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": AttachmentSpreadsheet,
1776 "application/vnd.oasis.opendocument.text": AttachmentDocument,
1777 "application/vnd.oasis.opendocument.presentation": AttachmentPresentation,
1778 "application/vnd.ms-powerpoint": AttachmentPresentation,
1779 "application/vnd.openxmlformats-officedocument.presentationml.presentation": AttachmentPresentation,
1780}
1781var attachmentExtensions = map[string]AttachmentType{
1782 ".pdf": AttachmentPDF,
1783 ".zip": AttachmentArchive,
1784 ".tar": AttachmentArchive,
1785 ".tgz": AttachmentArchive,
1786 ".tar.gz": AttachmentArchive,
1787 ".tbz2": AttachmentArchive,
1788 ".tar.bz2": AttachmentArchive,
1789 ".tar.lz": AttachmentArchive,
1790 ".tlz": AttachmentArchive,
1791 ".tar.xz": AttachmentArchive,
1792 ".txz": AttachmentArchive,
1793 ".tar.zst": AttachmentArchive,
1794 ".tar.lz4": AttachmentArchive,
1795 ".7z": AttachmentArchive,
1796 ".rar": AttachmentArchive,
1797 ".ods": AttachmentSpreadsheet,
1798 ".xls": AttachmentSpreadsheet,
1799 ".xlsx": AttachmentSpreadsheet,
1800 ".odt": AttachmentDocument,
1801 ".doc": AttachmentDocument,
1802 ".docx": AttachmentDocument,
1803 ".odp": AttachmentPresentation,
1804 ".ppt": AttachmentPresentation,
1805 ".pptx": AttachmentPresentation,
1806}
1807
1808func attachmentTypes(log mlog.Log, m store.Message, state *msgState) (map[AttachmentType]bool, error) {
1809 types := map[AttachmentType]bool{}
1810
1811 pm, err := parsedMessage(log, m, state, false, false)
1812 if err != nil {
1813 return nil, fmt.Errorf("parsing message for attachments: %w", err)
1814 }
1815 for _, a := range pm.attachments {
1816 if a.Part.MediaType == "IMAGE" {
1817 types[AttachmentImage] = true
1818 continue
1819 }
1820 mt := strings.ToLower(a.Part.MediaType + "/" + a.Part.MediaSubType)
1821 if t, ok := attachmentMimetypes[mt]; ok {
1822 types[t] = true
1823 } else if ext := filepath.Ext(tryDecodeParam(log, a.Part.ContentTypeParams["name"])); ext != "" {
1824 if t, ok := attachmentExtensions[strings.ToLower(ext)]; ok {
1825 types[t] = true
1826 } else {
1827 continue
1828 }
1829 }
1830 }
1831
1832 if len(types) == 0 {
1833 types[AttachmentNone] = true
1834 } else {
1835 types[AttachmentAny] = true
1836 }
1837 return types, nil
1838}
1839
1840// envFilterFn returns a filter function for the "envelope" headers ("envelope" as
1841// used by IMAP, i.e. basic message headers from/to/subject, an unfortunate name
1842// clash with SMTP envelope) for the query. A nil function is returned if no
1843// filtering is needed.
1844func (q Query) envFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1845 if len(q.Filter.From) == 0 && len(q.Filter.To) == 0 && len(q.Filter.Subject) == 0 && len(q.NotFilter.From) == 0 && len(q.NotFilter.To) == 0 && len(q.NotFilter.Subject) == 0 {
1846 return nil
1847 }
1848
1849 lower := func(l []string) []string {
1850 if len(l) == 0 {
1851 return nil
1852 }
1853 r := make([]string, len(l))
1854 for i, s := range l {
1855 r[i] = strings.ToLower(s)
1856 }
1857 return r
1858 }
1859
1860 filterSubject := lower(q.Filter.Subject)
1861 notFilterSubject := lower(q.NotFilter.Subject)
1862 filterFrom := lower(q.Filter.From)
1863 notFilterFrom := lower(q.NotFilter.From)
1864 filterTo := lower(q.Filter.To)
1865 notFilterTo := lower(q.NotFilter.To)
1866
1867 return func(m store.Message) bool {
1868 if !state.ensurePart(m, false) {
1869 return false
1870 }
1871
1872 var env message.Envelope
1873 if state.part.Envelope != nil {
1874 env = *state.part.Envelope
1875 }
1876
1877 if len(filterSubject) > 0 || len(notFilterSubject) > 0 {
1878 subject := strings.ToLower(env.Subject)
1879 for _, s := range filterSubject {
1880 if !strings.Contains(subject, s) {
1881 return false
1882 }
1883 }
1884 for _, s := range notFilterSubject {
1885 if strings.Contains(subject, s) {
1886 return false
1887 }
1888 }
1889 }
1890
1891 contains := func(textLower []string, l []message.Address, all bool) bool {
1892 next:
1893 for _, s := range textLower {
1894 for _, a := range l {
1895 name := strings.ToLower(a.Name)
1896 addr := strings.ToLower(fmt.Sprintf("<%s@%s>", a.User, a.Host))
1897 if strings.Contains(name, s) || strings.Contains(addr, s) {
1898 if !all {
1899 return true
1900 }
1901 continue next
1902 }
1903 }
1904 if all {
1905 return false
1906 }
1907 }
1908 return all
1909 }
1910
1911 if len(filterFrom) > 0 && !contains(filterFrom, env.From, true) {
1912 return false
1913 }
1914 if len(notFilterFrom) > 0 && contains(notFilterFrom, env.From, false) {
1915 return false
1916 }
1917 if len(filterTo) > 0 || len(notFilterTo) > 0 {
1918 to := append(append(append([]message.Address{}, env.To...), env.CC...), env.BCC...)
1919 if len(filterTo) > 0 && !contains(filterTo, to, true) {
1920 return false
1921 }
1922 if len(notFilterTo) > 0 && contains(notFilterTo, to, false) {
1923 return false
1924 }
1925 }
1926 return true
1927 }
1928}
1929
1930// headerFilterFn returns a function that filters for the header filters in the
1931// query. A nil function is returned if there are no header filters.
1932func (q Query) headerFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1933 if len(q.Filter.Headers) == 0 {
1934 return nil
1935 }
1936
1937 lowerValues := make([]string, len(q.Filter.Headers))
1938 for i, t := range q.Filter.Headers {
1939 lowerValues[i] = strings.ToLower(t[1])
1940 }
1941
1942 return func(m store.Message) bool {
1943 if !state.ensurePart(m, true) {
1944 return false
1945 }
1946 hdr, err := state.part.Header()
1947 if err != nil {
1948 state.err = fmt.Errorf("reading header for message %d: %w", m.ID, err)
1949 return false
1950 }
1951
1952 next:
1953 for i, t := range q.Filter.Headers {
1954 k := t[0]
1955 v := lowerValues[i]
1956 l := hdr.Values(k)
1957 if v == "" && len(l) > 0 {
1958 continue
1959 }
1960 for _, e := range l {
1961 if strings.Contains(strings.ToLower(e), v) {
1962 continue next
1963 }
1964 }
1965 return false
1966 }
1967 return true
1968 }
1969}
1970
1971// wordFiltersFn returns a function that applies the word filters of the query. A
1972// nil function is returned when query does not contain a word filter.
1973func (q Query) wordsFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1974 if len(q.Filter.Words) == 0 && len(q.NotFilter.Words) == 0 {
1975 return nil
1976 }
1977
1978 ws := store.PrepareWordSearch(q.Filter.Words, q.NotFilter.Words)
1979
1980 return func(m store.Message) bool {
1981 if !state.ensurePart(m, true) {
1982 return false
1983 }
1984
1985 if ok, err := ws.MatchPart(log, state.part, true); err != nil {
1986 state.err = fmt.Errorf("searching for words in message %d: %w", m.ID, err)
1987 return false
1988 } else {
1989 return ok
1990 }
1991 }
1992}
1993