1package webmail
2
3// todo: may want to add some json omitempty tags to MessageItem, or Message to reduce json size, or just have smaller types that send only the fields that are needed.
4
5import (
6 "compress/gzip"
7 "context"
8 cryptrand "crypto/rand"
9 "encoding/base64"
10 "encoding/json"
11 "errors"
12 "fmt"
13 "log/slog"
14 "net/http"
15 "path/filepath"
16 "reflect"
17 "runtime/debug"
18 "slices"
19 "strconv"
20 "strings"
21 "sync"
22 "time"
23
24 "github.com/mjl-/bstore"
25 "github.com/mjl-/sherpa"
26
27 "github.com/mjl-/mox/dns"
28 "github.com/mjl-/mox/message"
29 "github.com/mjl-/mox/metrics"
30 "github.com/mjl-/mox/mlog"
31 "github.com/mjl-/mox/mox-"
32 "github.com/mjl-/mox/moxvar"
33 "github.com/mjl-/mox/smtp"
34 "github.com/mjl-/mox/store"
35)
36
// Request is a request to an SSE connection to send messages, either for a new
// view, to continue with an existing view, or to cancel an ongoing request.
type Request struct {
	ID int64

	SSEID int64 // SSE connection.

	// To indicate a request is a continuation (more results) of the previous view.
	// Echoed in events, client checks if it is getting results for the latest request.
	ViewID int64

	// If set, this request and its view are canceled. A new view must be started.
	Cancel bool

	Query Query
	Page  Page
}
54
// ThreadMode is how messages are grouped into threads for a query.
type ThreadMode string

const (
	ThreadOff    ThreadMode = "off"
	ThreadOn     ThreadMode = "on"
	ThreadUnread ThreadMode = "unread"
)
62
// Query is a request for messages that match filters, in a given order.
type Query struct {
	OrderAsc  bool // Order by received ascending or descending.
	Threading ThreadMode
	Filter    Filter
	NotFilter NotFilter
}
70
// AttachmentType is for filtering by attachment type.
type AttachmentType string

const (
	AttachmentIndifferent  AttachmentType = ""
	AttachmentNone         AttachmentType = "none"
	AttachmentAny          AttachmentType = "any"
	AttachmentImage        AttachmentType = "image"        // png, jpg, gif, ...
	AttachmentPDF          AttachmentType = "pdf"
	AttachmentArchive      AttachmentType = "archive"      // zip files, tgz, ...
	AttachmentSpreadsheet  AttachmentType = "spreadsheet"  // ods, xlsx, ...
	AttachmentDocument     AttachmentType = "document"     // odt, docx, ...
	AttachmentPresentation AttachmentType = "presentation" // odp, pptx, ...
)
85
// Filter selects the messages to return. Fields that are set must all match;
// for slices, each element must match ("and").
type Filter struct {
	// If -1, then all mailboxes except Trash/Junk/Rejects. Otherwise, only active if > 0.
	MailboxID int64

	// If true, also submailboxes are included in the search.
	MailboxChildrenIncluded bool

	// In case client doesn't know mailboxes and their IDs yet. Only used during sse
	// connection setup, where it is turned into a MailboxID. Filtering only looks at
	// MailboxID.
	MailboxName string

	Words       []string // Case insensitive substring match for each string.
	From        []string
	To          []string // Including Cc and Bcc.
	Oldest      *time.Time
	Newest      *time.Time
	Subject     []string
	Attachments AttachmentType
	Labels      []string
	Headers     [][2]string // Header values can be empty, it's a check if the header is present, regardless of value.
	SizeMin     int64
	SizeMax     int64
}
112
// NotFilter matches messages that don't match these fields.
type NotFilter struct {
	Words       []string
	From        []string
	To          []string
	Subject     []string
	Attachments AttachmentType
	Labels      []string
}
122
// Page holds pagination parameters for a request.
type Page struct {
	// Start returning messages after this ID, if > 0. For pagination, fetching the
	// next set of messages.
	AnchorMessageID int64

	// Number of messages to return, must be >= 1, we never return more than 10000 for
	// one request.
	Count int

	// If > 0, return messages until DestMessageID is found. More than Count messages
	// can be returned. For long-running searches, it may take a while before this
	// message is found.
	DestMessageID int64
}
138
139// todo: MessageAddress and MessageEnvelope into message.Address and message.Envelope.
140
// MessageAddress is like message.Address, but with a dns.Domain, with unicode name
// included.
type MessageAddress struct {
	Name   string // Free-form name for display in mail applications.
	User   string // Localpart, encoded.
	Domain dns.Domain
}
148
// MessageEnvelope is like message.Envelope, as used in message.Part, but including
// unicode host names for IDNA names.
type MessageEnvelope struct {
	// todo: should get sherpadoc to understand type embeds and embed the non-MessageAddress fields from message.Envelope.
	Date      time.Time
	Subject   string
	From      []MessageAddress
	Sender    []MessageAddress
	ReplyTo   []MessageAddress
	To        []MessageAddress
	CC        []MessageAddress
	BCC       []MessageAddress
	InReplyTo string
	MessageID string
}
164
// MessageItem is sent by queries, it has derived information analyzed from
// message.Part, made for the needs of the message items in the message list.
type MessageItem struct {
	Message     store.Message // Without ParsedBuf and MsgPrefix, for size. With Preview, even if it isn't stored yet in the database.
	Envelope    MessageEnvelope
	Attachments []Attachment
	IsSigned    bool
	IsEncrypted bool
	MatchQuery  bool        // If message does not match query, it can still be included because of threading.
	MoreHeaders [][2]string // All headers from store.Settings.ShowHeaders that are present.
}
177
// ParsedMessage has more parsed/derived information about a message, intended
// for rendering the (contents of the) message. Information from MessageItem is
// not duplicated.
type ParsedMessage struct {
	ID       int64
	Part     message.Part
	Headers  map[string][]string
	ViewMode store.ViewMode

	Texts []string // Contents of text parts, can be empty.

	// Whether there is an HTML part. The webclient renders HTML message parts through
	// an iframe and a separate request with strict CSP headers to prevent script
	// execution and loading of external resources, which isn't possible when loading
	// in iframe with inline HTML because not all browsers support the iframe csp
	// attribute.
	HasHTML bool

	ListReplyAddress *MessageAddress // From List-Post.

	TextPaths [][]int // Paths to text parts.
	HTMLPath  []int   // Path to HTML part.

	// Information used by MessageItem, not exported in this type.
	envelope    MessageEnvelope
	attachments []Attachment
	isSigned    bool
	isEncrypted bool
}
207
// EventStart is the first message sent on an SSE connection, giving the client
// basic data to populate its UI. After this event, messages will follow quickly in
// an EventViewMsgs event.
type EventStart struct {
	SSEID                int64
	LoginAddress         MessageAddress
	Addresses            []MessageAddress
	DomainAddressConfigs map[string]DomainAddressConfig // ASCII domain to address config.
	MailboxName          string
	Mailboxes            []store.Mailbox
	RejectsMailbox       string
	Settings             store.Settings
	AccountPath          string // If nonempty, the path on same host to webaccount interface.
	Version              string
}
223
// DomainAddressConfig has the address (localpart) configuration for a domain, so
// the webmail client can decide if an address matches the addresses of the
// account.
type DomainAddressConfig struct {
	LocalpartCatchallSeparators []string // Can be empty.
	LocalpartCaseSensitive      bool
}
231
// EventViewMsgs contains messages for a view, possibly a continuation of an
// earlier list of messages.
type EventViewMsgs struct {
	ViewID    int64
	RequestID int64

	// If empty, this was the last message for the request. If non-empty, a list of
	// thread messages. Each with the first message being the reason this thread is
	// included and can be used as AnchorID in followup requests. If the threading mode
	// is "off" in the query, there will always be only a single message. If a thread
	// is sent, all messages in the thread are sent, including those that don't match
	// the query (e.g. from another mailbox). Threads can be displayed based on the
	// ThreadParentIDs field, with possibly slightly different display based on field
	// ThreadMissingLink.
	MessageItems [][]MessageItem

	// If set, will match the target page.DestMessageID from the request.
	ParsedMessage *ParsedMessage

	// If set, there are no more messages in this view at this moment. Messages can be
	// added, typically via Change messages, e.g. for new deliveries.
	ViewEnd bool
}
255
// EventViewErr indicates an error during a query for messages. The request is
// aborted, no more request-related messages will be sent until the next request.
type EventViewErr struct {
	ViewID    int64
	RequestID int64
	Err       string // To be displayed in client.
	err       error  // Original error, for checking against context.Canceled.
}
264
// EventViewReset indicates that a request for the next set of messages in a view
// could not be fulfilled, e.g. because the anchor message does not exist anymore.
// The client should clear its list of messages. This can happen before
// EventViewMsgs events are sent.
type EventViewReset struct {
	ViewID    int64
	RequestID int64
}
273
// EventViewChanges contain one or more changes relevant for the client, either
// with new mailbox total/unseen message counts, or messages added/removed/modified
// (flags) for the current view.
type EventViewChanges struct {
	ViewID  int64
	Changes [][2]any // The first field of [2]any is a string, the second of the Change types below.
}
281
// ChangeMsgAdd adds a new message and possibly its thread to the view.
type ChangeMsgAdd struct {
	store.ChangeAddUID
	MessageItems []MessageItem
}
287
// ChangeMsgRemove removes one or more messages from the view.
type ChangeMsgRemove struct {
	store.ChangeRemoveUIDs
}
292
// ChangeMsgFlags updates flags for one message.
type ChangeMsgFlags struct {
	store.ChangeFlags
}
297
// ChangeMsgThread updates muted/collapsed fields for one message.
type ChangeMsgThread struct {
	store.ChangeThread
}
302
// ChangeMailboxRemove indicates a mailbox was removed, including all its messages.
type ChangeMailboxRemove struct {
	store.ChangeRemoveMailbox
}
307
// ChangeMailboxAdd indicates a new mailbox was added, initially without any messages.
type ChangeMailboxAdd struct {
	Mailbox store.Mailbox
}
312
// ChangeMailboxRename indicates a mailbox was renamed. Its ID stays the same.
// It could be under a new parent.
type ChangeMailboxRename struct {
	store.ChangeRenameMailbox
}
318
// ChangeMailboxCounts sets new total and unseen message counts for a mailbox.
type ChangeMailboxCounts struct {
	store.ChangeMailboxCounts
}
323
// ChangeMailboxSpecialUse has updated special-use flags for a mailbox.
type ChangeMailboxSpecialUse struct {
	store.ChangeMailboxSpecialUse
}
328
// ChangeMailboxKeywords has an updated list of keywords for a mailbox, e.g. after
// a message was added with a keyword that wasn't in the mailbox yet.
type ChangeMailboxKeywords struct {
	store.ChangeMailboxKeywords
}
334
// view holds the information about the returned data for a query. It is used to
// determine whether mailbox changes should be sent to the client: we only send
// addition/removal/flag-changes of messages that are in view, or would extend it
// if the view is at the end of the results.
type view struct {
	Request Request

	// Received time of the last message we sent to the client. We use it to decide if
	// a newly delivered message is within the view and the client should get a
	// notification.
	LastMessageReceived time.Time

	// If set, the last message in the query view has been sent. There is no need to do
	// another query, it will not return more data. Used to decide if an event for a
	// new message should be sent.
	End bool

	// Whether a message must or must not match mailboxIDs.
	matchMailboxIDs bool
	// Mailboxes to match, can be multiple, for matching children. If empty, there is
	// no filter on mailboxes.
	mailboxIDs map[int64]bool

	// Threads sent to client. New messages for this thread are also sent, regardless
	// of regular query matching, so also for other mailboxes. If the user (re)moved
	// all messages of a thread, they may still receive events for the thread. Only
	// filled when query with threading not off.
	threadIDs map[int64]struct{}
}
363
// sses tracks all sse connections, and access to them. The mutex protects both
// the generation counter and the map of connections by ID.
var sses = struct {
	sync.Mutex
	gen int64
	m   map[int64]sse
}{m: map[int64]sse{}}
370
// sse represents an sse connection.
type sse struct {
	ID          int64        // Also returned in EventStart and used in Request to identify the request.
	AccountName string       // Used to check the authenticated user has access to the SSE connection.
	Request     chan Request // Goroutine will receive requests from here, coming from API calls.
}
377
378// called by the goroutine when the connection is closed or breaks.
379func (sse sse) unregister() {
380 sses.Lock()
381 defer sses.Unlock()
382 delete(sses.m, sse.ID)
383
384 // Drain any pending requests, preventing blocked goroutines from API calls.
385 for {
386 select {
387 case <-sse.Request:
388 default:
389 return
390 }
391 }
392}
393
394func sseRegister(accountName string) sse {
395 sses.Lock()
396 defer sses.Unlock()
397 sses.gen++
398 v := sse{sses.gen, accountName, make(chan Request, 1)}
399 sses.m[v.ID] = v
400 return v
401}
402
403// sseGet returns a reference to an existing connection if it exists and user
404// has access.
405func sseGet(id int64, accountName string) (sse, bool) {
406 sses.Lock()
407 defer sses.Unlock()
408 s := sses.m[id]
409 if s.AccountName != accountName {
410 return sse{}, false
411 }
412 return s, true
413}
414
// ssetoken is a temporary token that has not yet been used to start an SSE
// connection. Created by Token, consumed by a new SSE connection.
type ssetoken struct {
	token        string             // Uniquely generated.
	accName      string             // Account the token belongs to.
	address      string             // Address used to authenticate in call that created the token.
	sessionToken store.SessionToken // SessionToken that created this token, checked before sending updates.
	validUntil   time.Time          // After this time the token is rejected (but still consumed).
}
424
// ssetokens maintains unused tokens. We have just one, but it's a type so we
// can define methods. The mutex protects both maps.
type ssetokens struct {
	sync.Mutex
	accountTokens map[string][]ssetoken // Account to max 10 most recent tokens, from old to new.
	tokens        map[string]ssetoken   // Token to details, for finding account for a token.
}

