Categorygithub.com/PerformLine/gocelery
modulepackage
2.0.0-alpha+incompatible
Repository: https://github.com/performline/gocelery.git
Documentation: pkg.go.dev

# README

gocelery

Go Client/Server for Celery Distributed Task Queue

Build Status Coverage Status Go Report Card GoDoc License FOSSA Status

Why?

Having been involved in several projects migrating servers from Python to Go, I have realized Go can improve performance of existing python web applications. As Celery distributed tasks are often used in such web applications, this library allows you to both implement celery workers and submit celery tasks in Go.

You can also use this library as pure go distributed task queue.

Go Celery Worker in Action

demo

Supported Brokers/Backends

Now supporting both Redis and AMQP!!

  • Redis (broker/backend)
  • AMQP (broker/backend) - does not allow concurrent use of channels

Celery Configuration

Celery must be configured to use json instead of default pickle encoding. This is because Go currently has no stable support for decoding pickle objects. Pass below configuration parameters to use json.

Starting from version 4.0, Celery uses message protocol version 2 as default value. GoCelery does not yet support message protocol version 2, so you must explicitly set CELERY_TASK_PROTOCOL to 1.

CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
CELERY_RESULT_SERIALIZER='json',
CELERY_ENABLE_UTC=True,
CELERY_TASK_PROTOCOL=1,

Example

GoCelery GoDoc has good examples.
Also take a look at example directory for sample python code.

GoCelery Worker Example

Run Celery Worker implemented in Go

// initialize celery client
cli, _ := NewCeleryClient(
	NewRedisCeleryBroker("redis://"),
	NewRedisCeleryBackend("redis://"),
	5, // number of workers
)

// task
add := func(a, b int) int {
	return a + b
}

// register task
cli.Register("worker.add", add)

// start workers (non-blocking call)
cli.StartWorker(ctx, TIMEOUT)

// wait for client request
time.Sleep(10 * time.Second)

// stop workers gracefully (blocking call)
cli.StopWorker()

Python Client Example

Submit Task from Python Client

from celery import Celery

app = Celery('tasks',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379'
)

@app.task
def add(x, y):
    return x + y

if __name__ == '__main__':
    ar = add.apply_async((5456, 2878), serializer='json')
    print(ar.get())

Python Worker Example

Run Celery Worker implemented in Python

from celery import Celery

app = Celery('tasks',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379'
)

@app.task
def add(x, y):
    return x + y
celery -A worker worker --loglevel=debug --without-heartbeat --without-mingle

GoCelery Client Example

Submit Task from Go Client

func main() {
    // initialize celery client
	cli, _ := NewCeleryClient(
		NewRedisCeleryBroker("redis://"),
		NewRedisCeleryBackend("redis://"),
		1,
	)

	// prepare arguments
	taskName := "worker.add"
	argA := rand.Intn(10)
	argB := rand.Intn(10)

	// run task
	asyncResult, err := cli.Delay(taskName, argA, argB)
	if err != nil {
		panic(err)
	}

	// get results from backend with timeout
	res, err := asyncResult.Get(ctx, 10 * time.Second)
	if err != nil {
		panic(err)
	}

	log.Printf("result: %+v of type %+v", res, reflect.TypeOf(res))
}

Sample Celery Task Message

Celery Message Protocol Version 1

{
    "expires": null,
    "utc": true,
    "args": [5456, 2878],
    "chord": null,
    "callbacks": null,
    "errbacks": null,
    "taskset": null,
    "id": "c8535050-68f1-4e18-9f32-f52f1aab6d9b",
    "retries": 0,
    "task": "worker.add",
    "timelimit": [null, null],
    "eta": null,
    "kwargs": {}
}

Projects

Please let us know if you use gocelery in your project!

Contributing

You are more than welcome to make any contributions. Please create Pull Request for any changes.

LICENSE

The gocelery is offered under MIT license.

# Packages

No description provided by the author
Package gocelery is Celery Distributed Task Queue in Go Celery distributed tasks are used heavily in many python web applications and this library allows you to implement celery workers in Go as well as being able to submit celery tasks in Go.

# Functions

DecodeTaskMessage decodes base64 encrypted body and return TaskMessage object.
GetRealValue returns real value of reflect.Value Required for JSON Marshalling.
NewAMQPCeleryBackend creates new AMQPCeleryBackend.
NewAMQPCeleryBackendByConnAndChannel creates new AMQPCeleryBackend by AMQP connection and channel.
NewAMQPCeleryBroker creates new AMQPCeleryBroker.
NewAMQPCeleryBrokerByConnAndChannel creates new AMQPCeleryBroker using AMQP conn and channel.
NewAMQPConnection creates new AMQP channel.
NewAMQPExchange creates new AMQPExchange.
NewAMQPQueue creates new AMQPQueue.
NewCeleryClient creates new celery client.
NewCeleryWorker returns new celery worker.
NewRedisCeleryBackend creates new RedisCeleryBackend.
NewRedisCeleryBroker creates new RedisCeleryBroker based on given uri.
NewRedisClient creates a redis connection from given connection string.

# Structs

AMQPCeleryBackend CeleryBackend for AMQP.
AMQPCeleryBroker is RedisBroker for AMQP.
AMQPExchange stores AMQP Exchange configuration.
AMQPQueue stores AMQP Queue configuration.
AsyncResult represents pending result.
CeleryClient provides API for sending celery tasks.
CeleryDeliveryInfo represents deliveryinfo json.
CeleryMessage is actual message to be sent to Redis.
CeleryProperties represents properties json.
CeleryWorker represents distributed task worker.
RedisCeleryBackend is celery backend for redis.
RedisCeleryBroker is celery broker for redis.
ResultMessage is return message received from broker.
TaskMessage is celery-compatible message.

# Interfaces

CeleryBackend is interface for celery backend database.
CeleryBroker is interface for celery broker database.
CeleryTask is an interface that represents actual task Passing CeleryTask interface instead of function pointer avoids reflection and may have performance gain.