1package store
2
3import (
4 "errors"
5 "fmt"
6 "io"
7 "os"
8)
9
10// MsgReader provides access to a message. Reads return the "msg_prefix" in the
11// database (typically received headers), followed by the on-disk msg file
12// contents. MsgReader is an io.Reader, io.ReaderAt and io.Closer.
13type MsgReader struct {
14 prefix []byte // First part of the message. Typically contains received headers.
15 path string // To on-disk message file.
16 size int64 // Total size of message, including prefix and contents from path.
17 offset int64 // Current reading offset.
18 f *os.File // Opened path, automatically opened after prefix has been read.
19 err error // If set, error to return for reads. Sets io.EOF for readers, but ReadAt ignores them.
20}
21
22var errMsgClosed = errors.New("msg is closed")
23
24// FileMsgReader makes a MsgReader for an open file.
25// If initialization fails, reads will return the error.
26// Only call close on the returned MsgReader if you want to close msgFile.
27func FileMsgReader(prefix []byte, msgFile *os.File) *MsgReader {
28 mr := &MsgReader{prefix: prefix, path: msgFile.Name(), f: msgFile}
29 fi, err := msgFile.Stat()
30 if err != nil {
31 mr.err = err
32 return mr
33 }
34 mr.size = int64(len(prefix)) + fi.Size()
35 return mr
36}
37
38// Read reads data from the msg, taking prefix and on-disk msg file into account.
39// The read offset is adjusted after the read.
40func (m *MsgReader) Read(buf []byte) (int, error) {
41 return m.read(buf, m.offset, false)
42}
43
44// ReadAt reads data from the msg, taking prefix and on-disk msg file into account.
45// The read offset is not affected by ReadAt.
46func (m *MsgReader) ReadAt(buf []byte, off int64) (n int, err error) {
47 return m.read(buf, off, true)
48}
49
50// read always fill buf as far as possible, for ReadAt semantics.
51func (m *MsgReader) read(buf []byte, off int64, pread bool) (int, error) {
52 // If a reader has consumed the file and reached EOF, further ReadAt must not return eof.
53 if m.err != nil && (!pread || m.err != io.EOF) {
54 return 0, m.err
55 }
56 var o int
57 for o < len(buf) {
58 // First attempt to read from m.prefix.
59 pn := int64(len(m.prefix)) - off
60 if pn > 0 {
61 n := len(buf)
62 if int64(n) > pn {
63 n = int(pn)
64 }
65 copy(buf[o:], m.prefix[int(off):int(off)+n])
66 o += n
67 off += int64(n)
68 if !pread {
69 m.offset += int64(n)
70 }
71 continue
72 }
73
74 // Now we need to read from file. Ensure it is open.
75 if m.f == nil {
76 f, err := os.Open(m.path)
77 if err != nil {
78 m.err = err
79 break
80 }
81 m.f = f
82 }
83 n, err := m.f.ReadAt(buf[o:], off-int64(len(m.prefix)))
84 if !pread && n > 0 {
85 m.offset += int64(n)
86 }
87 if !pread || err != io.EOF {
88 m.err = err
89 }
90 if n > 0 {
91 o += n
92 off += int64(n)
93 }
94 if err == io.EOF {
95 if off > m.size && (m.err == nil || m.err == io.EOF) {
96 err = fmt.Errorf("on-disk message larger than expected (off %d, size %d)", off, m.size)
97 m.err = err
98 }
99 return o, err
100 }
101 if n <= 0 {
102 break
103 }
104 }
105 if off > m.size && (m.err == nil || m.err == io.EOF) {
106 m.err = fmt.Errorf("on-disk message larger than expected (off %d, size %d, prefix %d)", off, m.size, len(m.prefix))
107 }
108 return o, m.err
109}
110
111// Close ensures the msg file is closed. Further reads will fail.
112func (m *MsgReader) Close() error {
113 if m.f != nil {
114 if err := m.f.Close(); err != nil {
115 return err
116 }
117 m.f = nil
118 }
119 if m.err == errMsgClosed {
120 return m.err
121 }
122 m.err = errMsgClosed
123 return nil
124}
125
126// Reset rewinds the offset and clears error conditions, making it usable as a fresh reader.
127func (m *MsgReader) Reset() {
128 m.offset = 0
129 m.err = nil
130}
131
132// Size returns the total size of the contents of the message.
133func (m *MsgReader) Size() int64 {
134 return m.size
135}
136