Categorygithub.com/gopy-art/gocelery
modulepackage
1.0.0
Repository: https://github.com/gopy-art/gocelery.git
Documentation: pkg.go.dev

# README

GitHub go.mod Go version of a Go module GitHub go.mod Go version of a Go module GitHub go.mod Go version of a Go module GitHub go.mod Go version of a Go module

gocelery

Gocelery is a task queue implementation for Go modules used to asynchronously execute work outside the HTTP request-response cycle. Celery is an implementation of the task queue concept.

How it works?

this package has two side for does its work!
1 - controller
2 - worker

controller : it implements functions which work with broker and backend , and the responsebilty of them are to declare and insert data to the broker!

worker : it implements functions which work with broker and backend, and the responsebilty of them are to read from broker and set result to the backend!

Note : we have two concept in this package, broker and backend. broker is a system that we can share and read our data from that. backend is a system that we can set our results from package in that, for example the result of our workers!

Distributed Systems you can use

  1. RabbitMQ = RabbitMQ is an open source message agent software that implements the Advanced Message Queuing protocol.
    The files that impelement rabbitMQ celery in this package are :

    • amqp_backend.go
    • amqp_broker.go
    • amqp.go
  2. Redis = Redis is a very high-performance extensible key-value database management system written in C ANSI. It is part of the NoSQL movement and aims to provide the highest possible performance.
    The files that impelement redis celery in this package are :

    • redis_backend.go
    • redis_broker.go

How to use it?

if you want to use this package you can follow this steps :

  • clone this module
  • make gocelery folder in your project
  • copy and paste all of the files to the gocelery folder
  • run go mod tidy command in your terminal now enjoy!

Implement controller code example

you can write controller for redis and rabbitMQ, and as you read at the top, the controller will set and declare the messages and data to the broker and backend.
code example with RabbitMQ :

	// declare the url of the rabbitMQ
	rabbit_url := fmt.Sprintf("amqp://%s:%s@%s/", username, password, url)

	// make the backend and broker with rabbitMQ
	CeleryBackend := gocelery.NewAMQPCeleryBackend(rabbit_url)
	CeleryBroker := gocelery.NewAMQPCeleryBroker(rabbit_url, "test", true)

	// initialize celery client
	client, err := gocelery.NewCeleryClient(
		CeleryBroker,
		CeleryBackend,
		1, // number of worker for the client
	)
	if err != nil {
		log.Println("error in client , ", err)
	}
	////// Error handling

	// declare the task name for the message
	taskName := fmt.Sprintf("worker.%s", "test")

	for v := range 3 {
		_ = v
		// prepare the message for set to the queue
		msg := make(map[string]interface{})
		msg["a"] = rand.Intn(10)
		msg["b"] = rand.Intn(10)

		// send the message to the rabbitMQ
		_, err = client.DelayKwargs(taskName, msg)
		if err != nil {
			panic(err)
		}
		time.Sleep(1 * time.Second)
	}

	client.WaitForStopWorker()

code example with Redis :

    // create redis connection pool
	redisPool := &redis.Pool{
		MaxIdle:     3,                 // maximum number of idle connections in the pool
		MaxActive:   0,                 // maximum number of connections allocated by the pool at a given time
		IdleTimeout: 240 * time.Second, // close connections after remaining idle for this duration
		Dial: func() (redis.Conn, error) {
			c, err := redis.DialURL("redis://")
			if err != nil {
				return nil, err
			}
			return c, err
		},
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			_, err := c.Do("PING")
			return err
		},
	}

	// initialize celery client
	client, err := gocelery.NewCeleryClient(
		gocelery.NewRedisBroker(redisPool),
		&gocelery.RedisCeleryBackend{Pool: redisPool},
		1, // number of workers
	)
	if err != nil {
		log.Println("error in client , ", err)
	}
	////// Error handling

	// declare the task name for the message
	taskName := fmt.Sprintf("worker.%s", "test")

	for v := range 10 {
		_ = v
		// prepare the message for set to the queue
		msg := make(map[string]interface{})
		msg["a"] = rand.Intn(10)
		msg["b"] = rand.Intn(10)

		// send the message to the rabbitMQ
		_, err = client.DelayKwargs(taskName, msg)
		if err != nil {
			panic(err)
		}
		time.Sleep(1 * time.Second)
	}

	client.WaitForStopWorker()

