1package webmail
2
3// todo: may want to add some json omitempty tags to MessageItem, or Message to reduce json size, or just have smaller types that send only the fields that are needed.
4
5import (
6 "compress/gzip"
7 "context"
8 cryptrand "crypto/rand"
9 "encoding/base64"
10 "encoding/json"
11 "errors"
12 "fmt"
13 "log/slog"
14 "net/http"
15 "path/filepath"
16 "reflect"
17 "runtime/debug"
18 "slices"
19 "strconv"
20 "strings"
21 "sync"
22 "time"
23
24 "github.com/mjl-/bstore"
25 "github.com/mjl-/sherpa"
26
27 "github.com/mjl-/mox/dns"
28 "github.com/mjl-/mox/message"
29 "github.com/mjl-/mox/metrics"
30 "github.com/mjl-/mox/mlog"
31 "github.com/mjl-/mox/mox-"
32 "github.com/mjl-/mox/moxvar"
33 "github.com/mjl-/mox/smtp"
34 "github.com/mjl-/mox/store"
35)
36
// Request is a request to an SSE connection to send messages, either for a new
// view, to continue with an existing view, or to cancel an ongoing request.
type Request struct {
	// Identifies this request; echoed as RequestID in view events so the client
	// can match responses to requests.
	ID int64

	SSEID int64 // SSE connection.

	// To indicate a request is a continuation (more results) of the previous view.
	// Echoed in events, client checks if it is getting results for the latest request.
	ViewID int64

	// If set, this request and its view are canceled. A new view must be started.
	Cancel bool

	Query Query
	Page  Page
}
54
// ThreadMode controls whether and how messages in a query are grouped into
// threads.
type ThreadMode string

const (
	ThreadOff    ThreadMode = "off"
	ThreadOn     ThreadMode = "on"
	ThreadUnread ThreadMode = "unread"
)

// Query is a request for messages that match filters, in a given order.
type Query struct {
	OrderAsc  bool // Order by received ascending or descending.
	Threading ThreadMode
	Filter    Filter
	NotFilter NotFilter
}

// AttachmentType is for filtering by attachment type.
type AttachmentType string

const (
	AttachmentIndifferent  AttachmentType = ""
	AttachmentNone         AttachmentType = "none"
	AttachmentAny          AttachmentType = "any"
	AttachmentImage        AttachmentType = "image" // png, jpg, gif, ...
	AttachmentPDF          AttachmentType = "pdf"
	AttachmentArchive      AttachmentType = "archive"     // zip files, tgz, ...
	AttachmentSpreadsheet  AttachmentType = "spreadsheet" // ods, xlsx, ...
	AttachmentDocument     AttachmentType = "document"    // odt, docx, ...
	AttachmentPresentation AttachmentType = "presentation" // odp, pptx, ...
)
85
// Filter selects the messages to return. Fields that are set must all match;
// for slices, each element must match ("and").
type Filter struct {
	// If -1, then all mailboxes except Trash/Junk/Rejects. Otherwise, only active if > 0.
	MailboxID int64

	// If true, also submailboxes are included in the search.
	MailboxChildrenIncluded bool

	// In case client doesn't know mailboxes and their IDs yet. Only used during sse
	// connection setup, where it is turned into a MailboxID. Filtering only looks at
	// MailboxID.
	MailboxName string

	Words       []string // Case insensitive substring match for each string.
	From        []string
	To          []string // Including Cc and Bcc.
	Oldest      *time.Time
	Newest      *time.Time
	Subject     []string
	Attachments AttachmentType
	Labels      []string
	Headers     [][2]string // Header values can be empty, it's a check if the header is present, regardless of value.
	SizeMin     int64
	SizeMax     int64
}

// NotFilter matches messages that don't match these fields.
type NotFilter struct {
	Words       []string
	From        []string
	To          []string
	Subject     []string
	Attachments AttachmentType
	Labels      []string
}

// Page holds pagination parameters for a request.
type Page struct {
	// Start returning messages after this ID, if > 0. For pagination, fetching the
	// next set of messages.
	AnchorMessageID int64

	// Number of messages to return, must be >= 1, we never return more than 10000 for
	// one request.
	Count int

	// If > 0, return messages until DestMessageID is found. More than Count messages
	// can be returned. For long-running searches, it may take a while before this
	// message is found.
	DestMessageID int64
}
138
// todo: MessageAddress and MessageEnvelope into message.Address and message.Envelope.

// MessageAddress is like message.Address, but with a dns.Domain, with unicode name
// included.
type MessageAddress struct {
	Name   string // Free-form name for display in mail applications.
	User   string // Localpart, encoded.
	Domain dns.Domain
}

// MessageEnvelope is like message.Envelope, as used in message.Part, but including
// unicode host names for IDNA names.
type MessageEnvelope struct {
	// todo: should get sherpadoc to understand type embeds and embed the non-MessageAddress fields from message.Envelope.
	Date      time.Time
	Subject   string
	From      []MessageAddress
	Sender    []MessageAddress
	ReplyTo   []MessageAddress
	To        []MessageAddress
	CC        []MessageAddress
	BCC       []MessageAddress
	InReplyTo string
	MessageID string
}
164
// MessageItem is sent by queries, it has derived information analyzed from
// message.Part, made for the needs of the message items in the message list.
type MessageItem struct {
	Message     store.Message // Without ParsedBuf and MsgPrefix, for size. With Preview, even if it isn't stored yet in the database.
	Envelope    MessageEnvelope
	Attachments []Attachment
	IsSigned    bool
	IsEncrypted bool
	MatchQuery  bool        // If message does not match query, it can still be included because of threading.
	MoreHeaders [][2]string // All headers from store.Settings.ShowHeaders that are present.
}

// ParsedMessage has more parsed/derived information about a message, intended
// for rendering the (contents of the) message. Information from MessageItem is
// not duplicated.
type ParsedMessage struct {
	ID       int64
	Part     message.Part
	Headers  map[string][]string
	ViewMode store.ViewMode

	Texts []string // Contents of text parts, can be empty.

	// Whether there is an HTML part. The webclient renders HTML message parts through
	// an iframe and a separate request with strict CSP headers to prevent script
	// execution and loading of external resources, which isn't possible when loading
	// in iframe with inline HTML because not all browsers support the iframe csp
	// attribute.
	HasHTML bool

	ListReplyAddress *MessageAddress // From List-Post.

	TextPaths [][]int // Paths to text parts.
	HTMLPath  []int   // Path to HTML part.

	// Information used by MessageItem, not exported in this type.
	envelope    MessageEnvelope
	attachments []Attachment
	isSigned    bool
	isEncrypted bool
}
207
// EventStart is the first message sent on an SSE connection, giving the client
// basic data to populate its UI. After this event, messages will follow quickly in
// an EventViewMsgs event.
type EventStart struct {
	SSEID                int64
	LoginAddress         MessageAddress
	Addresses            []MessageAddress
	DomainAddressConfigs map[string]DomainAddressConfig // ASCII domain to address config.
	MailboxName          string
	Mailboxes            []store.Mailbox
	RejectsMailbox       string
	Settings             store.Settings
	AccountPath          string // If nonempty, the path on same host to webaccount interface.
	Version              string
}

// DomainAddressConfig has the address (localpart) configuration for a domain, so
// the webmail client can decide if an address matches the addresses of the
// account.
type DomainAddressConfig struct {
	LocalpartCatchallSeparators []string // Can be empty.
	LocalpartCaseSensitive      bool
}

// EventViewMsgs contains messages for a view, possibly a continuation of an
// earlier list of messages.
type EventViewMsgs struct {
	ViewID    int64
	RequestID int64

	// If empty, this was the last message for the request. If non-empty, a list of
	// thread messages. Each with the first message being the reason this thread is
	// included and can be used as AnchorID in followup requests. If the threading mode
	// is "off" in the query, there will always be only a single message. If a thread
	// is sent, all messages in the thread are sent, including those that don't match
	// the query (e.g. from another mailbox). Threads can be displayed based on the
	// ThreadParentIDs field, with possibly slightly different display based on field
	// ThreadMissingLink.
	MessageItems [][]MessageItem

	// If set, will match the target page.DestMessageID from the request.
	ParsedMessage *ParsedMessage

	// If set, there are no more messages in this view at this moment. Messages can be
	// added, typically via Change messages, e.g. for new deliveries.
	ViewEnd bool
}
255
// EventViewErr indicates an error during a query for messages. The request is
// aborted, no more request-related messages will be sent until the next request.
type EventViewErr struct {
	ViewID    int64
	RequestID int64
	Err       string // To be displayed in client.
	err       error  // Original error, for checking against context.Canceled.
}

// EventViewReset indicates that a request for the next set of messages in a view
// could not be fulfilled, e.g. because the anchor message does not exist anymore.
// The client should clear its list of messages. This can happen before
// EventViewMsgs events are sent.
type EventViewReset struct {
	ViewID    int64
	RequestID int64
}

// EventViewChanges contain one or more changes relevant for the client, either
// with new mailbox total/unseen message counts, or messages added/removed/modified
// (flags) for the current view.
type EventViewChanges struct {
	ViewID  int64
	Changes [][2]any // The first field of [2]any is a string, the second of the Change types below.
}
281
// ChangeMsgAdd adds a new message and possibly its thread to the view.
type ChangeMsgAdd struct {
	store.ChangeAddUID
	MessageItems []MessageItem
}

// ChangeMsgRemove removes one or more messages from the view.
type ChangeMsgRemove struct {
	store.ChangeRemoveUIDs
}

// ChangeMsgFlags updates flags for one message.
type ChangeMsgFlags struct {
	store.ChangeFlags
}

// ChangeMsgThread updates muted/collapsed fields for one message.
type ChangeMsgThread struct {
	store.ChangeThread
}

// ChangeMailboxRemove indicates a mailbox was removed, including all its messages.
type ChangeMailboxRemove struct {
	store.ChangeRemoveMailbox
}

// ChangeMailboxAdd indicates a new mailbox was added, initially without any messages.
type ChangeMailboxAdd struct {
	Mailbox store.Mailbox
}

