github.com/redsift/go-beanspike
Version: 1.1.1
Repository: https://github.com/redsift/go-beanspike.git
Documentation: pkg.go.dev

# README

Beanspike

Beanstalk inspired job queue backed by Aerospike KVS.

Basic usage

Put

conn, _ := DialDefault()
tube, _ := conn.Use("testtube")
id, err := tube.Put([]byte("hello"), 0, 0, true)

Put(body []byte, delay time.Duration, ttr time.Duration, lz bool) holds the job in a delayed state for delay before releasing it for processing. Note that, in practice, all delayed jobs will be behind any immediately available jobs in the tube. ttr is the time within which the consumer of the job must Touch it; otherwise the job returns to the ready state. If lz is true, Put tries to compress the payload, trading some performance for lower memory usage on Aerospike. If the payload is too small or LZ4 compression yields no space saving, the entry may be inserted uncompressed anyway. Passing true is largely safe from a performance perspective unless you know that body is large and high entropy. Cases where compression was attempted and abandoned for currently present jobs are reflected in Stats.SkippedSize.
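The compress-or-skip behaviour described for lz can be illustrated with a small standalone sketch. This is not the library's code: it uses the standard library's compress/flate as a stand-in for LZ4, and the minCompressSize threshold and the maybeCompress helper are hypothetical names chosen for illustration.

```go
package main

import (
	"bytes"
	"compress/flate"
	"fmt"
	"math/rand"
)

// minCompressSize is a hypothetical cut-off below which compression is skipped.
const minCompressSize = 64

// maybeCompress returns the compressed payload and true only when compression
// saves space; otherwise it returns the original body and false.
// flate stands in for LZ4 here purely for illustration.
func maybeCompress(body []byte) ([]byte, bool) {
	if len(body) < minCompressSize {
		return body, false
	}
	var buf bytes.Buffer
	w, err := flate.NewWriter(&buf, flate.BestSpeed)
	if err != nil {
		return body, false
	}
	if _, err := w.Write(body); err != nil {
		return body, false
	}
	if err := w.Close(); err != nil {
		return body, false
	}
	if buf.Len() >= len(body) {
		// No space saving: keep the payload uncompressed.
		return body, false
	}
	return buf.Bytes(), true
}

func main() {
	// High-entropy payload generated from a fixed seed.
	noisy := make([]byte, 6000)
	rand.New(rand.NewSource(1)).Read(noisy)

	payloads := []struct {
		name string
		body []byte
	}{
		{"tiny", []byte("hello")},
		{"repetitive", bytes.Repeat([]byte("hello "), 1000)},
		{"high entropy", noisy},
	}
	for _, p := range payloads {
		out, ok := maybeCompress(p.body)
		fmt.Printf("%s: in=%d out=%d compressed=%v\n", p.name, len(p.body), len(out), ok)
	}
}
```

The high-entropy case is the one where the compression attempt is wasted work, which is why passing lz as true is only a concern for large, incompressible bodies.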

Reserve & Delete

conn, _ := DialDefault()
tube, _ := conn.Use("testtube")
id, body, ttl, err := tube.Reserve()
....
exists, err := tube.Delete(id)

Touch

err = tube.Touch(id)

If a job is Put in the tube with a ttr, it must be touched at intervals shorter than this value to stay reserved. An error from Touch should be treated as an instruction to abandon the job, because it has been deleted or has timed out.
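One way to follow this rule is a keep-alive loop that touches the job at a fraction of the ttr and stops on the first error. The sketch below is self-contained and runs against an in-memory fake; the toucher interface, the int64 job id type, the keepAlive helper and the fakeTube type are assumptions made for illustration and are not part of the library.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// toucher is a hypothetical local interface mirroring the Touch call above.
// The int64 id type is an assumption.
type toucher interface {
	Touch(id int64) error
}

// keepAlive touches the job every interval until done is closed or Touch fails.
// It returns nil when done fires, and the Touch error otherwise; a non-nil
// result means the caller should abandon the job.
func keepAlive(t toucher, id int64, interval time.Duration, done <-chan struct{}) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return nil
		case <-ticker.C:
			if err := t.Touch(id); err != nil {
				return err
			}
		}
	}
}

// fakeTube is an in-memory stand-in that starts failing on call number failAt.
type fakeTube struct {
	calls  int
	failAt int
}

func (f *fakeTube) Touch(id int64) error {
	f.calls++
	if f.calls >= f.failAt {
		return errors.New("job deleted or timed out")
	}
	return nil
}

func main() {
	ttr := 40 * time.Millisecond
	tube := &fakeTube{failAt: 3}
	// Touch at half the ttr so a single slow tick does not lose the reservation.
	err := keepAlive(tube, 1, ttr/2, nil)
	fmt.Println("abandon job:", err)
}
```

In a real worker, keepAlive would typically run in its own goroutine while the job is processed, with done closed once the job has been deleted.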

Stats

conn, _ := DialDefault()
tube, _ := conn.Use("testtube")
stats, err := tube.Stats()

Delete a tube

This is mainly for unit test cleanup, but there may be a legitimate use case.

conn, _ := DialDefault()
conn.Delete("testtube")

Connection information

If DialDefault() is used, various reasonable defaults are set.

The client uses environment variables AEROSPIKE_HOST and AEROSPIKE_PORT to connect to an Aerospike node.

The library will attempt to parse AEROSPIKE_PORT either as a port number or as a Docker environment variable, as might be set if the application is in a container that has been linked to an Aerospike container with --link aerospike:aerospike.

If nothing is specified, the client assumes host aerospike and port 3000.
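The two accepted forms of AEROSPIKE_PORT can be sketched as follows. This is an illustration of the idea, not the library's actual parsing code: parsePort, defaultHost and defaultPort are hypothetical names, and the Docker form is assumed to look like tcp://172.17.0.2:3000, which is what a legacy container link sets for the alias's _PORT variable.

```go
package main

import (
	"fmt"
	"net/url"
	"os"
	"strconv"
)

// Defaults taken from the README text above.
const (
	defaultHost = "aerospike"
	defaultPort = 3000
)

// parsePort accepts either a plain port number ("3000") or a Docker link
// style value ("tcp://172.17.0.2:3000"). An empty value yields the default.
func parsePort(raw string) (int, error) {
	if raw == "" {
		return defaultPort, nil
	}
	if p, err := strconv.Atoi(raw); err == nil {
		return p, nil
	}
	u, err := url.Parse(raw)
	if err != nil {
		return 0, fmt.Errorf("parse %q: %w", raw, err)
	}
	p, err := strconv.Atoi(u.Port())
	if err != nil {
		return 0, fmt.Errorf("no usable port in %q", raw)
	}
	return p, nil
}

func main() {
	host := os.Getenv("AEROSPIKE_HOST")
	if host == "" {
		host = defaultHost
	}
	port, err := parsePort(os.Getenv("AEROSPIKE_PORT"))
	if err != nil {
		fmt.Println("bad AEROSPIKE_PORT:", err)
		return
	}
	fmt.Printf("would connect to %s:%d\n", host, port)
}
```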

The alternative Dial(id string, host string, port int) requires everything to be set explicitly.

Unit Tests

$ gpm
$ source gvp
$ go test

Benchmarks

$ go test --bench=.
...
BenchmarkPut	    	2000	    589034 ns/op
BenchmarkReserve	     200	  14759261 ns/op
BenchmarkRelease	    3000	    607290 ns/op

TODO

  • Fix TTL related implementations.
  • Messaging implementation for Reserve.
  • Possibly review stats. Review UDF lifecycle management.
  • Kick and Bury unit tests

# Constants

run admin operations at most 1/10 seconds.
these are not used, review.
Bin names.
these are not used, review.
For the scan policy.
For bin AerospikeNameStatus.
The current projection is ~2900 years if we start incrementing the counter in 100M steps per second, so the threshold around the value for 2000y should give us both a signal we are growing quickly and buying time to solve the problem.

# Variables

TODO consider replacing this variable with "Options" pattern.

# Structs

Job represents a Beanspike Job.

# Interfaces

ErrorLogger defines error logging interface app can implement to catch errors.
JobDecoder defines interface app should implement for decoding incoming jobs into the app specific objects.

# Type aliases

The ErrorLoggerFunc type is an adapter to allow the use of ordinary functions as error logger.