1package store
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "runtime"
10 "sort"
11 "time"
12
13 "golang.org/x/exp/slices"
14 "golang.org/x/exp/slog"
15
16 "github.com/mjl-/bstore"
17
18 "github.com/mjl-/mox/message"
19 "github.com/mjl-/mox/mlog"
20 "github.com/mjl-/mox/moxio"
21)
22
23// Assign a new/incoming message to a thread. Message does not yet have an ID. If
24// this isn't a response, ThreadID should remain 0 (unless this is a message with
25// existing message-id) and the caller must set ThreadID to ID.
26// If the account is still busy upgrading messages with threadids in the background, parents
27// may have a threadid 0. That results in this message getting threadid 0, which
28// will handled by the background upgrade process assigning a threadid when it gets
29// to this message.
30func assignThread(log mlog.Log, tx *bstore.Tx, m *Message, part *message.Part) error {
31 if m.MessageID != "" {
32 // Match against existing different message with same Message-ID.
33 q := bstore.QueryTx[Message](tx)
34 q.FilterNonzero(Message{MessageID: m.MessageID})
35 q.FilterEqual("Expunged", false)
36 q.FilterNotEqual("ID", m.ID)
37 q.FilterNotEqual("ThreadID", int64(0))
38 q.SortAsc("ID")
39 q.Limit(1)
40 em, err := q.Get()
41 if err != nil && err != bstore.ErrAbsent {
42 return fmt.Errorf("looking up existing message with message-id: %v", err)
43 } else if err == nil {
44 assignParent(m, em, true)
45 return nil
46 }
47 }
48
49 h, err := part.Header()
50 if err != nil {
51 log.Errorx("assigning threads: parsing references/in-reply-to headers, not matching by message-id", err, slog.Int64("msgid", m.ID))
52 }
53 messageIDs, err := message.ReferencedIDs(h.Values("References"), h.Values("In-Reply-To"))
54 if err != nil {
55 log.Errorx("assigning threads: parsing references/in-reply-to headers, not matching by message-id", err, slog.Int64("msgid", m.ID))
56 }
57 for i := len(messageIDs) - 1; i >= 0; i-- {
58 messageID := messageIDs[i]
59 if messageID == m.MessageID {
60 continue
61 }
62 tm, _, err := lookupThreadMessage(tx, m.ID, messageID, m.SubjectBase)
63 if err != nil {
64 return fmt.Errorf("looking up thread message for new message: %v", err)
65 } else if tm != nil {
66 assignParent(m, *tm, true)
67 return nil
68 }
69 m.ThreadMissingLink = true
70 }
71 if len(messageIDs) > 0 {
72 return nil
73 }
74
75 var isResp bool
76 if part != nil && part.Envelope != nil {
77 m.SubjectBase, isResp = message.ThreadSubject(part.Envelope.Subject, false)
78 }
79 if !isResp || m.SubjectBase == "" {
80 return nil
81 }
82 m.ThreadMissingLink = true
83 tm, err := lookupThreadMessageSubject(tx, *m, m.SubjectBase)
84 if err != nil {
85 return fmt.Errorf("looking up thread message by subject: %v", err)
86 } else if tm != nil {
87 assignParent(m, *tm, true)
88 }
89 return nil
90}
91
92// assignParent assigns threading fields to m that make it a child of parent message pm.
93// updateSeen indicates if m.Seen should be cleared if pm is thread-muted.
94func assignParent(m *Message, pm Message, updateSeen bool) {
95 if pm.ThreadID == 0 {
96 panic(fmt.Sprintf("assigning message id %d/d%q to parent message id %d/%q which has threadid 0", m.ID, m.MessageID, pm.ID, pm.MessageID))
97 }
98 if m.ID == pm.ID {
99 panic(fmt.Sprintf("trying to make message id %d/%q its own parent", m.ID, m.MessageID))
100 }
101 m.ThreadID = pm.ThreadID
102 // Make sure we don't add cycles.
103 if !slices.Contains(pm.ThreadParentIDs, m.ID) {
104 m.ThreadParentIDs = append([]int64{pm.ID}, pm.ThreadParentIDs...)
105 } else if pm.ID != m.ID {
106 m.ThreadParentIDs = []int64{pm.ID}
107 } else {
108 m.ThreadParentIDs = nil
109 }
110 if m.MessageID != "" && m.MessageID == pm.MessageID {
111 m.ThreadMissingLink = true
112 }
113 m.ThreadMuted = pm.ThreadMuted
114 m.ThreadCollapsed = pm.ThreadCollapsed
115 if updateSeen && m.ThreadMuted {
116 m.Seen = true
117 }
118}
119
120// ResetThreading resets the MessageID and SubjectBase fields for all messages in
121// the account. If clearIDs is true, all Thread* fields are also cleared. Changes
122// are made in transactions of batchSize changes. The total number of updated
123// messages is returned.
124//
125// ModSeq is not changed. Calles should bump the uid validity of the mailboxes
126// to propagate the changes to IMAP clients.
127func (a *Account) ResetThreading(ctx context.Context, log mlog.Log, batchSize int, clearIDs bool) (int, error) {
128 // todo: should this send Change events for ThreadMuted and ThreadCollapsed? worth it?
129
130 var lastID int64
131 total := 0
132 for {
133 n := 0
134
135 prepareMessages := func(in, out chan moxio.Work[Message, Message]) {
136 for {
137 w, ok := <-in
138 if !ok {
139 return
140 }
141
142 m := w.In
143
144 // We have the Message-ID and Subject headers in ParsedBuf. We use a partial part
145 // struct so we don't generate so much garbage for the garbage collector to sift
146 // through.
147 var part struct {
148 Envelope *message.Envelope
149 }
150 if err := json.Unmarshal(m.ParsedBuf, &part); err != nil {
151 log.Errorx("unmarshal json parsedbuf for setting message-id, skipping", err, slog.Int64("msgid", m.ID))
152 } else {
153 m.MessageID = ""
154 if part.Envelope != nil && part.Envelope.MessageID != "" {
155 s, _, err := message.MessageIDCanonical(part.Envelope.MessageID)
156 if err != nil {
157 log.Debugx("parsing message-id, skipping", err, slog.Int64("msgid", m.ID), slog.String("messageid", part.Envelope.MessageID))
158 }
159 m.MessageID = s
160 }
161 if part.Envelope != nil {
162 m.SubjectBase, _ = message.ThreadSubject(part.Envelope.Subject, false)
163 }
164 }
165 w.Out = m
166
167 out <- w
168 }
169 }
170
171 err := a.DB.Write(ctx, func(tx *bstore.Tx) error {
172 processMessage := func(in, m Message) error {
173 if clearIDs {
174 m.ThreadID = 0
175 m.ThreadParentIDs = nil
176 m.ThreadMissingLink = false
177 }
178 return tx.Update(&m)
179 }
180
181 // JSON parsing is relatively heavy, we benefit from multiple goroutines.
182 procs := runtime.GOMAXPROCS(0)
183 wq := moxio.NewWorkQueue[Message, Message](procs, 2*procs, prepareMessages, processMessage)
184
185 q := bstore.QueryTx[Message](tx)
186 q.FilterEqual("Expunged", false)
187 q.FilterGreater("ID", lastID)
188 q.SortAsc("ID")
189 err := q.ForEach(func(m Message) error {
190 // We process in batches so we don't block other operations for a long time.
191 if n >= batchSize {
192 return bstore.StopForEach
193 }
194 // Update starting point for next batch.
195 lastID = m.ID
196
197 n++
198 return wq.Add(m)
199 })
200 if err == nil {
201 err = wq.Finish()
202 }
203 wq.Stop()
204 return err
205 })
206 if err != nil {
207 return total, fmt.Errorf("upgrading account to threads storage, step 1/2: %w", err)
208 }
209 total += n
210 if n == 0 {
211 break
212 }
213 }
214 return total, nil
215}
216
// AssignThreads assigns thread-related fields to non-expunged messages with ID
// >= startMessageID that do not have a ThreadID yet. Changes are committed each
// batchSize changes if txOpt is nil (i.e. during automatic account upgrade, we
// don't want to block database access for a long time). If txOpt is not nil, all
// changes are made in that transaction, and it is neither committed nor rolled
// back here.
//
// When resetting thread assignments, the caller must first clear the existing
// thread fields.
//
// Messages are processed in order of ID, so when added to the account, not
// necessarily by received/date. Most threaded messages can immediately be matched
// to their parent message. If not, we keep track of the missing message-id and
// resolve as soon as we encounter it. At the end, we resolve all remaining
// messages, they start with a cycle.
//
// Does not set Seen flag for muted threads.
//
// Progress is written to progressWriter: a line after a batch when the total
// number of assigned messages is a multiple of 100k, and lines when done and when
// starting to resolve cyclic messages.
//
// An error is returned if, after all processing, non-expunged messages without
// ThreadID remain.
func (a *Account) AssignThreads(ctx context.Context, log mlog.Log, txOpt *bstore.Tx, startMessageID int64, batchSize int, progressWriter io.Writer) error {
	// We use a more basic version of the thread-matching algorithm described in:
	// ../rfc/5256:443
	// The algorithm assumes you'll select messages, then group into threads. We normally do
	// thread-calculation when messages are delivered. Here, we assign threads as soon
	// as we can, but will queue messages that reference known ancestors and resolve as
	// soon as we process them. We can handle large number of messages, but not very
	// quickly because we make lots of database queries.

	type childMsg struct {
		ID                int64  // This message will be fetched and updated with the threading fields once the parent is resolved.
		MessageID         string // Of child message. Once child is resolved, its own children can be resolved too.
		ThreadMissingLink bool
	}
	// Messages that have a References/In-Reply-To that we want to set as parent, but
	// where the parent doesn't have a ThreadID yet are added to pending. The key is
	// the normalized MessageID of the parent, and the value is a list of messages that
	// can get resolved once the parent gets its ThreadID. The kids will get the same
	// ThreadIDs, and they themselves may be parents to kids, and so on.
	// For duplicate messages (messages with identical Message-ID), the second
	// Message-ID to be added to pending is added under its own message-id, so it gets
	// its original as parent.
	pending := map[string][]childMsg{}

	// Current tx. If not equal to txOpt, we clean it up before we leave.
	var tx *bstore.Tx
	defer func() {
		if tx != nil && tx != txOpt {
			err := tx.Rollback()
			log.Check(err, "rolling back transaction")
		}
	}()

	// Set thread-related fields for a single message. Caller must save the message,
	// only if not an error and not added to the pending list.
	// The pend return value is true when the message was added to pending because
	// its intended parent exists but has no ThreadID yet.
	assign := func(m *Message, references, inReplyTo []string, subject string) (pend bool, rerr error) {
		if m.MessageID != "" {
			// Attempt to match against existing different message with same Message-ID that
			// already has a threadid.
			// If there are multiple messages for a message-id a future call to assign may use
			// its threadid, or it may end up in pending and we resolve it when we need to.
			q := bstore.QueryTx[Message](tx)
			q.FilterNonzero(Message{MessageID: m.MessageID})
			q.FilterEqual("Expunged", false)
			q.FilterLess("ID", m.ID)
			q.SortAsc("ID")
			q.Limit(1)
			em, err := q.Get()
			if err != nil && err != bstore.ErrAbsent {
				return false, fmt.Errorf("looking up existing message with message-id: %v", err)
			} else if err == nil {
				if em.ThreadID == 0 {
					// Earlier duplicate is not resolved yet, wait for it. Duplicates always get
					// ThreadMissingLink set.
					pending[em.MessageID] = append(pending[em.MessageID], childMsg{m.ID, m.MessageID, true})
					return true, nil
				} else {
					assignParent(m, em, false)
					return false, nil
				}
			}
		}

		// Parse errors are only logged, we continue with the returned refids.
		refids, err := message.ReferencedIDs(references, inReplyTo)
		if err != nil {
			log.Errorx("assigning threads: parsing references/in-reply-to headers, not matching by message-id", err, slog.Int64("msgid", m.ID))
		}

		// Try the referenced message-ids, starting with the last.
		for i := len(refids) - 1; i >= 0; i-- {
			messageID := refids[i]
			if messageID == m.MessageID {
				continue
			}
			tm, exists, err := lookupThreadMessage(tx, m.ID, messageID, m.SubjectBase)
			if err != nil {
				return false, fmt.Errorf("lookup up thread by message-id %s for message id %d: %w", messageID, m.ID, err)
			} else if tm != nil {
				assignParent(m, *tm, false)
				return false, nil
			} else if exists {
				// Referenced message exists but has no ThreadID yet. A link is missing if this
				// is not the last reference.
				pending[messageID] = append(pending[messageID], childMsg{m.ID, m.MessageID, i < len(refids)-1})
				return true, nil
			}
		}

		var subjectBase string
		var isResp bool
		if subject != "" {
			subjectBase, isResp = message.ThreadSubject(subject, false)
		}
		if len(refids) > 0 || !isResp || subjectBase == "" {
			// Not matching on subject: message becomes the root of its own thread. With
			// references present but none found, a link is missing.
			m.ThreadID = m.ID
			m.ThreadMissingLink = len(refids) > 0
			return false, nil
		}

		// No references to use. If this is a reply/forward (based on subject), we'll match
		// against base subject, at most 4 weeks back so we don't match against ancient
		// messages and 1 day ahead so we can match against delayed deliveries.
		tm, err := lookupThreadMessageSubject(tx, *m, subjectBase)
		if err != nil {
			return false, fmt.Errorf("looking up recent messages by base subject %q: %w", subjectBase, err)
		} else if tm != nil {
			m.ThreadID = tm.ThreadID
			m.ThreadParentIDs = []int64{tm.ThreadID} // Always under root message with subject-match.
			m.ThreadMissingLink = true
			m.ThreadMuted = tm.ThreadMuted
			m.ThreadCollapsed = tm.ThreadCollapsed
		} else {
			m.ThreadID = m.ID
		}
		return false, nil
	}

	// Counts messages resolved from pending. Only incremented, not read in this
	// function.
	npendingResolved := 0

	// Resolve pending messages that wait on m.MessageID to be resolved, recursively.
	// The cyclic parameter must be true when resolving the remaining cyclic messages
	// at the end: children that already have a ThreadID are then skipped instead of
	// causing a panic.
	var resolvePending func(tm Message, cyclic bool) error
	resolvePending = func(tm Message, cyclic bool) error {
		if tm.MessageID == "" {
			return nil
		}
		// Remove from pending before recursing, so the list is handled only once.
		l := pending[tm.MessageID]
		delete(pending, tm.MessageID)
		for _, mi := range l {
			m := Message{ID: mi.ID}
			if err := tx.Get(&m); err != nil {
				return fmt.Errorf("get message %d for resolving pending thread for message-id %s, %d: %w", mi.ID, tm.MessageID, tm.ID, err)
			}
			if m.ThreadID != 0 {
				// ThreadID already set because this is a cyclic message. If we would assign a
				// parent again, we would create a cycle.
				if m.MessageID != tm.MessageID && !cyclic {
					panic(fmt.Sprintf("threadid already set (%d) while handling non-cyclic message id %d/%q and with different message-id %q as parent message id %d", m.ThreadID, m.ID, m.MessageID, tm.MessageID, tm.ID))
				}
				continue
			}
			assignParent(&m, tm, false)
			// Use the value determined when the message was added to pending, overriding
			// what assignParent may have set.
			m.ThreadMissingLink = mi.ThreadMissingLink
			if err := tx.Update(&m); err != nil {
				return fmt.Errorf("update message %d for resolving pending thread for message-id %s, %d: %w", mi.ID, tm.MessageID, tm.ID, err)
			}
			if err := resolvePending(m, cyclic); err != nil {
				return err
			}
			npendingResolved++
		}
		return nil
	}

	// Output of the worker goroutines.
	type threadPrep struct {
		references []string
		inReplyTo  []string
		subject    string
	}

	// Single allocation.
	threadingFields := [][]byte{
		[]byte("references"),
		[]byte("in-reply-to"),
		[]byte("subject"),
	}

	// Worker goroutine function. We start with a reasonably large buffer for reading
	// the header into. And we have scratch space to copy the needed headers into. That
	// means we normally won't allocate any more buffers.
	// Errors are passed on through w.Err, the worker keeps running.
	prepareMessages := func(in, out chan moxio.Work[Message, threadPrep]) {
		headerbuf := make([]byte, 8*1024)
		scratch := make([]byte, 4*1024)
		for {
			w, ok := <-in
			if !ok {
				return
			}

			m := w.In
			// Only the offsets of the top-level header are needed from the parsed message.
			var partialPart struct {
				HeaderOffset int64
				BodyOffset   int64
			}
			if err := json.Unmarshal(m.ParsedBuf, &partialPart); err != nil {
				w.Err = fmt.Errorf("unmarshal part: %v", err)
			} else {
				size := partialPart.BodyOffset - partialPart.HeaderOffset
				// Grow the buffer if this header is larger than any we've seen so far.
				if int(size) > len(headerbuf) {
					headerbuf = make([]byte, size)
				}
				// For size <= 0, w.Out stays empty: no references, in-reply-to or subject.
				if size > 0 {
					buf := headerbuf[:int(size)]
					// In a function so the message reader is closed right after reading.
					err := func() error {
						mr := a.MessageReader(m)
						defer mr.Close()

						// ReadAt returns whole buffer or error. Single read should be fast.
						// NOTE(review): if n != len(buf) with a nil err, the error message below
						// ends with a nil error.
						n, err := mr.ReadAt(buf, partialPart.HeaderOffset)
						if err != nil || n != len(buf) {
							return fmt.Errorf("read header: %v", err)
						}
						return nil
					}()
					if err != nil {
						w.Err = err
					} else if h, err := message.ParseHeaderFields(buf, scratch, threadingFields); err != nil {
						w.Err = err
					} else {
						w.Out.references = h["References"]
						w.Out.inReplyTo = h["In-Reply-To"]
						l := h["Subject"]
						if len(l) > 0 {
							w.Out.subject = l[0]
						}
					}
				}
			}

			out <- w
		}
	}

	// Assign threads to messages, possibly in batches.
	nassigned := 0
	for {
		// Number of messages queued for processing in this batch.
		n := 0
		tx = txOpt
		if tx == nil {
			var err error
			tx, err = a.DB.Begin(ctx, true)
			if err != nil {
				return fmt.Errorf("begin transaction: %w", err)
			}
		}

		// Called by the work queue with the message and the output of a worker. Assigns
		// and stores the thread fields, unless the message is added to pending.
		processMessage := func(m Message, prep threadPrep) error {
			pend, err := assign(&m, prep.references, prep.inReplyTo, prep.subject)
			if err != nil {
				return fmt.Errorf("for msgid %d: %w", m.ID, err)
			} else if pend {
				return nil
			}
			if m.ThreadID == 0 {
				panic(fmt.Sprintf("no threadid after assign of message id %d/%q", m.ID, m.MessageID))
			}
			// Fields have been set, store in database and resolve messages waiting for this MessageID.
			if slices.Contains(m.ThreadParentIDs, m.ID) {
				panic(fmt.Sprintf("message id %d/%q contains itself in parent ids %v", m.ID, m.MessageID, m.ThreadParentIDs))
			}
			if err := tx.Update(&m); err != nil {
				return err
			}
			if err := resolvePending(m, false); err != nil {
				return fmt.Errorf("resolving pending message-id: %v", err)
			}
			return nil
		}

		// Use multiple worker goroutines to read and parse headers from on-disk messages.
		procs := runtime.GOMAXPROCS(0)
		wq := moxio.NewWorkQueue[Message, threadPrep](2*procs, 4*procs, prepareMessages, processMessage)

		// We assign threads in order by ID, so messages delivered in between our
		// transaction will get assigned threads too: they'll have the highest id's.
		q := bstore.QueryTx[Message](tx)
		q.FilterGreaterEqual("ID", startMessageID)
		q.FilterEqual("Expunged", false)
		q.SortAsc("ID")
		err := q.ForEach(func(m Message) error {
			// Batch number of changes, so we give other users of account a chance to run.
			if txOpt == nil && n >= batchSize {
				return bstore.StopForEach
			}
			// Starting point for next batch.
			startMessageID = m.ID + 1
			// Don't process again. Can happen when earlier upgrade was aborted.
			if m.ThreadID != 0 {
				return nil
			}

			n++
			return wq.Add(m)
		})
		if err == nil {
			err = wq.Finish()
		}
		wq.Stop()

		// Only commit transactions we started ourselves. On error, the deferred function
		// rolls back.
		if err == nil && txOpt == nil {
			err = tx.Commit()
			tx = nil
		}
		if err != nil {
			return fmt.Errorf("assigning threads: %w", err)
		}
		if n == 0 {
			break
		}
		nassigned += n
		if nassigned%100000 == 0 {
			log.Debug("assigning threads, progress", slog.Int("count", nassigned), slog.Int("unresolved", len(pending)))
			if _, err := fmt.Fprintf(progressWriter, "assigning threads, progress: %d messages\n", nassigned); err != nil {
				return fmt.Errorf("writing progress: %v", err)
			}
		}
	}
	if _, err := fmt.Fprintf(progressWriter, "assigning threads, done: %d messages\n", nassigned); err != nil {
		return fmt.Errorf("writing progress: %v", err)
	}

	log.Debug("assigning threads, mostly done, finishing with resolving of cyclic messages", slog.Int("count", nassigned), slog.Int("unresolved", len(pending)))

	if _, err := fmt.Fprintf(progressWriter, "assigning threads, resolving %d cyclic pending message-ids\n", len(pending)); err != nil {
		return fmt.Errorf("writing progress: %v", err)
	}

	// Remaining messages in pending have cycles and possibly tails. The cycle is at
	// the head of the thread. Once we resolve that, the rest of the thread can be
	// resolved too. Ignoring self-references (duplicate messages), there can only be
	// one cycle, and it is at the head. So we look for cycles, ignoring
	// self-references, and resolve a message as soon as we see the cycle.

	parent := map[string]string{} // Child Message-ID pointing to the parent Message-ID, excluding self-references.
	pendlist := []string{}
	for pmsgid, l := range pending {
		pendlist = append(pendlist, pmsgid)
		for _, k := range l {
			if k.MessageID == pmsgid {
				// No self-references for duplicate messages.
				continue
			}
			if _, ok := parent[k.MessageID]; !ok {
				parent[k.MessageID] = pmsgid
			}
			// else, this message should be resolved by following pending.
		}
	}
	// Map iteration order is random, sort so we process in a fixed order.
	sort.Strings(pendlist)

	tx = txOpt
	if tx == nil {
		var err error
		tx, err = a.DB.Begin(ctx, true)
		if err != nil {
			return fmt.Errorf("begin transaction: %w", err)
		}
	}

	// We walk through all messages of pendlist, but some will already have been
	// resolved by the time we get to them.
	done := map[string]bool{}
	for _, msgid := range pendlist {
		if done[msgid] {
			continue
		}

		// We walk up to parent, until we see a message-id we've already seen, a cycle.
		seen := map[string]bool{}
		for {
			pmsgid, ok := parent[msgid]
			if !ok {
				panic(fmt.Sprintf("missing parent message-id %q, not a cycle?", msgid))
			}
			if !seen[pmsgid] {
				seen[pmsgid] = true
				msgid = pmsgid
				continue
			}

			// Cycle detected. Make this message-id the thread root.
			q := bstore.QueryTx[Message](tx)
			q.FilterNonzero(Message{MessageID: msgid})
			q.FilterEqual("ThreadID", int64(0))
			q.FilterEqual("Expunged", false)
			q.SortAsc("ID")
			l, err := q.List()
			if err == nil && len(l) == 0 {
				err = errors.New("no messages")
			}
			if err != nil {
				return fmt.Errorf("list message by message-id for cyclic thread root: %v", err)
			}
			// The message with the lowest ID becomes the root, any duplicates (same
			// message-id) become its children.
			for i, m := range l {
				m.ThreadID = l[0].ID
				m.ThreadMissingLink = true
				if i == 0 {
					m.ThreadParentIDs = nil
					l[0] = m // For resolvePending below.
				} else {
					assignParent(&m, l[0], false)
				}
				if slices.Contains(m.ThreadParentIDs, m.ID) {
					panic(fmt.Sprintf("message id %d/%q contains itself in parents %v", m.ID, m.MessageID, m.ThreadParentIDs))
				}
				if err := tx.Update(&m); err != nil {
					return fmt.Errorf("assigning threadid to cyclic thread root: %v", err)
				}
			}

			// Mark all children as done so we don't process these messages again.
			walk := map[string]struct{}{msgid: {}}
			for len(walk) > 0 {
				for msgid := range walk {
					delete(walk, msgid)
					if done[msgid] {
						continue
					}
					done[msgid] = true
					for _, mi := range pending[msgid] {
						if !done[mi.MessageID] {
							walk[mi.MessageID] = struct{}{}
						}
					}
				}
			}

			// Resolve all messages in this thread.
			if err := resolvePending(l[0], true); err != nil {
				return fmt.Errorf("resolving cyclic children of cyclic thread root: %v", err)
			}

			break
		}
	}

	// Check that there are no more messages without threadid.
	q := bstore.QueryTx[Message](tx)
	q.FilterEqual("ThreadID", int64(0))
	q.FilterEqual("Expunged", false)
	l, err := q.List()
	if err == nil && len(l) > 0 {
		err = errors.New("found messages without threadid")
	}
	if err != nil {
		return fmt.Errorf("listing messages without threadid: %v", err)
	}

	// Only commit a transaction we started ourselves.
	if txOpt == nil {
		err := tx.Commit()
		tx = nil
		if err != nil {
			return fmt.Errorf("commit resolving cyclic thread roots: %v", err)
		}
	}
	return nil
}
677
678// lookupThreadMessage tries to find the parent message with messageID that must
679// have a matching subjectBase.
680//
681// If the message isn't present (with a valid thread id), a nil message and nil
682// error is returned. The bool return value indicates if a message with the
683// message-id exists at all.
684func lookupThreadMessage(tx *bstore.Tx, mID int64, messageID, subjectBase string) (*Message, bool, error) {
685 q := bstore.QueryTx[Message](tx)
686 q.FilterNonzero(Message{MessageID: messageID})
687 q.FilterEqual("SubjectBase", subjectBase)
688 q.FilterEqual("Expunged", false)
689 q.FilterNotEqual("ID", mID)
690 q.SortAsc("ID")
691 l, err := q.List()
692 if err != nil {
693 return nil, false, fmt.Errorf("message-id %s: %w", messageID, err)
694 }
695 exists := len(l) > 0
696 for _, tm := range l {
697 if tm.ThreadID != 0 {
698 return &tm, true, nil
699 }
700 }
701 return nil, exists, nil
702}
703
704// lookupThreadMessageSubject looks up a parent/ancestor message for the message
705// thread based on a matching subject. The message must have been delivered to the same mailbox originally.
706//
707// If no message (with a threadid) is found a nil message and nil error is returned.
708func lookupThreadMessageSubject(tx *bstore.Tx, m Message, subjectBase string) (*Message, error) {
709 q := bstore.QueryTx[Message](tx)
710 q.FilterGreater("Received", m.Received.Add(-4*7*24*time.Hour))
711 q.FilterLess("Received", m.Received.Add(1*24*time.Hour))
712 q.FilterNonzero(Message{SubjectBase: subjectBase, MailboxOrigID: m.MailboxOrigID})
713 q.FilterEqual("Expunged", false)
714 q.FilterNotEqual("ID", m.ID)
715 q.FilterNotEqual("ThreadID", int64(0))
716 q.SortDesc("Received")
717 q.Limit(1)
718 tm, err := q.Get()
719 if err == bstore.ErrAbsent {
720 return nil, nil
721 } else if err != nil {
722 return nil, err
723 }
724 return &tm, nil
725}
726
727func upgradeThreads(ctx context.Context, log mlog.Log, acc *Account, up *Upgrade) error {
728 log = log.With(slog.String("account", acc.Name))
729
730 if up.Threads == 0 {
731 // Step 1 in the threads upgrade is storing the canonicalized Message-ID for each
732 // message and the base subject for thread matching. This allows efficient thread
733 // lookup in the second step.
734
735 log.Info("upgrading account for threading, step 1/2: updating all messages with message-id and base subject")
736 t0 := time.Now()
737
738 const batchSize = 10000
739 total, err := acc.ResetThreading(ctx, log, batchSize, true)
740 if err != nil {
741 return fmt.Errorf("resetting message threading fields: %v", err)
742 }
743
744 up.Threads = 1
745 if err := acc.DB.Update(ctx, up); err != nil {
746 up.Threads = 0
747 return fmt.Errorf("saving upgrade process while upgrading account to threads storage, step 1/2: %w", err)
748 }
749 log.Info("upgrading account for threading, step 1/2: completed", slog.Duration("duration", time.Since(t0)), slog.Int("messages", total))
750 }
751
752 if up.Threads == 1 {
753 // Step 2 of the upgrade is going through all messages and assigning threadid's.
754 // Lookup of messageid and base subject is now fast through indexed database
755 // access.
756
757 log.Info("upgrading account for threading, step 2/2: matching messages to threads")
758 t0 := time.Now()
759
760 const batchSize = 10000
761 if err := acc.AssignThreads(ctx, log, nil, 1, batchSize, io.Discard); err != nil {
762 return fmt.Errorf("upgrading to threads storage, step 2/2: %w", err)
763 }
764 up.Threads = 2
765 if err := acc.DB.Update(ctx, up); err != nil {
766 up.Threads = 1
767 return fmt.Errorf("saving upgrade process for thread storage, step 2/2: %w", err)
768 }
769 log.Info("upgrading account for threading, step 2/2: completed", slog.Duration("duration", time.Since(t0)))
770 }
771
772 // Note: Not bumping uidvalidity or setting modseq. Clients haven't been able to
773 // use threadid's before, so there is nothing to be out of date.
774
775 return nil
776}
777