# README
Golang Queue Processing Library
This library offers a robust and flexible solution for managing and processing queued jobs in Go applications. Built on top of the Asynq task processing library, which uses Redis for storage, it provides advanced features like custom error handling, retries, priority queues, rate limiting, and job retention. Whether you're building a simple task runner or a complex distributed system, this library is designed to meet your needs with efficiency and ease. It also supports the setup of multiple workers across different machines, allowing for scalable and distributed job processing.
Getting Started
Installation
Ensure your Go environment is ready (requires Go version 1.21.4 or higher), then install the library:
go get -u github.com/kaptinlin/queue
Configuring Redis
Set up your Redis connection with minimal hassle:
import "github.com/kaptinlin/queue"
redisConfig := queue.NewRedisConfig(
queue.WithRedisAddress("localhost:6379"),
queue.WithRedisDB(0),
queue.WithRedisPassword("your_password"),
)
Client Initialization
Create a client using the Redis configuration:
client, err := queue.NewClient(redisConfig)
if err != nil {
log.Fatalf("Error initializing client: %v", err)
}
Job Enqueueing
Enqueue jobs by specifying their type and a structured payload for clear and concise data handling:
type EmailPayload struct {
Email string `json:"email"`
Content string `json:"content"`
}
jobType := "email:send"
payload := EmailPayload{Email: "[email protected]", Content: "Welcome to our service!"}
_, err = client.Enqueue(jobType, payload, queue.WithDelay(5*time.Second))
if err != nil {
log.Printf("Failed to enqueue job: %v", err)
}
Alternatively, for direct control over job configuration, use a Job
instance:
job := queue.NewJob(jobType, payload, queue.WithDelay(5*time.Second))
if _, err := client.EnqueueJob(job); err != nil {
log.Printf("Failed to enqueue job: %v", err)
}
This approach allows you to specify additional job options such as execution delay, directly within the Job
object.
Handling Jobs
Define a function to process jobs of a specific type. Utilize the EmailPayload
struct for type-safe payload handling:
func handleEmailSendJob(ctx context.Context, job *queue.Job) error {
var payload EmailPayload
if err := job.DecodePayload(&payload); err != nil {
return fmt.Errorf("failed to decode payload: %w", err)
}
log.Printf("Sending email to: %s with content: %s", payload.Email, payload.Content)
// Implement the email sending logic here.
return nil
}
To achieve scalable and distributed job processing, you can register your function and start workers on different machines. Each worker independently processes jobs enqueued by the client:
worker, err := queue.NewWorker(redisConfig, queue.WithWorkerQueue("default", 1))
if err != nil {
log.Fatalf("Error creating worker: %v", err)
}
err = worker.Register("email:send", handleEmailSendJob)
if err != nil {
log.Fatalf("Failed to register handler: %v", err)
}
if err := worker.Start(); err != nil {
log.Fatalf("Failed to start worker: %v", err)
}
Graceful Shutdown
Ensure a clean shutdown process:
func main() {
// Initialization...
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
if err := client.Stop(); err != nil {
log.Fatalf("Failed to stop client: %v", err)
}
worker.Stop()
}
Advanced Features
Learn more about the library's advanced features by exploring our documentation on:
- Priority Queues
- Rate Limiting
- Job Retention and Results
- Job Retries
- Timeouts and Deadlines
- Scheduler
- Config Provider for Scheduler
- Using Middleware
- Error Handling
- Manager for Web UI Development
Contributing
We welcome contributions! Please submit issues or pull requests on GitHub.
License
This library is licensed under the MIT License.
Credits
Special thanks to the creators of neoq and Asynq for inspiring this library.