Module: github.com/kaptinlin/queue
Version: 0.4.9
Repository: https://github.com/kaptinlin/queue.git
Documentation: pkg.go.dev

# README

Golang Queue Processing Library

This library manages and processes queued jobs in Go applications. It is built on top of the Asynq task processing library, which uses Redis for storage, and provides features such as custom error handling, retries, priority queues, rate limiting, and job retention. It suits anything from a simple task runner to a complex distributed system: workers can run on multiple machines, so job processing scales out across them.

Getting Started

Installation

Ensure your Go environment is ready (requires Go version 1.21.4 or higher), then install the library:

go get -u github.com/kaptinlin/queue

Configuring Redis

Configure the Redis connection:

import "github.com/kaptinlin/queue"

redisConfig := queue.NewRedisConfig(
    queue.WithRedisAddress("localhost:6379"),
    queue.WithRedisDB(0),
    queue.WithRedisPassword("your_password"),
)
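
The `With...` helpers above follow Go's functional options pattern. The sketch below shows how such options can be applied to a config struct using only the standard library. The names `redisConfig`, `option`, `newRedisConfig`, and the default address are hypothetical stand-ins for illustration, not the library's actual implementation.

```go
package main

import "fmt"

// redisConfig is a hypothetical stand-in for the library's RedisConfig.
type redisConfig struct {
	Address  string
	DB       int
	Password string
}

// option mutates a redisConfig; each with... helper returns one.
type option func(*redisConfig)

func withAddress(addr string) option {
	return func(c *redisConfig) { c.Address = addr }
}

func withDB(db int) option {
	return func(c *redisConfig) { c.DB = db }
}

func withPassword(password string) option {
	return func(c *redisConfig) { c.Password = password }
}

// newRedisConfig starts from an assumed default and applies the options in
// order, so a later option overrides an earlier one.
func newRedisConfig(opts ...option) *redisConfig {
	cfg := &redisConfig{Address: "localhost:6379"} // assumed default
	for _, opt := range opts {
		opt(cfg)
	}
	return cfg
}

func main() {
	cfg := newRedisConfig(withAddress("redis.internal:6379"), withDB(2))
	fmt.Printf("%s db=%d\n", cfg.Address, cfg.DB)
}
```

Callers only name the settings they want to change, and new settings can be added later without breaking existing call sites.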

Client Initialization

Create a client using the Redis configuration:

client, err := queue.NewClient(redisConfig)
if err != nil {
    log.Fatalf("Error initializing client: %v", err)
}

Job Enqueueing

Enqueue jobs by specifying their type and a structured payload for clear and concise data handling:

type EmailPayload struct {
    Email   string `json:"email"`
    Content string `json:"content"`
}

jobType := "email:send"
payload := EmailPayload{Email: "user@example.com", Content: "Welcome to our service!"}

_, err = client.Enqueue(jobType, payload, queue.WithDelay(5*time.Second))
if err != nil {
    log.Printf("Failed to enqueue job: %v", err)
}

Alternatively, for direct control over job configuration, use a Job instance:

job := queue.NewJob(jobType, payload, queue.WithDelay(5*time.Second))
if _, err := client.EnqueueJob(job); err != nil {
    log.Printf("Failed to enqueue job: %v", err)
}

This approach lets you set additional job options, such as an execution delay, directly on the Job object.

Handling Jobs

Define a function to process jobs of a specific type. Decode the payload into the EmailPayload struct for type-safe handling:

func handleEmailSendJob(ctx context.Context, job *queue.Job) error {
    var payload EmailPayload
    if err := job.DecodePayload(&payload); err != nil {
        return fmt.Errorf("failed to decode payload: %w", err)
    }

    log.Printf("Sending email to: %s with content: %s", payload.Email, payload.Content)
    // Implement the email sending logic here.
    return nil
}
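
To see what travels between the client and the handler, here is a minimal round trip of the payload using only the standard library. It assumes the payload is serialized as JSON, as the struct tags suggest. `encodePayload` and `decodePayload` are hypothetical helpers that stand in for the enqueue step and `job.DecodePayload`; they are not library functions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// EmailPayload mirrors the struct used in the enqueue example.
type EmailPayload struct {
	Email   string `json:"email"`
	Content string `json:"content"`
}

// encodePayload is a hypothetical stand-in for what happens at enqueue time.
func encodePayload(p EmailPayload) ([]byte, error) {
	return json.Marshal(p)
}

// decodePayload is a hypothetical stand-in for job.DecodePayload.
func decodePayload(data []byte, out *EmailPayload) error {
	if err := json.Unmarshal(data, out); err != nil {
		return fmt.Errorf("failed to decode payload: %w", err)
	}
	return nil
}

func main() {
	raw, err := encodePayload(EmailPayload{Email: "user@example.com", Content: "Welcome to our service!"})
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	fmt.Println(string(raw))

	var got EmailPayload
	if err := decodePayload(raw, &got); err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Printf("%+v\n", got)
}
```

Returning the wrapped decode error from a handler keeps the original cause available to `errors.Is` and `errors.As` further up the call chain.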

To achieve scalable and distributed job processing, you can register your function and start workers on different machines. Each worker independently processes jobs enqueued by the client:

worker, err := queue.NewWorker(redisConfig, queue.WithWorkerQueue("default", 1))
if err != nil {
    log.Fatalf("Error creating worker: %v", err)
}

err = worker.Register("email:send", handleEmailSendJob)
if err != nil {
    log.Fatalf("Failed to register handler: %v", err)
}

if err := worker.Start(); err != nil {
    log.Fatalf("Failed to start worker: %v", err)
}
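
`WithWorkerQueue` takes a queue name and a numeric priority. Assuming these priorities are handed to Asynq unchanged and Asynq's default weighted (non-strict) mode is in effect, each queue's approximate share of processing attention is its weight divided by the sum of all weights. The sketch below computes those shares; `queueShares` and the `critical` and `low` queue names are hypothetical and only illustrate the arithmetic.

```go
package main

import (
	"fmt"
	"sort"
)

// queueShares returns each queue's fraction of the total weight.
// Queues with a non-positive weight are ignored.
func queueShares(weights map[string]int) map[string]float64 {
	total := 0
	for _, w := range weights {
		if w > 0 {
			total += w
		}
	}
	shares := make(map[string]float64, len(weights))
	if total == 0 {
		return shares
	}
	for name, w := range weights {
		if w > 0 {
			shares[name] = float64(w) / float64(total)
		}
	}
	return shares
}

func main() {
	shares := queueShares(map[string]int{"critical": 6, "default": 3, "low": 1})

	// Sort the names so the output order is stable.
	names := make([]string, 0, len(shares))
	for name := range shares {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		fmt.Printf("%-8s %.0f%%\n", name, shares[name]*100)
	}
}
```

With a single queue, as in the worker example above, the weight has no practical effect because that queue receives the whole share.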

Graceful Shutdown

Ensure a clean shutdown process:

func main() {
    // Initialization...

    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)
    <-c

    if err := client.Stop(); err != nil {
        // Log instead of exiting so the worker below is still stopped.
        log.Printf("Failed to stop client: %v", err)
    }
    worker.Stop()
}

Advanced Features

Learn more about the library's advanced features in the project documentation.

Contributing

We welcome contributions! Please submit issues or pull requests on GitHub.

License

This library is licensed under the MIT License.

Credits

Special thanks to the creators of neoq and Asynq for inspiring this library.

# Functions

DefaultRedisConfig returns a RedisConfig initialized with default values.
IsErrRateLimit checks if the provided error is or wraps an ErrRateLimit error.
IsValidJobState checks if the provided job state is valid and supported.
NewClient initializes a new Client with specified Redis configuration and client options.
NewErrRateLimit constructs a new ErrRateLimit with a specified retry delay.
NewHandler initializes a new Handler with the provided job type, processing function, and options.
NewJob initializes a new Job with the provided type, payload, and configuration options.
NewManager creates a new instance of Manager.
NewMemoryConfigProvider initializes a new instance of MemoryConfigProvider.
NewRedisConfig creates a new RedisConfig with the given options applied.
NewScheduler creates a new Scheduler instance with the provided Redis configuration and options.
NewSkipRetryError creates and wraps a SkipRetry error with a custom message.
NewWorker creates and returns a new Worker based on the given Redis configuration and WorkerConfig options.
WithClientErrorHandler sets a custom error handler for the client.
WithClientRetention sets a default retention duration for jobs.
WithConfigProvider sets a custom config provider for the Scheduler.
WithJobQueue specifies the queue that the handler will process jobs from.
WithJobTimeout sets a timeout for job processing, terminating the job if it exceeds this duration.
WithMiddleware returns a HandlerOption that appends provided middlewares to the handler and recomposes the middleware chain.
WithPostEnqueueFunc sets a function to be called after enqueuing a job.
WithPreEnqueueFunc sets a function to be called before enqueuing a job.
WithRateLimiter configures a rate limiter for the handler to control the rate of job processing.
WithRedisAddress sets the Redis server address.
WithRedisDB sets the Redis database number.
WithRedisDialTimeout sets the timeout for connecting to Redis.
WithRedisPassword sets the password for Redis authentication.
WithRedisPoolSize sets the size of the connection pool for Redis.
WithRedisReadTimeout sets the timeout for reading from Redis.
WithRedisTLSConfig sets the TLS configuration for the Redis connection.
WithRedisUsername sets the username for Redis authentication.
WithRedisWriteTimeout sets the timeout for writing to Redis.
WithRetryDelayFunc sets a custom function to determine the delay before retrying a failed job.
WithSchedulerLocation sets the time location for the Scheduler.
WithSchedulerLogger sets a custom logger for the Scheduler.
WithSyncInterval sets the sync interval for the Scheduler's task manager.
WithWorkerConcurrency sets the number of concurrent workers.
WithWorkerErrorHandler configures the error handler for the worker.
WithWorkerLogger configures a custom logger for the worker.
WithWorkerQueue adds a queue with the specified priority to the worker configuration.
WithWorkerQueues sets the queue names and their priorities for the worker.
WithWorkerRateLimiter configures a global rate limiter for the worker.
WithWorkerStopTimeout configures the stop timeout for the worker.

# Constants

StateActive represents jobs that are currently being processed.
StateAggregating represents jobs that are part of a batch or group waiting to be processed together.
StateArchived represents jobs that have been moved to the archive.
StateCompleted represents jobs that have been completed successfully.
StatePending represents jobs that are waiting to be processed.
StateRetry represents jobs that will be retried after a failure.
StateScheduled represents jobs that are scheduled to be run in the future.
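
A check like IsValidJobState can be pictured as a set lookup over the states listed above. This is a minimal sketch, not the library's implementation: the lowercase identifiers are hypothetical, and the string values are assumptions modelled on the state names.

```go
package main

import "fmt"

// jobState is a hypothetical stand-in for the library's JobState type.
type jobState string

// The string values below are assumed for illustration.
const (
	stateActive      jobState = "active"
	stateAggregating jobState = "aggregating"
	stateArchived    jobState = "archived"
	stateCompleted   jobState = "completed"
	statePending     jobState = "pending"
	stateRetry       jobState = "retry"
	stateScheduled   jobState = "scheduled"
)

// validStates is the set of supported states.
var validStates = map[jobState]struct{}{
	stateActive:      {},
	stateAggregating: {},
	stateArchived:    {},
	stateCompleted:   {},
	statePending:     {},
	stateRetry:       {},
	stateScheduled:   {},
}

// isValidJobState reports whether s is one of the supported states.
func isValidJobState(s jobState) bool {
	_, ok := validStates[s]
	return ok
}

func main() {
	for _, s := range []jobState{stateRetry, "paused"} {
		fmt.Printf("%q valid: %t\n", s, isValidJobState(s))
	}
}
```

Validating a state before using it in a query keeps typos in user input from silently matching nothing.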

# Variables

DefaultQueues defines default queue names and their priorities.
Define package-level error variables with descriptive names.
ErrSkipRetry indicates a specific Asynq framework condition to skip retries and move the job to the archive.

# Structs

ActiveJobInfo wraps detailed information about a job currently being processed by a worker.
Client encapsulates an Asynq client instance with custom error handling and job retention settings.
ClientConfig defines the configuration options for the Client.
DefaultClientErrorHandler logs errors encountered during job enqueue operations.
DefaultWorkerErrorHandler is a default implementation of WorkerErrorHandler that logs errors.
ErrRateLimit defines a custom error type for rate limiting scenarios.
Group represents a collection of handlers with specific middleware applied.
Handler encapsulates the configuration and logic for processing jobs.
Job represents a task that will be executed by a worker.
JobInfo includes detailed information for a job, mirroring relevant parts of asynq's TaskInfo and WorkerInfo for active jobs.
JobOptions encapsulates settings that control job execution.
Manager provides an implementation for the ManagerInterface.
MemoryConfigProvider stores and provides job configurations for periodic execution.
QueueDailyStats includes detailed daily statistics for a queue.
QueueInfo includes detailed queue information.
QueueLocation contains information about the location of queues in a Redis cluster.
RedisConfig holds the configuration for the Redis connection.
RedisInfo contains detailed information about the Redis instance or cluster.
Scheduler manages job scheduling with Asynq.
SchedulerOptions contains options for the Scheduler.
Worker represents a worker that processes tasks using the asynq package.
WorkerConfig holds configuration parameters for a worker, including concurrency, queue priorities, and error handling.
WorkerInfo wraps detailed information about an Asynq server, which we treat as a "worker".

# Interfaces

ClientErrorHandler provides an interface for handling enqueue errors.
Logger supports logging at various log levels.
ManagerInterface defines operations for managing and retrieving information about workers and their jobs.
WorkerErrorHandler defines an interface for handling errors that occur during job processing.

# Type aliases

ClientOption defines a function signature for configuring the Client.
HandlerFunc defines the signature for job processing functions.
HandlerOption defines the signature for configuring a Handler.
JobOption defines a function signature for job configuration options.
JobState represents the state of a job in the queue.
RedisOption defines a function signature for configuring RedisConfig.
SchedulerOption defines a function signature for configuring the Scheduler.
WorkerOption defines a function signature for configuring a Worker.