Implement worker code example

you can write worker for redis and rabbitMQ too, and as you know worker will read from redis or rabbitMQ and set the result to the redis or rabbitMQ.
code example with RabbitMQ :

	// exampleAddTask is integer addition task
	// with named arguments
	type ExampleAddTask struct {
		TaskID string
		a      int
		b      int
	}

	// this function is for reading the argument that has been passed to the message from 'Kwargs'
	func (a *ExampleAddTask) ParseKwargs(kwargs map[string]interface{}) error {
		kwargA, ok := kwargs["a"]
		if !ok {
			return fmt.Errorf("undefined kwarg a")
		}
		kwargAFloat, ok := kwargA.(float64)
		if !ok {
			return fmt.Errorf("malformed kwarg a")
		}
		a.a = int(kwargAFloat)
		kwargB, ok := kwargs["b"]
		if !ok {
			return fmt.Errorf("undefined kwarg b")
		}
		kwargBFloat, ok := kwargB.(float64)
		if !ok {
			return fmt.Errorf("malformed kwarg b")
		}
		a.b = int(kwargBFloat)
		return nil
	}

	func (a *ExampleAddTask) ParseId(id string) error {
		a.TaskID = id
		return nil
	}

	// The main function that will be execute
	func (a *ExampleAddTask) RunTask() (interface{}, error) {
		result := a.a + a.b
		fmt.Printf("Task with uuid %v has result %v \n", a.TaskID, result)
		return result, nil
	}

	// declare the url of the rabbitMQ
	rabbit_url := fmt.Sprintf("amqp://%s:%s@%s/", "guest", "guest", "localhost:5672")

	// make the backend and broker with rabbitMQ
	CeleryBackend := gocelery.NewAMQPCeleryBackend(rabbit_url)
	CeleryBroker := gocelery.NewAMQPCeleryBroker(rabbit_url, "test", true)

	// initialize celery client
	worker := gocelery.NewCeleryWorker(
		CeleryBroker,
		CeleryBackend,
		3, // number of worker for the client
	)

	ch := make(chan int)
	// register task
	worker.Register("worker.test", &ExampleAddTask{})

	// start workers (non-blocking call)
	worker.StartWorker()

	// wait for client request
	<-ch

	// stop workers gracefully (blocking call)
	worker.StopWorker()

code example with Redis :

	// exampleAddTask is integer addition task
	// with named arguments
	type ExampleAddTask struct {
		TaskID string
		a      int
		b      int
	}

	// this function is for reading the argument that has been passed to the message from 'Kwargs'
	func (a *ExampleAddTask) ParseKwargs(kwargs map[string]interface{}) error {
		kwargA, ok := kwargs["a"]
		if !ok {
			return fmt.Errorf("undefined kwarg a")
		}
		kwargAFloat, ok := kwargA.(float64)
		if !ok {
			return fmt.Errorf("malformed kwarg a")
		}
		a.a = int(kwargAFloat)
		kwargB, ok := kwargs["b"]
		if !ok {
			return fmt.Errorf("undefined kwarg b")
		}
		kwargBFloat, ok := kwargB.(float64)
		if !ok {
			return fmt.Errorf("malformed kwarg b")
		}
		a.b = int(kwargBFloat)
		return nil
	}

	func (a *ExampleAddTask) ParseId(id string) error {
		a.TaskID = id
		return nil
	}

	// The main function that will be execute
	func (a *ExampleAddTask) RunTask() (interface{}, error) {
		result := a.a + a.b
		fmt.Printf("Task with uuid %v has result %v \n", a.TaskID, result)
		return result, nil
	}

	// create redis connection pool
	redisPool := &redis.Pool{
		MaxIdle:     3,                 // maximum number of idle connections in the pool
		MaxActive:   0,                 // maximum number of connections allocated by the pool at a given time
		IdleTimeout: 240 * time.Second, // close connections after remaining idle for this duration
		Dial: func() (redis.Conn, error) {
			c, err := redis.DialURL("redis://")
			if err != nil {
				return nil, err
			}
			return c, err
		},
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			_, err := c.Do("PING")
			return err
		},
	}

	// initialize celery client
	worker := gocelery.NewCeleryWorker(
		gocelery.NewRedisBroker(redisPool),
		&gocelery.RedisCeleryBackend{Pool: redisPool},
		6, // number of workers
	)

	////// Error handling

	ch := make(chan int)
	// register task
	worker.Register("worker.test", &ExampleAddTask{})

	// start workers (non-blocking call)
	worker.StartWorker()

	// wait for client request
	<-ch

	// stop workers gracefully (blocking call)
	worker.StopWorker()

