Categorygithub.com/minasoft-technology/zentask
modulepackage
0.4.0
Repository: https://github.com/minasoft-technology/zentask.git
Documentation: pkg.go.dev

# README

ZenTask

Go Reference Go Report Card License: MIT

ZenTask is a distributed task processing system built on top of NATS JetStream, providing a reliable and scalable solution for handling background jobs in Go applications. It offers a simple yet powerful API for distributed task processing with features like retries, scheduling, and dead letter queues.

Features

  • Simple API: Easy to use API for enqueueing and processing tasks
  • Distributed Processing: Built on NATS JetStream for reliable message delivery
  • Flexible Task Options: Support for delayed tasks, retries, and priorities
  • Automatic Retries: Configurable retry policies with backoff
  • Dead Letter Queue: Automatic handling of failed tasks
  • Worker Pools: Configurable worker pools per queue
  • Graceful Shutdown: Clean shutdown with task completion

Installation

go get github.com/minasoft-technology/zentask

Quick Start

Server Example

package main

import (
    "context"
    "log"
    "time"
    "github.com/minasoft-technology/zentask"
)

func main() {
    // Initialize server
    server, err := zentask.NewServer(zentask.ServerConfig{
        NatsURL: "nats://localhost:4222",
        StreamName: "ZENTASK",
        Queues: []zentask.QueueConfig{
            {
                Name:        "emails",
                WorkerCount: 2,
                MaxRetries:  3,
                RetryBackoff: time.Second,
                MaxAckPending: 100,
                AckWait:      time.Minute,
            },
        },
    })
    if err != nil {
        log.Fatal(err)
    }
    defer server.Stop()

    // Register handler
    err = server.RegisterHandler("emails", func(ctx context.Context, task *zentask.Task) error {
        email := task.Payload.(map[string]interface{})
        log.Printf("Sending email to: %v", email["to"])
        return nil
    })
    if err != nil {
        log.Fatal(err)
    }

    // Start processing
    if err := server.Start(); err != nil {
        log.Fatal(err)
    }

    // Wait for shutdown signal
    <-make(chan struct{})
}

Client Example

package main

import (
    "context"
    "log"
    "time"
    "github.com/minasoft-technology/zentask"
)

func main() {
    // Initialize client
    client, err := zentask.NewClient(zentask.Config{
        NatsURL:    "nats://localhost:4222",
        StreamName: "ZENTASK",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Create and enqueue a task
    task := &zentask.Task{
        Queue: "emails",
        Payload: map[string]interface{}{
            "to":      "[email protected]",
            "subject": "Welcome!",
            "body":    "Welcome to our service!",
        },
        Options: &zentask.TaskOptions{
            MaxRetries:  3,
            RetryDelay:  time.Second * 5,
            Priority:    1,
            ProcessAt:   time.Now().Add(time.Minute),
        },
    }
    
    info, err := client.Enqueue(task)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Task enqueued with ID: %s", info.ID)
}

Configuration

Client Configuration

type Config struct {
    NatsURL       string        // NATS server URL
    StreamName    string        // JetStream stream name
    RetryAttempts int          // Number of retry attempts for NATS operations
    RetryBackoff  time.Duration // Backoff duration between retries
    RateLimit     float64      // Rate limit for task submission
    RateBurst     int          // Burst size for rate limiting
}

Server Configuration

type ServerConfig struct {
    NatsURL        string         // NATS server URL
    StreamName     string         // JetStream stream name
    Queues         []QueueConfig  // Queue configurations
    ErrorLogger    *log.Logger    // Error logger
    StreamReplicas int           // Number of stream replicas
    StreamMaxAge   time.Duration  // Maximum age of stream messages
    DLQEnabled     bool          // Enable Dead Letter Queue
    DLQRetention   time.Duration  // DLQ message retention period
}

type QueueConfig struct {
    Name           string        // Queue name
    WorkerCount    int          // Number of concurrent workers
    MaxRetries     int          // Maximum retry attempts
    RetryBackoff   time.Duration // Backoff duration between retries
    MaxAckPending  int          // Maximum pending acknowledgments
    AckWait        time.Duration // Acknowledgment wait timeout
    DLQSubject     string       // Dead Letter Queue subject
}

Task Options

type TaskOptions struct {
    MaxRetries  int           // Maximum retry attempts
    RetryDelay  time.Duration // Delay between retries
    Timeout     time.Duration // Task execution timeout
    Priority    int          // Task priority (higher = more priority)
    ProcessAt   time.Time    // Scheduled processing time
    UniqueKey   string       // Unique key for deduplication
    GroupID     string       // Group ID for batch processing
}

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.

# Packages

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Functions

NewNATSServer creates a new embedded NATS server.

# Constants

Default configuration values.
Default configuration values.
Default configuration values.
Default configuration values.

# Structs

NATSConfig holds the configuration for embedded NATS server.
NATSLogger implements the NATS server.Logger interface.
NATSServer represents the embedded NATS server.