1package store
2
3import (
4 "fmt"
5 "log/slog"
6 "os"
7 "sync"
8 "sync/atomic"
9
10 "github.com/mjl-/bstore"
11
12 "github.com/mjl-/mox/metrics"
13 "github.com/mjl-/mox/mlog"
14 "github.com/mjl-/mox/mox-"
15)
16
// Channels through which the rest of the package talks to the switchboard
// goroutine. All are unbuffered, so a send completes only once the switchboard
// has received the value.
var (
	register   = make(chan *Comm)          // Comms to start tracking, sent by RegisterComm.
	unregister = make(chan *Comm)          // Comms to stop tracking, sent by Comm.Unregister.
	broadcast  = make(chan changeReq)      // Changes to distribute to the comms of an account.
	applied    = make(chan removalApplied) // Removals a session has processed, sent by Comm.RemovalSeen.
)
23
// changeReq is a request to the switchboard to distribute changes to the comms
// registered for an account.
type changeReq struct {
	acc     *Account      // Account the changes apply to.
	comm    *Comm         // Can be nil. If set, the originating comm, which does not get the changes sent back.
	changes []Change      // Changes to distribute.
	done    chan struct{} // The switchboard sends on this channel once it has processed the request.
}
30
// removalApplied tells the switchboard that a session has applied the removal of
// messages, so the switchboard can decrease its reference counts for them.
type removalApplied struct {
	Account *Account // Account the removed messages belong to.
	MsgIDs  []int64  // IDs of the removed messages, taken from ChangeRemoveUIDs.MsgIDs.
}
35
// UID is an IMAP UID.
type UID uint32
37
// Change is a change to mailboxes/subscriptions/messages in an account. It is
// one of the Change* types in this package.
type Change interface {
	// ChangeModSeq returns the modseq of the change, or -1 for "modseq not
	// applicable".
	ChangeModSeq() ModSeq
}
43
44// ChangeAddUID is sent for a new message in a mailbox.
45type ChangeAddUID struct {
46 MailboxID int64
47 UID UID
48 ModSeq ModSeq
49 Flags Flags // System flags.
50 Keywords []string // Other flags.
51}
52
53func (c ChangeAddUID) ChangeModSeq() ModSeq { return c.ModSeq }
54
55// ChangeRemoveUIDs is sent for removal of one or more messages from a mailbox.
56type ChangeRemoveUIDs struct {
57 MailboxID int64
58 UIDs []UID // Must be in increasing UID order, for IMAP.
59 ModSeq ModSeq
60 MsgIDs []int64 // Message.ID, for erasing, order does not necessarily correspond with UIDs!
61}
62
63func (c ChangeRemoveUIDs) ChangeModSeq() ModSeq { return c.ModSeq }
64
65// ChangeFlags is sent for an update to flags for a message, e.g. "Seen".
66type ChangeFlags struct {
67 MailboxID int64
68 UID UID
69 ModSeq ModSeq
70 Mask Flags // Which flags are actually modified.
71 Flags Flags // New flag values. All are set, not just mask.
72 Keywords []string // Non-system/well-known flags/keywords/labels.
73}
74
75func (c ChangeFlags) ChangeModSeq() ModSeq { return c.ModSeq }
76
77// ChangeThread is sent when muted/collapsed changes.
78type ChangeThread struct {
79 MessageIDs []int64
80 Muted bool
81 Collapsed bool
82}
83
84func (c ChangeThread) ChangeModSeq() ModSeq { return -1 }
85
86// ChangeRemoveMailbox is sent for a removed mailbox.
87type ChangeRemoveMailbox struct {
88 MailboxID int64
89 Name string
90 ModSeq ModSeq
91}
92
93func (c ChangeRemoveMailbox) ChangeModSeq() ModSeq { return c.ModSeq }
94
95// ChangeAddMailbox is sent for a newly created mailbox.
96type ChangeAddMailbox struct {
97 Mailbox
98 Flags []string // For flags like \Subscribed.
99}
100
101func (c ChangeAddMailbox) ChangeModSeq() ModSeq { return c.ModSeq }
102
103// ChangeRenameMailbox is sent for a rename mailbox.
104type ChangeRenameMailbox struct {
105 MailboxID int64
106 OldName string
107 NewName string
108 Flags []string
109 ModSeq ModSeq
110}
111
112func (c ChangeRenameMailbox) ChangeModSeq() ModSeq { return c.ModSeq }
113
114// ChangeAddSubscription is sent for an added subscription to a mailbox.
115type ChangeAddSubscription struct {
116 Name string
117 Flags []string // For additional IMAP flags like \NonExistent.
118}
119
120func (c ChangeAddSubscription) ChangeModSeq() ModSeq { return -1 }
121
122// ChangeMailboxCounts is sent when the number of total/deleted/unseen/unread messages changes.
123type ChangeMailboxCounts struct {
124 MailboxID int64
125 MailboxName string
126 MailboxCounts
127}
128
129func (c ChangeMailboxCounts) ChangeModSeq() ModSeq { return -1 }
130
131// ChangeMailboxSpecialUse is sent when a special-use flag changes.
132type ChangeMailboxSpecialUse struct {
133 MailboxID int64
134 MailboxName string
135 SpecialUse SpecialUse
136 ModSeq ModSeq
137}
138
139func (c ChangeMailboxSpecialUse) ChangeModSeq() ModSeq { return c.ModSeq }
140
141// ChangeMailboxKeywords is sent when keywords are changed for a mailbox. For
142// example, when a message is added with a previously unseen keyword.
143type ChangeMailboxKeywords struct {
144 MailboxID int64
145 MailboxName string
146 Keywords []string
147}
148
149func (c ChangeMailboxKeywords) ChangeModSeq() ModSeq { return -1 }
150
151// ChangeAnnotation is sent when an annotation is added/updated/removed, either for
152// a mailbox or a global per-account annotation. The value is not included.
153type ChangeAnnotation struct {
154 MailboxID int64 // Can be zero, meaning global (per-account) annotation.
155 MailboxName string // Empty for global (per-account) annotation.
156 Key string // Also called "entry name", e.g. "/private/comment".
157 ModSeq ModSeq
158}
159
160func (c ChangeAnnotation) ChangeModSeq() ModSeq { return c.ModSeq }
161
162func messageEraser(donec chan struct{}, cleanc chan map[*Account][]int64) {
163 log := mlog.New("store", nil)
164
165 for {
166 clean, ok := <-cleanc
167 if !ok {
168 donec <- struct{}{}
169 return
170 }
171
172 for acc, ids := range clean {
173 eraseMessages(log, acc, ids)
174 }
175 }
176}
177
178func eraseMessages(log mlog.Log, acc *Account, ids []int64) {
179 // We are responsible for closing the accounts.
180 defer func() {
181 err := acc.Close()
182 log.Check(err, "close account after erasing expunged messages", slog.String("account", acc.Name))
183 }()
184
185 acc.Lock()
186 defer acc.Unlock()
187 err := acc.DB.Write(mox.Context, func(tx *bstore.Tx) error {
188 du := DiskUsage{ID: 1}
189 if err := tx.Get(&du); err != nil {
190 return fmt.Errorf("get disk usage: %v", err)
191 }
192 var duchanged bool
193
194 for _, id := range ids {
195 me := MessageErase{ID: id}
196 if err := tx.Get(&me); err != nil {
197 return fmt.Errorf("delete message erase record %d: %v", id, err)
198 }
199
200 m := Message{ID: id}
201 if err := tx.Get(&m); err != nil {
202 return fmt.Errorf("get message %d to erase: %v", id, err)
203 } else if !m.Expunged {
204 return fmt.Errorf("message %d to erase is not marked expunged", id)
205 }
206 if !me.SkipUpdateDiskUsage {
207 du.MessageSize -= m.Size
208 duchanged = true
209 }
210 m.erase()
211 if err := tx.Update(&m); err != nil {
212 return fmt.Errorf("mark message %d erase in database: %v", id, err)
213 }
214
215 if err := tx.Delete(&me); err != nil {
216 return fmt.Errorf("deleting message erase record %d: %v", id, err)
217 }
218 }
219
220 if duchanged {
221 if err := tx.Update(&du); err != nil {
222 return fmt.Errorf("update disk usage after erasing: %v", err)
223 }
224 }
225
226 return nil
227 })
228 if err != nil {
229 log.Errorx("erasing expunged messages", err,
230 slog.String("account", acc.Name),
231 slog.Any("ids", ids),
232 )
233 return
234 }
235
236 // We remove the files after the database commit. It's better to have the files
237 // still around without being referenced from the database than references in the
238 // database to non-existent files.
239 for _, id := range ids {
240 p := acc.MessagePath(id)
241 err := os.Remove(p)
242 log.Check(err, "removing expunged message file from disk", slog.String("path", p))
243 }
244}
245
// switchboard is the loop that Switchboard runs in its own goroutine. It keeps
// track of the comms registered per account, distributes broadcast changes to
// them, and counts references to expunged messages so they are handed to the
// eraser goroutine (over cleanc) only once no registered comm references them
// anymore.
//
// A receive on stopc makes it pass any messages ready for erasing to the eraser,
// close cleanc, send a value on donec and return.
func switchboard(stopc, donec chan struct{}, cleanc chan map[*Account][]int64) {
	// Registered comms, per account.
	regs := map[*Account]map[*Comm]struct{}{}

	// We don't remove message files or clear fields in the Message stored in the
	// database until all references, from all sessions have gone away. When we see
	// an expunge of a message, we count how many comms are active (i.e. how many
	// sessions reference the message). We require each of them to tell us they are no
	// longer referencing that message. Once we've seen that from all Comms, we remove
	// the on-disk file and the fields from the database.
	//
	// During the initial account open (when there are no active sessions/Comms yet,
	// and we open the message database file), the message erases will also be applied.
	//
	// When we add an account to eraseRefs, we increase the refcount, and we decrease
	// it again when removing the account.
	eraseRefs := map[*Account]map[int64]int{}

	// We collect which messages can be erased per account, for sending them off to the
	// eraser goroutine. When an account is added to this map, its refcount is
	// increased. It is decreased again by the eraser goroutine.
	eraseIDs := map[*Account][]int64{}

	// addEraseIDs queues message ids of acc for the eraser, taking an account
	// reference the first time the account is added to eraseIDs.
	addEraseIDs := func(acc *Account, ids ...int64) {
		if _, ok := eraseIDs[acc]; !ok {
			openAccounts.Lock()
			acc.nused++
			openAccounts.Unlock()
		}
		eraseIDs[acc] = append(eraseIDs[acc], ids...)
	}

	// decreaseEraseRefs drops one reference for each of the message ids. A message
	// whose count reaches zero is queued for erasing. When the last message of an
	// account is gone from eraseRefs, the account reference held for eraseRefs is
	// dropped too. It panics when a count would go negative, including for an id
	// that is not tracked.
	decreaseEraseRefs := func(acc *Account, ids ...int64) {
		for _, id := range ids {
			v := eraseRefs[acc][id] - 1
			if v < 0 {
				metrics.PanicInc(metrics.Store) // For tests.
				panic(fmt.Sprintf("negative expunged message references for account %q, message id %d", acc.Name, id))
			}
			if v > 0 {
				eraseRefs[acc][id] = v
				continue
			}

			// Last reference is gone, the message can be erased.
			addEraseIDs(acc, id)
			delete(eraseRefs[acc], id)
			if len(eraseRefs[acc]) > 0 {
				continue
			}
			delete(eraseRefs, acc)
			// Note: cannot use acc.Close, it tries to lock acc, but someone broadcasting to
			// this goroutine will likely have the lock.
			openAccounts.Lock()
			acc.nused--
			n := acc.nused
			openAccounts.Unlock()
			if n < 0 {
				metrics.PanicInc(metrics.Store) // For tests.
				panic(fmt.Sprintf("negative reference count for account %q, after removing message id %d", acc.Name, id))
			}
		}
	}

	for {
		// If we have messages to clean, try sending to the eraser. Otherwise cc is nil,
		// and a send on a nil channel is never selected.
		cc := cleanc
		if len(eraseIDs) == 0 {
			cc = nil
		}

		select {
		case cc <- eraseIDs:
			// The eraser now owns the map we sent, start with a fresh one.
			eraseIDs = map[*Account][]int64{}

		case c := <-register:
			if _, ok := regs[c.acc]; !ok {
				regs[c.acc] = map[*Comm]struct{}{}
			}
			regs[c.acc][c] = struct{}{}

		case c := <-unregister:
			// Drain any ChangeRemoveUIDs references from the comm, to update our eraseRefs and
			// possibly queue messages for cleaning. No need to take a lock, the caller does
			// not use the comm anymore.
			for _, ch := range c.changes {
				rem, ok := ch.(ChangeRemoveUIDs)
				if !ok {
					continue
				}
				decreaseEraseRefs(c.acc, rem.MsgIDs...)
			}

			delete(regs[c.acc], c)
			if len(regs[c.acc]) == 0 {
				delete(regs, c.acc)
			}

		case chReq := <-broadcast:
			acc := chReq.acc

			// Track references to removed messages in sessions (mostly IMAP) so we can pass
			// them to the eraser.
			for _, ch := range chReq.changes {
				rem, ok := ch.(ChangeRemoveUIDs)
				if !ok {
					continue
				}

				refs := len(regs[acc])
				if chReq.comm != nil {
					// The sender does not get this change and doesn't have to notify us of having
					// processed the removal.
					refs--
				}
				if refs <= 0 {
					// No comm will receive this removal, the messages can be erased right away.
					addEraseIDs(acc, rem.MsgIDs...)
					continue
				}

				// Comms/sessions still reference these messages, track how many.
				for _, id := range rem.MsgIDs {
					// Take the account reference only when we actually add a first message for the
					// account.
					if _, ok := eraseRefs[acc]; !ok {
						openAccounts.Lock()
						acc.nused++
						openAccounts.Unlock()

						eraseRefs[acc] = map[int64]int{}
					}
					if _, ok := eraseRefs[acc][id]; ok {
						metrics.PanicInc(metrics.Store) // For tests.
						panic(fmt.Sprintf("already have eraseRef for message id %d, account %q", id, acc.Name))
					}
					eraseRefs[acc][id] = refs
				}
			}

			for c := range regs[acc] {
				// Do not send the broadcaster back their own changes. chReq.comm is nil if not
				// originating from a comm, so won't match in that case.
				if c == chReq.comm {
					continue
				}

				c.Lock()
				c.changes = append(c.changes, chReq.changes...)
				c.Unlock()

				// Non-blocking notification: if the send cannot proceed immediately, it is
				// skipped.
				select {
				case c.Pending <- struct{}{}:
				default:
				}
			}
			// Tell the broadcaster its changes have been distributed.
			chReq.done <- struct{}{}

		case removal := <-applied:
			acc := removal.Account

			// Decrease references of messages, queueing for erasure when the last reference
			// goes away.
			decreaseEraseRefs(acc, removal.MsgIDs...)

		case <-stopc:
			// We may still have eraseRefs, messages currently referenced in a session. Those
			// messages will be erased when the database file is opened again in the future. If
			// we have messages ready to erase now, we'll do that first.

			if len(eraseIDs) > 0 {
				cleanc <- eraseIDs
				eraseIDs = nil
			}

			// Presumably this releases the account reference taken when the account was
			// added to eraseRefs.
			for acc := range eraseRefs {
				err := acc.Close()
				log := mlog.New("store", nil)
				log.Check(err, "closing account")
			}

			close(cleanc)       // Tell eraser to stop.
			donec <- struct{}{} // Say we are now done.
			return
		}
	}
}
428
429var switchboardBusy atomic.Bool
430
431// Switchboard distributes changes to accounts to interested listeners. See Comm and Change.
432func Switchboard() (stop func()) {
433 if !switchboardBusy.CompareAndSwap(false, true) {
434 panic("switchboard already busy")
435 }
436
437 stopc := make(chan struct{})
438 donec := make(chan struct{})
439 cleanc := make(chan map[*Account][]int64)
440
441 go messageEraser(donec, cleanc)
442 go switchboard(stopc, donec, cleanc)
443
444 return func() {
445 stopc <- struct{}{}
446
447 // Wait for switchboard and eraser goroutines to be ready.
448 <-donec
449 <-donec
450
451 if !switchboardBusy.CompareAndSwap(true, false) {
452 panic("switchboard already unregistered?")
453 }
454 }
455}
456
// Comm handles communication with the goroutine that maintains the
// account/mailbox/message state. A Comm is created with RegisterComm and must be
// released with Unregister.
type Comm struct {
	Pending chan struct{} // Receives block until changes come in, e.g. for IMAP IDLE.

	acc *Account // Account this Comm was registered for.

	// The mutex is held by Get and by the switchboard while they read or modify
	// changes.
	sync.Mutex
	changes []Change // Changes received from the switchboard, not yet retrieved with Get.
}
467
468// Register starts a Comm for the account. Unregister must be called.
469func RegisterComm(acc *Account) *Comm {
470 c := &Comm{
471 Pending: make(chan struct{}, 1), // Bufferend so Switchboard can just do a non-blocking send.
472 acc: acc,
473 }
474 register <- c
475 return c
476}
477
// Unregister stops this Comm. It hands the Comm to the switchboard, which removes
// it from the comms registered for the account and releases the references to
// removed messages still in its pending changes. The Comm must not be used
// afterwards.
func (c *Comm) Unregister() {
	unregister <- c
}
482
483// Broadcast ensures changes are sent to other Comms.
484func (c *Comm) Broadcast(ch []Change) {
485 if len(ch) == 0 {
486 return
487 }
488 done := make(chan struct{}, 1)
489 broadcast <- changeReq{c.acc, c, ch, done}
490 <-done
491}
492
493// Get retrieves all pending changes. If no changes are pending a nil or empty list
494// is returned.
495func (c *Comm) Get() []Change {
496 c.Lock()
497 defer c.Unlock()
498 l := c.changes
499 c.changes = nil
500 return l
501}
502
503// RemovalSeen must be called by consumers when they have applied the removal to
504// their session. The switchboard tracks references of expunged messages, and
505// removes/cleans the message up when the last reference is gone.
506func (c *Comm) RemovalSeen(ch ChangeRemoveUIDs) {
507 applied <- removalApplied{c.acc, ch.MsgIDs}
508}
509
510// BroadcastChanges ensures changes are sent to all listeners on the accoount.
511func BroadcastChanges(acc *Account, ch []Change) {
512 if len(ch) == 0 {
513 return
514 }
515 done := make(chan struct{}, 1)
516 broadcast <- changeReq{acc, nil, ch, done}
517 <-done
518}
519