1package store
2
3import (
4 "os"
5 "path/filepath"
6 "reflect"
7 "strings"
8 "testing"
9 "time"
10
11 "github.com/mjl-/bstore"
12
13 "github.com/mjl-/mox/mlog"
14 "github.com/mjl-/mox/mox-"
15)
16
17func TestThreadingUpgrade(t *testing.T) {
18 log := mlog.New("store", nil)
19 os.RemoveAll("../testdata/store/data")
20 mox.ConfigStaticPath = filepath.FromSlash("../testdata/store/mox.conf")
21 mox.MustLoadConfig(true, false)
22 defer Switchboard()()
23
24 acc, err := OpenAccount(log, "mjl", true)
25 tcheck(t, err, "open account")
26 defer func() {
27 err = acc.Close()
28 tcheck(t, err, "closing account")
29 acc.WaitClosed()
30 }()
31
32 // New account already has threading. Add some messages, check the threading.
33 deliver := func(recv time.Time, s string, expThreadID int64) Message {
34 t.Helper()
35 f, err := CreateMessageTemp(log, "account-test")
36 tcheck(t, err, "temp file")
37 defer os.Remove(f.Name())
38 defer f.Close()
39
40 s = strings.ReplaceAll(s, "\n", "\r\n")
41 m := Message{
42 Size: int64(len(s)),
43 MsgPrefix: []byte(s),
44 Received: recv,
45 }
46 acc.WithWLock(func() {
47 err = acc.DeliverMailbox(log, "Inbox", &m, f)
48 tcheck(t, err, "deliver")
49 })
50
51 if expThreadID == 0 {
52 expThreadID = m.ID
53 }
54 if m.ThreadID != expThreadID {
55 t.Fatalf("got threadid %d, expected %d", m.ThreadID, expThreadID)
56 }
57 return m
58 }
59
60 now := time.Now()
61
62 m0 := deliver(now, "Message-ID: <m0@localhost>\nSubject: test1\n\ntest\n", 0)
63 m1 := deliver(now, "Message-ID: <m1@localhost>\nReferences: <m0@localhost>\nSubject: test1\n\ntest\n", m0.ID) // References.
64 m2 := deliver(now, "Message-ID: <m2@localhost>\nReferences: <m0@localhost>\nSubject: other\n\ntest\n", 0) // References, but different subject.
65 m3 := deliver(now, "Message-ID: <m3@localhost>\nIn-Reply-To: <m0@localhost>\nSubject: test1\n\ntest\n", m0.ID) // In-Reply-To.
66 m4 := deliver(now, "Message-ID: <m4@localhost>\nSubject: re: test1\n\ntest\n", m0.ID) // Subject.
67 m5 := deliver(now, "Message-ID: <m5@localhost>\nSubject: test1 (fwd)\n\ntest\n", m0.ID) // Subject.
68 m6 := deliver(now, "Message-ID: <m6@localhost>\nSubject: [fwd: test1]\n\ntest\n", m0.ID) // Subject.
69 m7 := deliver(now, "Message-ID: <m7@localhost>\nSubject: test1\n\ntest\n", 0) // Only subject, but not a response.
70
71 // Thread with a cyclic head, a self-referencing message.
72 c1 := deliver(now, "Message-ID: <c1@localhost>\nReferences: <c2@localhost>\nSubject: cycle0\n\ntest\n", 0) // Head cycle with m8.
73 c2 := deliver(now, "Message-ID: <c2@localhost>\nReferences: <c1@localhost>\nSubject: cycle0\n\ntest\n", c1.ID) // Head cycle with c1.
74 c3 := deliver(now, "Message-ID: <c3@localhost>\nReferences: <c1@localhost>\nSubject: cycle0\n\ntest\n", c1.ID) // Connected to one of the cycle elements.
75 c4 := deliver(now, "Message-ID: <c4@localhost>\nReferences: <c2@localhost>\nSubject: cycle0\n\ntest\n", c1.ID) // Connected to other cycle element.
76 c5 := deliver(now, "Message-ID: <c5@localhost>\nReferences: <c4@localhost>\nSubject: cycle0\n\ntest\n", c1.ID)
77 c5b := deliver(now, "Message-ID: <c5@localhost>\nReferences: <c4@localhost>\nSubject: cycle0\n\ntest\n", c1.ID) // Duplicate, e.g. Sent item, internal cycle during upgrade.
78 c6 := deliver(now, "Message-ID: <c6@localhost>\nReferences: <c5@localhost>\nSubject: cycle0\n\ntest\n", c1.ID)
79 c7 := deliver(now, "Message-ID: <c7@localhost>\nReferences: <c5@localhost> <c7@localhost>\nSubject: cycle0\n\ntest\n", c1.ID) // Self-referencing message that also points to actual parent.
80
81 // More than 2 messages to make a cycle.
82 d0 := deliver(now, "Message-ID: <d0@localhost>\nReferences: <d2@localhost>\nSubject: cycle1\n\ntest\n", 0)
83 d1 := deliver(now, "Message-ID: <d1@localhost>\nReferences: <d0@localhost>\nSubject: cycle1\n\ntest\n", d0.ID)
84 d2 := deliver(now, "Message-ID: <d2@localhost>\nReferences: <d1@localhost>\nSubject: cycle1\n\ntest\n", d0.ID)
85
86 // Cycle with messages delivered later. During import/upgrade, they will all be one thread.
87 e0 := deliver(now, "Message-ID: <e0@localhost>\nReferences: <e1@localhost>\nSubject: cycle2\n\ntest\n", 0)
88 e1 := deliver(now, "Message-ID: <e1@localhost>\nReferences: <e2@localhost>\nSubject: cycle2\n\ntest\n", 0)
89 e2 := deliver(now, "Message-ID: <e2@localhost>\nReferences: <e0@localhost>\nSubject: cycle2\n\ntest\n", e0.ID)
90
91 // Three messages in a cycle (f1, f2, f3), with one with an additional ancestor (f4) which is ignored due to the cycle. Has different threads during import.
92 f0 := deliver(now, "Message-ID: <f0@localhost>\nSubject: cycle3\n\ntest\n", 0)
93 f1 := deliver(now, "Message-ID: <f1@localhost>\nReferences: <f0@localhost> <f2@localhost>\nSubject: cycle3\n\ntest\n", f0.ID)
94 f2 := deliver(now, "Message-ID: <f2@localhost>\nReferences: <f3@localhost>\nSubject: cycle3\n\ntest\n", 0)
95 f3 := deliver(now, "Message-ID: <f3@localhost>\nReferences: <f1@localhost>\nSubject: cycle3\n\ntest\n", f0.ID)
96
97 // Duplicate single message (no larger thread).
98 g0 := deliver(now, "Message-ID: <g0@localhost>\nSubject: dup\n\ntest\n", 0)
99 g0b := deliver(now, "Message-ID: <g0@localhost>\nSubject: dup\n\ntest\n", g0.ID)
100
101 // Duplicate message with a child message.
102 h0 := deliver(now, "Message-ID: <h0@localhost>\nSubject: dup2\n\ntest\n", 0)
103 h0b := deliver(now, "Message-ID: <h0@localhost>\nSubject: dup2\n\ntest\n", h0.ID)
104 h1 := deliver(now, "Message-ID: <h1@localhost>\nReferences: <h0@localhost>\nSubject: dup2\n\ntest\n", h0.ID)
105
106 // Message has itself as reference.
107 s0 := deliver(now, "Message-ID: <s0@localhost>\nReferences: <s0@localhost>\nSubject: self-referencing message\n\ntest\n", 0)
108
109 // Message with \0 in subject, should get an empty base subject.
110 b0 := deliver(now, "Message-ID: <b0@localhost>\nSubject: bad\u0000subject\n\ntest\n", 0)
111 b1 := deliver(now, "Message-ID: <b1@localhost>\nSubject: bad\u0000subject\n\ntest\n", 0) // Not matched.
112
113 // Interleaved duplicate threaded messages. First child, then parent, then duplicate parent, then duplicat child again.
114 i0 := deliver(now, "Message-ID: <i0@localhost>\nReferences: <i1@localhost>\nSubject: interleaved duplicate\n\ntest\n", 0)
115 i1 := deliver(now, "Message-ID: <i1@localhost>\nSubject: interleaved duplicate\n\ntest\n", 0)
116 i2 := deliver(now, "Message-ID: <i1@localhost>\nSubject: interleaved duplicate\n\ntest\n", i1.ID)
117 i3 := deliver(now, "Message-ID: <i0@localhost>\nReferences: <i1@localhost>\nSubject: interleaved duplicate\n\ntest\n", i0.ID)
118
119 j0 := deliver(now, "Message-ID: <j0@localhost>\nReferences: <>\nSubject: empty id in references\n\ntest\n", 0)
120
121 dbpath := acc.DBPath
122 err = acc.Close()
123 tcheck(t, err, "close account")
124 acc.WaitClosed()
125
126 // Now clear the threading upgrade, and the threading fields and close the account.
127 // We open the database file directly, so we don't trigger the consistency checker.
128 opts := bstore.Options{Timeout: 5 * time.Second, Perm: 0660, RegisterLogger: log.Logger}
129 db, err := bstore.Open(ctxbg, dbpath, &opts, DBTypes...)
130 err = db.Write(ctxbg, func(tx *bstore.Tx) error {
131 up := Upgrade{ID: 1}
132 err := tx.Delete(&up)
133 tcheck(t, err, "delete upgrade")
134
135 q := bstore.QueryTx[Message](tx)
136 _, err = q.UpdateFields(map[string]any{
137 "MessageID": "",
138 "SubjectBase": "",
139 "ThreadID": int64(0),
140 "ThreadParentIDs": []int64(nil),
141 "ThreadMissingLink": false,
142 })
143 return err
144 })
145 tcheck(t, err, "reset threading fields")
146 err = db.Close()
147 tcheck(t, err, "closing db")
148
149 // Open the account again, that should get the account upgraded. Wait for upgrade to finish.
150 acc, err = OpenAccount(log, "mjl", true)
151 tcheck(t, err, "open account")
152 err = acc.ThreadingWait(log)
153 tcheck(t, err, "wait for threading")
154
155 check := func(id int64, expThreadID int64, expParentIDs []int64, expMissingLink bool) {
156 t.Helper()
157
158 m := Message{ID: id}
159 err := acc.DB.Get(ctxbg, &m)
160 tcheck(t, err, "get message")
161 if m.ThreadID != expThreadID || !reflect.DeepEqual(m.ThreadParentIDs, expParentIDs) || m.ThreadMissingLink != expMissingLink {
162 t.Fatalf("got thread id %d, parent ids %v, missing link %v, expected %d %v %v", m.ThreadID, m.ThreadParentIDs, m.ThreadMissingLink, expThreadID, expParentIDs, expMissingLink)
163 }
164 }
165
166 parents0 := []int64{m0.ID}
167 check(m0.ID, m0.ID, nil, false)
168 check(m1.ID, m0.ID, parents0, false)
169 check(m2.ID, m2.ID, nil, true)
170 check(m3.ID, m0.ID, parents0, false)
171 check(m4.ID, m0.ID, parents0, true)
172 check(m5.ID, m0.ID, parents0, true)
173 check(m6.ID, m0.ID, parents0, true)
174 check(m7.ID, m7.ID, nil, false)
175
176 check(c1.ID, c1.ID, nil, true) // Head of cycle, hence missing link
177 check(c2.ID, c1.ID, []int64{c1.ID}, false)
178 check(c3.ID, c1.ID, []int64{c1.ID}, false)
179 check(c4.ID, c1.ID, []int64{c2.ID, c1.ID}, false)
180 check(c5.ID, c1.ID, []int64{c4.ID, c2.ID, c1.ID}, false)
181 check(c5b.ID, c1.ID, []int64{c5.ID, c4.ID, c2.ID, c1.ID}, true)
182 check(c6.ID, c1.ID, []int64{c5.ID, c4.ID, c2.ID, c1.ID}, false)
183 check(c7.ID, c1.ID, []int64{c5.ID, c4.ID, c2.ID, c1.ID}, true)
184
185 check(d0.ID, d0.ID, nil, true)
186 check(d1.ID, d0.ID, []int64{d0.ID}, false)
187 check(d2.ID, d0.ID, []int64{d1.ID, d0.ID}, false)
188
189 check(e0.ID, e0.ID, nil, true)
190 check(e1.ID, e0.ID, []int64{e2.ID, e0.ID}, false)
191 check(e2.ID, e0.ID, []int64{e0.ID}, false)
192
193 check(f0.ID, f0.ID, nil, false)
194 check(f1.ID, f1.ID, nil, true)
195 check(f2.ID, f1.ID, []int64{f3.ID, f1.ID}, false)
196 check(f3.ID, f1.ID, []int64{f1.ID}, false)
197
198 check(g0.ID, g0.ID, nil, false)
199 check(g0b.ID, g0.ID, []int64{g0.ID}, true)
200
201 check(h0.ID, h0.ID, nil, false)
202 check(h0b.ID, h0.ID, []int64{h0.ID}, true)
203 check(h1.ID, h0.ID, []int64{h0.ID}, false)
204
205 check(s0.ID, s0.ID, nil, true)
206
207 check(b0.ID, b0.ID, nil, false)
208 check(b1.ID, b1.ID, nil, false)
209
210 check(i0.ID, i1.ID, []int64{i1.ID}, false)
211 check(i1.ID, i1.ID, nil, false)
212 check(i2.ID, i1.ID, []int64{i1.ID}, true)
213 check(i3.ID, i1.ID, []int64{i0.ID, i1.ID}, true)
214
215 check(j0.ID, j0.ID, nil, false)
216}
217