// sseTokens is the single global instance of ssetokens.
var sseTokens = ssetokens{
	accountTokens: map[string][]ssetoken{},
	tokens:        map[string]ssetoken{},
}
437
438// xgenerate creates and saves a new token. It ensures no more than 10 tokens
439// per account exist, removing old ones if needed.
440func (x *ssetokens) xgenerate(ctx context.Context, accName, address string, sessionToken store.SessionToken) string {
441 buf := make([]byte, 16)
442 _, err := cryptrand.Read(buf)
443 xcheckf(ctx, err, "generating token")
444 st := ssetoken{base64.RawURLEncoding.EncodeToString(buf), accName, address, sessionToken, time.Now().Add(time.Minute)}
445
446 x.Lock()
447 defer x.Unlock()
448 n := len(x.accountTokens[accName])
449 if n >= 10 {
450 for _, ost := range x.accountTokens[accName][:n-9] {
451 delete(x.tokens, ost.token)
452 }
453 copy(x.accountTokens[accName], x.accountTokens[accName][n-9:])
454 x.accountTokens[accName] = x.accountTokens[accName][:9]
455 }
456 x.accountTokens[accName] = append(x.accountTokens[accName], st)
457 x.tokens[st.token] = st
458 return st.token
459}
460
461// check verifies a token, and consumes it if valid.
462func (x *ssetokens) check(token string) (string, string, store.SessionToken, bool, error) {
463 x.Lock()
464 defer x.Unlock()
465
466 st, ok := x.tokens[token]
467 if !ok {
468 return "", "", "", false, nil
469 }
470 delete(x.tokens, token)
471 if i := slices.Index(x.accountTokens[st.accName], st); i < 0 {
472 return "", "", "", false, errors.New("internal error, could not find token in account")
473 } else {
474 copy(x.accountTokens[st.accName][i:], x.accountTokens[st.accName][i+1:])
475 x.accountTokens[st.accName] = x.accountTokens[st.accName][:len(x.accountTokens[st.accName])-1]
476 if len(x.accountTokens[st.accName]) == 0 {
477 delete(x.accountTokens, st.accName)
478 }
479 }
480 if time.Now().After(st.validUntil) {
481 return "", "", "", false, nil
482 }
483 return st.accName, st.address, st.sessionToken, true, nil
484}
485
// ioErr is panicked on i/o errors in serveEvents and handled in a defer, to
// abort the connection without treating a broken client connection as a bug.
type ioErr struct {
	err error
}
490
491// ensure we have a non-nil moreHeaders, taking it from Settings.
492func ensureMoreHeaders(tx *bstore.Tx, moreHeaders []string) ([]string, error) {
493 if moreHeaders != nil {
494 return moreHeaders, nil
495 }
496
497 s := store.Settings{ID: 1}
498 if err := tx.Get(&s); err != nil {
499 return nil, fmt.Errorf("get settings: %v", err)
500 }
501 moreHeaders = s.ShowHeaders
502 if moreHeaders == nil {
503 moreHeaders = []string{} // Ensure we won't get Settings again next call.
504 }
505 return moreHeaders, nil
506}
507
508// serveEvents serves an SSE connection. Authentication is done through a query
509// string parameter "singleUseToken", a one-time-use token returned by the Token
510// API call.
511func serveEvents(ctx context.Context, log mlog.Log, accountPath string, w http.ResponseWriter, r *http.Request) {
512 if r.Method != "GET" {
513 http.Error(w, "405 - method not allowed - use get", http.StatusMethodNotAllowed)
514 return
515 }
516
517 flusher, ok := w.(http.Flusher)
518 if !ok {
519 log.Error("internal error: ResponseWriter not a http.Flusher")
520 http.Error(w, "500 - internal error - cannot sync to http connection", 500)
521 return
522 }
523
524 q := r.URL.Query()
525 token := q.Get("singleUseToken")
526 if token == "" {
527 http.Error(w, "400 - bad request - missing credentials", http.StatusBadRequest)
528 return
529 }
530 accName, address, sessionToken, ok, err := sseTokens.check(token)
531 if err != nil {
532 http.Error(w, "500 - internal server error - "+err.Error(), http.StatusInternalServerError)
533 return
534 }
535 if !ok {
536 http.Error(w, "400 - bad request - bad token", http.StatusBadRequest)
537 return
538 }
539 if _, err := store.SessionUse(ctx, log, accName, sessionToken, ""); err != nil {
540 http.Error(w, "400 - bad request - bad session token", http.StatusBadRequest)
541 return
542 }
543
544 // We can simulate a slow SSE connection. It seems firefox doesn't slow down
545 // incoming responses with its slow-network similation.
546 var waitMin, waitMax time.Duration
547 waitMinMsec := q.Get("waitMinMsec")
548 waitMaxMsec := q.Get("waitMaxMsec")
549 if waitMinMsec != "" && waitMaxMsec != "" {
550 if v, err := strconv.ParseInt(waitMinMsec, 10, 64); err != nil {
551 http.Error(w, "400 - bad request - parsing waitMinMsec: "+err.Error(), http.StatusBadRequest)
552 return
553 } else {
554 waitMin = time.Duration(v) * time.Millisecond
555 }
556
557 if v, err := strconv.ParseInt(waitMaxMsec, 10, 64); err != nil {
558 http.Error(w, "400 - bad request - parsing waitMaxMsec: "+err.Error(), http.StatusBadRequest)
559 return
560 } else {
561 waitMax = time.Duration(v) * time.Millisecond
562 }
563 }
564
565 // Parse the request with initial mailbox/search criteria.
566 var req Request
567 dec := json.NewDecoder(strings.NewReader(q.Get("request")))
568 dec.DisallowUnknownFields()
569 if err := dec.Decode(&req); err != nil {
570 http.Error(w, "400 - bad request - bad request query string parameter: "+err.Error(), http.StatusBadRequest)
571 return
572 } else if req.Page.Count <= 0 {
573 http.Error(w, "400 - bad request - request cannot have Page.Count 0", http.StatusBadRequest)
574 return
575 }
576 if req.Query.Threading == "" {
577 req.Query.Threading = ThreadOff
578 }
579
580 var writer *eventWriter
581
582 metricSSEConnections.Inc()
583 defer metricSSEConnections.Dec()
584
585 // Below here, error handling cause through xcheckf, which panics with
586 // *sherpa.Error, after which we send an error event to the client. We can also get
587 // an *ioErr when the connection is broken.
588 defer func() {
589 x := recover()
590 if x == nil {
591 return
592 }
593 if err, ok := x.(*sherpa.Error); ok {
594 writer.xsendEvent(ctx, log, "fatalErr", err.Message)
595 } else if _, ok := x.(ioErr); ok {
596 return
597 } else {
598 log.WithContext(ctx).Error("serveEvents panic", slog.Any("err", x))
599 debug.PrintStack()
600 metrics.PanicInc(metrics.Webmail)
601 panic(x)
602 }
603 }()
604
605 h := w.Header()
606 h.Set("Content-Type", "text/event-stream")
607 h.Set("Cache-Control", "no-cache")
608
609 // We'll be sending quite a bit of message data (text) in JSON (plenty duplicate
610 // keys), so should be quite compressible.
611 var out writeFlusher
612 gz := mox.AcceptsGzip(r)
613 if gz {
614 h.Set("Content-Encoding", "gzip")
615 out, _ = gzip.NewWriterLevel(w, gzip.BestSpeed)
616 } else {
617 out = nopFlusher{w}
618 }
619 out = httpFlusher{out, flusher}
620
621 // We'll be writing outgoing SSE events through writer.
622 writer = newEventWriter(out, waitMin, waitMax, accName, sessionToken)
623 defer writer.close()
624
625 // Fetch initial data.
626 acc, err := store.OpenAccount(log, accName, true)
627 xcheckf(ctx, err, "open account")
628 defer func() {
629 err := acc.Close()
630 log.Check(err, "closing account")
631 }()
632 comm := store.RegisterComm(acc)
633 defer comm.Unregister()
634
635 // List addresses that the client can use to send email from.
636 accConf, _ := acc.Conf()
637 loginAddr, err := smtp.ParseAddress(address)
638 xcheckf(ctx, err, "parsing login address")
639 _, _, _, dest, err := mox.LookupAddress(loginAddr.Localpart, loginAddr.Domain, false, false, false)
640 xcheckf(ctx, err, "looking up destination for login address")
641 loginName := accConf.FullName
642 if dest.FullName != "" {
643 loginName = dest.FullName
644 }
645 loginAddress := MessageAddress{Name: loginName, User: loginAddr.Localpart.String(), Domain: loginAddr.Domain}
646 var addresses []MessageAddress
647 for a, dest := range accConf.Destinations {
648 name := dest.FullName
649 if name == "" {
650 name = accConf.FullName
651 }
652 var ma MessageAddress
653 if strings.HasPrefix(a, "@") {
654 dom, err := dns.ParseDomain(a[1:])
655 xcheckf(ctx, err, "parsing destination address for account")
656 ma = MessageAddress{Domain: dom}
657 } else {
658 addr, err := smtp.ParseAddress(a)
659 xcheckf(ctx, err, "parsing destination address for account")
660 ma = MessageAddress{Name: name, User: addr.Localpart.String(), Domain: addr.Domain}
661 }
662 addresses = append(addresses, ma)
663 }
664 // User is allowed to send using alias address as message From address. Webmail
665 // will choose it when replying to a message sent to that address.
666 aliasAddrs := map[MessageAddress]bool{}
667 for _, a := range accConf.Aliases {
668 if a.Alias.AllowMsgFrom {
669 ma := MessageAddress{User: a.Alias.LocalpartStr, Domain: a.Alias.Domain}
670 if !aliasAddrs[ma] {
671 addresses = append(addresses, ma)
672 }
673 aliasAddrs[ma] = true
674 }
675 }
676
677 // We implicitly start a query. We use the reqctx for the transaction, because the
678 // transaction is passed to the query, which can be canceled.
679 reqctx, reqctxcancel := context.WithCancel(ctx)
680 defer func() {
681 // We also cancel in cancelDrain later on, but there is a brief window where the
682 // context wouldn't be canceled.
683 if reqctxcancel != nil {
684 reqctxcancel()
685 reqctxcancel = nil
686 }
687 }()
688
689 // qtx is kept around during connection initialization, until we pass it off to the
690 // goroutine that starts querying for messages.
691 var qtx *bstore.Tx
692 defer func() {
693 if qtx != nil {
694 err := qtx.Rollback()
695 log.Check(err, "rolling back")
696 }
697 }()
698
699 var mbl []store.Mailbox
700 settings := store.Settings{ID: 1}
701
702 // We only take the rlock when getting the tx.
703 acc.WithRLock(func() {
704 // Now a read-only transaction we'll use during the query.
705 qtx, err = acc.DB.Begin(reqctx, false)
706 xcheckf(ctx, err, "begin transaction")
707
708 mbl, err = bstore.QueryTx[store.Mailbox](qtx).FilterEqual("Expunged", false).List()
709 xcheckf(ctx, err, "list mailboxes")
710
711 err = qtx.Get(&settings)
712 xcheckf(ctx, err, "get settings")
713 })
714
715 // Find the designated mailbox if a mailbox name is set, or there are no filters at all.
716 var zerofilter Filter
717 var zeronotfilter NotFilter
718 var mailbox store.Mailbox
719 var mailboxPrefixes []string
720 var matchMailboxes bool
721 mailboxIDs := map[int64]bool{}
722 mailboxName := req.Query.Filter.MailboxName
723 if mailboxName != "" || reflect.DeepEqual(req.Query.Filter, zerofilter) && reflect.DeepEqual(req.Query.NotFilter, zeronotfilter) {
724 if mailboxName == "" {
725 mailboxName = "Inbox"
726 }
727
728 var inbox store.Mailbox
729 for _, e := range mbl {
730 if e.Name == mailboxName {
731 mailbox = e
732 }
733 if e.Name == "Inbox" {
734 inbox = e
735 }
736 }
737 if mailbox.ID == 0 {
738 mailbox = inbox
739 }
740 if mailbox.ID == 0 {
741 xcheckf(ctx, errors.New("inbox not found"), "setting initial mailbox")
742 }
743 req.Query.Filter.MailboxID = mailbox.ID
744 req.Query.Filter.MailboxName = ""
745 mailboxPrefixes = []string{mailbox.Name + "/"}
746 matchMailboxes = true
747 mailboxIDs[mailbox.ID] = true
748 } else {
749 matchMailboxes, mailboxIDs, mailboxPrefixes = xprepareMailboxIDs(ctx, qtx, req.Query.Filter, accConf.RejectsMailbox)
750 }
751 if req.Query.Filter.MailboxChildrenIncluded {
752 xgatherMailboxIDs(ctx, qtx, mailboxIDs, mailboxPrefixes)
753 }
754
755 // todo: write a last-event-id based on modseq? if last-event-id is present, we would have to send changes to mailboxes, messages, hopefully reducing the amount of data sent.
756
757 sse := sseRegister(acc.Name)
758 defer sse.unregister()
759
760 // Per-domain localpart config so webclient can decide if an address belongs to the account.
761 domainAddressConfigs := map[string]DomainAddressConfig{}
762 for _, a := range addresses {
763 dom, _ := mox.Conf.Domain(a.Domain)
764 domainAddressConfigs[a.Domain.ASCII] = DomainAddressConfig{dom.LocalpartCatchallSeparatorsEffective, dom.LocalpartCaseSensitive}
765 }
766
767 // Write first event, allowing client to fill its UI with mailboxes.
768 start := EventStart{sse.ID, loginAddress, addresses, domainAddressConfigs, mailbox.Name, mbl, accConf.RejectsMailbox, settings, accountPath, moxvar.Version}
769 writer.xsendEvent(ctx, log, "start", start)
770
771 // The goroutine doing the querying will send messages on these channels, which
772 // result in an event being written on the SSE connection.
773 viewMsgsc := make(chan EventViewMsgs)
774 viewErrc := make(chan EventViewErr)
775 viewResetc := make(chan EventViewReset)
776 donec := make(chan int64) // When request is done.
777
778 // Start a view, it determines if we send a change to the client. And start an
779 // implicit query for messages, we'll send the messages to the client which can
780 // fill its ui with messages.
781 v := view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
782 go viewRequestTx(reqctx, log, acc, qtx, v, viewMsgsc, viewErrc, viewResetc, donec)
783 qtx = nil // viewRequestTx closes qtx
784
785 // When canceling a query, we must drain its messages until it says it is done.
786 // Otherwise the sending goroutine would hang indefinitely on a channel send.
787 cancelDrain := func() {
788 if reqctxcancel != nil {
789 // Cancel the goroutine doing the querying.
790 reqctxcancel()
791 reqctx = nil
792 reqctxcancel = nil
793 } else {
794 return
795 }
796
797 // Drain events until done.
798 for {
799 select {
800 case <-viewMsgsc:
801 case <-viewErrc:
802 case <-viewResetc:
803 case <-donec:
804 return
805 }
806 }
807 }
808
809 // If we stop and a query is in progress, we must drain the channel it will send on.
810 defer cancelDrain()
811
812 // Changes broadcasted by other connections on this account. If applicable for the
813 // connection/view, we send events.
814 xprocessChanges := func(changes []store.Change) {
815 taggedChanges := [][2]any{}
816
817 newPreviews := map[int64]string{}
818 defer storeNewPreviews(ctx, log, acc, newPreviews)
819
820 // We get a transaction first time we need it.
821 var xtx *bstore.Tx
822 defer func() {
823 if xtx != nil {
824 err := xtx.Rollback()
825 log.Check(err, "rolling back transaction")
826 }
827 }()
828 ensureTx := func() error {
829 if xtx != nil {
830 return nil
831 }
832 acc.RLock()
833 defer acc.RUnlock()
834 var err error
835 xtx, err = acc.DB.Begin(ctx, false)
836 return err
837 }
838 // This getmsg will now only be called mailboxID+UID, not with messageID set.
839 // todo jmap: change store.Change* to include MessageID's? would mean duplication of information resulting in possible mismatch.
840 getmsg := func(messageID int64, mailboxID int64, uid store.UID) (store.Message, error) {
841 if err := ensureTx(); err != nil {
842 return store.Message{}, fmt.Errorf("transaction: %v", err)
843 }
844 return bstore.QueryTx[store.Message](xtx).FilterEqual("Expunged", false).FilterNonzero(store.Message{MailboxID: mailboxID, UID: uid}).Get()
845 }
846
847 // Additional headers from settings to add to MessageItems.
848 var moreHeaders []string
849 xmoreHeaders := func() []string {
850 err := ensureTx()
851 xcheckf(ctx, err, "transaction")
852
853 moreHeaders, err = ensureMoreHeaders(xtx, moreHeaders)
854 xcheckf(ctx, err, "ensuring more headers")
855 return moreHeaders
856 }
857
858 // Return uids that are within range in view. Because the end has been reached, or
859 // because the UID is not after the last message.
860 xchangedUIDs := func(mailboxID int64, uids []store.UID, isRemove bool) (changedUIDs []store.UID) {
861 uidsAny := make([]any, len(uids))
862 for i, uid := range uids {
863 uidsAny[i] = uid
864 }
865 err := ensureTx()
866 xcheckf(ctx, err, "transaction")
867 q := bstore.QueryTx[store.Message](xtx)
868 q.FilterNonzero(store.Message{MailboxID: mailboxID})
869 q.FilterEqual("UID", uidsAny...)
870 mbOK := v.matchesMailbox(mailboxID)
871 err = q.ForEach(func(m store.Message) error {
872 _, thread := v.threadIDs[m.ThreadID]
873 if thread || mbOK && (v.inRange(m) || isRemove && m.Expunged) {
874 changedUIDs = append(changedUIDs, m.UID)
875 }
876 return nil
877 })
878 xcheckf(ctx, err, "fetching messages for change")
879 return changedUIDs
880 }
881
882 // Forward changes that are relevant to the current view.
883 for _, change := range changes {
884 switch c := change.(type) {
885 case store.ChangeAddUID:
886 ok, err := v.matches(log, acc, true, 0, c.MailboxID, c.UID, c.Flags, c.Keywords, getmsg)
887 xcheckf(ctx, err, "matching new message against view")
888 m, err := getmsg(0, c.MailboxID, c.UID)
889 xcheckf(ctx, err, "get message")
890 _, thread := v.threadIDs[m.ThreadID]
891 if !ok && !thread {
892 continue
893 }
894
895 state := msgState{acc: acc, log: log, newPreviews: newPreviews}
896 mi, err := messageItem(log, m, &state, xmoreHeaders())
897 state.clear()
898 xcheckf(ctx, err, "make messageitem")
899 mi.MatchQuery = ok
900
901 mil := []MessageItem{mi}
902 if !thread && req.Query.Threading != ThreadOff {
903 err := ensureTx()
904 xcheckf(ctx, err, "transaction")
905 more, _, err := gatherThread(log, xtx, acc, v, m, 0, false, xmoreHeaders(), newPreviews)
906 xcheckf(ctx, err, "gathering thread messages for id %d, thread %d", m.ID, m.ThreadID)
907 mil = append(mil, more...)
908 v.threadIDs[m.ThreadID] = struct{}{}
909 }
910
911 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgAdd", ChangeMsgAdd{c, mil}})
912
913 // If message extends the view, store it as such.
914 if !v.Request.Query.OrderAsc && m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && m.Received.After(v.LastMessageReceived) {
915 v.LastMessageReceived = m.Received
916 }
917
918 case store.ChangeRemoveUIDs:
919 comm.RemovalSeen(c)
920
921 // We may send changes for uids the client doesn't know, that's fine.
922 changedUIDs := xchangedUIDs(c.MailboxID, c.UIDs, true)
923 if len(changedUIDs) == 0 {
924 continue
925 }
926 ch := ChangeMsgRemove{c}
927 ch.UIDs = changedUIDs
928 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgRemove", ch})
929
930 case store.ChangeFlags:
931 // We may send changes for uids the client doesn't know, that's fine.
932 changedUIDs := xchangedUIDs(c.MailboxID, []store.UID{c.UID}, false)
933 if len(changedUIDs) == 0 {
934 continue
935 }
936 ch := ChangeMsgFlags{c}
937 ch.UID = changedUIDs[0]
938 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgFlags", ch})
939
940 case store.ChangeThread:
941 // Change in muted/collaped state, just always ship it.
942 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgThread", ChangeMsgThread{c}})
943
944 case store.ChangeRemoveMailbox:
945 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRemove", ChangeMailboxRemove{c}})
946
947 case store.ChangeAddMailbox:
948 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxAdd", ChangeMailboxAdd{c.Mailbox}})
949
950 case store.ChangeRenameMailbox:
951 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRename", ChangeMailboxRename{c}})
952
953 case store.ChangeMailboxCounts:
954 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxCounts", ChangeMailboxCounts{c}})
955
956 case store.ChangeMailboxSpecialUse:
957 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxSpecialUse", ChangeMailboxSpecialUse{c}})
958
959 case store.ChangeMailboxKeywords:
960 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxKeywords", ChangeMailboxKeywords{c}})
961
962 case store.ChangeAddSubscription:
963 // Webmail does not care about subscriptions.
964
965 case store.ChangeAnnotation:
966 // Nothing.
967
968 default:
969 panic(fmt.Sprintf("missing case for change %T", c))
970 }
971 }
972
973 if len(taggedChanges) > 0 {
974 viewChanges := EventViewChanges{v.Request.ViewID, taggedChanges}
975 writer.xsendEvent(ctx, log, "viewChanges", viewChanges)
976 }
977 }
978
979 timer := time.NewTimer(5 * time.Minute) // For keepalives.
980 defer timer.Stop()
981 for {
982 if writer.wrote {
983 timer.Reset(5 * time.Minute)
984 writer.wrote = false
985 }
986
987 pending := comm.Pending
988 if reqctx != nil {
989 pending = nil
990 }
991
992 select {
993 case <-mox.Shutdown.Done():
994 writer.xsendEvent(ctx, log, "fatalErr", "server is shutting down")
995 // Work around go vet, it doesn't see defer cancelDrain.
996 if reqctxcancel != nil {
997 reqctxcancel()
998 }
999 return
1000
1001 case <-timer.C:
1002 _, err := fmt.Fprintf(out, ": keepalive\n\n")
1003 if err == nil {
1004 err = out.Flush()
1005 }
1006 if err != nil {
1007 log.Errorx("write keepalive", err)
1008 // Work around go vet, it doesn't see defer cancelDrain.
1009 if reqctxcancel != nil {
1010 reqctxcancel()
1011 }
1012 return
1013 }
1014 writer.wrote = true
1015
1016 case vm := <-viewMsgsc:
1017 if vm.RequestID != v.Request.ID || vm.ViewID != v.Request.ViewID {
1018 panic(fmt.Sprintf("received msgs for view,request id %d,%d instead of %d,%d", vm.ViewID, vm.RequestID, v.Request.ViewID, v.Request.ID))
1019 }
1020 if vm.ViewEnd {
1021 v.End = true
1022 }
1023 if len(vm.MessageItems) > 0 {
1024 v.LastMessageReceived = vm.MessageItems[len(vm.MessageItems)-1][0].Message.Received
1025 }
1026 writer.xsendEvent(ctx, log, "viewMsgs", vm)
1027
1028 case ve := <-viewErrc:
1029 if ve.RequestID != v.Request.ID || ve.ViewID != v.Request.ViewID {
1030 panic(fmt.Sprintf("received err for view,request id %d,%d instead of %d,%d", ve.ViewID, ve.RequestID, v.Request.ViewID, v.Request.ID))
1031 }
1032 if errors.Is(ve.err, context.Canceled) || mlog.IsClosed(ve.err) {
1033 // Work around go vet, it doesn't see defer cancelDrain.
1034 if reqctxcancel != nil {
1035 reqctxcancel()
1036 }
1037 return
1038 }
1039 writer.xsendEvent(ctx, log, "viewErr", ve)
1040
1041 case vr := <-viewResetc:
1042 if vr.RequestID != v.Request.ID || vr.ViewID != v.Request.ViewID {
1043 panic(fmt.Sprintf("received reset for view,request id %d,%d instead of %d,%d", vr.ViewID, vr.RequestID, v.Request.ViewID, v.Request.ID))
1044 }
1045 writer.xsendEvent(ctx, log, "viewReset", vr)
1046
1047 case id := <-donec:
1048 if id != v.Request.ID {
1049 panic(fmt.Sprintf("received done for request id %d instead of %d", id, v.Request.ID))
1050 }
1051 if reqctxcancel != nil {
1052 reqctxcancel()
1053 }
1054 reqctx = nil
1055 reqctxcancel = nil
1056
1057 case req := <-sse.Request:
1058 if reqctx != nil {
1059 cancelDrain()
1060 }
1061 if req.Cancel {
1062 v = view{req, time.Time{}, false, false, nil, nil}
1063 continue
1064 }
1065
1066 reqctx, reqctxcancel = context.WithCancel(ctx)
1067
1068 stop := func() (stop bool) {
1069 // rtx is handed off viewRequestTx below, but we must clean it up in case of errors.
1070 var rtx *bstore.Tx
1071 var err error
1072 defer func() {
1073 if rtx != nil {
1074 err = rtx.Rollback()
1075 log.Check(err, "rolling back transaction")
1076 }
1077 }()
1078 acc.WithRLock(func() {
1079 rtx, err = acc.DB.Begin(reqctx, false)
1080 })
1081 if err != nil {
1082 reqctxcancel()
1083 reqctx = nil
1084 reqctxcancel = nil
1085
1086 if errors.Is(err, context.Canceled) {
1087 return true
1088 }
1089 err := fmt.Errorf("begin transaction: %v", err)
1090 viewErr := EventViewErr{v.Request.ViewID, v.Request.ID, err.Error(), err}
1091 writer.xsendEvent(ctx, log, "viewErr", viewErr)
1092 return false
1093 }
1094
1095 // Reset view state for new query.
1096 if req.ViewID != v.Request.ViewID {
1097 matchMailboxes, mailboxIDs, mailboxPrefixes := xprepareMailboxIDs(ctx, rtx, req.Query.Filter, accConf.RejectsMailbox)
1098 if req.Query.Filter.MailboxChildrenIncluded {
1099 xgatherMailboxIDs(ctx, rtx, mailboxIDs, mailboxPrefixes)
1100 }
1101 v = view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
1102 } else {
1103 v.Request = req
1104 }
1105 go viewRequestTx(reqctx, log, acc, rtx, v, viewMsgsc, viewErrc, viewResetc, donec)
1106 rtx = nil
1107 return false
1108 }()
1109 if stop {
1110 return
1111 }
1112
1113 case <-pending:
1114 xprocessChanges(comm.Get())
1115
1116 case <-ctx.Done():
1117 // Work around go vet, it doesn't see defer cancelDrain.
1118 if reqctxcancel != nil {
1119 reqctxcancel()
1120 }
1121 return
1122 }
1123 }
1124}
1125
1126// xprepareMailboxIDs prepare the first half of filters for mailboxes, based on
1127// f.MailboxID (-1 is special). matchMailboxes indicates whether the IDs in
1128// mailboxIDs must or must not match. mailboxPrefixes is for use with
1129// xgatherMailboxIDs to gather children of the mailboxIDs.
1130func xprepareMailboxIDs(ctx context.Context, tx *bstore.Tx, f Filter, rejectsMailbox string) (matchMailboxes bool, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1131 matchMailboxes = true
1132 mailboxIDs = map[int64]bool{}
1133 if f.MailboxID == -1 {
1134 matchMailboxes = false
1135 // Add the trash, junk and account rejects mailbox.
1136 err := bstore.QueryTx[store.Mailbox](tx).FilterEqual("Expunged", false).ForEach(func(mb store.Mailbox) error {
1137 if mb.Trash || mb.Junk || mb.Name == rejectsMailbox {
1138 mailboxPrefixes = append(mailboxPrefixes, mb.Name+"/")
1139 mailboxIDs[mb.ID] = true
1140 }
1141 return nil
1142 })
1143 xcheckf(ctx, err, "finding trash/junk/rejects mailbox")
1144 } else if f.MailboxID > 0 {
1145 mb, err := store.MailboxID(tx, f.MailboxID)
1146 xcheckf(ctx, err, "get mailbox")
1147 mailboxIDs[f.MailboxID] = true
1148 mailboxPrefixes = []string{mb.Name + "/"}
1149 }
1150 return
1151}
1152
1153// xgatherMailboxIDs adds all mailboxes with a prefix matching any of
1154// mailboxPrefixes to mailboxIDs, to expand filtering to children of mailboxes.
1155func xgatherMailboxIDs(ctx context.Context, tx *bstore.Tx, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1156 // Gather more mailboxes to filter on, based on mailboxPrefixes.
1157 if len(mailboxPrefixes) == 0 {
1158 return
1159 }
1160 err := bstore.QueryTx[store.Mailbox](tx).FilterEqual("Expunged", false).ForEach(func(mb store.Mailbox) error {
1161 for _, p := range mailboxPrefixes {
1162 if strings.HasPrefix(mb.Name, p) {
1163 mailboxIDs[mb.ID] = true
1164 break
1165 }
1166 }
1167 return nil
1168 })
1169 xcheckf(ctx, err, "gathering mailboxes")
1170}
1171
1172// matchesMailbox returns whether a mailbox matches the view.
1173func (v view) matchesMailbox(mailboxID int64) bool {
1174 return len(v.mailboxIDs) == 0 || v.matchMailboxIDs && v.mailboxIDs[mailboxID] || !v.matchMailboxIDs && !v.mailboxIDs[mailboxID]
1175}
1176
1177// inRange returns whether m is within the range for the view, whether a change for
1178// this message should be sent to the client so it can update its state.
1179func (v view) inRange(m store.Message) bool {
1180 return v.End || !v.Request.Query.OrderAsc && !m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && !m.Received.After(v.LastMessageReceived)
1181}
1182
// matches checks if the message, identified by either messageID or mailboxID+UID,
// is in the current "view" (i.e. passing the filters, and if checkRange is set
// also if within the range of sent messages based on sort order and the last seen
// message). getmsg retrieves the message, which may be necessary depending on the
// active filters. Used to determine if a store.Change with a new message should be
// sent, and for the destination and anchor messages in view requests.
func (v view) matches(log mlog.Log, acc *store.Account, checkRange bool, messageID int64, mailboxID int64, uid store.UID, flags store.Flags, keywords []string, getmsg func(int64, int64, store.UID) (store.Message, error)) (match bool, rerr error) {
	var m store.Message
	// Lazily fetch the full message, only once a filter below actually needs it.
	// Returns false when fetching failed; rerr then holds the error, and each
	// "return false, rerr" below propagates it.
	ensureMessage := func() bool {
		if m.ID == 0 && rerr == nil {
			m, rerr = getmsg(messageID, mailboxID, uid)
		}
		return rerr == nil
	}

	q := v.Request.Query

	// Warning: Filters must be kept in sync between queryMessage and view.matches.

	// Check filters.
	if len(v.mailboxIDs) > 0 && (!ensureMessage() || v.matchMailboxIDs && !v.mailboxIDs[m.MailboxID] || !v.matchMailboxIDs && v.mailboxIDs[m.MailboxID]) {
		return false, rerr
	}
	// note: anchorMessageID is not relevant for matching.
	// Flags/keywords were passed in by the caller, no message fetch needed here.
	flagfilter := q.flagFilterFn()
	if flagfilter != nil && !flagfilter(flags, keywords) {
		return false, rerr
	}

	if q.Filter.Oldest != nil && (!ensureMessage() || m.Received.Before(*q.Filter.Oldest)) {
		return false, rerr
	}
	if q.Filter.Newest != nil && (!ensureMessage() || !m.Received.Before(*q.Filter.Newest)) {
		return false, rerr
	}

	if q.Filter.SizeMin > 0 && (!ensureMessage() || m.Size < q.Filter.SizeMin) {
		return false, rerr
	}
	if q.Filter.SizeMax > 0 && (!ensureMessage() || m.Size > q.Filter.SizeMax) {
		return false, rerr
	}

	// The filters below may parse the message; state caches that work and records
	// any error, which we fold into rerr when done.
	state := msgState{acc: acc, log: log}
	defer func() {
		if rerr == nil && state.err != nil {
			rerr = state.err
		}
		state.clear()
	}()

	attachmentFilter := q.attachmentFilterFn(log, acc, &state)
	if attachmentFilter != nil && (!ensureMessage() || !attachmentFilter(m)) {
		return false, rerr
	}

	envFilter := q.envFilterFn(log, &state)
	if envFilter != nil && (!ensureMessage() || !envFilter(m)) {
		return false, rerr
	}

	headerFilter := q.headerFilterFn(log, &state)
	if headerFilter != nil && (!ensureMessage() || !headerFilter(m)) {
		return false, rerr
	}

	wordsFilter := q.wordsFilterFn(log, &state)
	if wordsFilter != nil && (!ensureMessage() || !wordsFilter(m)) {
		return false, rerr
	}

	// Now check that we are either within the sorting order, or "last" was sent.
	if !checkRange || v.End || ensureMessage() && v.inRange(m) {
		return true, rerr
	}
	return false, rerr
}
1260
// msgResp is a single result sent from queryMessages to viewRequestTx. Exactly
// one of the cases (err, reset, viewEnd, or mil/pm) applies per value.
type msgResp struct {
	err     error          // If set, an error happened and fields below are not set.
	reset   bool           // If set, the anchor message does not exist (anymore?) and we are sending messages from the start, fields below not set.
	viewEnd bool           // If set, the last message for the view was seen, no more should be requested, fields below not set.
	mil     []MessageItem  // If none of the cases above apply, the messages that were found matching the query. First message was reason the thread is returned, for use as AnchorID in followup request.
	pm      *ParsedMessage // If m was the target page.DestMessageID, or this is the first match, this is the parsed message of mi.
}
1268
1269func storeNewPreviews(ctx context.Context, log mlog.Log, acc *store.Account, newPreviews map[int64]string) {
1270 if len(newPreviews) == 0 {
1271 return
1272 }
1273
1274 defer func() {
1275 x := recover()
1276 if x != nil {
1277 log.Error("unhandled panic in storeNewPreviews", slog.Any("err", x))
1278 debug.PrintStack()
1279 metrics.PanicInc(metrics.Store)
1280 }
1281 }()
1282
1283 err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
1284 for id, preview := range newPreviews {
1285 m := store.Message{ID: id}
1286 if err := tx.Get(&m); err != nil {
1287 return fmt.Errorf("get message with id %d to store preview: %w", id, err)
1288 } else if !m.Expunged {
1289 m.Preview = &preview
1290 if err := tx.Update(&m); err != nil {
1291 return fmt.Errorf("updating message with id %d: %v", m.ID, err)
1292 }
1293 }
1294 }
1295 return nil
1296 })
1297 log.Check(err, "saving new previews with messages")
1298}
1299
// viewRequestTx executes a request (query with filters, pagination) by
// launching a new goroutine with queryMessages, receiving results as msgResp,
// and sending Event* to the SSE connection.
//
// It always closes tx.
func viewRequestTx(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, msgc chan EventViewMsgs, errc chan EventViewErr, resetc chan EventViewReset, donec chan int64) {
	// Newly generated previews which we'll save when the operation is done.
	newPreviews := map[int64]string{}

	defer func() {
		err := tx.Rollback()
		log.Check(err, "rolling back query transaction")

		// Tell the SSE connection loop this request is finished, so it can accept a new one.
		donec <- v.Request.ID

		// ctx can be canceled, we still want to store the previews.
		storeNewPreviews(context.Background(), log, acc, newPreviews)

		x := recover() // Should not happen, but don't take program down if it does.
		if x != nil {
			log.WithContext(ctx).Error("viewRequestTx panic", slog.Any("err", x))
			debug.PrintStack()
			metrics.PanicInc(metrics.Webmailrequest)
		}
	}()

	var msgitems [][]MessageItem // Gathering for 300ms, then flushing.
	var parsedMessage *ParsedMessage
	var viewEnd bool

	var immediate bool // No waiting, flush immediate.
	t := time.NewTimer(300 * time.Millisecond)
	defer t.Stop()

	// sendViewMsgs flushes gathered messages to the SSE connection. With force, an
	// event is sent even when msgitems is empty (used to deliver viewEnd and to
	// flush immediately after a long quiet period).
	sendViewMsgs := func(force bool) {
		if len(msgitems) == 0 && !force {
			return
		}

		immediate = false
		msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, msgitems, parsedMessage, viewEnd}
		msgitems = nil
		parsedMessage = nil
		t.Reset(300 * time.Millisecond)
	}

	// todo: should probably rewrite code so we don't start yet another goroutine, but instead handle the query responses directly (through a struct that keeps state?) in the sse connection goroutine.

	mrc := make(chan msgResp, 1)
	go queryMessages(ctx, log, acc, tx, v, mrc, newPreviews)

	for {
		select {
		case mr, ok := <-mrc:
			if !ok {
				// Channel closed: queryMessages is done.
				sendViewMsgs(false)
				// Empty message list signals this query is done.
				msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, nil, nil, false}
				return
			}
			if mr.err != nil {
				sendViewMsgs(false)
				errc <- EventViewErr{v.Request.ViewID, v.Request.ID, mr.err.Error(), mr.err}
				return
			}
			if mr.reset {
				resetc <- EventViewReset{v.Request.ViewID, v.Request.ID}
				continue
			}
			if mr.viewEnd {
				viewEnd = true
				sendViewMsgs(true)
				return
			}

			msgitems = append(msgitems, mr.mil)
			if mr.pm != nil {
				parsedMessage = mr.pm
			}
			if immediate {
				sendViewMsgs(true)
			}

		case <-t.C:
			if len(msgitems) == 0 {
				// Nothing to send yet. We'll send immediately when the next message comes in.
				immediate = true
			} else {
				sendViewMsgs(false)
			}
		}
	}
}
1393
// queryMessages executes a query, with filter, pagination, destination message id
// to fetch (the message that the client had in view and wants to display again).
// It sends on msgc, with several types of messages: errors, whether the view is
// reset due to missing AnchorMessageID, and when the end of the view was reached
// and/or for a message.
// newPreviews is filled with previews, the caller must save them.
func queryMessages(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, mrc chan msgResp, newPreviews map[int64]string) {
	defer func() {
		x := recover() // Should not happen, but don't take program down if it does.
		if x != nil {
			log.WithContext(ctx).Error("queryMessages panic", slog.Any("err", x))
			debug.PrintStack()
			mrc <- msgResp{err: fmt.Errorf("query failed")}
			metrics.PanicInc(metrics.Webmailquery)
		}

		// Closing mrc tells viewRequestTx this query is done.
		close(mrc)
	}()

	query := v.Request.Query
	page := v.Request.Page

	// Warning: Filters must be kept in sync between queryMessage and view.matches.

	// checkMessage reports whether the message with this id still exists (not
	// expunged) and matches the view filters (range is not checked).
	checkMessage := func(id int64) (valid bool, rerr error) {
		m := store.Message{ID: id}
		err := tx.Get(&m)
		if err == bstore.ErrAbsent || err == nil && m.Expunged {
			return false, nil
		} else if err != nil {
			return false, err
		} else {
			return v.matches(log, acc, false, m.ID, m.MailboxID, m.UID, m.Flags, m.Keywords, func(int64, int64, store.UID) (store.Message, error) {
				return m, nil
			})
		}
	}

	// Check if AnchorMessageID exists and matches filter. If not, we will reset the view.
	if page.AnchorMessageID > 0 {
		// Check if message exists and (still) matches the filter.
		// todo: if AnchorMessageID exists but no longer matches the filter, we are resetting the view, but could handle it more gracefully in the future. if the message is in a different mailbox, we cannot query as efficiently, we'll have to read through more messages.
		if valid, err := checkMessage(page.AnchorMessageID); err != nil {
			mrc <- msgResp{err: fmt.Errorf("querying AnchorMessageID: %v", err)}
			return
		} else if !valid {
			mrc <- msgResp{reset: true}
			page.AnchorMessageID = 0
		}
	}

	// Check if page.DestMessageID exists and matches filter. If not, we will ignore
	// it instead of continuing to send message till the end of the view.
	if page.DestMessageID > 0 {
		if valid, err := checkMessage(page.DestMessageID); err != nil {
			mrc <- msgResp{err: fmt.Errorf("querying requested message: %v", err)}
			return
		} else if !valid {
			page.DestMessageID = 0
		}
	}

	// todo optimize: we would like to have more filters directly on the database if they can use an index. eg if there is a keyword filter and no mailbox filter.

	q := bstore.QueryTx[store.Message](tx)
	q.FilterEqual("Expunged", false)
	if len(v.mailboxIDs) > 0 {
		if len(v.mailboxIDs) == 1 && v.matchMailboxIDs {
			// Should result in fast indexed query.
			for mbID := range v.mailboxIDs {
				q.FilterNonzero(store.Message{MailboxID: mbID})
			}
		} else {
			idsAny := make([]any, 0, len(v.mailboxIDs))
			for mbID := range v.mailboxIDs {
				idsAny = append(idsAny, mbID)
			}
			if v.matchMailboxIDs {
				q.FilterEqual("MailboxID", idsAny...)
			} else {
				q.FilterNotEqual("MailboxID", idsAny...)
			}
		}
	}

	// If we are looking for an anchor, keep skipping messages early (cheaply) until we've seen it.
	if page.AnchorMessageID > 0 {
		var seen = false
		q.FilterFn(func(m store.Message) bool {
			if seen {
				return true
			}
			seen = m.ID == page.AnchorMessageID
			return false
		})
	}

	// We may be adding filters to the query below. The FilterFn signature does not
	// implement reporting errors, or anything else, just a bool. So when making the
	// filter functions, we give them a place to store parsed message state, and an
	// error. We check the error during and after query execution.
	state := msgState{acc: acc, log: log, newPreviews: newPreviews}
	defer state.clear()

	flagfilter := query.flagFilterFn()
	if flagfilter != nil {
		q.FilterFn(func(m store.Message) bool {
			return flagfilter(m.Flags, m.Keywords)
		})
	}

	if query.Filter.Oldest != nil {
		q.FilterGreaterEqual("Received", *query.Filter.Oldest)
	}
	if query.Filter.Newest != nil {
		q.FilterLessEqual("Received", *query.Filter.Newest)
	}

	if query.Filter.SizeMin > 0 {
		q.FilterGreaterEqual("Size", query.Filter.SizeMin)
	}
	if query.Filter.SizeMax > 0 {
		q.FilterLessEqual("Size", query.Filter.SizeMax)
	}

	attachmentFilter := query.attachmentFilterFn(log, acc, &state)
	if attachmentFilter != nil {
		q.FilterFn(attachmentFilter)
	}

	envFilter := query.envFilterFn(log, &state)
	if envFilter != nil {
		q.FilterFn(envFilter)
	}

	headerFilter := query.headerFilterFn(log, &state)
	if headerFilter != nil {
		q.FilterFn(headerFilter)
	}

	wordsFilter := query.wordsFilterFn(log, &state)
	if wordsFilter != nil {
		q.FilterFn(wordsFilter)
	}

	var moreHeaders []string // From store.Settings.ShowHeaders

	if query.OrderAsc {
		q.SortAsc("Received")
	} else {
		q.SortDesc("Received")
	}
	found := page.DestMessageID <= 0
	end := true
	have := 0
	err := q.ForEach(func(m store.Message) error {
		// Check for an error in one of the filters, propagate it.
		if state.err != nil {
			return state.err
		}

		// Stop when we have enough (and found the destination message), or hit the
		// hard cap of 10000 results; end=false means more may follow in a next page.
		if have >= page.Count && found || have > 10000 {
			end = false
			return bstore.StopForEach
		}

		if _, ok := v.threadIDs[m.ThreadID]; ok {
			// Message was already returned as part of a thread.
			return nil
		}

		var pm *ParsedMessage
		if m.ID == page.DestMessageID || page.DestMessageID == 0 && have == 0 && page.AnchorMessageID == 0 {
			// For threads, if there was no DestMessageID, we may be getting the newest
			// message. For an initial view, this isn't necessarily the first the user is
			// expected to read first, that would be the first unread, which we'll get below
			// when gathering the thread.
			found = true
			xpm, err := parsedMessage(log, &m, &state, true, false, false)
			if err != nil && errors.Is(err, message.ErrHeader) {
				log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
			} else if err != nil {
				return fmt.Errorf("parsing message %d: %v", m.ID, err)
			} else {
				pm = &xpm
			}
		}

		var err error
		moreHeaders, err = ensureMoreHeaders(tx, moreHeaders)
		if err != nil {
			return fmt.Errorf("ensuring more headers: %v", err)
		}

		mi, err := messageItem(log, m, &state, moreHeaders)
		if err != nil {
			return fmt.Errorf("making messageitem for message %d: %v", m.ID, err)
		}
		mil := []MessageItem{mi}
		if query.Threading != ThreadOff {
			more, xpm, err := gatherThread(log, tx, acc, v, m, page.DestMessageID, page.AnchorMessageID == 0 && have == 0, moreHeaders, state.newPreviews)
			if err != nil {
				return fmt.Errorf("gathering thread messages for id %d, thread %d: %v", m.ID, m.ThreadID, err)
			}
			if xpm != nil {
				pm = xpm
				found = true
			}
			mil = append(mil, more...)
			v.threadIDs[m.ThreadID] = struct{}{}

			// Calculate how many messages the frontend is going to show, and only count those as returned.
			collapsed := map[int64]bool{}
			for _, mi := range mil {
				collapsed[mi.Message.ID] = mi.Message.ThreadCollapsed
			}
			unread := map[int64]bool{} // Propagated to thread root.
			if query.Threading == ThreadUnread {
				for _, mi := range mil {
					mm := mi.Message
					if mm.Seen {
						continue
					}
					unread[mm.ID] = true
					for _, id := range mm.ThreadParentIDs {
						unread[id] = true
					}
				}
			}
			for _, mi := range mil {
				mm := mi.Message
				threadRoot := true
				rootID := mm.ID
				for _, id := range mm.ThreadParentIDs {
					if _, ok := collapsed[id]; ok {
						threadRoot = false
						rootID = id
					}
				}
				if threadRoot || (query.Threading == ThreadOn && !collapsed[rootID] || query.Threading == ThreadUnread && unread[rootID]) {
					have++
				}
			}
		} else {
			have++
		}
		if pm != nil && len(pm.envelope.From) == 1 {
			pm.ViewMode, err = fromAddrViewMode(tx, pm.envelope.From[0])
			if err != nil {
				return fmt.Errorf("gathering view mode for id %d: %v", m.ID, err)
			}
		}
		mrc <- msgResp{mil: mil, pm: pm}
		return nil
	})
	// Check for an error in one of the filters again. Check in ForEach would not
	// trigger if the last message has the error.
	if err == nil && state.err != nil {
		err = state.err
	}
	if err != nil {
		mrc <- msgResp{err: fmt.Errorf("querying messages: %v", err)}
		return
	}
	if end {
		mrc <- msgResp{viewEnd: true}
	}
}
1662
// gatherThread fetches the other (non-expunged) messages in the thread of m,
// in order of message ID, as MessageItems; m itself is not included. It also
// returns a ParsedMessage when one of the thread messages is destMessageID, or
// when "first" is set (first result, no destination message) for the first
// unread message encountered — falling back to m itself if m is the only
// unread message.
func gatherThread(log mlog.Log, tx *bstore.Tx, acc *store.Account, v view, m store.Message, destMessageID int64, first bool, moreHeaders []string, newPreviews map[int64]string) ([]MessageItem, *ParsedMessage, error) {
	if m.ThreadID == 0 {
		// If we would continue, FilterNonzero would fail because there are no non-zero fields.
		return nil, nil, fmt.Errorf("message has threadid 0, account is probably still being upgraded, try turning threading off until the upgrade is done")
	}

	// Fetch other messages for this thread.
	qt := bstore.QueryTx[store.Message](tx)
	qt.FilterNonzero(store.Message{ThreadID: m.ThreadID})
	qt.FilterEqual("Expunged", false)
	qt.FilterNotEqual("ID", m.ID)
	qt.SortAsc("ID")
	tml, err := qt.List()
	if err != nil {
		return nil, nil, fmt.Errorf("listing other messages in thread for message %d, thread %d: %v", m.ID, m.ThreadID, err)
	}

	var mil []MessageItem
	var pm *ParsedMessage
	var firstUnread bool
	for _, tm := range tml {
		// Per-iteration closure so each xstate is cleared promptly by its defer.
		err := func() error {
			xstate := msgState{acc: acc, log: log, newPreviews: newPreviews}
			defer xstate.clear()

			mi, err := messageItem(log, tm, &xstate, moreHeaders)
			if err != nil {
				return fmt.Errorf("making messageitem for message %d, for thread %d: %v", tm.ID, m.ThreadID, err)
			}
			mi.MatchQuery, err = v.matches(log, acc, false, tm.ID, tm.MailboxID, tm.UID, tm.Flags, tm.Keywords, func(int64, int64, store.UID) (store.Message, error) {
				return tm, nil
			})
			if err != nil {
				return fmt.Errorf("matching thread message %d against view query: %v", tm.ID, err)
			}
			mil = append(mil, mi)

			// Parse this message when it is the destination, or (for a first result) when
			// we have no parsed message yet or this is the first unread one, which takes
			// precedence over an earlier read message.
			if tm.ID == destMessageID || destMessageID == 0 && first && (pm == nil || !firstUnread && !tm.Seen) {
				firstUnread = !tm.Seen
				xpm, err := parsedMessage(log, &tm, &xstate, true, false, false)
				if err != nil && errors.Is(err, message.ErrHeader) {
					log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
				} else if err != nil {
					return fmt.Errorf("parsing thread message %d: %v", tm.ID, err)
				} else {
					pm = &xpm
				}
			}
			return nil
		}()
		if err != nil {
			return nil, nil, err
		}
	}

	// Finally, the message that caused us to gather this thread (which is likely the
	// most recent message in the thread) could be the only unread message.
	if destMessageID == 0 && first && !m.Seen && !firstUnread {
		xstate := msgState{acc: acc, log: log}
		defer xstate.clear()
		xpm, err := parsedMessage(log, &m, &xstate, true, false, false)
		if err != nil && errors.Is(err, message.ErrHeader) {
			log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
		} else if err != nil {
			return nil, nil, fmt.Errorf("parsing thread message %d: %v", m.ID, err)
		} else {
			pm = &xpm
		}
	}

	return mil, pm, nil
}
1735
// While checking the filters on a message, we may need to get more message
// details as each filter passes. We check the filters that need the basic
// information first, and load and cache more details for the next filters.
// msgState holds parsed details for a message, it is updated while filtering,
// with more information or reset for a next message.
type msgState struct {
	acc  *store.Account // Never changes during lifetime.
	err  error          // Once set, doesn't get cleared.
	m    store.Message  // Message the cached part/msgr below belong to.
	part *message.Part  // Will be without Reader when msgr is nil.
	msgr *store.MsgReader
	log  mlog.Log

	// If not nil, messages will get their Preview field filled when nil, and message
	// id and preview added to newPreviews, and saved in a separate write transaction
	// when the operation is done.
	newPreviews map[int64]string
}
1754
1755func (ms *msgState) clear() {
1756 if ms.msgr != nil {
1757 err := ms.msgr.Close()
1758 ms.log.Check(err, "closing message reader from state")
1759 ms.msgr = nil
1760 }
1761 *ms = msgState{acc: ms.acc, err: ms.err, log: ms.log, newPreviews: ms.newPreviews}
1762}
1763
1764func (ms *msgState) ensureMsg(m store.Message) {
1765 if m.ID != ms.m.ID {
1766 ms.clear()
1767 }
1768 ms.m = m
1769}
1770
1771func (ms *msgState) ensurePart(m store.Message, withMsgReader bool) bool {
1772 ms.ensureMsg(m)
1773
1774 if ms.err == nil {
1775 if ms.part == nil {
1776 if m.ParsedBuf == nil {
1777 ms.err = fmt.Errorf("message %d not parsed", m.ID)
1778 return false
1779 }
1780 var p message.Part
1781 if err := json.Unmarshal(m.ParsedBuf, &p); err != nil {
1782 ms.err = fmt.Errorf("load part for message %d: %w", m.ID, err)
1783 return false
1784 }
1785 ms.part = &p
1786 }
1787 if withMsgReader && ms.msgr == nil {
1788 ms.msgr = ms.acc.MessageReader(m)
1789 ms.part.SetReaderAt(ms.msgr)
1790 }
1791 }
1792 return ms.part != nil
1793}
1794
1795// flagFilterFn returns a function that applies the flag/keyword/"label"-related
1796// filters for a query. A nil function is returned if there are no flags to filter
1797// on.
1798func (q Query) flagFilterFn() func(store.Flags, []string) bool {
1799 labels := map[string]bool{}
1800 for _, k := range q.Filter.Labels {
1801 labels[k] = true
1802 }
1803 for _, k := range q.NotFilter.Labels {
1804 labels[k] = false
1805 }
1806
1807 if len(labels) == 0 {
1808 return nil
1809 }
1810
1811 var mask, flags store.Flags
1812 systemflags := map[string][]*bool{
1813 `\answered`: {&mask.Answered, &flags.Answered},
1814 `\flagged`: {&mask.Flagged, &flags.Flagged},
1815 `\deleted`: {&mask.Deleted, &flags.Deleted},
1816 `\seen`: {&mask.Seen, &flags.Seen},
1817 `\draft`: {&mask.Draft, &flags.Draft},
1818 `$junk`: {&mask.Junk, &flags.Junk},
1819 `$notjunk`: {&mask.Notjunk, &flags.Notjunk},
1820 `$forwarded`: {&mask.Forwarded, &flags.Forwarded},
1821 `$phishing`: {&mask.Phishing, &flags.Phishing},
1822 `$mdnsent`: {&mask.MDNSent, &flags.MDNSent},
1823 }
1824 keywords := map[string]bool{}
1825 for k, v := range labels {
1826 k = strings.ToLower(k)
1827 if mf, ok := systemflags[k]; ok {
1828 *mf[0] = true
1829 *mf[1] = v
1830 } else {
1831 keywords[k] = v
1832 }
1833 }
1834 return func(msgFlags store.Flags, msgKeywords []string) bool {
1835 var f store.Flags
1836 if f.Set(mask, msgFlags) != flags {
1837 return false
1838 }
1839 for k, v := range keywords {
1840 if slices.Contains(msgKeywords, k) != v {
1841 return false
1842 }
1843 }
1844 return true
1845 }
1846}
1847
1848// attachmentFilterFn returns a function that filters for the attachment-related
1849// filter from the query. A nil function is returned if there are attachment
1850// filters.
1851func (q Query) attachmentFilterFn(log mlog.Log, acc *store.Account, state *msgState) func(m store.Message) bool {
1852 if q.Filter.Attachments == AttachmentIndifferent && q.NotFilter.Attachments == AttachmentIndifferent {
1853 return nil
1854 }
1855
1856 return func(m store.Message) bool {
1857 if !state.ensurePart(m, true) {
1858 return false
1859 }
1860 types, err := attachmentTypes(log, m, state)
1861 if err != nil {
1862 state.err = err
1863 return false
1864 }
1865 return (q.Filter.Attachments == AttachmentIndifferent || types[q.Filter.Attachments]) && (q.NotFilter.Attachments == AttachmentIndifferent || !types[q.NotFilter.Attachments])
1866 }
1867}
1868
// attachmentMimetypes maps a lowercased mime type ("type/subtype") to the
// attachment type used for filtering. Images are not listed here: they are
// recognized by their "image" media type in attachmentTypes.
var attachmentMimetypes = map[string]AttachmentType{
	"application/pdf":              AttachmentPDF,
	"application/zip":              AttachmentArchive,
	"application/x-rar-compressed": AttachmentArchive,
	"application/vnd.oasis.opendocument.spreadsheet":                            AttachmentSpreadsheet,
	"application/vnd.ms-excel":                                                  AttachmentSpreadsheet,
	"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet":         AttachmentSpreadsheet,
	"application/vnd.oasis.opendocument.text":                                   AttachmentDocument,
	"application/vnd.oasis.opendocument.presentation":                           AttachmentPresentation,
	"application/vnd.ms-powerpoint":                                             AttachmentPresentation,
	"application/vnd.openxmlformats-officedocument.presentationml.presentation": AttachmentPresentation,
}
// attachmentExtensions maps a lowercased filename extension to an attachment
// type, used when the mime type did not identify the attachment type.
var attachmentExtensions = map[string]AttachmentType{
	".pdf":     AttachmentPDF,
	".zip":     AttachmentArchive,
	".tar":     AttachmentArchive,
	".tgz":     AttachmentArchive,
	".tar.gz":  AttachmentArchive,
	".tbz2":    AttachmentArchive,
	".tar.bz2": AttachmentArchive,
	".tar.lz":  AttachmentArchive,
	".tlz":     AttachmentArchive,
	".tar.xz":  AttachmentArchive,
	".txz":     AttachmentArchive,
	".tar.zst": AttachmentArchive,
	".tar.lz4": AttachmentArchive,
	".7z":      AttachmentArchive,
	".rar":     AttachmentArchive,
	".ods":     AttachmentSpreadsheet,
	".xls":     AttachmentSpreadsheet,
	".xlsx":    AttachmentSpreadsheet,
	".odt":     AttachmentDocument,
	".doc":     AttachmentDocument,
	".docx":    AttachmentDocument,
	".odp":     AttachmentPresentation,
	".ppt":     AttachmentPresentation,
	".pptx":    AttachmentPresentation,
}
1907
1908func attachmentTypes(log mlog.Log, m store.Message, state *msgState) (map[AttachmentType]bool, error) {
1909 types := map[AttachmentType]bool{}
1910
1911 pm, err := parsedMessage(log, &m, state, false, false, false)
1912 if err != nil {
1913 return nil, fmt.Errorf("parsing message for attachments: %w", err)
1914 }
1915 for _, a := range pm.attachments {
1916 if a.Part.MediaType == "IMAGE" {
1917 types[AttachmentImage] = true
1918 continue
1919 }
1920 mt := strings.ToLower(a.Part.MediaType + "/" + a.Part.MediaSubType)
1921 if t, ok := attachmentMimetypes[mt]; ok {
1922 types[t] = true
1923 continue
1924 }
1925 _, filename, err := a.Part.DispositionFilename()
1926 if err != nil && (errors.Is(err, message.ErrParamEncoding) || errors.Is(err, message.ErrHeader)) {
1927 log.Debugx("parsing disposition/filename", err)
1928 } else if err != nil {
1929 return nil, fmt.Errorf("reading disposition/filename: %v", err)
1930 }
1931 if ext := filepath.Ext(filename); ext != "" {
1932 if t, ok := attachmentExtensions[strings.ToLower(ext)]; ok {
1933 types[t] = true
1934 }
1935 }
1936 }
1937
1938 if len(types) == 0 {
1939 types[AttachmentNone] = true
1940 } else {
1941 types[AttachmentAny] = true
1942 }
1943 return types, nil
1944}
1945
// envFilterFn returns a filter function for the "envelope" headers ("envelope" as
// used by IMAP, i.e. basic message headers from/to/subject, an unfortunate name
// clash with SMTP envelope) for the query. A nil function is returned if no
// filtering is needed.
func (q Query) envFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
	if len(q.Filter.From) == 0 && len(q.Filter.To) == 0 && len(q.Filter.Subject) == 0 && len(q.NotFilter.From) == 0 && len(q.NotFilter.To) == 0 && len(q.NotFilter.Subject) == 0 {
		return nil
	}

	// All matching is case-insensitive; lowercase the needles once, up front.
	lower := func(l []string) []string {
		if len(l) == 0 {
			return nil
		}
		r := make([]string, len(l))
		for i, s := range l {
			r[i] = strings.ToLower(s)
		}
		return r
	}

	filterSubject := lower(q.Filter.Subject)
	notFilterSubject := lower(q.NotFilter.Subject)
	filterFrom := lower(q.Filter.From)
	notFilterFrom := lower(q.NotFilter.From)
	filterTo := lower(q.Filter.To)
	notFilterTo := lower(q.NotFilter.To)

	return func(m store.Message) bool {
		// Message reader not needed, the envelope comes from the parsed part.
		if !state.ensurePart(m, false) {
			return false
		}

		// Without a parsed envelope, fields stay at their zero values: positive
		// filters then won't match and negative filters will pass.
		var env message.Envelope
		if state.part.Envelope != nil {
			env = *state.part.Envelope
		}

		// Every Filter.Subject needle must occur in the subject, and no
		// NotFilter.Subject needle may occur.
		if len(filterSubject) > 0 || len(notFilterSubject) > 0 {
			subject := strings.ToLower(env.Subject)
			for _, s := range filterSubject {
				if !strings.Contains(subject, s) {
					return false
				}
			}
			for _, s := range notFilterSubject {
				if strings.Contains(subject, s) {
					return false
				}
			}
		}

		// contains matches needles against the display name and "<user@host>"
		// form of each address. With all true (positive filters), every needle
		// must match at least one address; with all false (negative filters),
		// any single matching needle is enough.
		contains := func(textLower []string, l []message.Address, all bool) bool {
		next:
			for _, s := range textLower {
				for _, a := range l {
					name := strings.ToLower(a.Name)
					addr := strings.ToLower(fmt.Sprintf("<%s@%s>", a.User, a.Host))
					if strings.Contains(name, s) || strings.Contains(addr, s) {
						if !all {
							return true
						}
						continue next
					}
				}
				if all {
					return false
				}
			}
			return all
		}

		if len(filterFrom) > 0 && !contains(filterFrom, env.From, true) {
			return false
		}
		if len(notFilterFrom) > 0 && contains(notFilterFrom, env.From, false) {
			return false
		}
		// "To" filters match any recipient header: To, CC and BCC.
		if len(filterTo) > 0 || len(notFilterTo) > 0 {
			to := slices.Concat(env.To, env.CC, env.BCC)
			if len(filterTo) > 0 && !contains(filterTo, to, true) {
				return false
			}
			if len(notFilterTo) > 0 && contains(notFilterTo, to, false) {
				return false
			}
		}
		return true
	}
}
2035
2036// headerFilterFn returns a function that filters for the header filters in the
2037// query. A nil function is returned if there are no header filters.
2038func (q Query) headerFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
2039 if len(q.Filter.Headers) == 0 {
2040 return nil
2041 }
2042
2043 lowerValues := make([]string, len(q.Filter.Headers))
2044 for i, t := range q.Filter.Headers {
2045 lowerValues[i] = strings.ToLower(t[1])
2046 }
2047
2048 return func(m store.Message) bool {
2049 if !state.ensurePart(m, true) {
2050 return false
2051 }
2052 hdr, err := state.part.Header()
2053 if err != nil {
2054 state.err = fmt.Errorf("reading header for message %d: %w", m.ID, err)
2055 return false
2056 }
2057
2058 next:
2059 for i, t := range q.Filter.Headers {
2060 k := t[0]
2061 v := lowerValues[i]
2062 l := hdr.Values(k)
2063 if v == "" && len(l) > 0 {
2064 continue
2065 }
2066 for _, e := range l {
2067 if strings.Contains(strings.ToLower(e), v) {
2068 continue next
2069 }
2070 }
2071 return false
2072 }
2073 return true
2074 }
2075}
2076
2077// wordFiltersFn returns a function that applies the word filters of the query. A
2078// nil function is returned when query does not contain a word filter.
2079func (q Query) wordsFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
2080 if len(q.Filter.Words) == 0 && len(q.NotFilter.Words) == 0 {
2081 return nil
2082 }
2083
2084 ws := store.PrepareWordSearch(q.Filter.Words, q.NotFilter.Words)
2085
2086 return func(m store.Message) bool {
2087 if !state.ensurePart(m, true) {
2088 return false
2089 }
2090
2091 if ok, err := ws.MatchPart(log, state.part, true); err != nil {
2092 state.err = fmt.Errorf("searching for words in message %d: %w", m.ID, err)
2093 return false
2094 } else {
2095 return ok
2096 }
2097 }
2098}
2099