1package store
2
3import (
4 "fmt"
5 "log/slog"
6 "os"
7 "sync"
8 "sync/atomic"
9
10 "github.com/mjl-/bstore"
11
12 "github.com/mjl-/mox/metrics"
13 "github.com/mjl-/mox/mlog"
14 "github.com/mjl-/mox/mox-"
15)
16
// CommPendingChangesMax is the maximum number of changes kept pending for a Comm.
// When queueing a broadcast would exceed it, the Comm is marked as overflowed
// instead of getting the changes queued. Variable because set to a low value
// during tests.
var CommPendingChangesMax = 10000
21
// Channels for communicating with the switchboard goroutine. They are unbuffered,
// so a send blocks until the switchboard receives it.
var (
	register   = make(chan *Comm)          // New Comms, from RegisterComm.
	unregister = make(chan *Comm)          // Comms going away, from Comm.Unregister.
	broadcast  = make(chan changeReq)      // Changes to distribute, from Comm.Broadcast and BroadcastChanges.
	applied    = make(chan removalApplied) // Processed removals, from Comm.RemovalSeen.
)
28
// changeReq asks the switchboard to distribute changes to the Comms registered for
// an account. Note: constructed with positional composite literals, field order
// matters.
type changeReq struct {
	acc     *Account
	comm    *Comm // Can be nil. If set, the sending Comm, which does not receive its own changes.
	changes []Change
	done    chan struct{} // Signaled by the switchboard after distributing the changes.
}
35
// removalApplied tells the switchboard that a Comm has applied a ChangeRemoveUIDs,
// so its references to the messages can be dropped.
type removalApplied struct {
	Account *Account
	MsgIDs  []int64 // From ChangeRemoveUIDs.MsgIDs.
}
40
type UID uint32 // UID is an IMAP UID.
42
// Change to mailboxes/subscriptions/messages in an account. One of the Change*
// types in this package. Changes are distributed to other sessions through the
// switchboard, with Comm.Broadcast or BroadcastChanges.
type Change interface {
	// ChangeModSeq returns the modseq of the change, or -1 for "modseq not
	// applicable".
	ChangeModSeq() ModSeq
}
48
// ChangeAddUID is sent for a new message in a mailbox.
type ChangeAddUID struct {
	MailboxID int64
	UID       UID
	ModSeq    ModSeq
	Flags     Flags    // System flags.
	Keywords  []string // Other flags.

	// For IMAP NOTIFY. Presumably mailbox counts after adding the message, confirm
	// with the senders of this change.
	MessageCountIMAP uint32
	Unseen           uint32
}

// ChangeModSeq returns the modseq of the added message.
func (c ChangeAddUID) ChangeModSeq() ModSeq { return c.ModSeq }
63
// ChangeRemoveUIDs is sent for removal of one or more messages from a mailbox.
//
// The switchboard uses MsgIDs to track which sessions still reference the removed
// messages; the messages are only erased when no references remain. Consumers that
// get this change through Comm.Get must call Comm.RemovalSeen once they have
// applied it to their session.
type ChangeRemoveUIDs struct {
	MailboxID int64
	UIDs      []UID // Must be in increasing UID order, for IMAP.
	ModSeq    ModSeq
	MsgIDs    []int64 // Message.ID, for erasing, order does not necessarily correspond with UIDs!

	// For IMAP NOTIFY.
	UIDNext          UID
	MessageCountIMAP uint32
	Unseen           uint32
}

// ChangeModSeq returns the modseq of the removal.
func (c ChangeRemoveUIDs) ChangeModSeq() ModSeq { return c.ModSeq }
78
// ChangeFlags is sent for an update to flags for a message, e.g. "Seen".
type ChangeFlags struct {
	MailboxID int64
	UID       UID
	ModSeq    ModSeq
	Mask      Flags    // Which flags are actually modified.
	Flags     Flags    // New flag values. All are set, not just mask.
	Keywords  []string // Non-system/well-known flags/keywords/labels.

	// For IMAP NOTIFY.
	UIDValidity uint32
	Unseen      uint32
}

// ChangeModSeq returns the modseq of the flag update.
func (c ChangeFlags) ChangeModSeq() ModSeq { return c.ModSeq }
94
// ChangeThread is sent when muted/collapsed changes.
type ChangeThread struct {
	MessageIDs []int64 // Presumably Message.IDs of the affected messages, verify against senders.
	Muted      bool
	Collapsed  bool
}

