Categorygithub.com/dammitjim/celeriac.v1
modulepackage
0.1.0
Repository: https://github.com/dammitjim/celeriac.v1.git
Documentation: pkg.go.dev

# README

Celeriac

Golang client library for adding support for interacting and monitoring Celery workers and tasks.

It provides functionality to place tasks on the task queue, as well as monitor both task and worker events.

Dependencies

This library depends upon the following packages:

  • github.com/streadway/amqp
  • github.com/sirupsen/logrus
  • github.com/nu7hatch/gouuid
  • github.com/mailru/easyjson

Install easyjson

$ go get -u github.com/mailru/easyjson/...

Usage

Installation: go get github.com/svcavallar/celeriac.v1

This imports a new namespace called celeriac

package main

import (
	"log"
	"os"

	"github.com/svcavallar/celeriac.v1"
)

func main() {
	taskBrokerURI := "amqp://user:pass@localhost:5672/vhost"

	// Connect to RabbitMQ task queue
	TaskQueueMgr, err := celeriac.NewTaskQueueMgr(taskBrokerURI)
	if err != nil {
		log.Printf("Failed to connect to task queue: %v", err)
		os.Exit(-1)
	}

	log.Printf("Service connected to task queue - (URL: %s)", taskBrokerURI)

	// Go routine to monitor the Celery events emitted on the celeriac events channel
	go func() {
        for {
            select {
            default:
                ev := <-TaskQueueMgr.Monitor.EventsChannel

                if ev != nil {

                    if x, ok := ev.(*celeriac.WorkerEvent); ok {
                        log.Printf("Celery Event Channel: Worker event - %s [Hostname]: %s", x.Type, x.Hostname)
                    } else if x, ok := ev.(*celeriac.TaskEvent); ok {
                        log.Printf("Celery Event Channel: Task event - %s [ID]: %s", x.Type, x.UUID)
                    } else if x, ok := ev.(*celeriac.Event); ok {
                        log.Printf("Celery Event Channel: General event - %s [Hostname]: %s - [Data]: %v", x.Type, x.Hostname, x.Data)
                    } else {
                        log.Printf("Celery Event Channel: Unhandled event: %v", ev)
                    }
                }
            }
        }
	}()
}

Dispatching Tasks

By Name

This will create and dispatch a task incorporating the supplied data. The task will automatically be allocated and identified by a UUID returned in the task object. The UUID is represented in the form of "6ba7b810-9dad-11d1-80b4-00c04fd430c8".

// Dispatch a new task
taskName := "root.test.task"
taskData := map[string]interface{}{
	"foo": "bar"
}
routingKey := "root.test"

task, err := TaskQueueMgr.DispatchTask(taskName, taskData, routingKey)
if err != nil {
	log.Errorf("Failed to dispatch task to queue: %v", err)
}

By ID & Name

This will create and dispatch a task incorporating the supplied data, and identified by the user-supplied task identifier.

// Dispatch a new task
taskID := "my_task_id_123456789"
taskName := "root.test.task"
taskData := map[string]interface{}{
	"foo": "bar"
}
routingKey := "root.test"

task, err := TaskQueueMgr.DispatchTaskWithID(taskID, taskName, taskData, routingKey)
if err != nil {
	log.Errorf("Failed to dispatch task to queue: %v", err)
}

Modifying task_event.go

If you modify the properties of any of the structs in task_event.go you will need to re-generate the easyjson version of this file. This is easily achieved by issuing the following command:

$ easyjson -all task_event.go

Processing Redis Backend Result Automatically

If you are using a Redis backend for storing results you can easily process new/updated entries by subscribing to Redis keyspace events. This will save polling for results, and is made convenient to integrate by using my golang helper package go-redis-event-sink, available at the repo https://github.com/svcavallar/go-redis-event-sink

An example test on how to use are provided within the repository. Essentially, for Celery, just provide it with the celery task naming mask patten to watch: celery-task-meta-*

# Functions

Fail logs the error and exits the program Only use this to handle critical errors.
Log only logs the error but doesn't exit the program Use this to log errors that should not exit the program.
NewEvent is a factory function to create a new Event object */.
NewPingCmd creates a new command for pinging workers.
NewRateLimitTaskCmd creates a new command for rate limiting a task taskName: Name of task to change rate limit for rateLimit: The rate limit as tasks per second, or a rate limit string (`"100/m"`, etc.
NewRevokeTaskCmd creates a new command for revoking a task by given id If a task is revoked, the workers will ignore the task and not execute it after all.
NewTask is a factory function that creates and returns a pointer to a new task object */.
NewTaskEvent is a factory function to create a new TaskEvent object */.
NewTaskMonitor is a factory function that creates a new Celery consumer */.
NewTaskQueueMgr is a factory function that creates a new instance of the TaskQueueMgr */.
NewTaskWithID is a factory function that creates and returns a pointer to a new task object, allowing caller to specify the task ID.
NewTimeLimitTaskCmd creates a new command for rate limiting a task taskName: Name of task to change rate limit for hardLimit: New hard time limit (in seconds) softLimit: New soft time limit (in seconds) */.
NewWorkerEvent is a factory function to create a new WorkerEvent object */.

# Constants

ConstEventsMonitorBindingKey is the binding key for the events monitor.
ConstEventsMonitorConsumerTag is the consumer tag name for the events monitor.
ConstEventsMonitorExchangeName is the exchange name used for Celery events.
ConstEventsMonitorExchangeType is the exchange type for the events monitor.
ConstEventsMonitorQueueName is the queue name of the events monitor.
ConstEventTypeTaskFailed is the event type when a Celery worker fails to complete a task.
ConstEventTypeTaskReceived is the event type when a Celery worker receives a task.
ConstEventTypeTaskRetried is the event type when a Celery worker retries a task.
ConstEventTypeTaskRevoked is the event type when a Celery worker has its task revoked.
ConstEventTypeTaskSent is the event type when a Celery task is sent.
ConstEventTypeTaskStarted is the event type when a Celery worker starts a task.
ConstEventTypeTaskSucceeded is the event type when a Celery worker completes a task.
ConstEventTypeWorkerHeartbeat is the event type when a Celery worker is online and "alive".
ConstEventTypeWorkerOffline is the event type when a Celery worker goes offline.
ConstEventTypeWorkerOnline is the event type when a Celery worker comes online.
ConstPublishTaskContentEncoding is the content encoding type of the task data to be published.
ConstPublishTaskContentType is the content type of the task data to be published.
ConstTaskControlExchangeName is the exchange name for dispatching task control commands.
ConstTaskDefaultExchangeName is the default exchange name to use when publishing a task.
ConstTaskDefaultRoutingKey is the default routing key to use when publishing a task.
ConstTimeFormat is the general format for all timestamps.

# Variables

ErrInvalidTaskID is raised when an invalid task ID has been detected.
ErrInvalidTaskName is raised when an invalid task name has been detected.

# Structs

Event defines a base event emitted by Celery workers.
PingCmd is a wrapper to a command.
RateLimitTaskCmd is a wrapper to a command.
RevokeTaskCmd is a wrapper to a command.
Task is a representation of a Celery task */.
TaskEvent is the JSON schema for Celery task events */.
TaskMonitor is a Celery task event consumer */.
TaskQueueMgr defines a manager for interacting with a Celery task queue */.
TimeLimitTaskCmd is a wrapper to a command.
WorkerEvent defines an event emitted by workers, specific to its operation.

# Type aliases

TaskEventsList is an array of task events */.