1package store
2
3import (
4 "os"
5 "path/filepath"
6 "reflect"
7 "strings"
8 "testing"
9 "time"
10
11 "github.com/mjl-/bstore"
12
13 "github.com/mjl-/mox/mlog"
14 "github.com/mjl-/mox/mox-"
15)
16
17func TestThreadingUpgrade(t *testing.T) {
18 log := mlog.New("store", nil)
19 os.RemoveAll("../testdata/store/data")
20 mox.ConfigStaticPath = filepath.FromSlash("../testdata/store/mox.conf")
21 mox.MustLoadConfig(true, false)
22 acc, err := OpenAccount(log, "mjl")
23 tcheck(t, err, "open account")
24 defer func() {
25 err = acc.Close()
26 tcheck(t, err, "closing account")
27 acc.CheckClosed()
28 }()
29 defer Switchboard()()
30
31 // New account already has threading. Add some messages, check the threading.
32 deliver := func(recv time.Time, s string, expThreadID int64) Message {
33 t.Helper()
34 f, err := CreateMessageTemp(log, "account-test")
35 tcheck(t, err, "temp file")
36 defer os.Remove(f.Name())
37 defer f.Close()
38
39 s = strings.ReplaceAll(s, "\n", "\r\n")
40 m := Message{
41 Size: int64(len(s)),
42 MsgPrefix: []byte(s),
43 Received: recv,
44 }
45 err = acc.DeliverMailbox(log, "Inbox", &m, f)
46 tcheck(t, err, "deliver")
47 if expThreadID == 0 {
48 expThreadID = m.ID
49 }
50 if m.ThreadID != expThreadID {
51 t.Fatalf("got threadid %d, expected %d", m.ThreadID, expThreadID)
52 }
53 return m
54 }
55
56 now := time.Now()
57
58 m0 := deliver(now, "Message-ID: <m0@localhost>\nSubject: test1\n\ntest\n", 0)
59 m1 := deliver(now, "Message-ID: <m1@localhost>\nReferences: <m0@localhost>\nSubject: test1\n\ntest\n", m0.ID) // References.
60 m2 := deliver(now, "Message-ID: <m2@localhost>\nReferences: <m0@localhost>\nSubject: other\n\ntest\n", 0) // References, but different subject.
61 m3 := deliver(now, "Message-ID: <m3@localhost>\nIn-Reply-To: <m0@localhost>\nSubject: test1\n\ntest\n", m0.ID) // In-Reply-To.
62 m4 := deliver(now, "Message-ID: <m4@localhost>\nSubject: re: test1\n\ntest\n", m0.ID) // Subject.
63 m5 := deliver(now, "Message-ID: <m5@localhost>\nSubject: test1 (fwd)\n\ntest\n", m0.ID) // Subject.
64 m6 := deliver(now, "Message-ID: <m6@localhost>\nSubject: [fwd: test1]\n\ntest\n", m0.ID) // Subject.
65 m7 := deliver(now, "Message-ID: <m7@localhost>\nSubject: test1\n\ntest\n", 0) // Only subject, but not a response.
66
67 // Thread with a cyclic head, a self-referencing message.
68 c1 := deliver(now, "Message-ID: <c1@localhost>\nReferences: <c2@localhost>\nSubject: cycle0\n\ntest\n", 0) // Head cycle with m8.
69 c2 := deliver(now, "Message-ID: <c2@localhost>\nReferences: <c1@localhost>\nSubject: cycle0\n\ntest\n", c1.ID) // Head cycle with c1.
70 c3 := deliver(now, "Message-ID: <c3@localhost>\nReferences: <c1@localhost>\nSubject: cycle0\n\ntest\n", c1.ID) // Connected to one of the cycle elements.
71 c4 := deliver(now, "Message-ID: <c4@localhost>\nReferences: <c2@localhost>\nSubject: cycle0\n\ntest\n", c1.ID) // Connected to other cycle element.
72 c5 := deliver(now, "Message-ID: <c5@localhost>\nReferences: <c4@localhost>\nSubject: cycle0\n\ntest\n", c1.ID)
73 c5b := deliver(now, "Message-ID: <c5@localhost>\nReferences: <c4@localhost>\nSubject: cycle0\n\ntest\n", c1.ID) // Duplicate, e.g. Sent item, internal cycle during upgrade.
74 c6 := deliver(now, "Message-ID: <c6@localhost>\nReferences: <c5@localhost>\nSubject: cycle0\n\ntest\n", c1.ID)
75 c7 := deliver(now, "Message-ID: <c7@localhost>\nReferences: <c5@localhost> <c7@localhost>\nSubject: cycle0\n\ntest\n", c1.ID) // Self-referencing message that also points to actual parent.
76
77 // More than 2 messages to make a cycle.
78 d0 := deliver(now, "Message-ID: <d0@localhost>\nReferences: <d2@localhost>\nSubject: cycle1\n\ntest\n", 0)
79 d1 := deliver(now, "Message-ID: <d1@localhost>\nReferences: <d0@localhost>\nSubject: cycle1\n\ntest\n", d0.ID)
80 d2 := deliver(now, "Message-ID: <d2@localhost>\nReferences: <d1@localhost>\nSubject: cycle1\n\ntest\n", d0.ID)
81
82 // Cycle with messages delivered later. During import/upgrade, they will all be one thread.
83 e0 := deliver(now, "Message-ID: <e0@localhost>\nReferences: <e1@localhost>\nSubject: cycle2\n\ntest\n", 0)
84 e1 := deliver(now, "Message-ID: <e1@localhost>\nReferences: <e2@localhost>\nSubject: cycle2\n\ntest\n", 0)
85 e2 := deliver(now, "Message-ID: <e2@localhost>\nReferences: <e0@localhost>\nSubject: cycle2\n\ntest\n", e0.ID)
86
87 // Three messages in a cycle (f1, f2, f3), with one with an additional ancestor (f4) which is ignored due to the cycle. Has different threads during import.
88 f0 := deliver(now, "Message-ID: <f0@localhost>\nSubject: cycle3\n\ntest\n", 0)
89 f1 := deliver(now, "Message-ID: <f1@localhost>\nReferences: <f0@localhost> <f2@localhost>\nSubject: cycle3\n\ntest\n", f0.ID)
90 f2 := deliver(now, "Message-ID: <f2@localhost>\nReferences: <f3@localhost>\nSubject: cycle3\n\ntest\n", 0)
91 f3 := deliver(now, "Message-ID: <f3@localhost>\nReferences: <f1@localhost>\nSubject: cycle3\n\ntest\n", f0.ID)
92
93 // Duplicate single message (no larger thread).
94 g0 := deliver(now, "Message-ID: <g0@localhost>\nSubject: dup\n\ntest\n", 0)
95 g0b := deliver(now, "Message-ID: <g0@localhost>\nSubject: dup\n\ntest\n", g0.ID)
96
97 // Duplicate message with a child message.
98 h0 := deliver(now, "Message-ID: <h0@localhost>\nSubject: dup2\n\ntest\n", 0)
99 h0b := deliver(now, "Message-ID: <h0@localhost>\nSubject: dup2\n\ntest\n", h0.ID)
100 h1 := deliver(now, "Message-ID: <h1@localhost>\nReferences: <h0@localhost>\nSubject: dup2\n\ntest\n", h0.ID)
101
102 // Message has itself as reference.
103 s0 := deliver(now, "Message-ID: <s0@localhost>\nReferences: <s0@localhost>\nSubject: self-referencing message\n\ntest\n", 0)
104
105 // Message with \0 in subject, should get an empty base subject.
106 b0 := deliver(now, "Message-ID: <b0@localhost>\nSubject: bad\u0000subject\n\ntest\n", 0)
107 b1 := deliver(now, "Message-ID: <b1@localhost>\nSubject: bad\u0000subject\n\ntest\n", 0) // Not matched.
108
109 // Interleaved duplicate threaded messages. First child, then parent, then duplicate parent, then duplicat child again.
110 i0 := deliver(now, "Message-ID: <i0@localhost>\nReferences: <i1@localhost>\nSubject: interleaved duplicate\n\ntest\n", 0)
111 i1 := deliver(now, "Message-ID: <i1@localhost>\nSubject: interleaved duplicate\n\ntest\n", 0)
112 i2 := deliver(now, "Message-ID: <i1@localhost>\nSubject: interleaved duplicate\n\ntest\n", i1.ID)
113 i3 := deliver(now, "Message-ID: <i0@localhost>\nReferences: <i1@localhost>\nSubject: interleaved duplicate\n\ntest\n", i0.ID)
114
115 j0 := deliver(now, "Message-ID: <j0@localhost>\nReferences: <>\nSubject: empty id in references\n\ntest\n", 0)
116
117 dbpath := acc.DBPath
118 err = acc.Close()
119 tcheck(t, err, "close account")
120 acc.CheckClosed()
121
122 // Now clear the threading upgrade, and the threading fields and close the account.
123 // We open the database file directly, so we don't trigger the consistency checker.
124 opts := bstore.Options{Timeout: 5 * time.Second, Perm: 0660, RegisterLogger: log.Logger}
125 db, err := bstore.Open(ctxbg, dbpath, &opts, DBTypes...)
126 err = db.Write(ctxbg, func(tx *bstore.Tx) error {
127 up := Upgrade{ID: 1}
128 err := tx.Delete(&up)
129 tcheck(t, err, "delete upgrade")
130
131 q := bstore.QueryTx[Message](tx)
132 _, err = q.UpdateFields(map[string]any{
133 "MessageID": "",
134 "SubjectBase": "",
135 "ThreadID": int64(0),
136 "ThreadParentIDs": []int64(nil),
137 "ThreadMissingLink": false,
138 })
139 return err
140 })
141 tcheck(t, err, "reset threading fields")
142 err = db.Close()
143 tcheck(t, err, "closing db")
144
145 // Open the account again, that should get the account upgraded. Wait for upgrade to finish.
146 acc, err = OpenAccount(log, "mjl")
147 tcheck(t, err, "open account")
148 err = acc.ThreadingWait(log)
149 tcheck(t, err, "wait for threading")
150
151 check := func(id int64, expThreadID int64, expParentIDs []int64, expMissingLink bool) {
152 t.Helper()
153
154 m := Message{ID: id}
155 err := acc.DB.Get(ctxbg, &m)
156 tcheck(t, err, "get message")
157 if m.ThreadID != expThreadID || !reflect.DeepEqual(m.ThreadParentIDs, expParentIDs) || m.ThreadMissingLink != expMissingLink {
158 t.Fatalf("got thread id %d, parent ids %v, missing link %v, expected %d %v %v", m.ThreadID, m.ThreadParentIDs, m.ThreadMissingLink, expThreadID, expParentIDs, expMissingLink)
159 }
160 }
161
162 parents0 := []int64{m0.ID}
163 check(m0.ID, m0.ID, nil, false)
164 check(m1.ID, m0.ID, parents0, false)
165 check(m2.ID, m2.ID, nil, true)
166 check(m3.ID, m0.ID, parents0, false)
167 check(m4.ID, m0.ID, parents0, true)
168 check(m5.ID, m0.ID, parents0, true)
169 check(m6.ID, m0.ID, parents0, true)
170 check(m7.ID, m7.ID, nil, false)
171
172 check(c1.ID, c1.ID, nil, true) // Head of cycle, hence missing link
173 check(c2.ID, c1.ID, []int64{c1.ID}, false)
174 check(c3.ID, c1.ID, []int64{c1.ID}, false)
175 check(c4.ID, c1.ID, []int64{c2.ID, c1.ID}, false)
176 check(c5.ID, c1.ID, []int64{c4.ID, c2.ID, c1.ID}, false)
177 check(c5b.ID, c1.ID, []int64{c5.ID, c4.ID, c2.ID, c1.ID}, true)
178 check(c6.ID, c1.ID, []int64{c5.ID, c4.ID, c2.ID, c1.ID}, false)
179 check(c7.ID, c1.ID, []int64{c5.ID, c4.ID, c2.ID, c1.ID}, true)
180
181 check(d0.ID, d0.ID, nil, true)
182 check(d1.ID, d0.ID, []int64{d0.ID}, false)
183 check(d2.ID, d0.ID, []int64{d1.ID, d0.ID}, false)
184
185 check(e0.ID, e0.ID, nil, true)
186 check(e1.ID, e0.ID, []int64{e2.ID, e0.ID}, false)
187 check(e2.ID, e0.ID, []int64{e0.ID}, false)
188
189 check(f0.ID, f0.ID, nil, false)
190 check(f1.ID, f1.ID, nil, true)
191 check(f2.ID, f1.ID, []int64{f3.ID, f1.ID}, false)
192 check(f3.ID, f1.ID, []int64{f1.ID}, false)
193
194 check(g0.ID, g0.ID, nil, false)
195 check(g0b.ID, g0.ID, []int64{g0.ID}, true)
196
197 check(h0.ID, h0.ID, nil, false)
198 check(h0b.ID, h0.ID, []int64{h0.ID}, true)
199 check(h1.ID, h0.ID, []int64{h0.ID}, false)
200
201 check(s0.ID, s0.ID, nil, true)
202
203 check(b0.ID, b0.ID, nil, false)
204 check(b1.ID, b1.ID, nil, false)
205
206 check(i0.ID, i1.ID, []int64{i1.ID}, false)
207 check(i1.ID, i1.ID, nil, false)
208 check(i2.ID, i1.ID, []int64{i1.ID}, true)
209 check(i3.ID, i1.ID, []int64{i0.ID, i1.ID}, true)
210
211 check(j0.ID, j0.ID, nil, false)
212}
213