1package store
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8var (
9 register = make(chan *Comm)
10 unregister = make(chan *Comm)
11 broadcast = make(chan changeReq)
12)
13
14type changeReq struct {
15 acc *Account
16 comm *Comm // Can be nil.
17 changes []Change
18 done chan struct{}
19}
20
21type UID uint32 // IMAP UID.
22
23// Change to mailboxes/subscriptions/messages in an account. One of the Change*
24// types in this package.
25type Change any
26
27// ChangeAddUID is sent for a new message in a mailbox.
28type ChangeAddUID struct {
29 MailboxID int64
30 UID UID
31 ModSeq ModSeq
32 Flags Flags // System flags.
33 Keywords []string // Other flags.
34}
35
36// ChangeRemoveUIDs is sent for removal of one or more messages from a mailbox.
37type ChangeRemoveUIDs struct {
38 MailboxID int64
39 UIDs []UID // Must be in increasing UID order, for IMAP.
40 ModSeq ModSeq
41}
42
43// ChangeFlags is sent for an update to flags for a message, e.g. "Seen".
44type ChangeFlags struct {
45 MailboxID int64
46 UID UID
47 ModSeq ModSeq
48 Mask Flags // Which flags are actually modified.
49 Flags Flags // New flag values. All are set, not just mask.
50 Keywords []string // Non-system/well-known flags/keywords/labels.
51}
52
53// ChangeThread is sent when muted/collapsed changes.
54type ChangeThread struct {
55 MessageIDs []int64
56 Muted bool
57 Collapsed bool
58}
59
60// ChangeRemoveMailbox is sent for a removed mailbox.
61type ChangeRemoveMailbox struct {
62 MailboxID int64
63 Name string
64}
65
66// ChangeAddMailbox is sent for a newly created mailbox.
67type ChangeAddMailbox struct {
68 Mailbox Mailbox
69 Flags []string // For flags like \Subscribed.
70}
71
72// ChangeRenameMailbox is sent for a rename mailbox.
73type ChangeRenameMailbox struct {
74 MailboxID int64
75 OldName string
76 NewName string
77 Flags []string
78}
79
80// ChangeAddSubscription is sent for an added subscription to a mailbox.
81type ChangeAddSubscription struct {
82 Name string
83 Flags []string // For additional IMAP flags like \NonExistent.
84}
85
86// ChangeMailboxCounts is sent when the number of total/deleted/unseen/unread messages changes.
87type ChangeMailboxCounts struct {
88 MailboxID int64
89 MailboxName string
90 MailboxCounts
91}
92
93// ChangeMailboxSpecialUse is sent when a special-use flag changes.
94type ChangeMailboxSpecialUse struct {
95 MailboxID int64
96 MailboxName string
97 SpecialUse SpecialUse
98}
99
100// ChangeMailboxKeywords is sent when keywords are changed for a mailbox. For
101// example, when a message is added with a previously unseen keyword.
102type ChangeMailboxKeywords struct {
103 MailboxID int64
104 MailboxName string
105 Keywords []string
106}
107
108var switchboardBusy atomic.Bool
109
110// Switchboard distributes changes to accounts to interested listeners. See Comm and Change.
111func Switchboard() (stop func()) {
112 regs := map[*Account]map[*Comm]struct{}{}
113 done := make(chan struct{})
114
115 if !switchboardBusy.CompareAndSwap(false, true) {
116 panic("switchboard already busy")
117 }
118
119 go func() {
120 for {
121 select {
122 case c := <-register:
123 if _, ok := regs[c.acc]; !ok {
124 regs[c.acc] = map[*Comm]struct{}{}
125 }
126 regs[c.acc][c] = struct{}{}
127
128 case c := <-unregister:
129 delete(regs[c.acc], c)
130 if len(regs[c.acc]) == 0 {
131 delete(regs, c.acc)
132 }
133
134 case chReq := <-broadcast:
135 acc := chReq.acc
136 for c := range regs[acc] {
137 // Do not send the broadcaster back their own changes. chReq.comm is nil if not
138 // originating from a comm, so won't match in that case.
139 if c == chReq.comm {
140 continue
141 }
142
143 c.Lock()
144 c.changes = append(c.changes, chReq.changes...)
145 c.Unlock()
146
147 select {
148 case c.Pending <- struct{}{}:
149 default:
150 }
151 }
152 chReq.done <- struct{}{}
153
154 case <-done:
155 done <- struct{}{}
156 return
157 }
158 }
159 }()
160 return func() {
161 done <- struct{}{}
162 <-done
163 if !switchboardBusy.CompareAndSwap(true, false) {
164 panic("switchboard already unregistered?")
165 }
166 }
167}
168
169// Comm handles communication with the goroutine that maintains the
170// account/mailbox/message state.
171type Comm struct {
172 Pending chan struct{} // Receives block until changes come in, e.g. for IMAP IDLE.
173
174 acc *Account
175
176 sync.Mutex
177 changes []Change
178}
179
180// Register starts a Comm for the account. Unregister must be called.
181func RegisterComm(acc *Account) *Comm {
182 c := &Comm{
183 Pending: make(chan struct{}, 1), // Bufferend so Switchboard can just do a non-blocking send.
184 acc: acc,
185 }
186 register <- c
187 return c
188}
189
190// Unregister stops this Comm.
191func (c *Comm) Unregister() {
192 unregister <- c
193}
194
195// Broadcast ensures changes are sent to other Comms.
196func (c *Comm) Broadcast(ch []Change) {
197 if len(ch) == 0 {
198 return
199 }
200 done := make(chan struct{}, 1)
201 broadcast <- changeReq{c.acc, c, ch, done}
202 <-done
203}
204
205// Get retrieves all pending changes. If no changes are pending a nil or empty list
206// is returned.
207func (c *Comm) Get() []Change {
208 c.Lock()
209 defer c.Unlock()
210 l := c.changes
211 c.changes = nil
212 return l
213}
214
215// BroadcastChanges ensures changes are sent to all listeners on the accoount.
216func BroadcastChanges(acc *Account, ch []Change) {
217 if len(ch) == 0 {
218 return
219 }
220 done := make(chan struct{}, 1)
221 broadcast <- changeReq{acc, nil, ch, done}
222 <-done
223}
224