NOTE : you can also use both distributed systems (redis, rabbitMQ) at the same time. for example you can use rabbitMQ for broker and redis for backend!

Structure of messages in broker

the message type in broker is json and its in this structure :

	// TaskMessage is celery-compatible message
	type TaskMessage struct {
		ID      string                 `json:"id"`
		Task    string                 `json:"task"`
		Args    []interface{}          `json:"args"`
		Kwargs  map[string]interface{} `json:"kwargs"`
		Retries int                    `json:"retries"`
		ETA     *string                `json:"eta"`
		Expires *time.Time             `json:"expires"`
	}

Note : Celery must be configured to use json instead of default pickle encoding. This is because Go currently has no stable support for decoding pickle objects. Pass below configuration parameters to use json.

I hope you can enjoy using this package!

# Packages

No description provided by the author

# Functions

DecodeTaskMessage decodes base64 encrypted body and return TaskMessage object.
GetRealValue returns real value of reflect.Value Required for JSON Marshalling.
NewAMQPCeleryBackend creates new AMQPCeleryBackend.
NewAMQPCeleryBackendByConnAndChannel creates new AMQPCeleryBackend by AMQP connection and channel.
NewAMQPCeleryBroker creates new AMQPCeleryBroker.
NewAMQPCeleryBrokerByConnAndChannel creates new AMQPCeleryBroker using AMQP conn and channel.
NewAMQPConnection creates new AMQP channel.
NewAMQPExchange creates new AMQPExchange.
NewAMQPQueue creates new AMQPQueue.
NewCeleryClient creates new celery client.
NewCeleryWorker returns new celery worker.
NewRedisBackend creates new RedisCeleryBackend with given redis pool.
NewRedisBroker creates new RedisCeleryBroker with given redis connection pool.
NewRedisCeleryBackend creates new RedisCeleryBackend Deprecated: NewRedisCeleryBackend exists for historical compatibility and should not be used.
NewRedisCeleryBroker creates new RedisCeleryBroker based on given uri Deprecated: NewRedisCeleryBroker exists for historical compatibility and should not be used.
NewRedisPool creates pool of redis connections from given connection string Deprecated: newRedisPool exists for historical compatibility and should not be used.
SimpleConnectionRabbitMQ is for publish one message to the specefic queue.

# Variables

No description provided by the author

# Structs

AMQPCeleryBackend CeleryBackend for AMQP.
AMQPCeleryBroker is RedisBroker for AMQP.
AMQPExchange stores AMQP Exchange configuration.
AMQPQueue stores AMQP Queue configuration.
AsyncResult represents pending result.
CeleryClient provides API for sending celery tasks.
CeleryDeliveryInfo represents deliveryinfo json.
CeleryMessage is actual message to be sent to Redis.
CeleryProperties represents properties json.
CeleryWorker represents distributed task worker.
RedisCeleryBackend is celery backend for redis.
RedisCeleryBroker is celery broker for redis.
ResultMessage is return message received from broker.
TaskMessage is celery-compatible message.

# Interfaces

CeleryBackend is interface for celery backend database.
CeleryBroker is interface for celery broker database.
CeleryTask is an interface that represents actual task Passing CeleryTask interface instead of function pointer avoids reflection and may have performance gain.