1package store
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "runtime"
11 "slices"
12 "sort"
13 "time"
14
15 "github.com/mjl-/bstore"
16
17 "github.com/mjl-/mox/message"
18 "github.com/mjl-/mox/mlog"
19 "github.com/mjl-/mox/moxio"
20)
21
22// Assign a new/incoming message to a thread. Message does not yet have an ID. If
23// this isn't a response, ThreadID should remain 0 (unless this is a message with
24// existing message-id) and the caller must set ThreadID to ID.
25// If the account is still busy upgrading messages with threadids in the background, parents
26// may have a threadid 0. That results in this message getting threadid 0, which
27// will handled by the background upgrade process assigning a threadid when it gets
28// to this message.
29func assignThread(log mlog.Log, tx *bstore.Tx, m *Message, part *message.Part) error {
30 if m.MessageID != "" {
31 // Match against existing different message with same Message-ID.
32 q := bstore.QueryTx[Message](tx)
33 q.FilterNonzero(Message{MessageID: m.MessageID})
34 q.FilterEqual("Expunged", false)
35 q.FilterNotEqual("ID", m.ID)
36 q.FilterNotEqual("ThreadID", int64(0))
37 q.SortAsc("ID")
38 q.Limit(1)
39 em, err := q.Get()
40 if err != nil && err != bstore.ErrAbsent {
41 return fmt.Errorf("looking up existing message with message-id: %v", err)
42 } else if err == nil {
43 assignParent(m, em, true)
44 return nil
45 }
46 }
47
48 h, err := part.Header()
49 if err != nil {
50 log.Errorx("assigning threads: parsing references/in-reply-to headers, not matching by message-id", err, slog.Int64("msgid", m.ID))
51 }
52 messageIDs, err := message.ReferencedIDs(h.Values("References"), h.Values("In-Reply-To"))
53 if err != nil {
54 log.Errorx("assigning threads: parsing references/in-reply-to headers, not matching by message-id", err, slog.Int64("msgid", m.ID))
55 }
56 for i := len(messageIDs) - 1; i >= 0; i-- {
57 messageID := messageIDs[i]
58 if messageID == m.MessageID {
59 continue
60 }
61 tm, _, err := lookupThreadMessage(tx, m.ID, messageID, m.SubjectBase, m.DSN)
62 if err != nil {
63 return fmt.Errorf("looking up thread message for new message: %v", err)
64 } else if tm != nil {
65 assignParent(m, *tm, true)
66 return nil
67 }
68 m.ThreadMissingLink = true
69 }
70 if len(messageIDs) > 0 {
71 return nil
72 }
73
74 var isResp bool
75 if part != nil && part.Envelope != nil {
76 m.SubjectBase, isResp = message.ThreadSubject(part.Envelope.Subject, false)
77 }
78 if !isResp || m.SubjectBase == "" {
79 return nil
80 }
81 m.ThreadMissingLink = true
82 tm, err := lookupThreadMessageSubject(tx, *m, m.SubjectBase)
83 if err != nil {
84 return fmt.Errorf("looking up thread message by subject: %v", err)
85 } else if tm != nil {
86 assignParent(m, *tm, true)
87 }
88 return nil
89}
90
91// assignParent assigns threading fields to m that make it a child of parent message pm.
92// updateSeen indicates if m.Seen should be cleared if pm is thread-muted.
93func assignParent(m *Message, pm Message, updateSeen bool) {
94 if pm.ThreadID == 0 {
95 panic(fmt.Sprintf("assigning message id %d/d%q to parent message id %d/%q which has threadid 0", m.ID, m.MessageID, pm.ID, pm.MessageID))
96 }
97 if m.ID == pm.ID {
98 panic(fmt.Sprintf("trying to make message id %d/%q its own parent", m.ID, m.MessageID))
99 }
100 m.ThreadID = pm.ThreadID
101 // Make sure we don't add cycles.
102 if !slices.Contains(pm.ThreadParentIDs, m.ID) {
103 m.ThreadParentIDs = append([]int64{pm.ID}, pm.ThreadParentIDs...)
104 } else if pm.ID != m.ID {
105 m.ThreadParentIDs = []int64{pm.ID}
106 } else {
107 m.ThreadParentIDs = nil
108 }
109 if m.MessageID != "" && m.MessageID == pm.MessageID {
110 m.ThreadMissingLink = true
111 }
112 m.ThreadMuted = pm.ThreadMuted
113 m.ThreadCollapsed = pm.ThreadCollapsed
114 if updateSeen && m.ThreadMuted {
115 m.Seen = true
116 }
117}
118
119// ResetThreading resets the MessageID and SubjectBase fields for all messages in
120// the account. If clearIDs is true, all Thread* fields are also cleared. Changes
121// are made in transactions of batchSize changes. The total number of updated
122// messages is returned.
123//
124// ModSeq is not changed. Callers should bump the uid validity of the mailboxes
125// to propagate the changes to IMAP clients.
126func (a *Account) ResetThreading(ctx context.Context, log mlog.Log, batchSize int, clearIDs bool) (int, error) {
127 // todo: should this send Change events for ThreadMuted and ThreadCollapsed? worth it?
128
129 var lastID int64
130 total := 0
131 for {
132 n := 0
133
134 prepareMessages := func(in, out chan moxio.Work[Message, Message]) {
135 for {
136 w, ok := <-in
137 if !ok {
138 return
139 }
140
141 m := w.In
142
143 // We have the Message-ID and Subject headers in ParsedBuf. We use a partial part
144 // struct so we don't generate so much garbage for the garbage collector to sift
145 // through.
146 var part struct {
147 Envelope *message.Envelope
148 }
149 if err := json.Unmarshal(m.ParsedBuf, &part); err != nil {
150 log.Errorx("unmarshal json parsedbuf for setting message-id, skipping", err, slog.Int64("msgid", m.ID))
151 } else {
152 m.MessageID = ""
153 if part.Envelope != nil && part.Envelope.MessageID != "" {
154 s, _, err := message.MessageIDCanonical(part.Envelope.MessageID)
155 if err != nil {
156 log.Debugx("parsing message-id, skipping", err, slog.Int64("msgid", m.ID), slog.String("messageid", part.Envelope.MessageID))
157 }
158 m.MessageID = s
159 }
160 if part.Envelope != nil {
161 m.SubjectBase, _ = message.ThreadSubject(part.Envelope.Subject, false)
162 }
163 }
164 w.Out = m
165
166 out <- w
167 }
168 }
169
170 err := a.DB.Write(ctx, func(tx *bstore.Tx) error {
171 processMessage := func(in, m Message) error {
172 if clearIDs {
173 m.ThreadID = 0
174 m.ThreadParentIDs = nil
175 m.ThreadMissingLink = false
176 }
177 return tx.Update(&m)
178 }
179
180 // JSON parsing is relatively heavy, we benefit from multiple goroutines.
181 procs := runtime.GOMAXPROCS(0)
182 wq := moxio.NewWorkQueue[Message, Message](procs, 2*procs, prepareMessages, processMessage)
183
184 q := bstore.QueryTx[Message](tx)
185 q.FilterEqual("Expunged", false)
186 q.FilterGreater("ID", lastID)
187 q.SortAsc("ID")
188 err := q.ForEach(func(m Message) error {
189 // We process in batches so we don't block other operations for a long time.
190 if n >= batchSize {
191 return bstore.StopForEach
192 }
193 // Update starting point for next batch.
194 lastID = m.ID
195
196 n++
197 return wq.Add(m)
198 })
199 if err == nil {
200 err = wq.Finish()
201 }
202 wq.Stop()
203 return err
204 })
205 if err != nil {
206 return total, fmt.Errorf("upgrading account to threads storage, step 1/2: %w", err)
207 }
208 total += n
209 if n == 0 {
210 break
211 }
212 }
213 return total, nil
214}
215
// AssignThreads assigns thread-related fields to messages with ID >=
// startMessageID. Changes are committed each batchSize changes if txOpt is nil
// (i.e. during automatic account upgrade, we don't want to block database access
// for a long time). If txOpt is not nil, all changes are made in that
// transaction, and it is neither committed nor rolled back here.
//
// When resetting thread assignments, the caller must first clear the existing
// thread fields: messages that already have a non-zero ThreadID are skipped.
//
// Messages are processed in order of ID, so when added to the account, not
// necessarily by received/date. Most threaded messages can immediately be matched
// to their parent message. If not, we keep track of the missing message-id and
// resolve as soon as we encounter it. At the end, we resolve all remaining
// messages, they start with a cycle.
//
// Does not set Seen flag for muted threads.
//
// Progress is written to xprogressWriter: after a batch, whenever the running
// total of assigned messages is a multiple of 100000, and once more when done.
//
// An error is returned if a database operation or a write to xprogressWriter
// fails, if a message header cannot be read or parsed, or if messages without
// thread id remain at the end. Violated internal invariants cause a panic.
func (a *Account) AssignThreads(ctx context.Context, log mlog.Log, txOpt *bstore.Tx, startMessageID int64, batchSize int, xprogressWriter io.Writer) error {
	// We use a more basic version of the thread-matching algorithm described in:
	// ../rfc/5256:443
	// The algorithm assumes you'll select messages, then group into threads. We normally do
	// thread-calculation when messages are delivered. Here, we assign threads as soon
	// as we can, but will queue messages that reference known ancestors and resolve as
	// soon as we process them. We can handle large number of messages, but not very
	// quickly because we make lots of database queries.

	// xprogressWriter can call panic on write errors, when assigning threads through a
	// ctl command.

	type childMsg struct {
		ID                int64  // This message will be fetched and updated with the threading fields once the parent is resolved.
		MessageID         string // Of child message. Once child is resolved, its own children can be resolved too.
		ThreadMissingLink bool   // Value to store in the child's ThreadMissingLink once it is resolved.
	}
	// Messages that have a References/In-Reply-To that we want to set as parent, but
	// where the parent doesn't have a ThreadID yet are added to pending. The key is
	// the normalized MessageID of the parent, and the value is a list of messages that
	// can get resolved once the parent gets its ThreadID. The kids will get the same
	// ThreadIDs, and they themselves may be parents to kids, and so on.
	// For duplicate messages (messages with identical Message-ID), the second
	// Message-ID to be added to pending is added under its own message-id, so it gets
	// its original as parent.
	pending := map[string][]childMsg{}

	// Current tx. If not equal to txOpt, we clean it up before we leave.
	var tx *bstore.Tx
	defer func() {
		if tx != nil && tx != txOpt {
			err := tx.Rollback()
			log.Check(err, "rolling back transaction")
		}
	}()

	// Set thread-related fields for a single message. Caller must save the message,
	// only if not an error and not added to the pending list.
	// The returned pend is true when the message was added to pending instead of
	// getting its thread fields set.
	assign := func(m *Message, references, inReplyTo []string, subject string) (pend bool, rerr error) {
		if m.MessageID != "" {
			// Attempt to match against existing different message with same Message-ID that
			// already has a threadid.
			// If there are multiple messages for a message-id a future call to assign may use
			// its threadid, or it may end up in pending and we resolve it when we need to.
			q := bstore.QueryTx[Message](tx)
			q.FilterNonzero(Message{MessageID: m.MessageID})
			q.FilterEqual("Expunged", false)
			q.FilterLess("ID", m.ID)
			q.SortAsc("ID")
			q.Limit(1)
			em, err := q.Get()
			if err != nil && err != bstore.ErrAbsent {
				return false, fmt.Errorf("looking up existing message with message-id: %v", err)
			} else if err == nil {
				if em.ThreadID == 0 {
					// Earlier duplicate has no thread yet, wait for it.
					pending[em.MessageID] = append(pending[em.MessageID], childMsg{m.ID, m.MessageID, true})
					return true, nil
				} else {
					assignParent(m, em, false)
					return false, nil
				}
			}
		}

		refids, err := message.ReferencedIDs(references, inReplyTo)
		if err != nil {
			log.Errorx("assigning threads: parsing references/in-reply-to headers, not matching by message-id", err, slog.Int64("msgid", m.ID))
		}

		// Walk the referenced message-ids from last to first.
		for i := len(refids) - 1; i >= 0; i-- {
			messageID := refids[i]
			if messageID == m.MessageID {
				continue
			}
			tm, exists, err := lookupThreadMessage(tx, m.ID, messageID, m.SubjectBase, m.DSN)
			if err != nil {
				return false, fmt.Errorf("lookup up thread by message-id %s for message id %d: %w", messageID, m.ID, err)
			} else if tm != nil {
				assignParent(m, *tm, false)
				return false, nil
			} else if exists {
				// Parent exists but has no thread id yet. The link is marked as missing when
				// this isn't the last of the referenced message-ids.
				pending[messageID] = append(pending[messageID], childMsg{m.ID, m.MessageID, i < len(refids)-1})
				return true, nil
			}
		}

		var subjectBase string
		var isResp bool
		if subject != "" {
			subjectBase, isResp = message.ThreadSubject(subject, false)
		}
		if len(refids) > 0 || !isResp || subjectBase == "" {
			// Not matching on subject: the message becomes the root of its own thread.
			m.ThreadID = m.ID
			m.ThreadMissingLink = len(refids) > 0
			return false, nil
		}

		// No references to use. If this is a reply/forward (based on subject), we'll match
		// against base subject, at most 4 weeks back so we don't match against ancient
		// messages and 1 day ahead so we can match against delayed deliveries.
		tm, err := lookupThreadMessageSubject(tx, *m, subjectBase)
		if err != nil {
			return false, fmt.Errorf("looking up recent messages by base subject %q: %w", subjectBase, err)
		} else if tm != nil {
			m.ThreadID = tm.ThreadID
			m.ThreadParentIDs = []int64{tm.ThreadID} // Always under root message with subject-match.
			m.ThreadMissingLink = true
			m.ThreadMuted = tm.ThreadMuted
			m.ThreadCollapsed = tm.ThreadCollapsed
		} else {
			m.ThreadID = m.ID
		}
		return false, nil
	}

	// Counts messages updated through resolvePending.
	npendingResolved := 0

	// Resolve pending messages that wait on m.MessageID to be resolved, recursively.
	// With cyclic set, encountering a message that already has a thread id is not
	// treated as a violated invariant.
	var resolvePending func(tm Message, cyclic bool) error
	resolvePending = func(tm Message, cyclic bool) error {
		if tm.MessageID == "" {
			return nil
		}
		// Take the waiting children out of pending before processing them.
		l := pending[tm.MessageID]
		delete(pending, tm.MessageID)
		for _, mi := range l {
			m := Message{ID: mi.ID}
			if err := tx.Get(&m); err != nil {
				return fmt.Errorf("get message %d for resolving pending thread for message-id %s, %d: %w", mi.ID, tm.MessageID, tm.ID, err)
			} else if m.Expunged {
				return fmt.Errorf("message %d marked as expunged", mi.ID)
			}
			if m.ThreadID != 0 {
				// ThreadID already set because this is a cyclic message. If we would assign a
				// parent again, we would create a cycle.
				if m.MessageID != tm.MessageID && !cyclic {
					panic(fmt.Sprintf("threadid already set (%d) while handling non-cyclic message id %d/%q and with different message-id %q as parent message id %d", m.ThreadID, m.ID, m.MessageID, tm.MessageID, tm.ID))
				}
				continue
			}
			assignParent(&m, tm, false)
			// Overrides what assignParent may have set, using the value recorded when the
			// message was added to pending.
			m.ThreadMissingLink = mi.ThreadMissingLink
			if err := tx.Update(&m); err != nil {
				return fmt.Errorf("update message %d for resolving pending thread for message-id %s, %d: %w", mi.ID, tm.MessageID, tm.ID, err)
			}
			// The child now has a thread id, so messages waiting on it can be resolved too.
			if err := resolvePending(m, cyclic); err != nil {
				return err
			}
			npendingResolved++
		}
		return nil
	}

	// Output of the worker goroutines.
	type threadPrep struct {
		references []string
		inReplyTo  []string
		subject    string
	}

	// Single allocation.
	threadingFields := [][]byte{
		[]byte("references"),
		[]byte("in-reply-to"),
		[]byte("subject"),
	}

	// Worker goroutine function. We start with a reasonably large buffer for reading
	// the header into. And we have scratch space to copy the needed headers into. That
	// means we normally won't allocate any more buffers.
	prepareMessages := func(in, out chan moxio.Work[Message, threadPrep]) {
		headerbuf := make([]byte, 8*1024)
		scratch := make([]byte, 4*1024)
		for {
			w, ok := <-in
			if !ok {
				return
			}

			m := w.In
			// Only the header location is needed from the stored parsed message.
			var partialPart struct {
				HeaderOffset int64
				BodyOffset   int64
			}
			if err := json.Unmarshal(m.ParsedBuf, &partialPart); err != nil {
				w.Err = fmt.Errorf("unmarshal part: %v", err)
			} else {
				size := partialPart.BodyOffset - partialPart.HeaderOffset
				// Grow the reusable buffer if this header doesn't fit.
				if int(size) > len(headerbuf) {
					headerbuf = make([]byte, size)
				}
				// Without header bytes, the output is left at its zero value.
				if size > 0 {
					buf := headerbuf[:int(size)]
					// Function literal so the message reader is closed right after reading.
					err := func() error {
						mr := a.MessageReader(m)
						defer func() {
							err := mr.Close()
							log.Check(err, "closing message reader")
						}()

						// ReadAt returns whole buffer or error. Single read should be fast.
						n, err := mr.ReadAt(buf, partialPart.HeaderOffset)
						if err != nil || n != len(buf) {
							return fmt.Errorf("read header: %v", err)
						}
						return nil
					}()
					if err != nil {
						w.Err = err
					} else if h, err := message.ParseHeaderFields(buf, scratch, threadingFields); err != nil {
						w.Err = err
					} else {
						w.Out.references = h["References"]
						w.Out.inReplyTo = h["In-Reply-To"]
						l := h["Subject"]
						if len(l) > 0 {
							w.Out.subject = l[0]
						}
					}
				}
			}

			out <- w
		}
	}

	// Assign threads to messages, possibly in batches.
	nassigned := 0
	for {
		// Number of messages queued in this batch.
		n := 0
		tx = txOpt
		if tx == nil {
			var err error
			tx, err = a.DB.Begin(ctx, true)
			if err != nil {
				return fmt.Errorf("begin transaction: %w", err)
			}
		}

		// Assigns and stores the thread fields for a message, using the headers parsed
		// by a worker goroutine.
		processMessage := func(m Message, prep threadPrep) error {
			pend, err := assign(&m, prep.references, prep.inReplyTo, prep.subject)
			if err != nil {
				return fmt.Errorf("for msgid %d: %w", m.ID, err)
			} else if pend {
				// Stored later, when the message it waits on is resolved.
				return nil
			}
			if m.ThreadID == 0 {
				panic(fmt.Sprintf("no threadid after assign of message id %d/%q", m.ID, m.MessageID))
			}
			// Fields have been set, store in database and resolve messages waiting for this MessageID.
			if slices.Contains(m.ThreadParentIDs, m.ID) {
				panic(fmt.Sprintf("message id %d/%q contains itself in parent ids %v", m.ID, m.MessageID, m.ThreadParentIDs))
			}
			if err := tx.Update(&m); err != nil {
				return err
			}
			if err := resolvePending(m, false); err != nil {
				return fmt.Errorf("resolving pending message-id: %v", err)
			}
			return nil
		}

		// Use multiple worker goroutines to parse headers from on-disk messages.
		procs := runtime.GOMAXPROCS(0)
		wq := moxio.NewWorkQueue[Message, threadPrep](2*procs, 4*procs, prepareMessages, processMessage)

		// We assign threads in order by ID, so messages delivered in between our
		// transaction will get assigned threads too: they'll have the highest id's.
		q := bstore.QueryTx[Message](tx)
		q.FilterGreaterEqual("ID", startMessageID)
		q.FilterEqual("Expunged", false)
		q.SortAsc("ID")
		err := q.ForEach(func(m Message) error {
			// Batch number of changes, so we give other users of account a chance to run.
			if txOpt == nil && n >= batchSize {
				return bstore.StopForEach
			}
			// Starting point for next batch.
			startMessageID = m.ID + 1
			// Don't process again. Can happen when earlier upgrade was aborted.
			if m.ThreadID != 0 {
				return nil
			}

			n++
			return wq.Add(m)
		})
		if err == nil {
			err = wq.Finish()
		}
		wq.Stop()

		if err == nil && txOpt == nil {
			err = tx.Commit()
			// Prevent the deferred rollback from touching the finished transaction.
			tx = nil
		}
		if err != nil {
			return fmt.Errorf("assigning threads: %w", err)
		}
		if n == 0 {
			// No more messages needed processing.
			break
		}
		nassigned += n
		if nassigned%100000 == 0 {
			log.Debug("assigning threads, progress", slog.Int("count", nassigned), slog.Int("unresolved", len(pending)))
			if _, err := fmt.Fprintf(xprogressWriter, "assigning threads, progress: %d messages\n", nassigned); err != nil {
				return fmt.Errorf("writing progress: %v", err)
			}
		}
	}
	if _, err := fmt.Fprintf(xprogressWriter, "assigning threads, done: %d messages\n", nassigned); err != nil {
		return fmt.Errorf("writing progress: %v", err)
	}

	log.Debug("assigning threads, mostly done, finishing with resolving of cyclic messages", slog.Int("count", nassigned), slog.Int("unresolved", len(pending)))

	if _, err := fmt.Fprintf(xprogressWriter, "assigning threads, resolving %d cyclic pending message-ids\n", len(pending)); err != nil {
		return fmt.Errorf("writing progress: %v", err)
	}

	// Remaining messages in pending have cycles and possibly tails. The cycle is at
	// the head of the thread. Once we resolve that, the rest of the thread can be
	// resolved too. Ignoring self-references (duplicate messages), there can only be
	// one cycle, and it is at the head. So we look for cycles, ignoring
	// self-references, and resolve a message as soon as we see the cycle.

	parent := map[string]string{} // Child Message-ID pointing to the parent Message-ID, excluding self-references.
	pendlist := []string{}
	for pmsgid, l := range pending {
		pendlist = append(pendlist, pmsgid)
		for _, k := range l {
			if k.MessageID == pmsgid {
				// No self-references for duplicate messages.
				continue
			}
			if _, ok := parent[k.MessageID]; !ok {
				parent[k.MessageID] = pmsgid
			}
			// else, this message should be resolved by following pending.
		}
	}
	// Map iteration order is random, sort for a deterministic processing order.
	sort.Strings(pendlist)

	tx = txOpt
	if tx == nil {
		var err error
		tx, err = a.DB.Begin(ctx, true)
		if err != nil {
			return fmt.Errorf("begin transaction: %w", err)
		}
	}

	// We walk through all messages of pendlist, but some will already have been
	// resolved by the time we get to them.
	done := map[string]bool{}
	for _, msgid := range pendlist {
		if done[msgid] {
			continue
		}

		// We walk up to parent, until we see a message-id we've already seen, a cycle.
		seen := map[string]bool{}
		for {
			pmsgid, ok := parent[msgid]
			if !ok {
				panic(fmt.Sprintf("missing parent message-id %q, not a cycle?", msgid))
			}
			if !seen[pmsgid] {
				seen[pmsgid] = true
				msgid = pmsgid
				continue
			}

			// Cycle detected. Make this message-id the thread root.
			q := bstore.QueryTx[Message](tx)
			q.FilterNonzero(Message{MessageID: msgid})
			q.FilterEqual("ThreadID", int64(0))
			q.FilterEqual("Expunged", false)
			q.SortAsc("ID")
			l, err := q.List()
			if err == nil && len(l) == 0 {
				err = errors.New("no messages")
			}
			if err != nil {
				return fmt.Errorf("list message by message-id for cyclic thread root: %v", err)
			}
			// The message with the lowest ID becomes the thread root, other messages with
			// the same message-id become its children.
			for i, m := range l {
				m.ThreadID = l[0].ID
				m.ThreadMissingLink = true
				if i == 0 {
					m.ThreadParentIDs = nil
					l[0] = m // For resolvePending below.
				} else {
					assignParent(&m, l[0], false)
				}
				if slices.Contains(m.ThreadParentIDs, m.ID) {
					panic(fmt.Sprintf("message id %d/%q contains itself in parents %v", m.ID, m.MessageID, m.ThreadParentIDs))
				}
				if err := tx.Update(&m); err != nil {
					return fmt.Errorf("assigning threadid to cyclic thread root: %v", err)
				}
			}

			// Mark all children as done so we don't process these messages again.
			// This must happen before resolvePending below, which removes entries from
			// pending.
			walk := map[string]struct{}{msgid: {}}
			for len(walk) > 0 {
				for msgid := range walk {
					delete(walk, msgid)
					if done[msgid] {
						continue
					}
					done[msgid] = true
					for _, mi := range pending[msgid] {
						if !done[mi.MessageID] {
							walk[mi.MessageID] = struct{}{}
						}
					}
				}
			}

			// Resolve all messages in this thread.
			if err := resolvePending(l[0], true); err != nil {
				return fmt.Errorf("resolving cyclic children of cyclic thread root: %v", err)
			}

			break
		}
	}

	// Check that there are no more messages without threadid.
	q := bstore.QueryTx[Message](tx)
	q.FilterEqual("ThreadID", int64(0))
	q.FilterEqual("Expunged", false)
	l, err := q.List()
	if err == nil && len(l) > 0 {
		err = errors.New("found messages without threadid")
	}
	if err != nil {
		return fmt.Errorf("listing messages without threadid: %v", err)
	}

	if txOpt == nil {
		err := tx.Commit()
		// Prevent the deferred rollback from touching the finished transaction.
		tx = nil
		if err != nil {
			return fmt.Errorf("commit resolving cyclic thread roots: %v", err)
		}
	}
	return nil
}
684
685// lookupThreadMessage tries to find the parent message with messageID, that must
686// have a matching subjectBase (unless it is a DSN).
687//
688// If the message isn't present (with a valid thread id), a nil message and nil
689// error is returned. The bool return value indicates if a message with the
690// message-id exists at all.
691func lookupThreadMessage(tx *bstore.Tx, mID int64, messageID, subjectBase string, isDSN bool) (*Message, bool, error) {
692 q := bstore.QueryTx[Message](tx)
693 q.FilterNonzero(Message{MessageID: messageID})
694 if !isDSN {
695 q.FilterEqual("SubjectBase", subjectBase)
696 }
697 q.FilterEqual("Expunged", false)
698 q.FilterNotEqual("ID", mID)
699 q.SortAsc("ID")
700 l, err := q.List()
701 if err != nil {
702 return nil, false, fmt.Errorf("message-id %s: %w", messageID, err)
703 }
704 exists := len(l) > 0
705 for _, tm := range l {
706 if tm.ThreadID != 0 {
707 return &tm, true, nil
708 }
709 }
710 return nil, exists, nil
711}
712
713// lookupThreadMessageSubject looks up a parent/ancestor message for the message
714// thread based on a matching subject. The message must have been delivered to the same mailbox originally.
715//
716// If no message (with a threadid) is found a nil message and nil error is returned.
717func lookupThreadMessageSubject(tx *bstore.Tx, m Message, subjectBase string) (*Message, error) {
718 q := bstore.QueryTx[Message](tx)
719 q.FilterGreater("Received", m.Received.Add(-4*7*24*time.Hour))
720 q.FilterLess("Received", m.Received.Add(1*24*time.Hour))
721 q.FilterNonzero(Message{SubjectBase: subjectBase, MailboxOrigID: m.MailboxOrigID})
722 q.FilterEqual("Expunged", false)
723 q.FilterNotEqual("ID", m.ID)
724 q.FilterNotEqual("ThreadID", int64(0))
725 q.SortDesc("Received")
726 q.Limit(1)
727 tm, err := q.Get()
728 if err == bstore.ErrAbsent {
729 return nil, nil
730 } else if err != nil {
731 return nil, err
732 }
733 return &tm, nil
734}
735
736func upgradeThreads(ctx context.Context, log mlog.Log, acc *Account, up Upgrade) error {
737 log = log.With(slog.String("account", acc.Name))
738
739 if up.Threads == 0 {
740 // Step 1 in the threads upgrade is storing the canonicalized Message-ID for each
741 // message and the base subject for thread matching. This allows efficient thread
742 // lookup in the second step.
743
744 log.Info("upgrading account for threading, step 1/2: updating all messages with message-id and base subject")
745 t0 := time.Now()
746
747 const batchSize = 10000
748 total, err := acc.ResetThreading(ctx, log, batchSize, true)
749 if err != nil {
750 return fmt.Errorf("resetting message threading fields: %v", err)
751 }
752
753 // Must refresh up, it may have been modified by another upgrade progress.
754 err = acc.DB.Write(ctx, func(tx *bstore.Tx) error {
755 up = Upgrade{ID: up.ID}
756 if err := tx.Get(&up); err != nil {
757 return err
758 }
759 up.Threads = 1
760 return tx.Update(&up)
761 })
762 if err != nil {
763 return fmt.Errorf("saving upgrade process while upgrading account to threads storage, step 1/2: %w", err)
764 }
765 log.Info("upgrading account for threading, step 1/2: completed", slog.Duration("duration", time.Since(t0)), slog.Int("messages", total))
766 }
767
768 if up.Threads == 1 {
769 // Step 2 of the upgrade is going through all messages and assigning threadid's.
770 // Lookup of messageid and base subject is now fast through indexed database
771 // access.
772
773 log.Info("upgrading account for threading, step 2/2: matching messages to threads")
774 t0 := time.Now()
775
776 const batchSize = 10000
777 if err := acc.AssignThreads(ctx, log, nil, 1, batchSize, io.Discard); err != nil {
778 return fmt.Errorf("upgrading to threads storage, step 2/2: %w", err)
779 }
780
781 // Must refresh up, it may have been modified by another upgrade progress.
782 err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
783 up = Upgrade{ID: up.ID}
784 if err := tx.Get(&up); err != nil {
785 return err
786 }
787 up.Threads = 2
788 return tx.Update(&up)
789 })
790 if err != nil {
791 return fmt.Errorf("saving upgrade process for thread storage, step 2/2: %w", err)
792 }
793
794 log.Info("upgrading account for threading, step 2/2: completed", slog.Duration("duration", time.Since(t0)))
795 }
796
797 // Note: Not bumping uidvalidity or setting modseq. Clients haven't been able to
798 // use threadid's before, so there is nothing to be out of date.
799
800 return nil
801}
802