package
1.17.16
Repository: https://github.com/golangid/candi.git
Documentation: pkg.go.dev

# README

Example

Create delivery handler

package workerhandler

import (
	"time"

	"github.com/golangid/candi/candishared"
	taskqueueworker "github.com/golangid/candi/codebase/app/task_queue_worker"
	"github.com/golangid/candi/codebase/factory/types"
)

// TaskQueueHandler struct
type TaskQueueHandler struct {
}

// NewTaskQueueHandler constructor
func NewTaskQueueHandler() *TaskQueueHandler {
	return &TaskQueueHandler{}
}

// MountHandlers return map topic to handler func
func (h *TaskQueueHandler) MountHandlers(group *types.WorkerHandlerGroup) {

	group.Add("task-one", h.taskOne)
	group.Add("task-two", h.taskTwo)
}

func (h *TaskQueueHandler) taskOne(eventContext *candishared.EventContext) error {

	fmt.Printf("executing task '%s' has been %s retry, with message: %s\n",
		eventContext.HandlerRoute(),
		eventContext.Header()["retries"],
		eventContext.Message(),
	)

	return &taskqueueworker.ErrorRetrier{
		Delay:   10 * time.Second,
		Message: "Error",
	}
}

func (h *TaskQueueHandler) taskTwo(eventContext *candishared.EventContext) error {

	fmt.Printf("executing task '%s' has been %s retry, with message: %s\n",
		eventContext.HandlerRoute(),
		eventContext.Header()["retries"],
		eventContext.Message(),
	)

	return &taskqueueworker.ErrorRetrier{
		Delay:   3 * time.Second,
		Message: "Error",
	}
}

Register in module

package examplemodule

import (

	"example.service/internal/modules/examplemodule/delivery/workerhandler"

	"github.com/golangid/candi/codebase/factory/dependency"
	"github.com/golangid/candi/codebase/factory/types"
	"github.com/golangid/candi/codebase/interfaces"
)

type Module struct {
	// ...another delivery handler
	workerHandlers map[types.Worker]interfaces.WorkerHandler
}

func NewModules(deps dependency.Dependency) *Module {
	return &Module{
		workerHandlers: map[types.Worker]interfaces.WorkerHandler{
			// ...another worker handler
			// ...
			types.TaskQueue: workerhandler.NewTaskQueueHandler(),
		},
	}
}

// ...another method

Add task in each usecase module

From internal service (same runtime)

package usecase

import (
	"context"
	"log"

	taskqueueworker "github.com/golangid/candi/codebase/app/task_queue_worker"
)

func someUsecase(ctx context.Context) {
	// add task queue for `{{task_name}}` with 5 retry
	if err := taskqueueworker.AddJob(ctx, "{{task-queue-worker-host}}", &taskqueueworker.AddJobRequest{
	TaskName: "{{task_name}}", MaxRetry: 5, Args: []byte(`{{arguments/message}}`),
	}); err != nil {
		log.Println(err)
	}
}

Or if running on a separate server

  • Via GraphQL API:

POST {{task-queue-worker-host}}/graphql

mutation addJob {
  add_job(
	  param: {
		task_name: "{{task_name}}"
		max_retry: 5
		args: "{\"params\": \"test-one\"}"
	  }
  )
}
  • cURL:
curl --location --request POST '{{task-queue-worker-host}}/graphql' \
--header 'Content-Type: application/json' \
--data-raw '{
    "operationName": "addJob",
    "variables": {
        "param": {
            "task_name": "{{task_name}}",
            "max_retry": 1,
            "args": "{{arguments/message}}"
        }
    },
    "query": "mutation addJob($param: AddJobInputResolver!) {\n  add_job(param: $param)\n}\n"
}'
  • Direct call function:
// add task queue for `task-one` via HTTP request
jobID, err := taskqueueworker.AddJobViaHTTPRequest(context.Background(), "{{task-queue-worker-host}}", &taskqueueworker.AddJobRequest{
	TaskName: "{{task_name}}", MaxRetry: 1, Args: []byte(`{{arguments/message}}`),
})
if err != nil {
	log.Println(err)
}
fmt.Println("Queued job id is ", jobID)
  • Auto call to external worker via HTTP request using basic function AddJob(), please set configuration:
taskqueueworker.SetExternalWorkerHost({{task-queue-worker-host}})
jobID, err := taskqueueworker.AddJob(context.Background(), &taskqueueworker.AddJobRequest{
	TaskName: "{{task_name}}", MaxRetry: 1, Args: []byte(`{{arguments/message}}`),
})
fmt.Println("Queued job id is ", jobID)

# Functions

AddJob public function for add new job in same runtime.
AddJobViaHTTPRequest public function for add new job via http request.
AddJobWorkerHandler worker handler, bridging request from another worker for add job, default max retry is 5.
GetDetailJob api for get detail job by id.
NewInMemQueue init inmem queue.
NewInMemSummary constructor, store & read summary from in memory.
NewMongoPersistent create mongodb persistent.
NewNoopPersistent constructor.
NewRedisQueue init inmem queue.
NewSQLPersistent init new persistent SQL.
NewTaskQueueWorker create new task queue worker.
RecalculateSummary func.
RetryJob api for retry job by id.
SetAutoRemoveClientInterval option func.
SetDashboardBanner option func.
SetDashboardBasicAuth option func.
SetDashboardHTTPPort option func.
SetDebugMode option func.
SetExternalWorkerHost option func, setting worker host for add job, if not empty default using http request when add job.
SetLocker option func.
SetMaxClientSubscriber option func.
SetPersistent option func.
SetQueue option func.
SetSecondaryPersistent option func.
SetTLSConfig option func.
StopJob api for stop job by id.
StreamAllJob api func for stream fetch all job.
UpdateProgressJob api for update progress job.

# Constants

HeaderCurrentProgress const.
HeaderInterval const.
HeaderMaxProgress const.
HeaderMaxRetries const.
HeaderRetries const.
StatusFailure const.
StatusQueueing const.
StatusRetrying const.
StatusStopped const.
StatusSuccess const.
TaskOptionDeleteJobAfterSuccess const.

# Structs

AddJobInputResolver model.
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Interfaces

No description provided by the author
QueueStorage abstraction for queue storage backend.
No description provided by the author

# Type aliases

No description provided by the author
No description provided by the author
No description provided by the author