// ChangeMailboxRename indicates a mailbox was renamed. Its ID stays the same.
// It could be under a new parent.
type ChangeMailboxRename struct {
	store.ChangeRenameMailbox
}

// ChangeMailboxCounts sets new total and unseen message counts for a mailbox.
type ChangeMailboxCounts struct {
	store.ChangeMailboxCounts
}

// ChangeMailboxSpecialUse has updated special-use flags for a mailbox.
type ChangeMailboxSpecialUse struct {
	store.ChangeMailboxSpecialUse
}

// ChangeMailboxKeywords has an updated list of keywords for a mailbox, e.g. after
// a message was added with a keyword that wasn't in the mailbox yet.
type ChangeMailboxKeywords struct {
	store.ChangeMailboxKeywords
}
334
// view holds the information about the returned data for a query. It is used to
// determine whether mailbox changes should be sent to the client: we only send
// addition/removal/flag-changes of messages that are in view, or would extend it
// if the view is at the end of the results.
type view struct {
	Request Request

	// Received of last message we sent to the client. We use it to decide if a newly
	// delivered message is within the view and the client should get a notification.
	LastMessageReceived time.Time

	// If set, the last message in the query view has been sent. There is no need to do
	// another query, it will not return more data. Used to decide if an event for a
	// new message should be sent.
	End bool

	// Whether messages must or must not match mailboxIDs.
	matchMailboxIDs bool
	// Mailboxes to match, can be multiple, for matching children. If empty, there is
	// no filter on mailboxes.
	mailboxIDs map[int64]bool

	// Threads sent to client. New messages for this thread are also sent, regardless
	// of regular query matching, so also for other mailboxes. If the user (re)moved
	// all messages of a thread, they may still receive events for the thread. Only
	// filled when query with threading not off.
	threadIDs map[int64]struct{}
}
363
// sses tracks all sse connections, and access to them.
var sses = struct {
	sync.Mutex
	gen int64         // Generator for unique SSE connection IDs.
	m   map[int64]sse // Active connections by ID.
}{m: map[int64]sse{}}

// sse represents an sse connection.
type sse struct {
	ID          int64        // Also returned in EventStart and used in Request to identify the request.
	AccountName string       // Used to check the authenticated user has access to the SSE connection.
	Request     chan Request // Goroutine will receive requests from here, coming from API calls.
}
377
378// called by the goroutine when the connection is closed or breaks.
379func (sse sse) unregister() {
380 sses.Lock()
381 defer sses.Unlock()
382 delete(sses.m, sse.ID)
383
384 // Drain any pending requests, preventing blocked goroutines from API calls.
385 for {
386 select {
387 case <-sse.Request:
388 default:
389 return
390 }
391 }
392}
393
394func sseRegister(accountName string) sse {
395 sses.Lock()
396 defer sses.Unlock()
397 sses.gen++
398 v := sse{sses.gen, accountName, make(chan Request, 1)}
399 sses.m[v.ID] = v
400 return v
401}
402
403// sseGet returns a reference to an existing connection if it exists and user
404// has access.
405func sseGet(id int64, accountName string) (sse, bool) {
406 sses.Lock()
407 defer sses.Unlock()
408 s := sses.m[id]
409 if s.AccountName != accountName {
410 return sse{}, false
411 }
412 return s, true
413}
414
// ssetoken is a temporary token that has not yet been used to start an SSE
// connection. Created by Token, consumed by a new SSE connection.
type ssetoken struct {
	token        string // Uniquely generated.
	accName      string
	address      string             // Address used to authenticate in call that created the token.
	sessionToken store.SessionToken // SessionToken that created this token, checked before sending updates.
	validUntil   time.Time          // Time after which the token is no longer accepted.
}

// ssetokens maintains unused tokens. We have just one, but it's a type so we
// can define methods.
type ssetokens struct {
	sync.Mutex
	accountTokens map[string][]ssetoken // Account to max 10 most recent tokens, from old to new.
	tokens        map[string]ssetoken   // Token to details, for finding account for a token.
}

