1package webmail
2
3// todo: may want to add some json omitempty tags to MessageItem, or Message to reduce json size, or just have smaller types that send only the fields that are needed.
4
5import (
6 "compress/gzip"
7 "context"
8 cryptrand "crypto/rand"
9 "encoding/base64"
10 "encoding/json"
11 "errors"
12 "fmt"
13 "log/slog"
14 "net/http"
15 "path/filepath"
16 "reflect"
17 "runtime/debug"
18 "slices"
19 "strconv"
20 "strings"
21 "sync"
22 "time"
23
24 "github.com/mjl-/bstore"
25 "github.com/mjl-/sherpa"
26
27 "github.com/mjl-/mox/dns"
28 "github.com/mjl-/mox/message"
29 "github.com/mjl-/mox/metrics"
30 "github.com/mjl-/mox/mlog"
31 "github.com/mjl-/mox/mox-"
32 "github.com/mjl-/mox/moxio"
33 "github.com/mjl-/mox/moxvar"
34 "github.com/mjl-/mox/smtp"
35 "github.com/mjl-/mox/store"
36)
37
// Request is a request to an SSE connection to send messages, either for a new
// view, to continue with an existing view, or to cancel an ongoing request.
40type Request struct {
41 ID int64
42
43 SSEID int64 // SSE connection.
44
45 // To indicate a request is a continuation (more results) of the previous view.
46 // Echoed in events, client checks if it is getting results for the latest request.
47 ViewID int64
48
49 // If set, this request and its view are canceled. A new view must be started.
50 Cancel bool
51
52 Query Query
53 Page Page
54}
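// For illustration only (hypothetical values, not all fields shown): a Request as
// it could be sent, JSON-encoded, in the "request" query string parameter when
// opening the SSE connection:
//
//	{
//		"ID": 1,
//		"ViewID": 1,
//		"Cancel": false,
//		"Query": {
//			"OrderAsc": false,
//			"Threading": "unread",
//			"Filter": {"MailboxName": "Inbox", "Words": ["invoice"]},
//			"NotFilter": {}
//		},
//		"Page": {"AnchorMessageID": 0, "Count": 50, "DestMessageID": 0}
//	}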
55
56type ThreadMode string
57
58const (
59 ThreadOff ThreadMode = "off"
60 ThreadOn ThreadMode = "on"
61 ThreadUnread ThreadMode = "unread"
62)
63
64// Query is a request for messages that match filters, in a given order.
65type Query struct {
	OrderAsc  bool // Order by received ascending or descending.
67 Threading ThreadMode
68 Filter Filter
69 NotFilter NotFilter
70}
71
72// AttachmentType is for filtering by attachment type.
73type AttachmentType string
74
75const (
76 AttachmentIndifferent AttachmentType = ""
77 AttachmentNone AttachmentType = "none"
78 AttachmentAny AttachmentType = "any"
79 AttachmentImage AttachmentType = "image" // png, jpg, gif, ...
80 AttachmentPDF AttachmentType = "pdf"
81 AttachmentArchive AttachmentType = "archive" // zip files, tgz, ...
82 AttachmentSpreadsheet AttachmentType = "spreadsheet" // ods, xlsx, ...
83 AttachmentDocument AttachmentType = "document" // odt, docx, ...
84 AttachmentPresentation AttachmentType = "presentation" // odp, pptx, ...
85)
86
// Filter selects the messages to return. Fields that are set must all match,
// for slices each element must match ("and").
89type Filter struct {
90 // If -1, then all mailboxes except Trash/Junk/Rejects. Otherwise, only active if > 0.
91 MailboxID int64
92
93 // If true, also submailboxes are included in the search.
94 MailboxChildrenIncluded bool
95
96 // In case client doesn't know mailboxes and their IDs yet. Only used during sse
97 // connection setup, where it is turned into a MailboxID. Filtering only looks at
98 // MailboxID.
99 MailboxName string
100
101 Words []string // Case insensitive substring match for each string.
102 From []string
103 To []string // Including Cc and Bcc.
104 Oldest *time.Time
105 Newest *time.Time
106 Subject []string
107 Attachments AttachmentType
108 Labels []string
109 Headers [][2]string // Header values can be empty, it's a check if the header is present, regardless of value.
110 SizeMin int64
111 SizeMax int64
112}
113
114// NotFilter matches messages that don't match these fields.
115type NotFilter struct {
116 Words []string
117 From []string
118 To []string
119 Subject []string
120 Attachments AttachmentType
121 Labels []string
122}
123
124// Page holds pagination parameters for a request.
125type Page struct {
126 // Start returning messages after this ID, if > 0. For pagination, fetching the
127 // next set of messages.
128 AnchorMessageID int64
129
130 // Number of messages to return, must be >= 1, we never return more than 10000 for
131 // one request.
132 Count int
133
	// If > 0, return messages until DestMessageID is found. More than Count messages
	// can be returned. For long-running searches, it may take a while before this
	// message is found.
137 DestMessageID int64
138}
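// For illustration (hypothetical flow): a client could start with
//
//	Page{Count: 50}
//
// and, to fetch the next page, send a new Request with the same ViewID and
//
//	Page{AnchorMessageID: lastSeenMessageID, Count: 50}
//
// where lastSeenMessageID is the ID of the last message it received.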
139
140// todo: MessageAddress and MessageEnvelope into message.Address and message.Envelope.
141
// MessageAddress is like message.Address, but with a dns.Domain, which includes
// the unicode name.
144type MessageAddress struct {
145 Name string // Free-form name for display in mail applications.
146 User string // Localpart, encoded.
147 Domain dns.Domain
148}
149
150// MessageEnvelope is like message.Envelope, as used in message.Part, but including
151// unicode host names for IDNA names.
152type MessageEnvelope struct {
153 // todo: should get sherpadoc to understand type embeds and embed the non-MessageAddress fields from message.Envelope.
154 Date time.Time
155 Subject string
156 From []MessageAddress
157 Sender []MessageAddress
158 ReplyTo []MessageAddress
159 To []MessageAddress
160 CC []MessageAddress
161 BCC []MessageAddress
162 InReplyTo string
163 MessageID string
164}
165
// MessageItem is sent by queries. It has derived information analyzed from
// message.Part, made for the needs of the message items in the message list.
169type MessageItem struct {
170 Message store.Message // Without ParsedBuf and MsgPrefix, for size.
171 Envelope MessageEnvelope
172 Attachments []Attachment
173 IsSigned bool
174 IsEncrypted bool
175 FirstLine string // Of message body, for showing as preview.
176 MatchQuery bool // If message does not match query, it can still be included because of threading.
177 MoreHeaders [][2]string // All headers from store.Settings.ShowHeaders that are present.
178}
179
180// ParsedMessage has more parsed/derived information about a message, intended
181// for rendering the (contents of the) message. Information from MessageItem is
182// not duplicated.
183type ParsedMessage struct {
184 ID int64
185 Part message.Part
186 Headers map[string][]string
187 ViewMode store.ViewMode
188
189 // Text parts, can be empty.
190 Texts []string
191
192 // Whether there is an HTML part. The webclient renders HTML message parts through
193 // an iframe and a separate request with strict CSP headers to prevent script
194 // execution and loading of external resources, which isn't possible when loading
	// in an iframe with inline HTML because not all browsers support the iframe CSP
	// attribute.
197 HasHTML bool
198
199 ListReplyAddress *MessageAddress // From List-Post.
200
201 // Information used by MessageItem, not exported in this type.
202 envelope MessageEnvelope
203 attachments []Attachment
204 isSigned bool
205 isEncrypted bool
206 firstLine string
207}
208
209// EventStart is the first message sent on an SSE connection, giving the client
210// basic data to populate its UI. After this event, messages will follow quickly in
211// an EventViewMsgs event.
212type EventStart struct {
213 SSEID int64
214 LoginAddress MessageAddress
215 Addresses []MessageAddress
216 DomainAddressConfigs map[string]DomainAddressConfig // ASCII domain to address config.
217 MailboxName string
218 Mailboxes []store.Mailbox
219 RejectsMailbox string
220 Settings store.Settings
221 AccountPath string // If nonempty, the path on same host to webaccount interface.
222 Version string
223}
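// On the wire, each event is sent in standard SSE framing, with the event name and
// JSON-encoded data. For example (abbreviated, hypothetical values):
//
//	event: start
//	data: {"SSEID":1,"LoginAddress":{...},"Mailboxes":[...],"Version":"..."}
//
//	event: viewMsgs
//	data: {"ViewID":1,"RequestID":1,"MessageItems":[[...]],"ViewEnd":true}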
224
225// DomainAddressConfig has the address (localpart) configuration for a domain, so
226// the webmail client can decide if an address matches the addresses of the
227// account.
228type DomainAddressConfig struct {
229 LocalpartCatchallSeparator string // Can be empty.
230 LocalpartCaseSensitive bool
231}
232
233// EventViewMsgs contains messages for a view, possibly a continuation of an
234// earlier list of messages.
235type EventViewMsgs struct {
236 ViewID int64
237 RequestID int64
238
239 // If empty, this was the last message for the request. If non-empty, a list of
240 // thread messages. Each with the first message being the reason this thread is
241 // included and can be used as AnchorID in followup requests. If the threading mode
242 // is "off" in the query, there will always be only a single message. If a thread
243 // is sent, all messages in the thread are sent, including those that don't match
244 // the query (e.g. from another mailbox). Threads can be displayed based on the
245 // ThreadParentIDs field, with possibly slightly different display based on field
246 // ThreadMissingLink.
247 MessageItems [][]MessageItem
248
249 // If set, will match the target page.DestMessageID from the request.
250 ParsedMessage *ParsedMessage
251
252 // If set, there are no more messages in this view at this moment. Messages can be
253 // added, typically via Change messages, e.g. for new deliveries.
254 ViewEnd bool
255}
256
257// EventViewErr indicates an error during a query for messages. The request is
258// aborted, no more request-related messages will be sent until the next request.
259type EventViewErr struct {
260 ViewID int64
261 RequestID int64
262 Err string // To be displayed in client.
	err error // Original error, for checking against context.Canceled.
264}
265
// EventViewReset indicates that a request for the next set of messages in a view
// could not be fulfilled, e.g. because the anchor message does not exist anymore.
268// The client should clear its list of messages. This can happen before
269// EventViewMsgs events are sent.
270type EventViewReset struct {
271 ViewID int64
272 RequestID int64
273}
274
// EventViewChanges contains one or more changes relevant for the client, either
276// with new mailbox total/unseen message counts, or messages added/removed/modified
277// (flags) for the current view.
278type EventViewChanges struct {
279 ViewID int64
280 Changes [][2]any // The first field of [2]any is a string, the second of the Change types below.
281}
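// For illustration only (abbreviated): on the wire, Changes is serialized as a
// list of [name, change] pairs, e.g.
//
//	"Changes": [["ChangeMailboxCounts", {...}], ["ChangeMsgFlags", {...}]]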
282
283// ChangeMsgAdd adds a new message and possibly its thread to the view.
284type ChangeMsgAdd struct {
285 store.ChangeAddUID
286 MessageItems []MessageItem
287}
288
289// ChangeMsgRemove removes one or more messages from the view.
290type ChangeMsgRemove struct {
291 store.ChangeRemoveUIDs
292}
293
294// ChangeMsgFlags updates flags for one message.
295type ChangeMsgFlags struct {
296 store.ChangeFlags
297}
298
299// ChangeMsgThread updates muted/collapsed fields for one message.
300type ChangeMsgThread struct {
301 store.ChangeThread
302}
303
304// ChangeMailboxRemove indicates a mailbox was removed, including all its messages.
305type ChangeMailboxRemove struct {
306 store.ChangeRemoveMailbox
307}
308
309// ChangeMailboxAdd indicates a new mailbox was added, initially without any messages.
310type ChangeMailboxAdd struct {
311 Mailbox store.Mailbox
312}
313
314// ChangeMailboxRename indicates a mailbox was renamed. Its ID stays the same.
315// It could be under a new parent.
316type ChangeMailboxRename struct {
317 store.ChangeRenameMailbox
318}
319
// ChangeMailboxCounts sets new total and unseen message counts for a mailbox.
321type ChangeMailboxCounts struct {
322 store.ChangeMailboxCounts
323}
324
325// ChangeMailboxSpecialUse has updated special-use flags for a mailbox.
326type ChangeMailboxSpecialUse struct {
327 store.ChangeMailboxSpecialUse
328}
329
330// ChangeMailboxKeywords has an updated list of keywords for a mailbox, e.g. after
331// a message was added with a keyword that wasn't in the mailbox yet.
332type ChangeMailboxKeywords struct {
333 store.ChangeMailboxKeywords
334}
335
// A view holds the information about the returned data for a query. It is used to
// determine whether mailbox changes should be sent to the client: we only send
// addition/removal/flag-changes of messages that are in view, or would extend it
// if the view is at the end of the results.
340type view struct {
341 Request Request
342
	// Received time of the last message we sent to the client. We use it to decide if a
	// newly delivered message is within the view and the client should get a notification.
345 LastMessageReceived time.Time
346
347 // If set, the last message in the query view has been sent. There is no need to do
348 // another query, it will not return more data. Used to decide if an event for a
349 // new message should be sent.
350 End bool
351
	// Whether a message must or must not match mailboxIDs.
353 matchMailboxIDs bool
354 // Mailboxes to match, can be multiple, for matching children. If empty, there is
355 // no filter on mailboxes.
356 mailboxIDs map[int64]bool
357
358 // Threads sent to client. New messages for this thread are also sent, regardless
359 // of regular query matching, so also for other mailboxes. If the user (re)moved
	// all messages of a thread, they may still receive events for the thread. Only
	// filled when the query has threading enabled.
362 threadIDs map[int64]struct{}
363}
364
365// sses tracks all sse connections, and access to them.
366var sses = struct {
367 sync.Mutex
368 gen int64
369 m map[int64]sse
370}{m: map[int64]sse{}}
371
372// sse represents an sse connection.
373type sse struct {
374 ID int64 // Also returned in EventStart and used in Request to identify the request.
375 AccountName string // Used to check the authenticated user has access to the SSE connection.
376 Request chan Request // Goroutine will receive requests from here, coming from API calls.
377}
378
// unregister is called by the goroutine when the connection is closed or breaks.
380func (sse sse) unregister() {
381 sses.Lock()
382 defer sses.Unlock()
383 delete(sses.m, sse.ID)
384
385 // Drain any pending requests, preventing blocked goroutines from API calls.
386 for {
387 select {
388 case <-sse.Request:
389 default:
390 return
391 }
392 }
393}
394
395func sseRegister(accountName string) sse {
396 sses.Lock()
397 defer sses.Unlock()
398 sses.gen++
399 v := sse{sses.gen, accountName, make(chan Request, 1)}
400 sses.m[v.ID] = v
401 return v
402}
403
// sseGet returns a reference to an existing connection if it exists and the user
// has access.
406func sseGet(id int64, accountName string) (sse, bool) {
407 sses.Lock()
408 defer sses.Unlock()
409 s := sses.m[id]
410 if s.AccountName != accountName {
411 return sse{}, false
412 }
413 return s, true
414}
415
416// ssetoken is a temporary token that has not yet been used to start an SSE
417// connection. Created by Token, consumed by a new SSE connection.
418type ssetoken struct {
419 token string // Uniquely generated.
420 accName string
421 address string // Address used to authenticate in call that created the token.
422 sessionToken store.SessionToken // SessionToken that created this token, checked before sending updates.
423 validUntil time.Time
424}
425
426// ssetokens maintains unused tokens. We have just one, but it's a type so we
427// can define methods.
428type ssetokens struct {
429 sync.Mutex
430 accountTokens map[string][]ssetoken // Account to max 10 most recent tokens, from old to new.
431 tokens map[string]ssetoken // Token to details, for finding account for a token.
432}
433
434var sseTokens = ssetokens{
435 accountTokens: map[string][]ssetoken{},
436 tokens: map[string]ssetoken{},
437}
438
439// xgenerate creates and saves a new token. It ensures no more than 10 tokens
440// per account exist, removing old ones if needed.
441func (x *ssetokens) xgenerate(ctx context.Context, accName, address string, sessionToken store.SessionToken) string {
442 buf := make([]byte, 16)
443 _, err := cryptrand.Read(buf)
444 xcheckf(ctx, err, "generating token")
445 st := ssetoken{base64.RawURLEncoding.EncodeToString(buf), accName, address, sessionToken, time.Now().Add(time.Minute)}
446
447 x.Lock()
448 defer x.Unlock()
449 n := len(x.accountTokens[accName])
450 if n >= 10 {
451 for _, ost := range x.accountTokens[accName][:n-9] {
452 delete(x.tokens, ost.token)
453 }
454 copy(x.accountTokens[accName], x.accountTokens[accName][n-9:])
455 x.accountTokens[accName] = x.accountTokens[accName][:9]
456 }
457 x.accountTokens[accName] = append(x.accountTokens[accName], st)
458 x.tokens[st.token] = st
459 return st.token
460}
461
462// check verifies a token, and consumes it if valid.
463func (x *ssetokens) check(token string) (string, string, store.SessionToken, bool, error) {
464 x.Lock()
465 defer x.Unlock()
466
467 st, ok := x.tokens[token]
468 if !ok {
469 return "", "", "", false, nil
470 }
471 delete(x.tokens, token)
472 if i := slices.Index(x.accountTokens[st.accName], st); i < 0 {
473 return "", "", "", false, errors.New("internal error, could not find token in account")
474 } else {
475 copy(x.accountTokens[st.accName][i:], x.accountTokens[st.accName][i+1:])
476 x.accountTokens[st.accName] = x.accountTokens[st.accName][:len(x.accountTokens[st.accName])-1]
477 if len(x.accountTokens[st.accName]) == 0 {
478 delete(x.accountTokens, st.accName)
479 }
480 }
481 if time.Now().After(st.validUntil) {
482 return "", "", "", false, nil
483 }
484 return st.accName, st.address, st.sessionToken, true, nil
485}
486
487// ioErr is panicked on i/o errors in serveEvents and handled in a defer.
488type ioErr struct {
489 err error
490}
491
// ensureMoreHeaders returns a non-nil moreHeaders, fetching it from Settings if needed.
493func ensureMoreHeaders(tx *bstore.Tx, moreHeaders []string) ([]string, error) {
494 if moreHeaders != nil {
495 return moreHeaders, nil
496 }
497
498 s := store.Settings{ID: 1}
499 if err := tx.Get(&s); err != nil {
500 return nil, fmt.Errorf("get settings: %v", err)
501 }
502 moreHeaders = s.ShowHeaders
503 if moreHeaders == nil {
504 moreHeaders = []string{} // Ensure we won't get Settings again next call.
505 }
506 return moreHeaders, nil
507}
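
// For illustration, the intended client flow (a hypothetical sketch, the exact
// path and JavaScript are assumptions):
//
//	1. Call the Token API (authenticated), which stores a single-use token via
//	   ssetokens.xgenerate and returns it.
//	2. Open the SSE connection, e.g.:
//	   new EventSource("events?singleUseToken=" + token + "&request=" + encodeURIComponent(JSON.stringify(request)))
//	   serveEvents authenticates the connection by consuming the token via ssetokens.check.
//	3. Handle the "start" event, then "viewMsgs", "viewChanges" and other events.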
508
509// serveEvents serves an SSE connection. Authentication is done through a query
510// string parameter "singleUseToken", a one-time-use token returned by the Token
511// API call.
512func serveEvents(ctx context.Context, log mlog.Log, accountPath string, w http.ResponseWriter, r *http.Request) {
513 if r.Method != "GET" {
514 http.Error(w, "405 - method not allowed - use get", http.StatusMethodNotAllowed)
515 return
516 }
517
518 flusher, ok := w.(http.Flusher)
519 if !ok {
520 log.Error("internal error: ResponseWriter not a http.Flusher")
521 http.Error(w, "500 - internal error - cannot sync to http connection", 500)
522 return
523 }
524
525 q := r.URL.Query()
526 token := q.Get("singleUseToken")
527 if token == "" {
528 http.Error(w, "400 - bad request - missing credentials", http.StatusBadRequest)
529 return
530 }
531 accName, address, sessionToken, ok, err := sseTokens.check(token)
532 if err != nil {
533 http.Error(w, "500 - internal server error - "+err.Error(), http.StatusInternalServerError)
534 return
535 }
536 if !ok {
537 http.Error(w, "400 - bad request - bad token", http.StatusBadRequest)
538 return
539 }
540 if _, err := store.SessionUse(ctx, log, accName, sessionToken, ""); err != nil {
541 http.Error(w, "400 - bad request - bad session token", http.StatusBadRequest)
542 return
543 }
544
	// We can simulate a slow SSE connection. It seems Firefox doesn't slow down
	// incoming responses with its slow-network simulation.
547 var waitMin, waitMax time.Duration
548 waitMinMsec := q.Get("waitMinMsec")
549 waitMaxMsec := q.Get("waitMaxMsec")
550 if waitMinMsec != "" && waitMaxMsec != "" {
551 if v, err := strconv.ParseInt(waitMinMsec, 10, 64); err != nil {
552 http.Error(w, "400 - bad request - parsing waitMinMsec: "+err.Error(), http.StatusBadRequest)
553 return
554 } else {
555 waitMin = time.Duration(v) * time.Millisecond
556 }
557
558 if v, err := strconv.ParseInt(waitMaxMsec, 10, 64); err != nil {
559 http.Error(w, "400 - bad request - parsing waitMaxMsec: "+err.Error(), http.StatusBadRequest)
560 return
561 } else {
562 waitMax = time.Duration(v) * time.Millisecond
563 }
564 }
565
566 // Parse the request with initial mailbox/search criteria.
567 var req Request
568 dec := json.NewDecoder(strings.NewReader(q.Get("request")))
569 dec.DisallowUnknownFields()
570 if err := dec.Decode(&req); err != nil {
571 http.Error(w, "400 - bad request - bad request query string parameter: "+err.Error(), http.StatusBadRequest)
572 return
573 } else if req.Page.Count <= 0 {
574 http.Error(w, "400 - bad request - request cannot have Page.Count 0", http.StatusBadRequest)
575 return
576 }
577 if req.Query.Threading == "" {
578 req.Query.Threading = ThreadOff
579 }
580
581 var writer *eventWriter
582
583 metricSSEConnections.Inc()
584 defer metricSSEConnections.Dec()
585
	// Below here, error handling happens through xcheckf, which panics with a
	// *sherpa.Error, after which we send an error event to the client. We can also get
	// an ioErr panic when the connection is broken.
589 defer func() {
590 x := recover()
591 if x == nil {
592 return
593 }
594 if err, ok := x.(*sherpa.Error); ok {
595 writer.xsendEvent(ctx, log, "fatalErr", err.Message)
596 } else if _, ok := x.(ioErr); ok {
597 return
598 } else {
599 log.WithContext(ctx).Error("serveEvents panic", slog.Any("err", x))
600 debug.PrintStack()
601 metrics.PanicInc(metrics.Webmail)
602 panic(x)
603 }
604 }()
605
606 h := w.Header()
607 h.Set("Content-Type", "text/event-stream")
608 h.Set("Cache-Control", "no-cache")
609
	// We'll be sending quite a bit of message data (text) in JSON (plenty of duplicate
	// keys), so it should be quite compressible.
612 var out writeFlusher
613 gz := mox.AcceptsGzip(r)
614 if gz {
615 h.Set("Content-Encoding", "gzip")
616 out, _ = gzip.NewWriterLevel(w, gzip.BestSpeed)
617 } else {
618 out = nopFlusher{w}
619 }
620 out = httpFlusher{out, flusher}
621
622 // We'll be writing outgoing SSE events through writer.
623 writer = newEventWriter(out, waitMin, waitMax, accName, sessionToken)
624 defer writer.close()
625
626 // Fetch initial data.
627 acc, err := store.OpenAccount(log, accName)
628 xcheckf(ctx, err, "open account")
629 defer func() {
630 err := acc.Close()
631 log.Check(err, "closing account")
632 }()
633 comm := store.RegisterComm(acc)
634 defer comm.Unregister()
635
636 // List addresses that the client can use to send email from.
637 accConf, _ := acc.Conf()
638 loginAddr, err := smtp.ParseAddress(address)
639 xcheckf(ctx, err, "parsing login address")
640 _, _, _, dest, err := mox.LookupAddress(loginAddr.Localpart, loginAddr.Domain, false, false)
641 xcheckf(ctx, err, "looking up destination for login address")
642 loginName := accConf.FullName
643 if dest.FullName != "" {
644 loginName = dest.FullName
645 }
646 loginAddress := MessageAddress{Name: loginName, User: loginAddr.Localpart.String(), Domain: loginAddr.Domain}
647 var addresses []MessageAddress
648 for a, dest := range accConf.Destinations {
649 name := dest.FullName
650 if name == "" {
651 name = accConf.FullName
652 }
653 var ma MessageAddress
654 if strings.HasPrefix(a, "@") {
655 dom, err := dns.ParseDomain(a[1:])
656 xcheckf(ctx, err, "parsing destination address for account")
657 ma = MessageAddress{Domain: dom}
658 } else {
659 addr, err := smtp.ParseAddress(a)
660 xcheckf(ctx, err, "parsing destination address for account")
661 ma = MessageAddress{Name: name, User: addr.Localpart.String(), Domain: addr.Domain}
662 }
663 addresses = append(addresses, ma)
664 }
	// The user is allowed to send using an alias address as the message From address.
	// Webmail will choose it when replying to a message sent to that address.
667 aliasAddrs := map[MessageAddress]bool{}
668 for _, a := range accConf.Aliases {
669 if a.Alias.AllowMsgFrom {
670 ma := MessageAddress{User: a.Alias.LocalpartStr, Domain: a.Alias.Domain}
671 if !aliasAddrs[ma] {
672 addresses = append(addresses, ma)
673 }
674 aliasAddrs[ma] = true
675 }
676 }
677
678 // We implicitly start a query. We use the reqctx for the transaction, because the
679 // transaction is passed to the query, which can be canceled.
680 reqctx, reqctxcancel := context.WithCancel(ctx)
681 defer func() {
682 // We also cancel in cancelDrain later on, but there is a brief window where the
683 // context wouldn't be canceled.
684 if reqctxcancel != nil {
685 reqctxcancel()
686 reqctxcancel = nil
687 }
688 }()
689
690 // qtx is kept around during connection initialization, until we pass it off to the
691 // goroutine that starts querying for messages.
692 var qtx *bstore.Tx
693 defer func() {
694 if qtx != nil {
695 err := qtx.Rollback()
696 log.Check(err, "rolling back")
697 }
698 }()
699
700 var mbl []store.Mailbox
701 settings := store.Settings{ID: 1}
702
703 // We only take the rlock when getting the tx.
704 acc.WithRLock(func() {
705 // Now a read-only transaction we'll use during the query.
706 qtx, err = acc.DB.Begin(reqctx, false)
707 xcheckf(ctx, err, "begin transaction")
708
709 mbl, err = bstore.QueryTx[store.Mailbox](qtx).List()
710 xcheckf(ctx, err, "list mailboxes")
711
712 err = qtx.Get(&settings)
713 xcheckf(ctx, err, "get settings")
714 })
715
716 // Find the designated mailbox if a mailbox name is set, or there are no filters at all.
717 var zerofilter Filter
718 var zeronotfilter NotFilter
719 var mailbox store.Mailbox
720 var mailboxPrefixes []string
721 var matchMailboxes bool
722 mailboxIDs := map[int64]bool{}
723 mailboxName := req.Query.Filter.MailboxName
724 if mailboxName != "" || reflect.DeepEqual(req.Query.Filter, zerofilter) && reflect.DeepEqual(req.Query.NotFilter, zeronotfilter) {
725 if mailboxName == "" {
726 mailboxName = "Inbox"
727 }
728
729 var inbox store.Mailbox
730 for _, e := range mbl {
731 if e.Name == mailboxName {
732 mailbox = e
733 }
734 if e.Name == "Inbox" {
735 inbox = e
736 }
737 }
738 if mailbox.ID == 0 {
739 mailbox = inbox
740 }
741 if mailbox.ID == 0 {
742 xcheckf(ctx, errors.New("inbox not found"), "setting initial mailbox")
743 }
744 req.Query.Filter.MailboxID = mailbox.ID
745 req.Query.Filter.MailboxName = ""
746 mailboxPrefixes = []string{mailbox.Name + "/"}
747 matchMailboxes = true
748 mailboxIDs[mailbox.ID] = true
749 } else {
750 matchMailboxes, mailboxIDs, mailboxPrefixes = xprepareMailboxIDs(ctx, qtx, req.Query.Filter, accConf.RejectsMailbox)
751 }
752 if req.Query.Filter.MailboxChildrenIncluded {
753 xgatherMailboxIDs(ctx, qtx, mailboxIDs, mailboxPrefixes)
754 }
755
	// todo: write a last-event-id based on modseq? if last-event-id is present, we would only have to send changes to mailboxes and messages, hopefully reducing the amount of data sent.
757
758 sse := sseRegister(acc.Name)
759 defer sse.unregister()
760
761 // Per-domain localpart config so webclient can decide if an address belongs to the account.
762 domainAddressConfigs := map[string]DomainAddressConfig{}
763 for _, a := range addresses {
764 dom, _ := mox.Conf.Domain(a.Domain)
765 domainAddressConfigs[a.Domain.ASCII] = DomainAddressConfig{dom.LocalpartCatchallSeparator, dom.LocalpartCaseSensitive}
766 }
767
768 // Write first event, allowing client to fill its UI with mailboxes.
769 start := EventStart{sse.ID, loginAddress, addresses, domainAddressConfigs, mailbox.Name, mbl, accConf.RejectsMailbox, settings, accountPath, moxvar.Version}
770 writer.xsendEvent(ctx, log, "start", start)
771
772 // The goroutine doing the querying will send messages on these channels, which
773 // result in an event being written on the SSE connection.
774 viewMsgsc := make(chan EventViewMsgs)
775 viewErrc := make(chan EventViewErr)
776 viewResetc := make(chan EventViewReset)
777 donec := make(chan int64) // When request is done.
778
	// Start a view, it determines if we send a change to the client. And start an
	// implicit query for messages, sending them to the client so it can fill its UI
	// with messages.
782 v := view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
783 go viewRequestTx(reqctx, log, acc, qtx, v, viewMsgsc, viewErrc, viewResetc, donec)
784 qtx = nil // viewRequestTx closes qtx
785
786 // When canceling a query, we must drain its messages until it says it is done.
787 // Otherwise the sending goroutine would hang indefinitely on a channel send.
788 cancelDrain := func() {
789 if reqctxcancel != nil {
790 // Cancel the goroutine doing the querying.
791 reqctxcancel()
792 reqctx = nil
793 reqctxcancel = nil
794 } else {
795 return
796 }
797
798 // Drain events until done.
799 for {
800 select {
801 case <-viewMsgsc:
802 case <-viewErrc:
803 case <-viewResetc:
804 case <-donec:
805 return
806 }
807 }
808 }
809
810 // If we stop and a query is in progress, we must drain the channel it will send on.
811 defer cancelDrain()
812
	// Changes broadcast by other connections on this account. If applicable for the
	// connection/view, we send events.
815 xprocessChanges := func(changes []store.Change) {
816 taggedChanges := [][2]any{}
817
		// We get a transaction the first time we need it.
819 var xtx *bstore.Tx
820 defer func() {
821 if xtx != nil {
822 err := xtx.Rollback()
823 log.Check(err, "rolling back transaction")
824 }
825 }()
826 ensureTx := func() error {
827 if xtx != nil {
828 return nil
829 }
830 acc.RLock()
831 defer acc.RUnlock()
832 var err error
833 xtx, err = acc.DB.Begin(ctx, false)
834 return err
835 }
		// This getmsg will now only be called with mailboxID+UID, not with messageID set.
		// todo jmap: change store.Change* to include MessageID's? would mean duplication of information resulting in possible mismatch.
838 getmsg := func(messageID int64, mailboxID int64, uid store.UID) (store.Message, error) {
839 if err := ensureTx(); err != nil {
840 return store.Message{}, fmt.Errorf("transaction: %v", err)
841 }
842 return bstore.QueryTx[store.Message](xtx).FilterEqual("Expunged", false).FilterNonzero(store.Message{MailboxID: mailboxID, UID: uid}).Get()
843 }
844
845 // Additional headers from settings to add to MessageItems.
846 var moreHeaders []string
847 xmoreHeaders := func() []string {
848 err := ensureTx()
849 xcheckf(ctx, err, "transaction")
850
851 moreHeaders, err = ensureMoreHeaders(xtx, moreHeaders)
852 xcheckf(ctx, err, "ensuring more headers")
853 return moreHeaders
854 }
855
		// Return UIDs that are within the range of the view: because the end has been
		// reached, or because the UID is not after the last message.
858 xchangedUIDs := func(mailboxID int64, uids []store.UID, isRemove bool) (changedUIDs []store.UID) {
859 uidsAny := make([]any, len(uids))
860 for i, uid := range uids {
861 uidsAny[i] = uid
862 }
863 err := ensureTx()
864 xcheckf(ctx, err, "transaction")
865 q := bstore.QueryTx[store.Message](xtx)
866 q.FilterNonzero(store.Message{MailboxID: mailboxID})
867 q.FilterEqual("UID", uidsAny...)
868 mbOK := v.matchesMailbox(mailboxID)
869 err = q.ForEach(func(m store.Message) error {
870 _, thread := v.threadIDs[m.ThreadID]
871 if thread || mbOK && (v.inRange(m) || isRemove && m.Expunged) {
872 changedUIDs = append(changedUIDs, m.UID)
873 }
874 return nil
875 })
876 xcheckf(ctx, err, "fetching messages for change")
877 return changedUIDs
878 }
879
880 // Forward changes that are relevant to the current view.
881 for _, change := range changes {
882 switch c := change.(type) {
883 case store.ChangeAddUID:
884 ok, err := v.matches(log, acc, true, 0, c.MailboxID, c.UID, c.Flags, c.Keywords, getmsg)
885 xcheckf(ctx, err, "matching new message against view")
886 m, err := getmsg(0, c.MailboxID, c.UID)
887 xcheckf(ctx, err, "get message")
888 _, thread := v.threadIDs[m.ThreadID]
889 if !ok && !thread {
890 continue
891 }
892
893 state := msgState{acc: acc}
894 mi, err := messageItem(log, m, &state, xmoreHeaders())
895 state.clear()
896 xcheckf(ctx, err, "make messageitem")
897 mi.MatchQuery = ok
898
899 mil := []MessageItem{mi}
900 if !thread && req.Query.Threading != ThreadOff {
901 err := ensureTx()
902 xcheckf(ctx, err, "transaction")
903 more, _, err := gatherThread(log, xtx, acc, v, m, 0, false, xmoreHeaders())
904 xcheckf(ctx, err, "gathering thread messages for id %d, thread %d", m.ID, m.ThreadID)
905 mil = append(mil, more...)
906 v.threadIDs[m.ThreadID] = struct{}{}
907 }
908
909 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgAdd", ChangeMsgAdd{c, mil}})
910
911 // If message extends the view, store it as such.
912 if !v.Request.Query.OrderAsc && m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && m.Received.After(v.LastMessageReceived) {
913 v.LastMessageReceived = m.Received
914 }
915
916 case store.ChangeRemoveUIDs:
917 // We may send changes for uids the client doesn't know, that's fine.
918 changedUIDs := xchangedUIDs(c.MailboxID, c.UIDs, true)
919 if len(changedUIDs) == 0 {
920 continue
921 }
922 ch := ChangeMsgRemove{c}
923 ch.UIDs = changedUIDs
924 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgRemove", ch})
925
926 case store.ChangeFlags:
927 // We may send changes for uids the client doesn't know, that's fine.
928 changedUIDs := xchangedUIDs(c.MailboxID, []store.UID{c.UID}, false)
929 if len(changedUIDs) == 0 {
930 continue
931 }
932 ch := ChangeMsgFlags{c}
933 ch.UID = changedUIDs[0]
934 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgFlags", ch})
935
936 case store.ChangeThread:
				// Change in muted/collapsed state, just always ship it.
938 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgThread", ChangeMsgThread{c}})
939
940 case store.ChangeRemoveMailbox:
941 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRemove", ChangeMailboxRemove{c}})
942
943 case store.ChangeAddMailbox:
944 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxAdd", ChangeMailboxAdd{c.Mailbox}})
945
946 case store.ChangeRenameMailbox:
947 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRename", ChangeMailboxRename{c}})
948
949 case store.ChangeMailboxCounts:
950 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxCounts", ChangeMailboxCounts{c}})
951
952 case store.ChangeMailboxSpecialUse:
953 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxSpecialUse", ChangeMailboxSpecialUse{c}})
954
955 case store.ChangeMailboxKeywords:
956 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxKeywords", ChangeMailboxKeywords{c}})
957
958 case store.ChangeAddSubscription:
959 // Webmail does not care about subscriptions.
960
961 default:
962 panic(fmt.Sprintf("missing case for change %T", c))
963 }
964 }
965
966 if len(taggedChanges) > 0 {
967 viewChanges := EventViewChanges{v.Request.ViewID, taggedChanges}
968 writer.xsendEvent(ctx, log, "viewChanges", viewChanges)
969 }
970 }
971
972 timer := time.NewTimer(5 * time.Minute) // For keepalives.
973 defer timer.Stop()
974 for {
975 if writer.wrote {
976 timer.Reset(5 * time.Minute)
977 writer.wrote = false
978 }
979
980 pending := comm.Pending
981 if reqctx != nil {
982 pending = nil
983 }
984
985 select {
986 case <-mox.Shutdown.Done():
987 writer.xsendEvent(ctx, log, "fatalErr", "server is shutting down")
988 // Work around go vet, it doesn't see defer cancelDrain.
989 if reqctxcancel != nil {
990 reqctxcancel()
991 }
992 return
993
994 case <-timer.C:
995 _, err := fmt.Fprintf(out, ": keepalive\n\n")
996 if err != nil {
997 log.Errorx("write keepalive", err)
998 // Work around go vet, it doesn't see defer cancelDrain.
999 if reqctxcancel != nil {
1000 reqctxcancel()
1001 }
1002 return
1003 }
1004 out.Flush()
1005 writer.wrote = true
1006
1007 case vm := <-viewMsgsc:
1008 if vm.RequestID != v.Request.ID || vm.ViewID != v.Request.ViewID {
1009 panic(fmt.Sprintf("received msgs for view,request id %d,%d instead of %d,%d", vm.ViewID, vm.RequestID, v.Request.ViewID, v.Request.ID))
1010 }
1011 if vm.ViewEnd {
1012 v.End = true
1013 }
1014 if len(vm.MessageItems) > 0 {
1015 v.LastMessageReceived = vm.MessageItems[len(vm.MessageItems)-1][0].Message.Received
1016 }
1017 writer.xsendEvent(ctx, log, "viewMsgs", vm)
1018
1019 case ve := <-viewErrc:
1020 if ve.RequestID != v.Request.ID || ve.ViewID != v.Request.ViewID {
1021 panic(fmt.Sprintf("received err for view,request id %d,%d instead of %d,%d", ve.ViewID, ve.RequestID, v.Request.ViewID, v.Request.ID))
1022 }
1023 if errors.Is(ve.err, context.Canceled) || moxio.IsClosed(ve.err) {
1024 // Work around go vet, it doesn't see defer cancelDrain.
1025 if reqctxcancel != nil {
1026 reqctxcancel()
1027 }
1028 return
1029 }
1030 writer.xsendEvent(ctx, log, "viewErr", ve)
1031
1032 case vr := <-viewResetc:
1033 if vr.RequestID != v.Request.ID || vr.ViewID != v.Request.ViewID {
1034 panic(fmt.Sprintf("received reset for view,request id %d,%d instead of %d,%d", vr.ViewID, vr.RequestID, v.Request.ViewID, v.Request.ID))
1035 }
1036 writer.xsendEvent(ctx, log, "viewReset", vr)
1037
1038 case id := <-donec:
1039 if id != v.Request.ID {
1040 panic(fmt.Sprintf("received done for request id %d instead of %d", id, v.Request.ID))
1041 }
1042 if reqctxcancel != nil {
1043 reqctxcancel()
1044 }
1045 reqctx = nil
1046 reqctxcancel = nil
1047
1048 case req := <-sse.Request:
1049 if reqctx != nil {
1050 cancelDrain()
1051 }
1052 if req.Cancel {
1053 v = view{req, time.Time{}, false, false, nil, nil}
1054 continue
1055 }
1056
1057 reqctx, reqctxcancel = context.WithCancel(ctx)
1058
1059 stop := func() (stop bool) {
				// rtx is handed off to viewRequestTx below, but we must clean it up in case of errors.
1061 var rtx *bstore.Tx
1062 var err error
1063 defer func() {
1064 if rtx != nil {
1065 err = rtx.Rollback()
1066 log.Check(err, "rolling back transaction")
1067 }
1068 }()
1069 acc.WithRLock(func() {
1070 rtx, err = acc.DB.Begin(reqctx, false)
1071 })
1072 if err != nil {
1073 reqctxcancel()
1074 reqctx = nil
1075 reqctxcancel = nil
1076
1077 if errors.Is(err, context.Canceled) {
1078 return true
1079 }
1080 err := fmt.Errorf("begin transaction: %v", err)
1081 viewErr := EventViewErr{v.Request.ViewID, v.Request.ID, err.Error(), err}
1082 writer.xsendEvent(ctx, log, "viewErr", viewErr)
1083 return false
1084 }
1085
1086 // Reset view state for new query.
1087 if req.ViewID != v.Request.ViewID {
1088 matchMailboxes, mailboxIDs, mailboxPrefixes := xprepareMailboxIDs(ctx, rtx, req.Query.Filter, accConf.RejectsMailbox)
1089 if req.Query.Filter.MailboxChildrenIncluded {
1090 xgatherMailboxIDs(ctx, rtx, mailboxIDs, mailboxPrefixes)
1091 }
1092 v = view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
1093 } else {
1094 v.Request = req
1095 }
1096 go viewRequestTx(reqctx, log, acc, rtx, v, viewMsgsc, viewErrc, viewResetc, donec)
1097 rtx = nil
1098 return false
1099 }()
1100 if stop {
1101 return
1102 }
1103
1104 case <-pending:
1105 xprocessChanges(comm.Get())
1106
1107 case <-ctx.Done():
1108 // Work around go vet, it doesn't see defer cancelDrain.
1109 if reqctxcancel != nil {
1110 reqctxcancel()
1111 }
1112 return
1113 }
1114 }
1115}
1116
// xprepareMailboxIDs prepares the first half of filters for mailboxes, based on
1118// f.MailboxID (-1 is special). matchMailboxes indicates whether the IDs in
1119// mailboxIDs must or must not match. mailboxPrefixes is for use with
1120// xgatherMailboxIDs to gather children of the mailboxIDs.
1121func xprepareMailboxIDs(ctx context.Context, tx *bstore.Tx, f Filter, rejectsMailbox string) (matchMailboxes bool, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1122 matchMailboxes = true
1123 mailboxIDs = map[int64]bool{}
1124 if f.MailboxID == -1 {
1125 matchMailboxes = false
1126 // Add the trash, junk and account rejects mailbox.
1127 err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
1128 if mb.Trash || mb.Junk || mb.Name == rejectsMailbox {
1129 mailboxPrefixes = append(mailboxPrefixes, mb.Name+"/")
1130 mailboxIDs[mb.ID] = true
1131 }
1132 return nil
1133 })
1134 xcheckf(ctx, err, "finding trash/junk/rejects mailbox")
1135 } else if f.MailboxID > 0 {
1136 mb := store.Mailbox{ID: f.MailboxID}
1137 err := tx.Get(&mb)
1138 xcheckf(ctx, err, "get mailbox")
1139 mailboxIDs[f.MailboxID] = true
1140 mailboxPrefixes = []string{mb.Name + "/"}
1141 }
1142 return
1143}
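// For example, with Filter{MailboxID: -1}, matchMailboxes is false and mailboxIDs
// holds the Trash and Junk mailboxes and the account's Rejects mailbox, so the
// view matches all mailboxes except those. With Filter{MailboxID: 5} (hypothetical
// ID), matchMailboxes is true and mailboxIDs holds only that mailbox.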
1144
1145// xgatherMailboxIDs adds all mailboxes with a prefix matching any of
1146// mailboxPrefixes to mailboxIDs, to expand filtering to children of mailboxes.
1147func xgatherMailboxIDs(ctx context.Context, tx *bstore.Tx, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1148 // Gather more mailboxes to filter on, based on mailboxPrefixes.
1149 if len(mailboxPrefixes) == 0 {
1150 return
1151 }
1152 err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
1153 for _, p := range mailboxPrefixes {
1154 if strings.HasPrefix(mb.Name, p) {
1155 mailboxIDs[mb.ID] = true
1156 break
1157 }
1158 }
1159 return nil
1160 })
1161 xcheckf(ctx, err, "gathering mailboxes")
1162}
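// For example (hypothetical names), the prefix "Archive/" matches submailboxes
// such as "Archive/2023" and "Archive/2023/Work", but not "Archive" itself, which
// is expected to be in mailboxIDs already.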
1163
1164// matchesMailbox returns whether a mailbox matches the view.
1165func (v view) matchesMailbox(mailboxID int64) bool {
1166 return len(v.mailboxIDs) == 0 || v.matchMailboxIDs && v.mailboxIDs[mailboxID] || !v.matchMailboxIDs && !v.mailboxIDs[mailboxID]
1167}
1168
// inRange returns whether m is within the range for the view, i.e. whether a
// change for this message should be sent to the client so it can update its state.
1171func (v view) inRange(m store.Message) bool {
1172 return v.End || !v.Request.Query.OrderAsc && !m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && !m.Received.After(v.LastMessageReceived)
1173}
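// For example, with the default descending order, LastMessageReceived is the
// oldest message sent so far, and a message received at or after it is in range;
// with ascending order, a message received at or before LastMessageReceived (then
// the newest sent so far) is in range. If the view has ended, everything is in range.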
1174
1175// matches checks if the message, identified by either messageID or mailboxID+UID,
1176// is in the current "view" (i.e. passing the filters, and if checkRange is set
1177// also if within the range of sent messages based on sort order and the last seen
1178// message). getmsg retrieves the message, which may be necessary depending on the
1179// active filters. Used to determine if a store.Change with a new message should be
1180// sent, and for the destination and anchor messages in view requests.
1181func (v view) matches(log mlog.Log, acc *store.Account, checkRange bool, messageID int64, mailboxID int64, uid store.UID, flags store.Flags, keywords []string, getmsg func(int64, int64, store.UID) (store.Message, error)) (match bool, rerr error) {
1182 var m store.Message
1183 ensureMessage := func() bool {
1184 if m.ID == 0 && rerr == nil {
1185 m, rerr = getmsg(messageID, mailboxID, uid)
1186 }
1187 return rerr == nil
1188 }
1189
1190 q := v.Request.Query
1191
1192 // Warning: Filters must be kept in sync between queryMessage and view.matches.
1193
1194 // Check filters.
1195 if len(v.mailboxIDs) > 0 && (!ensureMessage() || v.matchMailboxIDs && !v.mailboxIDs[m.MailboxID] || !v.matchMailboxIDs && v.mailboxIDs[m.MailboxID]) {
1196 return false, rerr
1197 }
1198 // note: anchorMessageID is not relevant for matching.
1199 flagfilter := q.flagFilterFn()
1200 if flagfilter != nil && !flagfilter(flags, keywords) {
1201 return false, rerr
1202 }
1203
1204 if q.Filter.Oldest != nil && (!ensureMessage() || m.Received.Before(*q.Filter.Oldest)) {
1205 return false, rerr
1206 }
1207 if q.Filter.Newest != nil && (!ensureMessage() || !m.Received.Before(*q.Filter.Newest)) {
1208 return false, rerr
1209 }
1210
1211 if q.Filter.SizeMin > 0 && (!ensureMessage() || m.Size < q.Filter.SizeMin) {
1212 return false, rerr
1213 }
1214 if q.Filter.SizeMax > 0 && (!ensureMessage() || m.Size > q.Filter.SizeMax) {
1215 return false, rerr
1216 }
1217
1218 state := msgState{acc: acc}
1219 defer func() {
1220 if rerr == nil && state.err != nil {
1221 rerr = state.err
1222 }
1223 state.clear()
1224 }()
1225
1226 attachmentFilter := q.attachmentFilterFn(log, acc, &state)
1227 if attachmentFilter != nil && (!ensureMessage() || !attachmentFilter(m)) {
1228 return false, rerr
1229 }
1230
1231 envFilter := q.envFilterFn(log, &state)
1232 if envFilter != nil && (!ensureMessage() || !envFilter(m)) {
1233 return false, rerr
1234 }
1235
1236 headerFilter := q.headerFilterFn(log, &state)
1237 if headerFilter != nil && (!ensureMessage() || !headerFilter(m)) {
1238 return false, rerr
1239 }
1240
1241 wordsFilter := q.wordsFilterFn(log, &state)
1242 if wordsFilter != nil && (!ensureMessage() || !wordsFilter(m)) {
1243 return false, rerr
1244 }
1245
	// Now check that we are either within the range for the sort order, or that the
	// last message of the view was already sent.
1247 if !checkRange || v.End || ensureMessage() && v.inRange(m) {
1248 return true, rerr
1249 }
1250 return false, rerr
1251}
1252
1253type msgResp struct {
1254 err error // If set, an error happened and fields below are not set.
1255 reset bool // If set, the anchor message does not exist (anymore?) and we are sending messages from the start, fields below not set.
1256 viewEnd bool // If set, the last message for the view was seen, no more should be requested, fields below not set.
	mil []MessageItem // If none of the cases above apply, the messages that were found matching the query. The first message is the reason the thread is returned, for use as AnchorMessageID in a followup request.
1258 pm *ParsedMessage // If m was the target page.DestMessageID, or this is the first match, this is the parsed message of mi.
1259}
1260
1261// viewRequestTx executes a request (query with filters, pagination) by
1262// launching a new goroutine with queryMessages, receiving results as msgResp,
1263// and sending Event* to the SSE connection.
1264//
1265// It always closes tx.
1266func viewRequestTx(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, msgc chan EventViewMsgs, errc chan EventViewErr, resetc chan EventViewReset, donec chan int64) {
1267 defer func() {
1268 err := tx.Rollback()
1269 log.Check(err, "rolling back query transaction")
1270
1271 donec <- v.Request.ID
1272
1273 x := recover() // Should not happen, but don't take program down if it does.
1274 if x != nil {
1275 log.WithContext(ctx).Error("viewRequestTx panic", slog.Any("err", x))
1276 debug.PrintStack()
1277 metrics.PanicInc(metrics.Webmailrequest)
1278 }
1279 }()
1280
1281 var msgitems [][]MessageItem // Gathering for 300ms, then flushing.
1282 var parsedMessage *ParsedMessage
1283 var viewEnd bool
1284
	var immediate bool // No waiting, flush immediately.
1286 t := time.NewTimer(300 * time.Millisecond)
1287 defer t.Stop()
1288
1289 sendViewMsgs := func(force bool) {
1290 if len(msgitems) == 0 && !force {
1291 return
1292 }
1293
1294 immediate = false
1295 msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, msgitems, parsedMessage, viewEnd}
1296 msgitems = nil
1297 parsedMessage = nil
1298 t.Reset(300 * time.Millisecond)
1299 }
1300
1301 // todo: should probably rewrite code so we don't start yet another goroutine, but instead handle the query responses directly (through a struct that keeps state?) in the sse connection goroutine.
1302
1303 mrc := make(chan msgResp, 1)
1304 go queryMessages(ctx, log, acc, tx, v, mrc)
1305
1306 for {
1307 select {
1308 case mr, ok := <-mrc:
1309 if !ok {
1310 sendViewMsgs(false)
1311 // Empty message list signals this query is done.
1312 msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, nil, nil, false}
1313 return
1314 }
1315 if mr.err != nil {
1316 sendViewMsgs(false)
1317 errc <- EventViewErr{v.Request.ViewID, v.Request.ID, mr.err.Error(), mr.err}
1318 return
1319 }
1320 if mr.reset {
1321 resetc <- EventViewReset{v.Request.ViewID, v.Request.ID}
1322 continue
1323 }
1324 if mr.viewEnd {
1325 viewEnd = true
1326 sendViewMsgs(true)
1327 return
1328 }
1329
1330 msgitems = append(msgitems, mr.mil)
1331 if mr.pm != nil {
1332 parsedMessage = mr.pm
1333 }
1334 if immediate {
1335 sendViewMsgs(true)
1336 }
1337
1338 case <-t.C:
1339 if len(msgitems) == 0 {
1340 // Nothing to send yet. We'll send immediately when the next message comes in.
1341 immediate = true
1342 } else {
1343 sendViewMsgs(false)
1344 }
1345 }
1346 }
1347}
1348
1349// queryMessages executes a query, with filter, pagination, destination message id
1350// to fetch (the message that the client had in view and wants to display again).
// It sends on mrc, with several types of messages: errors, whether the view is
// reset due to a missing AnchorMessageID, when the end of the view was reached,
// and/or a message.
1354func queryMessages(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, mrc chan msgResp) {
1355 defer func() {
1356 x := recover() // Should not happen, but don't take program down if it does.
1357 if x != nil {
1358 log.WithContext(ctx).Error("queryMessages panic", slog.Any("err", x))
1359 debug.PrintStack()
1360 mrc <- msgResp{err: fmt.Errorf("query failed")}
1361 metrics.PanicInc(metrics.Webmailquery)
1362 }
1363
1364 close(mrc)
1365 }()
1366
1367 query := v.Request.Query
1368 page := v.Request.Page
1369
	// Warning: Filters must be kept in sync between queryMessages and view.matches.
1371
1372 checkMessage := func(id int64) (valid bool, rerr error) {
1373 m := store.Message{ID: id}
1374 err := tx.Get(&m)
1375 if err == bstore.ErrAbsent || err == nil && m.Expunged {
1376 return false, nil
1377 } else if err != nil {
1378 return false, err
1379 } else {
1380 return v.matches(log, acc, false, m.ID, m.MailboxID, m.UID, m.Flags, m.Keywords, func(int64, int64, store.UID) (store.Message, error) {
1381 return m, nil
1382 })
1383 }
1384 }
1385
1386 // Check if AnchorMessageID exists and matches filter. If not, we will reset the view.
1387 if page.AnchorMessageID > 0 {
1388 // Check if message exists and (still) matches the filter.
1389 // todo: if AnchorMessageID exists but no longer matches the filter, we are resetting the view, but could handle it more gracefully in the future. if the message is in a different mailbox, we cannot query as efficiently, we'll have to read through more messages.
1390 if valid, err := checkMessage(page.AnchorMessageID); err != nil {
1391 mrc <- msgResp{err: fmt.Errorf("querying AnchorMessageID: %v", err)}
1392 return
1393 } else if !valid {
1394 mrc <- msgResp{reset: true}
1395 page.AnchorMessageID = 0
1396 }
1397 }
1398
	// Check if page.DestMessageID exists and matches the filter. If not, we will
	// ignore it instead of continuing to send messages until the end of the view.
1401 if page.DestMessageID > 0 {
1402 if valid, err := checkMessage(page.DestMessageID); err != nil {
1403 mrc <- msgResp{err: fmt.Errorf("querying requested message: %v", err)}
1404 return
1405 } else if !valid {
1406 page.DestMessageID = 0
1407 }
1408 }
1409
1410 // todo optimize: we would like to have more filters directly on the database if they can use an index. eg if there is a keyword filter and no mailbox filter.
1411
1412 q := bstore.QueryTx[store.Message](tx)
1413 q.FilterEqual("Expunged", false)
1414 if len(v.mailboxIDs) > 0 {
1415 if len(v.mailboxIDs) == 1 && v.matchMailboxIDs {
1416 // Should result in fast indexed query.
1417 for mbID := range v.mailboxIDs {
1418 q.FilterNonzero(store.Message{MailboxID: mbID})
1419 }
1420 } else {
1421 idsAny := make([]any, 0, len(v.mailboxIDs))
1422 for mbID := range v.mailboxIDs {
1423 idsAny = append(idsAny, mbID)
1424 }
1425 if v.matchMailboxIDs {
1426 q.FilterEqual("MailboxID", idsAny...)
1427 } else {
1428 q.FilterNotEqual("MailboxID", idsAny...)
1429 }
1430 }
1431 }
1432
	// If we are looking for an anchor, keep skipping messages early (cheaply) until we've seen it.
1434 if page.AnchorMessageID > 0 {
1435 var seen = false
1436 q.FilterFn(func(m store.Message) bool {
1437 if seen {
1438 return true
1439 }
1440 seen = m.ID == page.AnchorMessageID
1441 return false
1442 })
1443 }
1444
	// We may add filters to the query below. The FilterFn signature does not
	// allow reporting errors, or anything else, just a bool. So when making the
	// filter functions, we give them a place to store parsed message state, and an
	// error. We check the error during and after query execution.
1449 state := msgState{acc: acc}
1450 defer state.clear()
1451
1452 flagfilter := query.flagFilterFn()
1453 if flagfilter != nil {
1454 q.FilterFn(func(m store.Message) bool {
1455 return flagfilter(m.Flags, m.Keywords)
1456 })
1457 }
1458
1459 if query.Filter.Oldest != nil {
1460 q.FilterGreaterEqual("Received", *query.Filter.Oldest)
1461 }
1462 if query.Filter.Newest != nil {
1463 q.FilterLessEqual("Received", *query.Filter.Newest)
1464 }
1465
1466 if query.Filter.SizeMin > 0 {
1467 q.FilterGreaterEqual("Size", query.Filter.SizeMin)
1468 }
1469 if query.Filter.SizeMax > 0 {
1470 q.FilterLessEqual("Size", query.Filter.SizeMax)
1471 }
1472
1473 attachmentFilter := query.attachmentFilterFn(log, acc, &state)
1474 if attachmentFilter != nil {
1475 q.FilterFn(attachmentFilter)
1476 }
1477
1478 envFilter := query.envFilterFn(log, &state)
1479 if envFilter != nil {
1480 q.FilterFn(envFilter)
1481 }
1482
1483 headerFilter := query.headerFilterFn(log, &state)
1484 if headerFilter != nil {
1485 q.FilterFn(headerFilter)
1486 }
1487
1488 wordsFilter := query.wordsFilterFn(log, &state)
1489 if wordsFilter != nil {
1490 q.FilterFn(wordsFilter)
1491 }
1492
1493 var moreHeaders []string // From store.Settings.ShowHeaders
1494
1495 if query.OrderAsc {
1496 q.SortAsc("Received")
1497 } else {
1498 q.SortDesc("Received")
1499 }
1500 found := page.DestMessageID <= 0
1501 end := true
1502 have := 0
1503 err := q.ForEach(func(m store.Message) error {
1504 // Check for an error in one of the filters, propagate it.
1505 if state.err != nil {
1506 return state.err
1507 }
1508
1509 if have >= page.Count && found || have > 10000 {
1510 end = false
1511 return bstore.StopForEach
1512 }
1513
1514 if _, ok := v.threadIDs[m.ThreadID]; ok {
1515 // Message was already returned as part of a thread.
1516 return nil
1517 }
1518
1519 var pm *ParsedMessage
1520 if m.ID == page.DestMessageID || page.DestMessageID == 0 && have == 0 && page.AnchorMessageID == 0 {
			// For threads, if there was no DestMessageID, we may be getting the newest
			// message. For an initial view, this isn't necessarily the message the user is
			// expected to read first, that would be the first unread, which we'll get below
			// when gathering the thread.
1525 found = true
1526 xpm, err := parsedMessage(log, m, &state, true, false, false)
1527 if err != nil && errors.Is(err, message.ErrHeader) {
1528 log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
1529 } else if err != nil {
1530 return fmt.Errorf("parsing message %d: %v", m.ID, err)
1531 } else {
1532 pm = &xpm
1533 }
1534 }
1535
1536 var err error
1537 moreHeaders, err = ensureMoreHeaders(tx, moreHeaders)
1538 if err != nil {
1539 return fmt.Errorf("ensuring more headers: %v", err)
1540 }
1541
1542 mi, err := messageItem(log, m, &state, moreHeaders)
1543 if err != nil {
1544 return fmt.Errorf("making messageitem for message %d: %v", m.ID, err)
1545 }
1546 mil := []MessageItem{mi}
1547 if query.Threading != ThreadOff {
1548 more, xpm, err := gatherThread(log, tx, acc, v, m, page.DestMessageID, page.AnchorMessageID == 0 && have == 0, moreHeaders)
1549 if err != nil {
1550 return fmt.Errorf("gathering thread messages for id %d, thread %d: %v", m.ID, m.ThreadID, err)
1551 }
1552 if xpm != nil {
1553 pm = xpm
1554 found = true
1555 }
1556 mil = append(mil, more...)
1557 v.threadIDs[m.ThreadID] = struct{}{}
1558
1559 // Calculate how many messages the frontend is going to show, and only count those as returned.
1560 collapsed := map[int64]bool{}
1561 for _, mi := range mil {
1562 collapsed[mi.Message.ID] = mi.Message.ThreadCollapsed
1563 }
1564 unread := map[int64]bool{} // Propagated to thread root.
1565 if query.Threading == ThreadUnread {
1566 for _, mi := range mil {
1567 mm := mi.Message
1568 if mm.Seen {
1569 continue
1570 }
1571 unread[mm.ID] = true
1572 for _, id := range mm.ThreadParentIDs {
1573 unread[id] = true
1574 }
1575 }
1576 }
1577 for _, mi := range mil {
1578 mm := mi.Message
1579 threadRoot := true
1580 rootID := mm.ID
1581 for _, id := range mm.ThreadParentIDs {
1582 if _, ok := collapsed[id]; ok {
1583 threadRoot = false
1584 rootID = id
1585 }
1586 }
1587 if threadRoot || (query.Threading == ThreadOn && !collapsed[rootID] || query.Threading == ThreadUnread && unread[rootID]) {
1588 have++
1589 }
1590 }
1591 } else {
1592 have++
1593 }
1594 if pm != nil && len(pm.envelope.From) == 1 {
1595 pm.ViewMode, err = fromAddrViewMode(tx, pm.envelope.From[0])
1596 if err != nil {
1597 return fmt.Errorf("gathering view mode for id %d: %v", m.ID, err)
1598 }
1599 }
1600 mrc <- msgResp{mil: mil, pm: pm}
1601 return nil
1602 })
	// Check for an error in one of the filters again. The check in ForEach would not
	// trigger if the last message has the error.
1605 if err == nil && state.err != nil {
1606 err = state.err
1607 }
1608 if err != nil {
1609 mrc <- msgResp{err: fmt.Errorf("querying messages: %v", err)}
1610 return
1611 }
1612 if end {
1613 mrc <- msgResp{viewEnd: true}
1614 }
1615}
1616
1617func gatherThread(log mlog.Log, tx *bstore.Tx, acc *store.Account, v view, m store.Message, destMessageID int64, first bool, moreHeaders []string) ([]MessageItem, *ParsedMessage, error) {
1618 if m.ThreadID == 0 {
1619 // If we would continue, FilterNonzero would fail because there are no non-zero fields.
1620 return nil, nil, fmt.Errorf("message has threadid 0, account is probably still being upgraded, try turning threading off until the upgrade is done")
1621 }
1622
1623 // Fetch other messages for this thread.
1624 qt := bstore.QueryTx[store.Message](tx)
1625 qt.FilterNonzero(store.Message{ThreadID: m.ThreadID})
1626 qt.FilterEqual("Expunged", false)
1627 qt.FilterNotEqual("ID", m.ID)
1628 qt.SortAsc("ID")
1629 tml, err := qt.List()
1630 if err != nil {
1631 return nil, nil, fmt.Errorf("listing other messages in thread for message %d, thread %d: %v", m.ID, m.ThreadID, err)
1632 }
1633
1634 var mil []MessageItem
1635 var pm *ParsedMessage
1636 var firstUnread bool
1637 for _, tm := range tml {
1638 err := func() error {
1639 xstate := msgState{acc: acc}
1640 defer xstate.clear()
1641
1642 mi, err := messageItem(log, tm, &xstate, moreHeaders)
1643 if err != nil {
1644 return fmt.Errorf("making messageitem for message %d, for thread %d: %v", tm.ID, m.ThreadID, err)
1645 }
1646 mi.MatchQuery, err = v.matches(log, acc, false, tm.ID, tm.MailboxID, tm.UID, tm.Flags, tm.Keywords, func(int64, int64, store.UID) (store.Message, error) {
1647 return tm, nil
1648 })
1649 if err != nil {
1650 return fmt.Errorf("matching thread message %d against view query: %v", tm.ID, err)
1651 }
1652 mil = append(mil, mi)
1653
1654 if tm.ID == destMessageID || destMessageID == 0 && first && (pm == nil || !firstUnread && !tm.Seen) {
1655 firstUnread = !tm.Seen
1656 xpm, err := parsedMessage(log, tm, &xstate, true, false, false)
1657 if err != nil && errors.Is(err, message.ErrHeader) {
					log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", tm.ID), slog.Any("err", err))
1659 } else if err != nil {
1660 return fmt.Errorf("parsing thread message %d: %v", tm.ID, err)
1661 } else {
1662 pm = &xpm
1663 }
1664 }
1665 return nil
1666 }()
1667 if err != nil {
1668 return nil, nil, err
1669 }
1670 }
1671
1672 // Finally, the message that caused us to gather this thread (which is likely the
1673 // most recent message in the thread) could be the only unread message.
1674 if destMessageID == 0 && first && !m.Seen && !firstUnread {
1675 xstate := msgState{acc: acc}
1676 defer xstate.clear()
1677 xpm, err := parsedMessage(log, m, &xstate, true, false, false)
1678 if err != nil && errors.Is(err, message.ErrHeader) {
1679 log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
1680 } else if err != nil {
1681 return nil, nil, fmt.Errorf("parsing thread message %d: %v", m.ID, err)
1682 } else {
1683 pm = &xpm
1684 }
1685 }
1686
1687 return mil, pm, nil
1688}
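
// Illustrative summary of which thread message ends up as the ParsedMessage
// returned by gatherThread; the cases below paraphrase the conditions in the
// loop above and are not additional behavior:
//
//	destMessageID != 0:          pm is that message, when it is among tml
//	destMessageID == 0 && first: pm is the first unread message of tml (in ID
//	                             order), or m itself when m is the only unread
//	                             message, and otherwise the first message of tml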
1689
// While checking the filters on a message, we may need to get more message
// details as each filter passes. We check the filters that need only the basic
// information first, and load and cache more details for the next filters.
// msgState holds parsed details for a message; it is updated with more
// information while filtering, or reset for the next message.
1695type msgState struct {
1696 acc *store.Account // Never changes during lifetime.
1697 err error // Once set, doesn't get cleared.
1698 m store.Message
1699 part *message.Part // Will be without Reader when msgr is nil.
1700 msgr *store.MsgReader
1701}
1702
1703func (ms *msgState) clear() {
1704 if ms.msgr != nil {
1705 ms.msgr.Close()
1706 ms.msgr = nil
1707 }
1708 *ms = msgState{acc: ms.acc, err: ms.err}
1709}
1710
1711func (ms *msgState) ensureMsg(m store.Message) {
1712 if m.ID != ms.m.ID {
1713 ms.clear()
1714 }
1715 ms.m = m
1716}
1717
1718func (ms *msgState) ensurePart(m store.Message, withMsgReader bool) bool {
1719 ms.ensureMsg(m)
1720
1721 if ms.err == nil {
1722 if ms.part == nil {
1723 if m.ParsedBuf == nil {
1724 ms.err = fmt.Errorf("message %d not parsed", m.ID)
1725 return false
1726 }
1727 var p message.Part
1728 if err := json.Unmarshal(m.ParsedBuf, &p); err != nil {
1729 ms.err = fmt.Errorf("load part for message %d: %w", m.ID, err)
1730 return false
1731 }
1732 ms.part = &p
1733 }
1734 if withMsgReader && ms.msgr == nil {
1735 ms.msgr = ms.acc.MessageReader(m)
1736 ms.part.SetReaderAt(ms.msgr)
1737 }
1738 }
1739 return ms.part != nil
1740}
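
// The snippet below is an illustrative sketch of the intended calling pattern
// for msgState; it is not used by this package. One state is reused for all
// filters applied to a message, and state.err must be checked afterwards,
// because a false result may be caused by an error rather than a mismatch.
// The acc, m, attachFn and envFn names are hypothetical; the latter two stand
// for filter functions as returned by the methods below.
//
//	state := msgState{acc: acc}
//	defer state.clear()
//	match := (attachFn == nil || attachFn(m)) && (envFn == nil || envFn(m))
//	if state.err != nil {
//		return state.err // An error, not a non-matching message.
//	}
//	// Only now is match meaningful.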
1741
1742// flagFilterFn returns a function that applies the flag/keyword/"label"-related
1743// filters for a query. A nil function is returned if there are no flags to filter
1744// on.
1745func (q Query) flagFilterFn() func(store.Flags, []string) bool {
1746 labels := map[string]bool{}
1747 for _, k := range q.Filter.Labels {
1748 labels[k] = true
1749 }
1750 for _, k := range q.NotFilter.Labels {
1751 labels[k] = false
1752 }
1753
1754 if len(labels) == 0 {
1755 return nil
1756 }
1757
1758 var mask, flags store.Flags
1759 systemflags := map[string][]*bool{
1760 `\answered`: {&mask.Answered, &flags.Answered},
1761 `\flagged`: {&mask.Flagged, &flags.Flagged},
1762 `\deleted`: {&mask.Deleted, &flags.Deleted},
1763 `\seen`: {&mask.Seen, &flags.Seen},
1764 `\draft`: {&mask.Draft, &flags.Draft},
1765 `$junk`: {&mask.Junk, &flags.Junk},
1766 `$notjunk`: {&mask.Notjunk, &flags.Notjunk},
1767 `$forwarded`: {&mask.Forwarded, &flags.Forwarded},
1768 `$phishing`: {&mask.Phishing, &flags.Phishing},
1769 `$mdnsent`: {&mask.MDNSent, &flags.MDNSent},
1770 }
1771 keywords := map[string]bool{}
1772 for k, v := range labels {
1773 k = strings.ToLower(k)
1774 if mf, ok := systemflags[k]; ok {
1775 *mf[0] = true
1776 *mf[1] = v
1777 } else {
1778 keywords[k] = v
1779 }
1780 }
1781 return func(msgFlags store.Flags, msgKeywords []string) bool {
1782 var f store.Flags
1783 if f.Set(mask, msgFlags) != flags {
1784 return false
1785 }
1786 for k, v := range keywords {
1787 if slices.Contains(msgKeywords, k) != v {
1788 return false
1789 }
1790 }
1791 return true
1792 }
1793}
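
// Illustrative use of flagFilterFn, not called in this package; the Query
// literal and the store.Message value m are made up. Labels starting with a
// backslash or dollar sign are matched against the system flags, all other
// labels against the message keywords.
//
//	q := Query{
//		Filter:    Filter{Labels: []string{`\flagged`}},
//		NotFilter: NotFilter{Labels: []string{"newsletter"}},
//	}
//	if fn := q.flagFilterFn(); fn == nil || fn(m.Flags, m.Keywords) {
//		// Message is flagged and does not have the "newsletter" keyword
//		// (or there were no label filters at all).
//	}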
1794
// attachmentFilterFn returns a function that applies the attachment-related
// filter from the query. A nil function is returned if there are no attachment
// filters.
1798func (q Query) attachmentFilterFn(log mlog.Log, acc *store.Account, state *msgState) func(m store.Message) bool {
1799 if q.Filter.Attachments == AttachmentIndifferent && q.NotFilter.Attachments == AttachmentIndifferent {
1800 return nil
1801 }
1802
1803 return func(m store.Message) bool {
1804 if !state.ensurePart(m, false) {
1805 return false
1806 }
1807 types, err := attachmentTypes(log, m, state)
1808 if err != nil {
1809 state.err = err
1810 return false
1811 }
1812 return (q.Filter.Attachments == AttachmentIndifferent || types[q.Filter.Attachments]) && (q.NotFilter.Attachments == AttachmentIndifferent || !types[q.NotFilter.Attachments])
1813 }
1814}
1815
1816var attachmentMimetypes = map[string]AttachmentType{
1817 "application/pdf": AttachmentPDF,
1818 "application/zip": AttachmentArchive,
1819 "application/x-rar-compressed": AttachmentArchive,
1820 "application/vnd.oasis.opendocument.spreadsheet": AttachmentSpreadsheet,
1821 "application/vnd.ms-excel": AttachmentSpreadsheet,
1822 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": AttachmentSpreadsheet,
1823 "application/vnd.oasis.opendocument.text": AttachmentDocument,
1824 "application/vnd.oasis.opendocument.presentation": AttachmentPresentation,
1825 "application/vnd.ms-powerpoint": AttachmentPresentation,
1826 "application/vnd.openxmlformats-officedocument.presentationml.presentation": AttachmentPresentation,
1827}
1828var attachmentExtensions = map[string]AttachmentType{
1829 ".pdf": AttachmentPDF,
1830 ".zip": AttachmentArchive,
1831 ".tar": AttachmentArchive,
1832 ".tgz": AttachmentArchive,
1833 ".tar.gz": AttachmentArchive,
1834 ".tbz2": AttachmentArchive,
1835 ".tar.bz2": AttachmentArchive,
1836 ".tar.lz": AttachmentArchive,
1837 ".tlz": AttachmentArchive,
1838 ".tar.xz": AttachmentArchive,
1839 ".txz": AttachmentArchive,
1840 ".tar.zst": AttachmentArchive,
1841 ".tar.lz4": AttachmentArchive,
1842 ".7z": AttachmentArchive,
1843 ".rar": AttachmentArchive,
1844 ".ods": AttachmentSpreadsheet,
1845 ".xls": AttachmentSpreadsheet,
1846 ".xlsx": AttachmentSpreadsheet,
1847 ".odt": AttachmentDocument,
1848 ".doc": AttachmentDocument,
1849 ".docx": AttachmentDocument,
1850 ".odp": AttachmentPresentation,
1851 ".ppt": AttachmentPresentation,
1852 ".pptx": AttachmentPresentation,
1853}
1854
1855func attachmentTypes(log mlog.Log, m store.Message, state *msgState) (map[AttachmentType]bool, error) {
1856 types := map[AttachmentType]bool{}
1857
1858 pm, err := parsedMessage(log, m, state, false, false, false)
1859 if err != nil {
1860 return nil, fmt.Errorf("parsing message for attachments: %w", err)
1861 }
1862 for _, a := range pm.attachments {
1863 if a.Part.MediaType == "IMAGE" {
1864 types[AttachmentImage] = true
1865 continue
1866 }
1867 mt := strings.ToLower(a.Part.MediaType + "/" + a.Part.MediaSubType)
1868 if t, ok := attachmentMimetypes[mt]; ok {
1869 types[t] = true
1870 continue
1871 }
1872 _, filename, err := a.Part.DispositionFilename()
1873 if err != nil && errors.Is(err, message.ErrParamEncoding) {
1874 log.Debugx("parsing disposition/filename", err)
1875 } else if err != nil {
1876 return nil, fmt.Errorf("reading disposition/filename: %v", err)
1877 }
1878 if ext := filepath.Ext(filename); ext != "" {
1879 if t, ok := attachmentExtensions[strings.ToLower(ext)]; ok {
1880 types[t] = true
1881 }
1882 }
1883 }
1884
1885 if len(types) == 0 {
1886 types[AttachmentNone] = true
1887 } else {
1888 types[AttachmentAny] = true
1889 }
1890 return types, nil
1891}
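
// Illustrative examples of the resulting type set (not executed, file name
// made up): a message whose only attachment is report.xlsx yields
// {spreadsheet, any}, so it matches Filter.Attachments "spreadsheet" or "any"
// and is excluded by NotFilter.Attachments "spreadsheet"; a message without
// attachments (or with only unrecognized ones) yields {none}.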
1892
1893// envFilterFn returns a filter function for the "envelope" headers ("envelope" as
1894// used by IMAP, i.e. basic message headers from/to/subject, an unfortunate name
1895// clash with SMTP envelope) for the query. A nil function is returned if no
1896// filtering is needed.
1897func (q Query) envFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1898 if len(q.Filter.From) == 0 && len(q.Filter.To) == 0 && len(q.Filter.Subject) == 0 && len(q.NotFilter.From) == 0 && len(q.NotFilter.To) == 0 && len(q.NotFilter.Subject) == 0 {
1899 return nil
1900 }
1901
1902 lower := func(l []string) []string {
1903 if len(l) == 0 {
1904 return nil
1905 }
1906 r := make([]string, len(l))
1907 for i, s := range l {
1908 r[i] = strings.ToLower(s)
1909 }
1910 return r
1911 }
1912
1913 filterSubject := lower(q.Filter.Subject)
1914 notFilterSubject := lower(q.NotFilter.Subject)
1915 filterFrom := lower(q.Filter.From)
1916 notFilterFrom := lower(q.NotFilter.From)
1917 filterTo := lower(q.Filter.To)
1918 notFilterTo := lower(q.NotFilter.To)
1919
1920 return func(m store.Message) bool {
1921 if !state.ensurePart(m, false) {
1922 return false
1923 }
1924
1925 var env message.Envelope
1926 if state.part.Envelope != nil {
1927 env = *state.part.Envelope
1928 }
1929
1930 if len(filterSubject) > 0 || len(notFilterSubject) > 0 {
1931 subject := strings.ToLower(env.Subject)
1932 for _, s := range filterSubject {
1933 if !strings.Contains(subject, s) {
1934 return false
1935 }
1936 }
1937 for _, s := range notFilterSubject {
1938 if strings.Contains(subject, s) {
1939 return false
1940 }
1941 }
1942 }
1943
1944 contains := func(textLower []string, l []message.Address, all bool) bool {
1945 next:
1946 for _, s := range textLower {
1947 for _, a := range l {
1948 name := strings.ToLower(a.Name)
1949 addr := strings.ToLower(fmt.Sprintf("<%s@%s>", a.User, a.Host))
1950 if strings.Contains(name, s) || strings.Contains(addr, s) {
1951 if !all {
1952 return true
1953 }
1954 continue next
1955 }
1956 }
1957 if all {
1958 return false
1959 }
1960 }
1961 return all
1962 }
1963
1964 if len(filterFrom) > 0 && !contains(filterFrom, env.From, true) {
1965 return false
1966 }
1967 if len(notFilterFrom) > 0 && contains(notFilterFrom, env.From, false) {
1968 return false
1969 }
1970 if len(filterTo) > 0 || len(notFilterTo) > 0 {
1971 to := append(append(append([]message.Address{}, env.To...), env.CC...), env.BCC...)
1972 if len(filterTo) > 0 && !contains(filterTo, to, true) {
1973 return false
1974 }
1975 if len(notFilterTo) > 0 && contains(notFilterTo, to, false) {
1976 return false
1977 }
1978 }
1979 return true
1980 }
1981}
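
// Illustrative example of the envelope matching above (not executed, addresses
// made up): with Filter.From = []string{"@example.org"}, a message from
// "Alice <alice@example.org>" matches, because both the lowercased display
// name and the "<alice@example.org>" form are checked for the substring.
// Filter.To is matched against To, Cc and Bcc combined. All strings of a
// Filter field must match, while a single match on a NotFilter field is enough
// to exclude a message.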
1982
1983// headerFilterFn returns a function that filters for the header filters in the
1984// query. A nil function is returned if there are no header filters.
1985func (q Query) headerFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1986 if len(q.Filter.Headers) == 0 {
1987 return nil
1988 }
1989
1990 lowerValues := make([]string, len(q.Filter.Headers))
1991 for i, t := range q.Filter.Headers {
1992 lowerValues[i] = strings.ToLower(t[1])
1993 }
1994
1995 return func(m store.Message) bool {
1996 if !state.ensurePart(m, true) {
1997 return false
1998 }
1999 hdr, err := state.part.Header()
2000 if err != nil {
2001 state.err = fmt.Errorf("reading header for message %d: %w", m.ID, err)
2002 return false
2003 }
2004
2005 next:
2006 for i, t := range q.Filter.Headers {
2007 k := t[0]
2008 v := lowerValues[i]
2009 l := hdr.Values(k)
2010 if v == "" && len(l) > 0 {
2011 continue
2012 }
2013 for _, e := range l {
2014 if strings.Contains(strings.ToLower(e), v) {
2015 continue next
2016 }
2017 }
2018 return false
2019 }
2020 return true
2021 }
2022}
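
// Illustrative examples for the header filter above (not executed, header
// names/values made up): Filter.Headers = [][2]string{{"List-Id", ""}} matches
// any message that has a List-Id header regardless of value, while
// {{"X-Priority", "1"}} requires an X-Priority header whose value contains "1"
// (case-insensitive substring). All header filters must match.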
2023
// wordsFilterFn returns a function that applies the word filters of the query.
// A nil function is returned when the query does not contain a word filter.
2026func (q Query) wordsFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
2027 if len(q.Filter.Words) == 0 && len(q.NotFilter.Words) == 0 {
2028 return nil
2029 }
2030
2031 ws := store.PrepareWordSearch(q.Filter.Words, q.NotFilter.Words)
2032
2033 return func(m store.Message) bool {
2034 if !state.ensurePart(m, true) {
2035 return false
2036 }
2037
2038 if ok, err := ws.MatchPart(log, state.part, true); err != nil {
2039 state.err = fmt.Errorf("searching for words in message %d: %w", m.ID, err)
2040 return false
2041 } else {
2042 return ok
2043 }
2044 }
2045}
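
// Illustrative example (not executed, words made up): with Filter.Words =
// []string{"invoice", "2024"}, only messages containing both words match,
// while words in NotFilter.Words exclude messages, per the semantics of
// store.PrepareWordSearch used above.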
2046