Categorygithub.com/marselester/gopher-celery
modulepackage
1.0.0
Repository: https://github.com/marselester/gopher-celery.git
Documentation: pkg.go.dev

# README

Gopher Celery 🥬

Documentation Go Report Card

The objective of this project is to provide the very basic mechanism to efficiently produce and consume Celery tasks on Go side. Therefore there are no plans to support all the rich features the Python version provides, such as tasks chains, etc. Even task result backend has no practical value in the context of Gopher Celery, so it wasn't taken into account. Note, Celery has no result backend enabled by default (it incurs overhead).

Typically one would want to use Gopher Celery when certain tasks on Python side take too long to complete or there is a big volume of tasks requiring lots of Python workers (expensive infrastructure).

This project offers a little bit more convenient API of https://github.com/gocelery/gocelery including support for Celery protocol v2.

Usage

The Celery app can be used as either a producer or consumer (worker). To send tasks to a queue for a worker to consume, use Delay method. In order to process a task you should register it using Register method.

def mytask(a, b):
    print(a + b)

For example, whenever a task mytask is popped from important queue, the Go function is executed with args and kwargs obtained from the task message. By default Redis broker (localhost) is used with json task message serialization.

app := celery.NewApp()
app.Register(
	"myproject.apps.myapp.tasks.mytask",
	"important",
	func(ctx context.Context, p *celery.TaskParam) error {
		p.NameArgs("a", "b")
		// Methods prefixed with Must panic if they can't find an argument name
		// or can't cast it to the corresponding type.
		// The panic doesn't affect other tasks execution; it's logged.
		fmt.Println(p.MustInt("a") + p.MustInt("b"))
		// Non-nil errors are logged.
		return nil
	},
)
if err := app.Run(context.Background()); err != nil {
	log.Printf("celery worker error: %v", err)
}

Here is an example of sending mytask task to important queue with a=2, b=3 arguments. If a task is processed on Python side, you don't need to register the task or run the app.

app := celery.NewApp()
err := app.Delay(
	"myproject.apps.myapp.tasks.mytask",
	"important",
	2,
	3,
)
if err != nil {
	log.Printf("failed to send mytask: %v", err)
}

More examples can be found in the examples dir. Note, you'll need a Redis server to run them.

$ redis-server
$ cd ./examples/
Sending tasks from Go and receiving them on Python side.
$ go run ./producer/
{"err":null,"msg":"task was sent using protocol v2"}
{"err":null,"msg":"task was sent using protocol v1"}
$ celery --app myproject worker --queues important --loglevel=debug --without-heartbeat --without-mingle
...
[... WARNING/ForkPoolWorker-1] received a=fizz b=bazz
[... WARNING/ForkPoolWorker-8] received a=fizz b=bazz
Sending tasks from Python and receiving them on Go side.
$ python producer.py
$ go run ./consumer/
{"msg":"waiting for tasks..."}
received a=fizz b=bazz
received a=fizz b=bazz

Most likely your Redis server won't be running on localhost when the service is deployed, so you would need to pass a connection pool to the broker.

Redis connection pool.
$ go run ./producer/
{"err":null,"msg":"task was sent using protocol v2"}
{"err":null,"msg":"task was sent using protocol v1"}
$ go run ./redis/
Prometheus task metrics.
$ go run ./producer/
$ go run ./metrics/
$ curl http://0.0.0.0:8080/metrics
# HELP task_duration_seconds How long it took in seconds to process a task.
# TYPE task_duration_seconds histogram
task_duration_seconds_bucket{task="myproject.mytask",le="0.016"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="0.032"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="0.064"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="0.128"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="0.256"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="0.512"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="1.024"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="2.048"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="4.096"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="8.192"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="16.384"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="32.768"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="60"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="+Inf"} 2
task_duration_seconds_sum{task="myproject.mytask"} 7.2802e-05
task_duration_seconds_count{task="myproject.mytask"} 2
# HELP tasks_total How many Celery tasks processed, partitioned by task name and error.
# TYPE tasks_total counter
tasks_total{error="false",task="myproject.mytask"} 2

Although there is no built-in support for task retries (publishing a task back to Redis), you can still retry the operation within the same goroutine.