// sseTokens is the single instance of ssetokens, holding all unused tokens.
var sseTokens = ssetokens{
	accountTokens: map[string][]ssetoken{},
	tokens:        map[string]ssetoken{},
}
437
438// xgenerate creates and saves a new token. It ensures no more than 10 tokens
439// per account exist, removing old ones if needed.
440func (x *ssetokens) xgenerate(ctx context.Context, accName, address string, sessionToken store.SessionToken) string {
441 var buf [16]byte
442 cryptrand.Read(buf[:])
443 st := ssetoken{base64.RawURLEncoding.EncodeToString(buf[:]), accName, address, sessionToken, time.Now().Add(time.Minute)}
444
445 x.Lock()
446 defer x.Unlock()
447 n := len(x.accountTokens[accName])
448 if n >= 10 {
449 for _, ost := range x.accountTokens[accName][:n-9] {
450 delete(x.tokens, ost.token)
451 }
452 copy(x.accountTokens[accName], x.accountTokens[accName][n-9:])
453 x.accountTokens[accName] = x.accountTokens[accName][:9]
454 }
455 x.accountTokens[accName] = append(x.accountTokens[accName], st)
456 x.tokens[st.token] = st
457 return st.token
458}
459
460// check verifies a token, and consumes it if valid.
461func (x *ssetokens) check(token string) (string, string, store.SessionToken, bool, error) {
462 x.Lock()
463 defer x.Unlock()
464
465 st, ok := x.tokens[token]
466 if !ok {
467 return "", "", "", false, nil
468 }
469 delete(x.tokens, token)
470 if i := slices.Index(x.accountTokens[st.accName], st); i < 0 {
471 return "", "", "", false, errors.New("internal error, could not find token in account")
472 } else {
473 copy(x.accountTokens[st.accName][i:], x.accountTokens[st.accName][i+1:])
474 x.accountTokens[st.accName] = x.accountTokens[st.accName][:len(x.accountTokens[st.accName])-1]
475 if len(x.accountTokens[st.accName]) == 0 {
476 delete(x.accountTokens, st.accName)
477 }
478 }
479 if time.Now().After(st.validUntil) {
480 return "", "", "", false, nil
481 }
482 return st.accName, st.address, st.sessionToken, true, nil
483}
484
// ioErr is panicked on i/o errors in serveEvents and handled in a defer,
// which stops the response without treating it as an internal error.
type ioErr struct {
	err error
}
489
490// ensure we have a non-nil moreHeaders, taking it from Settings.
491func ensureMoreHeaders(tx *bstore.Tx, moreHeaders []string) ([]string, error) {
492 if moreHeaders != nil {
493 return moreHeaders, nil
494 }
495
496 s := store.Settings{ID: 1}
497 if err := tx.Get(&s); err != nil {
498 return nil, fmt.Errorf("get settings: %v", err)
499 }
500 moreHeaders = s.ShowHeaders
501 if moreHeaders == nil {
502 moreHeaders = []string{} // Ensure we won't get Settings again next call.
503 }
504 return moreHeaders, nil
505}
506
507// serveEvents serves an SSE connection. Authentication is done through a query
508// string parameter "singleUseToken", a one-time-use token returned by the Token
509// API call.
510func serveEvents(ctx context.Context, log mlog.Log, accountPath string, w http.ResponseWriter, r *http.Request) {
511 if r.Method != "GET" {
512 http.Error(w, "405 - method not allowed - use get", http.StatusMethodNotAllowed)
513 return
514 }
515
516 flusher, ok := w.(http.Flusher)
517 if !ok {
518 log.Error("internal error: ResponseWriter not a http.Flusher")
519 http.Error(w, "500 - internal error - cannot sync to http connection", 500)
520 return
521 }
522
523 q := r.URL.Query()
524 token := q.Get("singleUseToken")
525 if token == "" {
526 http.Error(w, "400 - bad request - missing credentials", http.StatusBadRequest)
527 return
528 }
529 accName, address, sessionToken, ok, err := sseTokens.check(token)
530 if err != nil {
531 http.Error(w, "500 - internal server error - "+err.Error(), http.StatusInternalServerError)
532 return
533 }
534 if !ok {
535 http.Error(w, "400 - bad request - bad token", http.StatusBadRequest)
536 return
537 }
538 if _, err := store.SessionUse(ctx, log, accName, sessionToken, ""); err != nil {
539 http.Error(w, "400 - bad request - bad session token", http.StatusBadRequest)
540 return
541 }
542
	// We can simulate a slow SSE connection. It seems Firefox doesn't slow down
	// incoming responses with its slow-network simulation.
545 var waitMin, waitMax time.Duration
546 waitMinMsec := q.Get("waitMinMsec")
547 waitMaxMsec := q.Get("waitMaxMsec")
548 if waitMinMsec != "" && waitMaxMsec != "" {
549 if v, err := strconv.ParseInt(waitMinMsec, 10, 64); err != nil {
550 http.Error(w, "400 - bad request - parsing waitMinMsec: "+err.Error(), http.StatusBadRequest)
551 return
552 } else {
553 waitMin = time.Duration(v) * time.Millisecond
554 }
555
556 if v, err := strconv.ParseInt(waitMaxMsec, 10, 64); err != nil {
557 http.Error(w, "400 - bad request - parsing waitMaxMsec: "+err.Error(), http.StatusBadRequest)
558 return
559 } else {
560 waitMax = time.Duration(v) * time.Millisecond
561 }
562 }
563
564 // Parse the request with initial mailbox/search criteria.
565 var req Request
566 dec := json.NewDecoder(strings.NewReader(q.Get("request")))
567 dec.DisallowUnknownFields()
568 if err := dec.Decode(&req); err != nil {
569 http.Error(w, "400 - bad request - bad request query string parameter: "+err.Error(), http.StatusBadRequest)
570 return
571 } else if req.Page.Count <= 0 {
572 http.Error(w, "400 - bad request - request cannot have Page.Count 0", http.StatusBadRequest)
573 return
574 }
575 if req.Query.Threading == "" {
576 req.Query.Threading = ThreadOff
577 }
578
579 var writer *eventWriter
580
581 metricSSEConnections.Inc()
582 defer metricSSEConnections.Dec()
583
	// Below here, error handling happens through xcheckf, which panics with
	// *sherpa.Error, after which we send an error event to the client. We can also get
	// an *ioErr when the connection is broken.
587 defer func() {
588 x := recover()
589 if x == nil {
590 return
591 }
592 if err, ok := x.(*sherpa.Error); ok {
593 writer.xsendEvent(ctx, log, "fatalErr", err.Message)
594 } else if _, ok := x.(ioErr); ok {
595 return
596 } else {
597 log.WithContext(ctx).Error("serveEvents panic", slog.Any("err", x))
598 debug.PrintStack()
599 metrics.PanicInc(metrics.Webmail)
600 panic(x)
601 }
602 }()
603
604 h := w.Header()
605 h.Set("Content-Type", "text/event-stream")
606 h.Set("Cache-Control", "no-cache")
607
608 // We'll be sending quite a bit of message data (text) in JSON (plenty duplicate
609 // keys), so should be quite compressible.
610 var out writeFlusher
611 gz := mox.AcceptsGzip(r)
612 if gz {
613 h.Set("Content-Encoding", "gzip")
614 out, _ = gzip.NewWriterLevel(w, gzip.BestSpeed)
615 } else {
616 out = nopFlusher{w}
617 }
618 out = httpFlusher{out, flusher}
619
620 // We'll be writing outgoing SSE events through writer.
621 writer = newEventWriter(out, waitMin, waitMax, accName, sessionToken)
622 defer writer.close()
623
624 // Fetch initial data.
625 acc, err := store.OpenAccount(log, accName, true)
626 xcheckf(ctx, err, "open account")
627 defer func() {
628 err := acc.Close()
629 log.Check(err, "closing account")
630 }()
631 comm := store.RegisterComm(acc)
632 defer comm.Unregister()
633
634 // List addresses that the client can use to send email from.
635 accConf, _ := acc.Conf()
636 loginAddr, err := smtp.ParseAddress(address)
637 xcheckf(ctx, err, "parsing login address")
638 _, _, _, dest, err := mox.LookupAddress(loginAddr.Localpart, loginAddr.Domain, false, false, false)
639 xcheckf(ctx, err, "looking up destination for login address")
640 loginName := accConf.FullName
641 if dest.FullName != "" {
642 loginName = dest.FullName
643 }
644 loginAddress := MessageAddress{Name: loginName, User: loginAddr.Localpart.String(), Domain: loginAddr.Domain}
645 var addresses []MessageAddress
646 for a, dest := range accConf.Destinations {
647 name := dest.FullName
648 if name == "" {
649 name = accConf.FullName
650 }
651 var ma MessageAddress
652 if strings.HasPrefix(a, "@") {
653 dom, err := dns.ParseDomain(a[1:])
654 xcheckf(ctx, err, "parsing destination address for account")
655 ma = MessageAddress{Domain: dom}
656 } else {
657 addr, err := smtp.ParseAddress(a)
658 xcheckf(ctx, err, "parsing destination address for account")
659 ma = MessageAddress{Name: name, User: addr.Localpart.String(), Domain: addr.Domain}
660 }
661 addresses = append(addresses, ma)
662 }
663 // User is allowed to send using alias address as message From address. Webmail
664 // will choose it when replying to a message sent to that address.
665 aliasAddrs := map[MessageAddress]bool{}
666 for _, a := range accConf.Aliases {
667 if a.Alias.AllowMsgFrom {
668 ma := MessageAddress{User: a.Alias.LocalpartStr, Domain: a.Alias.Domain}
669 if !aliasAddrs[ma] {
670 addresses = append(addresses, ma)
671 }
672 aliasAddrs[ma] = true
673 }
674 }
675
676 // We implicitly start a query. We use the reqctx for the transaction, because the
677 // transaction is passed to the query, which can be canceled.
678 reqctx, reqctxcancel := context.WithCancel(ctx)
679 defer func() {
680 // We also cancel in cancelDrain later on, but there is a brief window where the
681 // context wouldn't be canceled.
682 if reqctxcancel != nil {
683 reqctxcancel()
684 reqctxcancel = nil
685 }
686 }()
687
688 // qtx is kept around during connection initialization, until we pass it off to the
689 // goroutine that starts querying for messages.
690 var qtx *bstore.Tx
691 defer func() {
692 if qtx != nil {
693 err := qtx.Rollback()
694 log.Check(err, "rolling back")
695 }
696 }()
697
698 var mbl []store.Mailbox
699 settings := store.Settings{ID: 1}
700
701 // We only take the rlock when getting the tx.
702 acc.WithRLock(func() {
703 // Now a read-only transaction we'll use during the query.
704 qtx, err = acc.DB.Begin(reqctx, false)
705 xcheckf(ctx, err, "begin transaction")
706
707 mbl, err = bstore.QueryTx[store.Mailbox](qtx).FilterEqual("Expunged", false).List()
708 xcheckf(ctx, err, "list mailboxes")
709
710 err = qtx.Get(&settings)
711 xcheckf(ctx, err, "get settings")
712 })
713
714 // Find the designated mailbox if a mailbox name is set, or there are no filters at all.
715 var zerofilter Filter
716 var zeronotfilter NotFilter
717 var mailbox store.Mailbox
718 var mailboxPrefixes []string
719 var matchMailboxes bool
720 mailboxIDs := map[int64]bool{}
721 mailboxName := req.Query.Filter.MailboxName
722 if mailboxName != "" || reflect.DeepEqual(req.Query.Filter, zerofilter) && reflect.DeepEqual(req.Query.NotFilter, zeronotfilter) {
723 if mailboxName == "" {
724 mailboxName = "Inbox"
725 }
726
727 var inbox store.Mailbox
728 for _, e := range mbl {
729 if e.Name == mailboxName {
730 mailbox = e
731 }
732 if e.Name == "Inbox" {
733 inbox = e
734 }
735 }
736 if mailbox.ID == 0 {
737 mailbox = inbox
738 }
739 if mailbox.ID == 0 {
740 xcheckf(ctx, errors.New("inbox not found"), "setting initial mailbox")
741 }
742 req.Query.Filter.MailboxID = mailbox.ID
743 req.Query.Filter.MailboxName = ""
744 mailboxPrefixes = []string{mailbox.Name + "/"}
745 matchMailboxes = true
746 mailboxIDs[mailbox.ID] = true
747 } else {
748 matchMailboxes, mailboxIDs, mailboxPrefixes = xprepareMailboxIDs(ctx, qtx, req.Query.Filter, accConf.RejectsMailbox)
749 }
750 if req.Query.Filter.MailboxChildrenIncluded {
751 xgatherMailboxIDs(ctx, qtx, mailboxIDs, mailboxPrefixes)
752 }
753
754 // todo: write a last-event-id based on modseq? if last-event-id is present, we would have to send changes to mailboxes, messages, hopefully reducing the amount of data sent.
755
756 sse := sseRegister(acc.Name)
757 defer sse.unregister()
758
759 // Per-domain localpart config so webclient can decide if an address belongs to the account.
760 domainAddressConfigs := map[string]DomainAddressConfig{}
761 for _, a := range addresses {
762 dom, _ := mox.Conf.Domain(a.Domain)
763 domainAddressConfigs[a.Domain.ASCII] = DomainAddressConfig{dom.LocalpartCatchallSeparatorsEffective, dom.LocalpartCaseSensitive}
764 }
765
766 // Write first event, allowing client to fill its UI with mailboxes.
767 start := EventStart{sse.ID, loginAddress, addresses, domainAddressConfigs, mailbox.Name, mbl, accConf.RejectsMailbox, settings, accountPath, moxvar.Version}
768 writer.xsendEvent(ctx, log, "start", start)
769
770 // The goroutine doing the querying will send messages on these channels, which
771 // result in an event being written on the SSE connection.
772 viewMsgsc := make(chan EventViewMsgs)
773 viewErrc := make(chan EventViewErr)
774 viewResetc := make(chan EventViewReset)
775 donec := make(chan int64) // When request is done.
776
777 // Start a view, it determines if we send a change to the client. And start an
778 // implicit query for messages, we'll send the messages to the client which can
779 // fill its ui with messages.
780 v := view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
781 go viewRequestTx(reqctx, log, acc, qtx, v, viewMsgsc, viewErrc, viewResetc, donec)
782 qtx = nil // viewRequestTx closes qtx
783
784 // When canceling a query, we must drain its messages until it says it is done.
785 // Otherwise the sending goroutine would hang indefinitely on a channel send.
786 cancelDrain := func() {
787 if reqctxcancel != nil {
788 // Cancel the goroutine doing the querying.
789 reqctxcancel()
790 reqctx = nil
791 reqctxcancel = nil
792 } else {
793 return
794 }
795
796 // Drain events until done.
797 for {
798 select {
799 case <-viewMsgsc:
800 case <-viewErrc:
801 case <-viewResetc:
802 case <-donec:
803 return
804 }
805 }
806 }
807
808 // If we stop and a query is in progress, we must drain the channel it will send on.
809 defer cancelDrain()
810
811 // Changes broadcasted by other connections on this account. If applicable for the
812 // connection/view, we send events.
813 xprocessChanges := func(changes []store.Change) {
814 taggedChanges := [][2]any{}
815
816 newPreviews := map[int64]string{}
817 defer storeNewPreviews(ctx, log, acc, newPreviews)
818
819 // We get a transaction first time we need it.
820 var xtx *bstore.Tx
821 defer func() {
822 if xtx != nil {
823 err := xtx.Rollback()
824 log.Check(err, "rolling back transaction")
825 }
826 }()
827 ensureTx := func() error {
828 if xtx != nil {
829 return nil
830 }
831 acc.RLock()
832 defer acc.RUnlock()
833 var err error
834 xtx, err = acc.DB.Begin(ctx, false)
835 return err
836 }
837 // This getmsg will now only be called mailboxID+UID, not with messageID set.
838 // todo jmap: change store.Change* to include MessageID's? would mean duplication of information resulting in possible mismatch.
839 getmsg := func(messageID int64, mailboxID int64, uid store.UID) (store.Message, error) {
840 if err := ensureTx(); err != nil {
841 return store.Message{}, fmt.Errorf("transaction: %v", err)
842 }
843 return bstore.QueryTx[store.Message](xtx).FilterEqual("Expunged", false).FilterNonzero(store.Message{MailboxID: mailboxID, UID: uid}).Get()
844 }
845
846 // Additional headers from settings to add to MessageItems.
847 var moreHeaders []string
848 xmoreHeaders := func() []string {
849 err := ensureTx()
850 xcheckf(ctx, err, "transaction")
851
852 moreHeaders, err = ensureMoreHeaders(xtx, moreHeaders)
853 xcheckf(ctx, err, "ensuring more headers")
854 return moreHeaders
855 }
856
857 // Return uids that are within range in view. Because the end has been reached, or
858 // because the UID is not after the last message.
859 xchangedUIDs := func(mailboxID int64, uids []store.UID, isRemove bool) (changedUIDs []store.UID) {
860 uidsAny := make([]any, len(uids))
861 for i, uid := range uids {
862 uidsAny[i] = uid
863 }
864 err := ensureTx()
865 xcheckf(ctx, err, "transaction")
866 q := bstore.QueryTx[store.Message](xtx)
867 q.FilterNonzero(store.Message{MailboxID: mailboxID})
868 q.FilterEqual("UID", uidsAny...)
869 mbOK := v.matchesMailbox(mailboxID)
870 err = q.ForEach(func(m store.Message) error {
871 _, thread := v.threadIDs[m.ThreadID]
872 if thread || mbOK && (v.inRange(m) || isRemove && m.Expunged) {
873 changedUIDs = append(changedUIDs, m.UID)
874 }
875 return nil
876 })
877 xcheckf(ctx, err, "fetching messages for change")
878 return changedUIDs
879 }
880
881 // Forward changes that are relevant to the current view.
882 for _, change := range changes {
883 switch c := change.(type) {
884 case store.ChangeAddUID:
885 ok, err := v.matches(log, acc, true, 0, c.MailboxID, c.UID, c.Flags, c.Keywords, getmsg)
886 xcheckf(ctx, err, "matching new message against view")
887 m, err := getmsg(0, c.MailboxID, c.UID)
888 xcheckf(ctx, err, "get message")
889 _, thread := v.threadIDs[m.ThreadID]
890 if !ok && !thread {
891 continue
892 }
893
894 state := msgState{acc: acc, log: log, newPreviews: newPreviews}
895 mi, err := messageItem(log, m, &state, xmoreHeaders())
896 state.clear()
897 xcheckf(ctx, err, "make messageitem")
898 mi.MatchQuery = ok
899
900 mil := []MessageItem{mi}
901 if !thread && req.Query.Threading != ThreadOff {
902 err := ensureTx()
903 xcheckf(ctx, err, "transaction")
904 more, _, err := gatherThread(log, xtx, acc, v, m, 0, false, xmoreHeaders(), newPreviews)
905 xcheckf(ctx, err, "gathering thread messages for id %d, thread %d", m.ID, m.ThreadID)
906 mil = append(mil, more...)
907 v.threadIDs[m.ThreadID] = struct{}{}
908 }
909
910 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgAdd", ChangeMsgAdd{c, mil}})
911
912 // If message extends the view, store it as such.
913 if !v.Request.Query.OrderAsc && m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && m.Received.After(v.LastMessageReceived) {
914 v.LastMessageReceived = m.Received
915 }
916
917 case store.ChangeRemoveUIDs:
918 comm.RemovalSeen(c)
919
920 // We may send changes for uids the client doesn't know, that's fine.
921 changedUIDs := xchangedUIDs(c.MailboxID, c.UIDs, true)
922 if len(changedUIDs) == 0 {
923 continue
924 }
925 ch := ChangeMsgRemove{c}
926 ch.UIDs = changedUIDs
927 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgRemove", ch})
928
929 case store.ChangeFlags:
930 // We may send changes for uids the client doesn't know, that's fine.
931 changedUIDs := xchangedUIDs(c.MailboxID, []store.UID{c.UID}, false)
932 if len(changedUIDs) == 0 {
933 continue
934 }
935 ch := ChangeMsgFlags{c}
936 ch.UID = changedUIDs[0]
937 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgFlags", ch})
938
939 case store.ChangeThread:
940 // Change in muted/collaped state, just always ship it.
941 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgThread", ChangeMsgThread{c}})
942
943 case store.ChangeRemoveMailbox:
944 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRemove", ChangeMailboxRemove{c}})
945
946 case store.ChangeAddMailbox:
947 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxAdd", ChangeMailboxAdd{c.Mailbox}})
948
949 case store.ChangeRenameMailbox:
950 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRename", ChangeMailboxRename{c}})
951
952 case store.ChangeMailboxCounts:
953 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxCounts", ChangeMailboxCounts{c}})
954
955 case store.ChangeMailboxSpecialUse:
956 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxSpecialUse", ChangeMailboxSpecialUse{c}})
957
958 case store.ChangeMailboxKeywords:
959 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxKeywords", ChangeMailboxKeywords{c}})
960
961 case store.ChangeAddSubscription, store.ChangeRemoveSubscription:
962 // Webmail does not care about subscriptions.
963
964 case store.ChangeAnnotation:
965 // Nothing.
966
967 default:
968 panic(fmt.Sprintf("missing case for change %T", c))
969 }
970 }
971
972 if len(taggedChanges) > 0 {
973 viewChanges := EventViewChanges{v.Request.ViewID, taggedChanges}
974 writer.xsendEvent(ctx, log, "viewChanges", viewChanges)
975 }
976 }
977
978 timer := time.NewTimer(5 * time.Minute) // For keepalives.
979 defer timer.Stop()
980 for {
981 if writer.wrote {
982 timer.Reset(5 * time.Minute)
983 writer.wrote = false
984 }
985
986 pending := comm.Pending
987 if reqctx != nil {
988 pending = nil
989 }
990
991 select {
992 case <-mox.Shutdown.Done():
993 writer.xsendEvent(ctx, log, "fatalErr", "server is shutting down")
994 // Work around go vet, it doesn't see defer cancelDrain.
995 if reqctxcancel != nil {
996 reqctxcancel()
997 }
998 return
999
1000 case <-timer.C:
1001 _, err := fmt.Fprintf(out, ": keepalive\n\n")
1002 if err == nil {
1003 err = out.Flush()
1004 }
1005 if err != nil {
1006 log.Errorx("write keepalive", err)
1007 // Work around go vet, it doesn't see defer cancelDrain.
1008 if reqctxcancel != nil {
1009 reqctxcancel()
1010 }
1011 return
1012 }
1013 writer.wrote = true
1014
1015 case vm := <-viewMsgsc:
1016 if vm.RequestID != v.Request.ID || vm.ViewID != v.Request.ViewID {
1017 panic(fmt.Sprintf("received msgs for view,request id %d,%d instead of %d,%d", vm.ViewID, vm.RequestID, v.Request.ViewID, v.Request.ID))
1018 }
1019 if vm.ViewEnd {
1020 v.End = true
1021 }
1022 if len(vm.MessageItems) > 0 {
1023 v.LastMessageReceived = vm.MessageItems[len(vm.MessageItems)-1][0].Message.Received
1024 }
1025 writer.xsendEvent(ctx, log, "viewMsgs", vm)
1026
1027 case ve := <-viewErrc:
1028 if ve.RequestID != v.Request.ID || ve.ViewID != v.Request.ViewID {
1029 panic(fmt.Sprintf("received err for view,request id %d,%d instead of %d,%d", ve.ViewID, ve.RequestID, v.Request.ViewID, v.Request.ID))
1030 }
1031 if errors.Is(ve.err, context.Canceled) || mlog.IsClosed(ve.err) {
1032 // Work around go vet, it doesn't see defer cancelDrain.
1033 if reqctxcancel != nil {
1034 reqctxcancel()
1035 }
1036 return
1037 }
1038 writer.xsendEvent(ctx, log, "viewErr", ve)
1039
1040 case vr := <-viewResetc:
1041 if vr.RequestID != v.Request.ID || vr.ViewID != v.Request.ViewID {
1042 panic(fmt.Sprintf("received reset for view,request id %d,%d instead of %d,%d", vr.ViewID, vr.RequestID, v.Request.ViewID, v.Request.ID))
1043 }
1044 writer.xsendEvent(ctx, log, "viewReset", vr)
1045
1046 case id := <-donec:
1047 if id != v.Request.ID {
1048 panic(fmt.Sprintf("received done for request id %d instead of %d", id, v.Request.ID))
1049 }
1050 if reqctxcancel != nil {
1051 reqctxcancel()
1052 }
1053 reqctx = nil
1054 reqctxcancel = nil
1055
1056 case req := <-sse.Request:
1057 if reqctx != nil {
1058 cancelDrain()
1059 }
1060 if req.Cancel {
1061 v = view{req, time.Time{}, false, false, nil, nil}
1062 continue
1063 }
1064
1065 reqctx, reqctxcancel = context.WithCancel(ctx)
1066
1067 stop := func() (stop bool) {
1068 // rtx is handed off viewRequestTx below, but we must clean it up in case of errors.
1069 var rtx *bstore.Tx
1070 var err error
1071 defer func() {
1072 if rtx != nil {
1073 err = rtx.Rollback()
1074 log.Check(err, "rolling back transaction")
1075 }
1076 }()
1077 acc.WithRLock(func() {
1078 rtx, err = acc.DB.Begin(reqctx, false)
1079 })
1080 if err != nil {
1081 reqctxcancel()
1082 reqctx = nil
1083 reqctxcancel = nil
1084
1085 if errors.Is(err, context.Canceled) {
1086 return true
1087 }
1088 err := fmt.Errorf("begin transaction: %v", err)
1089 viewErr := EventViewErr{v.Request.ViewID, v.Request.ID, err.Error(), err}
1090 writer.xsendEvent(ctx, log, "viewErr", viewErr)
1091 return false
1092 }
1093
1094 // Reset view state for new query.
1095 if req.ViewID != v.Request.ViewID {
1096 matchMailboxes, mailboxIDs, mailboxPrefixes := xprepareMailboxIDs(ctx, rtx, req.Query.Filter, accConf.RejectsMailbox)
1097 if req.Query.Filter.MailboxChildrenIncluded {
1098 xgatherMailboxIDs(ctx, rtx, mailboxIDs, mailboxPrefixes)
1099 }
1100 v = view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
1101 } else {
1102 v.Request = req
1103 }
1104 go viewRequestTx(reqctx, log, acc, rtx, v, viewMsgsc, viewErrc, viewResetc, donec)
1105 rtx = nil
1106 return false
1107 }()
1108 if stop {
1109 return
1110 }
1111
1112 case <-pending:
1113 overflow, changes := comm.Get()
1114 if overflow {
1115 writer.xsendEvent(ctx, log, "fatalErr", "out of sync, too many pending changes")
1116 return
1117 }
1118 xprocessChanges(changes)
1119
1120 case <-ctx.Done():
1121 // Work around go vet, it doesn't see defer cancelDrain.
1122 if reqctxcancel != nil {
1123 reqctxcancel()
1124 }
1125 return
1126 }
1127 }
1128}
1129
1130// xprepareMailboxIDs prepare the first half of filters for mailboxes, based on
1131// f.MailboxID (-1 is special). matchMailboxes indicates whether the IDs in
1132// mailboxIDs must or must not match. mailboxPrefixes is for use with
1133// xgatherMailboxIDs to gather children of the mailboxIDs.
1134func xprepareMailboxIDs(ctx context.Context, tx *bstore.Tx, f Filter, rejectsMailbox string) (matchMailboxes bool, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1135 matchMailboxes = true
1136 mailboxIDs = map[int64]bool{}
1137 if f.MailboxID == -1 {
1138 matchMailboxes = false
1139 // Add the trash, junk and account rejects mailbox.
1140 err := bstore.QueryTx[store.Mailbox](tx).FilterEqual("Expunged", false).ForEach(func(mb store.Mailbox) error {
1141 if mb.Trash || mb.Junk || mb.Name == rejectsMailbox {
1142 mailboxPrefixes = append(mailboxPrefixes, mb.Name+"/")
1143 mailboxIDs[mb.ID] = true
1144 }
1145 return nil
1146 })
1147 xcheckf(ctx, err, "finding trash/junk/rejects mailbox")
1148 } else if f.MailboxID > 0 {
1149 mb, err := store.MailboxID(tx, f.MailboxID)
1150 xcheckf(ctx, err, "get mailbox")
1151 mailboxIDs[f.MailboxID] = true
1152 mailboxPrefixes = []string{mb.Name + "/"}
1153 }
1154 return
1155}
1156
1157// xgatherMailboxIDs adds all mailboxes with a prefix matching any of
1158// mailboxPrefixes to mailboxIDs, to expand filtering to children of mailboxes.
1159func xgatherMailboxIDs(ctx context.Context, tx *bstore.Tx, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1160 // Gather more mailboxes to filter on, based on mailboxPrefixes.
1161 if len(mailboxPrefixes) == 0 {
1162 return
1163 }
1164 err := bstore.QueryTx[store.Mailbox](tx).FilterEqual("Expunged", false).ForEach(func(mb store.Mailbox) error {
1165 for _, p := range mailboxPrefixes {
1166 if strings.HasPrefix(mb.Name, p) {
1167 mailboxIDs[mb.ID] = true
1168 break
1169 }
1170 }
1171 return nil
1172 })
1173 xcheckf(ctx, err, "gathering mailboxes")
1174}
1175
1176// matchesMailbox returns whether a mailbox matches the view.
1177func (v view) matchesMailbox(mailboxID int64) bool {
1178 return len(v.mailboxIDs) == 0 || v.matchMailboxIDs && v.mailboxIDs[mailboxID] || !v.matchMailboxIDs && !v.mailboxIDs[mailboxID]
1179}
1180
1181// inRange returns whether m is within the range for the view, whether a change for
1182// this message should be sent to the client so it can update its state.
1183func (v view) inRange(m store.Message) bool {
1184 return v.End || !v.Request.Query.OrderAsc && !m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && !m.Received.After(v.LastMessageReceived)
1185}
1186
// matches checks if the message, identified by either messageID or mailboxID+UID,
// is in the current "view" (i.e. passing the filters, and if checkRange is set
// also if within the range of sent messages based on sort order and the last seen
// message). getmsg retrieves the message, which may be necessary depending on the
// active filters. Used to determine if a store.Change with a new message should be
// sent, and for the destination and anchor messages in view requests.
func (v view) matches(log mlog.Log, acc *store.Account, checkRange bool, messageID int64, mailboxID int64, uid store.UID, flags store.Flags, keywords []string, getmsg func(int64, int64, store.UID) (store.Message, error)) (match bool, rerr error) {
	var m store.Message
	// ensureMessage lazily fetches the full message, only when a filter actually
	// needs it. It reports whether the message is available; a fetch error is
	// stored in rerr and returned from matches.
	ensureMessage := func() bool {
		if m.ID == 0 && rerr == nil {
			m, rerr = getmsg(messageID, mailboxID, uid)
		}
		return rerr == nil
	}

	q := v.Request.Query

	// Warning: Filters must be kept in sync between queryMessage and view.matches.

	// Check filters.
	if len(v.mailboxIDs) > 0 && (!ensureMessage() || v.matchMailboxIDs && !v.mailboxIDs[m.MailboxID] || !v.matchMailboxIDs && v.mailboxIDs[m.MailboxID]) {
		return false, rerr
	}
	// note: anchorMessageID is not relevant for matching.
	// Flags/keywords were passed in by the caller, no message fetch needed here.
	flagfilter := q.flagFilterFn()
	if flagfilter != nil && !flagfilter(flags, keywords) {
		return false, rerr
	}

	// Received-time range filters.
	if q.Filter.Oldest != nil && (!ensureMessage() || m.Received.Before(*q.Filter.Oldest)) {
		return false, rerr
	}
	if q.Filter.Newest != nil && (!ensureMessage() || !m.Received.Before(*q.Filter.Newest)) {
		return false, rerr
	}

	// Size filters.
	if q.Filter.SizeMin > 0 && (!ensureMessage() || m.Size < q.Filter.SizeMin) {
		return false, rerr
	}
	if q.Filter.SizeMax > 0 && (!ensureMessage() || m.Size > q.Filter.SizeMax) {
		return false, rerr
	}

	// The filters below may need the parsed message. state caches parsed details
	// and records errors the filter functions cannot return directly.
	state := msgState{acc: acc, log: log}
	defer func() {
		if rerr == nil && state.err != nil {
			rerr = state.err
		}
		state.clear()
	}()

	attachmentFilter := q.attachmentFilterFn(log, acc, &state)
	if attachmentFilter != nil && (!ensureMessage() || !attachmentFilter(m)) {
		return false, rerr
	}

	envFilter := q.envFilterFn(log, &state)
	if envFilter != nil && (!ensureMessage() || !envFilter(m)) {
		return false, rerr
	}

	headerFilter := q.headerFilterFn(log, &state)
	if headerFilter != nil && (!ensureMessage() || !headerFilter(m)) {
		return false, rerr
	}

	wordsFilter := q.wordsFilterFn(log, &state)
	if wordsFilter != nil && (!ensureMessage() || !wordsFilter(m)) {
		return false, rerr
	}

	// Now check that we are either within the sorting order, or "last" was sent.
	if !checkRange || v.End || ensureMessage() && v.inRange(m) {
		return true, rerr
	}
	return false, rerr
}
1264
// msgResp is the type with which queryMessages sends results, errors, view
// resets and end-of-view signals to viewRequestTx. Exactly one of the cases
// applies per value.
type msgResp struct {
	err     error         // If set, an error happened and fields below are not set.
	reset   bool          // If set, the anchor message does not exist (anymore?) and we are sending messages from the start, fields below not set.
	viewEnd bool          // If set, the last message for the view was seen, no more should be requested, fields below not set.
	mil     []MessageItem // If none of the cases above apply, the messages that was found matching the query. First message was reason the thread is returned, for use as AnchorID in followup request.
	pm      *ParsedMessage // If m was the target page.DestMessageID, or this is the first match, this is the parsed message of mi.
}
1272
1273func storeNewPreviews(ctx context.Context, log mlog.Log, acc *store.Account, newPreviews map[int64]string) {
1274 if len(newPreviews) == 0 {
1275 return
1276 }
1277
1278 defer func() {
1279 x := recover()
1280 if x != nil {
1281 log.Error("unhandled panic in storeNewPreviews", slog.Any("err", x))
1282 debug.PrintStack()
1283 metrics.PanicInc(metrics.Store)
1284 }
1285 }()
1286
1287 err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
1288 for id, preview := range newPreviews {
1289 m := store.Message{ID: id}
1290 if err := tx.Get(&m); err != nil {
1291 return fmt.Errorf("get message with id %d to store preview: %w", id, err)
1292 } else if !m.Expunged {
1293 m.Preview = &preview
1294 if err := tx.Update(&m); err != nil {
1295 return fmt.Errorf("updating message with id %d: %v", m.ID, err)
1296 }
1297 }
1298 }
1299 return nil
1300 })
1301 log.Check(err, "saving new previews with messages")
1302}
1303
// viewRequestTx executes a request (query with filters, pagination) by
// launching a new goroutine with queryMessages, receiving results as msgResp,
// and sending Event* to the SSE connection.
//
// It always closes tx.
func viewRequestTx(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, msgc chan EventViewMsgs, errc chan EventViewErr, resetc chan EventViewReset, donec chan int64) {
	// Newly generated previews which we'll save when the operation is done.
	newPreviews := map[int64]string{}

	defer func() {
		err := tx.Rollback()
		log.Check(err, "rolling back query transaction")

		// Tell the SSE goroutine this request is finished.
		donec <- v.Request.ID

		// ctx can be canceled, we still want to store the previews.
		storeNewPreviews(context.Background(), log, acc, newPreviews)

		x := recover() // Should not happen, but don't take program down if it does.
		if x != nil {
			log.WithContext(ctx).Error("viewRequestTx panic", slog.Any("err", x))
			debug.PrintStack()
			metrics.PanicInc(metrics.Webmailrequest)
		}
	}()

	var msgitems [][]MessageItem // Gathering for 300ms, then flushing.
	var parsedMessage *ParsedMessage
	var viewEnd bool

	var immediate bool // No waiting, flush immediate.
	t := time.NewTimer(300 * time.Millisecond)
	defer t.Stop()

	// sendViewMsgs flushes the gathered message items as one event to the SSE
	// connection. With force, an event is sent even without items (e.g. to carry
	// the viewEnd flag).
	sendViewMsgs := func(force bool) {
		if len(msgitems) == 0 && !force {
			return
		}

		immediate = false
		msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, msgitems, parsedMessage, viewEnd}
		msgitems = nil
		parsedMessage = nil
		t.Reset(300 * time.Millisecond)
	}

	// todo: should probably rewrite code so we don't start yet another goroutine, but instead handle the query responses directly (through a struct that keeps state?) in the sse connection goroutine.

	// Buffered by 1 so queryMessages can send its final response while we are
	// between receives.
	mrc := make(chan msgResp, 1)
	go queryMessages(ctx, log, acc, tx, v, mrc, newPreviews)

	for {
		select {
		case mr, ok := <-mrc:
			if !ok {
				// Channel closed: query finished. Flush what we have.
				sendViewMsgs(false)
				// Empty message list signals this query is done.
				msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, nil, nil, false}
				return
			}
			if mr.err != nil {
				sendViewMsgs(false)
				errc <- EventViewErr{v.Request.ViewID, v.Request.ID, mr.err.Error(), mr.err}
				return
			}
			if mr.reset {
				resetc <- EventViewReset{v.Request.ViewID, v.Request.ID}
				continue
			}
			if mr.viewEnd {
				viewEnd = true
				sendViewMsgs(true)
				return
			}

			// A regular result: gather it, flushing right away if a timer already fired
			// while we had nothing to send.
			msgitems = append(msgitems, mr.mil)
			if mr.pm != nil {
				parsedMessage = mr.pm
			}
			if immediate {
				sendViewMsgs(true)
			}

		case <-t.C:
			if len(msgitems) == 0 {
				// Nothing to send yet. We'll send immediately when the next message comes in.
				immediate = true
			} else {
				sendViewMsgs(false)
			}
		}
	}
}
1397
// queryMessages executes a query, with filter, pagination, destination message id
// to fetch (the message that the client had in view and wants to display again).
// It sends on msgc, with several types of messages: errors, whether the view is
// reset due to missing AnchorMessageID, and when the end of the view was reached
// and/or for a message.
// newPreviews is filled with previews, the caller must save them.
func queryMessages(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, mrc chan msgResp, newPreviews map[int64]string) {
	defer func() {
		x := recover() // Should not happen, but don't take program down if it does.
		if x != nil {
			log.WithContext(ctx).Error("queryMessages panic", slog.Any("err", x))
			debug.PrintStack()
			mrc <- msgResp{err: fmt.Errorf("query failed")}
			metrics.PanicInc(metrics.Webmailquery)
		}

		// Closing mrc signals the caller that this query is done.
		close(mrc)
	}()

	query := v.Request.Query
	page := v.Request.Page

	// Warning: Filters must be kept in sync between queryMessage and view.matches.

	// checkMessage reports whether the message with the given id still exists
	// (not expunged) and matches the view's filters.
	checkMessage := func(id int64) (valid bool, rerr error) {
		m := store.Message{ID: id}
		err := tx.Get(&m)
		if err == bstore.ErrAbsent || err == nil && m.Expunged {
			return false, nil
		} else if err != nil {
			return false, err
		} else {
			return v.matches(log, acc, false, m.ID, m.MailboxID, m.UID, m.Flags, m.Keywords, func(int64, int64, store.UID) (store.Message, error) {
				return m, nil
			})
		}
	}

	// Check if AnchorMessageID exists and matches filter. If not, we will reset the view.
	if page.AnchorMessageID > 0 {
		// Check if message exists and (still) matches the filter.
		// todo: if AnchorMessageID exists but no longer matches the filter, we are resetting the view, but could handle it more gracefully in the future. if the message is in a different mailbox, we cannot query as efficiently, we'll have to read through more messages.
		if valid, err := checkMessage(page.AnchorMessageID); err != nil {
			mrc <- msgResp{err: fmt.Errorf("querying AnchorMessageID: %v", err)}
			return
		} else if !valid {
			mrc <- msgResp{reset: true}
			page.AnchorMessageID = 0
		}
	}

	// Check if page.DestMessageID exists and matches filter. If not, we will ignore
	// it instead of continuing to send message till the end of the view.
	if page.DestMessageID > 0 {
		if valid, err := checkMessage(page.DestMessageID); err != nil {
			mrc <- msgResp{err: fmt.Errorf("querying requested message: %v", err)}
			return
		} else if !valid {
			page.DestMessageID = 0
		}
	}

	// todo optimize: we would like to have more filters directly on the database if they can use an index. eg if there is a keyword filter and no mailbox filter.

	q := bstore.QueryTx[store.Message](tx)
	q.FilterEqual("Expunged", false)
	if len(v.mailboxIDs) > 0 {
		if len(v.mailboxIDs) == 1 && v.matchMailboxIDs {
			// Should result in fast indexed query.
			for mbID := range v.mailboxIDs {
				q.FilterNonzero(store.Message{MailboxID: mbID})
			}
		} else {
			// Multiple mailboxes, or an exclusion set: filter by (in)equality on the set.
			idsAny := make([]any, 0, len(v.mailboxIDs))
			for mbID := range v.mailboxIDs {
				idsAny = append(idsAny, mbID)
			}
			if v.matchMailboxIDs {
				q.FilterEqual("MailboxID", idsAny...)
			} else {
				q.FilterNotEqual("MailboxID", idsAny...)
			}
		}
	}

	// If we are looking for an anchor, keep skipping message early (cheaply) until we've seen it.
	if page.AnchorMessageID > 0 {
		var seen = false
		q.FilterFn(func(m store.Message) bool {
			if seen {
				return true
			}
			seen = m.ID == page.AnchorMessageID
			// The anchor message itself is not returned again, only messages after it.
			return false
		})
	}

	// We may be adding filters to the query below. The FilterFn signature does not
	// implement reporting errors, or anything else, just a bool. So when making the
	// filter functions, we give them a place to store parsed message state, and an
	// error. We check the error during and after query execution.
	state := msgState{acc: acc, log: log, newPreviews: newPreviews}
	defer state.clear()

	flagfilter := query.flagFilterFn()
	if flagfilter != nil {
		q.FilterFn(func(m store.Message) bool {
			return flagfilter(m.Flags, m.Keywords)
		})
	}

	// Received-time and size range filters can be applied directly on the database.
	if query.Filter.Oldest != nil {
		q.FilterGreaterEqual("Received", *query.Filter.Oldest)
	}
	if query.Filter.Newest != nil {
		q.FilterLessEqual("Received", *query.Filter.Newest)
	}

	if query.Filter.SizeMin > 0 {
		q.FilterGreaterEqual("Size", query.Filter.SizeMin)
	}
	if query.Filter.SizeMax > 0 {
		q.FilterLessEqual("Size", query.Filter.SizeMax)
	}

	attachmentFilter := query.attachmentFilterFn(log, acc, &state)
	if attachmentFilter != nil {
		q.FilterFn(attachmentFilter)
	}

	envFilter := query.envFilterFn(log, &state)
	if envFilter != nil {
		q.FilterFn(envFilter)
	}

	headerFilter := query.headerFilterFn(log, &state)
	if headerFilter != nil {
		q.FilterFn(headerFilter)
	}

	wordsFilter := query.wordsFilterFn(log, &state)
	if wordsFilter != nil {
		q.FilterFn(wordsFilter)
	}

	var moreHeaders []string // From store.Settings.ShowHeaders

	if query.OrderAsc {
		q.SortAsc("Received")
	} else {
		q.SortDesc("Received")
	}
	found := page.DestMessageID <= 0 // Whether the destination message was found (or none was requested).
	end := true                      // Whether the end of the view was reached; cleared when we stop early.
	have := 0                        // Number of messages the frontend will actually show, for pagination.
	err := q.ForEach(func(m store.Message) error {
		// Check for an error in one of the filters, propagate it.
		if state.err != nil {
			return state.err
		}

		// Stop when the page is full and the destination message was seen, or at a
		// hard safety limit.
		if have >= page.Count && found || have > 10000 {
			end = false
			return bstore.StopForEach
		}

		if _, ok := v.threadIDs[m.ThreadID]; ok {
			// Message was already returned as part of a thread.
			return nil
		}

		var pm *ParsedMessage
		if m.ID == page.DestMessageID || page.DestMessageID == 0 && have == 0 && page.AnchorMessageID == 0 {
			// For threads, if there was no DestMessageID, we may be getting the newest
			// message. For an initial view, this isn't necessarily the first the user is
			// expected to read first, that would be the first unread, which we'll get below
			// when gathering the thread.
			found = true
			xpm, err := parsedMessage(log, &m, &state, true, false, false)
			if err != nil && errors.Is(err, message.ErrHeader) {
				log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
			} else if err != nil {
				return fmt.Errorf("parsing message %d: %v", m.ID, err)
			} else {
				pm = &xpm
			}
		}

		var err error
		moreHeaders, err = ensureMoreHeaders(tx, moreHeaders)
		if err != nil {
			return fmt.Errorf("ensuring more headers: %v", err)
		}

		mi, err := messageItem(log, m, &state, moreHeaders)
		if err != nil {
			return fmt.Errorf("making messageitem for message %d: %v", m.ID, err)
		}
		mil := []MessageItem{mi}
		if query.Threading != ThreadOff {
			// Also return the other messages of this thread, and mark the thread as
			// returned so later thread messages are skipped.
			more, xpm, err := gatherThread(log, tx, acc, v, m, page.DestMessageID, page.AnchorMessageID == 0 && have == 0, moreHeaders, state.newPreviews)
			if err != nil {
				return fmt.Errorf("gathering thread messages for id %d, thread %d: %v", m.ID, m.ThreadID, err)
			}
			if xpm != nil {
				pm = xpm
				found = true
			}
			mil = append(mil, more...)
			v.threadIDs[m.ThreadID] = struct{}{}

			// Calculate how many messages the frontend is going to show, and only count those as returned.
			collapsed := map[int64]bool{}
			for _, mi := range mil {
				collapsed[mi.Message.ID] = mi.Message.ThreadCollapsed
			}
			unread := map[int64]bool{} // Propagated to thread root.
			if query.Threading == ThreadUnread {
				for _, mi := range mil {
					mm := mi.Message
					if mm.Seen {
						continue
					}
					unread[mm.ID] = true
					for _, id := range mm.ThreadParentIDs {
						unread[id] = true
					}
				}
			}
			for _, mi := range mil {
				mm := mi.Message
				threadRoot := true
				rootID := mm.ID
				for _, id := range mm.ThreadParentIDs {
					if _, ok := collapsed[id]; ok {
						threadRoot = false
						rootID = id
					}
				}
				if threadRoot || (query.Threading == ThreadOn && !collapsed[rootID] || query.Threading == ThreadUnread && unread[rootID]) {
					have++
				}
			}
		} else {
			have++
		}
		if pm != nil && len(pm.envelope.From) == 1 {
			// Include the view mode configured for the sender of the parsed message.
			pm.ViewMode, err = fromAddrViewMode(tx, pm.envelope.From[0])
			if err != nil {
				return fmt.Errorf("gathering view mode for id %d: %v", m.ID, err)
			}
		}
		mrc <- msgResp{mil: mil, pm: pm}
		return nil
	})
	// Check for an error in one of the filters again. Check in ForEach would not
	// trigger if the last message has the error.
	if err == nil && state.err != nil {
		err = state.err
	}
	if err != nil {
		mrc <- msgResp{err: fmt.Errorf("querying messages: %v", err)}
		return
	}
	if end {
		mrc <- msgResp{viewEnd: true}
	}
}
1666
1667func gatherThread(log mlog.Log, tx *bstore.Tx, acc *store.Account, v view, m store.Message, destMessageID int64, first bool, moreHeaders []string, newPreviews map[int64]string) ([]MessageItem, *ParsedMessage, error) {
1668 if m.ThreadID == 0 {
1669 // If we would continue, FilterNonzero would fail because there are no non-zero fields.
1670 return nil, nil, fmt.Errorf("message has threadid 0, account is probably still being upgraded, try turning threading off until the upgrade is done")
1671 }
1672
1673 // Fetch other messages for this thread.
1674 qt := bstore.QueryTx[store.Message](tx)
1675 qt.FilterNonzero(store.Message{ThreadID: m.ThreadID})
1676 qt.FilterEqual("Expunged", false)
1677 qt.FilterNotEqual("ID", m.ID)
1678 qt.SortAsc("ID")
1679 tml, err := qt.List()
1680 if err != nil {
1681 return nil, nil, fmt.Errorf("listing other messages in thread for message %d, thread %d: %v", m.ID, m.ThreadID, err)
1682 }
1683
1684 var mil []MessageItem
1685 var pm *ParsedMessage
1686 var firstUnread bool
1687 for _, tm := range tml {
1688 err := func() error {
1689 xstate := msgState{acc: acc, log: log, newPreviews: newPreviews}
1690 defer xstate.clear()
1691
1692 mi, err := messageItem(log, tm, &xstate, moreHeaders)
1693 if err != nil {
1694 return fmt.Errorf("making messageitem for message %d, for thread %d: %v", tm.ID, m.ThreadID, err)
1695 }
1696 mi.MatchQuery, err = v.matches(log, acc, false, tm.ID, tm.MailboxID, tm.UID, tm.Flags, tm.Keywords, func(int64, int64, store.UID) (store.Message, error) {
1697 return tm, nil
1698 })
1699 if err != nil {
1700 return fmt.Errorf("matching thread message %d against view query: %v", tm.ID, err)
1701 }
1702 mil = append(mil, mi)
1703
1704 if tm.ID == destMessageID || destMessageID == 0 && first && (pm == nil || !firstUnread && !tm.Seen) {
1705 firstUnread = !tm.Seen
1706 xpm, err := parsedMessage(log, &tm, &xstate, true, false, false)
1707 if err != nil && errors.Is(err, message.ErrHeader) {
1708 log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
1709 } else if err != nil {
1710 return fmt.Errorf("parsing thread message %d: %v", tm.ID, err)
1711 } else {
1712 pm = &xpm
1713 }
1714 }
1715 return nil
1716 }()
1717 if err != nil {
1718 return nil, nil, err
1719 }
1720 }
1721
1722 // Finally, the message that caused us to gather this thread (which is likely the
1723 // most recent message in the thread) could be the only unread message.
1724 if destMessageID == 0 && first && !m.Seen && !firstUnread {
1725 xstate := msgState{acc: acc, log: log}
1726 defer xstate.clear()
1727 xpm, err := parsedMessage(log, &m, &xstate, true, false, false)
1728 if err != nil && errors.Is(err, message.ErrHeader) {
1729 log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
1730 } else if err != nil {
1731 return nil, nil, fmt.Errorf("parsing thread message %d: %v", m.ID, err)
1732 } else {
1733 pm = &xpm
1734 }
1735 }
1736
1737 return mil, pm, nil
1738}
1739
// While checking the filters on a message, we may need to get more message
// details as each filter passes. We check the filters that need the basic
// information first, and load and cache more details for the next filters.
// msgState holds parsed details for a message, it is updated while filtering,
// with more information or reset for a next message.
type msgState struct {
	acc  *store.Account // Never changes during lifetime.
	err  error          // Once set, doesn't get cleared.
	m    store.Message  // Message the cached part/msgr below belong to.
	part *message.Part  // Will be without Reader when msgr is nil.
	msgr *store.MsgReader // Open reader for the message file, closed by clear.
	log  mlog.Log

	// If not nil, messages will get their Preview field filled when nil, and message
	// id and preview added to newPreviews, and saved in a separate write transaction
	// when the operation is done.
	newPreviews map[int64]string
}
1758
1759func (ms *msgState) clear() {
1760 if ms.msgr != nil {
1761 err := ms.msgr.Close()
1762 ms.log.Check(err, "closing message reader from state")
1763 ms.msgr = nil
1764 }
1765 *ms = msgState{acc: ms.acc, err: ms.err, log: ms.log, newPreviews: ms.newPreviews}
1766}
1767
1768func (ms *msgState) ensureMsg(m store.Message) {
1769 if m.ID != ms.m.ID {
1770 ms.clear()
1771 }
1772 ms.m = m
1773}
1774
1775func (ms *msgState) ensurePart(m store.Message, withMsgReader bool) bool {
1776 ms.ensureMsg(m)
1777
1778 if ms.err == nil {
1779 if ms.part == nil {
1780 if m.ParsedBuf == nil {
1781 ms.err = fmt.Errorf("message %d not parsed", m.ID)
1782 return false
1783 }
1784 var p message.Part
1785 if err := json.Unmarshal(m.ParsedBuf, &p); err != nil {
1786 ms.err = fmt.Errorf("load part for message %d: %w", m.ID, err)
1787 return false
1788 }
1789 ms.part = &p
1790 }
1791 if withMsgReader && ms.msgr == nil {
1792 ms.msgr = ms.acc.MessageReader(m)
1793 ms.part.SetReaderAt(ms.msgr)
1794 }
1795 }
1796 return ms.part != nil
1797}
1798
// flagFilterFn returns a function that applies the flag/keyword/"label"-related
// filters for a query. A nil function is returned if there are no flags to filter
// on.
func (q Query) flagFilterFn() func(store.Flags, []string) bool {
	// Gather required labels: true means the label must be present, false that
	// it must be absent. A label in both Filter and NotFilter ends up false,
	// since the NotFilter loop runs last and overwrites.
	labels := map[string]bool{}
	for _, k := range q.Filter.Labels {
		labels[k] = true
	}
	for _, k := range q.NotFilter.Labels {
		labels[k] = false
	}

	if len(labels) == 0 {
		return nil
	}

	// System flags are boolean fields on store.Flags rather than keyword
	// strings. For each well-known label, mf[0] points at the mask bit (this
	// flag participates in filtering) and mf[1] at the required value.
	var mask, flags store.Flags
	systemflags := map[string][]*bool{
		`\answered`:  {&mask.Answered, &flags.Answered},
		`\flagged`:   {&mask.Flagged, &flags.Flagged},
		`\deleted`:   {&mask.Deleted, &flags.Deleted},
		`\seen`:      {&mask.Seen, &flags.Seen},
		`\draft`:     {&mask.Draft, &flags.Draft},
		`$junk`:      {&mask.Junk, &flags.Junk},
		`$notjunk`:   {&mask.Notjunk, &flags.Notjunk},
		`$forwarded`: {&mask.Forwarded, &flags.Forwarded},
		`$phishing`:  {&mask.Phishing, &flags.Phishing},
		`$mdnsent`:   {&mask.MDNSent, &flags.MDNSent},
	}
	// Labels that are not system flags are matched as message keywords.
	keywords := map[string]bool{}
	for k, v := range labels {
		k = strings.ToLower(k)
		if mf, ok := systemflags[k]; ok {
			*mf[0] = true
			*mf[1] = v
		} else {
			keywords[k] = v
		}
	}
	return func(msgFlags store.Flags, msgKeywords []string) bool {
		// Set presumably copies the masked flag values from msgFlags into f
		// (zero value), so the comparison checks exactly the filtered system
		// flags in one go — semantics defined in store.
		var f store.Flags
		if f.Set(mask, msgFlags) != flags {
			return false
		}
		// Each filtered keyword must be present/absent as required.
		for k, v := range keywords {
			if slices.Contains(msgKeywords, k) != v {
				return false
			}
		}
		return true
	}
}
1851
// attachmentFilterFn returns a function that filters for the attachment-related
// filter from the query. A nil function is returned if there are no attachment
// filters.
func (q Query) attachmentFilterFn(log mlog.Log, acc *store.Account, state *msgState) func(m store.Message) bool {
	if q.Filter.Attachments == AttachmentIndifferent && q.NotFilter.Attachments == AttachmentIndifferent {
		return nil
	}

	return func(m store.Message) bool {
		// Attachment detection needs the parsed part and the raw message data.
		if !state.ensurePart(m, true) {
			return false
		}
		types, err := attachmentTypes(log, m, state)
		if err != nil {
			state.err = err
			return false
		}
		// The message must have the filtered type (if set) and must not have
		// the not-filtered type (if set).
		return (q.Filter.Attachments == AttachmentIndifferent || types[q.Filter.Attachments]) && (q.NotFilter.Attachments == AttachmentIndifferent || !types[q.NotFilter.Attachments])
	}
}
1872
// attachmentMimetypes maps a lowercased MIME "type/subtype" to the attachment
// type used for filtering.
var attachmentMimetypes = map[string]AttachmentType{
	"application/pdf":              AttachmentPDF,
	"application/zip":              AttachmentArchive,
	"application/x-rar-compressed": AttachmentArchive,
	"application/vnd.oasis.opendocument.spreadsheet":                    AttachmentSpreadsheet,
	"application/vnd.ms-excel":                                          AttachmentSpreadsheet,
	"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": AttachmentSpreadsheet,
	"application/vnd.oasis.opendocument.text":                           AttachmentDocument,
	"application/vnd.oasis.opendocument.presentation":                   AttachmentPresentation,
	"application/vnd.ms-powerpoint":                                     AttachmentPresentation,
	"application/vnd.openxmlformats-officedocument.presentationml.presentation": AttachmentPresentation,
}

