1package imapserver
2
3import (
4 "errors"
5 "io"
6 "os"
7 "time"
8
9 "github.com/mjl-/bstore"
10
11 "github.com/mjl-/mox/message"
12 "github.com/mjl-/mox/mlog"
13 "github.com/mjl-/mox/store"
14)
15
16// Replace relaces a message for another, atomically, possibly in another mailbox,
17// without needing a sequence of: append message, store \deleted flag, expunge.
18//
19// State: Selected
20func (c *conn) cmdxReplace(isUID bool, tag, cmd string, p *parser) {
21 // Command: ../rfc/8508:158 ../rfc/8508:198
22
23 // Request syntax: ../rfc/8508:471
24 p.xspace()
25 star := p.take("*")
26 var num uint32
27 if !star {
28 num = p.xnznumber()
29 }
30 p.xspace()
31 name := p.xmailbox()
32
33 // ../rfc/4466:473
34 p.xspace()
35 var storeFlags store.Flags
36 var keywords []string
37 if p.hasPrefix("(") {
38 // Error must be a syntax error, to properly abort the connection due to literal.
39 var err error
40 storeFlags, keywords, err = store.ParseFlagsKeywords(p.xflagList())
41 if err != nil {
42 xsyntaxErrorf("parsing flags: %v", err)
43 }
44 p.xspace()
45 }
46
47 var tm time.Time
48 if p.hasPrefix(`"`) {
49 tm = p.xdateTime()
50 p.xspace()
51 } else {
52 tm = time.Now()
53 }
54
55 // todo: only with utf8 should we we accept message headers with utf-8. we currently always accept them.
56 // todo: this is only relevant if we also support the CATENATE extension?
57 // ../rfc/6855:204
58 utf8 := p.take("UTF8 (")
59 if utf8 {
60 p.xtake("~")
61 }
62 // Always allow literal8, for binary extension. ../rfc/4466:486
63 // For utf8, we already consumed the required ~ above.
64 size, synclit := p.xliteralSize(!utf8, false)
65
66 // Check the request, including old message in database, whether the message fits
67 // in quota. If a non-nil func is returned, an error was found. Calling the
68 // function aborts handling this command.
69 var uidOld store.UID
70 checkMessage := func(tx *bstore.Tx) func() {
71 if c.readonly {
72 return func() { xuserErrorf("mailbox open in read-only mode") }
73 }
74
75 mb, err := c.account.MailboxFind(tx, name)
76 if err != nil {
77 return func() { xserverErrorf("finding mailbox: %v", err) }
78 }
79 if mb == nil {
80 return func() { xusercodeErrorf("TRYCREATE", "%w", store.ErrUnknownMailbox) }
81 }
82
83 // Resolve "*" for UID or message sequence.
84 if star {
85 if c.uidonly {
86 q := bstore.QueryTx[store.Message](tx)
87 q.FilterNonzero(store.Message{MailboxID: c.mailboxID})
88 q.FilterEqual("Expunged", false)
89 q.FilterLess("UID", c.uidnext)
90 q.SortDesc("UID")
91 q.Limit(1)
92 m, err := q.Get()
93 if err == bstore.ErrAbsent {
94 return func() { xsyntaxErrorf("cannot use * on empty mailbox") }
95 }
96 xcheckf(err, "get last message in mailbox")
97 num = uint32(m.UID)
98 } else if c.exists == 0 {
99 return func() { xsyntaxErrorf("cannot use * on empty mailbox") }
100 } else if isUID {
101 num = uint32(c.uids[c.exists-1])
102 } else {
103 num = uint32(c.exists)
104 }
105 star = false
106 }
107
108 // Find or verify UID of message to replace.
109 if isUID {
110 uidOld = store.UID(num)
111 } else if num > c.exists {
112 return func() { xuserErrorf("invalid msgseq") }
113 } else {
114 uidOld = c.uids[int(num)-1]
115 }
116
117 // Check the message still exists in the database. If it doesn't, it may have been
118 // deleted just now and we won't check the quota. We'll raise an error later on,
119 // when we are not possibly reading a sync literal and can respond with unsolicited
120 // expunges.
121 q := bstore.QueryTx[store.Message](tx)
122 q.FilterNonzero(store.Message{MailboxID: c.mailboxID, UID: uidOld})
123 q.FilterEqual("Expunged", false)
124 q.FilterLess("UID", c.uidnext)
125 _, err = q.Get()
126 if err == bstore.ErrAbsent {
127 return nil
128 }
129 if err != nil {
130 return func() { xserverErrorf("get message to replace: %v", err) }
131 }
132
133 // Check if we can add size bytes. We can't necessarily remove the current message yet.
134 ok, maxSize, err := c.account.CanAddMessageSize(tx, size)
135 if err != nil {
136 return func() { xserverErrorf("check quota: %v", err) }
137 }
138 if !ok {
139 // ../rfc/9208:472
140 return func() { xusercodeErrorf("OVERQUOTA", "account over maximum total message size %d", maxSize) }
141 }
142 return nil
143 }
144
145 var errfn func()
146 if synclit {
147 // Check request, if it cannot succeed, fail it now before client is sending the data.
148
149 name = xcheckmailboxname(name, true)
150
151 c.account.WithRLock(func() {
152 c.xdbread(func(tx *bstore.Tx) {
153 errfn = checkMessage(tx)
154 if errfn != nil {
155 errfn()
156 }
157 })
158 })
159
160 c.xwritelinef("+ ")
161 } else {
162 var err error
163 name, _, err = store.CheckMailboxName(name, true)
164 if err != nil {
165 errfn = func() { xusercodeErrorf("CANNOT", "%s", err) }
166 } else {
167 c.account.WithRLock(func() {
168 c.xdbread(func(tx *bstore.Tx) {
169 errfn = checkMessage(tx)
170 })
171 })
172 }
173 }
174
175 var file *os.File
176 var newID int64 // Delivered message ID, file removed on error.
177 var f io.Writer
178 var commit bool
179
180 if errfn != nil {
181 // We got a non-sync literal, we will consume some data, but abort if there's too
182 // much. We draw the line at 1mb. Client should have used synchronizing literal.
183 if size > 1000*1000 {
184 // ../rfc/9051:357 ../rfc/3501:347
185 err := errors.New("error condition and non-synchronizing literal too big")
186 bye := "* BYE [ALERT] " + err.Error()
187 panic(syntaxError{bye, "TOOBIG", err.Error(), err})
188 }
189 // Message will not be accepted.
190 f = io.Discard
191 } else {
192 // Read the message into a temporary file.
193 var err error
194 file, err = store.CreateMessageTemp(c.log, "imap-replace")
195 xcheckf(err, "creating temp file for message")
196 defer store.CloseRemoveTempFile(c.log, file, "temporary message file")
197 f = file
198
199 defer func() {
200 if !commit && newID != 0 {
201 p := c.account.MessagePath(newID)
202 err := os.Remove(p)
203 c.xsanity(err, "remove message file for replace after error")
204 }
205 }()
206 }
207
208 // Read the message data.
209 defer c.xtraceread(mlog.LevelTracedata)()
210 mw := message.NewWriter(f)
211 msize, err := io.Copy(mw, io.LimitReader(c.br, size))
212 c.xtraceread(mlog.LevelTrace) // Restore.
213 if err != nil {
214 // Cannot use xcheckf due to %w handling of errIO.
215 c.xbrokenf("reading literal message: %s (%w)", err, errIO)
216 }
217 if msize != size {
218 c.xbrokenf("read %d bytes for message, expected %d (%w)", msize, size, errIO)
219 }
220
221 // Finish reading the command.
222 line := c.xreadline(false)
223 p = newParser(line, c)
224 if utf8 {
225 p.xtake(")")
226 }
227 p.xempty()
228
229 // If an error was found earlier, abort the command now that we've read the message.
230 if errfn != nil {
231 errfn()
232 }
233
234 var oldMsgExpunged bool
235
236 var om, nm store.Message
237 var mbSrc, mbDst store.Mailbox // Src and dst mailboxes can be different. ../rfc/8508:263
238 var overflow bool
239 var pendingChanges []store.Change
240 defer func() {
241 // In case of panic.
242 c.flushChanges(pendingChanges)
243 }()
244
245 c.account.WithWLock(func() {
246 var changes []store.Change
247
248 c.xdbwrite(func(tx *bstore.Tx) {
249 mbSrc = c.xmailboxID(tx, c.mailboxID)
250
251 // Get old message. If it has been expunged, we should have a pending change for
252 // it. We'll send untagged responses and fail the command.
253 var err error
254 qom := bstore.QueryTx[store.Message](tx)
255 qom.FilterNonzero(store.Message{MailboxID: mbSrc.ID, UID: uidOld})
256 om, err = qom.Get()
257 xcheckf(err, "get old message to replace from database")
258 if om.Expunged {
259 oldMsgExpunged = true
260 return
261 }
262
263 // Check quota for addition of new message. We can't necessarily yet remove the old message.
264 ok, maxSize, err := c.account.CanAddMessageSize(tx, mw.Size)
265 xcheckf(err, "checking quota")
266 if !ok {
267 // ../rfc/9208:472
268 xusercodeErrorf("OVERQUOTA", "account over maximum total message size %d", maxSize)
269 }
270
271 modseq, err := c.account.NextModSeq(tx)
272 xcheckf(err, "get next mod seq")
273
274 chremuids, _, err := c.account.MessageRemove(c.log, tx, modseq, &mbSrc, store.RemoveOpts{}, om)
275 xcheckf(err, "expunge old message")
276 changes = append(changes, chremuids)
277 // Note: we only add a mbSrc counts change later on, if it is not equal to mbDst.
278
279 err = tx.Update(&mbSrc)
280 xcheckf(err, "updating source mailbox counts")
281
282 mbDst = c.xmailbox(tx, name, "TRYCREATE")
283 mbDst.ModSeq = modseq
284
285 nkeywords := len(mbDst.Keywords)
286
287 // Make new message to deliver.
288 nm = store.Message{
289 MailboxID: mbDst.ID,
290 MailboxOrigID: mbDst.ID,
291 Received: tm,
292 Flags: storeFlags,
293 Keywords: keywords,
294 Size: mw.Size,
295 ModSeq: modseq,
296 CreateSeq: modseq,
297 }
298
299 err = c.account.MessageAdd(c.log, tx, &mbDst, &nm, file, store.AddOpts{})
300 xcheckf(err, "delivering message")
301 newID = nm.ID
302
303 changes = append(changes, nm.ChangeAddUID(mbDst), mbDst.ChangeCounts())
304 if nkeywords != len(mbDst.Keywords) {
305 changes = append(changes, mbDst.ChangeKeywords())
306 }
307
308 err = tx.Update(&mbDst)
309 xcheckf(err, "updating destination mailbox")
310 })
311
312 // Fetch pending changes, possibly with new UIDs, so we can apply them before adding our own new UID.
313 overflow, pendingChanges = c.comm.Get()
314
315 if oldMsgExpunged {
316 return
317 }
318
319 // Success, make sure messages aren't cleaned up anymore.
320 commit = true
321
322 // Broadcast the change to other connections.
323 if mbSrc.ID != mbDst.ID {
324 changes = append(changes, mbSrc.ChangeCounts())
325 }
326 c.broadcast(changes)
327 })
328
329 // Must update our msgseq/uids tracking with latest pending changes.
330 l := pendingChanges
331 pendingChanges = nil
332 c.xapplyChanges(overflow, l, false)
333
334 // If we couldn't find the message, send a NO response. We've just applied pending
335 // changes, which should have expunged the absent message.
336 if oldMsgExpunged {
337 xuserErrorf("message to be replaced has been expunged")
338 }
339
340 // If the destination mailbox is our currently selected mailbox, we register and
341 // announce the new message.
342 if mbDst.ID == c.mailboxID {
343 c.uidAppend(nm.UID)
344 // We send an untagged OK with APPENDUID, for sane bookkeeping in clients. ../rfc/8508:401
345 c.xbwritelinef("* OK [APPENDUID %d %d] ", mbDst.UIDValidity, nm.UID)
346 c.xbwritelinef("* %d EXISTS", c.exists)
347 }
348
349 // We must return vanished instead of expunge, and also highestmodseq, when qresync
350 // was enabled. ../rfc/8508:422 ../rfc/7162:1883
351 qresync := c.enabled[capQresync]
352
353 // Now that we are in sync with msgseq, we can find our old msgseq and say it is
354 // expunged or vanished. ../rfc/7162:1900
355 var oseq msgseq
356 if c.uidonly {
357 c.exists--
358 } else {
359 oseq = c.xsequence(om.UID)
360 c.sequenceRemove(oseq, om.UID)
361 }
362 if qresync || c.uidonly {
363 c.xbwritelinef("* VANISHED %d", om.UID)
364 // ../rfc/7162:1916
365 } else {
366 c.xbwritelinef("* %d EXPUNGE", oseq)
367 }
368 c.xwriteresultf("%s OK [HIGHESTMODSEQ %d] replaced", tag, nm.ModSeq.Client())
369}
370