package
0.4.11
Repository: https://github.com/rbell/toolchest.git
Documentation: pkg.go.dev

# README

WorkQueue

Thw workqueue package is inspired by the taskQueue at https://github.com/richardwilkes/toolbox/tree/master/taskqueue. The differences being:

  • Work queued can be prioritized
  • Work's priority can be adjusted
  • Errors can be monitored by observing a channel

The workqueue.queue allows the processing of work in a queued fashion. By default, the queue's length is set to two times the number of CPUs, and the number of go routines working on the queue equal to the number of CPUs.

Work submitted to the queue takes the form of a function signature of func() error. When work is performed off the queue any errors are communicated back via a channel accessible via a call to the queue's Errors() function.

Work submitted to the queue, by default, all have equal priority and are processed on the queue's go routines in the order they were placed. Of course since the queue's go routines may operate in parallel, the work on the queue may not be finished in the same sequence they were put on the queue depending on go's scheduler.

However, there is an option to set a priority when submitting work to the queue. This will influence the order which work is processed. Work with a lower priority number is taken first (i.e. Work X is priority 1, while work Y is priority 2, resulting in work X taking precedence over work Y).

Sometimes the priority of queued work needs adjusted. For instance if a low priority work is queued for quite a while due to higher priority work being added to the queue. The work's priority can be dynamically adjuested with an option when the work is submitted (see example below).

License

This Source Code Form is subject to the terms of the Apache Public License, version 2.0. If a copy of the APL was not distributed with this file, you can obtain one at https://www.apache.org/licenses/LICENSE-2.0.txt.

Simple Example

package main

import (
	"fmt"
	"github.com/rbell/toolchest/workqueue"
	"sync"
	"sync/atomic"
	"time"
)

func main() {
	// Create queue with number of workers equal to number of cpus and queue length equal to number cpus * 2
	q := workqueue.NewQueue()

	// wg to prevent app from exiting before all work is done
	wg := &sync.WaitGroup{}

	count := atomic.Int32{}

	// make 100 work functions to perform on the queue - each work function will increment count and print the resulting value, then emulate doing some work.
	work := make([]workqueue.Work, 100)
	for i := 0; i < 100; i++ {
		work[i] = func() error {
			defer wg.Done()
			index := count.Add(1)
			fmt.Printf("Doing some work! %v\n", index) // emulate logging
			time.Sleep(time.Millisecond * 100)         // emulate doing some processing
			return nil
		}
	}

	// Queue the work
	for _, w := range work {
		wg.Add(1)
		q.Enqueue(w)
	}

	wg.Wait()
}

Configuration

Configure the queue's length:

q := workqueue.NewQueue(workqueue.WithQueueLength(10))

Configure the number of go routines performing work on the queue:

q := workqueue.NewQueue(workqueue.WithWorkers(2))

Configuration options can be combined:

q := workqueue.NewQueue(workqueue.WithWorkers(2), workqueue.WithQueueLength(10))

Prioritize Work

Work can be prioritized when submitting the work to the queue:

q.QueueWork(workX, workqueue.WithPriority(1))
q.QueueWork(workY, workqueue.WithPriority(2))

(workX will take precidence over workY)

Dynamically Re Prioritizing Work

Work can be re prioritized dynamically:

adjustAt := time.Now().Add(time.Minute)
q.QueueWork(workX, 
	workqueue.WithPriority(100),
	workqueue.WithAdjustPriority(
		func() int {
			if time.Now() > adjustAt {
				return 1
			}
		}
    ))

In the above example, if workX is still queued after one minute, its priority will be set to priority one and, assuming no other work is prioritized above it, workX will be performed ahead of all other work.

Error Monitoring

Errors returned by work placed on the queue can be monitored via a call to the queue's Errors() function:

	q := workqueue.NewQueue()

	// launch go routine to monitor errors
	go func() {
		errCh := q.Errors()
		for {
			err := <-errCh
			if err != nil {
				fmt.Println(err)
			} else {
				break
			}
		}
	}()

Each call to Errors() returns a unique channel allowing multiple routines to "subscribe" to errors being reported by the queue.

Examples

Runnable examples can be found in the toolchest/workqueue/examples folder.

# Packages

No description provided by the author

# Functions

NewQueue returns a reference to an initialized Queue.
WithAdjustPriority adds an adjustment priorty function to the workItem such that it's priority can be dynamically adjusted in queue.
WithName adds a name for the work (for reporting).
WithPriority sets the priority of the work to be done.
WithQueueLength sets the number of functions that can be queued up before routines queueing are blocked.
WithWorkers sets the number of go routines working on the workChan.

# Constants

work states.
work states.

# Structs

Queue allow work to be queued up and worked on in a set number of go routines.
No description provided by the author

# Type aliases

No description provided by the author
No description provided by the author