1package store
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8var (
9 register = make(chan *Comm)
10 unregister = make(chan *Comm)
11 broadcast = make(chan changeReq)
12)
13
14type changeReq struct {
15 acc *Account
16 comm *Comm // Can be nil.
17 changes []Change
18 done chan struct{}
19}
20
// UID is an IMAP UID, represented as an unsigned 32-bit integer.
type UID uint32
22
// Change is a change to the mailboxes, subscriptions or messages of an account.
// The dynamic value is one of the Change* types in this package.
type Change any
26
27// ChangeAddUID is sent for a new message in a mailbox.
28type ChangeAddUID struct {
29 MailboxID int64
30 UID UID
31 ModSeq ModSeq
32 Flags Flags // System flags.
33 Keywords []string // Other flags.
34}
35
36// ChangeRemoveUIDs is sent for removal of one or more messages from a mailbox.
37type ChangeRemoveUIDs struct {
38 MailboxID int64
39 UIDs []UID // Must be in increasing UID order, for IMAP.
40 ModSeq ModSeq
41}
42
43// ChangeFlags is sent for an update to flags for a message, e.g. "Seen".
44type ChangeFlags struct {
45 MailboxID int64
46 UID UID
47 ModSeq ModSeq
48 Mask Flags // Which flags are actually modified.
49 Flags Flags // New flag values. All are set, not just mask.
50 Keywords []string // Non-system/well-known flags/keywords/labels.
51}
52
// ChangeThread announces a change of the muted/collapsed state for the listed
// messages.
type ChangeThread struct {
	MessageIDs       []int64
	Muted, Collapsed bool
}
59
// ChangeRemoveMailbox announces that a mailbox was removed.
type ChangeRemoveMailbox struct {
	MailboxID int64  // ID of the removed mailbox.
	Name      string // Name of the removed mailbox.
}
65
66// ChangeAddMailbox is sent for a newly created mailbox.
67type ChangeAddMailbox struct {
68 Mailbox Mailbox
69 Flags []string // For flags like \Subscribed.
70}
71
// ChangeRenameMailbox announces that a mailbox was renamed from OldName to NewName.
type ChangeRenameMailbox struct {
	MailboxID        int64
	OldName, NewName string

	// NOTE(review): presumably mailbox flags like \Subscribed, as for
	// ChangeAddMailbox; confirm against the code that fills this in.
	Flags []string
}
79
// ChangeAddSubscription announces that a subscription to a mailbox was added.
type ChangeAddSubscription struct {
	// Name of the mailbox subscribed to.
	Name string

	// Additional IMAP flags, like \NonExistent.
	Flags []string
}
85
86// ChangeMailboxCounts is sent when the number of total/deleted/unseen/unread messages changes.
87type ChangeMailboxCounts struct {
88 MailboxID int64
89 MailboxName string
90 MailboxCounts
91}
92
93// ChangeMailboxSpecialUse is sent when a special-use flag changes.
94type ChangeMailboxSpecialUse struct {
95 MailboxID int64
96 MailboxName string
97 SpecialUse SpecialUse
98}
99
// ChangeMailboxKeywords announces a change to the keywords of a mailbox, for
// example because a message with a previously unseen keyword was added.
type ChangeMailboxKeywords struct {
	MailboxID   int64
	MailboxName string

	Keywords []string
}
107
// ChangeAnnotation announces that an annotation was added, updated or removed.
// The annotation is either for a mailbox or global for the account. The value of
// the annotation is not included.
type ChangeAnnotation struct {
	// Zero for a global (per-account) annotation.
	MailboxID int64

	// Empty for a global (per-account) annotation.
	MailboxName string

	// Key of the annotation, also called "entry name", e.g. "/private/comment".
	Key string
}
115
// switchboardBusy is true from the moment Switchboard is called until its
// returned stop function has completed. It is used to detect a second,
// concurrent switchboard.
var switchboardBusy atomic.Bool
117
118// Switchboard distributes changes to accounts to interested listeners. See Comm and Change.
119func Switchboard() (stop func()) {
120 regs := map[*Account]map[*Comm]struct{}{}
121 done := make(chan struct{})
122
123 if !switchboardBusy.CompareAndSwap(false, true) {
124 panic("switchboard already busy")
125 }
126
127 go func() {
128 for {
129 select {
130 case c := <-register:
131 if _, ok := regs[c.acc]; !ok {
132 regs[c.acc] = map[*Comm]struct{}{}
133 }
134 regs[c.acc][c] = struct{}{}
135
136 case c := <-unregister:
137 delete(regs[c.acc], c)
138 if len(regs[c.acc]) == 0 {
139 delete(regs, c.acc)
140 }
141
142 case chReq := <-broadcast:
143 acc := chReq.acc
144 for c := range regs[acc] {
145 // Do not send the broadcaster back their own changes. chReq.comm is nil if not
146 // originating from a comm, so won't match in that case.
147 if c == chReq.comm {
148 continue
149 }
150
151 c.Lock()
152 c.changes = append(c.changes, chReq.changes...)
153 c.Unlock()
154
155 select {
156 case c.Pending <- struct{}{}:
157 default:
158 }
159 }
160 chReq.done <- struct{}{}
161
162 case <-done:
163 done <- struct{}{}
164 return
165 }
166 }
167 }()
168 return func() {
169 done <- struct{}{}
170 <-done
171 if !switchboardBusy.CompareAndSwap(true, false) {
172 panic("switchboard already unregistered?")
173 }
174 }
175}
176
177// Comm handles communication with the goroutine that maintains the
178// account/mailbox/message state.
179type Comm struct {
180 Pending chan struct{} // Receives block until changes come in, e.g. for IMAP IDLE.
181
182 acc *Account
183
184 sync.Mutex
185 changes []Change
186}
187
188// Register starts a Comm for the account. Unregister must be called.
189func RegisterComm(acc *Account) *Comm {
190 c := &Comm{
191 Pending: make(chan struct{}, 1), // Bufferend so Switchboard can just do a non-blocking send.
192 acc: acc,
193 }
194 register <- c
195 return c
196}
197
198// Unregister stops this Comm.
199func (c *Comm) Unregister() {
200 unregister <- c
201}
202
203// Broadcast ensures changes are sent to other Comms.
204func (c *Comm) Broadcast(ch []Change) {
205 if len(ch) == 0 {
206 return
207 }
208 done := make(chan struct{}, 1)
209 broadcast <- changeReq{c.acc, c, ch, done}
210 <-done
211}
212
213// Get retrieves all pending changes. If no changes are pending a nil or empty list
214// is returned.
215func (c *Comm) Get() []Change {
216 c.Lock()
217 defer c.Unlock()
218 l := c.changes
219 c.changes = nil
220 return l
221}
222
223// BroadcastChanges ensures changes are sent to all listeners on the accoount.
224func BroadcastChanges(acc *Account, ch []Change) {
225 if len(ch) == 0 {
226 return
227 }
228 done := make(chan struct{}, 1)
229 broadcast <- changeReq{acc, nil, ch, done}
230 <-done
231}
232