1package webmail
2
3import (
4 "bufio"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "log/slog"
10 mathrand "math/rand"
11 "net/http"
12 "runtime/debug"
13 "sync"
14 "time"
15
16 "github.com/mjl-/mox/metrics"
17 "github.com/mjl-/mox/mlog"
18 "github.com/mjl-/mox/store"
19)
20
21type eventWriter struct {
22 out writeFlusher
23 waitMin, waitMax time.Duration
24
25 // If connection is closed, the goroutine doing delayed writes must abort.
26 sync.Mutex
27 closed bool
28
29 // Before writing an event, we check if session is still valid. If not, we send a
30 // fatal error instead.
31 accountName string
32 sessionToken store.SessionToken
33
34 wrote bool // To be reset by user, set on write.
35 events chan struct {
36 name string // E.g. "start" for EventStart.
37 v any // Written as JSON.
38 when time.Time // For delaying.
39 } // Will only be set when waitMin or waitMax is > 0. Closed on connection shutdown.
40 errors chan error // If we have an events channel, we read errors and abort for them.
41}
42
43func newEventWriter(out writeFlusher, waitMin, waitMax time.Duration, accountName string, sessionToken store.SessionToken) *eventWriter {
44 return &eventWriter{out: out, waitMin: waitMin, waitMax: waitMax, accountName: accountName, sessionToken: sessionToken}
45}
46
47// close shuts down the events channel, causing the goroutine (if created) to
48// stop.
49func (ew *eventWriter) close() {
50 if ew.events != nil {
51 close(ew.events)
52 }
53 ew.Lock()
54 defer ew.Unlock()
55 ew.closed = true
56}
57
58// Write an event to the connection, e.g. "start" with value v, written as
59// JSON. This directly writes the event, no more delay.
60func (ew *eventWriter) write(name string, v any) error {
61 bw := bufio.NewWriter(ew.out)
62 if _, err := fmt.Fprintf(bw, "event: %s\ndata: ", name); err != nil {
63 return err
64 } else if err := json.NewEncoder(bw).Encode(v); err != nil {
65 return err
66 } else if _, err := fmt.Fprint(bw, "\n"); err != nil {
67 return err
68 } else if err := bw.Flush(); err != nil {
69 return err
70 }
71 return ew.out.Flush()
72}
73
74// For random wait between min and max delay.
75var waitGen = mathrand.New(mathrand.NewSource(time.Now().UnixNano()))
76
77// Schedule an event for writing to the connection. If events get a delay, this
78// function still returns immediately.
79func (ew *eventWriter) xsendEvent(ctx context.Context, log mlog.Log, name string, v any) {
80 if name != "fatalErr" {
81 if _, err := store.SessionUse(ctx, log, ew.accountName, ew.sessionToken, ""); err != nil {
82 ew.xsendEvent(ctx, log, "fatalErr", "session no longer valid")
83 return
84 }
85 }
86
87 if (ew.waitMin > 0 || ew.waitMax > 0) && ew.events == nil {
88 // First write on a connection with delay.
89 ew.events = make(chan struct {
90 name string
91 v any
92 when time.Time
93 }, 100)
94 ew.errors = make(chan error)
95 go func() {
96 defer func() {
97 x := recover() // Should not happen, but don't take program down if it does.
98 if x != nil {
99 log.WithContext(ctx).Error("writeEvent panic", slog.Any("err", x))
100 debug.PrintStack()
101 metrics.PanicInc(metrics.Webmailsendevent)
102 }
103 }()
104
105 for {
106 ev, ok := <-ew.events
107 if !ok {
108 return
109 }
110 d := time.Until(ev.when)
111 if d > 0 {
112 time.Sleep(d)
113 }
114 ew.Lock()
115 if ew.closed {
116 ew.Unlock()
117 return
118 }
119 err := ew.write(ev.name, ev.v)
120 ew.Unlock()
121 if err != nil {
122 ew.errors <- err
123 return
124 }
125 }
126 }()
127 }
128 // Check for previous write error before continuing.
129 if ew.errors != nil {
130 select {
131 case err := <-ew.errors:
132 panic(ioErr{err})
133 default:
134 break
135 }
136 }
137 // If we have an events channel, we have a goroutine that write the events, delayed.
138 if ew.events != nil {
139 wait := ew.waitMin + time.Duration(waitGen.Intn(1000))*(ew.waitMax-ew.waitMin)/1000
140 when := time.Now().Add(wait)
141 ew.events <- struct {
142 name string
143 v any
144 when time.Time
145 }{name, v, when}
146 } else {
147 err := ew.write(name, v)
148 if err != nil {
149 panic(ioErr{err})
150 }
151 }
152 ew.wrote = true
153}
154
155// writeFlusher is a writer and flusher. We need to flush after writing an
156// Event. Both to flush pending gzip data to the http response, and the http
157// response to the client.
158type writeFlusher interface {
159 io.Writer
160 Flush() error
161}
162
163// nopFlusher is a standin for writeFlusher if gzip is not used.
164type nopFlusher struct {
165 io.Writer
166}
167
168func (f nopFlusher) Flush() error {
169 return nil
170}
171
172// httpFlusher wraps Flush for a writeFlusher with a call to an http.Flusher.
173type httpFlusher struct {
174 writeFlusher
175 f http.Flusher
176}
177
178// Flush flushes the underlying writeFlusher, and calls Flush on the http.Flusher
179// (which doesn't return an error).
180func (f httpFlusher) Flush() error {
181 err := f.writeFlusher.Flush()
182 f.f.Flush()
183 return err
184}
185