1// Package webops implements shared functionality between webapisrv and webmail.
2package webops
3
4import (
5 "context"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "os"
11 "path/filepath"
12 "slices"
13 "sort"
14 "time"
15
16 "github.com/mjl-/bstore"
17
18 "github.com/mjl-/mox/junk"
19 "github.com/mjl-/mox/message"
20 "github.com/mjl-/mox/mlog"
21 "github.com/mjl-/mox/moxio"
22 "github.com/mjl-/mox/store"
23)
24
25var ErrMessageNotFound = errors.New("no such message")
26
27type XOps struct {
28 DBWrite func(ctx context.Context, acc *store.Account, fn func(tx *bstore.Tx))
29 Checkf func(ctx context.Context, err error, format string, args ...any)
30 Checkuserf func(ctx context.Context, err error, format string, args ...any)
31}
32
33func (x XOps) mailboxID(ctx context.Context, tx *bstore.Tx, mailboxID int64) store.Mailbox {
34 if mailboxID == 0 {
35 x.Checkuserf(ctx, errors.New("invalid zero mailbox ID"), "getting mailbox")
36 }
37 mb, err := store.MailboxID(tx, mailboxID)
38 if err == bstore.ErrAbsent || err == store.ErrMailboxExpunged {
39 x.Checkuserf(ctx, err, "getting mailbox")
40 }
41 x.Checkf(ctx, err, "getting mailbox")
42 return mb
43}
44
45// messageID returns a non-expunged message or panics with a sherpa error.
46func (x XOps) messageID(ctx context.Context, tx *bstore.Tx, messageID int64) store.Message {
47 if messageID == 0 {
48 x.Checkuserf(ctx, errors.New("invalid zero message id"), "getting message")
49 }
50 m := store.Message{ID: messageID}
51 err := tx.Get(&m)
52 if err == bstore.ErrAbsent {
53 x.Checkuserf(ctx, ErrMessageNotFound, "getting message")
54 } else if err == nil && m.Expunged {
55 x.Checkuserf(ctx, errors.New("message was removed"), "getting message")
56 }
57 x.Checkf(ctx, err, "getting message")
58 return m
59}
60
61func (x XOps) MessageDelete(ctx context.Context, log mlog.Log, acc *store.Account, messageIDs []int64) {
62 acc.WithWLock(func() {
63 var changes []store.Change
64
65 x.DBWrite(ctx, acc, func(tx *bstore.Tx) {
66 var modseq store.ModSeq
67 changes = x.MessageDeleteTx(ctx, log, tx, acc, messageIDs, &modseq)
68 })
69
70 store.BroadcastChanges(acc, changes)
71 })
72}
73
74func (x XOps) MessageDeleteTx(ctx context.Context, log mlog.Log, tx *bstore.Tx, acc *store.Account, messageIDs []int64, modseq *store.ModSeq) []store.Change {
75 changes := make([]store.Change, 0, 1+1) // 1 remove, 1 mailbox counts, optimistic that all messages are in 1 mailbox.
76
77 var jf *junk.Filter
78 defer func() {
79 if jf != nil {
80 err := jf.CloseDiscard()
81 log.Check(err, "close junk filter")
82 }
83 }()
84
85 conf, _ := acc.Conf()
86
87 var mb store.Mailbox
88 var changeRemoveUIDs store.ChangeRemoveUIDs
89 xflushMailbox := func() {
90 err := tx.Update(&mb)
91 x.Checkf(ctx, err, "updating mailbox counts")
92 slices.Sort(changeRemoveUIDs.UIDs)
93 changes = append(changes, mb.ChangeCounts(), changeRemoveUIDs)
94 }
95
96 for _, id := range messageIDs {
97 m := x.messageID(ctx, tx, id)
98
99 if *modseq == 0 {
100 var err error
101 *modseq, err = acc.NextModSeq(tx)
102 x.Checkf(ctx, err, "assigning next modseq")
103 }
104
105 if m.MailboxID != mb.ID {
106 if mb.ID != 0 {
107 xflushMailbox()
108 }
109 mb = x.mailboxID(ctx, tx, m.MailboxID)
110 mb.ModSeq = *modseq
111 changeRemoveUIDs = store.ChangeRemoveUIDs{MailboxID: mb.ID, ModSeq: *modseq}
112 }
113
114 if m.Junk != m.Notjunk && jf == nil && conf.JunkFilter != nil {
115 var err error
116 jf, _, err = acc.OpenJunkFilter(ctx, log)
117 x.Checkf(ctx, err, "open junk filter")
118 }
119
120 opts := store.RemoveOpts{JunkFilter: jf}
121 _, _, err := acc.MessageRemove(log, tx, *modseq, &mb, opts, m)
122 x.Checkf(ctx, err, "expunge message")
123
124 changeRemoveUIDs.UIDs = append(changeRemoveUIDs.UIDs, m.UID)
125 changeRemoveUIDs.MsgIDs = append(changeRemoveUIDs.MsgIDs, m.ID)
126 }
127
128 xflushMailbox()
129
130 if jf != nil {
131 err := jf.Close()
132 jf = nil
133 x.Checkf(ctx, err, "close junk filter")
134 }
135
136 return changes
137}
138
139func (x XOps) MessageFlagsAdd(ctx context.Context, log mlog.Log, acc *store.Account, messageIDs []int64, flaglist []string) {
140 flags, keywords, err := store.ParseFlagsKeywords(flaglist)
141 x.Checkuserf(ctx, err, "parsing flags")
142
143 acc.WithRLock(func() {
144 var changes []store.Change
145
146 x.DBWrite(ctx, acc, func(tx *bstore.Tx) {
147 var modseq store.ModSeq
148 var retrain []store.Message
149 var mb, origmb store.Mailbox
150
151 for _, mid := range messageIDs {
152 m := x.messageID(ctx, tx, mid)
153
154 if modseq == 0 {
155 modseq, err = acc.NextModSeq(tx)
156 x.Checkf(ctx, err, "assigning next modseq")
157 }
158
159 if mb.ID != m.MailboxID {
160 if mb.ID != 0 {
161 mb.ModSeq = modseq
162 err := tx.Update(&mb)
163 x.Checkf(ctx, err, "updating mailbox")
164 if mb.MailboxCounts != origmb.MailboxCounts {
165 changes = append(changes, mb.ChangeCounts())
166 }
167 if mb.KeywordsChanged(origmb) {
168 changes = append(changes, mb.ChangeKeywords())
169 }
170 }
171 mb = x.mailboxID(ctx, tx, m.MailboxID)
172 origmb = mb
173 }
174 mb.Keywords, _ = store.MergeKeywords(mb.Keywords, keywords)
175
176 mb.Sub(m.MailboxCounts())
177 oflags := m.Flags
178 m.Flags = m.Flags.Set(flags, flags)
179 var kwChanged bool
180 m.Keywords, kwChanged = store.MergeKeywords(m.Keywords, keywords)
181 mb.Add(m.MailboxCounts())
182
183 if m.Flags == oflags && !kwChanged {
184 continue
185 }
186
187 m.ModSeq = modseq
188 err = tx.Update(&m)
189 x.Checkf(ctx, err, "updating message")
190
191 changes = append(changes, m.ChangeFlags(oflags))
192 retrain = append(retrain, m)
193 }
194
195 if mb.ID != 0 {
196 mb.ModSeq = modseq
197 err := tx.Update(&mb)
198 x.Checkf(ctx, err, "updating mailbox")
199 if mb.MailboxCounts != origmb.MailboxCounts {
200 changes = append(changes, mb.ChangeCounts())
201 }
202 if mb.KeywordsChanged(origmb) {
203 changes = append(changes, mb.ChangeKeywords())
204 }
205 }
206
207 err = acc.RetrainMessages(ctx, log, tx, retrain)
208 x.Checkf(ctx, err, "retraining messages")
209 })
210
211 store.BroadcastChanges(acc, changes)
212 })
213}
214
215func (x XOps) MessageFlagsClear(ctx context.Context, log mlog.Log, acc *store.Account, messageIDs []int64, flaglist []string) {
216 flags, keywords, err := store.ParseFlagsKeywords(flaglist)
217 x.Checkuserf(ctx, err, "parsing flags")
218
219 acc.WithRLock(func() {
220 var retrain []store.Message
221 var changes []store.Change
222
223 x.DBWrite(ctx, acc, func(tx *bstore.Tx) {
224 var modseq store.ModSeq
225 var mb, origmb store.Mailbox
226
227 for _, mid := range messageIDs {
228 m := x.messageID(ctx, tx, mid)
229
230 if modseq == 0 {
231 modseq, err = acc.NextModSeq(tx)
232 x.Checkf(ctx, err, "assigning next modseq")
233 }
234
235 if mb.ID != m.MailboxID {
236 if mb.ID != 0 {
237 mb.ModSeq = modseq
238 err := tx.Update(&mb)
239 x.Checkf(ctx, err, "updating counts for mailbox")
240 if mb.MailboxCounts != origmb.MailboxCounts {
241 changes = append(changes, mb.ChangeCounts())
242 }
243 // note: cannot remove keywords from mailbox by removing keywords from message.
244 }
245 mb = x.mailboxID(ctx, tx, m.MailboxID)
246 origmb = mb
247 }
248
249 oflags := m.Flags
250 mb.Sub(m.MailboxCounts())
251 m.Flags = m.Flags.Set(flags, store.Flags{})
252 var changed bool
253 m.Keywords, changed = store.RemoveKeywords(m.Keywords, keywords)
254 mb.Add(m.MailboxCounts())
255
256 if m.Flags == oflags && !changed {
257 continue
258 }
259
260 m.ModSeq = modseq
261 err = tx.Update(&m)
262 x.Checkf(ctx, err, "updating message")
263
264 changes = append(changes, m.ChangeFlags(oflags))
265 retrain = append(retrain, m)
266 }
267
268 if mb.ID != 0 {
269 mb.ModSeq = modseq
270 err := tx.Update(&mb)
271 x.Checkf(ctx, err, "updating keywords in mailbox")
272 if mb.MailboxCounts != origmb.MailboxCounts {
273 changes = append(changes, mb.ChangeCounts())
274 }
275 // note: cannot remove keywords from mailbox by removing keywords from message.
276 }
277
278 err = acc.RetrainMessages(ctx, log, tx, retrain)
279 x.Checkf(ctx, err, "retraining messages")
280 })
281
282 store.BroadcastChanges(acc, changes)
283 })
284}
285
286// MailboxesMarkRead updates all messages in the referenced mailboxes as seen when
287// they aren't yet. The mailboxes are updated with their unread messages counts,
288// and the changes are propagated.
289func (x XOps) MailboxesMarkRead(ctx context.Context, log mlog.Log, acc *store.Account, mailboxIDs []int64) {
290 acc.WithRLock(func() {
291 var changes []store.Change
292
293 x.DBWrite(ctx, acc, func(tx *bstore.Tx) {
294 var modseq store.ModSeq
295
296 // Note: we don't need to retrain, changing the "seen" flag is not relevant.
297
298 for _, mbID := range mailboxIDs {
299 mb := x.mailboxID(ctx, tx, mbID)
300
301 // Find messages to update.
302 q := bstore.QueryTx[store.Message](tx)
303 q.FilterNonzero(store.Message{MailboxID: mb.ID})
304 q.FilterEqual("Seen", false)
305 q.FilterEqual("Expunged", false)
306 q.SortAsc("UID")
307 var have bool
308 err := q.ForEach(func(m store.Message) error {
309 have = true // We need to update mailbox.
310
311 oflags := m.Flags
312 mb.Sub(m.MailboxCounts())
313 m.Seen = true
314 mb.Add(m.MailboxCounts())
315
316 if modseq == 0 {
317 var err error
318 modseq, err = acc.NextModSeq(tx)
319 x.Checkf(ctx, err, "assigning next modseq")
320 }
321 m.ModSeq = modseq
322 err := tx.Update(&m)
323 x.Checkf(ctx, err, "updating message")
324
325 changes = append(changes, m.ChangeFlags(oflags))
326 return nil
327 })
328 x.Checkf(ctx, err, "listing messages to mark as read")
329
330 if have {
331 mb.ModSeq = modseq
332 err := tx.Update(&mb)
333 x.Checkf(ctx, err, "updating mailbox")
334 changes = append(changes, mb.ChangeCounts())
335 }
336 }
337 })
338
339 store.BroadcastChanges(acc, changes)
340 })
341}
342
343// MessageMove moves messages to the mailbox represented by mailboxName, or to mailboxID if mailboxName is empty.
344func (x XOps) MessageMove(ctx context.Context, log mlog.Log, acc *store.Account, messageIDs []int64, mailboxName string, mailboxID int64) {
345 acc.WithWLock(func() {
346 var changes []store.Change
347
348 var newIDs []int64
349 defer func() {
350 for _, id := range newIDs {
351 p := acc.MessagePath(id)
352 err := os.Remove(p)
353 log.Check(err, "removing delivered message after failure", slog.String("path", p))
354 }
355 }()
356
357 x.DBWrite(ctx, acc, func(tx *bstore.Tx) {
358 if mailboxName != "" {
359 mb, err := acc.MailboxFind(tx, mailboxName)
360 x.Checkf(ctx, err, "looking up mailbox name")
361 if mb == nil {
362 x.Checkuserf(ctx, errors.New("not found"), "looking up mailbox name")
363 } else {
364 mailboxID = mb.ID
365 }
366 }
367
368 mbDst := x.mailboxID(ctx, tx, mailboxID)
369
370 if len(messageIDs) == 0 {
371 return
372 }
373
374 var modseq store.ModSeq
375 newIDs, changes = x.MessageMoveTx(ctx, log, acc, tx, messageIDs, mbDst, &modseq)
376 })
377 newIDs = nil
378
379 store.BroadcastChanges(acc, changes)
380 })
381}
382
383// MessageMoveTx moves message to a new mailbox, which must be different than their
384// current mailbox. Moving a message is done by changing the MailboxID and
385// assigning an appriorate new UID, and then inserting a replacement Message record
386// with new ID that is marked expunged in the original mailbox, along with a
387// MessageErase record so the message gets erased when all sessions stopped
388// referencing the message.
389func (x XOps) MessageMoveTx(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, messageIDs []int64, mbDst store.Mailbox, modseq *store.ModSeq) ([]int64, []store.Change) {
390 var newIDs []int64
391 var commit bool
392 defer func() {
393 if commit {
394 return
395 }
396 for _, id := range newIDs {
397 p := acc.MessagePath(id)
398 err := os.Remove(p)
399 log.Check(err, "removing delivered message after failure", slog.String("path", p))
400 }
401 newIDs = nil
402 }()
403
404 // n adds, 1 remove, 2 mailboxcounts, 1 mailboxkeywords, optimistic that messages are in a single source mailbox.
405 changes := make([]store.Change, 0, len(messageIDs)+4)
406
407 var err error
408 if *modseq == 0 {
409 *modseq, err = acc.NextModSeq(tx)
410 x.Checkf(ctx, err, "assigning next modseq")
411 }
412
413 mbDst.ModSeq = *modseq
414
415 // Get messages. group them by mailbox.
416 l := make([]store.Message, len(messageIDs))
417 for i, id := range messageIDs {
418 l[i] = x.messageID(ctx, tx, id)
419 if l[i].MailboxID == mbDst.ID {
420 // Client should filter out messages that are already in mailbox.
421 x.Checkuserf(ctx, fmt.Errorf("message %d already in destination mailbox", l[i].ID), "moving message")
422 }
423 }
424
425 // Sort (group) by mailbox, sort by UID.
426 sort.Slice(l, func(i, j int) bool {
427 if l[i].MailboxID != l[j].MailboxID {
428 return l[i].MailboxID < l[j].MailboxID
429 }
430 return l[i].UID < l[j].UID
431 })
432
433 var jf *junk.Filter
434 defer func() {
435 if jf != nil {
436 err := jf.CloseDiscard()
437 log.Check(err, "close junk filter")
438 }
439 }()
440
441 accConf, _ := acc.Conf()
442
443 var mbSrc store.Mailbox
444 var changeRemoveUIDs store.ChangeRemoveUIDs
445 xflushMailbox := func() {
446 changes = append(changes, changeRemoveUIDs, mbSrc.ChangeCounts())
447
448 err = tx.Update(&mbSrc)
449 x.Checkf(ctx, err, "updating source mailbox counts")
450 }
451
452 nkeywords := len(mbDst.Keywords)
453 now := time.Now()
454
455 syncDirs := map[string]struct{}{}
456
457 for _, om := range l {
458 if om.MailboxID != mbSrc.ID {
459 if mbSrc.ID != 0 {
460 xflushMailbox()
461 }
462 mbSrc = x.mailboxID(ctx, tx, om.MailboxID)
463 mbSrc.ModSeq = *modseq
464 changeRemoveUIDs = store.ChangeRemoveUIDs{MailboxID: mbSrc.ID, ModSeq: *modseq}
465 }
466
467 nm := om
468 nm.MailboxID = mbDst.ID
469 nm.UID = mbDst.UIDNext
470 mbDst.UIDNext++
471 nm.ModSeq = *modseq
472 nm.CreateSeq = *modseq
473 nm.SaveDate = &now
474 if nm.IsReject && nm.MailboxDestinedID != 0 {
475 // Incorrectly delivered to Rejects mailbox. Adjust MailboxOrigID so this message
476 // is used for reputation calculation during future deliveries.
477 nm.MailboxOrigID = nm.MailboxDestinedID
478 nm.IsReject = false
479 nm.Seen = false
480 }
481 if mbDst.Trash {
482 nm.Seen = true
483 }
484
485 nm.JunkFlagsForMailbox(mbDst, accConf)
486
487 err := tx.Update(&nm)
488 x.Checkf(ctx, err, "updating message with new mailbox")
489
490 mbDst.Add(nm.MailboxCounts())
491
492 mbSrc.Sub(om.MailboxCounts())
493 om.ID = 0
494 om.Expunged = true
495 om.ModSeq = *modseq
496 om.TrainedJunk = nil
497 err = tx.Insert(&om)
498 x.Checkf(ctx, err, "inserting expunged message in old mailbox")
499
500 dstPath := acc.MessagePath(om.ID)
501 dstDir := filepath.Dir(dstPath)
502 if _, ok := syncDirs[dstDir]; !ok {
503 os.MkdirAll(dstDir, 0770)
504 syncDirs[dstDir] = struct{}{}
505 }
506
507 err = moxio.LinkOrCopy(log, dstPath, acc.MessagePath(nm.ID), nil, false)
508 x.Checkf(ctx, err, "duplicating message in old mailbox for current sessions")
509 newIDs = append(newIDs, nm.ID)
510 // We don't sync the directory. In case of a crash and files disappearing, the
511 // eraser will simply not find the file at next startup.
512
513 err = tx.Insert(&store.MessageErase{ID: om.ID, SkipUpdateDiskUsage: true})
514 x.Checkf(ctx, err, "insert message erase")
515
516 mbDst.Keywords, _ = store.MergeKeywords(mbDst.Keywords, nm.Keywords)
517
518 if accConf.JunkFilter != nil && nm.NeedsTraining() {
519 // Lazily open junk filter.
520 if jf == nil {
521 jf, _, err = acc.OpenJunkFilter(ctx, log)
522 x.Checkf(ctx, err, "open junk filter")
523 }
524 err := acc.RetrainMessage(ctx, log, tx, jf, &nm)
525 x.Checkf(ctx, err, "retrain message after moving")
526 }
527
528 changeRemoveUIDs.UIDs = append(changeRemoveUIDs.UIDs, om.UID)
529 changeRemoveUIDs.MsgIDs = append(changeRemoveUIDs.MsgIDs, om.ID)
530 changes = append(changes, nm.ChangeAddUID())
531 }
532
533 for dir := range syncDirs {
534 err := moxio.SyncDir(log, dir)
535 x.Checkf(ctx, err, "sync directory")
536 }
537
538 xflushMailbox()
539
540 changes = append(changes, mbDst.ChangeCounts())
541 if nkeywords > len(mbDst.Keywords) {
542 changes = append(changes, mbDst.ChangeKeywords())
543 }
544
545 err = tx.Update(&mbDst)
546 x.Checkf(ctx, err, "updating destination mailbox with uidnext and modseq")
547
548 if jf != nil {
549 err := jf.Close()
550 x.Checkf(ctx, err, "saving junk filter")
551 jf = nil
552 }
553
554 commit = true
555 return newIDs, changes
556}
557
558func isText(p message.Part) bool {
559 return p.MediaType == "" && p.MediaSubType == "" || p.MediaType == "TEXT" && p.MediaSubType == "PLAIN"
560}
561
562func isHTML(p message.Part) bool {
563 return p.MediaType == "" && p.MediaSubType == "" || p.MediaType == "TEXT" && p.MediaSubType == "HTML"
564}
565
566func isAlternative(p message.Part) bool {
567 return p.MediaType == "MULTIPART" && p.MediaSubType == "ALTERNATIVE"
568}
569
570func readPart(p message.Part, maxSize int64) (string, error) {
571 buf, err := io.ReadAll(io.LimitReader(p.ReaderUTF8OrBinary(), maxSize))
572 if err != nil {
573 return "", fmt.Errorf("reading part contents: %v", err)
574 }
575 return string(buf), nil
576}
577
578// ReadableParts returns the contents of the first text and/or html parts,
579// descending into multiparts, truncated to maxSize bytes if longer.
580func ReadableParts(p message.Part, maxSize int64) (text string, html string, found bool, err error) {
581 // todo: may want to merge this logic with webmail's message parsing.
582
583 // For non-multipart messages, top-level part.
584 if isText(p) {
585 data, err := readPart(p, maxSize)
586 return data, "", true, err
587 } else if isHTML(p) {
588 data, err := readPart(p, maxSize)
589 return "", data, true, err
590 }
591
592 // Look in sub-parts. Stop when we have a readable part, don't continue with other
593 // subparts unless we have a multipart/alternative.
594 // todo: we may have to look at disposition "inline".
595 var haveText, haveHTML bool
596 for _, pp := range p.Parts {
597 if isText(pp) {
598 haveText = true
599 text, err = readPart(pp, maxSize)
600 if !isAlternative(p) {
601 break
602 }
603 } else if isHTML(pp) {
604 haveHTML = true
605 html, err = readPart(pp, maxSize)
606 if !isAlternative(p) {
607 break
608 }
609 }
610 }
611 if haveText || haveHTML {
612 return text, html, true, err
613 }
614
615 // Descend into the subparts.
616 for _, pp := range p.Parts {
617 text, html, found, err = ReadableParts(pp, maxSize)
618 if found {
619 break
620 }
621 }
622 return
623}
624