9 "github.com/mjl-/bstore"
11 "github.com/mjl-/mox/message"
12 "github.com/mjl-/mox/mlog"
13 "github.com/mjl-/mox/store"
16// Replace replaces a message with another, atomically, possibly in another mailbox,
17// without needing a sequence of: append message, store \deleted flag, expunge.
20func (c *conn) cmdxReplace(isUID bool, tag, cmd string, p *parser) {
35 var storeFlags store.Flags
38 // Error must be a syntax error, to properly abort the connection due to literal.
40 storeFlags, keywords, err = store.ParseFlagsKeywords(p.xflagList())
42 xsyntaxErrorf("parsing flags: %v", err)
55 // todo: only with utf8 should we accept message headers with utf-8. we currently always accept them.
56 // todo: this is only relevant if we also support the CATENATE extension?
58 utf8 := p.take("UTF8 (")
63 // For utf8, we already consumed the required ~ above.
64 size, synclit := p.xliteralSize(!utf8, false)
66 // Check the request, including old message in database, whether the message fits
67 // in quota. If a non-nil func is returned, an error was found. Calling the
68 // function aborts handling this command.
70 checkMessage := func(tx *bstore.Tx) func() {
72 return func() { xuserErrorf("mailbox open in read-only mode") }
75 mb, err := c.account.MailboxFind(tx, name)
77 return func() { xserverErrorf("finding mailbox: %v", err) }
80 return func() { xusercodeErrorf("TRYCREATE", "%w", store.ErrUnknownMailbox) }
83 // Resolve "*" for UID or message sequence.
86 return func() { xuserErrorf("cannot use * on empty mailbox") }
89 num = uint32(c.uids[len(c.uids)-1])
91 num = uint32(len(c.uids))
96 // Find or verify UID of message to replace.
99 seq = c.sequence(store.UID(num))
101 return func() { xuserErrorf("unknown uid %d", num) }
103 } else if num > uint32(len(c.uids)) {
104 return func() { xuserErrorf("invalid msgseq") }
109 uidOld = c.uids[int(seq)-1]
111 // Check the message still exists in the database. If it doesn't, it may have been
112 // deleted just now and we won't check the quota. We'll raise an error later on,
113 // when we are not possibly reading a sync literal and can respond with unsolicited
115 q := bstore.QueryTx[store.Message](tx)
116 q.FilterNonzero(store.Message{MailboxID: c.mailboxID, UID: uidOld})
117 q.FilterEqual("Expunged", false)
119 if err == bstore.ErrAbsent {
123 return func() { xserverErrorf("get message to replace: %v", err) }
126 // Check if we can add size bytes. We can't necessarily remove the current message yet.
127 ok, maxSize, err := c.account.CanAddMessageSize(tx, size)
129 return func() { xserverErrorf("check quota: %v", err) }
133 return func() { xusercodeErrorf("OVERQUOTA", "account over maximum total message size %d", maxSize) }
140 // Check request, if it cannot succeed, fail it now before client is sending the data.
142 name = xcheckmailboxname(name, true)
144 c.account.WithRLock(func() {
145 c.xdbread(func(tx *bstore.Tx) {
146 errfn = checkMessage(tx)
156 name, _, err = store.CheckMailboxName(name, true)
158 errfn = func() { xusercodeErrorf("CANNOT", "%s", err) }
160 c.account.WithRLock(func() {
161 c.xdbread(func(tx *bstore.Tx) {
162 errfn = checkMessage(tx)
169 var newID int64 // Delivered message ID, file removed on error.
174 // We got a non-sync literal, we will consume some data, but abort if there's too
175 // much. We draw the line at 1mb. Client should have used synchronizing literal.
176 if size > 1000*1000 {
178 err := errors.New("error condition and non-synchronizing literal too big")
179 bye := "* BYE [ALERT] " + err.Error()
180 panic(syntaxError{bye, "TOOBIG", err.Error(), err})
182 // Message will not be accepted.
185 // Read the message into a temporary file.
187 file, err = store.CreateMessageTemp(c.log, "imap-replace")
188 xcheckf(err, "creating temp file for message")
189 defer store.CloseRemoveTempFile(c.log, file, "temporary message file")
193 if !commit && newID != 0 {
194 p := c.account.MessagePath(newID)
196 c.xsanity(err, "remove message file for replace after error")
201 // Read the message data.
202 defer c.xtrace(mlog.LevelTracedata)()
203 mw := message.NewWriter(f)
204 msize, err := io.Copy(mw, io.LimitReader(c.br, size))
205 c.xtrace(mlog.LevelTrace) // Restore.
207 // Cannot use xcheckf due to %w handling of errIO.
208 c.xbrokenf("reading literal message: %s (%w)", err, errIO)
211 c.xbrokenf("read %d bytes for message, expected %d (%w)", msize, size, errIO)
214 // Finish reading the command.
215 line := c.readline(false)
216 p = newParser(line, c)
222 // If an error was found earlier, abort the command now that we've read the message.
227 var oldMsgExpunged bool
229 var om, nm store.Message
230 var mbSrc, mbDst store.Mailbox // Src and dst mailboxes can be different.
../rfc/8508:263
231 var pendingChanges []store.Change
233 c.account.WithWLock(func() {
234 var changes []store.Change
236 c.xdbwrite(func(tx *bstore.Tx) {
237 mbSrc = c.xmailboxID(tx, c.mailboxID)
239 // Get old message. If it has been expunged, we should have a pending change for
240 // it. We'll send untagged responses and fail the command.
242 qom := bstore.QueryTx[store.Message](tx)
243 qom.FilterNonzero(store.Message{MailboxID: mbSrc.ID, UID: uidOld})
245 xcheckf(err, "get old message to replace from database")
247 oldMsgExpunged = true
251 // Check quota for addition of new message. We can't necessarily yet remove the old message.
252 ok, maxSize, err := c.account.CanAddMessageSize(tx, mw.Size)
253 xcheckf(err, "checking quota")
256 xusercodeErrorf("OVERQUOTA", "account over maximum total message size %d", maxSize)
259 modseq, err := c.account.NextModSeq(tx)
260 xcheckf(err, "get next mod seq")
262 chremuids, _, err := c.account.MessageRemove(c.log, tx, modseq, &mbSrc, store.RemoveOpts{}, om)
263 xcheckf(err, "expunge old message")
264 changes = append(changes, chremuids)
265 // Note: we only add a mbSrc counts change later on, if it is not equal to mbDst.
267 err = tx.Update(&mbSrc)
268 xcheckf(err, "updating source mailbox counts")
270 mbDst = c.xmailbox(tx, name, "TRYCREATE")
271 mbDst.ModSeq = modseq
273 nkeywords := len(mbDst.Keywords)
275 // Make new message to deliver.
278 MailboxOrigID: mbDst.ID,
287 err = c.account.MessageAdd(c.log, tx, &mbDst, &nm, file, store.AddOpts{})
288 xcheckf(err, "delivering message")
291 changes = append(changes, nm.ChangeAddUID(), mbDst.ChangeCounts())
292 if nkeywords != len(mbDst.Keywords) {
293 changes = append(changes, mbDst.ChangeKeywords())
296 err = tx.Update(&mbDst)
297 xcheckf(err, "updating destination mailbox")
300 // Fetch pending changes, possibly with new UIDs, so we can apply them before adding our own new UID.
301 pendingChanges = c.comm.Get()
307 // Success, make sure messages aren't cleaned up anymore.
310 // Broadcast the change to other connections.
311 if mbSrc.ID != mbDst.ID {
312 changes = append(changes, mbSrc.ChangeCounts())
317 // Must update our msgseq/uids tracking with latest pending changes.
318 c.applyChanges(pendingChanges, false)
320 // If we couldn't find the message, send a NO response. We've just applied pending
321 // changes, which should have expunged the absent message.
323 xuserErrorf("message to be replaced has been expunged")
326 // If the destination mailbox is our currently selected mailbox, we register and
327 // announce the new message.
328 if mbDst.ID == c.mailboxID {
330 // We send an untagged OK with APPENDUID, for sane bookkeeping in clients.
../rfc/8508:401
331 c.bwritelinef("* OK [APPENDUID %d %d] ", mbDst.UIDValidity, nm.UID)
332 c.bwritelinef("* %d EXISTS", len(c.uids))
335 // We must return vanished instead of expunge, and also highestmodseq, when qresync
337 qresync := c.enabled[capQresync]
339 // Now that we are in sync with msgseq, we can find our old msgseq and say it is
341 omsgseq := c.xsequence(om.UID)
342 c.sequenceRemove(omsgseq, om.UID)
344 c.bwritelinef("* VANISHED %d", om.UID)
347 c.bwritelinef("* %d EXPUNGE", omsgseq)
349 c.writeresultf("%s OK [HIGHESTMODSEQ %d] replaced", tag, nm.ModSeq.Client())