1package webmail
2
3// todo: may want to add some json omitempty tags to MessageItem, or Message to reduce json size, or just have smaller types that send only the fields that are needed.
4
5import (
6 "compress/gzip"
7 "context"
8 cryptrand "crypto/rand"
9 "encoding/base64"
10 "encoding/json"
11 "errors"
12 "fmt"
13 "log/slog"
14 "net/http"
15 "path/filepath"
16 "reflect"
17 "runtime/debug"
18 "slices"
19 "strconv"
20 "strings"
21 "sync"
22 "time"
23
24 "github.com/mjl-/bstore"
25 "github.com/mjl-/sherpa"
26
27 "github.com/mjl-/mox/dns"
28 "github.com/mjl-/mox/message"
29 "github.com/mjl-/mox/metrics"
30 "github.com/mjl-/mox/mlog"
31 "github.com/mjl-/mox/mox-"
32 "github.com/mjl-/mox/moxio"
33 "github.com/mjl-/mox/moxvar"
34 "github.com/mjl-/mox/smtp"
35 "github.com/mjl-/mox/store"
36)
37
// Request is a request to an SSE connection to send messages, either for a new
// view, to continue with an existing view, or to cancel an ongoing request.
40type Request struct {
41 ID int64
42
43 SSEID int64 // SSE connection.
44
45 // To indicate a request is a continuation (more results) of the previous view.
46 // Echoed in events, client checks if it is getting results for the latest request.
47 ViewID int64
48
49 // If set, this request and its view are canceled. A new view must be started.
50 Cancel bool
51
52 Query Query
53 Page Page
54}
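
// A minimal sketch (not used by the package) of a followup request as a client
// could construct it for an established SSE connection; the IDs and variables
// below are hypothetical:
//
//	req := Request{
//		ID:     2,      // Chosen by the client, echoed as RequestID in events.
//		SSEID:  sseID,  // From the EventStart event.
//		ViewID: viewID, // Same ViewID continues the view, a new one starts over.
//		Query: Query{
//			Threading: ThreadOn,
//			Filter:    Filter{MailboxID: inboxID},
//		},
//		Page: Page{AnchorMessageID: lastSeenMsgID, Count: 50},
//	}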
55
56type ThreadMode string
57
58const (
59 ThreadOff ThreadMode = "off"
60 ThreadOn ThreadMode = "on"
61 ThreadUnread ThreadMode = "unread"
62)
63
64// Query is a request for messages that match filters, in a given order.
65type Query struct {
	OrderAsc bool // Order by received ascending or descending.
67 Threading ThreadMode
68 Filter Filter
69 NotFilter NotFilter
70}
71
72// AttachmentType is for filtering by attachment type.
73type AttachmentType string
74
75const (
76 AttachmentIndifferent AttachmentType = ""
77 AttachmentNone AttachmentType = "none"
78 AttachmentAny AttachmentType = "any"
79 AttachmentImage AttachmentType = "image" // png, jpg, gif, ...
80 AttachmentPDF AttachmentType = "pdf"
81 AttachmentArchive AttachmentType = "archive" // zip files, tgz, ...
82 AttachmentSpreadsheet AttachmentType = "spreadsheet" // ods, xlsx, ...
83 AttachmentDocument AttachmentType = "document" // odt, docx, ...
84 AttachmentPresentation AttachmentType = "presentation" // odp, pptx, ...
85)
86
// Filter selects the messages to return. Fields that are set must all match;
// for slices, each element must match ("and").
89type Filter struct {
90 // If -1, then all mailboxes except Trash/Junk/Rejects. Otherwise, only active if > 0.
91 MailboxID int64
92
	// If true, submailboxes are also included in the search.
94 MailboxChildrenIncluded bool
95
	// In case the client doesn't know the mailboxes and their IDs yet. Only used
	// during SSE connection setup, where it is turned into a MailboxID. Filtering
	// only looks at MailboxID.
99 MailboxName string
100
101 Words []string // Case insensitive substring match for each string.
102 From []string
103 To []string // Including Cc and Bcc.
104 Oldest *time.Time
105 Newest *time.Time
106 Subject []string
107 Attachments AttachmentType
108 Labels []string
	Headers [][2]string // Header values can be empty; then the check is only whether the header is present, regardless of value.
110 SizeMin int64
111 SizeMax int64
112}
113
114// NotFilter matches messages that don't match these fields.
115type NotFilter struct {
116 Words []string
117 From []string
118 To []string
119 Subject []string
120 Attachments AttachmentType
121 Labels []string
122}
123
124// Page holds pagination parameters for a request.
125type Page struct {
126 // Start returning messages after this ID, if > 0. For pagination, fetching the
127 // next set of messages.
128 AnchorMessageID int64
129
130 // Number of messages to return, must be >= 1, we never return more than 10000 for
131 // one request.
132 Count int
133
	// If > 0, return messages until DestMessageID is found. More than Count messages
	// can be returned. For long-running searches, it may take a while before this
	// message is found.
137 DestMessageID int64
138}
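
// The initial request is sent JSON-encoded (and URL-encoded) in the "request"
// query-string parameter when opening the SSE connection, see serveEvents below.
// A hypothetical minimal example, relying on the default of showing the Inbox
// when no filters are set:
//
//	{"ID":1,"ViewID":1,"Page":{"Count":50}}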
139
140// todo: MessageAddress and MessageEnvelope into message.Address and message.Envelope.
141
// MessageAddress is like message.Address, but with a dns.Domain, so the unicode
// domain name is included.
144type MessageAddress struct {
145 Name string // Free-form name for display in mail applications.
146 User string // Localpart, encoded.
147 Domain dns.Domain
148}
149
150// MessageEnvelope is like message.Envelope, as used in message.Part, but including
151// unicode host names for IDNA names.
152type MessageEnvelope struct {
153 // todo: should get sherpadoc to understand type embeds and embed the non-MessageAddress fields from message.Envelope.
154 Date time.Time
155 Subject string
156 From []MessageAddress
157 Sender []MessageAddress
158 ReplyTo []MessageAddress
159 To []MessageAddress
160 CC []MessageAddress
161 BCC []MessageAddress
162 InReplyTo string
163 MessageID string
164}
165
// MessageItem is sent by queries. It has derived information analyzed from
// message.Part, made for the needs of the message items in the message list.
169type MessageItem struct {
170 Message store.Message // Without ParsedBuf and MsgPrefix, for size.
171 Envelope MessageEnvelope
172 Attachments []Attachment
173 IsSigned bool
174 IsEncrypted bool
175 FirstLine string // Of message body, for showing as preview.
176 MatchQuery bool // If message does not match query, it can still be included because of threading.
177 MoreHeaders [][2]string // All headers from store.Settings.ShowHeaders that are present.
178}
179
180// ParsedMessage has more parsed/derived information about a message, intended
181// for rendering the (contents of the) message. Information from MessageItem is
182// not duplicated.
183type ParsedMessage struct {
184 ID int64
185 Part message.Part
186 Headers map[string][]string
187 ViewMode store.ViewMode
188
189 Texts []string // Contents of text parts, can be empty.
190
	// Whether there is an HTML part. The webclient renders HTML message parts through
	// an iframe and a separate request with strict CSP headers to prevent script
	// execution and loading of external resources, which isn't possible when loading
	// inline HTML in an iframe because not all browsers support the iframe csp
	// attribute.
196 HasHTML bool
197
198 ListReplyAddress *MessageAddress // From List-Post.
199
200 TextPaths [][]int // Paths to text parts.
201 HTMLPath []int // Path to HTML part.
202
203 // Information used by MessageItem, not exported in this type.
204 envelope MessageEnvelope
205 attachments []Attachment
206 isSigned bool
207 isEncrypted bool
208 firstLine string
209}
210
211// EventStart is the first message sent on an SSE connection, giving the client
212// basic data to populate its UI. After this event, messages will follow quickly in
213// an EventViewMsgs event.
214type EventStart struct {
215 SSEID int64
216 LoginAddress MessageAddress
217 Addresses []MessageAddress
218 DomainAddressConfigs map[string]DomainAddressConfig // ASCII domain to address config.
219 MailboxName string
220 Mailboxes []store.Mailbox
221 RejectsMailbox string
222 Settings store.Settings
223 AccountPath string // If nonempty, the path on same host to webaccount interface.
224 Version string
225}
226
227// DomainAddressConfig has the address (localpart) configuration for a domain, so
228// the webmail client can decide if an address matches the addresses of the
229// account.
230type DomainAddressConfig struct {
231 LocalpartCatchallSeparator string // Can be empty.
232 LocalpartCaseSensitive bool
233}
234
235// EventViewMsgs contains messages for a view, possibly a continuation of an
236// earlier list of messages.
237type EventViewMsgs struct {
238 ViewID int64
239 RequestID int64
240
	// If empty, this was the last message for the request. If non-empty, a list of
	// thread messages. The first message of each thread is the reason the thread is
	// included and can be used as Page.AnchorMessageID in followup requests. If the
	// threading mode is "off" in the query, there will always be only a single
	// message. If a thread is sent, all messages in the thread are sent, including
	// those that don't match the query (e.g. from another mailbox). Threads can be
	// displayed based on the ThreadParentIDs field, with possibly slightly different
	// display based on field ThreadMissingLink.
249 MessageItems [][]MessageItem
250
251 // If set, will match the target page.DestMessageID from the request.
252 ParsedMessage *ParsedMessage
253
254 // If set, there are no more messages in this view at this moment. Messages can be
255 // added, typically via Change messages, e.g. for new deliveries.
256 ViewEnd bool
257}
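
// Sketch of how a client could walk the thread structure in MessageItems (an
// assumption about client-side handling, not part of this package; ev is a
// received EventViewMsgs and render is a hypothetical client function):
//
//	for _, thread := range ev.MessageItems {
//		// thread[0] is the reason this thread is included; its Message.ID can
//		// be used as Page.AnchorMessageID in a followup request.
//		for _, mi := range thread {
//			render(mi) // Thread layout follows from mi.Message.ThreadParentIDs.
//		}
//	}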
258
// EventViewErr indicates an error during a query for messages. The request is
// aborted; no more request-related messages will be sent until the next request.
261type EventViewErr struct {
262 ViewID int64
263 RequestID int64
264 Err string // To be displayed in client.
	err error // Original error, for checking against context.Canceled.
266}
267
// EventViewReset indicates that a request for the next set of messages in a view
// could not be fulfilled, e.g. because the anchor message does not exist anymore.
270// The client should clear its list of messages. This can happen before
271// EventViewMsgs events are sent.
272type EventViewReset struct {
273 ViewID int64
274 RequestID int64
275}
276
// EventViewChanges contains one or more changes relevant for the client, either
278// with new mailbox total/unseen message counts, or messages added/removed/modified
279// (flags) for the current view.
280type EventViewChanges struct {
281 ViewID int64
	Changes [][2]any // The first element of [2]any is a string, the second is one of the Change types below.
283}
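
// On the wire, Changes encodes as a JSON array of [name, change] pairs, for
// example (field values elided):
//
//	[["ChangeMailboxCounts",{...}],["ChangeMsgFlags",{...}]]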
284
285// ChangeMsgAdd adds a new message and possibly its thread to the view.
286type ChangeMsgAdd struct {
287 store.ChangeAddUID
288 MessageItems []MessageItem
289}
290
291// ChangeMsgRemove removes one or more messages from the view.
292type ChangeMsgRemove struct {
293 store.ChangeRemoveUIDs
294}
295
296// ChangeMsgFlags updates flags for one message.
297type ChangeMsgFlags struct {
298 store.ChangeFlags
299}
300
301// ChangeMsgThread updates muted/collapsed fields for one message.
302type ChangeMsgThread struct {
303 store.ChangeThread
304}
305
306// ChangeMailboxRemove indicates a mailbox was removed, including all its messages.
307type ChangeMailboxRemove struct {
308 store.ChangeRemoveMailbox
309}
310
311// ChangeMailboxAdd indicates a new mailbox was added, initially without any messages.
312type ChangeMailboxAdd struct {
313 Mailbox store.Mailbox
314}
315
316// ChangeMailboxRename indicates a mailbox was renamed. Its ID stays the same.
317// It could be under a new parent.
318type ChangeMailboxRename struct {
319 store.ChangeRenameMailbox
320}
321
// ChangeMailboxCounts sets new total and unseen message counts for a mailbox.
323type ChangeMailboxCounts struct {
324 store.ChangeMailboxCounts
325}
326
327// ChangeMailboxSpecialUse has updated special-use flags for a mailbox.
328type ChangeMailboxSpecialUse struct {
329 store.ChangeMailboxSpecialUse
330}
331
332// ChangeMailboxKeywords has an updated list of keywords for a mailbox, e.g. after
333// a message was added with a keyword that wasn't in the mailbox yet.
334type ChangeMailboxKeywords struct {
335 store.ChangeMailboxKeywords
336}
337
// view holds the information about the returned data for a query. It is used to
// determine whether mailbox changes should be sent to the client: we only send
// addition/removal/flag changes of messages that are in the view, or that would
// extend it if the view is at the end of the results.
342type view struct {
343 Request Request
344
345 // Received of last message we sent to the client. We use it to decide if a newly
346 // delivered message is within the view and the client should get a notification.
347 LastMessageReceived time.Time
348
349 // If set, the last message in the query view has been sent. There is no need to do
350 // another query, it will not return more data. Used to decide if an event for a
351 // new message should be sent.
352 End bool
353
	// Whether messages must or must not match mailboxIDs.
355 matchMailboxIDs bool
356 // Mailboxes to match, can be multiple, for matching children. If empty, there is
357 // no filter on mailboxes.
358 mailboxIDs map[int64]bool
359
	// Threads sent to the client. New messages for these threads are also sent,
	// regardless of regular query matching, so also for other mailboxes. If the user
	// (re)moved all messages of a thread, they may still receive events for the
	// thread. Only filled when the query has threading enabled.
364 threadIDs map[int64]struct{}
365}
366
367// sses tracks all sse connections, and access to them.
368var sses = struct {
369 sync.Mutex
370 gen int64
371 m map[int64]sse
372}{m: map[int64]sse{}}
373
374// sse represents an sse connection.
375type sse struct {
	ID int64 // Also returned in EventStart and used in Request to identify the connection the request is for.
377 AccountName string // Used to check the authenticated user has access to the SSE connection.
378 Request chan Request // Goroutine will receive requests from here, coming from API calls.
379}
380
381// called by the goroutine when the connection is closed or breaks.
382func (sse sse) unregister() {
383 sses.Lock()
384 defer sses.Unlock()
385 delete(sses.m, sse.ID)
386
	// Drain any pending requests, so goroutines from API calls don't block on sending.
388 for {
389 select {
390 case <-sse.Request:
391 default:
392 return
393 }
394 }
395}
396
397func sseRegister(accountName string) sse {
398 sses.Lock()
399 defer sses.Unlock()
400 sses.gen++
401 v := sse{sses.gen, accountName, make(chan Request, 1)}
402 sses.m[v.ID] = v
403 return v
404}
405
406// sseGet returns a reference to an existing connection if it exists and user
407// has access.
408func sseGet(id int64, accountName string) (sse, bool) {
409 sses.Lock()
410 defer sses.Unlock()
411 s := sses.m[id]
412 if s.AccountName != accountName {
413 return sse{}, false
414 }
415 return s, true
416}
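
// Sketch of the connection/request flow; the API handler shown is an assumption
// about code outside this file:
//
//	// In serveEvents, when an SSE connection is opened:
//	sseConn := sseRegister(acc.Name)
//	defer sseConn.unregister()
//
//	// In the API call handling a followup Request from the client:
//	if conn, ok := sseGet(req.SSEID, accName); ok {
//		conn.Request <- req
//	}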
417
418// ssetoken is a temporary token that has not yet been used to start an SSE
419// connection. Created by Token, consumed by a new SSE connection.
420type ssetoken struct {
421 token string // Uniquely generated.
422 accName string
423 address string // Address used to authenticate in call that created the token.
424 sessionToken store.SessionToken // SessionToken that created this token, checked before sending updates.
425 validUntil time.Time
426}
427
428// ssetokens maintains unused tokens. We have just one, but it's a type so we
429// can define methods.
430type ssetokens struct {
431 sync.Mutex
432 accountTokens map[string][]ssetoken // Account to max 10 most recent tokens, from old to new.
433 tokens map[string]ssetoken // Token to details, for finding account for a token.
434}
435
436var sseTokens = ssetokens{
437 accountTokens: map[string][]ssetoken{},
438 tokens: map[string]ssetoken{},
439}
440
441// xgenerate creates and saves a new token. It ensures no more than 10 tokens
442// per account exist, removing old ones if needed.
443func (x *ssetokens) xgenerate(ctx context.Context, accName, address string, sessionToken store.SessionToken) string {
444 buf := make([]byte, 16)
445 _, err := cryptrand.Read(buf)
446 xcheckf(ctx, err, "generating token")
447 st := ssetoken{base64.RawURLEncoding.EncodeToString(buf), accName, address, sessionToken, time.Now().Add(time.Minute)}
448
449 x.Lock()
450 defer x.Unlock()
451 n := len(x.accountTokens[accName])
452 if n >= 10 {
453 for _, ost := range x.accountTokens[accName][:n-9] {
454 delete(x.tokens, ost.token)
455 }
456 copy(x.accountTokens[accName], x.accountTokens[accName][n-9:])
457 x.accountTokens[accName] = x.accountTokens[accName][:9]
458 }
459 x.accountTokens[accName] = append(x.accountTokens[accName], st)
460 x.tokens[st.token] = st
461 return st.token
462}
463
// check verifies and consumes a token. It returns ok false if the token is
// unknown or expired.
465func (x *ssetokens) check(token string) (string, string, store.SessionToken, bool, error) {
466 x.Lock()
467 defer x.Unlock()
468
469 st, ok := x.tokens[token]
470 if !ok {
471 return "", "", "", false, nil
472 }
473 delete(x.tokens, token)
474 if i := slices.Index(x.accountTokens[st.accName], st); i < 0 {
475 return "", "", "", false, errors.New("internal error, could not find token in account")
476 } else {
477 copy(x.accountTokens[st.accName][i:], x.accountTokens[st.accName][i+1:])
478 x.accountTokens[st.accName] = x.accountTokens[st.accName][:len(x.accountTokens[st.accName])-1]
479 if len(x.accountTokens[st.accName]) == 0 {
480 delete(x.accountTokens, st.accName)
481 }
482 }
483 if time.Now().After(st.validUntil) {
484 return "", "", "", false, nil
485 }
486 return st.accName, st.address, st.sessionToken, true, nil
487}
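
// Sketch of the intended use (the Token API call and its handler live outside
// this file):
//
//	// In the Token API call, after authentication:
//	token := sseTokens.xgenerate(ctx, accName, address, sessionToken)
//
//	// In serveEvents, for the "singleUseToken" query-string parameter:
//	accName, address, sessionToken, ok, err := sseTokens.check(token)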
488
489// ioErr is panicked on i/o errors in serveEvents and handled in a defer.
490type ioErr struct {
491 err error
492}
493
// ensureMoreHeaders ensures a non-nil moreHeaders, taking it from Settings if needed.
495func ensureMoreHeaders(tx *bstore.Tx, moreHeaders []string) ([]string, error) {
496 if moreHeaders != nil {
497 return moreHeaders, nil
498 }
499
500 s := store.Settings{ID: 1}
501 if err := tx.Get(&s); err != nil {
502 return nil, fmt.Errorf("get settings: %v", err)
503 }
504 moreHeaders = s.ShowHeaders
505 if moreHeaders == nil {
506 moreHeaders = []string{} // Ensure we won't get Settings again next call.
507 }
508 return moreHeaders, nil
509}
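
// Example of the URL a client could use to open the SSE connection handled by
// serveEvents below; the path is hypothetical, the query-string parameters are
// the ones serveEvents reads, and the request value must be URL-encoded JSON:
//
//	.../events?singleUseToken=<token>&request={"ID":1,"ViewID":1,"Page":{"Count":50}}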
510
511// serveEvents serves an SSE connection. Authentication is done through a query
512// string parameter "singleUseToken", a one-time-use token returned by the Token
513// API call.
514func serveEvents(ctx context.Context, log mlog.Log, accountPath string, w http.ResponseWriter, r *http.Request) {
515 if r.Method != "GET" {
516 http.Error(w, "405 - method not allowed - use get", http.StatusMethodNotAllowed)
517 return
518 }
519
520 flusher, ok := w.(http.Flusher)
521 if !ok {
522 log.Error("internal error: ResponseWriter not a http.Flusher")
523 http.Error(w, "500 - internal error - cannot sync to http connection", 500)
524 return
525 }
526
527 q := r.URL.Query()
528 token := q.Get("singleUseToken")
529 if token == "" {
530 http.Error(w, "400 - bad request - missing credentials", http.StatusBadRequest)
531 return
532 }
533 accName, address, sessionToken, ok, err := sseTokens.check(token)
534 if err != nil {
535 http.Error(w, "500 - internal server error - "+err.Error(), http.StatusInternalServerError)
536 return
537 }
538 if !ok {
539 http.Error(w, "400 - bad request - bad token", http.StatusBadRequest)
540 return
541 }
542 if _, err := store.SessionUse(ctx, log, accName, sessionToken, ""); err != nil {
543 http.Error(w, "400 - bad request - bad session token", http.StatusBadRequest)
544 return
545 }
546
	// We can simulate a slow SSE connection. It seems Firefox doesn't slow down
	// incoming responses with its slow-network simulation.
549 var waitMin, waitMax time.Duration
550 waitMinMsec := q.Get("waitMinMsec")
551 waitMaxMsec := q.Get("waitMaxMsec")
552 if waitMinMsec != "" && waitMaxMsec != "" {
553 if v, err := strconv.ParseInt(waitMinMsec, 10, 64); err != nil {
554 http.Error(w, "400 - bad request - parsing waitMinMsec: "+err.Error(), http.StatusBadRequest)
555 return
556 } else {
557 waitMin = time.Duration(v) * time.Millisecond
558 }
559
560 if v, err := strconv.ParseInt(waitMaxMsec, 10, 64); err != nil {
561 http.Error(w, "400 - bad request - parsing waitMaxMsec: "+err.Error(), http.StatusBadRequest)
562 return
563 } else {
564 waitMax = time.Duration(v) * time.Millisecond
565 }
566 }
567
568 // Parse the request with initial mailbox/search criteria.
569 var req Request
570 dec := json.NewDecoder(strings.NewReader(q.Get("request")))
571 dec.DisallowUnknownFields()
572 if err := dec.Decode(&req); err != nil {
573 http.Error(w, "400 - bad request - bad request query string parameter: "+err.Error(), http.StatusBadRequest)
574 return
575 } else if req.Page.Count <= 0 {
576 http.Error(w, "400 - bad request - request cannot have Page.Count 0", http.StatusBadRequest)
577 return
578 }
579 if req.Query.Threading == "" {
580 req.Query.Threading = ThreadOff
581 }
582
583 var writer *eventWriter
584
585 metricSSEConnections.Inc()
586 defer metricSSEConnections.Dec()
587
	// Below here, error handling is done through xcheckf, which panics with a
	// *sherpa.Error, after which we send an error event to the client. We can also
	// get an ioErr panic when the connection is broken.
591 defer func() {
592 x := recover()
593 if x == nil {
594 return
595 }
596 if err, ok := x.(*sherpa.Error); ok {
597 writer.xsendEvent(ctx, log, "fatalErr", err.Message)
598 } else if _, ok := x.(ioErr); ok {
599 return
600 } else {
601 log.WithContext(ctx).Error("serveEvents panic", slog.Any("err", x))
602 debug.PrintStack()
603 metrics.PanicInc(metrics.Webmail)
604 panic(x)
605 }
606 }()
607
608 h := w.Header()
609 h.Set("Content-Type", "text/event-stream")
610 h.Set("Cache-Control", "no-cache")
611
	// We'll be sending quite a bit of message data (text) in JSON (plenty of
	// duplicate keys), so it should be quite compressible.
614 var out writeFlusher
615 gz := mox.AcceptsGzip(r)
616 if gz {
617 h.Set("Content-Encoding", "gzip")
618 out, _ = gzip.NewWriterLevel(w, gzip.BestSpeed)
619 } else {
620 out = nopFlusher{w}
621 }
622 out = httpFlusher{out, flusher}
623
624 // We'll be writing outgoing SSE events through writer.
625 writer = newEventWriter(out, waitMin, waitMax, accName, sessionToken)
626 defer writer.close()
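
	// Each event written below presumably ends up on the wire in the standard SSE
	// framing, along the lines of (hypothetical example):
	//
	//	event: start
	//	data: {"SSEID":1,...}
	//
	// with a blank line terminating each event; the keepalive below uses the
	// ": comment" form.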
627
628 // Fetch initial data.
629 acc, err := store.OpenAccount(log, accName, true)
630 xcheckf(ctx, err, "open account")
631 defer func() {
632 err := acc.Close()
633 log.Check(err, "closing account")
634 }()
635 comm := store.RegisterComm(acc)
636 defer comm.Unregister()
637
638 // List addresses that the client can use to send email from.
639 accConf, _ := acc.Conf()
640 loginAddr, err := smtp.ParseAddress(address)
641 xcheckf(ctx, err, "parsing login address")
642 _, _, _, dest, err := mox.LookupAddress(loginAddr.Localpart, loginAddr.Domain, false, false, false)
643 xcheckf(ctx, err, "looking up destination for login address")
644 loginName := accConf.FullName
645 if dest.FullName != "" {
646 loginName = dest.FullName
647 }
648 loginAddress := MessageAddress{Name: loginName, User: loginAddr.Localpart.String(), Domain: loginAddr.Domain}
649 var addresses []MessageAddress
650 for a, dest := range accConf.Destinations {
651 name := dest.FullName
652 if name == "" {
653 name = accConf.FullName
654 }
655 var ma MessageAddress
656 if strings.HasPrefix(a, "@") {
657 dom, err := dns.ParseDomain(a[1:])
658 xcheckf(ctx, err, "parsing destination address for account")
659 ma = MessageAddress{Domain: dom}
660 } else {
661 addr, err := smtp.ParseAddress(a)
662 xcheckf(ctx, err, "parsing destination address for account")
663 ma = MessageAddress{Name: name, User: addr.Localpart.String(), Domain: addr.Domain}
664 }
665 addresses = append(addresses, ma)
666 }
	// The user is allowed to use an alias address as the message From address. The
	// webmail client will choose it when replying to a message sent to that address.
669 aliasAddrs := map[MessageAddress]bool{}
670 for _, a := range accConf.Aliases {
671 if a.Alias.AllowMsgFrom {
672 ma := MessageAddress{User: a.Alias.LocalpartStr, Domain: a.Alias.Domain}
673 if !aliasAddrs[ma] {
674 addresses = append(addresses, ma)
675 }
676 aliasAddrs[ma] = true
677 }
678 }
679
680 // We implicitly start a query. We use the reqctx for the transaction, because the
681 // transaction is passed to the query, which can be canceled.
682 reqctx, reqctxcancel := context.WithCancel(ctx)
683 defer func() {
684 // We also cancel in cancelDrain later on, but there is a brief window where the
685 // context wouldn't be canceled.
686 if reqctxcancel != nil {
687 reqctxcancel()
688 reqctxcancel = nil
689 }
690 }()
691
692 // qtx is kept around during connection initialization, until we pass it off to the
693 // goroutine that starts querying for messages.
694 var qtx *bstore.Tx
695 defer func() {
696 if qtx != nil {
697 err := qtx.Rollback()
698 log.Check(err, "rolling back")
699 }
700 }()
701
702 var mbl []store.Mailbox
703 settings := store.Settings{ID: 1}
704
705 // We only take the rlock when getting the tx.
706 acc.WithRLock(func() {
707 // Now a read-only transaction we'll use during the query.
708 qtx, err = acc.DB.Begin(reqctx, false)
709 xcheckf(ctx, err, "begin transaction")
710
711 mbl, err = bstore.QueryTx[store.Mailbox](qtx).List()
712 xcheckf(ctx, err, "list mailboxes")
713
714 err = qtx.Get(&settings)
715 xcheckf(ctx, err, "get settings")
716 })
717
	// Find the designated mailbox if a mailbox name is set, or if there are no filters at all.
719 var zerofilter Filter
720 var zeronotfilter NotFilter
721 var mailbox store.Mailbox
722 var mailboxPrefixes []string
723 var matchMailboxes bool
724 mailboxIDs := map[int64]bool{}
725 mailboxName := req.Query.Filter.MailboxName
726 if mailboxName != "" || reflect.DeepEqual(req.Query.Filter, zerofilter) && reflect.DeepEqual(req.Query.NotFilter, zeronotfilter) {
727 if mailboxName == "" {
728 mailboxName = "Inbox"
729 }
730
731 var inbox store.Mailbox
732 for _, e := range mbl {
733 if e.Name == mailboxName {
734 mailbox = e
735 }
736 if e.Name == "Inbox" {
737 inbox = e
738 }
739 }
740 if mailbox.ID == 0 {
741 mailbox = inbox
742 }
743 if mailbox.ID == 0 {
744 xcheckf(ctx, errors.New("inbox not found"), "setting initial mailbox")
745 }
746 req.Query.Filter.MailboxID = mailbox.ID
747 req.Query.Filter.MailboxName = ""
748 mailboxPrefixes = []string{mailbox.Name + "/"}
749 matchMailboxes = true
750 mailboxIDs[mailbox.ID] = true
751 } else {
752 matchMailboxes, mailboxIDs, mailboxPrefixes = xprepareMailboxIDs(ctx, qtx, req.Query.Filter, accConf.RejectsMailbox)
753 }
754 if req.Query.Filter.MailboxChildrenIncluded {
755 xgatherMailboxIDs(ctx, qtx, mailboxIDs, mailboxPrefixes)
756 }
757
758 // todo: write a last-event-id based on modseq? if last-event-id is present, we would have to send changes to mailboxes, messages, hopefully reducing the amount of data sent.
759
760 sse := sseRegister(acc.Name)
761 defer sse.unregister()
762
763 // Per-domain localpart config so webclient can decide if an address belongs to the account.
764 domainAddressConfigs := map[string]DomainAddressConfig{}
765 for _, a := range addresses {
766 dom, _ := mox.Conf.Domain(a.Domain)
767 domainAddressConfigs[a.Domain.ASCII] = DomainAddressConfig{dom.LocalpartCatchallSeparator, dom.LocalpartCaseSensitive}
768 }
769
770 // Write first event, allowing client to fill its UI with mailboxes.
771 start := EventStart{sse.ID, loginAddress, addresses, domainAddressConfigs, mailbox.Name, mbl, accConf.RejectsMailbox, settings, accountPath, moxvar.Version}
772 writer.xsendEvent(ctx, log, "start", start)
773
774 // The goroutine doing the querying will send messages on these channels, which
775 // result in an event being written on the SSE connection.
776 viewMsgsc := make(chan EventViewMsgs)
777 viewErrc := make(chan EventViewErr)
778 viewResetc := make(chan EventViewReset)
779 donec := make(chan int64) // When request is done.
780
	// Start a view, which determines if we send a change to the client. And start an
	// implicit query for messages; we'll send the messages to the client, which can
	// fill its UI with them.
784 v := view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
785 go viewRequestTx(reqctx, log, acc, qtx, v, viewMsgsc, viewErrc, viewResetc, donec)
786 qtx = nil // viewRequestTx closes qtx
787
788 // When canceling a query, we must drain its messages until it says it is done.
789 // Otherwise the sending goroutine would hang indefinitely on a channel send.
790 cancelDrain := func() {
791 if reqctxcancel != nil {
792 // Cancel the goroutine doing the querying.
793 reqctxcancel()
794 reqctx = nil
795 reqctxcancel = nil
796 } else {
797 return
798 }
799
800 // Drain events until done.
801 for {
802 select {
803 case <-viewMsgsc:
804 case <-viewErrc:
805 case <-viewResetc:
806 case <-donec:
807 return
808 }
809 }
810 }
811
812 // If we stop and a query is in progress, we must drain the channel it will send on.
813 defer cancelDrain()
814
815 // Changes broadcasted by other connections on this account. If applicable for the
816 // connection/view, we send events.
817 xprocessChanges := func(changes []store.Change) {
818 taggedChanges := [][2]any{}
819
		// We get a transaction the first time we need it.
821 var xtx *bstore.Tx
822 defer func() {
823 if xtx != nil {
824 err := xtx.Rollback()
825 log.Check(err, "rolling back transaction")
826 }
827 }()
828 ensureTx := func() error {
829 if xtx != nil {
830 return nil
831 }
832 acc.RLock()
833 defer acc.RUnlock()
834 var err error
835 xtx, err = acc.DB.Begin(ctx, false)
836 return err
837 }
		// This getmsg will now only be called with mailboxID+UID, not with messageID set.
839 // todo jmap: change store.Change* to include MessageID's? would mean duplication of information resulting in possible mismatch.
840 getmsg := func(messageID int64, mailboxID int64, uid store.UID) (store.Message, error) {
841 if err := ensureTx(); err != nil {
842 return store.Message{}, fmt.Errorf("transaction: %v", err)
843 }
844 return bstore.QueryTx[store.Message](xtx).FilterEqual("Expunged", false).FilterNonzero(store.Message{MailboxID: mailboxID, UID: uid}).Get()
845 }
846
847 // Additional headers from settings to add to MessageItems.
848 var moreHeaders []string
849 xmoreHeaders := func() []string {
850 err := ensureTx()
851 xcheckf(ctx, err, "transaction")
852
853 moreHeaders, err = ensureMoreHeaders(xtx, moreHeaders)
854 xcheckf(ctx, err, "ensuring more headers")
855 return moreHeaders
856 }
857
		// Return UIDs that are within the range of the view: because the end has been
		// reached, or because the UID is not after the last message.
860 xchangedUIDs := func(mailboxID int64, uids []store.UID, isRemove bool) (changedUIDs []store.UID) {
861 uidsAny := make([]any, len(uids))
862 for i, uid := range uids {
863 uidsAny[i] = uid
864 }
865 err := ensureTx()
866 xcheckf(ctx, err, "transaction")
867 q := bstore.QueryTx[store.Message](xtx)
868 q.FilterNonzero(store.Message{MailboxID: mailboxID})
869 q.FilterEqual("UID", uidsAny...)
870 mbOK := v.matchesMailbox(mailboxID)
871 err = q.ForEach(func(m store.Message) error {
872 _, thread := v.threadIDs[m.ThreadID]
873 if thread || mbOK && (v.inRange(m) || isRemove && m.Expunged) {
874 changedUIDs = append(changedUIDs, m.UID)
875 }
876 return nil
877 })
878 xcheckf(ctx, err, "fetching messages for change")
879 return changedUIDs
880 }
881
882 // Forward changes that are relevant to the current view.
883 for _, change := range changes {
884 switch c := change.(type) {
885 case store.ChangeAddUID:
886 ok, err := v.matches(log, acc, true, 0, c.MailboxID, c.UID, c.Flags, c.Keywords, getmsg)
887 xcheckf(ctx, err, "matching new message against view")
888 m, err := getmsg(0, c.MailboxID, c.UID)
889 xcheckf(ctx, err, "get message")
890 _, thread := v.threadIDs[m.ThreadID]
891 if !ok && !thread {
892 continue
893 }
894
895 state := msgState{acc: acc}
896 mi, err := messageItem(log, m, &state, xmoreHeaders())
897 state.clear()
898 xcheckf(ctx, err, "make messageitem")
899 mi.MatchQuery = ok
900
901 mil := []MessageItem{mi}
902 if !thread && req.Query.Threading != ThreadOff {
903 err := ensureTx()
904 xcheckf(ctx, err, "transaction")
905 more, _, err := gatherThread(log, xtx, acc, v, m, 0, false, xmoreHeaders())
906 xcheckf(ctx, err, "gathering thread messages for id %d, thread %d", m.ID, m.ThreadID)
907 mil = append(mil, more...)
908 v.threadIDs[m.ThreadID] = struct{}{}
909 }
910
911 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgAdd", ChangeMsgAdd{c, mil}})
912
913 // If message extends the view, store it as such.
914 if !v.Request.Query.OrderAsc && m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && m.Received.After(v.LastMessageReceived) {
915 v.LastMessageReceived = m.Received
916 }
917
918 case store.ChangeRemoveUIDs:
919 // We may send changes for uids the client doesn't know, that's fine.
920 changedUIDs := xchangedUIDs(c.MailboxID, c.UIDs, true)
921 if len(changedUIDs) == 0 {
922 continue
923 }
924 ch := ChangeMsgRemove{c}
925 ch.UIDs = changedUIDs
926 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgRemove", ch})
927
928 case store.ChangeFlags:
929 // We may send changes for uids the client doesn't know, that's fine.
930 changedUIDs := xchangedUIDs(c.MailboxID, []store.UID{c.UID}, false)
931 if len(changedUIDs) == 0 {
932 continue
933 }
934 ch := ChangeMsgFlags{c}
935 ch.UID = changedUIDs[0]
936 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgFlags", ch})
937
938 case store.ChangeThread:
				// Change in muted/collapsed state, just always ship it.
940 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgThread", ChangeMsgThread{c}})
941
942 case store.ChangeRemoveMailbox:
943 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRemove", ChangeMailboxRemove{c}})
944
945 case store.ChangeAddMailbox:
946 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxAdd", ChangeMailboxAdd{c.Mailbox}})
947
948 case store.ChangeRenameMailbox:
949 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRename", ChangeMailboxRename{c}})
950
951 case store.ChangeMailboxCounts:
952 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxCounts", ChangeMailboxCounts{c}})
953
954 case store.ChangeMailboxSpecialUse:
955 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxSpecialUse", ChangeMailboxSpecialUse{c}})
956
957 case store.ChangeMailboxKeywords:
958 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxKeywords", ChangeMailboxKeywords{c}})
959
960 case store.ChangeAddSubscription:
961 // Webmail does not care about subscriptions.
962
963 default:
964 panic(fmt.Sprintf("missing case for change %T", c))
965 }
966 }
967
968 if len(taggedChanges) > 0 {
969 viewChanges := EventViewChanges{v.Request.ViewID, taggedChanges}
970 writer.xsendEvent(ctx, log, "viewChanges", viewChanges)
971 }
972 }
973
974 timer := time.NewTimer(5 * time.Minute) // For keepalives.
975 defer timer.Stop()
976 for {
977 if writer.wrote {
978 timer.Reset(5 * time.Minute)
979 writer.wrote = false
980 }
981
982 pending := comm.Pending
983 if reqctx != nil {
984 pending = nil
985 }
986
987 select {
988 case <-mox.Shutdown.Done():
989 writer.xsendEvent(ctx, log, "fatalErr", "server is shutting down")
990 // Work around go vet, it doesn't see defer cancelDrain.
991 if reqctxcancel != nil {
992 reqctxcancel()
993 }
994 return
995
996 case <-timer.C:
997 _, err := fmt.Fprintf(out, ": keepalive\n\n")
998 if err != nil {
999 log.Errorx("write keepalive", err)
1000 // Work around go vet, it doesn't see defer cancelDrain.
1001 if reqctxcancel != nil {
1002 reqctxcancel()
1003 }
1004 return
1005 }
1006 out.Flush()
1007 writer.wrote = true
1008
1009 case vm := <-viewMsgsc:
1010 if vm.RequestID != v.Request.ID || vm.ViewID != v.Request.ViewID {
1011 panic(fmt.Sprintf("received msgs for view,request id %d,%d instead of %d,%d", vm.ViewID, vm.RequestID, v.Request.ViewID, v.Request.ID))
1012 }
1013 if vm.ViewEnd {
1014 v.End = true
1015 }
1016 if len(vm.MessageItems) > 0 {
1017 v.LastMessageReceived = vm.MessageItems[len(vm.MessageItems)-1][0].Message.Received
1018 }
1019 writer.xsendEvent(ctx, log, "viewMsgs", vm)
1020
1021 case ve := <-viewErrc:
1022 if ve.RequestID != v.Request.ID || ve.ViewID != v.Request.ViewID {
1023 panic(fmt.Sprintf("received err for view,request id %d,%d instead of %d,%d", ve.ViewID, ve.RequestID, v.Request.ViewID, v.Request.ID))
1024 }
1025 if errors.Is(ve.err, context.Canceled) || moxio.IsClosed(ve.err) {
1026 // Work around go vet, it doesn't see defer cancelDrain.
1027 if reqctxcancel != nil {
1028 reqctxcancel()
1029 }
1030 return
1031 }
1032 writer.xsendEvent(ctx, log, "viewErr", ve)
1033
1034 case vr := <-viewResetc:
1035 if vr.RequestID != v.Request.ID || vr.ViewID != v.Request.ViewID {
1036 panic(fmt.Sprintf("received reset for view,request id %d,%d instead of %d,%d", vr.ViewID, vr.RequestID, v.Request.ViewID, v.Request.ID))
1037 }
1038 writer.xsendEvent(ctx, log, "viewReset", vr)
1039
1040 case id := <-donec:
1041 if id != v.Request.ID {
1042 panic(fmt.Sprintf("received done for request id %d instead of %d", id, v.Request.ID))
1043 }
1044 if reqctxcancel != nil {
1045 reqctxcancel()
1046 }
1047 reqctx = nil
1048 reqctxcancel = nil
1049
1050 case req := <-sse.Request:
1051 if reqctx != nil {
1052 cancelDrain()
1053 }
1054 if req.Cancel {
1055 v = view{req, time.Time{}, false, false, nil, nil}
1056 continue
1057 }
1058
1059 reqctx, reqctxcancel = context.WithCancel(ctx)
1060
1061 stop := func() (stop bool) {
				// rtx is handed off to viewRequestTx below, but we must clean it up in case of errors.
1063 var rtx *bstore.Tx
1064 var err error
1065 defer func() {
1066 if rtx != nil {
1067 err = rtx.Rollback()
1068 log.Check(err, "rolling back transaction")
1069 }
1070 }()
1071 acc.WithRLock(func() {
1072 rtx, err = acc.DB.Begin(reqctx, false)
1073 })
1074 if err != nil {
1075 reqctxcancel()
1076 reqctx = nil
1077 reqctxcancel = nil
1078
1079 if errors.Is(err, context.Canceled) {
1080 return true
1081 }
1082 err := fmt.Errorf("begin transaction: %v", err)
1083 viewErr := EventViewErr{v.Request.ViewID, v.Request.ID, err.Error(), err}
1084 writer.xsendEvent(ctx, log, "viewErr", viewErr)
1085 return false
1086 }
1087
1088 // Reset view state for new query.
1089 if req.ViewID != v.Request.ViewID {
1090 matchMailboxes, mailboxIDs, mailboxPrefixes := xprepareMailboxIDs(ctx, rtx, req.Query.Filter, accConf.RejectsMailbox)
1091 if req.Query.Filter.MailboxChildrenIncluded {
1092 xgatherMailboxIDs(ctx, rtx, mailboxIDs, mailboxPrefixes)
1093 }
1094 v = view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
1095 } else {
1096 v.Request = req
1097 }
1098 go viewRequestTx(reqctx, log, acc, rtx, v, viewMsgsc, viewErrc, viewResetc, donec)
1099 rtx = nil
1100 return false
1101 }()
1102 if stop {
1103 return
1104 }
1105
1106 case <-pending:
1107 xprocessChanges(comm.Get())
1108
1109 case <-ctx.Done():
1110 // Work around go vet, it doesn't see defer cancelDrain.
1111 if reqctxcancel != nil {
1112 reqctxcancel()
1113 }
1114 return
1115 }
1116 }
1117}
1118
// xprepareMailboxIDs prepares the first half of filters for mailboxes, based on
1120// f.MailboxID (-1 is special). matchMailboxes indicates whether the IDs in
1121// mailboxIDs must or must not match. mailboxPrefixes is for use with
1122// xgatherMailboxIDs to gather children of the mailboxIDs.
1123func xprepareMailboxIDs(ctx context.Context, tx *bstore.Tx, f Filter, rejectsMailbox string) (matchMailboxes bool, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1124 matchMailboxes = true
1125 mailboxIDs = map[int64]bool{}
1126 if f.MailboxID == -1 {
1127 matchMailboxes = false
1128 // Add the trash, junk and account rejects mailbox.
1129 err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
1130 if mb.Trash || mb.Junk || mb.Name == rejectsMailbox {
1131 mailboxPrefixes = append(mailboxPrefixes, mb.Name+"/")
1132 mailboxIDs[mb.ID] = true
1133 }
1134 return nil
1135 })
1136 xcheckf(ctx, err, "finding trash/junk/rejects mailbox")
1137 } else if f.MailboxID > 0 {
1138 mb := store.Mailbox{ID: f.MailboxID}
1139 err := tx.Get(&mb)
1140 xcheckf(ctx, err, "get mailbox")
1141 mailboxIDs[f.MailboxID] = true
1142 mailboxPrefixes = []string{mb.Name + "/"}
1143 }
1144 return
1145}
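
// A worked example with hypothetical mailboxes (Trash id 3, Junk id 4, rejects
// mailbox "Rejects" id 5): Filter{MailboxID: -1} yields matchMailboxes false,
// mailboxIDs {3,4,5} and prefixes ["Trash/", "Junk/", "Rejects/"], i.e. messages
// must NOT be in those mailboxes. Filter{MailboxID: 7} yields matchMailboxes
// true with only mailbox 7 and its name as prefix.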
1146
1147// xgatherMailboxIDs adds all mailboxes with a prefix matching any of
1148// mailboxPrefixes to mailboxIDs, to expand filtering to children of mailboxes.
1149func xgatherMailboxIDs(ctx context.Context, tx *bstore.Tx, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1150 // Gather more mailboxes to filter on, based on mailboxPrefixes.
1151 if len(mailboxPrefixes) == 0 {
1152 return
1153 }
1154 err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
1155 for _, p := range mailboxPrefixes {
1156 if strings.HasPrefix(mb.Name, p) {
1157 mailboxIDs[mb.ID] = true
1158 break
1159 }
1160 }
1161 return nil
1162 })
1163 xcheckf(ctx, err, "gathering mailboxes")
1164}
1165
1166// matchesMailbox returns whether a mailbox matches the view.
1167func (v view) matchesMailbox(mailboxID int64) bool {
1168 return len(v.mailboxIDs) == 0 || v.matchMailboxIDs && v.mailboxIDs[mailboxID] || !v.matchMailboxIDs && !v.mailboxIDs[mailboxID]
1169}
1170
// inRange returns whether m is within the range for the view, i.e. whether a
// change for this message should be sent to the client so it can update its state.
1173func (v view) inRange(m store.Message) bool {
1174 return v.End || !v.Request.Query.OrderAsc && !m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && !m.Received.After(v.LastMessageReceived)
1175}
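
// For example, with a descending view (OrderAsc false) whose oldest message sent
// so far was received at time T: a newly delivered message received at or after
// T is in range (it falls within what the client has already seen), while an
// older message is only in range once End is set, since it could still show up
// when the client pages further.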
1176
1177// matches checks if the message, identified by either messageID or mailboxID+UID,
1178// is in the current "view" (i.e. passing the filters, and if checkRange is set
1179// also if within the range of sent messages based on sort order and the last seen
1180// message). getmsg retrieves the message, which may be necessary depending on the
1181// active filters. Used to determine if a store.Change with a new message should be
1182// sent, and for the destination and anchor messages in view requests.
1183func (v view) matches(log mlog.Log, acc *store.Account, checkRange bool, messageID int64, mailboxID int64, uid store.UID, flags store.Flags, keywords []string, getmsg func(int64, int64, store.UID) (store.Message, error)) (match bool, rerr error) {
1184 var m store.Message
1185 ensureMessage := func() bool {
1186 if m.ID == 0 && rerr == nil {
1187 m, rerr = getmsg(messageID, mailboxID, uid)
1188 }
1189 return rerr == nil
1190 }
1191
1192 q := v.Request.Query
1193
	// Warning: Filters must be kept in sync between queryMessages and view.matches.
1195
1196 // Check filters.
1197 if len(v.mailboxIDs) > 0 && (!ensureMessage() || v.matchMailboxIDs && !v.mailboxIDs[m.MailboxID] || !v.matchMailboxIDs && v.mailboxIDs[m.MailboxID]) {
1198 return false, rerr
1199 }
1200 // note: anchorMessageID is not relevant for matching.
1201 flagfilter := q.flagFilterFn()
1202 if flagfilter != nil && !flagfilter(flags, keywords) {
1203 return false, rerr
1204 }
1205
1206 if q.Filter.Oldest != nil && (!ensureMessage() || m.Received.Before(*q.Filter.Oldest)) {
1207 return false, rerr
1208 }
1209 if q.Filter.Newest != nil && (!ensureMessage() || !m.Received.Before(*q.Filter.Newest)) {
1210 return false, rerr
1211 }
1212
1213 if q.Filter.SizeMin > 0 && (!ensureMessage() || m.Size < q.Filter.SizeMin) {
1214 return false, rerr
1215 }
1216 if q.Filter.SizeMax > 0 && (!ensureMessage() || m.Size > q.Filter.SizeMax) {
1217 return false, rerr
1218 }
1219
1220 state := msgState{acc: acc}
1221 defer func() {
1222 if rerr == nil && state.err != nil {
1223 rerr = state.err
1224 }
1225 state.clear()
1226 }()
1227
1228 attachmentFilter := q.attachmentFilterFn(log, acc, &state)
1229 if attachmentFilter != nil && (!ensureMessage() || !attachmentFilter(m)) {
1230 return false, rerr
1231 }
1232
1233 envFilter := q.envFilterFn(log, &state)
1234 if envFilter != nil && (!ensureMessage() || !envFilter(m)) {
1235 return false, rerr
1236 }
1237
1238 headerFilter := q.headerFilterFn(log, &state)
1239 if headerFilter != nil && (!ensureMessage() || !headerFilter(m)) {
1240 return false, rerr
1241 }
1242
1243 wordsFilter := q.wordsFilterFn(log, &state)
1244 if wordsFilter != nil && (!ensureMessage() || !wordsFilter(m)) {
1245 return false, rerr
1246 }
1247
1248 // Now check that we are either within the sorting order, or "last" was sent.
1249 if !checkRange || v.End || ensureMessage() && v.inRange(m) {
1250 return true, rerr
1251 }
1252 return false, rerr
1253}
1254
1255type msgResp struct {
1256 err error // If set, an error happened and fields below are not set.
1257 reset bool // If set, the anchor message does not exist (anymore?) and we are sending messages from the start, fields below not set.
1258 viewEnd bool // If set, the last message for the view was seen, no more should be requested, fields below not set.
	mil []MessageItem // If none of the cases above apply, the messages that were found matching the query. The first message is the reason the thread is returned, for use as AnchorMessageID in a followup request.
1260 pm *ParsedMessage // If m was the target page.DestMessageID, or this is the first match, this is the parsed message of mi.
1261}
1262
1263// viewRequestTx executes a request (query with filters, pagination) by
1264// launching a new goroutine with queryMessages, receiving results as msgResp,
1265// and sending Event* to the SSE connection.
1266//
1267// It always closes tx.
1268func viewRequestTx(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, msgc chan EventViewMsgs, errc chan EventViewErr, resetc chan EventViewReset, donec chan int64) {
1269 defer func() {
1270 err := tx.Rollback()
1271 log.Check(err, "rolling back query transaction")
1272
1273 donec <- v.Request.ID
1274
1275 x := recover() // Should not happen, but don't take program down if it does.
1276 if x != nil {
1277 log.WithContext(ctx).Error("viewRequestTx panic", slog.Any("err", x))
1278 debug.PrintStack()
1279 metrics.PanicInc(metrics.Webmailrequest)
1280 }
1281 }()
1282
1283 var msgitems [][]MessageItem // Gathering for 300ms, then flushing.
1284 var parsedMessage *ParsedMessage
1285 var viewEnd bool
1286
	var immediate bool // No waiting, flush immediately.
1288 t := time.NewTimer(300 * time.Millisecond)
1289 defer t.Stop()
1290
1291 sendViewMsgs := func(force bool) {
1292 if len(msgitems) == 0 && !force {
1293 return
1294 }
1295
1296 immediate = false
1297 msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, msgitems, parsedMessage, viewEnd}
1298 msgitems = nil
1299 parsedMessage = nil
1300 t.Reset(300 * time.Millisecond)
1301 }
1302
1303 // todo: should probably rewrite code so we don't start yet another goroutine, but instead handle the query responses directly (through a struct that keeps state?) in the sse connection goroutine.
1304
1305 mrc := make(chan msgResp, 1)
1306 go queryMessages(ctx, log, acc, tx, v, mrc)
1307
1308 for {
1309 select {
1310 case mr, ok := <-mrc:
1311 if !ok {
1312 sendViewMsgs(false)
1313 // Empty message list signals this query is done.
1314 msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, nil, nil, false}
1315 return
1316 }
1317 if mr.err != nil {
1318 sendViewMsgs(false)
1319 errc <- EventViewErr{v.Request.ViewID, v.Request.ID, mr.err.Error(), mr.err}
1320 return
1321 }
1322 if mr.reset {
1323 resetc <- EventViewReset{v.Request.ViewID, v.Request.ID}
1324 continue
1325 }
1326 if mr.viewEnd {
1327 viewEnd = true
1328 sendViewMsgs(true)
1329 return
1330 }
1331
1332 msgitems = append(msgitems, mr.mil)
1333 if mr.pm != nil {
1334 parsedMessage = mr.pm
1335 }
1336 if immediate {
1337 sendViewMsgs(true)
1338 }
1339
1340 case <-t.C:
1341 if len(msgitems) == 0 {
1342 // Nothing to send yet. We'll send immediately when the next message comes in.
1343 immediate = true
1344 } else {
1345 sendViewMsgs(false)
1346 }
1347 }
1348 }
1349}
1350
// queryMessages executes a query, with filter, pagination and the destination
// message id to fetch (the message that the client had in view and wants to
// display again). It sends several kinds of responses on mrc: errors, whether
// the view is reset due to a missing AnchorMessageID, whether the end of the
// view was reached, and the messages themselves.
1356func queryMessages(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, mrc chan msgResp) {
1357 defer func() {
1358 x := recover() // Should not happen, but don't take program down if it does.
1359 if x != nil {
1360 log.WithContext(ctx).Error("queryMessages panic", slog.Any("err", x))
1361 debug.PrintStack()
1362 mrc <- msgResp{err: fmt.Errorf("query failed")}
1363 metrics.PanicInc(metrics.Webmailquery)
1364 }
1365
1366 close(mrc)
1367 }()
1368
1369 query := v.Request.Query
1370 page := v.Request.Page
1371
	// Warning: Filters must be kept in sync between queryMessages and view.matches.
1373
1374 checkMessage := func(id int64) (valid bool, rerr error) {
1375 m := store.Message{ID: id}
1376 err := tx.Get(&m)
1377 if err == bstore.ErrAbsent || err == nil && m.Expunged {
1378 return false, nil
1379 } else if err != nil {
1380 return false, err
1381 } else {
1382 return v.matches(log, acc, false, m.ID, m.MailboxID, m.UID, m.Flags, m.Keywords, func(int64, int64, store.UID) (store.Message, error) {
1383 return m, nil
1384 })
1385 }
1386 }
1387
1388 // Check if AnchorMessageID exists and matches filter. If not, we will reset the view.
1389 if page.AnchorMessageID > 0 {
1390 // Check if message exists and (still) matches the filter.
1391 // todo: if AnchorMessageID exists but no longer matches the filter, we are resetting the view, but could handle it more gracefully in the future. if the message is in a different mailbox, we cannot query as efficiently, we'll have to read through more messages.
1392 if valid, err := checkMessage(page.AnchorMessageID); err != nil {
1393 mrc <- msgResp{err: fmt.Errorf("querying AnchorMessageID: %v", err)}
1394 return
1395 } else if !valid {
1396 mrc <- msgResp{reset: true}
1397 page.AnchorMessageID = 0
1398 }
1399 }
1400
	// Check if page.DestMessageID exists and matches the filter. If not, we will
	// ignore it instead of continuing to send messages until the end of the view.
1403 if page.DestMessageID > 0 {
1404 if valid, err := checkMessage(page.DestMessageID); err != nil {
1405 mrc <- msgResp{err: fmt.Errorf("querying requested message: %v", err)}
1406 return
1407 } else if !valid {
1408 page.DestMessageID = 0
1409 }
1410 }
1411
1412 // todo optimize: we would like to have more filters directly on the database if they can use an index. eg if there is a keyword filter and no mailbox filter.
1413
1414 q := bstore.QueryTx[store.Message](tx)
1415 q.FilterEqual("Expunged", false)
1416 if len(v.mailboxIDs) > 0 {
1417 if len(v.mailboxIDs) == 1 && v.matchMailboxIDs {
1418 // Should result in fast indexed query.
1419 for mbID := range v.mailboxIDs {
1420 q.FilterNonzero(store.Message{MailboxID: mbID})
1421 }
1422 } else {
1423 idsAny := make([]any, 0, len(v.mailboxIDs))
1424 for mbID := range v.mailboxIDs {
1425 idsAny = append(idsAny, mbID)
1426 }
1427 if v.matchMailboxIDs {
1428 q.FilterEqual("MailboxID", idsAny...)
1429 } else {
1430 q.FilterNotEqual("MailboxID", idsAny...)
1431 }
1432 }
1433 }
1434
	// If we are looking for an anchor, keep skipping messages early (cheaply) until we've seen it.
1436 if page.AnchorMessageID > 0 {
1437 var seen = false
1438 q.FilterFn(func(m store.Message) bool {
1439 if seen {
1440 return true
1441 }
1442 seen = m.ID == page.AnchorMessageID
1443 return false
1444 })
1445 }
1446
	// We may be adding filters to the query below. The FilterFn signature does not
	// allow reporting errors, or anything else, just a bool. So when making the
	// filter functions, we give them a place to store parsed message state, and an
	// error. We check the error during and after query execution.
1451 state := msgState{acc: acc}
1452 defer state.clear()
1453
1454 flagfilter := query.flagFilterFn()
1455 if flagfilter != nil {
1456 q.FilterFn(func(m store.Message) bool {
1457 return flagfilter(m.Flags, m.Keywords)
1458 })
1459 }
1460
1461 if query.Filter.Oldest != nil {
1462 q.FilterGreaterEqual("Received", *query.Filter.Oldest)
1463 }
1464 if query.Filter.Newest != nil {
1465 q.FilterLessEqual("Received", *query.Filter.Newest)
1466 }
1467
1468 if query.Filter.SizeMin > 0 {
1469 q.FilterGreaterEqual("Size", query.Filter.SizeMin)
1470 }
1471 if query.Filter.SizeMax > 0 {
1472 q.FilterLessEqual("Size", query.Filter.SizeMax)
1473 }
1474
1475 attachmentFilter := query.attachmentFilterFn(log, acc, &state)
1476 if attachmentFilter != nil {
1477 q.FilterFn(attachmentFilter)
1478 }
1479
1480 envFilter := query.envFilterFn(log, &state)
1481 if envFilter != nil {
1482 q.FilterFn(envFilter)
1483 }
1484
1485 headerFilter := query.headerFilterFn(log, &state)
1486 if headerFilter != nil {
1487 q.FilterFn(headerFilter)
1488 }
1489
1490 wordsFilter := query.wordsFilterFn(log, &state)
1491 if wordsFilter != nil {
1492 q.FilterFn(wordsFilter)
1493 }
1494
1495 var moreHeaders []string // From store.Settings.ShowHeaders
1496
1497 if query.OrderAsc {
1498 q.SortAsc("Received")
1499 } else {
1500 q.SortDesc("Received")
1501 }
1502 found := page.DestMessageID <= 0
1503 end := true
1504 have := 0
1505 err := q.ForEach(func(m store.Message) error {
1506 // Check for an error in one of the filters, propagate it.
1507 if state.err != nil {
1508 return state.err
1509 }
1510
1511 if have >= page.Count && found || have > 10000 {
1512 end = false
1513 return bstore.StopForEach
1514 }
1515
1516 if _, ok := v.threadIDs[m.ThreadID]; ok {
1517 // Message was already returned as part of a thread.
1518 return nil
1519 }
1520
1521 var pm *ParsedMessage
1522 if m.ID == page.DestMessageID || page.DestMessageID == 0 && have == 0 && page.AnchorMessageID == 0 {
			// For threads, if there was no DestMessageID, we may be getting the newest
			// message. For an initial view, this isn't necessarily the message the user is
			// expected to read first; that would be the first unread message, which we'll
			// get below when gathering the thread.
1527 found = true
1528 xpm, err := parsedMessage(log, m, &state, true, false, false)
1529 if err != nil && errors.Is(err, message.ErrHeader) {
1530 log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
1531 } else if err != nil {
1532 return fmt.Errorf("parsing message %d: %v", m.ID, err)
1533 } else {
1534 pm = &xpm
1535 }
1536 }
1537
1538 var err error
1539 moreHeaders, err = ensureMoreHeaders(tx, moreHeaders)
1540 if err != nil {
1541 return fmt.Errorf("ensuring more headers: %v", err)
1542 }
1543
1544 mi, err := messageItem(log, m, &state, moreHeaders)
1545 if err != nil {
1546 return fmt.Errorf("making messageitem for message %d: %v", m.ID, err)
1547 }
1548 mil := []MessageItem{mi}
1549 if query.Threading != ThreadOff {
1550 more, xpm, err := gatherThread(log, tx, acc, v, m, page.DestMessageID, page.AnchorMessageID == 0 && have == 0, moreHeaders)
1551 if err != nil {
1552 return fmt.Errorf("gathering thread messages for id %d, thread %d: %v", m.ID, m.ThreadID, err)
1553 }
1554 if xpm != nil {
1555 pm = xpm
1556 found = true
1557 }
1558 mil = append(mil, more...)
1559 v.threadIDs[m.ThreadID] = struct{}{}
1560
1561 // Calculate how many messages the frontend is going to show, and only count those as returned.
1562 collapsed := map[int64]bool{}
1563 for _, mi := range mil {
1564 collapsed[mi.Message.ID] = mi.Message.ThreadCollapsed
1565 }
1566 unread := map[int64]bool{} // Propagated to thread root.
1567 if query.Threading == ThreadUnread {
1568 for _, mi := range mil {
1569 mm := mi.Message
1570 if mm.Seen {
1571 continue
1572 }
1573 unread[mm.ID] = true
1574 for _, id := range mm.ThreadParentIDs {
1575 unread[id] = true
1576 }
1577 }
1578 }
1579 for _, mi := range mil {
1580 mm := mi.Message
1581 threadRoot := true
1582 rootID := mm.ID
1583 for _, id := range mm.ThreadParentIDs {
1584 if _, ok := collapsed[id]; ok {
1585 threadRoot = false
1586 rootID = id
1587 }
1588 }
1589 if threadRoot || (query.Threading == ThreadOn && !collapsed[rootID] || query.Threading == ThreadUnread && unread[rootID]) {
1590 have++
1591 }
1592 }
1593 } else {
1594 have++
1595 }
1596 if pm != nil && len(pm.envelope.From) == 1 {
1597 pm.ViewMode, err = fromAddrViewMode(tx, pm.envelope.From[0])
1598 if err != nil {
1599 return fmt.Errorf("gathering view mode for id %d: %v", m.ID, err)
1600 }
1601 }
1602 mrc <- msgResp{mil: mil, pm: pm}
1603 return nil
1604 })
	// Check for an error in one of the filters again. The check inside ForEach would
	// not trigger if the last message has the error.
1607 if err == nil && state.err != nil {
1608 err = state.err
1609 }
1610 if err != nil {
1611 mrc <- msgResp{err: fmt.Errorf("querying messages: %v", err)}
1612 return
1613 }
1614 if end {
1615 mrc <- msgResp{viewEnd: true}
1616 }
1617}
1618
1619func gatherThread(log mlog.Log, tx *bstore.Tx, acc *store.Account, v view, m store.Message, destMessageID int64, first bool, moreHeaders []string) ([]MessageItem, *ParsedMessage, error) {
1620 if m.ThreadID == 0 {
1621 // If we would continue, FilterNonzero would fail because there are no non-zero fields.
1622 return nil, nil, fmt.Errorf("message has threadid 0, account is probably still being upgraded, try turning threading off until the upgrade is done")
1623 }
1624
1625 // Fetch other messages for this thread.
1626 qt := bstore.QueryTx[store.Message](tx)
1627 qt.FilterNonzero(store.Message{ThreadID: m.ThreadID})
1628 qt.FilterEqual("Expunged", false)
1629 qt.FilterNotEqual("ID", m.ID)
1630 qt.SortAsc("ID")
1631 tml, err := qt.List()
1632 if err != nil {
1633 return nil, nil, fmt.Errorf("listing other messages in thread for message %d, thread %d: %v", m.ID, m.ThreadID, err)
1634 }
1635
1636 var mil []MessageItem
1637 var pm *ParsedMessage
1638 var firstUnread bool
1639 for _, tm := range tml {
1640 err := func() error {
1641 xstate := msgState{acc: acc}
1642 defer xstate.clear()
1643
1644 mi, err := messageItem(log, tm, &xstate, moreHeaders)
1645 if err != nil {
1646 return fmt.Errorf("making messageitem for message %d, for thread %d: %v", tm.ID, m.ThreadID, err)
1647 }
1648 mi.MatchQuery, err = v.matches(log, acc, false, tm.ID, tm.MailboxID, tm.UID, tm.Flags, tm.Keywords, func(int64, int64, store.UID) (store.Message, error) {
1649 return tm, nil
1650 })
1651 if err != nil {
1652 return fmt.Errorf("matching thread message %d against view query: %v", tm.ID, err)
1653 }
1654 mil = append(mil, mi)
1655
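// Pick this message as the parsed message if it is the requested destination,
// or, for an initial thread load without a destination, if no message was
// picked yet or the earlier pick was seen and this message is unread.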
1656 if tm.ID == destMessageID || destMessageID == 0 && first && (pm == nil || !firstUnread && !tm.Seen) {
1657 firstUnread = !tm.Seen
1658 xpm, err := parsedMessage(log, tm, &xstate, true, false, false)
1659 if err != nil && errors.Is(err, message.ErrHeader) {
1660 log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", tm.ID), slog.Any("err", err))
1661 } else if err != nil {
1662 return fmt.Errorf("parsing thread message %d: %v", tm.ID, err)
1663 } else {
1664 pm = &xpm
1665 }
1666 }
1667 return nil
1668 }()
1669 if err != nil {
1670 return nil, nil, err
1671 }
1672 }
1673
1674 // Finally, the message that caused us to gather this thread (which is likely the
1675 // most recent message in the thread) could be the only unread message.
1676 if destMessageID == 0 && first && !m.Seen && !firstUnread {
1677 xstate := msgState{acc: acc}
1678 defer xstate.clear()
1679 xpm, err := parsedMessage(log, m, &xstate, true, false, false)
1680 if err != nil && errors.Is(err, message.ErrHeader) {
1681 log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
1682 } else if err != nil {
1683 return nil, nil, fmt.Errorf("parsing thread message %d: %v", m.ID, err)
1684 } else {
1685 pm = &xpm
1686 }
1687 }
1688
1689 return mil, pm, nil
1690}
1691
1692 // While checking the filters on a message, we may need to get more message
1693 // details as each filter passes. We check the filters that need only basic
1694 // information first, and load and cache more details for the filters that follow.
1695 // msgState holds parsed details for a message; it is updated with more
1696 // information while filtering, and reset for the next message.
1697type msgState struct {
1698 acc *store.Account // Never changes during lifetime.
1699 err error // Once set, doesn't get cleared.
1700 m store.Message
1701 part *message.Part // Will be without Reader when msgr is nil.
1702 msgr *store.MsgReader
1703}
1704
1705func (ms *msgState) clear() {
1706 if ms.msgr != nil {
1707 ms.msgr.Close()
1708 ms.msgr = nil
1709 }
1710 *ms = msgState{acc: ms.acc, err: ms.err}
1711}
1712
1713func (ms *msgState) ensureMsg(m store.Message) {
1714 if m.ID != ms.m.ID {
1715 ms.clear()
1716 }
1717 ms.m = m
1718}
1719
1720func (ms *msgState) ensurePart(m store.Message, withMsgReader bool) bool {
1721 ms.ensureMsg(m)
1722
1723 if ms.err == nil {
1724 if ms.part == nil {
1725 if m.ParsedBuf == nil {
1726 ms.err = fmt.Errorf("message %d not parsed", m.ID)
1727 return false
1728 }
1729 var p message.Part
1730 if err := json.Unmarshal(m.ParsedBuf, &p); err != nil {
1731 ms.err = fmt.Errorf("load part for message %d: %w", m.ID, err)
1732 return false
1733 }
1734 ms.part = &p
1735 }
1736 if withMsgReader && ms.msgr == nil {
1737 ms.msgr = ms.acc.MessageReader(m)
1738 ms.part.SetReaderAt(ms.msgr)
1739 }
1740 }
1741 return ms.part != nil
1742}
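
// The filter functions below use msgState to load message details lazily and at
// most once per message. A minimal sketch of the intended pattern (hypothetical
// filter, not part of this package):
//
//	func exampleFilter(state *msgState) func(m store.Message) bool {
//		return func(m store.Message) bool {
//			if !state.ensurePart(m, false) {
//				return false // A parse error was recorded in state.err.
//			}
//			env := state.part.Envelope
//			return env != nil && env.Subject != ""
//		}
//	}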
1743
1744// flagFilterFn returns a function that applies the flag/keyword/"label"-related
1745// filters for a query. A nil function is returned if there are no flags to filter
1746// on.
1747func (q Query) flagFilterFn() func(store.Flags, []string) bool {
1748 labels := map[string]bool{}
1749 for _, k := range q.Filter.Labels {
1750 labels[k] = true
1751 }
1752 for _, k := range q.NotFilter.Labels {
1753 labels[k] = false
1754 }
1755
1756 if len(labels) == 0 {
1757 return nil
1758 }
1759
1760 var mask, flags store.Flags
1761 systemflags := map[string][]*bool{
1762 `\answered`: {&mask.Answered, &flags.Answered},
1763 `\flagged`: {&mask.Flagged, &flags.Flagged},
1764 `\deleted`: {&mask.Deleted, &flags.Deleted},
1765 `\seen`: {&mask.Seen, &flags.Seen},
1766 `\draft`: {&mask.Draft, &flags.Draft},
1767 `$junk`: {&mask.Junk, &flags.Junk},
1768 `$notjunk`: {&mask.Notjunk, &flags.Notjunk},
1769 `$forwarded`: {&mask.Forwarded, &flags.Forwarded},
1770 `$phishing`: {&mask.Phishing, &flags.Phishing},
1771 `$mdnsent`: {&mask.MDNSent, &flags.MDNSent},
1772 }
1773 keywords := map[string]bool{}
1774 for k, v := range labels {
1775 k = strings.ToLower(k)
1776 if mf, ok := systemflags[k]; ok {
1777 *mf[0] = true
1778 *mf[1] = v
1779 } else {
1780 keywords[k] = v
1781 }
1782 }
1783 return func(msgFlags store.Flags, msgKeywords []string) bool {
1784 var f store.Flags
1785 if f.Set(mask, msgFlags) != flags {
1786 return false
1787 }
1788 for k, v := range keywords {
1789 if slices.Contains(msgKeywords, k) != v {
1790 return false
1791 }
1792 }
1793 return true
1794 }
1795}
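
// Illustrative use of the returned function (assumed filter values, not from
// this package): with Filter.Labels = []string{`\seen`, "urgent"} and
// NotFilter.Labels = []string{`$junk`}, a message matches only if its Seen flag
// is set, its Junk flag is not set, and the keyword "urgent" is present:
//
//	fn := q.flagFilterFn()
//	if fn != nil && !fn(m.Flags, m.Keywords) {
//		// Message does not match the label filters.
//	}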
1796
1797 // attachmentFilterFn returns a function that applies the attachment-related
1798 // filters from the query. A nil function is returned if there are no attachment
1799 // filters.
1800func (q Query) attachmentFilterFn(log mlog.Log, acc *store.Account, state *msgState) func(m store.Message) bool {
1801 if q.Filter.Attachments == AttachmentIndifferent && q.NotFilter.Attachments == AttachmentIndifferent {
1802 return nil
1803 }
1804
1805 return func(m store.Message) bool {
1806 if !state.ensurePart(m, true) {
1807 return false
1808 }
1809 types, err := attachmentTypes(log, m, state)
1810 if err != nil {
1811 state.err = err
1812 return false
1813 }
1814 return (q.Filter.Attachments == AttachmentIndifferent || types[q.Filter.Attachments]) && (q.NotFilter.Attachments == AttachmentIndifferent || !types[q.NotFilter.Attachments])
1815 }
1816}
1817
1818var attachmentMimetypes = map[string]AttachmentType{
1819 "application/pdf": AttachmentPDF,
1820 "application/zip": AttachmentArchive,
1821 "application/x-rar-compressed": AttachmentArchive,
1822 "application/vnd.oasis.opendocument.spreadsheet": AttachmentSpreadsheet,
1823 "application/vnd.ms-excel": AttachmentSpreadsheet,
1824 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": AttachmentSpreadsheet,
1825 "application/vnd.oasis.opendocument.text": AttachmentDocument,
1826 "application/vnd.oasis.opendocument.presentation": AttachmentPresentation,
1827 "application/vnd.ms-powerpoint": AttachmentPresentation,
1828 "application/vnd.openxmlformats-officedocument.presentationml.presentation": AttachmentPresentation,
1829}
1830var attachmentExtensions = map[string]AttachmentType{
1831 ".pdf": AttachmentPDF,
1832 ".zip": AttachmentArchive,
1833 ".tar": AttachmentArchive,
1834 ".tgz": AttachmentArchive,
1835 ".tar.gz": AttachmentArchive,
1836 ".tbz2": AttachmentArchive,
1837 ".tar.bz2": AttachmentArchive,
1838 ".tar.lz": AttachmentArchive,
1839 ".tlz": AttachmentArchive,
1840 ".tar.xz": AttachmentArchive,
1841 ".txz": AttachmentArchive,
1842 ".tar.zst": AttachmentArchive,
1843 ".tar.lz4": AttachmentArchive,
1844 ".7z": AttachmentArchive,
1845 ".rar": AttachmentArchive,
1846 ".ods": AttachmentSpreadsheet,
1847 ".xls": AttachmentSpreadsheet,
1848 ".xlsx": AttachmentSpreadsheet,
1849 ".odt": AttachmentDocument,
1850 ".doc": AttachmentDocument,
1851 ".docx": AttachmentDocument,
1852 ".odp": AttachmentPresentation,
1853 ".ppt": AttachmentPresentation,
1854 ".pptx": AttachmentPresentation,
1855}
1856
1857func attachmentTypes(log mlog.Log, m store.Message, state *msgState) (map[AttachmentType]bool, error) {
1858 types := map[AttachmentType]bool{}
1859
1860 pm, err := parsedMessage(log, m, state, false, false, false)
1861 if err != nil {
1862 return nil, fmt.Errorf("parsing message for attachments: %w", err)
1863 }
1864 for _, a := range pm.attachments {
1865 if a.Part.MediaType == "IMAGE" {
1866 types[AttachmentImage] = true
1867 continue
1868 }
1869 mt := strings.ToLower(a.Part.MediaType + "/" + a.Part.MediaSubType)
1870 if t, ok := attachmentMimetypes[mt]; ok {
1871 types[t] = true
1872 continue
1873 }
1874 _, filename, err := a.Part.DispositionFilename()
1875 if err != nil && (errors.Is(err, message.ErrParamEncoding) || errors.Is(err, message.ErrHeader)) {
1876 log.Debugx("parsing disposition/filename", err)
1877 } else if err != nil {
1878 return nil, fmt.Errorf("reading disposition/filename: %v", err)
1879 }
1880 if ext := filepath.Ext(filename); ext != "" {
1881 if t, ok := attachmentExtensions[strings.ToLower(ext)]; ok {
1882 types[t] = true
1883 }
1884 }
1885 }
1886
1887 if len(types) == 0 {
1888 types[AttachmentNone] = true
1889 } else {
1890 types[AttachmentAny] = true
1891 }
1892 return types, nil
1893}
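
// For example (assumed input, for illustration only): a message whose only
// attachment is named "report.xlsx" and has no recognized MIME type yields
//
//	map[AttachmentType]bool{AttachmentSpreadsheet: true, AttachmentAny: true}
//
// while a message without attachments yields
//
//	map[AttachmentType]bool{AttachmentNone: true}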
1894
1895// envFilterFn returns a filter function for the "envelope" headers ("envelope" as
1896// used by IMAP, i.e. basic message headers from/to/subject, an unfortunate name
1897// clash with SMTP envelope) for the query. A nil function is returned if no
1898// filtering is needed.
1899func (q Query) envFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1900 if len(q.Filter.From) == 0 && len(q.Filter.To) == 0 && len(q.Filter.Subject) == 0 && len(q.NotFilter.From) == 0 && len(q.NotFilter.To) == 0 && len(q.NotFilter.Subject) == 0 {
1901 return nil
1902 }
1903
1904 lower := func(l []string) []string {
1905 if len(l) == 0 {
1906 return nil
1907 }
1908 r := make([]string, len(l))
1909 for i, s := range l {
1910 r[i] = strings.ToLower(s)
1911 }
1912 return r
1913 }
1914
1915 filterSubject := lower(q.Filter.Subject)
1916 notFilterSubject := lower(q.NotFilter.Subject)
1917 filterFrom := lower(q.Filter.From)
1918 notFilterFrom := lower(q.NotFilter.From)
1919 filterTo := lower(q.Filter.To)
1920 notFilterTo := lower(q.NotFilter.To)
1921
1922 return func(m store.Message) bool {
1923 if !state.ensurePart(m, false) {
1924 return false
1925 }
1926
1927 var env message.Envelope
1928 if state.part.Envelope != nil {
1929 env = *state.part.Envelope
1930 }
1931
1932 if len(filterSubject) > 0 || len(notFilterSubject) > 0 {
1933 subject := strings.ToLower(env.Subject)
1934 for _, s := range filterSubject {
1935 if !strings.Contains(subject, s) {
1936 return false
1937 }
1938 }
1939 for _, s := range notFilterSubject {
1940 if strings.Contains(subject, s) {
1941 return false
1942 }
1943 }
1944 }
1945
1946 contains := func(textLower []string, l []message.Address, all bool) bool {
1947 next:
1948 for _, s := range textLower {
1949 for _, a := range l {
1950 name := strings.ToLower(a.Name)
1951 addr := strings.ToLower(fmt.Sprintf("<%s@%s>", a.User, a.Host))
1952 if strings.Contains(name, s) || strings.Contains(addr, s) {
1953 if !all {
1954 return true
1955 }
1956 continue next
1957 }
1958 }
1959 if all {
1960 return false
1961 }
1962 }
1963 return all
1964 }
1965
1966 if len(filterFrom) > 0 && !contains(filterFrom, env.From, true) {
1967 return false
1968 }
1969 if len(notFilterFrom) > 0 && contains(notFilterFrom, env.From, false) {
1970 return false
1971 }
1972 if len(filterTo) > 0 || len(notFilterTo) > 0 {
1973 to := append(append(append([]message.Address{}, env.To...), env.CC...), env.BCC...)
1974 if len(filterTo) > 0 && !contains(filterTo, to, true) {
1975 return false
1976 }
1977 if len(notFilterTo) > 0 && contains(notFilterTo, to, false) {
1978 return false
1979 }
1980 }
1981 return true
1982 }
1983}
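
// Sketch of the matching semantics (assumed filter values, for illustration):
// with Filter.From = []string{"alice"} and NotFilter.To = []string{"bob"}, a
// message matches only if "alice" occurs (case-insensitive) in a From display
// name or "<user@host>" address, and no To/Cc/Bcc name or address contains
// "bob":
//
//	fn := q.envFilterFn(log, &state)
//	if fn != nil && !fn(m) {
//		// Message filtered out, or a parse error was recorded in state.err.
//	}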
1984
1985// headerFilterFn returns a function that filters for the header filters in the
1986// query. A nil function is returned if there are no header filters.
1987func (q Query) headerFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1988 if len(q.Filter.Headers) == 0 {
1989 return nil
1990 }
1991
1992 lowerValues := make([]string, len(q.Filter.Headers))
1993 for i, t := range q.Filter.Headers {
1994 lowerValues[i] = strings.ToLower(t[1])
1995 }
1996
1997 return func(m store.Message) bool {
1998 if !state.ensurePart(m, true) {
1999 return false
2000 }
2001 hdr, err := state.part.Header()
2002 if err != nil {
2003 state.err = fmt.Errorf("reading header for message %d: %w", m.ID, err)
2004 return false
2005 }
2006
2007 next:
2008 for i, t := range q.Filter.Headers {
2009 k := t[0]
2010 v := lowerValues[i]
2011 l := hdr.Values(k)
2012 if v == "" && len(l) > 0 {
2013 continue
2014 }
2015 for _, e := range l {
2016 if strings.Contains(strings.ToLower(e), v) {
2017 continue next
2018 }
2019 }
2020 return false
2021 }
2022 return true
2023 }
2024}
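
// For illustration (assumed filter values): Filter.Headers = [][2]string{{"List-Id", ""}}
// matches any message that has a List-Id header, regardless of its value, while
// {{"X-Spam-Status", "yes"}} requires a header whose value contains "yes"
// (case-insensitive). Multiple entries must all match.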
2025
2026 // wordsFilterFn returns a function that applies the word filters of the query. A
2027 // nil function is returned when the query does not contain word filters.
2028func (q Query) wordsFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
2029 if len(q.Filter.Words) == 0 && len(q.NotFilter.Words) == 0 {
2030 return nil
2031 }
2032
2033 ws := store.PrepareWordSearch(q.Filter.Words, q.NotFilter.Words)
2034
2035 return func(m store.Message) bool {
2036 if !state.ensurePart(m, true) {
2037 return false
2038 }
2039
2040 if ok, err := ws.MatchPart(log, state.part, true); err != nil {
2041 state.err = fmt.Errorf("searching for words in message %d: %w", m.ID, err)
2042 return false
2043 } else {
2044 return ok
2045 }
2046 }
2047}
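
// Sketch of use during filtering (assumed query values, for illustration): with
// Filter.Words = []string{"invoice"} and NotFilter.Words = []string{"draft"},
// only messages whose searchable text (as determined by store.PrepareWordSearch
// and MatchPart) contains "invoice" and does not contain "draft" pass:
//
//	fn := q.wordsFilterFn(log, &state)
//	if fn != nil && !fn(m) {
//		// Message filtered out, or a search error was recorded in state.err.
//	}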
2048