// ChangeModSeq returns -1: this change carries no modseq.
func (c ChangeThread) ChangeModSeq() ModSeq { return -1 }
103
// ChangeRemoveMailbox is sent for a removed mailbox.
type ChangeRemoveMailbox struct {
	MailboxID int64
	Name      string
	ModSeq    ModSeq
}

// ChangeModSeq returns the modseq of the mailbox removal.
func (c ChangeRemoveMailbox) ChangeModSeq() ModSeq { return c.ModSeq }
112
// ChangeAddMailbox is sent for a newly created mailbox.
type ChangeAddMailbox struct {
	Mailbox
	Flags []string // For flags like \Subscribed.
}

// ChangeModSeq returns the ModSeq of the embedded Mailbox.
func (c ChangeAddMailbox) ChangeModSeq() ModSeq { return c.ModSeq }
120
// ChangeRenameMailbox is sent for a rename mailbox.
type ChangeRenameMailbox struct {
	MailboxID int64
	OldName   string
	NewName   string
	Flags     []string
	ModSeq    ModSeq
}

// ChangeModSeq returns the modseq of the rename.
func (c ChangeRenameMailbox) ChangeModSeq() ModSeq { return c.ModSeq }
131
// ChangeAddSubscription is sent for an added subscription to a mailbox.
type ChangeAddSubscription struct {
	MailboxName string
	ListFlags   []string // For additional IMAP flags like \NonExistent.
}

// ChangeModSeq returns -1: subscription changes carry no modseq.
func (c ChangeAddSubscription) ChangeModSeq() ModSeq { return -1 }
139
// ChangeRemoveSubscription is sent for a removed subscription of a mailbox.
type ChangeRemoveSubscription struct {
	MailboxName string
	ListFlags   []string // For additional IMAP flags like \NonExistent.
}

// ChangeModSeq returns -1: subscription changes carry no modseq.
func (c ChangeRemoveSubscription) ChangeModSeq() ModSeq { return -1 }
147
// ChangeMailboxCounts is sent when the number of total/deleted/unseen/unread messages changes.
type ChangeMailboxCounts struct {
	MailboxID   int64
	MailboxName string
	MailboxCounts
}

// ChangeModSeq returns -1: count changes carry no modseq.
func (c ChangeMailboxCounts) ChangeModSeq() ModSeq { return -1 }
156
// ChangeMailboxSpecialUse is sent when a special-use flag changes.
type ChangeMailboxSpecialUse struct {
	MailboxID   int64
	MailboxName string
	SpecialUse  SpecialUse
	ModSeq      ModSeq
}

// ChangeModSeq returns the modseq of the special-use change.
func (c ChangeMailboxSpecialUse) ChangeModSeq() ModSeq { return c.ModSeq }
166
// ChangeMailboxKeywords is sent when keywords are changed for a mailbox. For
// example, when a message is added with a previously unseen keyword.
type ChangeMailboxKeywords struct {
	MailboxID   int64
	MailboxName string
	Keywords    []string
}

// ChangeModSeq returns -1: keyword list changes carry no modseq.
func (c ChangeMailboxKeywords) ChangeModSeq() ModSeq { return -1 }
176
// ChangeAnnotation is sent when an annotation is added/updated/removed, either for
// a mailbox or a global per-account annotation. The value is not included.
type ChangeAnnotation struct {
	MailboxID   int64  // Can be zero, meaning global (per-account) annotation.
	MailboxName string // Empty for global (per-account) annotation.
	Key         string // Also called "entry name", e.g. "/private/comment".
	ModSeq      ModSeq
}

