# Mongopher-Scheduler

A distributed task scheduler for Go applications with MongoDB and in-memory storage support.
## Features

- Simple and lightweight
- Supports asynchronous task processing
- Built-in retry mechanisms with configurable strategies
- Custom task handlers
- Scheduled tasks with future execution
- Task history tracking
- In-memory and MongoDB storage options
- Concurrent-safe operations
- Distributed leader election with TTL-based failover (in-memory and MongoDB)
## Leader Election

The scheduler includes a distributed leader election mechanism for coordinating multiple instances, with both in-memory and MongoDB implementations.

Leader election ensures that only one scheduler instance processes tasks at a time in a distributed environment. When multiple scheduler instances are running:
- The leader instance handles all task processing
- Non-leader instances remain idle
- If the leader fails, another instance takes over automatically
- The leader periodically renews its status to maintain leadership
### Key Features

- TTL-based leadership - Leaders must renew their status periodically (default: 30 seconds)
- Automatic failover - If a leader fails, another instance will take over
- Graceful resignation - Leaders can voluntarily step down
- Concurrent-safe - Uses sync.Map for thread-safe operations
- Mock time provider - Enables precise testing of time-based operations
### Usage

#### In-Memory Example

```go
// Create leader election instance
le := inmemory.NewLeaderElection("instance-1")

// Elect leader
isLeader, err := le.ElectLeader(context.Background())
if err != nil {
    log.Fatal(err)
}

if isLeader {
    log.Println("This instance is now the leader")
    // Start task processing
    scheduler.StartScheduler(ctx)
} else {
    log.Println("Another instance is the leader")
    // Wait for leadership (see the polling sketch below)
}

// Check leadership status
isLeader, err = le.IsLeader(context.Background())
if err != nil {
    log.Fatal(err)
}

// Gracefully resign
err = le.Resign(context.Background())
if err != nil {
    log.Fatal(err)
}
```
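A non-leader instance can simply retry the election until it acquires leadership. The following is a minimal sketch of such a polling loop, not part of the library; the retry interval and error handling are assumptions to tune for your deployment:

```go
// Hypothetical polling loop: re-run ElectLeader until this instance
// becomes the leader or the context is cancelled.
ticker := time.NewTicker(5 * time.Second) // assumed retry interval
defer ticker.Stop()

for {
    select {
    case <-ctx.Done():
        return
    case <-ticker.C:
        isLeader, err := le.ElectLeader(ctx)
        if err != nil {
            log.Printf("election attempt failed: %v", err)
            continue
        }
        if isLeader {
            log.Println("Acquired leadership, starting task processing")
            scheduler.StartScheduler(ctx)
            return
        }
    }
}
```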
#### MongoDB Example

```go
// Create MongoDB client
clientOptions := options.Client().ApplyURI("mongodb://localhost:27017")
client, err := mongo.Connect(context.Background(), clientOptions)
if err != nil {
    log.Fatal(err)
}

// Create leader election backed by the scheduler database
leaderElection := mongo.NewLeaderElection("instance-1", client, "scheduler_db")

// Start leadership
ctx := context.Background()
leaderElection.Start(ctx)

// Check leadership status
isLeader, err := leaderElection.IsLeader(ctx)
if err != nil {
    log.Fatal(err)
}

if isLeader {
    log.Println("This instance is now the leader")
    // Start task processing
    scheduler.StartScheduler(ctx)
}
```
### MongoDB Leader Election

The MongoDB implementation provides distributed leader election with:

- MongoDB-based locking - Uses atomic document updates for leadership acquisition
- TTL-based leadership - Leaders must renew status periodically (default: 30s)
- Automatic failover - Leadership automatically transfers if the current leader fails
- Concurrent-safe - Uses MongoDB transactions for thread-safe operations
#### Collection Structure

The leader election uses a `leader_election` collection with documents of the form:

```js
{
  "leader_key": "singleton",                     // Unique key for leader document
  "instance_id": "instance-1",                   // Current leader instance
  "last_seen": ISODate("2025-01-23T20:12:05Z")   // Last heartbeat timestamp
}
```
#### Indexes

- Unique index on `leader_key`
- TTL-like behavior through a `last_seen` index

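Conceptually, acquisition boils down to one atomic `FindOneAndUpdate` against this collection: claim the singleton document if it is unclaimed, already ours, or its `last_seen` heartbeat is older than the TTL. The sketch below illustrates the idea with the official Go driver; it is not the library's actual code, and the helper name and TTL handling are assumptions:

```go
// Sketch only: atomically claim or renew leadership. Returns true when this
// instance holds the lease after the call.
func tryAcquire(ctx context.Context, coll *mongo.Collection, instanceID string, ttl time.Duration) (bool, error) {
    now := time.Now()
    filter := bson.M{
        "leader_key": "singleton",
        "$or": bson.A{
            bson.M{"instance_id": instanceID},                 // we already hold the lease
            bson.M{"last_seen": bson.M{"$lt": now.Add(-ttl)}}, // previous leader's heartbeat expired
        },
    }
    update := bson.M{"$set": bson.M{"instance_id": instanceID, "last_seen": now}}
    opts := options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After)

    err := coll.FindOneAndUpdate(ctx, filter, update, opts).Err()
    switch {
    case err == nil:
        return true, nil // claimed or renewed leadership
    case mongo.IsDuplicateKeyError(err):
        return false, nil // a live leader already holds the document (unique index on leader_key)
    default:
        return false, err
    }
}
```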
### Configuration

Leader election can be configured with:
- TTL Duration - Time-to-live for leadership (default: 30s)
- Refresh Interval - How often leader renews status (default: TTL/2)
- Instance ID - Unique identifier for each scheduler instance
## Architecture

The scheduler follows a modular architecture with these core components (a rough sketch of the main interfaces follows the list):

- TaskScheduler Interface - Defines the core scheduling operations
- Store Interface - Abstract storage layer for task persistence
  - InMemory Implementation - Non-persistent storage for testing/development
  - MongoDB Implementation - Persistent storage for production
- Retry Mechanism - Configurable retry strategies with backoff
- Leader Election - Distributed coordination for multiple instances
  - InMemory - Local coordination
  - MongoDB - Distributed coordination

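The authoritative definitions live in the task_scheduler packages; purely as orientation, the core abstractions can be pictured roughly as below. Names, type parameters, and return types here are illustrative guesses inferred from the usage examples in this README, not the actual declarations:

```go
// Illustrative sketch only: approximate shape of the core abstractions,
// inferred from the examples in this README.
type TaskHandler[P any, S any] func(task *store.Task[P, S]) error

type TaskScheduler[P any, S any] interface {
    RegisterHandler(name string, handler TaskHandler[P, S])
    RegisterTask(name string, params P, scheduledAt *time.Time) (*store.Task[P, S], error)
    StartScheduler(ctx context.Context)
    WaitForAllTasksToBeDone() (bool, error)
}

type LeaderElection interface {
    ElectLeader(ctx context.Context) (bool, error)
    IsLeader(ctx context.Context) (bool, error)
    Resign(ctx context.Context) error
}
```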
## Usage

### Basic Example

```go
package main

import (
    "context"
    "log"

    "github.com/andrzejwitkowski/Mongopher-Scheduler/task_scheduler/scheduler/inmemory"
    "github.com/andrzejwitkowski/Mongopher-Scheduler/task_scheduler/store"
)

func main() {
    // Create new scheduler
    scheduler := inmemory.NewInMemoryTaskScheduler()

    // Register task handler
    scheduler.RegisterHandler("example-task", func(task *store.Task[any, int]) error {
        log.Printf("Processing task %d", task.ID)
        return nil
    })

    // Start scheduler
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    scheduler.StartScheduler(ctx)

    // Schedule new task
    _, err := scheduler.RegisterTask("example-task", nil, nil)
    if err != nil {
        log.Fatal(err)
    }

    // Wait for task completion
    done, err := scheduler.WaitForAllTasksToBeDone()
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("All tasks completed: %v", done)
}
```
### Retry Mechanism Example

```go
scheduler.RegisterHandler("retry-task", func(task *store.Task[any, int]) error {
    if task.RetryConfig.Attempts < 3 {
        return fmt.Errorf("simulated failure attempt %d", task.RetryConfig.Attempts+1)
    }
    return nil
})

// Register task with custom retry config
task := store.Task[any, int]{
    Name: "retry-task",
    RetryConfig: store.RetryConfig{
        MaxRetries:   5,
        StrategyType: store.RetryStrategyBackpressure,
        BaseDelay:    1000, // milliseconds
    },
}
```
## Concurrent Map Usage

The scheduler uses Go's sync.Map for concurrent-safe operations in several areas:

- Handler Storage - Stores registered task handlers
- Task Processing - Tracks in-progress tasks
- Status Observers - Manages task status listeners
- Leader Election - Tracks leader state (in-memory only)

Example sync.Map usage in handler registration:

```go
var handlers sync.Map

func RegisterHandler(name string, handler TaskHandler) {
    handlers.Store(name, handler)
}

func GetHandler(name string) (TaskHandler, bool) {
    if handler, ok := handlers.Load(name); ok {
        return handler.(TaskHandler), true
    }
    return nil, false
}
```
## Configuration

### Retry Strategies

- Linear - Fixed delay between retries
- Backpressure - Exponential backoff with jitter

Configure via RetryConfig:

```go
type RetryConfig struct {
    MaxRetries   int    `bson:"max_retries"`
    StrategyType string `bson:"strategy_type"` // "linear" or "backpressure"
    BaseDelay    int    `bson:"base_delay"`    // in milliseconds
}
```
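As a rough illustration of the difference between the two strategies, the delay before the next retry can be computed along these lines. This is a sketch of the idea only, not the library's internal code; the exponent base and jitter range are assumptions:

```go
// Sketch only: delay before the next retry. baseDelay is in milliseconds,
// attempt counts completed attempts starting at 0.
func retryDelay(strategy string, baseDelay, attempt int) time.Duration {
    base := time.Duration(baseDelay) * time.Millisecond
    switch strategy {
    case "backpressure":
        // Exponential backoff: baseDelay * 2^attempt, plus up to ~25% random jitter.
        d := base * time.Duration(1<<attempt)
        jitter := time.Duration(rand.Int63n(int64(d)/4 + 1))
        return d + jitter
    default: // "linear"
        // Fixed delay between retries.
        return base
    }
}
```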
## Best Practices

- Use MongoDB storage for production deployments
- Implement idempotent task handlers
- Set reasonable retry limits and backoff strategies
- Monitor task history for error patterns
- Use context cancellation for graceful shutdowns (see the sketch below)
- Implement proper error handling in task handlers
- Configure appropriate TTL for leader election based on your deployment

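For example, graceful shutdown can be wired up by cancelling the scheduler's context when the process receives a termination signal. This is a minimal sketch using only the standard library together with the StartScheduler call shown earlier:

```go
// Cancel the scheduler's context on SIGINT or SIGTERM so in-flight work can stop cleanly.
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

scheduler.StartScheduler(ctx)

<-ctx.Done() // block until a shutdown signal arrives
log.Println("shutting down scheduler")
```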
## Testing

The scheduler includes comprehensive tests covering:

- Single task execution
- Concurrent task processing
- Retry mechanisms
- Error handling
- Status tracking
- Leader election scenarios:
  - Basic election
  - Failover after TTL expiration
  - Graceful resignation
  - Concurrent elections
  - Multiple scheduler coordination (leader handles tasks while non-leader remains idle)

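As an illustration of what such a test can look like, here is a sketch of a single-task test against the in-memory scheduler. It uses only the calls shown in the basic example above; the test name and assertions are assumptions, not copied from the actual test suite:

```go
// Sketch: register a handler, schedule one task, and wait for completion.
func TestSingleTaskCompletes(t *testing.T) {
    scheduler := inmemory.NewInMemoryTaskScheduler()

    processed := make(chan struct{}, 1)
    scheduler.RegisterHandler("test-task", func(task *store.Task[any, int]) error {
        processed <- struct{}{}
        return nil
    })

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    scheduler.StartScheduler(ctx)

    if _, err := scheduler.RegisterTask("test-task", nil, nil); err != nil {
        t.Fatalf("RegisterTask failed: %v", err)
    }

    if _, err := scheduler.WaitForAllTasksToBeDone(); err != nil {
        t.Fatalf("WaitForAllTasksToBeDone failed: %v", err)
    }

    select {
    case <-processed:
        // handler ran as expected
    default:
        t.Fatal("handler was not invoked")
    }
}
```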
Run tests with:

```bash
go test ./...
```
## Contributing

Pull requests are welcome. For major changes, please open an issue first to discuss proposed changes.