10 "github.com/mjl-/bstore"
12 "github.com/mjl-/mox/metrics"
13 "github.com/mjl-/mox/mlog"
14 "github.com/mjl-/mox/mox-"
17// CommPendingChangesMax is the maximum number of changes kept for a Comm before
18// registering a notification overflow and flushing changes. Variable because set
19// to low value during tests.
20var CommPendingChangesMax = 10000
23 register = make(chan *Comm)
24 unregister = make(chan *Comm)
25 broadcast = make(chan changeReq)
26 applied = make(chan removalApplied)
29type changeReq struct {
31 comm *Comm // Can be nil.
36type removalApplied struct {
41type UID uint32 // IMAP UID.
43// Change to mailboxes/subscriptions/messages in an account. One of the Change*
44// types in this package.
45type Change interface {
46 ChangeModSeq() ModSeq // returns -1 for "modseq not applicable"
49// ChangeAddUID is sent for a new message in a mailbox.
50type ChangeAddUID struct {
54 Flags Flags // System flags.
55 Keywords []string // Other flags.
58 MessageCountIMAP uint32
62func (c ChangeAddUID) ChangeModSeq() ModSeq { return c.ModSeq }
64// ChangeRemoveUIDs is sent for removal of one or more messages from a mailbox.
65type ChangeRemoveUIDs struct {
67 UIDs []UID // Must be in increasing UID order, for IMAP.
69 MsgIDs []int64 // Message.ID, for erasing, order does not necessarily correspond with UIDs!
73 MessageCountIMAP uint32
77func (c ChangeRemoveUIDs) ChangeModSeq() ModSeq { return c.ModSeq }
79// ChangeFlags is sent for an update to flags for a message, e.g. "Seen".
80type ChangeFlags struct {
84 Mask Flags // Which flags are actually modified.
85 Flags Flags // New flag values. All are set, not just mask.
86 Keywords []string // Non-system/well-known flags/keywords/labels.
93func (c ChangeFlags) ChangeModSeq() ModSeq { return c.ModSeq }
95// ChangeThread is sent when muted/collapsed changes.
96type ChangeThread struct {
102func (c ChangeThread) ChangeModSeq() ModSeq { return -1 }
104// ChangeRemoveMailbox is sent for a removed mailbox.
105type ChangeRemoveMailbox struct {
111func (c ChangeRemoveMailbox) ChangeModSeq() ModSeq { return c.ModSeq }
113// ChangeAddMailbox is sent for a newly created mailbox.
114type ChangeAddMailbox struct {
116 Flags []string // For flags like \Subscribed.
119func (c ChangeAddMailbox) ChangeModSeq() ModSeq { return c.ModSeq }
121// ChangeRenameMailbox is sent for a rename mailbox.
122type ChangeRenameMailbox struct {
130func (c ChangeRenameMailbox) ChangeModSeq() ModSeq { return c.ModSeq }
132// ChangeAddSubscription is sent for an added subscription to a mailbox.
133type ChangeAddSubscription struct {
135 ListFlags []string // For additional IMAP flags like \NonExistent.
138func (c ChangeAddSubscription) ChangeModSeq() ModSeq { return -1 }
140// ChangeRemoveSubscription is sent for a removed subscription of a mailbox.
141type ChangeRemoveSubscription struct {
143 ListFlags []string // For additional IMAP flags like \NonExistent.
146func (c ChangeRemoveSubscription) ChangeModSeq() ModSeq { return -1 }
148// ChangeMailboxCounts is sent when the number of total/deleted/unseen/unread messages changes.
149type ChangeMailboxCounts struct {
155func (c ChangeMailboxCounts) ChangeModSeq() ModSeq { return -1 }
157// ChangeMailboxSpecialUse is sent when a special-use flag changes.
158type ChangeMailboxSpecialUse struct {
161 SpecialUse SpecialUse
165func (c ChangeMailboxSpecialUse) ChangeModSeq() ModSeq { return c.ModSeq }
167// ChangeMailboxKeywords is sent when keywords are changed for a mailbox. For
168// example, when a message is added with a previously unseen keyword.
169type ChangeMailboxKeywords struct {
175func (c ChangeMailboxKeywords) ChangeModSeq() ModSeq { return -1 }
177// ChangeAnnotation is sent when an annotation is added/updated/removed, either for
178// a mailbox or a global per-account annotation. The value is not included.
179type ChangeAnnotation struct {
180 MailboxID int64 // Can be zero, meaning global (per-account) annotation.
181 MailboxName string // Empty for global (per-account) annotation.
182 Key string // Also called "entry name", e.g. "/private/comment".
186func (c ChangeAnnotation) ChangeModSeq() ModSeq { return c.ModSeq }
188func messageEraser(donec chan struct{}, cleanc chan map[*Account][]int64) {
189 log := mlog.New("store", nil)
192 clean, ok := <-cleanc
198 for acc, ids := range clean {
199 eraseMessages(log, acc, ids)
204func eraseMessages(log mlog.Log, acc *Account, ids []int64) {
205 // We are responsible for closing the accounts.
208 log.Check(err, "close account after erasing expunged messages", slog.String("account", acc.Name))
213 err := acc.DB.Write(mox.Context, func(tx *bstore.Tx) error {
214 du := DiskUsage{ID: 1}
215 if err := tx.Get(&du); err != nil {
216 return fmt.Errorf("get disk usage: %v", err)
220 for _, id := range ids {
221 me := MessageErase{ID: id}
222 if err := tx.Get(&me); err != nil {
223 return fmt.Errorf("delete message erase record %d: %v", id, err)
227 if err := tx.Get(&m); err != nil {
228 return fmt.Errorf("get message %d to erase: %v", id, err)
229 } else if !m.Expunged {
230 return fmt.Errorf("message %d to erase is not marked expunged", id)
232 if !me.SkipUpdateDiskUsage {
233 du.MessageSize -= m.Size
237 if err := tx.Update(&m); err != nil {
238 return fmt.Errorf("mark message %d erase in database: %v", id, err)
241 if err := tx.Delete(&me); err != nil {
242 return fmt.Errorf("deleting message erase record %d: %v", id, err)
247 if err := tx.Update(&du); err != nil {
248 return fmt.Errorf("update disk usage after erasing: %v", err)
255 log.Errorx("erasing expunged messages", err,
256 slog.String("account", acc.Name),
257 slog.Any("ids", ids),
262 // We remove the files after the database commit. It's better to have the files
263 // still around without being referenced from the database than references in the
264 // database to non-existent files.
265 for _, id := range ids {
266 p := acc.MessagePath(id)
268 log.Check(err, "removing expunged message file from disk", slog.String("path", p))
272func switchboard(stopc, donec chan struct{}, cleanc chan map[*Account][]int64) {
273 regs := map[*Account]map[*Comm]struct{}{}
275 // We don't remove message files or clear fields in the Message stored in the
276 // database until all references, from all sessions have gone away. When we see
277 // an expunge of a message, we count how many comms are active (i.e. how many
278 // sessions reference the message). We require each of them to tell us they are no
279 // longer referencing that message. Once we've seen that from all Comms, we remove
280 // the on-disk file and the fields from the database.
282 // During the initial account open (when there are no active sessions/Comms yet,
283 // and we open the message database file), the message erases will also be applied.
285 // When we add an account to eraseRefs, we increase the refcount, and we decrease
286 // it again when removing the account.
287 eraseRefs := map[*Account]map[int64]int{}
289 // We collect which messages can be erased per account, for sending them off to the
290 // eraser goroutine. When an account is added to this map, its refcount is
291 // increased. It is decreased again by the eraser goroutine.
292 eraseIDs := map[*Account][]int64{}
294 addEraseIDs := func(acc *Account, ids ...int64) {
295 if _, ok := eraseIDs[acc]; !ok {
298 openAccounts.Unlock()
300 eraseIDs[acc] = append(eraseIDs[acc], ids...)
303 decreaseEraseRefs := func(acc *Account, ids ...int64) {
304 for _, id := range ids {
305 v := eraseRefs[acc][id] - 1
307 metrics.PanicInc(metrics.Store) // For tests.
308 panic(fmt.Sprintf("negative expunged message references for account %q, message id %d", acc.Name, id))
311 eraseRefs[acc][id] = v
316 delete(eraseRefs[acc], id)
317 if len(eraseRefs[acc]) > 0 {
320 delete(eraseRefs, acc)
321 // Note: cannot use acc.Close, it tries to lock acc, but someone broadcasting to
322 // this goroutine will likely have the lock.
326 openAccounts.Unlock()
328 metrics.PanicInc(metrics.Store) // For tests.
329 panic(fmt.Sprintf("negative reference count for account %q, after removing message id %d", acc.Name, id))
335 // If we have messages to clean, try sending to the eraser.
337 if len(eraseIDs) == 0 {
343 eraseIDs = map[*Account][]int64{}
345 case c := <-register:
346 if _, ok := regs[c.acc]; !ok {
347 regs[c.acc] = map[*Comm]struct{}{}
349 regs[c.acc][c] = struct{}{}
351 case c := <-unregister:
352 // Drain any ChangeRemoveUIDs references from the comm, to update our eraseRefs and
353 // possibly queue messages for cleaning. No need to take a lock, the caller does
354 // not use the comm anymore.
355 for _, ch := range c.changes {
356 if rem, ok := ch.(ChangeRemoveUIDs); ok {
357 decreaseEraseRefs(c.acc, rem.MsgIDs...)
361 delete(regs[c.acc], c)
362 if len(regs[c.acc]) == 0 {
366 case chReq := <-broadcast:
369 // Track references to removed messages in sessions (mostly IMAP) so we can pass
370 // them to the eraser.
371 for _, ch := range chReq.changes {
372 rem, ok := ch.(ChangeRemoveUIDs)
377 refs := len(regs[acc])
378 if chReq.comm != nil {
379 // The sender does not get this change and doesn't have to notify us of having
380 // processed the removal.
384 addEraseIDs(acc, rem.MsgIDs...)
388 // Comms/sessions still reference these messages, track how many.
389 for _, id := range rem.MsgIDs {
390 if _, ok := eraseRefs[acc]; !ok {
393 openAccounts.Unlock()
395 eraseRefs[acc] = map[int64]int{}
397 if _, ok := eraseRefs[acc][id]; ok {
398 metrics.PanicInc(metrics.Store) // For tests.
399 panic(fmt.Sprintf("already have eraseRef for message id %d, account %q", id, acc.Name))
401 eraseRefs[acc][id] = refs
405 for c := range regs[acc] {
406 // Do not send the broadcaster back their own changes. chReq.comm is nil if not
407 // originating from a comm, so won't match in that case.
415 if len(c.changes)+len(chReq.changes) > CommPendingChangesMax {
419 c.changes = append(c.changes, chReq.changes...)
423 // In case of overflow, we didn't add the pending changes to the comm, so we must
424 // decrease references again.
426 for _, ch := range chReq.changes {
427 if rem, ok := ch.(ChangeRemoveUIDs); ok {
428 decreaseEraseRefs(acc, rem.MsgIDs...)
434 case c.Pending <- struct{}{}:
438 chReq.done <- struct{}{}
440 case removal := <-applied:
441 acc := removal.Account
443 // Decrease references of messages, queueing for erasure when the last reference
445 decreaseEraseRefs(acc, removal.MsgIDs...)
448 // We may still have eraseRefs, messages currently referenced in a session. Those
449 // messages will be erased when the database file is opened again in the future. If
450 // we have messages ready to erase now, we'll do that first.
452 if len(eraseIDs) > 0 {
457 for acc := range eraseRefs {
459 log := mlog.New("store", nil)
460 log.Check(err, "closing account")
463 close(cleanc) // Tell eraser to stop.
464 donec <- struct{}{} // Say we are now done.
470var switchboardBusy atomic.Bool
472// Switchboard distributes changes to accounts to interested listeners. See Comm and Change.
473func Switchboard() (stop func()) {
474 if !switchboardBusy.CompareAndSwap(false, true) {
475 panic("switchboard already busy")
478 stopc := make(chan struct{})
479 donec := make(chan struct{})
480 cleanc := make(chan map[*Account][]int64)
482 go messageEraser(donec, cleanc)
483 go switchboard(stopc, donec, cleanc)
488 // Wait for switchboard and eraser goroutines to be ready.
492 if !switchboardBusy.CompareAndSwap(true, false) {
493 panic("switchboard already unregistered?")
498// Comm handles communication with the goroutine that maintains the
499// account/mailbox/message state.
501 Pending chan struct{} // Receives block until changes come in, e.g. for IMAP IDLE.
507 // Set if too many changes were queued, cleared when changes are retrieved. While
508 // in overflow, no new changes are added.
512// Register starts a Comm for the account. Unregister must be called.
513func RegisterComm(acc *Account) *Comm {
515 Pending: make(chan struct{}, 1), // Bufferend so Switchboard can just do a non-blocking send.
522// Unregister stops this Comm.
523func (c *Comm) Unregister() {
527// Broadcast ensures changes are sent to other Comms.
528func (c *Comm) Broadcast(ch []Change) {
532 done := make(chan struct{}, 1)
533 broadcast <- changeReq{c.acc, c, ch, done}
537// Get retrieves all pending changes. If no changes are pending a nil or empty list
538// is returned. If too many changes were pending, overflow is true, and this Comm
539// stopped getting new changes. The caller should usually return an error to its
540// connection. Even with overflow, changes may still be non-empty. On
541// ChangeRemoveUIDs, the RemovalSeen must still be called by the caller.
542func (c *Comm) Get() (overflow bool, changes []Change) {
545 overflow, changes = c.overflow, c.changes
546 c.overflow, c.changes = false, nil
550// RemovalSeen must be called by consumers when they have applied the removal to
551// their session. The switchboard tracks references of expunged messages, and
552// removes/cleans the message up when the last reference is gone.
553func (c *Comm) RemovalSeen(ch ChangeRemoveUIDs) {
554 applied <- removalApplied{c.acc, ch.MsgIDs}
557// BroadcastChanges ensures changes are sent to all listeners on the accoount.
558func BroadcastChanges(acc *Account, ch []Change) {
562 done := make(chan struct{}, 1)
563 broadcast <- changeReq{acc, nil, ch, done}