// ChangeModSeq returns the modseq of the annotation change.
func (c ChangeAnnotation) ChangeModSeq() ModSeq { return c.ModSeq }
187
188func messageEraser(donec chan struct{}, cleanc chan map[*Account][]int64) {
189 log := mlog.New("store", nil)
190
191 for {
192 clean, ok := <-cleanc
193 if !ok {
194 donec <- struct{}{}
195 return
196 }
197
198 for acc, ids := range clean {
199 eraseMessages(log, acc, ids)
200 }
201 }
202}
203
// eraseMessages erases the expunged messages ids of acc.
//
// A single database transaction does the following for each message:
//   - reads its MessageErase record;
//   - verifies the message is marked expunged;
//   - subtracts its size from the disk usage, unless the record has
//     SkipUpdateDiskUsage set;
//   - calls Message.erase and stores the result;
//   - deletes the MessageErase record.
//
// The message files are removed from disk only after the transaction committed.
// Errors are logged, not returned. If the transaction fails, no files are removed.
//
// The caller hands over a reference to acc. eraseMessages closes the account when
// done.
func eraseMessages(log mlog.Log, acc *Account, ids []int64) {
	// We are responsible for closing the accounts.
	defer func() {
		err := acc.Close()
		log.Check(err, "close account after erasing expunged messages", slog.String("account", acc.Name))
	}()

	acc.Lock()
	defer acc.Unlock()
	err := acc.DB.Write(mox.Context, func(tx *bstore.Tx) error {
		du := DiskUsage{ID: 1}
		if err := tx.Get(&du); err != nil {
			return fmt.Errorf("get disk usage: %w", err)
		}
		var duchanged bool

		for _, id := range ids {
			me := MessageErase{ID: id}
			if err := tx.Get(&me); err != nil {
				return fmt.Errorf("get message erase record %d: %w", id, err)
			}

			m := Message{ID: id}
			if err := tx.Get(&m); err != nil {
				return fmt.Errorf("get message %d to erase: %w", id, err)
			} else if !m.Expunged {
				return fmt.Errorf("message %d to erase is not marked expunged", id)
			}
			if !me.SkipUpdateDiskUsage {
				du.MessageSize -= m.Size
				duchanged = true
			}
			m.erase()
			if err := tx.Update(&m); err != nil {
				return fmt.Errorf("mark message %d erase in database: %w", id, err)
			}

			if err := tx.Delete(&me); err != nil {
				return fmt.Errorf("deleting message erase record %d: %w", id, err)
			}
		}

		// Only write the disk usage record when a message size was subtracted.
		if duchanged {
			if err := tx.Update(&du); err != nil {
				return fmt.Errorf("update disk usage after erasing: %w", err)
			}
		}

		return nil
	})
	if err != nil {
		log.Errorx("erasing expunged messages", err,
			slog.String("account", acc.Name),
			slog.Any("ids", ids),
		)
		return
	}

	// We remove the files after the database commit. It's better to have the files
	// still around without being referenced from the database than references in the
	// database to non-existent files.
	for _, id := range ids {
		p := acc.MessagePath(id)
		err := os.Remove(p)
		log.Check(err, "removing expunged message file from disk", slog.String("path", p))
	}
}
271
// switchboard is the goroutine that owns all Comm registrations. It does three
// things:
//   - distributes broadcast changes to the Comms registered for an account;
//   - tracks which sessions still reference expunged messages;
//   - hands messages without remaining references to the messageEraser goroutine
//     over cleanc.
//
// On a receive from stopc, it shuts down in this order:
//   - hands messages that are ready for erasure to the eraser;
//   - closes the accounts it still holds for eraseRefs;
//   - closes cleanc to stop the eraser;
//   - sends on donec.
func switchboard(stopc, donec chan struct{}, cleanc chan map[*Account][]int64) {
	// Registered Comms, per account.
	regs := map[*Account]map[*Comm]struct{}{}

	// We don't remove message files or clear fields in the Message stored in the
	// database until all references, from all sessions have gone away. When we see
	// an expunge of a message, we count how many comms are active (i.e. how many
	// sessions reference the message). We require each of them to tell us they are no
	// longer referencing that message. Once we've seen that from all Comms, we remove
	// the on-disk file and the fields from the database.
	//
	// During the initial account open (when there are no active sessions/Comms yet,
	// and we open the message database file), the message erases will also be applied.
	//
	// When we add an account to eraseRefs, we increase the refcount, and we decrease
	// it again when removing the account.
	eraseRefs := map[*Account]map[int64]int{}

	// We collect which messages can be erased per account, for sending them off to the
	// eraser goroutine. When an account is added to this map, its refcount is
	// increased. It is decreased again by the eraser goroutine.
	eraseIDs := map[*Account][]int64{}

	// addEraseIDs queues ids of acc for erasure. It takes an account reference the
	// first time acc is added to eraseIDs.
	addEraseIDs := func(acc *Account, ids ...int64) {
		if _, ok := eraseIDs[acc]; !ok {
			openAccounts.Lock()
			acc.nused++
			openAccounts.Unlock()
		}
		eraseIDs[acc] = append(eraseIDs[acc], ids...)
	}

	// decreaseEraseRefs drops one reference for each message in ids. Messages without
	// remaining references are queued for erasure. When acc has no referenced messages
	// left, the account reference held for eraseRefs is released.
	decreaseEraseRefs := func(acc *Account, ids ...int64) {
		for _, id := range ids {
			v := eraseRefs[acc][id] - 1
			if v < 0 {
				metrics.PanicInc(metrics.Store) // For tests.
				panic(fmt.Sprintf("negative expunged message references for account %q, message id %d", acc.Name, id))
			}
			if v > 0 {
				eraseRefs[acc][id] = v
				continue
			}

			addEraseIDs(acc, id)
			delete(eraseRefs[acc], id)
			if len(eraseRefs[acc]) > 0 {
				continue
			}
			delete(eraseRefs, acc)
			// Note: cannot use acc.Close, it tries to lock acc, but someone broadcasting to
			// this goroutine will likely have the lock.
			openAccounts.Lock()
			acc.nused--
			n := acc.nused
			openAccounts.Unlock()
			if n < 0 {
				metrics.PanicInc(metrics.Store) // For tests.
				panic(fmt.Sprintf("negative reference count for account %q, after removing message id %d", acc.Name, id))
			}
		}
	}

	for {
		// If we have messages to clean, try sending to the eraser. A send on a nil
		// channel never proceeds, so the send case is disabled when nothing is queued.
		cc := cleanc
		if len(eraseIDs) == 0 {
			cc = nil
		}

		select {
		case cc <- eraseIDs:
			// The eraser now owns the map, and the account references it holds.
			eraseIDs = map[*Account][]int64{}

		case c := <-register:
			if _, ok := regs[c.acc]; !ok {
				regs[c.acc] = map[*Comm]struct{}{}
			}
			regs[c.acc][c] = struct{}{}

		case c := <-unregister:
			// Drain any ChangeRemoveUIDs references from the comm, to update our eraseRefs and
			// possibly queue messages for cleaning. No need to take a lock, the caller does
			// not use the comm anymore.
			for _, ch := range c.changes {
				if rem, ok := ch.(ChangeRemoveUIDs); ok {
					decreaseEraseRefs(c.acc, rem.MsgIDs...)
				}
			}

			delete(regs[c.acc], c)
			if len(regs[c.acc]) == 0 {
				delete(regs, c.acc)
			}

		case chReq := <-broadcast:
			acc := chReq.acc

			// Track references to removed messages in sessions (mostly IMAP) so we can pass
			// them to the eraser.
			for _, ch := range chReq.changes {
				rem, ok := ch.(ChangeRemoveUIDs)
				if !ok {
					continue
				}

				// NOTE(review): assumes chReq.comm, when set, is registered for acc. Otherwise
				// refs is one too low.
				refs := len(regs[acc])
				if chReq.comm != nil {
					// The sender does not get this change and doesn't have to notify us of having
					// processed the removal.
					refs--
				}
				if refs <= 0 {
					addEraseIDs(acc, rem.MsgIDs...)
					continue
				}

				// Comms/sessions still reference these messages, track how many.
				for _, id := range rem.MsgIDs {
					if _, ok := eraseRefs[acc]; !ok {
						openAccounts.Lock()
						acc.nused++
						openAccounts.Unlock()

						eraseRefs[acc] = map[int64]int{}
					}
					if _, ok := eraseRefs[acc][id]; ok {
						metrics.PanicInc(metrics.Store) // For tests.
						panic(fmt.Sprintf("already have eraseRef for message id %d, account %q", id, acc.Name))
					}
					eraseRefs[acc][id] = refs
				}
			}

			for c := range regs[acc] {
				// Do not send the broadcaster back their own changes. chReq.comm is nil if not
				// originating from a comm, so won't match in that case.
				// Relevant for IMAP IDLE, and NOTIFY ../rfc/5465:428
				if c == chReq.comm {
					continue
				}

				var overflow bool
				c.Lock()
				// A Comm that is already in overflow gets no new changes until its pending
				// changes are retrieved with Get. Otherwise, later changes that fit within the
				// limit would be queued after a gap of dropped changes.
				if c.overflow || len(c.changes)+len(chReq.changes) > CommPendingChangesMax {
					c.overflow = true
					overflow = true
				} else {
					c.changes = append(c.changes, chReq.changes...)
				}
				c.Unlock()

				// In case of overflow, we didn't add the pending changes to the comm, so we must
				// decrease references again.
				if overflow {
					for _, ch := range chReq.changes {
						if rem, ok := ch.(ChangeRemoveUIDs); ok {
							decreaseEraseRefs(acc, rem.MsgIDs...)
						}
					}
				}

				// Non-blocking: Pending is buffered, one outstanding signal is enough.
				select {
				case c.Pending <- struct{}{}:
				default:
				}
			}
			chReq.done <- struct{}{}

		case removal := <-applied:
			acc := removal.Account

			// Decrease references of messages, queueing for erasure when the last reference
			// goes away.
			decreaseEraseRefs(acc, removal.MsgIDs...)

		case <-stopc:
			// We may still have eraseRefs, messages currently referenced in a session. Those
			// messages will be erased when the database file is opened again in the future. If
			// we have messages ready to erase now, we'll do that first.

			if len(eraseIDs) > 0 {
				cleanc <- eraseIDs
				eraseIDs = nil
			}

			// Release the account references taken for eraseRefs.
			for acc := range eraseRefs {
				err := acc.Close()
				log := mlog.New("store", nil)
				log.Check(err, "closing account")
			}

			close(cleanc)       // Tell eraser to stop.
			donec <- struct{}{} // Say we are now done.
			return
		}
	}
}
469
// switchboardBusy is set while a switchboard started by Switchboard is running, so
// a second one cannot be started concurrently.
var switchboardBusy atomic.Bool
471
472// Switchboard distributes changes to accounts to interested listeners. See Comm and Change.
473func Switchboard() (stop func()) {
474 if !switchboardBusy.CompareAndSwap(false, true) {
475 panic("switchboard already busy")
476 }
477
478 stopc := make(chan struct{})
479 donec := make(chan struct{})
480 cleanc := make(chan map[*Account][]int64)
481
482 go messageEraser(donec, cleanc)
483 go switchboard(stopc, donec, cleanc)
484
485 return func() {
486 stopc <- struct{}{}
487
488 // Wait for switchboard and eraser goroutines to be ready.
489 <-donec
490 <-donec
491
492 if !switchboardBusy.CompareAndSwap(true, false) {
493 panic("switchboard already unregistered?")
494 }
495 }
496}
497
// Comm handles communication with the goroutine that maintains the
// account/mailbox/message state. Create one with RegisterComm and call Unregister
// when done with it.
type Comm struct {
	// Receives block until changes come in, e.g. for IMAP IDLE. The switchboard does
	// a non-blocking send after queueing changes, so several broadcasts can result in
	// a single signal. Use Get to retrieve all pending changes.
	Pending chan struct{}

	acc *Account // Account this Comm is registered for.

	sync.Mutex // Protects changes and overflow.
	changes    []Change
	// Set if too many changes were queued, cleared when changes are retrieved. While
	// in overflow, no new changes are added.
	overflow bool
}
511
512// Register starts a Comm for the account. Unregister must be called.
513func RegisterComm(acc *Account) *Comm {
514 c := &Comm{
515 Pending: make(chan struct{}, 1), // Bufferend so Switchboard can just do a non-blocking send.
516 acc: acc,
517 }
518 register <- c
519 return c
520}
521
// Unregister stops this Comm. The switchboard releases message references of
// ChangeRemoveUIDs still pending in the Comm. Removals already retrieved with Get
// are not in the Comm anymore, and must be acknowledged with RemovalSeen. The Comm
// must not be used after Unregister.
func (c *Comm) Unregister() {
	unregister <- c
}
526
527// Broadcast ensures changes are sent to other Comms.
528func (c *Comm) Broadcast(ch []Change) {
529 if len(ch) == 0 {
530 return
531 }
532 done := make(chan struct{}, 1)
533 broadcast <- changeReq{c.acc, c, ch, done}
534 <-done
535}
536
537// Get retrieves all pending changes. If no changes are pending a nil or empty list
538// is returned. If too many changes were pending, overflow is true, and this Comm
539// stopped getting new changes. The caller should usually return an error to its
540// connection. Even with overflow, changes may still be non-empty. On
541// ChangeRemoveUIDs, the RemovalSeen must still be called by the caller.
542func (c *Comm) Get() (overflow bool, changes []Change) {
543 c.Lock()
544 defer c.Unlock()
545 overflow, changes = c.overflow, c.changes
546 c.overflow, c.changes = false, nil
547 return
548}
549
// RemovalSeen must be called by consumers when they have applied the removal to
// their session. The switchboard tracks references of expunged messages, and
// removes/cleans the message up when the last reference is gone. It must be called
// once per ChangeRemoveUIDs retrieved with Get. Calling it more often than that
// makes the switchboard panic on a negative reference count.
func (c *Comm) RemovalSeen(ch ChangeRemoveUIDs) {
	applied <- removalApplied{c.acc, ch.MsgIDs}
}
556
557// BroadcastChanges ensures changes are sent to all listeners on the accoount.
558func BroadcastChanges(acc *Account, ch []Change) {
559 if len(ch) == 0 {
560 return
561 }
562 done := make(chan struct{}, 1)
563 broadcast <- changeReq{acc, nil, ch, done}
564 <-done
565}
566