3// todo: if fetch fails part-way through the command, we wouldn't be storing the messages that were parsed. should we try harder to get parsed form of messages stored in db?
15 "golang.org/x/exp/maps"
17 "github.com/mjl-/bstore"
19 "github.com/mjl-/mox/message"
20 "github.com/mjl-/mox/mox-"
21 "github.com/mjl-/mox/moxio"
22 "github.com/mjl-/mox/store"
25// functions to handle fetch attribute requests are defined on fetchCmd.
30 tx *bstore.Tx // Writable tx, for storing message when first parsed as mime parts.
31 changes []store.Change // For updated Seen flag.
34 needModseq bool // Whether untagged responses needs modseq.
35 expungeIssued bool // Set if a message cannot be read. Can happen for expunged messages.
36 modseq store.ModSeq // Initialized on first change, for marking messages as seen.
37 isUID bool // If this is a UID FETCH command.
38 hasChangedSince bool // Whether CHANGEDSINCE was set. Enables MODSEQ in response.
39 deltaCounts store.MailboxCounts // By marking \Seen, the number of unread/unseen messages will go down. We update counts at the end.
41 // Loaded when first needed, closed when message was processed.
42 m *store.Message // Message currently being processed.
47// error when processing an attribute. we typically just don't respond with requested attributes that encounter a failure.
48type attrError struct{ err error }
50func (e attrError) Error() string {
54// raise error processing an attribute.
55func (cmd *fetchCmd) xerrorf(format string, args ...any) {
56 panic(attrError{fmt.Errorf(format, args...)})
59func (cmd *fetchCmd) xcheckf(err error, format string, args ...any) {
61 msg := fmt.Sprintf(format, args...)
62 cmd.xerrorf("%s: %w", msg, err)
66// Fetch returns information about messages, be it email envelopes, headers,
67// bodies, full messages, flags.
70func (c *conn) cmdxFetch(isUID bool, tag, cmdstr string, p *parser) {
79 atts := p.xfetchAtts(isUID)
80 var changedSince int64
81 var haveChangedSince bool
87 seen := map[string]bool{}
90 if isUID && p.conn.enabled[capQresync] {
92 w = p.xtakelist("CHANGEDSINCE", "VANISHED")
94 w = p.xtakelist("CHANGEDSINCE")
97 xsyntaxErrorf("duplicate fetch modifier %s", w)
103 changedSince = p.xnumber64()
104 // workaround: ios mail (16.5.1) was seen sending changedSince 0 on an existing account that got condstore enabled.
105 if changedSince == 0 && mox.Pedantic {
107 xsyntaxErrorf("changedsince modseq must be > 0")
110 p.conn.xensureCondstore(nil)
111 haveChangedSince = true
122 if vanished && !haveChangedSince {
123 xsyntaxErrorf("VANISHED can only be used with CHANGEDSINCE")
128 // We don't use c.account.WithRLock because we write to the client while reading messages.
129 // We get the rlock, then we check the mailbox, release the lock and read the messages.
130 // The db transaction still locks out any changes to the database...
132 runlock := c.account.RUnlock
133 // Note: we call runlock in a closure because we replace it below.
138 var vanishedUIDs []store.UID
139 cmd := &fetchCmd{conn: c, mailboxID: c.mailboxID, isUID: isUID, hasChangedSince: haveChangedSince}
140 c.xdbwrite(func(tx *bstore.Tx) {
143 // Ensure the mailbox still exists.
144 mb := c.xmailboxID(tx, c.mailboxID)
148 // With changedSince, the client is likely asking for a small set of changes. Use a
149 // database query to trim down the uids we need to look at.
151 if changedSince > 0 {
152 q := bstore.QueryTx[store.Message](tx)
153 q.FilterNonzero(store.Message{MailboxID: c.mailboxID})
154 q.FilterGreater("ModSeq", store.ModSeqFromClient(changedSince))
156 q.FilterEqual("Expunged", false)
158 err := q.ForEach(func(m store.Message) error {
160 vanishedUIDs = append(vanishedUIDs, m.UID)
162 if nums.containsUID(m.UID, c.uids, c.searchResult) {
163 uids = append(uids, m.UID)
166 seq := c.sequence(m.UID)
167 if seq > 0 && nums.containsSeq(seq, c.uids, c.searchResult) {
168 uids = append(uids, m.UID)
173 xcheckf(err, "looking up messages with changedsince")
175 uids = c.xnumSetUIDs(isUID, nums)
180 delModSeq, err := c.account.HighestDeletedModSeq(tx)
181 xcheckf(err, "looking up highest deleted modseq")
182 if changedSince < delModSeq.Client() {
183 // First sort the uids we already found, for fast lookup.
184 sort.Slice(vanishedUIDs, func(i, j int) bool {
185 return vanishedUIDs[i] < vanishedUIDs[j]
188 // We'll be gathering any more vanished uids in more.
189 more := map[store.UID]struct{}{}
190 checkVanished := func(uid store.UID) {
191 if uidSearch(c.uids, uid) <= 0 && uidSearch(vanishedUIDs, uid) <= 0 {
192 more[uid] = struct{}{}
195 // Now look through the requested uids. We may have a searchResult, handle it
196 // separately from a numset with potential stars, over which we can more easily
198 if nums.searchResult {
199 for _, uid := range c.searchResult {
203 iter := nums.interpretStar(c.uids).newIter()
205 num, ok := iter.Next()
209 checkVanished(store.UID(num))
212 vanishedUIDs = append(vanishedUIDs, maps.Keys(more)...)
216 // Release the account lock.
218 runlock = func() {} // Prevent defer from unlocking again.
221 if len(vanishedUIDs) > 0 {
222 // Mention all vanished UIDs in compact numset form.
224 sort.Slice(vanishedUIDs, func(i, j int) bool {
225 return vanishedUIDs[i] < vanishedUIDs[j]
227 // No hard limit on response sizes, but clients are recommended to not send more
228 // than 8k. We send a more conservative max 4k.
229 for _, s := range compactUIDSet(vanishedUIDs).Strings(4*1024 - 32) {
230 c.bwritelinef("* VANISHED (EARLIER) %s", s)
234 for _, uid := range uids {
236 cmd.conn.log.Debug("processing uid", slog.Any("uid", uid))
240 var zeromc store.MailboxCounts
241 if cmd.deltaCounts != zeromc {
242 mb.Add(cmd.deltaCounts) // Unseen/Unread will be <= 0.
243 err := tx.Update(&mb)
244 xcheckf(err, "updating mailbox counts")
245 cmd.changes = append(cmd.changes, mb.ChangeCounts())
246 // No need to update account total message size.
250 if len(cmd.changes) > 0 {
251 // Broadcast seen updates to other connections.
252 c.broadcast(cmd.changes)
255 if cmd.expungeIssued {
257 c.writeresultf("%s NO [EXPUNGEISSUED] at least one message was expunged", tag)
263func (cmd *fetchCmd) xmodseq() store.ModSeq {
266 cmd.modseq, err = cmd.conn.account.NextModSeq(cmd.tx)
267 cmd.xcheckf(err, "assigning next modseq")
272func (cmd *fetchCmd) xensureMessage() *store.Message {
277 q := bstore.QueryTx[store.Message](cmd.tx)
278 q.FilterNonzero(store.Message{MailboxID: cmd.mailboxID, UID: cmd.uid})
279 q.FilterEqual("Expunged", false)
281 cmd.xcheckf(err, "get message for uid %d", cmd.uid)
286func (cmd *fetchCmd) xensureParsed() (*store.MsgReader, *message.Part) {
288 return cmd.msgr, cmd.part
291 m := cmd.xensureMessage()
293 cmd.msgr = cmd.conn.account.MessageReader(*m)
296 err := cmd.msgr.Close()
297 cmd.conn.xsanity(err, "closing messagereader")
302 p, err := m.LoadPart(cmd.msgr)
303 xcheckf(err, "load parsed message")
305 return cmd.msgr, cmd.part
308func (cmd *fetchCmd) process(atts []fetchAtt) {
313 err := cmd.msgr.Close()
314 cmd.conn.xsanity(err, "closing messagereader")
322 err, ok := x.(attrError)
326 if errors.Is(err, bstore.ErrAbsent) {
327 cmd.expungeIssued = true
330 cmd.conn.log.Infox("processing fetch attribute", err, slog.Any("uid", cmd.uid))
331 xuserErrorf("processing fetch attribute: %v", err)
334 data := listspace{bare("UID"), number(cmd.uid)}
337 cmd.needFlags = false
338 cmd.needModseq = false
340 for _, a := range atts {
341 data = append(data, cmd.xprocessAtt(a)...)
345 m := cmd.xensureMessage()
346 cmd.deltaCounts.Sub(m.MailboxCounts())
349 cmd.deltaCounts.Add(m.MailboxCounts())
350 m.ModSeq = cmd.xmodseq()
351 err := cmd.tx.Update(m)
352 xcheckf(err, "marking message as seen")
353 // No need to update account total message size.
355 cmd.changes = append(cmd.changes, m.ChangeFlags(origFlags))
359 m := cmd.xensureMessage()
360 data = append(data, bare("FLAGS"), flaglist(m.Flags, m.Keywords))
363 // The wording around when to include the MODSEQ attribute is hard to follow and is
364 // specified and refined in several places.
366 // An additional rule applies to "QRESYNC servers" (we'll assume it only applies
367 // when QRESYNC is enabled on a connection): setting the \Seen flag also triggers
371 // subsequent untagged fetch responses", then lists cases, but leaves out FETCH/UID
372 // FETCH. That appears intentional, it is not a list of examples, it is the full
373 // list, and the "all subsequent untagged fetch responses" doesn't mean "all", just
374 // those covering the listed cases. That makes sense, because otherwise all the
375 // other mentioning of cases elsewhere in the RFC would be too superfluous.
378 if cmd.needModseq || cmd.hasChangedSince || cmd.conn.enabled[capQresync] && (cmd.isUID || cmd.markSeen) {
379 m := cmd.xensureMessage()
380 data = append(data, bare("MODSEQ"), listspace{bare(fmt.Sprintf("%d", m.ModSeq.Client()))})
383 // Write errors are turned into panics because we write through c.
384 fmt.Fprintf(cmd.conn.bw, "* %d FETCH ", cmd.conn.xsequence(cmd.uid))
385 data.writeTo(cmd.conn, cmd.conn.bw)
386 cmd.conn.bw.Write([]byte("\r\n"))
389// result for one attribute. if processing fails, e.g. because data was requested
390// that doesn't exist and cannot be represented in imap, the attribute is simply
391// not returned to the user. in this case, the returned value is a nil list.
392func (cmd *fetchCmd) xprocessAtt(a fetchAtt) []token {
398 _, part := cmd.xensureParsed()
399 envelope := xenvelope(part)
400 return []token{bare("ENVELOPE"), envelope}
404 m := cmd.xensureMessage()
405 return []token{bare("INTERNALDATE"), dquote(m.Received.Format("_2-Jan-2006 15:04:05 -0700"))}
407 case "BODYSTRUCTURE":
408 _, part := cmd.xensureParsed()
409 bs := xbodystructure(part)
410 return []token{bare("BODYSTRUCTURE"), bs}
413 respField, t := cmd.xbody(a)
417 return []token{bare(respField), t}
420 _, p := cmd.xensureParsed()
421 if len(a.sectionBinary) == 0 {
422 // Must return the size of the entire message but with decoded body.
423 // todo: make this less expensive and/or cache the result?
424 n, err := io.Copy(io.Discard, cmd.xbinaryMessageReader(p))
425 cmd.xcheckf(err, "reading message as binary for its size")
426 return []token{bare(cmd.sectionRespField(a)), number(uint32(n))}
428 p = cmd.xpartnumsDeref(a.sectionBinary, p)
429 if len(p.Parts) > 0 || p.Message != nil {
431 cmd.xerrorf("binary only allowed on leaf parts, not multipart/* or message/rfc822 or message/global")
433 return []token{bare(cmd.sectionRespField(a)), number(p.DecodedSize)}
436 respField, t := cmd.xbinary(a)
440 return []token{bare(respField), t}
443 m := cmd.xensureMessage()
444 return []token{bare("RFC822.SIZE"), number(m.Size)}
446 case "RFC822.HEADER":
450 section: §ionSpec{
451 msgtext: §ionMsgtext{s: "HEADER"},
454 respField, t := cmd.xbody(ba)
458 return []token{bare(a.field), t}
463 section: §ionSpec{},
465 respField, t := cmd.xbody(ba)
469 return []token{bare(a.field), t}
474 section: §ionSpec{
475 msgtext: §ionMsgtext{s: "TEXT"},
478 respField, t := cmd.xbody(ba)
482 return []token{bare(a.field), t}
488 cmd.needModseq = true
491 xserverErrorf("field %q not yet implemented", a.field)
497func xenvelope(p *message.Part) token {
498 var env message.Envelope
499 if p.Envelope != nil {
502 var date token = nilt
503 if !env.Date.IsZero() {
505 date = string0(env.Date.Format("Mon, 2 Jan 2006 15:04:05 -0700"))
507 var subject token = nilt
508 if env.Subject != "" {
509 subject = string0(env.Subject)
511 var inReplyTo token = nilt
512 if env.InReplyTo != "" {
513 inReplyTo = string0(env.InReplyTo)
515 var messageID token = nilt
516 if env.MessageID != "" {
517 messageID = string0(env.MessageID)
520 addresses := func(l []message.Address) token {
525 for _, a := range l {
526 var name token = nilt
528 name = string0(a.Name)
530 user := string0(a.User)
531 var host token = nilt
533 host = string0(a.Host)
535 r = append(r, listspace{name, nilt, user, host})
542 if len(sender) == 0 {
545 replyTo := env.ReplyTo
546 if len(replyTo) == 0 {
564func (cmd *fetchCmd) peekOrSeen(peek bool) {
565 if cmd.conn.readonly || peek {
568 m := cmd.xensureMessage()
575// reader that returns the message, but with header Content-Transfer-Encoding left out.
576func (cmd *fetchCmd) xbinaryMessageReader(p *message.Part) io.Reader {
577 hr := cmd.xmodifiedHeader(p, []string{"Content-Transfer-Encoding"}, true)
578 return io.MultiReader(hr, p.Reader())
581// return header with only fields, or with everything except fields if "not" is set.
582func (cmd *fetchCmd) xmodifiedHeader(p *message.Part, fields []string, not bool) io.Reader {
583 h, err := io.ReadAll(p.HeaderReader())
584 cmd.xcheckf(err, "reading header")
586 matchesFields := func(line []byte) bool {
587 k := bytes.TrimRight(bytes.SplitN(line, []byte(":"), 2)[0], " \t")
588 for _, f := range fields {
589 if bytes.EqualFold(k, []byte(f)) {
597 hb := &bytes.Buffer{}
600 i := bytes.Index(line, []byte("\r\n"))
606 match = matchesFields(line) || match && (bytes.HasPrefix(line, []byte(" ")) || bytes.HasPrefix(line, []byte("\t")))
607 if match != not || len(line) == 2 {
614func (cmd *fetchCmd) xbinary(a fetchAtt) (string, token) {
615 _, part := cmd.xensureParsed()
617 cmd.peekOrSeen(a.peek)
618 if len(a.sectionBinary) == 0 {
619 r := cmd.xbinaryMessageReader(part)
620 if a.partial != nil {
621 r = cmd.xpartialReader(a.partial, r)
623 return cmd.sectionRespField(a), readerSyncliteral{r}
627 if len(a.sectionBinary) > 0 {
628 p = cmd.xpartnumsDeref(a.sectionBinary, p)
630 if len(p.Parts) != 0 || p.Message != nil {
632 cmd.xerrorf("binary only allowed on leaf parts, not multipart/* or message/rfc822 or message/global")
635 switch p.ContentTransferEncoding {
636 case "", "7BIT", "8BIT", "BINARY", "BASE64", "QUOTED-PRINTABLE":
639 xusercodeErrorf("UNKNOWN-CTE", "unknown Content-Transfer-Encoding %q", p.ContentTransferEncoding)
643 if a.partial != nil {
644 r = cmd.xpartialReader(a.partial, r)
646 return cmd.sectionRespField(a), readerSyncliteral{r}
649func (cmd *fetchCmd) xpartialReader(partial *partial, r io.Reader) io.Reader {
650 n, err := io.Copy(io.Discard, io.LimitReader(r, int64(partial.offset)))
651 cmd.xcheckf(err, "skipping to offset for partial")
652 if n != int64(partial.offset) {
655 return io.LimitReader(r, int64(partial.count))
658func (cmd *fetchCmd) xbody(a fetchAtt) (string, token) {
659 msgr, part := cmd.xensureParsed()
661 if a.section == nil {
662 // Non-extensible form of BODYSTRUCTURE.
663 return a.field, xbodystructure(part)
666 cmd.peekOrSeen(a.peek)
668 respField := cmd.sectionRespField(a)
670 if a.section.msgtext == nil && a.section.part == nil {
671 m := cmd.xensureMessage()
674 if a.partial != nil {
675 offset = int64(a.partial.offset)
679 count = int64(a.partial.count)
680 if offset+count > m.Size {
681 count = m.Size - offset
684 return respField, readerSizeSyncliteral{&moxio.AtReader{R: msgr, Offset: offset}, count}
687 sr := cmd.xsection(a.section, part)
689 if a.partial != nil {
690 n, err := io.Copy(io.Discard, io.LimitReader(sr, int64(a.partial.offset)))
691 cmd.xcheckf(err, "skipping to offset for partial")
692 if n != int64(a.partial.offset) {
695 return respField, readerSyncliteral{io.LimitReader(sr, int64(a.partial.count))}
697 return respField, readerSyncliteral{sr}
700func (cmd *fetchCmd) xpartnumsDeref(nums []uint32, p *message.Part) *message.Part {
702 if (len(p.Parts) == 0 && p.Message == nil) && len(nums) == 1 && nums[0] == 1 {
707 for i, num := range nums {
708 index := int(num - 1)
709 if p.Message != nil {
710 err := p.SetMessageReaderAt()
711 cmd.xcheckf(err, "preparing submessage")
712 return cmd.xpartnumsDeref(nums[i:], p.Message)
714 if index < 0 || index >= len(p.Parts) {
715 cmd.xerrorf("requested part does not exist")
722func (cmd *fetchCmd) xsection(section *sectionSpec, p *message.Part) io.Reader {
723 if section.part == nil {
724 return cmd.xsectionMsgtext(section.msgtext, p)
727 p = cmd.xpartnumsDeref(section.part.part, p)
729 if section.part.text == nil {
734 if p.Message != nil {
735 err := p.SetMessageReaderAt()
736 cmd.xcheckf(err, "preparing submessage")
740 if !section.part.text.mime {
741 return cmd.xsectionMsgtext(section.part.text.msgtext, p)
745 h, err := io.ReadAll(p.HeaderReader())
746 cmd.xcheckf(err, "reading header")
748 matchesFields := func(line []byte) bool {
749 k := textproto.CanonicalMIMEHeaderKey(string(bytes.TrimRight(bytes.SplitN(line, []byte(":"), 2)[0], " \t")))
751 return (p.Envelope != nil && k == "Mime-Version") || strings.HasPrefix(k, "Content-")
755 hb := &bytes.Buffer{}
758 i := bytes.Index(line, []byte("\r\n"))
764 match = matchesFields(line) || match && (bytes.HasPrefix(line, []byte(" ")) || bytes.HasPrefix(line, []byte("\t")))
765 if match || len(line) == 2 {
772func (cmd *fetchCmd) xsectionMsgtext(smt *sectionMsgtext, p *message.Part) io.Reader {
773 if smt.s == "HEADER" {
774 return p.HeaderReader()
778 case "HEADER.FIELDS":
779 return cmd.xmodifiedHeader(p, smt.headers, false)
781 case "HEADER.FIELDS.NOT":
782 return cmd.xmodifiedHeader(p, smt.headers, true)
785 // It appears imap clients expect to get the body of the message, not a "text body"
789 panic(serverError{fmt.Errorf("missing case")})
792func (cmd *fetchCmd) sectionRespField(a fetchAtt) string {
794 if len(a.sectionBinary) > 0 {
795 s += fmt.Sprintf("%d", a.sectionBinary[0])
796 for _, v := range a.sectionBinary[1:] {
797 s += "." + fmt.Sprintf("%d", v)
799 } else if a.section != nil {
800 if a.section.part != nil {
802 s += fmt.Sprintf("%d", p.part[0])
803 for _, v := range p.part[1:] {
804 s += "." + fmt.Sprintf("%d", v)
810 s += "." + cmd.sectionMsgtextName(p.text.msgtext)
813 } else if a.section.msgtext != nil {
814 s += cmd.sectionMsgtextName(a.section.msgtext)
819 if a.field != "BINARY" && a.partial != nil {
820 s += fmt.Sprintf("<%d>", a.partial.offset)
825func (cmd *fetchCmd) sectionMsgtextName(smt *sectionMsgtext) string {
827 if strings.HasPrefix(smt.s, "HEADER.FIELDS") {
829 for _, h := range smt.headers {
830 l = append(l, astring(h))
832 s += " " + l.pack(cmd.conn)
837func bodyFldParams(params map[string]string) token {
838 if len(params) == 0 {
841 // Ensure same ordering, easier for testing.
843 for k := range params {
844 keys = append(keys, k)
847 l := make(listspace, 2*len(keys))
849 for _, k := range keys {
850 l[i] = string0(strings.ToUpper(k))
851 l[i+1] = string0(params[k])
857func bodyFldEnc(s string) token {
858 up := strings.ToUpper(s)
860 case "7BIT", "8BIT", "BINARY", "BASE64", "QUOTED-PRINTABLE":
866// xbodystructure returns a "body".
867// calls itself for multipart messages and message/{rfc822,global}.
868func xbodystructure(p *message.Part) token {
869 if p.MediaType == "MULTIPART" {
872 for i := range p.Parts {
873 bodies = append(bodies, xbodystructure(&p.Parts[i]))
875 return listspace{bodies, string0(p.MediaSubType)}
879 if p.MediaType == "TEXT" {
885 nilOrString(p.ContentID),
886 nilOrString(p.ContentDescription),
887 bodyFldEnc(p.ContentTransferEncoding),
888 number(p.EndOffset - p.BodyOffset),
889 number(p.RawLineCount),
891 } else if p.MediaType == "MESSAGE" && (p.MediaSubType == "RFC822" || p.MediaSubType == "GLOBAL") {
893 // note: we don't have to prepare p.Message for reading, because we aren't going to read from it.
898 nilOrString(p.ContentID),
899 nilOrString(p.ContentDescription),
900 bodyFldEnc(p.ContentTransferEncoding),
901 number(p.EndOffset - p.BodyOffset),
902 xenvelope(p.Message),
903 xbodystructure(p.Message),
904 number(p.RawLineCount), // todo: or mp.RawLineCount?
909 case "APPLICATION", "AUDIO", "IMAGE", "FONT", "MESSAGE", "MODEL", "VIDEO":
910 media = dquote(p.MediaType)
912 media = string0(p.MediaType)
919 nilOrString(p.ContentID),
920 nilOrString(p.ContentDescription),
921 bodyFldEnc(p.ContentTransferEncoding),
922 number(p.EndOffset - p.BodyOffset),