Task retries.
$ go run ./retry/
...
{"attempt":1,"err":"uh oh","msg":"request failed","ts":"2022-08-07T23:42:23.401191Z"}
{"attempt":2,"err":"uh oh","msg":"request failed","ts":"2022-08-07T23:42:28.337204Z"}
{"attempt":3,"err":"uh oh","msg":"request failed","ts":"2022-08-07T23:42:37.279873Z"}

Testing

Tests require a Redis server running locally.

$ go test -v -count=1 ./...

Benchmarks help to spot performance changes as the project evolves and also compare performance of serializers. For example, based on the results below the protocol v2 is faster than v1 when encoding args:

  • 350 nanoseconds mean time, 3 allocations (248 bytes) with 0% variation across the samples
  • 1.21 microseconds mean time, 4 allocations (672 bytes) with 0% variation across the samples

It is recommended to run benchmarks multiple times and check how stable they are using Benchstat tool.

$ go test -bench=. -benchmem -count=10 ./internal/... | tee bench-new.txt
$ benchstat bench-old.txt
name                                  time/op
JSONSerializerEncode_v2NoParams-12    2.97ns ± 1%
JSONSerializerEncode_v2Args-12         350ns ± 0%
JSONSerializerEncode_v2Kwargs-12       582ns ± 0%
JSONSerializerEncode_v2ArgsKwargs-12   788ns ± 1%
JSONSerializerEncode_v1NoParams-12    1.12µs ± 1%
JSONSerializerEncode_v1Args-12        1.21µs ± 0%
JSONSerializerEncode_v1Kwargs-12      1.68µs ± 0%
JSONSerializerEncode_v1ArgsKwargs-12  1.77µs ± 0%

name                                  alloc/op
JSONSerializerEncode_v2NoParams-12     0.00B
JSONSerializerEncode_v2Args-12          248B ± 0%
JSONSerializerEncode_v2Kwargs-12        472B ± 0%
JSONSerializerEncode_v2ArgsKwargs-12    528B ± 0%
JSONSerializerEncode_v1NoParams-12      672B ± 0%
JSONSerializerEncode_v1Args-12          672B ± 0%
JSONSerializerEncode_v1Kwargs-12      1.00kB ± 0%
JSONSerializerEncode_v1ArgsKwargs-12  1.00kB ± 0%

name                                  allocs/op
JSONSerializerEncode_v2NoParams-12      0.00
JSONSerializerEncode_v2Args-12          3.00 ± 0%
JSONSerializerEncode_v2Kwargs-12        7.00 ± 0%
JSONSerializerEncode_v2ArgsKwargs-12    8.00 ± 0%
JSONSerializerEncode_v1NoParams-12      4.00 ± 0%
JSONSerializerEncode_v1Args-12          4.00 ± 0%
JSONSerializerEncode_v1Kwargs-12        10.0 ± 0%
JSONSerializerEncode_v1ArgsKwargs-12    10.0 ± 0%

The old and new stats are compared as follows.

$ benchstat bench-old.txt bench-new.txt

# Packages

Package goredis implements a Celery broker using Redis and https://github.com/redis/go-redis.
Package protocol provides means to encode/decode task messages as described in https://github.com/celery/celery/blob/master/docs/internals/protocol.rst.
Package redis implements a Celery broker using Redis and github.com/gomodule/redigo.

# Functions

NewApp creates a Celery app.
NewTaskParam returns a task param which facilitates access to args and kwargs.
WithBroker allows a caller to replace the default broker.
WithCustomTaskSerializer registers a custom serializer where mime is the mime-type describing the serialized structure, e.g., application/json, and encoding is the content encoding which is usually utf-8 or binary.
WithLogger sets a structured logger.
WithMaxWorkers sets an upper limit of goroutines allowed to process Celery tasks.
WithMiddlewares sets a chain of task middlewares.
WithTaskProtocol sets the default task message protocol version used to send tasks.
WithTaskSerializer sets a serializer mime-type, e.g., the message's body is encoded in JSON when a task is sent to the broker.

# Constants

ContextKeyTaskName is a context key to access task names.
DefaultMaxWorkers is the default upper limit of goroutines allowed to process Celery tasks.

# Structs

App is a Celery app to produce or consume tasks asynchronously.
AsyncParam represents parameters for sending a task message.
Config represents Celery settings.
TaskParam provides access to task's positional and keyword arguments.

# Interfaces

Broker is responsible for receiving and sending task messages.

# Type aliases

Middleware is a chainable behavior modifier for tasks.
Option sets up a Config.
TaskF represents a Celery task implemented by the client.