10 mathrand2 "math/rand/v2"
16 "github.com/mjl-/mox/metrics"
17 "github.com/mjl-/mox/mlog"
18 "github.com/mjl-/mox/store"
21type eventWriter struct {
23 waitMin, waitMax time.Duration
25 // If connection is closed, the goroutine doing delayed writes must abort.
29 // Before writing an event, we check if session is still valid. If not, we send a
30 // fatal error instead.
32 sessionToken store.SessionToken
34 wrote bool // To be reset by user, set on write.
36 name string // E.g. "start" for EventStart.
37 v any // Written as JSON.
38 when time.Time // For delaying.
39 } // Will only be set when waitMin or waitMax is > 0. Closed on connection shutdown.
40 errors chan error // If we have an events channel, we read errors and abort for them.
43func newEventWriter(out writeFlusher, waitMin, waitMax time.Duration, accountName string, sessionToken store.SessionToken) *eventWriter {
44 return &eventWriter{out: out, waitMin: waitMin, waitMax: waitMax, accountName: accountName, sessionToken: sessionToken}
47// close shuts down the events channel, causing the goroutine (if created) to
49func (ew *eventWriter) close() {
58// Write an event to the connection, e.g. "start" with value v, written as
59// JSON. This directly writes the event, no more delay.
60func (ew *eventWriter) write(name string, v any) error {
61 bw := bufio.NewWriter(ew.out)
62 if _, err := fmt.Fprintf(bw, "event: %s\ndata: ", name); err != nil {
64 } else if err := json.NewEncoder(bw).Encode(v); err != nil {
66 } else if _, err := fmt.Fprint(bw, "\n"); err != nil {
68 } else if err := bw.Flush(); err != nil {
74// Schedule an event for writing to the connection. If events get a delay, this
75// function still returns immediately.
76func (ew *eventWriter) xsendEvent(ctx context.Context, log mlog.Log, name string, v any) {
77 if name != "fatalErr" {
78 if _, err := store.SessionUse(ctx, log, ew.accountName, ew.sessionToken, ""); err != nil {
79 ew.xsendEvent(ctx, log, "fatalErr", "session no longer valid")
84 if (ew.waitMin > 0 || ew.waitMax > 0) && ew.events == nil {
85 // First write on a connection with delay.
86 ew.events = make(chan struct {
91 ew.errors = make(chan error)
94 x := recover() // Should not happen, but don't take program down if it does.
96 log.WithContext(ctx).Error("writeEvent panic", slog.Any("err", x))
98 metrics.PanicInc(metrics.Webmailsendevent)
103 ev, ok := <-ew.events
107 d := time.Until(ev.when)
116 err := ew.write(ev.name, ev.v)
125 // Check for previous write error before continuing.
126 if ew.errors != nil {
128 case err := <-ew.errors:
134 // If we have an events channel, we have a goroutine that write the events, delayed.
135 if ew.events != nil {
136 wait := ew.waitMin + time.Duration(mathrand2.IntN(1000))*(ew.waitMax-ew.waitMin)/1000
137 when := time.Now().Add(wait)
138 ew.events <- struct {
144 err := ew.write(name, v)
152// writeFlusher is a writer and flusher. We need to flush after writing an
153// Event. Both to flush pending gzip data to the http response, and the http
154// response to the client.
155type writeFlusher interface {
160// nopFlusher is a standin for writeFlusher if gzip is not used.
161type nopFlusher struct {
165func (f nopFlusher) Flush() error {
169// httpFlusher wraps Flush for a writeFlusher with a call to an http.Flusher.
170type httpFlusher struct {
175// Flush flushes the underlying writeFlusher, and calls Flush on the http.Flusher
176// (which doesn't return an error).
177func (f httpFlusher) Flush() error {
178 err := f.writeFlusher.Flush()