10 "github.com/mjl-/bstore"
12 "github.com/mjl-/mox/metrics"
13 "github.com/mjl-/mox/mlog"
14 "github.com/mjl-/mox/mox-"
18 register = make(chan *Comm)
19 unregister = make(chan *Comm)
20 broadcast = make(chan changeReq)
21 applied = make(chan removalApplied)
24type changeReq struct {
26 comm *Comm // Can be nil.
31type removalApplied struct {
36type UID uint32 // IMAP UID.
38// Change to mailboxes/subscriptions/messages in an account. One of the Change*
39// types in this package.
40type Change interface {
41 ChangeModSeq() ModSeq // returns -1 for "modseq not applicable"
44// ChangeAddUID is sent for a new message in a mailbox.
45type ChangeAddUID struct {
49 Flags Flags // System flags.
50 Keywords []string // Other flags.
53func (c ChangeAddUID) ChangeModSeq() ModSeq { return c.ModSeq }
55// ChangeRemoveUIDs is sent for removal of one or more messages from a mailbox.
56type ChangeRemoveUIDs struct {
58 UIDs []UID // Must be in increasing UID order, for IMAP.
60 MsgIDs []int64 // Message.ID, for erasing, order does not necessarily correspond with UIDs!
63func (c ChangeRemoveUIDs) ChangeModSeq() ModSeq { return c.ModSeq }
65// ChangeFlags is sent for an update to flags for a message, e.g. "Seen".
66type ChangeFlags struct {
70 Mask Flags // Which flags are actually modified.
71 Flags Flags // New flag values. All are set, not just mask.
72 Keywords []string // Non-system/well-known flags/keywords/labels.
75func (c ChangeFlags) ChangeModSeq() ModSeq { return c.ModSeq }
77// ChangeThread is sent when muted/collapsed changes.
78type ChangeThread struct {
84func (c ChangeThread) ChangeModSeq() ModSeq { return -1 }
86// ChangeRemoveMailbox is sent for a removed mailbox.
87type ChangeRemoveMailbox struct {
93func (c ChangeRemoveMailbox) ChangeModSeq() ModSeq { return c.ModSeq }
95// ChangeAddMailbox is sent for a newly created mailbox.
96type ChangeAddMailbox struct {
98 Flags []string // For flags like \Subscribed.
101func (c ChangeAddMailbox) ChangeModSeq() ModSeq { return c.ModSeq }
103// ChangeRenameMailbox is sent for a rename mailbox.
104type ChangeRenameMailbox struct {
112func (c ChangeRenameMailbox) ChangeModSeq() ModSeq { return c.ModSeq }
114// ChangeAddSubscription is sent for an added subscription to a mailbox.
115type ChangeAddSubscription struct {
117 Flags []string // For additional IMAP flags like \NonExistent.
120func (c ChangeAddSubscription) ChangeModSeq() ModSeq { return -1 }
122// ChangeMailboxCounts is sent when the number of total/deleted/unseen/unread messages changes.
123type ChangeMailboxCounts struct {
129func (c ChangeMailboxCounts) ChangeModSeq() ModSeq { return -1 }
131// ChangeMailboxSpecialUse is sent when a special-use flag changes.
132type ChangeMailboxSpecialUse struct {
135 SpecialUse SpecialUse
139func (c ChangeMailboxSpecialUse) ChangeModSeq() ModSeq { return c.ModSeq }
141// ChangeMailboxKeywords is sent when keywords are changed for a mailbox. For
142// example, when a message is added with a previously unseen keyword.
143type ChangeMailboxKeywords struct {
149func (c ChangeMailboxKeywords) ChangeModSeq() ModSeq { return -1 }
151// ChangeAnnotation is sent when an annotation is added/updated/removed, either for
152// a mailbox or a global per-account annotation. The value is not included.
153type ChangeAnnotation struct {
154 MailboxID int64 // Can be zero, meaning global (per-account) annotation.
155 MailboxName string // Empty for global (per-account) annotation.
156 Key string // Also called "entry name", e.g. "/private/comment".
160func (c ChangeAnnotation) ChangeModSeq() ModSeq { return c.ModSeq }
162func messageEraser(donec chan struct{}, cleanc chan map[*Account][]int64) {
163 log := mlog.New("store", nil)
166 clean, ok := <-cleanc
172 for acc, ids := range clean {
173 eraseMessages(log, acc, ids)
178func eraseMessages(log mlog.Log, acc *Account, ids []int64) {
179 // We are responsible for closing the accounts.
182 log.Check(err, "close account after erasing expunged messages", slog.String("account", acc.Name))
187 err := acc.DB.Write(mox.Context, func(tx *bstore.Tx) error {
188 du := DiskUsage{ID: 1}
189 if err := tx.Get(&du); err != nil {
190 return fmt.Errorf("get disk usage: %v", err)
194 for _, id := range ids {
195 me := MessageErase{ID: id}
196 if err := tx.Get(&me); err != nil {
197 return fmt.Errorf("delete message erase record %d: %v", id, err)
201 if err := tx.Get(&m); err != nil {
202 return fmt.Errorf("get message %d to erase: %v", id, err)
203 } else if !m.Expunged {
204 return fmt.Errorf("message %d to erase is not marked expunged", id)
206 if !me.SkipUpdateDiskUsage {
207 du.MessageSize -= m.Size
211 if err := tx.Update(&m); err != nil {
212 return fmt.Errorf("mark message %d erase in database: %v", id, err)
215 if err := tx.Delete(&me); err != nil {
216 return fmt.Errorf("deleting message erase record %d: %v", id, err)
221 if err := tx.Update(&du); err != nil {
222 return fmt.Errorf("update disk usage after erasing: %v", err)
229 log.Errorx("erasing expunged messages", err,
230 slog.String("account", acc.Name),
231 slog.Any("ids", ids),
236 // We remove the files after the database commit. It's better to have the files
237 // still around without being referenced from the database than references in the
238 // database to non-existent files.
239 for _, id := range ids {
240 p := acc.MessagePath(id)
242 log.Check(err, "removing expunged message file from disk", slog.String("path", p))
246func switchboard(stopc, donec chan struct{}, cleanc chan map[*Account][]int64) {
247 regs := map[*Account]map[*Comm]struct{}{}
249 // We don't remove message files or clear fields in the Message stored in the
250 // database until all references, from all sessions have gone away. When we see
251 // an expunge of a message, we count how many comms are active (i.e. how many
252 // sessions reference the message). We require each of them to tell us they are no
253 // longer referencing that message. Once we've seen that from all Comms, we remove
254 // the on-disk file and the fields from the database.
256 // During the initial account open (when there are no active sessions/Comms yet,
257 // and we open the message database file), the message erases will also be applied.
259 // When we add an account to eraseRefs, we increase the refcount, and we decrease
260 // it again when removing the account.
261 eraseRefs := map[*Account]map[int64]int{}
263 // We collect which messages can be erased per account, for sending them off to the
264 // eraser goroutine. When an account is added to this map, its refcount is
265 // increased. It is decreased again by the eraser goroutine.
266 eraseIDs := map[*Account][]int64{}
268 addEraseIDs := func(acc *Account, ids ...int64) {
269 if _, ok := eraseIDs[acc]; !ok {
272 openAccounts.Unlock()
274 eraseIDs[acc] = append(eraseIDs[acc], ids...)
277 decreaseEraseRefs := func(acc *Account, ids ...int64) {
278 for _, id := range ids {
279 v := eraseRefs[acc][id] - 1
281 metrics.PanicInc(metrics.Store) // For tests.
282 panic(fmt.Sprintf("negative expunged message references for account %q, message id %d", acc.Name, id))
285 eraseRefs[acc][id] = v
290 delete(eraseRefs[acc], id)
291 if len(eraseRefs[acc]) > 0 {
294 delete(eraseRefs, acc)
295 // Note: cannot use acc.Close, it tries to lock acc, but someone broadcasting to
296 // this goroutine will likely have the lock.
300 openAccounts.Unlock()
302 metrics.PanicInc(metrics.Store) // For tests.
303 panic(fmt.Sprintf("negative reference count for account %q, after removing message id %d", acc.Name, id))
309 // If we have messages to clean, try sending to the eraser.
311 if len(eraseIDs) == 0 {
317 eraseIDs = map[*Account][]int64{}
319 case c := <-register:
320 if _, ok := regs[c.acc]; !ok {
321 regs[c.acc] = map[*Comm]struct{}{}
323 regs[c.acc][c] = struct{}{}
325 case c := <-unregister:
326 // Drain any ChangeRemoveUIDs references from the comm, to update our eraseRefs and
327 // possibly queue messages for cleaning. No need to take a lock, the caller does
328 // not use the comm anymore.
329 for _, ch := range c.changes {
330 rem, ok := ch.(ChangeRemoveUIDs)
334 decreaseEraseRefs(c.acc, rem.MsgIDs...)
337 delete(regs[c.acc], c)
338 if len(regs[c.acc]) == 0 {
342 case chReq := <-broadcast:
345 // Track references to removed messages in sessions (mostly IMAP) so we can pass
346 // them to the eraser.
347 for _, ch := range chReq.changes {
348 rem, ok := ch.(ChangeRemoveUIDs)
353 refs := len(regs[acc])
354 if chReq.comm != nil {
355 // The sender does not get this change and doesn't have to notify us of having
356 // processed the removal.
360 addEraseIDs(acc, rem.MsgIDs...)
364 // Comms/sessions still reference these messages, track how many.
365 for _, id := range rem.MsgIDs {
366 if _, ok := eraseRefs[acc]; !ok {
369 openAccounts.Unlock()
371 eraseRefs[acc] = map[int64]int{}
373 if _, ok := eraseRefs[acc][id]; ok {
374 metrics.PanicInc(metrics.Store) // For tests.
375 panic(fmt.Sprintf("already have eraseRef for message id %d, account %q", id, acc.Name))
377 eraseRefs[acc][id] = refs
381 for c := range regs[acc] {
382 // Do not send the broadcaster back their own changes. chReq.comm is nil if not
383 // originating from a comm, so won't match in that case.
389 c.changes = append(c.changes, chReq.changes...)
393 case c.Pending <- struct{}{}:
397 chReq.done <- struct{}{}
399 case removal := <-applied:
400 acc := removal.Account
402 // Decrease references of messages, queueing for erasure when the last reference
404 decreaseEraseRefs(acc, removal.MsgIDs...)
407 // We may still have eraseRefs, messages currently referenced in a session. Those
408 // messages will be erased when the database file is opened again in the future. If
409 // we have messages ready to erase now, we'll do that first.
411 if len(eraseIDs) > 0 {
416 for acc := range eraseRefs {
418 log := mlog.New("store", nil)
419 log.Check(err, "closing account")
422 close(cleanc) // Tell eraser to stop.
423 donec <- struct{}{} // Say we are now done.
429var switchboardBusy atomic.Bool
431// Switchboard distributes changes to accounts to interested listeners. See Comm and Change.
432func Switchboard() (stop func()) {
433 if !switchboardBusy.CompareAndSwap(false, true) {
434 panic("switchboard already busy")
437 stopc := make(chan struct{})
438 donec := make(chan struct{})
439 cleanc := make(chan map[*Account][]int64)
441 go messageEraser(donec, cleanc)
442 go switchboard(stopc, donec, cleanc)
447 // Wait for switchboard and eraser goroutines to be ready.
451 if !switchboardBusy.CompareAndSwap(true, false) {
452 panic("switchboard already unregistered?")
457// Comm handles communication with the goroutine that maintains the
458// account/mailbox/message state.
460 Pending chan struct{} // Receives block until changes come in, e.g. for IMAP IDLE.
468// Register starts a Comm for the account. Unregister must be called.
469func RegisterComm(acc *Account) *Comm {
471 Pending: make(chan struct{}, 1), // Bufferend so Switchboard can just do a non-blocking send.
478// Unregister stops this Comm.
479func (c *Comm) Unregister() {
483// Broadcast ensures changes are sent to other Comms.
484func (c *Comm) Broadcast(ch []Change) {
488 done := make(chan struct{}, 1)
489 broadcast <- changeReq{c.acc, c, ch, done}
493// Get retrieves all pending changes. If no changes are pending a nil or empty list
495func (c *Comm) Get() []Change {
503// RemovalSeen must be called by consumers when they have applied the removal to
504// their session. The switchboard tracks references of expunged messages, and
505// removes/cleans the message up when the last reference is gone.
506func (c *Comm) RemovalSeen(ch ChangeRemoveUIDs) {
507 applied <- removalApplied{c.acc, ch.MsgIDs}
510// BroadcastChanges ensures changes are sent to all listeners on the accoount.
511func BroadcastChanges(acc *Account, ch []Change) {
515 done := make(chan struct{}, 1)
516 broadcast <- changeReq{acc, nil, ch, done}