1package webmail
2
3// todo: may want to add some json omitempty tags to MessageItem, or Message to reduce json size, or just have smaller types that send only the fields that are needed.
4
5import (
6 "compress/gzip"
7 "context"
8 cryptrand "crypto/rand"
9 "encoding/base64"
10 "encoding/json"
11 "errors"
12 "fmt"
13 "log/slog"
14 "net/http"
15 "path/filepath"
16 "reflect"
17 "runtime/debug"
18 "slices"
19 "strconv"
20 "strings"
21 "sync"
22 "time"
23
24 "github.com/mjl-/bstore"
25 "github.com/mjl-/sherpa"
26
27 "github.com/mjl-/mox/dns"
28 "github.com/mjl-/mox/message"
29 "github.com/mjl-/mox/metrics"
30 "github.com/mjl-/mox/mlog"
31 "github.com/mjl-/mox/mox-"
32 "github.com/mjl-/mox/moxio"
33 "github.com/mjl-/mox/moxvar"
34 "github.com/mjl-/mox/smtp"
35 "github.com/mjl-/mox/store"
36)
37
38// Request is a request to an SSE connection to send messages, either for a new
39// view, to continue with an existing view, or to a cancel an ongoing request.
40type Request struct {
41 ID int64
42
43 SSEID int64 // SSE connection.
44
45 // To indicate a request is a continuation (more results) of the previous view.
46 // Echoed in events, client checks if it is getting results for the latest request.
47 ViewID int64
48
49 // If set, this request and its view are canceled. A new view must be started.
50 Cancel bool
51
52 Query Query
53 Page Page
54}
55
56type ThreadMode string
57
58const (
59 ThreadOff ThreadMode = "off"
60 ThreadOn ThreadMode = "on"
61 ThreadUnread ThreadMode = "unread"
62)
63
64// Query is a request for messages that match filters, in a given order.
65type Query struct {
66 OrderAsc bool // Order by received ascending or desending.
67 Threading ThreadMode
68 Filter Filter
69 NotFilter NotFilter
70}
71
72// AttachmentType is for filtering by attachment type.
73type AttachmentType string
74
75const (
76 AttachmentIndifferent AttachmentType = ""
77 AttachmentNone AttachmentType = "none"
78 AttachmentAny AttachmentType = "any"
79 AttachmentImage AttachmentType = "image" // png, jpg, gif, ...
80 AttachmentPDF AttachmentType = "pdf"
81 AttachmentArchive AttachmentType = "archive" // zip files, tgz, ...
82 AttachmentSpreadsheet AttachmentType = "spreadsheet" // ods, xlsx, ...
83 AttachmentDocument AttachmentType = "document" // odt, docx, ...
84 AttachmentPresentation AttachmentType = "presentation" // odp, pptx, ...
85)
86
87// Filter selects the messages to return. Fields that are set must all match,
88// for slices each element by match ("and").
89type Filter struct {
90 // If -1, then all mailboxes except Trash/Junk/Rejects. Otherwise, only active if > 0.
91 MailboxID int64
92
93 // If true, also submailboxes are included in the search.
94 MailboxChildrenIncluded bool
95
96 // In case client doesn't know mailboxes and their IDs yet. Only used during sse
97 // connection setup, where it is turned into a MailboxID. Filtering only looks at
98 // MailboxID.
99 MailboxName string
100
101 Words []string // Case insensitive substring match for each string.
102 From []string
103 To []string // Including Cc and Bcc.
104 Oldest *time.Time
105 Newest *time.Time
106 Subject []string
107 Attachments AttachmentType
108 Labels []string
109 Headers [][2]string // Header values can be empty, it's a check if the header is present, regardless of value.
110 SizeMin int64
111 SizeMax int64
112}
113
114// NotFilter matches messages that don't match these fields.
115type NotFilter struct {
116 Words []string
117 From []string
118 To []string
119 Subject []string
120 Attachments AttachmentType
121 Labels []string
122}
123
124// Page holds pagination parameters for a request.
125type Page struct {
126 // Start returning messages after this ID, if > 0. For pagination, fetching the
127 // next set of messages.
128 AnchorMessageID int64
129
130 // Number of messages to return, must be >= 1, we never return more than 10000 for
131 // one request.
132 Count int
133
134 // If > 0, return messages until DestMessageID is found. More than Count messages
135 // can be returned. For long-running searches, it may take a while before this
136 // message if found.
137 DestMessageID int64
138}
139
140// todo: MessageAddress and MessageEnvelope into message.Address and message.Envelope.
141
142// MessageAddress is like message.Address, but with a dns.Domain, with unicode name
143// included.
144type MessageAddress struct {
145 Name string // Free-form name for display in mail applications.
146 User string // Localpart, encoded.
147 Domain dns.Domain
148}
149
150// MessageEnvelope is like message.Envelope, as used in message.Part, but including
151// unicode host names for IDNA names.
152type MessageEnvelope struct {
153 // todo: should get sherpadoc to understand type embeds and embed the non-MessageAddress fields from message.Envelope.
154 Date time.Time
155 Subject string
156 From []MessageAddress
157 Sender []MessageAddress
158 ReplyTo []MessageAddress
159 To []MessageAddress
160 CC []MessageAddress
161 BCC []MessageAddress
162 InReplyTo string
163 MessageID string
164}
165
166// MessageItem is sent by queries, it has derived information analyzed from
167// message.Part, made for the needs of the message items in the message list.
168// messages.
169type MessageItem struct {
170 Message store.Message // Without ParsedBuf and MsgPrefix, for size.
171 Envelope MessageEnvelope
172 Attachments []Attachment
173 IsSigned bool
174 IsEncrypted bool
175 FirstLine string // Of message body, for showing as preview.
176 MatchQuery bool // If message does not match query, it can still be included because of threading.
177}
178
179// ParsedMessage has more parsed/derived information about a message, intended
180// for rendering the (contents of the) message. Information from MessageItem is
181// not duplicated.
182type ParsedMessage struct {
183 ID int64
184 Part message.Part
185 Headers map[string][]string
186 ViewMode store.ViewMode
187
188 // Text parts, can be empty.
189 Texts []string
190
191 // Whether there is an HTML part. The webclient renders HTML message parts through
192 // an iframe and a separate request with strict CSP headers to prevent script
193 // execution and loading of external resources, which isn't possible when loading
194 // in iframe with inline HTML because not all browsers support the iframe csp
195 // attribute.
196 HasHTML bool
197
198 ListReplyAddress *MessageAddress // From List-Post.
199
200 // Information used by MessageItem, not exported in this type.
201 envelope MessageEnvelope
202 attachments []Attachment
203 isSigned bool
204 isEncrypted bool
205 firstLine string
206}
207
208// EventStart is the first message sent on an SSE connection, giving the client
209// basic data to populate its UI. After this event, messages will follow quickly in
210// an EventViewMsgs event.
211type EventStart struct {
212 SSEID int64
213 LoginAddress MessageAddress
214 Addresses []MessageAddress
215 DomainAddressConfigs map[string]DomainAddressConfig // ASCII domain to address config.
216 MailboxName string
217 Mailboxes []store.Mailbox
218 RejectsMailbox string
219 Settings store.Settings
220 AccountPath string // If nonempty, the path on same host to webaccount interface.
221 Version string
222}
223
224// DomainAddressConfig has the address (localpart) configuration for a domain, so
225// the webmail client can decide if an address matches the addresses of the
226// account.
227type DomainAddressConfig struct {
228 LocalpartCatchallSeparator string // Can be empty.
229 LocalpartCaseSensitive bool
230}
231
232// EventViewMsgs contains messages for a view, possibly a continuation of an
233// earlier list of messages.
234type EventViewMsgs struct {
235 ViewID int64
236 RequestID int64
237
238 // If empty, this was the last message for the request. If non-empty, a list of
239 // thread messages. Each with the first message being the reason this thread is
240 // included and can be used as AnchorID in followup requests. If the threading mode
241 // is "off" in the query, there will always be only a single message. If a thread
242 // is sent, all messages in the thread are sent, including those that don't match
243 // the query (e.g. from another mailbox). Threads can be displayed based on the
244 // ThreadParentIDs field, with possibly slightly different display based on field
245 // ThreadMissingLink.
246 MessageItems [][]MessageItem
247
248 // If set, will match the target page.DestMessageID from the request.
249 ParsedMessage *ParsedMessage
250
251 // If set, there are no more messages in this view at this moment. Messages can be
252 // added, typically via Change messages, e.g. for new deliveries.
253 ViewEnd bool
254}
255
256// EventViewErr indicates an error during a query for messages. The request is
257// aborted, no more request-related messages will be sent until the next request.
258type EventViewErr struct {
259 ViewID int64
260 RequestID int64
261 Err string // To be displayed in client.
262 err error // Original message, for checking against context.Canceled.
263}
264
265// EventViewReset indicates that a request for the next set of messages in a few
266// could not be fulfilled, e.g. because the anchor message does not exist anymore.
267// The client should clear its list of messages. This can happen before
268// EventViewMsgs events are sent.
269type EventViewReset struct {
270 ViewID int64
271 RequestID int64
272}
273
274// EventViewChanges contain one or more changes relevant for the client, either
275// with new mailbox total/unseen message counts, or messages added/removed/modified
276// (flags) for the current view.
277type EventViewChanges struct {
278 ViewID int64
279 Changes [][2]any // The first field of [2]any is a string, the second of the Change types below.
280}
281
282// ChangeMsgAdd adds a new message and possibly its thread to the view.
283type ChangeMsgAdd struct {
284 store.ChangeAddUID
285 MessageItems []MessageItem
286}
287
288// ChangeMsgRemove removes one or more messages from the view.
289type ChangeMsgRemove struct {
290 store.ChangeRemoveUIDs
291}
292
293// ChangeMsgFlags updates flags for one message.
294type ChangeMsgFlags struct {
295 store.ChangeFlags
296}
297
298// ChangeMsgThread updates muted/collapsed fields for one message.
299type ChangeMsgThread struct {
300 store.ChangeThread
301}
302
303// ChangeMailboxRemove indicates a mailbox was removed, including all its messages.
304type ChangeMailboxRemove struct {
305 store.ChangeRemoveMailbox
306}
307
308// ChangeMailboxAdd indicates a new mailbox was added, initially without any messages.
309type ChangeMailboxAdd struct {
310 Mailbox store.Mailbox
311}
312
313// ChangeMailboxRename indicates a mailbox was renamed. Its ID stays the same.
314// It could be under a new parent.
315type ChangeMailboxRename struct {
316 store.ChangeRenameMailbox
317}
318
319// ChangeMailboxCounts set new total and unseen message counts for a mailbox.
320type ChangeMailboxCounts struct {
321 store.ChangeMailboxCounts
322}
323
324// ChangeMailboxSpecialUse has updated special-use flags for a mailbox.
325type ChangeMailboxSpecialUse struct {
326 store.ChangeMailboxSpecialUse
327}
328
329// ChangeMailboxKeywords has an updated list of keywords for a mailbox, e.g. after
330// a message was added with a keyword that wasn't in the mailbox yet.
331type ChangeMailboxKeywords struct {
332 store.ChangeMailboxKeywords
333}
334
335// View holds the information about the returned data for a query. It is used to
336// determine whether mailbox changes should be sent to the client, we only send
337// addition/removal/flag-changes of messages that are in view, or would extend it
338// if the view is at the end of the results.
339type view struct {
340 Request Request
341
342 // Received of last message we sent to the client. We use it to decide if a newly
343 // delivered message is within the view and the client should get a notification.
344 LastMessageReceived time.Time
345
346 // If set, the last message in the query view has been sent. There is no need to do
347 // another query, it will not return more data. Used to decide if an event for a
348 // new message should be sent.
349 End bool
350
351 // Whether message must or must not match mailboxIDs.
352 matchMailboxIDs bool
353 // Mailboxes to match, can be multiple, for matching children. If empty, there is
354 // no filter on mailboxes.
355 mailboxIDs map[int64]bool
356
357 // Threads sent to client. New messages for this thread are also sent, regardless
358 // of regular query matching, so also for other mailboxes. If the user (re)moved
359 // all messages of a thread, they may still receive events for the thread. Only
360 // filled when query with threading not off.
361 threadIDs map[int64]struct{}
362}
363
364// sses tracks all sse connections, and access to them.
365var sses = struct {
366 sync.Mutex
367 gen int64
368 m map[int64]sse
369}{m: map[int64]sse{}}
370
371// sse represents an sse connection.
372type sse struct {
373 ID int64 // Also returned in EventStart and used in Request to identify the request.
374 AccountName string // Used to check the authenticated user has access to the SSE connection.
375 Request chan Request // Goroutine will receive requests from here, coming from API calls.
376}
377
378// called by the goroutine when the connection is closed or breaks.
379func (sse sse) unregister() {
380 sses.Lock()
381 defer sses.Unlock()
382 delete(sses.m, sse.ID)
383
384 // Drain any pending requests, preventing blocked goroutines from API calls.
385 for {
386 select {
387 case <-sse.Request:
388 default:
389 return
390 }
391 }
392}
393
394func sseRegister(accountName string) sse {
395 sses.Lock()
396 defer sses.Unlock()
397 sses.gen++
398 v := sse{sses.gen, accountName, make(chan Request, 1)}
399 sses.m[v.ID] = v
400 return v
401}
402
403// sseGet returns a reference to an existing connection if it exists and user
404// has access.
405func sseGet(id int64, accountName string) (sse, bool) {
406 sses.Lock()
407 defer sses.Unlock()
408 s := sses.m[id]
409 if s.AccountName != accountName {
410 return sse{}, false
411 }
412 return s, true
413}
414
415// ssetoken is a temporary token that has not yet been used to start an SSE
416// connection. Created by Token, consumed by a new SSE connection.
417type ssetoken struct {
418 token string // Uniquely generated.
419 accName string
420 address string // Address used to authenticate in call that created the token.
421 sessionToken store.SessionToken // SessionToken that created this token, checked before sending updates.
422 validUntil time.Time
423}
424
425// ssetokens maintains unused tokens. We have just one, but it's a type so we
426// can define methods.
427type ssetokens struct {
428 sync.Mutex
429 accountTokens map[string][]ssetoken // Account to max 10 most recent tokens, from old to new.
430 tokens map[string]ssetoken // Token to details, for finding account for a token.
431}
432
433var sseTokens = ssetokens{
434 accountTokens: map[string][]ssetoken{},
435 tokens: map[string]ssetoken{},
436}
437
438// xgenerate creates and saves a new token. It ensures no more than 10 tokens
439// per account exist, removing old ones if needed.
440func (x *ssetokens) xgenerate(ctx context.Context, accName, address string, sessionToken store.SessionToken) string {
441 buf := make([]byte, 16)
442 _, err := cryptrand.Read(buf)
443 xcheckf(ctx, err, "generating token")
444 st := ssetoken{base64.RawURLEncoding.EncodeToString(buf), accName, address, sessionToken, time.Now().Add(time.Minute)}
445
446 x.Lock()
447 defer x.Unlock()
448 n := len(x.accountTokens[accName])
449 if n >= 10 {
450 for _, ost := range x.accountTokens[accName][:n-9] {
451 delete(x.tokens, ost.token)
452 }
453 copy(x.accountTokens[accName], x.accountTokens[accName][n-9:])
454 x.accountTokens[accName] = x.accountTokens[accName][:9]
455 }
456 x.accountTokens[accName] = append(x.accountTokens[accName], st)
457 x.tokens[st.token] = st
458 return st.token
459}
460
461// check verifies a token, and consumes it if valid.
462func (x *ssetokens) check(token string) (string, string, store.SessionToken, bool, error) {
463 x.Lock()
464 defer x.Unlock()
465
466 st, ok := x.tokens[token]
467 if !ok {
468 return "", "", "", false, nil
469 }
470 delete(x.tokens, token)
471 if i := slices.Index(x.accountTokens[st.accName], st); i < 0 {
472 return "", "", "", false, errors.New("internal error, could not find token in account")
473 } else {
474 copy(x.accountTokens[st.accName][i:], x.accountTokens[st.accName][i+1:])
475 x.accountTokens[st.accName] = x.accountTokens[st.accName][:len(x.accountTokens[st.accName])-1]
476 if len(x.accountTokens[st.accName]) == 0 {
477 delete(x.accountTokens, st.accName)
478 }
479 }
480 if time.Now().After(st.validUntil) {
481 return "", "", "", false, nil
482 }
483 return st.accName, st.address, st.sessionToken, true, nil
484}
485
486// ioErr is panicked on i/o errors in serveEvents and handled in a defer.
487type ioErr struct {
488 err error
489}
490
491// serveEvents serves an SSE connection. Authentication is done through a query
492// string parameter "token", a one-time-use token returned by the Token API call.
493func serveEvents(ctx context.Context, log mlog.Log, accountPath string, w http.ResponseWriter, r *http.Request) {
494 if r.Method != "GET" {
495 http.Error(w, "405 - method not allowed - use get", http.StatusMethodNotAllowed)
496 return
497 }
498
499 flusher, ok := w.(http.Flusher)
500 if !ok {
501 log.Error("internal error: ResponseWriter not a http.Flusher")
502 http.Error(w, "500 - internal error - cannot sync to http connection", 500)
503 return
504 }
505
506 q := r.URL.Query()
507 token := q.Get("token")
508 if token == "" {
509 http.Error(w, "400 - bad request - missing credentials", http.StatusBadRequest)
510 return
511 }
512 accName, address, sessionToken, ok, err := sseTokens.check(token)
513 if err != nil {
514 http.Error(w, "500 - internal server error - "+err.Error(), http.StatusInternalServerError)
515 return
516 }
517 if !ok {
518 http.Error(w, "400 - bad request - bad token", http.StatusBadRequest)
519 return
520 }
521 if _, err := store.SessionUse(ctx, log, accName, sessionToken, ""); err != nil {
522 http.Error(w, "400 - bad request - bad session token", http.StatusBadRequest)
523 return
524 }
525
526 // We can simulate a slow SSE connection. It seems firefox doesn't slow down
527 // incoming responses with its slow-network similation.
528 var waitMin, waitMax time.Duration
529 waitMinMsec := q.Get("waitMinMsec")
530 waitMaxMsec := q.Get("waitMaxMsec")
531 if waitMinMsec != "" && waitMaxMsec != "" {
532 if v, err := strconv.ParseInt(waitMinMsec, 10, 64); err != nil {
533 http.Error(w, "400 - bad request - parsing waitMinMsec: "+err.Error(), http.StatusBadRequest)
534 return
535 } else {
536 waitMin = time.Duration(v) * time.Millisecond
537 }
538
539 if v, err := strconv.ParseInt(waitMaxMsec, 10, 64); err != nil {
540 http.Error(w, "400 - bad request - parsing waitMaxMsec: "+err.Error(), http.StatusBadRequest)
541 return
542 } else {
543 waitMax = time.Duration(v) * time.Millisecond
544 }
545 }
546
547 // Parse the request with initial mailbox/search criteria.
548 var req Request
549 dec := json.NewDecoder(strings.NewReader(q.Get("request")))
550 dec.DisallowUnknownFields()
551 if err := dec.Decode(&req); err != nil {
552 http.Error(w, "400 - bad request - bad request query string parameter: "+err.Error(), http.StatusBadRequest)
553 return
554 } else if req.Page.Count <= 0 {
555 http.Error(w, "400 - bad request - request cannot have Page.Count 0", http.StatusBadRequest)
556 return
557 }
558 if req.Query.Threading == "" {
559 req.Query.Threading = ThreadOff
560 }
561
562 var writer *eventWriter
563
564 metricSSEConnections.Inc()
565 defer metricSSEConnections.Dec()
566
567 // Below here, error handling cause through xcheckf, which panics with
568 // *sherpa.Error, after which we send an error event to the client. We can also get
569 // an *ioErr when the connection is broken.
570 defer func() {
571 x := recover()
572 if x == nil {
573 return
574 }
575 if err, ok := x.(*sherpa.Error); ok {
576 writer.xsendEvent(ctx, log, "fatalErr", err.Message)
577 } else if _, ok := x.(ioErr); ok {
578 return
579 } else {
580 log.WithContext(ctx).Error("serveEvents panic", slog.Any("err", x))
581 debug.PrintStack()
582 metrics.PanicInc(metrics.Webmail)
583 panic(x)
584 }
585 }()
586
587 h := w.Header()
588 h.Set("Content-Type", "text/event-stream")
589 h.Set("Cache-Control", "no-cache")
590
591 // We'll be sending quite a bit of message data (text) in JSON (plenty duplicate
592 // keys), so should be quite compressible.
593 var out writeFlusher
594 gz := mox.AcceptsGzip(r)
595 if gz {
596 h.Set("Content-Encoding", "gzip")
597 out, _ = gzip.NewWriterLevel(w, gzip.BestSpeed)
598 } else {
599 out = nopFlusher{w}
600 }
601 out = httpFlusher{out, flusher}
602
603 // We'll be writing outgoing SSE events through writer.
604 writer = newEventWriter(out, waitMin, waitMax, accName, sessionToken)
605 defer writer.close()
606
607 // Fetch initial data.
608 acc, err := store.OpenAccount(log, accName)
609 xcheckf(ctx, err, "open account")
610 defer func() {
611 err := acc.Close()
612 log.Check(err, "closing account")
613 }()
614 comm := store.RegisterComm(acc)
615 defer comm.Unregister()
616
617 // List addresses that the client can use to send email from.
618 accConf, _ := acc.Conf()
619 loginAddr, err := smtp.ParseAddress(address)
620 xcheckf(ctx, err, "parsing login address")
621 _, _, _, dest, err := mox.LookupAddress(loginAddr.Localpart, loginAddr.Domain, false, false)
622 xcheckf(ctx, err, "looking up destination for login address")
623 loginName := accConf.FullName
624 if dest.FullName != "" {
625 loginName = dest.FullName
626 }
627 loginAddress := MessageAddress{Name: loginName, User: loginAddr.Localpart.String(), Domain: loginAddr.Domain}
628 var addresses []MessageAddress
629 for a, dest := range accConf.Destinations {
630 name := dest.FullName
631 if name == "" {
632 name = accConf.FullName
633 }
634 var ma MessageAddress
635 if strings.HasPrefix(a, "@") {
636 dom, err := dns.ParseDomain(a[1:])
637 xcheckf(ctx, err, "parsing destination address for account")
638 ma = MessageAddress{Domain: dom}
639 } else {
640 addr, err := smtp.ParseAddress(a)
641 xcheckf(ctx, err, "parsing destination address for account")
642 ma = MessageAddress{Name: name, User: addr.Localpart.String(), Domain: addr.Domain}
643 }
644 addresses = append(addresses, ma)
645 }
646 // User is allowed to send using alias address as message From address. Webmail
647 // will choose it when replying to a message sent to that address.
648 aliasAddrs := map[MessageAddress]bool{}
649 for _, a := range accConf.Aliases {
650 if a.Alias.AllowMsgFrom {
651 ma := MessageAddress{User: a.Alias.LocalpartStr, Domain: a.Alias.Domain}
652 if !aliasAddrs[ma] {
653 addresses = append(addresses, ma)
654 }
655 aliasAddrs[ma] = true
656 }
657 }
658
659 // We implicitly start a query. We use the reqctx for the transaction, because the
660 // transaction is passed to the query, which can be canceled.
661 reqctx, reqctxcancel := context.WithCancel(ctx)
662 defer func() {
663 // We also cancel in cancelDrain later on, but there is a brief window where the
664 // context wouldn't be canceled.
665 if reqctxcancel != nil {
666 reqctxcancel()
667 reqctxcancel = nil
668 }
669 }()
670
671 // qtx is kept around during connection initialization, until we pass it off to the
672 // goroutine that starts querying for messages.
673 var qtx *bstore.Tx
674 defer func() {
675 if qtx != nil {
676 err := qtx.Rollback()
677 log.Check(err, "rolling back")
678 }
679 }()
680
681 var mbl []store.Mailbox
682 settings := store.Settings{ID: 1}
683
684 // We only take the rlock when getting the tx.
685 acc.WithRLock(func() {
686 // Now a read-only transaction we'll use during the query.
687 qtx, err = acc.DB.Begin(reqctx, false)
688 xcheckf(ctx, err, "begin transaction")
689
690 mbl, err = bstore.QueryTx[store.Mailbox](qtx).List()
691 xcheckf(ctx, err, "list mailboxes")
692
693 err = qtx.Get(&settings)
694 xcheckf(ctx, err, "get settings")
695 })
696
697 // Find the designated mailbox if a mailbox name is set, or there are no filters at all.
698 var zerofilter Filter
699 var zeronotfilter NotFilter
700 var mailbox store.Mailbox
701 var mailboxPrefixes []string
702 var matchMailboxes bool
703 mailboxIDs := map[int64]bool{}
704 mailboxName := req.Query.Filter.MailboxName
705 if mailboxName != "" || reflect.DeepEqual(req.Query.Filter, zerofilter) && reflect.DeepEqual(req.Query.NotFilter, zeronotfilter) {
706 if mailboxName == "" {
707 mailboxName = "Inbox"
708 }
709
710 var inbox store.Mailbox
711 for _, e := range mbl {
712 if e.Name == mailboxName {
713 mailbox = e
714 }
715 if e.Name == "Inbox" {
716 inbox = e
717 }
718 }
719 if mailbox.ID == 0 {
720 mailbox = inbox
721 }
722 if mailbox.ID == 0 {
723 xcheckf(ctx, errors.New("inbox not found"), "setting initial mailbox")
724 }
725 req.Query.Filter.MailboxID = mailbox.ID
726 req.Query.Filter.MailboxName = ""
727 mailboxPrefixes = []string{mailbox.Name + "/"}
728 matchMailboxes = true
729 mailboxIDs[mailbox.ID] = true
730 } else {
731 matchMailboxes, mailboxIDs, mailboxPrefixes = xprepareMailboxIDs(ctx, qtx, req.Query.Filter, accConf.RejectsMailbox)
732 }
733 if req.Query.Filter.MailboxChildrenIncluded {
734 xgatherMailboxIDs(ctx, qtx, mailboxIDs, mailboxPrefixes)
735 }
736
737 // todo: write a last-event-id based on modseq? if last-event-id is present, we would have to send changes to mailboxes, messages, hopefully reducing the amount of data sent.
738
739 sse := sseRegister(acc.Name)
740 defer sse.unregister()
741
742 // Per-domain localpart config so webclient can decide if an address belongs to the account.
743 domainAddressConfigs := map[string]DomainAddressConfig{}
744 for _, a := range addresses {
745 dom, _ := mox.Conf.Domain(a.Domain)
746 domainAddressConfigs[a.Domain.ASCII] = DomainAddressConfig{dom.LocalpartCatchallSeparator, dom.LocalpartCaseSensitive}
747 }
748
749 // Write first event, allowing client to fill its UI with mailboxes.
750 start := EventStart{sse.ID, loginAddress, addresses, domainAddressConfigs, mailbox.Name, mbl, accConf.RejectsMailbox, settings, accountPath, moxvar.Version}
751 writer.xsendEvent(ctx, log, "start", start)
752
753 // The goroutine doing the querying will send messages on these channels, which
754 // result in an event being written on the SSE connection.
755 viewMsgsc := make(chan EventViewMsgs)
756 viewErrc := make(chan EventViewErr)
757 viewResetc := make(chan EventViewReset)
758 donec := make(chan int64) // When request is done.
759
760 // Start a view, it determines if we send a change to the client. And start an
761 // implicit query for messages, we'll send the messages to the client which can
762 // fill its ui with messages.
763 v := view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
764 go viewRequestTx(reqctx, log, acc, qtx, v, viewMsgsc, viewErrc, viewResetc, donec)
765 qtx = nil // viewRequestTx closes qtx
766
767 // When canceling a query, we must drain its messages until it says it is done.
768 // Otherwise the sending goroutine would hang indefinitely on a channel send.
769 cancelDrain := func() {
770 if reqctxcancel != nil {
771 // Cancel the goroutine doing the querying.
772 reqctxcancel()
773 reqctx = nil
774 reqctxcancel = nil
775 } else {
776 return
777 }
778
779 // Drain events until done.
780 for {
781 select {
782 case <-viewMsgsc:
783 case <-viewErrc:
784 case <-viewResetc:
785 case <-donec:
786 return
787 }
788 }
789 }
790
791 // If we stop and a query is in progress, we must drain the channel it will send on.
792 defer cancelDrain()
793
794 // Changes broadcasted by other connections on this account. If applicable for the
795 // connection/view, we send events.
796 xprocessChanges := func(changes []store.Change) {
797 taggedChanges := [][2]any{}
798
799 // We get a transaction first time we need it.
800 var xtx *bstore.Tx
801 defer func() {
802 if xtx != nil {
803 err := xtx.Rollback()
804 log.Check(err, "rolling back transaction")
805 }
806 }()
807 ensureTx := func() error {
808 if xtx != nil {
809 return nil
810 }
811 acc.RLock()
812 defer acc.RUnlock()
813 var err error
814 xtx, err = acc.DB.Begin(ctx, false)
815 return err
816 }
817 // This getmsg will now only be called mailboxID+UID, not with messageID set.
818 // todo jmap: change store.Change* to include MessageID's? would mean duplication of information resulting in possible mismatch.
819 getmsg := func(messageID int64, mailboxID int64, uid store.UID) (store.Message, error) {
820 if err := ensureTx(); err != nil {
821 return store.Message{}, fmt.Errorf("transaction: %v", err)
822 }
823 return bstore.QueryTx[store.Message](xtx).FilterEqual("Expunged", false).FilterNonzero(store.Message{MailboxID: mailboxID, UID: uid}).Get()
824 }
825
826 // Return uids that are within range in view. Because the end has been reached, or
827 // because the UID is not after the last message.
828 xchangedUIDs := func(mailboxID int64, uids []store.UID, isRemove bool) (changedUIDs []store.UID) {
829 uidsAny := make([]any, len(uids))
830 for i, uid := range uids {
831 uidsAny[i] = uid
832 }
833 err := ensureTx()
834 xcheckf(ctx, err, "transaction")
835 q := bstore.QueryTx[store.Message](xtx)
836 q.FilterNonzero(store.Message{MailboxID: mailboxID})
837 q.FilterEqual("UID", uidsAny...)
838 mbOK := v.matchesMailbox(mailboxID)
839 err = q.ForEach(func(m store.Message) error {
840 _, thread := v.threadIDs[m.ThreadID]
841 if thread || mbOK && (v.inRange(m) || isRemove && m.Expunged) {
842 changedUIDs = append(changedUIDs, m.UID)
843 }
844 return nil
845 })
846 xcheckf(ctx, err, "fetching messages for change")
847 return changedUIDs
848 }
849
850 // Forward changes that are relevant to the current view.
851 for _, change := range changes {
852 switch c := change.(type) {
853 case store.ChangeAddUID:
854 ok, err := v.matches(log, acc, true, 0, c.MailboxID, c.UID, c.Flags, c.Keywords, getmsg)
855 xcheckf(ctx, err, "matching new message against view")
856 m, err := getmsg(0, c.MailboxID, c.UID)
857 xcheckf(ctx, err, "get message")
858 _, thread := v.threadIDs[m.ThreadID]
859 if !ok && !thread {
860 continue
861 }
862 state := msgState{acc: acc}
863 mi, err := messageItem(log, m, &state)
864 state.clear()
865 xcheckf(ctx, err, "make messageitem")
866 mi.MatchQuery = ok
867
868 mil := []MessageItem{mi}
869 if !thread && req.Query.Threading != ThreadOff {
870 err := ensureTx()
871 xcheckf(ctx, err, "transaction")
872 more, _, err := gatherThread(log, xtx, acc, v, m, 0, false)
873 xcheckf(ctx, err, "gathering thread messages for id %d, thread %d", m.ID, m.ThreadID)
874 mil = append(mil, more...)
875 v.threadIDs[m.ThreadID] = struct{}{}
876 }
877
878 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgAdd", ChangeMsgAdd{c, mil}})
879
880 // If message extends the view, store it as such.
881 if !v.Request.Query.OrderAsc && m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && m.Received.After(v.LastMessageReceived) {
882 v.LastMessageReceived = m.Received
883 }
884
885 case store.ChangeRemoveUIDs:
886 // We may send changes for uids the client doesn't know, that's fine.
887 changedUIDs := xchangedUIDs(c.MailboxID, c.UIDs, true)
888 if len(changedUIDs) == 0 {
889 continue
890 }
891 ch := ChangeMsgRemove{c}
892 ch.UIDs = changedUIDs
893 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgRemove", ch})
894
895 case store.ChangeFlags:
896 // We may send changes for uids the client doesn't know, that's fine.
897 changedUIDs := xchangedUIDs(c.MailboxID, []store.UID{c.UID}, false)
898 if len(changedUIDs) == 0 {
899 continue
900 }
901 ch := ChangeMsgFlags{c}
902 ch.UID = changedUIDs[0]
903 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgFlags", ch})
904
905 case store.ChangeThread:
906 // Change in muted/collaped state, just always ship it.
907 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgThread", ChangeMsgThread{c}})
908
909 case store.ChangeRemoveMailbox:
910 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRemove", ChangeMailboxRemove{c}})
911
912 case store.ChangeAddMailbox:
913 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxAdd", ChangeMailboxAdd{c.Mailbox}})
914
915 case store.ChangeRenameMailbox:
916 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRename", ChangeMailboxRename{c}})
917
918 case store.ChangeMailboxCounts:
919 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxCounts", ChangeMailboxCounts{c}})
920
921 case store.ChangeMailboxSpecialUse:
922 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxSpecialUse", ChangeMailboxSpecialUse{c}})
923
924 case store.ChangeMailboxKeywords:
925 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxKeywords", ChangeMailboxKeywords{c}})
926
927 case store.ChangeAddSubscription:
928 // Webmail does not care about subscriptions.
929
930 default:
931 panic(fmt.Sprintf("missing case for change %T", c))
932 }
933 }
934
935 if len(taggedChanges) > 0 {
936 viewChanges := EventViewChanges{v.Request.ViewID, taggedChanges}
937 writer.xsendEvent(ctx, log, "viewChanges", viewChanges)
938 }
939 }
940
941 timer := time.NewTimer(5 * time.Minute) // For keepalives.
942 defer timer.Stop()
943 for {
944 if writer.wrote {
945 timer.Reset(5 * time.Minute)
946 writer.wrote = false
947 }
948
949 pending := comm.Pending
950 if reqctx != nil {
951 pending = nil
952 }
953
954 select {
955 case <-mox.Shutdown.Done():
956 writer.xsendEvent(ctx, log, "fatalErr", "server is shutting down")
957 // Work around go vet, it doesn't see defer cancelDrain.
958 if reqctxcancel != nil {
959 reqctxcancel()
960 }
961 return
962
963 case <-timer.C:
964 _, err := fmt.Fprintf(out, ": keepalive\n\n")
965 if err != nil {
966 log.Errorx("write keepalive", err)
967 // Work around go vet, it doesn't see defer cancelDrain.
968 if reqctxcancel != nil {
969 reqctxcancel()
970 }
971 return
972 }
973 out.Flush()
974 writer.wrote = true
975
976 case vm := <-viewMsgsc:
977 if vm.RequestID != v.Request.ID || vm.ViewID != v.Request.ViewID {
978 panic(fmt.Sprintf("received msgs for view,request id %d,%d instead of %d,%d", vm.ViewID, vm.RequestID, v.Request.ViewID, v.Request.ID))
979 }
980 if vm.ViewEnd {
981 v.End = true
982 }
983 if len(vm.MessageItems) > 0 {
984 v.LastMessageReceived = vm.MessageItems[len(vm.MessageItems)-1][0].Message.Received
985 }
986 writer.xsendEvent(ctx, log, "viewMsgs", vm)
987
988 case ve := <-viewErrc:
989 if ve.RequestID != v.Request.ID || ve.ViewID != v.Request.ViewID {
990 panic(fmt.Sprintf("received err for view,request id %d,%d instead of %d,%d", ve.ViewID, ve.RequestID, v.Request.ViewID, v.Request.ID))
991 }
992 if errors.Is(ve.err, context.Canceled) || moxio.IsClosed(ve.err) {
993 // Work around go vet, it doesn't see defer cancelDrain.
994 if reqctxcancel != nil {
995 reqctxcancel()
996 }
997 return
998 }
999 writer.xsendEvent(ctx, log, "viewErr", ve)
1000
1001 case vr := <-viewResetc:
1002 if vr.RequestID != v.Request.ID || vr.ViewID != v.Request.ViewID {
1003 panic(fmt.Sprintf("received reset for view,request id %d,%d instead of %d,%d", vr.ViewID, vr.RequestID, v.Request.ViewID, v.Request.ID))
1004 }
1005 writer.xsendEvent(ctx, log, "viewReset", vr)
1006
1007 case id := <-donec:
1008 if id != v.Request.ID {
1009 panic(fmt.Sprintf("received done for request id %d instead of %d", id, v.Request.ID))
1010 }
1011 if reqctxcancel != nil {
1012 reqctxcancel()
1013 }
1014 reqctx = nil
1015 reqctxcancel = nil
1016
1017 case req := <-sse.Request:
1018 if reqctx != nil {
1019 cancelDrain()
1020 }
1021 if req.Cancel {
1022 v = view{req, time.Time{}, false, false, nil, nil}
1023 continue
1024 }
1025
1026 reqctx, reqctxcancel = context.WithCancel(ctx)
1027
1028 stop := func() (stop bool) {
1029 // rtx is handed off viewRequestTx below, but we must clean it up in case of errors.
1030 var rtx *bstore.Tx
1031 var err error
1032 defer func() {
1033 if rtx != nil {
1034 err = rtx.Rollback()
1035 log.Check(err, "rolling back transaction")
1036 }
1037 }()
1038 acc.WithRLock(func() {
1039 rtx, err = acc.DB.Begin(reqctx, false)
1040 })
1041 if err != nil {
1042 reqctxcancel()
1043 reqctx = nil
1044 reqctxcancel = nil
1045
1046 if errors.Is(err, context.Canceled) {
1047 return true
1048 }
1049 err := fmt.Errorf("begin transaction: %v", err)
1050 viewErr := EventViewErr{v.Request.ViewID, v.Request.ID, err.Error(), err}
1051 writer.xsendEvent(ctx, log, "viewErr", viewErr)
1052 return false
1053 }
1054
1055 // Reset view state for new query.
1056 if req.ViewID != v.Request.ViewID {
1057 matchMailboxes, mailboxIDs, mailboxPrefixes := xprepareMailboxIDs(ctx, rtx, req.Query.Filter, accConf.RejectsMailbox)
1058 if req.Query.Filter.MailboxChildrenIncluded {
1059 xgatherMailboxIDs(ctx, rtx, mailboxIDs, mailboxPrefixes)
1060 }
1061 v = view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
1062 } else {
1063 v.Request = req
1064 }
1065 go viewRequestTx(reqctx, log, acc, rtx, v, viewMsgsc, viewErrc, viewResetc, donec)
1066 rtx = nil
1067 return false
1068 }()
1069 if stop {
1070 return
1071 }
1072
1073 case <-pending:
1074 xprocessChanges(comm.Get())
1075
1076 case <-ctx.Done():
1077 // Work around go vet, it doesn't see defer cancelDrain.
1078 if reqctxcancel != nil {
1079 reqctxcancel()
1080 }
1081 return
1082 }
1083 }
1084}
1085
1086// xprepareMailboxIDs prepare the first half of filters for mailboxes, based on
1087// f.MailboxID (-1 is special). matchMailboxes indicates whether the IDs in
1088// mailboxIDs must or must not match. mailboxPrefixes is for use with
1089// xgatherMailboxIDs to gather children of the mailboxIDs.
1090func xprepareMailboxIDs(ctx context.Context, tx *bstore.Tx, f Filter, rejectsMailbox string) (matchMailboxes bool, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1091 matchMailboxes = true
1092 mailboxIDs = map[int64]bool{}
1093 if f.MailboxID == -1 {
1094 matchMailboxes = false
1095 // Add the trash, junk and account rejects mailbox.
1096 err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
1097 if mb.Trash || mb.Junk || mb.Name == rejectsMailbox {
1098 mailboxPrefixes = append(mailboxPrefixes, mb.Name+"/")
1099 mailboxIDs[mb.ID] = true
1100 }
1101 return nil
1102 })
1103 xcheckf(ctx, err, "finding trash/junk/rejects mailbox")
1104 } else if f.MailboxID > 0 {
1105 mb := store.Mailbox{ID: f.MailboxID}
1106 err := tx.Get(&mb)
1107 xcheckf(ctx, err, "get mailbox")
1108 mailboxIDs[f.MailboxID] = true
1109 mailboxPrefixes = []string{mb.Name + "/"}
1110 }
1111 return
1112}
1113
1114// xgatherMailboxIDs adds all mailboxes with a prefix matching any of
1115// mailboxPrefixes to mailboxIDs, to expand filtering to children of mailboxes.
1116func xgatherMailboxIDs(ctx context.Context, tx *bstore.Tx, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1117 // Gather more mailboxes to filter on, based on mailboxPrefixes.
1118 if len(mailboxPrefixes) == 0 {
1119 return
1120 }
1121 err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
1122 for _, p := range mailboxPrefixes {
1123 if strings.HasPrefix(mb.Name, p) {
1124 mailboxIDs[mb.ID] = true
1125 break
1126 }
1127 }
1128 return nil
1129 })
1130 xcheckf(ctx, err, "gathering mailboxes")
1131}
1132
1133// matchesMailbox returns whether a mailbox matches the view.
1134func (v view) matchesMailbox(mailboxID int64) bool {
1135 return len(v.mailboxIDs) == 0 || v.matchMailboxIDs && v.mailboxIDs[mailboxID] || !v.matchMailboxIDs && !v.mailboxIDs[mailboxID]
1136}
1137
1138// inRange returns whether m is within the range for the view, whether a change for
1139// this message should be sent to the client so it can update its state.
1140func (v view) inRange(m store.Message) bool {
1141 return v.End || !v.Request.Query.OrderAsc && !m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && !m.Received.After(v.LastMessageReceived)
1142}
1143
1144// matches checks if the message, identified by either messageID or mailboxID+UID,
1145// is in the current "view" (i.e. passing the filters, and if checkRange is set
1146// also if within the range of sent messages based on sort order and the last seen
1147// message). getmsg retrieves the message, which may be necessary depending on the
1148// active filters. Used to determine if a store.Change with a new message should be
1149// sent, and for the destination and anchor messages in view requests.
1150func (v view) matches(log mlog.Log, acc *store.Account, checkRange bool, messageID int64, mailboxID int64, uid store.UID, flags store.Flags, keywords []string, getmsg func(int64, int64, store.UID) (store.Message, error)) (match bool, rerr error) {
1151 var m store.Message
1152 ensureMessage := func() bool {
1153 if m.ID == 0 && rerr == nil {
1154 m, rerr = getmsg(messageID, mailboxID, uid)
1155 }
1156 return rerr == nil
1157 }
1158
1159 q := v.Request.Query
1160
1161 // Warning: Filters must be kept in sync between queryMessage and view.matches.
1162
1163 // Check filters.
1164 if len(v.mailboxIDs) > 0 && (!ensureMessage() || v.matchMailboxIDs && !v.mailboxIDs[m.MailboxID] || !v.matchMailboxIDs && v.mailboxIDs[m.MailboxID]) {
1165 return false, rerr
1166 }
1167 // note: anchorMessageID is not relevant for matching.
1168 flagfilter := q.flagFilterFn()
1169 if flagfilter != nil && !flagfilter(flags, keywords) {
1170 return false, rerr
1171 }
1172
1173 if q.Filter.Oldest != nil && (!ensureMessage() || m.Received.Before(*q.Filter.Oldest)) {
1174 return false, rerr
1175 }
1176 if q.Filter.Newest != nil && (!ensureMessage() || !m.Received.Before(*q.Filter.Newest)) {
1177 return false, rerr
1178 }
1179
1180 if q.Filter.SizeMin > 0 && (!ensureMessage() || m.Size < q.Filter.SizeMin) {
1181 return false, rerr
1182 }
1183 if q.Filter.SizeMax > 0 && (!ensureMessage() || m.Size > q.Filter.SizeMax) {
1184 return false, rerr
1185 }
1186
1187 state := msgState{acc: acc}
1188 defer func() {
1189 if rerr == nil && state.err != nil {
1190 rerr = state.err
1191 }
1192 state.clear()
1193 }()
1194
1195 attachmentFilter := q.attachmentFilterFn(log, acc, &state)
1196 if attachmentFilter != nil && (!ensureMessage() || !attachmentFilter(m)) {
1197 return false, rerr
1198 }
1199
1200 envFilter := q.envFilterFn(log, &state)
1201 if envFilter != nil && (!ensureMessage() || !envFilter(m)) {
1202 return false, rerr
1203 }
1204
1205 headerFilter := q.headerFilterFn(log, &state)
1206 if headerFilter != nil && (!ensureMessage() || !headerFilter(m)) {
1207 return false, rerr
1208 }
1209
1210 wordsFilter := q.wordsFilterFn(log, &state)
1211 if wordsFilter != nil && (!ensureMessage() || !wordsFilter(m)) {
1212 return false, rerr
1213 }
1214
1215 // Now check that we are either within the sorting order, or "last" was sent.
1216 if !checkRange || v.End || ensureMessage() && v.inRange(m) {
1217 return true, rerr
1218 }
1219 return false, rerr
1220}
1221
1222type msgResp struct {
1223 err error // If set, an error happened and fields below are not set.
1224 reset bool // If set, the anchor message does not exist (anymore?) and we are sending messages from the start, fields below not set.
1225 viewEnd bool // If set, the last message for the view was seen, no more should be requested, fields below not set.
1226 mil []MessageItem // If none of the cases above apply, the messages that was found matching the query. First message was reason the thread is returned, for use as AnchorID in followup request.
1227 pm *ParsedMessage // If m was the target page.DestMessageID, or this is the first match, this is the parsed message of mi.
1228}
1229
1230// viewRequestTx executes a request (query with filters, pagination) by
1231// launching a new goroutine with queryMessages, receiving results as msgResp,
1232// and sending Event* to the SSE connection.
1233//
1234// It always closes tx.
1235func viewRequestTx(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, msgc chan EventViewMsgs, errc chan EventViewErr, resetc chan EventViewReset, donec chan int64) {
1236 defer func() {
1237 err := tx.Rollback()
1238 log.Check(err, "rolling back query transaction")
1239
1240 donec <- v.Request.ID
1241
1242 x := recover() // Should not happen, but don't take program down if it does.
1243 if x != nil {
1244 log.WithContext(ctx).Error("viewRequestTx panic", slog.Any("err", x))
1245 debug.PrintStack()
1246 metrics.PanicInc(metrics.Webmailrequest)
1247 }
1248 }()
1249
1250 var msgitems [][]MessageItem // Gathering for 300ms, then flushing.
1251 var parsedMessage *ParsedMessage
1252 var viewEnd bool
1253
1254 var immediate bool // No waiting, flush immediate.
1255 t := time.NewTimer(300 * time.Millisecond)
1256 defer t.Stop()
1257
1258 sendViewMsgs := func(force bool) {
1259 if len(msgitems) == 0 && !force {
1260 return
1261 }
1262
1263 immediate = false
1264 msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, msgitems, parsedMessage, viewEnd}
1265 msgitems = nil
1266 parsedMessage = nil
1267 t.Reset(300 * time.Millisecond)
1268 }
1269
1270 // todo: should probably rewrite code so we don't start yet another goroutine, but instead handle the query responses directly (through a struct that keeps state?) in the sse connection goroutine.
1271
1272 mrc := make(chan msgResp, 1)
1273 go queryMessages(ctx, log, acc, tx, v, mrc)
1274
1275 for {
1276 select {
1277 case mr, ok := <-mrc:
1278 if !ok {
1279 sendViewMsgs(false)
1280 // Empty message list signals this query is done.
1281 msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, nil, nil, false}
1282 return
1283 }
1284 if mr.err != nil {
1285 sendViewMsgs(false)
1286 errc <- EventViewErr{v.Request.ViewID, v.Request.ID, mr.err.Error(), mr.err}
1287 return
1288 }
1289 if mr.reset {
1290 resetc <- EventViewReset{v.Request.ViewID, v.Request.ID}
1291 continue
1292 }
1293 if mr.viewEnd {
1294 viewEnd = true
1295 sendViewMsgs(true)
1296 return
1297 }
1298
1299 msgitems = append(msgitems, mr.mil)
1300 if mr.pm != nil {
1301 parsedMessage = mr.pm
1302 }
1303 if immediate {
1304 sendViewMsgs(true)
1305 }
1306
1307 case <-t.C:
1308 if len(msgitems) == 0 {
1309 // Nothing to send yet. We'll send immediately when the next message comes in.
1310 immediate = true
1311 } else {
1312 sendViewMsgs(false)
1313 }
1314 }
1315 }
1316}
1317
1318// queryMessages executes a query, with filter, pagination, destination message id
1319// to fetch (the message that the client had in view and wants to display again).
1320// It sends on msgc, with several types of messages: errors, whether the view is
1321// reset due to missing AnchorMessageID, and when the end of the view was reached
1322// and/or for a message.
1323func queryMessages(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, mrc chan msgResp) {
1324 defer func() {
1325 x := recover() // Should not happen, but don't take program down if it does.
1326 if x != nil {
1327 log.WithContext(ctx).Error("queryMessages panic", slog.Any("err", x))
1328 debug.PrintStack()
1329 mrc <- msgResp{err: fmt.Errorf("query failed")}
1330 metrics.PanicInc(metrics.Webmailquery)
1331 }
1332
1333 close(mrc)
1334 }()
1335
1336 query := v.Request.Query
1337 page := v.Request.Page
1338
1339 // Warning: Filters must be kept in sync between queryMessage and view.matches.
1340
1341 checkMessage := func(id int64) (valid bool, rerr error) {
1342 m := store.Message{ID: id}
1343 err := tx.Get(&m)
1344 if err == bstore.ErrAbsent || err == nil && m.Expunged {
1345 return false, nil
1346 } else if err != nil {
1347 return false, err
1348 } else {
1349 return v.matches(log, acc, false, m.ID, m.MailboxID, m.UID, m.Flags, m.Keywords, func(int64, int64, store.UID) (store.Message, error) {
1350 return m, nil
1351 })
1352 }
1353 }
1354
1355 // Check if AnchorMessageID exists and matches filter. If not, we will reset the view.
1356 if page.AnchorMessageID > 0 {
1357 // Check if message exists and (still) matches the filter.
1358 // todo: if AnchorMessageID exists but no longer matches the filter, we are resetting the view, but could handle it more gracefully in the future. if the message is in a different mailbox, we cannot query as efficiently, we'll have to read through more messages.
1359 if valid, err := checkMessage(page.AnchorMessageID); err != nil {
1360 mrc <- msgResp{err: fmt.Errorf("querying AnchorMessageID: %v", err)}
1361 return
1362 } else if !valid {
1363 mrc <- msgResp{reset: true}
1364 page.AnchorMessageID = 0
1365 }
1366 }
1367
1368 // Check if page.DestMessageID exists and matches filter. If not, we will ignore
1369 // it instead of continuing to send message till the end of the view.
1370 if page.DestMessageID > 0 {
1371 if valid, err := checkMessage(page.DestMessageID); err != nil {
1372 mrc <- msgResp{err: fmt.Errorf("querying requested message: %v", err)}
1373 return
1374 } else if !valid {
1375 page.DestMessageID = 0
1376 }
1377 }
1378
1379 // todo optimize: we would like to have more filters directly on the database if they can use an index. eg if there is a keyword filter and no mailbox filter.
1380
1381 q := bstore.QueryTx[store.Message](tx)
1382 q.FilterEqual("Expunged", false)
1383 if len(v.mailboxIDs) > 0 {
1384 if len(v.mailboxIDs) == 1 && v.matchMailboxIDs {
1385 // Should result in fast indexed query.
1386 for mbID := range v.mailboxIDs {
1387 q.FilterNonzero(store.Message{MailboxID: mbID})
1388 }
1389 } else {
1390 idsAny := make([]any, 0, len(v.mailboxIDs))
1391 for mbID := range v.mailboxIDs {
1392 idsAny = append(idsAny, mbID)
1393 }
1394 if v.matchMailboxIDs {
1395 q.FilterEqual("MailboxID", idsAny...)
1396 } else {
1397 q.FilterNotEqual("MailboxID", idsAny...)
1398 }
1399 }
1400 }
1401
1402 // If we are looking for an anchor, keep skipping message early (cheaply) until we've seen it.
1403 if page.AnchorMessageID > 0 {
1404 var seen = false
1405 q.FilterFn(func(m store.Message) bool {
1406 if seen {
1407 return true
1408 }
1409 seen = m.ID == page.AnchorMessageID
1410 return false
1411 })
1412 }
1413
1414 // We may be added filters the the query below. The FilterFn signature does not
1415 // implement reporting errors, or anything else, just a bool. So when making the
1416 // filter functions, we give them a place to store parsed message state, and an
1417 // error. We check the error during and after query execution.
1418 state := msgState{acc: acc}
1419 defer state.clear()
1420
1421 flagfilter := query.flagFilterFn()
1422 if flagfilter != nil {
1423 q.FilterFn(func(m store.Message) bool {
1424 return flagfilter(m.Flags, m.Keywords)
1425 })
1426 }
1427
1428 if query.Filter.Oldest != nil {
1429 q.FilterGreaterEqual("Received", *query.Filter.Oldest)
1430 }
1431 if query.Filter.Newest != nil {
1432 q.FilterLessEqual("Received", *query.Filter.Newest)
1433 }
1434
1435 if query.Filter.SizeMin > 0 {
1436 q.FilterGreaterEqual("Size", query.Filter.SizeMin)
1437 }
1438 if query.Filter.SizeMax > 0 {
1439 q.FilterLessEqual("Size", query.Filter.SizeMax)
1440 }
1441
1442 attachmentFilter := query.attachmentFilterFn(log, acc, &state)
1443 if attachmentFilter != nil {
1444 q.FilterFn(attachmentFilter)
1445 }
1446
1447 envFilter := query.envFilterFn(log, &state)
1448 if envFilter != nil {
1449 q.FilterFn(envFilter)
1450 }
1451
1452 headerFilter := query.headerFilterFn(log, &state)
1453 if headerFilter != nil {
1454 q.FilterFn(headerFilter)
1455 }
1456
1457 wordsFilter := query.wordsFilterFn(log, &state)
1458 if wordsFilter != nil {
1459 q.FilterFn(wordsFilter)
1460 }
1461
1462 if query.OrderAsc {
1463 q.SortAsc("Received")
1464 } else {
1465 q.SortDesc("Received")
1466 }
1467 found := page.DestMessageID <= 0
1468 end := true
1469 have := 0
1470 err := q.ForEach(func(m store.Message) error {
1471 // Check for an error in one of the filters, propagate it.
1472 if state.err != nil {
1473 return state.err
1474 }
1475
1476 if have >= page.Count && found || have > 10000 {
1477 end = false
1478 return bstore.StopForEach
1479 }
1480
1481 if _, ok := v.threadIDs[m.ThreadID]; ok {
1482 // Message was already returned as part of a thread.
1483 return nil
1484 }
1485
1486 var pm *ParsedMessage
1487 if m.ID == page.DestMessageID || page.DestMessageID == 0 && have == 0 && page.AnchorMessageID == 0 {
1488 // For threads, if there was not DestMessageID, we may be getting the newest
1489 // message. For an initial view, this isn't necessarily the first the user is
1490 // expected to read first, that would be the first unread, which we'll get below
1491 // when gathering the thread.
1492 found = true
1493 xpm, err := parsedMessage(log, m, &state, true, false)
1494 if err != nil {
1495 return fmt.Errorf("parsing message %d: %v", m.ID, err)
1496 }
1497 pm = &xpm
1498 }
1499
1500 mi, err := messageItem(log, m, &state)
1501 if err != nil {
1502 return fmt.Errorf("making messageitem for message %d: %v", m.ID, err)
1503 }
1504 mil := []MessageItem{mi}
1505 if query.Threading != ThreadOff {
1506 more, xpm, err := gatherThread(log, tx, acc, v, m, page.DestMessageID, page.AnchorMessageID == 0 && have == 0)
1507 if err != nil {
1508 return fmt.Errorf("gathering thread messages for id %d, thread %d: %v", m.ID, m.ThreadID, err)
1509 }
1510 if xpm != nil {
1511 pm = xpm
1512 found = true
1513 }
1514 mil = append(mil, more...)
1515 v.threadIDs[m.ThreadID] = struct{}{}
1516
1517 // Calculate how many messages the frontend is going to show, and only count those as returned.
1518 collapsed := map[int64]bool{}
1519 for _, mi := range mil {
1520 collapsed[mi.Message.ID] = mi.Message.ThreadCollapsed
1521 }
1522 unread := map[int64]bool{} // Propagated to thread root.
1523 if query.Threading == ThreadUnread {
1524 for _, mi := range mil {
1525 mm := mi.Message
1526 if mm.Seen {
1527 continue
1528 }
1529 unread[mm.ID] = true
1530 for _, id := range mm.ThreadParentIDs {
1531 unread[id] = true
1532 }
1533 }
1534 }
1535 for _, mi := range mil {
1536 mm := mi.Message
1537 threadRoot := true
1538 rootID := mm.ID
1539 for _, id := range mm.ThreadParentIDs {
1540 if _, ok := collapsed[id]; ok {
1541 threadRoot = false
1542 rootID = id
1543 }
1544 }
1545 if threadRoot || (query.Threading == ThreadOn && !collapsed[rootID] || query.Threading == ThreadUnread && unread[rootID]) {
1546 have++
1547 }
1548 }
1549 } else {
1550 have++
1551 }
1552 if pm != nil && len(pm.envelope.From) == 1 {
1553 pm.ViewMode, err = fromAddrViewMode(tx, pm.envelope.From[0])
1554 if err != nil {
1555 return fmt.Errorf("gathering view mode for id %d: %v", m.ID, err)
1556 }
1557 }
1558 mrc <- msgResp{mil: mil, pm: pm}
1559 return nil
1560 })
1561 // Check for an error in one of the filters again. Check in ForEach would not
1562 // trigger if the last message has the error.
1563 if err == nil && state.err != nil {
1564 err = state.err
1565 }
1566 if err != nil {
1567 mrc <- msgResp{err: fmt.Errorf("querying messages: %v", err)}
1568 return
1569 }
1570 if end {
1571 mrc <- msgResp{viewEnd: true}
1572 }
1573}
1574
1575func gatherThread(log mlog.Log, tx *bstore.Tx, acc *store.Account, v view, m store.Message, destMessageID int64, first bool) ([]MessageItem, *ParsedMessage, error) {
1576 if m.ThreadID == 0 {
1577 // If we would continue, FilterNonzero would fail because there are no non-zero fields.
1578 return nil, nil, fmt.Errorf("message has threadid 0, account is probably still being upgraded, try turning threading off until the upgrade is done")
1579 }
1580
1581 // Fetch other messages for this thread.
1582 qt := bstore.QueryTx[store.Message](tx)
1583 qt.FilterNonzero(store.Message{ThreadID: m.ThreadID})
1584 qt.FilterEqual("Expunged", false)
1585 qt.FilterNotEqual("ID", m.ID)
1586 qt.SortAsc("ID")
1587 tml, err := qt.List()
1588 if err != nil {
1589 return nil, nil, fmt.Errorf("listing other messages in thread for message %d, thread %d: %v", m.ID, m.ThreadID, err)
1590 }
1591
1592 var mil []MessageItem
1593 var pm *ParsedMessage
1594 var firstUnread bool
1595 for _, tm := range tml {
1596 err := func() error {
1597 xstate := msgState{acc: acc}
1598 defer xstate.clear()
1599
1600 mi, err := messageItem(log, tm, &xstate)
1601 if err != nil {
1602 return fmt.Errorf("making messageitem for message %d, for thread %d: %v", tm.ID, m.ThreadID, err)
1603 }
1604 mi.MatchQuery, err = v.matches(log, acc, false, tm.ID, tm.MailboxID, tm.UID, tm.Flags, tm.Keywords, func(int64, int64, store.UID) (store.Message, error) {
1605 return tm, nil
1606 })
1607 if err != nil {
1608 return fmt.Errorf("matching thread message %d against view query: %v", tm.ID, err)
1609 }
1610 mil = append(mil, mi)
1611
1612 if tm.ID == destMessageID || destMessageID == 0 && first && (pm == nil || !firstUnread && !tm.Seen) {
1613 firstUnread = !tm.Seen
1614 xpm, err := parsedMessage(log, tm, &xstate, true, false)
1615 if err != nil {
1616 return fmt.Errorf("parsing thread message %d: %v", tm.ID, err)
1617 }
1618 pm = &xpm
1619 }
1620 return nil
1621 }()
1622 if err != nil {
1623 return nil, nil, err
1624 }
1625 }
1626
1627 // Finally, the message that caused us to gather this thread (which is likely the
1628 // most recent message in the thread) could be the only unread message.
1629 if destMessageID == 0 && first && !m.Seen && !firstUnread {
1630 xstate := msgState{acc: acc}
1631 defer xstate.clear()
1632 xpm, err := parsedMessage(log, m, &xstate, true, false)
1633 if err != nil {
1634 return nil, nil, fmt.Errorf("parsing thread message %d: %v", m.ID, err)
1635 }
1636 pm = &xpm
1637 }
1638
1639 return mil, pm, nil
1640}
1641
1642// While checking the filters on a message, we may need to get more message
1643// details as each filter passes. We check the filters that need the basic
1644// information first, and load and cache more details for the next filters.
1645// msgState holds parsed details for a message, it is updated while filtering,
1646// with more information or reset for a next message.
1647type msgState struct {
1648 acc *store.Account // Never changes during lifetime.
1649 err error // Once set, doesn't get cleared.
1650 m store.Message
1651 part *message.Part // Will be without Reader when msgr is nil.
1652 msgr *store.MsgReader
1653}
1654
1655func (ms *msgState) clear() {
1656 if ms.msgr != nil {
1657 ms.msgr.Close()
1658 ms.msgr = nil
1659 }
1660 *ms = msgState{acc: ms.acc, err: ms.err}
1661}
1662
1663func (ms *msgState) ensureMsg(m store.Message) {
1664 if m.ID != ms.m.ID {
1665 ms.clear()
1666 }
1667 ms.m = m
1668}
1669
1670func (ms *msgState) ensurePart(m store.Message, withMsgReader bool) bool {
1671 ms.ensureMsg(m)
1672
1673 if ms.err == nil {
1674 if ms.part == nil {
1675 if m.ParsedBuf == nil {
1676 ms.err = fmt.Errorf("message %d not parsed", m.ID)
1677 return false
1678 }
1679 var p message.Part
1680 if err := json.Unmarshal(m.ParsedBuf, &p); err != nil {
1681 ms.err = fmt.Errorf("load part for message %d: %w", m.ID, err)
1682 return false
1683 }
1684 ms.part = &p
1685 }
1686 if withMsgReader && ms.msgr == nil {
1687 ms.msgr = ms.acc.MessageReader(m)
1688 ms.part.SetReaderAt(ms.msgr)
1689 }
1690 }
1691 return ms.part != nil
1692}
1693
1694// flagFilterFn returns a function that applies the flag/keyword/"label"-related
1695// filters for a query. A nil function is returned if there are no flags to filter
1696// on.
1697func (q Query) flagFilterFn() func(store.Flags, []string) bool {
1698 labels := map[string]bool{}
1699 for _, k := range q.Filter.Labels {
1700 labels[k] = true
1701 }
1702 for _, k := range q.NotFilter.Labels {
1703 labels[k] = false
1704 }
1705
1706 if len(labels) == 0 {
1707 return nil
1708 }
1709
1710 var mask, flags store.Flags
1711 systemflags := map[string][]*bool{
1712 `\answered`: {&mask.Answered, &flags.Answered},
1713 `\flagged`: {&mask.Flagged, &flags.Flagged},
1714 `\deleted`: {&mask.Deleted, &flags.Deleted},
1715 `\seen`: {&mask.Seen, &flags.Seen},
1716 `\draft`: {&mask.Draft, &flags.Draft},
1717 `$junk`: {&mask.Junk, &flags.Junk},
1718 `$notjunk`: {&mask.Notjunk, &flags.Notjunk},
1719 `$forwarded`: {&mask.Forwarded, &flags.Forwarded},
1720 `$phishing`: {&mask.Phishing, &flags.Phishing},
1721 `$mdnsent`: {&mask.MDNSent, &flags.MDNSent},
1722 }
1723 keywords := map[string]bool{}
1724 for k, v := range labels {
1725 k = strings.ToLower(k)
1726 if mf, ok := systemflags[k]; ok {
1727 *mf[0] = true
1728 *mf[1] = v
1729 } else {
1730 keywords[k] = v
1731 }
1732 }
1733 return func(msgFlags store.Flags, msgKeywords []string) bool {
1734 var f store.Flags
1735 if f.Set(mask, msgFlags) != flags {
1736 return false
1737 }
1738 for k, v := range keywords {
1739 if slices.Contains(msgKeywords, k) != v {
1740 return false
1741 }
1742 }
1743 return true
1744 }
1745}
1746
1747// attachmentFilterFn returns a function that filters for the attachment-related
1748// filter from the query. A nil function is returned if there are attachment
1749// filters.
1750func (q Query) attachmentFilterFn(log mlog.Log, acc *store.Account, state *msgState) func(m store.Message) bool {
1751 if q.Filter.Attachments == AttachmentIndifferent && q.NotFilter.Attachments == AttachmentIndifferent {
1752 return nil
1753 }
1754
1755 return func(m store.Message) bool {
1756 if !state.ensurePart(m, false) {
1757 return false
1758 }
1759 types, err := attachmentTypes(log, m, state)
1760 if err != nil {
1761 state.err = err
1762 return false
1763 }
1764 return (q.Filter.Attachments == AttachmentIndifferent || types[q.Filter.Attachments]) && (q.NotFilter.Attachments == AttachmentIndifferent || !types[q.NotFilter.Attachments])
1765 }
1766}
1767
1768var attachmentMimetypes = map[string]AttachmentType{
1769 "application/pdf": AttachmentPDF,
1770 "application/zip": AttachmentArchive,
1771 "application/x-rar-compressed": AttachmentArchive,
1772 "application/vnd.oasis.opendocument.spreadsheet": AttachmentSpreadsheet,
1773 "application/vnd.ms-excel": AttachmentSpreadsheet,
1774 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": AttachmentSpreadsheet,
1775 "application/vnd.oasis.opendocument.text": AttachmentDocument,
1776 "application/vnd.oasis.opendocument.presentation": AttachmentPresentation,
1777 "application/vnd.ms-powerpoint": AttachmentPresentation,
1778 "application/vnd.openxmlformats-officedocument.presentationml.presentation": AttachmentPresentation,
1779}
1780var attachmentExtensions = map[string]AttachmentType{
1781 ".pdf": AttachmentPDF,
1782 ".zip": AttachmentArchive,
1783 ".tar": AttachmentArchive,
1784 ".tgz": AttachmentArchive,
1785 ".tar.gz": AttachmentArchive,
1786 ".tbz2": AttachmentArchive,
1787 ".tar.bz2": AttachmentArchive,
1788 ".tar.lz": AttachmentArchive,
1789 ".tlz": AttachmentArchive,
1790 ".tar.xz": AttachmentArchive,
1791 ".txz": AttachmentArchive,
1792 ".tar.zst": AttachmentArchive,
1793 ".tar.lz4": AttachmentArchive,
1794 ".7z": AttachmentArchive,
1795 ".rar": AttachmentArchive,
1796 ".ods": AttachmentSpreadsheet,
1797 ".xls": AttachmentSpreadsheet,
1798 ".xlsx": AttachmentSpreadsheet,
1799 ".odt": AttachmentDocument,
1800 ".doc": AttachmentDocument,
1801 ".docx": AttachmentDocument,
1802 ".odp": AttachmentPresentation,
1803 ".ppt": AttachmentPresentation,
1804 ".pptx": AttachmentPresentation,
1805}
1806
1807func attachmentTypes(log mlog.Log, m store.Message, state *msgState) (map[AttachmentType]bool, error) {
1808 types := map[AttachmentType]bool{}
1809
1810 pm, err := parsedMessage(log, m, state, false, false)
1811 if err != nil {
1812 return nil, fmt.Errorf("parsing message for attachments: %w", err)
1813 }
1814 for _, a := range pm.attachments {
1815 if a.Part.MediaType == "IMAGE" {
1816 types[AttachmentImage] = true
1817 continue
1818 }
1819 mt := strings.ToLower(a.Part.MediaType + "/" + a.Part.MediaSubType)
1820 if t, ok := attachmentMimetypes[mt]; ok {
1821 types[t] = true
1822 } else if ext := filepath.Ext(tryDecodeParam(log, a.Part.ContentTypeParams["name"])); ext != "" {
1823 if t, ok := attachmentExtensions[strings.ToLower(ext)]; ok {
1824 types[t] = true
1825 } else {
1826 continue
1827 }
1828 }
1829 }
1830
1831 if len(types) == 0 {
1832 types[AttachmentNone] = true
1833 } else {
1834 types[AttachmentAny] = true
1835 }
1836 return types, nil
1837}
1838
1839// envFilterFn returns a filter function for the "envelope" headers ("envelope" as
1840// used by IMAP, i.e. basic message headers from/to/subject, an unfortunate name
1841// clash with SMTP envelope) for the query. A nil function is returned if no
1842// filtering is needed.
1843func (q Query) envFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1844 if len(q.Filter.From) == 0 && len(q.Filter.To) == 0 && len(q.Filter.Subject) == 0 && len(q.NotFilter.From) == 0 && len(q.NotFilter.To) == 0 && len(q.NotFilter.Subject) == 0 {
1845 return nil
1846 }
1847
1848 lower := func(l []string) []string {
1849 if len(l) == 0 {
1850 return nil
1851 }
1852 r := make([]string, len(l))
1853 for i, s := range l {
1854 r[i] = strings.ToLower(s)
1855 }
1856 return r
1857 }
1858
1859 filterSubject := lower(q.Filter.Subject)
1860 notFilterSubject := lower(q.NotFilter.Subject)
1861 filterFrom := lower(q.Filter.From)
1862 notFilterFrom := lower(q.NotFilter.From)
1863 filterTo := lower(q.Filter.To)
1864 notFilterTo := lower(q.NotFilter.To)
1865
1866 return func(m store.Message) bool {
1867 if !state.ensurePart(m, false) {
1868 return false
1869 }
1870
1871 var env message.Envelope
1872 if state.part.Envelope != nil {
1873 env = *state.part.Envelope
1874 }
1875
1876 if len(filterSubject) > 0 || len(notFilterSubject) > 0 {
1877 subject := strings.ToLower(env.Subject)
1878 for _, s := range filterSubject {
1879 if !strings.Contains(subject, s) {
1880 return false
1881 }
1882 }
1883 for _, s := range notFilterSubject {
1884 if strings.Contains(subject, s) {
1885 return false
1886 }
1887 }
1888 }
1889
1890 contains := func(textLower []string, l []message.Address, all bool) bool {
1891 next:
1892 for _, s := range textLower {
1893 for _, a := range l {
1894 name := strings.ToLower(a.Name)
1895 addr := strings.ToLower(fmt.Sprintf("<%s@%s>", a.User, a.Host))
1896 if strings.Contains(name, s) || strings.Contains(addr, s) {
1897 if !all {
1898 return true
1899 }
1900 continue next
1901 }
1902 }
1903 if all {
1904 return false
1905 }
1906 }
1907 return all
1908 }
1909
1910 if len(filterFrom) > 0 && !contains(filterFrom, env.From, true) {
1911 return false
1912 }
1913 if len(notFilterFrom) > 0 && contains(notFilterFrom, env.From, false) {
1914 return false
1915 }
1916 if len(filterTo) > 0 || len(notFilterTo) > 0 {
1917 to := append(append(append([]message.Address{}, env.To...), env.CC...), env.BCC...)
1918 if len(filterTo) > 0 && !contains(filterTo, to, true) {
1919 return false
1920 }
1921 if len(notFilterTo) > 0 && contains(notFilterTo, to, false) {
1922 return false
1923 }
1924 }
1925 return true
1926 }
1927}
1928
1929// headerFilterFn returns a function that filters for the header filters in the
1930// query. A nil function is returned if there are no header filters.
1931func (q Query) headerFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1932 if len(q.Filter.Headers) == 0 {
1933 return nil
1934 }
1935
1936 lowerValues := make([]string, len(q.Filter.Headers))
1937 for i, t := range q.Filter.Headers {
1938 lowerValues[i] = strings.ToLower(t[1])
1939 }
1940
1941 return func(m store.Message) bool {
1942 if !state.ensurePart(m, true) {
1943 return false
1944 }
1945 hdr, err := state.part.Header()
1946 if err != nil {
1947 state.err = fmt.Errorf("reading header for message %d: %w", m.ID, err)
1948 return false
1949 }
1950
1951 next:
1952 for i, t := range q.Filter.Headers {
1953 k := t[0]
1954 v := lowerValues[i]
1955 l := hdr.Values(k)
1956 if v == "" && len(l) > 0 {
1957 continue
1958 }
1959 for _, e := range l {
1960 if strings.Contains(strings.ToLower(e), v) {
1961 continue next
1962 }
1963 }
1964 return false
1965 }
1966 return true
1967 }
1968}
1969
1970// wordFiltersFn returns a function that applies the word filters of the query. A
1971// nil function is returned when query does not contain a word filter.
1972func (q Query) wordsFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1973 if len(q.Filter.Words) == 0 && len(q.NotFilter.Words) == 0 {
1974 return nil
1975 }
1976
1977 ws := store.PrepareWordSearch(q.Filter.Words, q.NotFilter.Words)
1978
1979 return func(m store.Message) bool {
1980 if !state.ensurePart(m, true) {
1981 return false
1982 }
1983
1984 if ok, err := ws.MatchPart(log, state.part, true); err != nil {
1985 state.err = fmt.Errorf("searching for words in message %d: %w", m.ID, err)
1986 return false
1987 } else {
1988 return ok
1989 }
1990 }
1991}
1992