1package moxio
2
3import (
4 "sync"
5)
6
// Work is a slot for work that needs to be done. A Work value is created by
// WorkQueue.Add with In set and handed to a preparer, which is expected to
// send it back with either Err or Out set.
type Work[T, R any] struct {
	// In is the input for the preparer, as passed to WorkQueue.Add.
	In T
	// Err, when non-nil, is returned to the caller once this work reaches the
	// head of the queue, instead of calling the process function.
	Err error
	// Out is the prepared result. It is passed to the process function
	// together with In.
	Out R

	// i is the index of the slot in the ring of the WorkQueue where the
	// prepared work is stored.
	i int
	// done is already set when the work is scheduled, so a ring slot that
	// holds work received back from a preparer is marked as prepared. It is
	// cleared again when the slot is processed.
	done bool
}
16
// WorkQueue can be used to execute a work load where many items are processed
// with a slow step and where a pool of worker goroutines to execute the slow
// step helps. Reading messages from the database file is fast and cannot be
// easily done concurrently, but reading the message file from disk and parsing
// the headers is the bottleneck. The workqueue can manage the goroutines that
// read the message file from disk and parse.
//
// NOTE(review): the methods update start, n and ring without any locking, so
// a WorkQueue presumably must be used from a single goroutine - confirm with
// callers.
type WorkQueue[T, R any] struct {
	// max is the capacity of the queue: the "size" passed to NewWorkQueue.
	max int
	// ring has max slots. Work received back from a preparer is stored at
	// index Work.i until it can be processed.
	ring []Work[T, R]
	// start is the index in ring of the head of the queue: the oldest work
	// that has not been processed yet.
	start int
	// n is the number of items that have been scheduled and not yet
	// processed.
	n int

	wg sync.WaitGroup // For waiting for workers to stop.
	// work carries newly scheduled work to the preparers. It is closed by
	// Stop.
	work chan Work[T, R]
	// done carries work back from the preparers.
	done chan Work[T, R]

	// process is called with In and Out of each prepared item, in the order
	// the items were added.
	process func(T, R) error
}
35
36// NewWorkQueue creates a new work queue with "procs" goroutines, and a total work
37// queue size of "size" (e.g. 2*procs). The worker goroutines run "preparer", which
38// should be a loop receiving work from "in" and sending the work result (with Err
39// or Out set) on "out". The preparer function should return when the "in" channel
40// is closed, the signal to stop. WorkQueue processes the results in the order they
41// went in, so prepared work that was scheduled after earlier work that is not yet
42// prepared will wait and be queued.
43func NewWorkQueue[T, R any](procs, size int, preparer func(in, out chan Work[T, R]), process func(T, R) error) *WorkQueue[T, R] {
44 wq := &WorkQueue[T, R]{
45 max: size,
46 ring: make([]Work[T, R], size),
47 work: make(chan Work[T, R], size), // Ensure scheduling never blocks for main goroutine.
48 done: make(chan Work[T, R], size), // Ensure sending result never blocks for worker goroutine.
49 process: process,
50 }
51
52 wq.wg.Add(procs)
53 for i := 0; i < procs; i++ {
54 go func() {
55 defer wq.wg.Done()
56 preparer(wq.work, wq.done)
57 }()
58 }
59
60 return wq
61}
62
63// Add adds new work to be prepared to the queue. If the queue is full, it
64// waits until space becomes available, i.e. when the head of the queue has
65// work that becomes prepared. Add processes the prepared items to make space
66// available.
67func (wq *WorkQueue[T, R]) Add(in T) error {
68 // Schedule the new work if we can.
69 if wq.n < wq.max {
70 wq.work <- Work[T, R]{i: (wq.start + wq.n) % wq.max, done: true, In: in}
71 wq.n++
72 return nil
73 }
74
75 // We cannot schedule new work. Wait for finished work until start is done.
76 for {
77 w := <-wq.done
78 wq.ring[w.i] = w
79 if w.i == wq.start {
80 break
81 }
82 }
83
84 // Process as much finished work as possible. Will be at least 1.
85 if err := wq.processHead(); err != nil {
86 return err
87 }
88
89 // Schedule this message as new work.
90 wq.work <- Work[T, R]{i: (wq.start + wq.n) % wq.max, done: true, In: in}
91 wq.n++
92 return nil
93}
94
95// processHead processes the work at the head of the queue by calling process
96// on the work.
97func (wq *WorkQueue[T, R]) processHead() error {
98 for wq.n > 0 && wq.ring[wq.start].done {
99 wq.ring[wq.start].done = false
100 w := wq.ring[wq.start]
101 wq.start = (wq.start + 1) % len(wq.ring)
102 wq.n -= 1
103
104 if w.Err != nil {
105 return w.Err
106 }
107 if err := wq.process(w.In, w.Out); err != nil {
108 return err
109 }
110 }
111 return nil
112}
113
114// Finish waits for the remaining work to be prepared and processes the work.
115func (wq *WorkQueue[T, R]) Finish() error {
116 var err error
117 for wq.n > 0 && err == nil {
118 w := <-wq.done
119 wq.ring[w.i] = w
120
121 err = wq.processHead()
122 }
123 return err
124}
125
126// Stop shuts down the worker goroutines and waits until they have returned.
127// Stop must always be called on a WorkQueue, otherwise the goroutines never stop.
128func (wq *WorkQueue[T, R]) Stop() {
129 close(wq.work)
130 wq.wg.Wait()
131}
132