7// Work is a slot for work that needs to be done.
 
8type Work[T, R any] struct {
 
17// WorkQueue can be used to execute a work load where many items are processed
 
18// with a slow step and where a pool of worker goroutines to execute the slow
 
19// step helps. Reading messages from the database file is fast and cannot be
 
20// easily done concurrently, but reading the message file from disk and parsing
 
21// the headers is the bottleneck. The workqueue can manage the goroutines that
 
22// read the message file from disk and parse the headers.
 
23type WorkQueue[T, R any] struct {
 
29	wg   sync.WaitGroup // For waiting for workers to stop.
 
33	process func(T, R) error
 
36// NewWorkQueue creates a new work queue with "procs" goroutines, and a total work
 
37// queue size of "size" (e.g. 2*procs). The worker goroutines run "preparer", which
 
38// should be a loop receiving work from "in" and sending the work result (with Err
 
39// or Out set) on "out". The preparer function should return when the "in" channel
 
40// is closed, the signal to stop. WorkQueue processes the results in the order they
 
41// went in, so prepared work that was scheduled after earlier work that is not yet
 
42// prepared will wait and be queued.
 
43func NewWorkQueue[T, R any](procs, size int, preparer func(in, out chan Work[T, R]), process func(T, R) error) *WorkQueue[T, R] {
 
44	wq := &WorkQueue[T, R]{
 
46		ring:    make([]Work[T, R], size),
 
47		work:    make(chan Work[T, R], size), // Ensure scheduling never blocks for main goroutine.
 
48		done:    make(chan Work[T, R], size), // Ensure sending result never blocks for worker goroutine.
 
53	for i := 0; i < procs; i++ {
 
56			preparer(wq.work, wq.done)
 
63// Add adds new work to be prepared to the queue. If the queue is full, it
 
64// waits until space becomes available, i.e. when the head of the queue has
 
65// work that becomes prepared. Add processes the prepared items to make space
 
67func (wq *WorkQueue[T, R]) Add(in T) error {
 
68	// Schedule the new work if we can.
 
70		wq.work <- Work[T, R]{i: (wq.start + wq.n) % wq.max, done: true, In: in}
 
75	// We cannot schedule new work. Wait for finished work until start is done.
 
84	// Process as much finished work as possible. Will be at least 1.
 
85	if err := wq.processHead(); err != nil {
 
89	// Schedule this message as new work.
 
90	wq.work <- Work[T, R]{i: (wq.start + wq.n) % wq.max, done: true, In: in}
 
95// processHead processes the work at the head of the queue by calling process
 
97func (wq *WorkQueue[T, R]) processHead() error {
 
98	for wq.n > 0 && wq.ring[wq.start].done {
 
99		wq.ring[wq.start].done = false
 
100		w := wq.ring[wq.start]
 
101		wq.start = (wq.start + 1) % len(wq.ring)
 
107		if err := wq.process(w.In, w.Out); err != nil {
 
114// Finish waits for the remaining work to be prepared and processes the work.
 
115func (wq *WorkQueue[T, R]) Finish() error {
 
117	for wq.n > 0 && err == nil {
 
121		err = wq.processHead()
 
126// Stop shuts down the worker goroutines and waits until they have returned.
 
127// Stop must always be called on a WorkQueue, otherwise the goroutines never stop.
 
128func (wq *WorkQueue[T, R]) Stop() {