// attachmentExtensions maps a lowercased filename extension (with leading dot)
// to the attachment type used for filtering.
// NOTE(review): filepath.Ext returns only the part after the final dot, so the
// multi-part keys like ".tar.gz" can only match if the caller does a compound
// extension lookup — verify call sites handle this.
var attachmentExtensions = map[string]AttachmentType{
	".pdf":     AttachmentPDF,
	".zip":     AttachmentArchive,
	".tar":     AttachmentArchive,
	".tgz":     AttachmentArchive,
	".tar.gz":  AttachmentArchive,
	".tbz2":    AttachmentArchive,
	".tar.bz2": AttachmentArchive,
	".tar.lz":  AttachmentArchive,
	".tlz":     AttachmentArchive,
	".tar.xz":  AttachmentArchive,
	".txz":     AttachmentArchive,
	".tar.zst": AttachmentArchive,
	".tar.lz4": AttachmentArchive,
	".7z":      AttachmentArchive,
	".rar":     AttachmentArchive,
	".ods":     AttachmentSpreadsheet,
	".xls":     AttachmentSpreadsheet,
	".xlsx":    AttachmentSpreadsheet,
	".odt":     AttachmentDocument,
	".doc":     AttachmentDocument,
	".docx":    AttachmentDocument,
	".odp":     AttachmentPresentation,
	".ppt":     AttachmentPresentation,
	".pptx":    AttachmentPresentation,
}
1911
1912func attachmentTypes(log mlog.Log, m store.Message, state *msgState) (map[AttachmentType]bool, error) {
1913 types := map[AttachmentType]bool{}
1914
1915 pm, err := parsedMessage(log, &m, state, false, false, false)
1916 if err != nil {
1917 return nil, fmt.Errorf("parsing message for attachments: %w", err)
1918 }
1919 for _, a := range pm.attachments {
1920 if a.Part.MediaType == "IMAGE" {
1921 types[AttachmentImage] = true
1922 continue
1923 }
1924 mt := strings.ToLower(a.Part.MediaType + "/" + a.Part.MediaSubType)
1925 if t, ok := attachmentMimetypes[mt]; ok {
1926 types[t] = true
1927 continue
1928 }
1929 _, filename, err := a.Part.DispositionFilename()
1930 if err != nil && (errors.Is(err, message.ErrParamEncoding) || errors.Is(err, message.ErrHeader)) {
1931 log.Debugx("parsing disposition/filename", err)
1932 } else if err != nil {
1933 return nil, fmt.Errorf("reading disposition/filename: %v", err)
1934 }
1935 if ext := filepath.Ext(filename); ext != "" {
1936 if t, ok := attachmentExtensions[strings.ToLower(ext)]; ok {
1937 types[t] = true
1938 }
1939 }
1940 }
1941
1942 if len(types) == 0 {
1943 types[AttachmentNone] = true
1944 } else {
1945 types[AttachmentAny] = true
1946 }
1947 return types, nil
1948}
1949
// envFilterFn returns a filter function for the "envelope" headers ("envelope" as
// used by IMAP, i.e. basic message headers from/to/subject, an unfortunate name
// clash with SMTP envelope) for the query. A nil function is returned if no
// filtering is needed.
func (q Query) envFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
	if len(q.Filter.From) == 0 && len(q.Filter.To) == 0 && len(q.Filter.Subject) == 0 && len(q.NotFilter.From) == 0 && len(q.NotFilter.To) == 0 && len(q.NotFilter.Subject) == 0 {
		return nil
	}

	// lower returns the strings lowercased, for the case-insensitive substring
	// matching below. Nil for an empty list.
	lower := func(l []string) []string {
		if len(l) == 0 {
			return nil
		}
		r := make([]string, len(l))
		for i, s := range l {
			r[i] = strings.ToLower(s)
		}
		return r
	}

	// Lowercase all needles once, outside the per-message function.
	filterSubject := lower(q.Filter.Subject)
	notFilterSubject := lower(q.NotFilter.Subject)
	filterFrom := lower(q.Filter.From)
	notFilterFrom := lower(q.NotFilter.From)
	filterTo := lower(q.Filter.To)
	notFilterTo := lower(q.NotFilter.To)

	return func(m store.Message) bool {
		// Only the parsed part with envelope is needed, not the raw message.
		if !state.ensurePart(m, false) {
			return false
		}

		// Messages without an envelope are matched against empty values.
		var env message.Envelope
		if state.part.Envelope != nil {
			env = *state.part.Envelope
		}

		if len(filterSubject) > 0 || len(notFilterSubject) > 0 {
			subject := strings.ToLower(env.Subject)
			// All positive subject needles must be present...
			for _, s := range filterSubject {
				if !strings.Contains(subject, s) {
					return false
				}
			}
			// ...and no negative needle may be present.
			for _, s := range notFilterSubject {
				if strings.Contains(subject, s) {
					return false
				}
			}
		}

		// contains matches needles against the display name and "<user@host>"
		// form of addresses. With all true, each needle must match at least
		// one address (used for positive filters). With all false, any single
		// needle matching any address is enough (used for negative filters).
		contains := func(textLower []string, l []message.Address, all bool) bool {
		next:
			for _, s := range textLower {
				for _, a := range l {
					name := strings.ToLower(a.Name)
					addr := strings.ToLower(fmt.Sprintf("<%s@%s>", a.User, a.Host))
					if strings.Contains(name, s) || strings.Contains(addr, s) {
						if !all {
							return true
						}
						continue next
					}
				}
				if all {
					return false
				}
			}
			return all
		}

		if len(filterFrom) > 0 && !contains(filterFrom, env.From, true) {
			return false
		}
		if len(notFilterFrom) > 0 && contains(notFilterFrom, env.From, false) {
			return false
		}
		if len(filterTo) > 0 || len(notFilterTo) > 0 {
			// "To" filters match any recipient header: To, CC and BCC.
			to := slices.Concat(env.To, env.CC, env.BCC)
			if len(filterTo) > 0 && !contains(filterTo, to, true) {
				return false
			}
			if len(notFilterTo) > 0 && contains(notFilterTo, to, false) {
				return false
			}
		}
		return true
	}
}
2039
2040// headerFilterFn returns a function that filters for the header filters in the
2041// query. A nil function is returned if there are no header filters.
2042func (q Query) headerFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
2043 if len(q.Filter.Headers) == 0 {
2044 return nil
2045 }
2046
2047 lowerValues := make([]string, len(q.Filter.Headers))
2048 for i, t := range q.Filter.Headers {
2049 lowerValues[i] = strings.ToLower(t[1])
2050 }
2051
2052 return func(m store.Message) bool {
2053 if !state.ensurePart(m, true) {
2054 return false
2055 }
2056 hdr, err := state.part.Header()
2057 if err != nil {
2058 state.err = fmt.Errorf("reading header for message %d: %w", m.ID, err)
2059 return false
2060 }
2061
2062 next:
2063 for i, t := range q.Filter.Headers {
2064 k := t[0]
2065 v := lowerValues[i]
2066 l := hdr.Values(k)
2067 if v == "" && len(l) > 0 {
2068 continue
2069 }
2070 for _, e := range l {
2071 if strings.Contains(strings.ToLower(e), v) {
2072 continue next
2073 }
2074 }
2075 return false
2076 }
2077 return true
2078 }
2079}
2080
2081// wordFiltersFn returns a function that applies the word filters of the query. A
2082// nil function is returned when query does not contain a word filter.
2083func (q Query) wordsFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
2084 if len(q.Filter.Words) == 0 && len(q.NotFilter.Words) == 0 {
2085 return nil
2086 }
2087
2088 ws := store.PrepareWordSearch(q.Filter.Words, q.NotFilter.Words)
2089
2090 return func(m store.Message) bool {
2091 if !state.ensurePart(m, true) {
2092 return false
2093 }
2094
2095 if ok, err := ws.MatchPart(log, state.part, true); err != nil {
2096 state.err = fmt.Errorf("searching for words in message %d: %w", m.ID, err)
2097 return false
2098 } else {
2099 return ok
2100 }
2101 }
2102}
2103