Categorygithub.com/quintans/go-scheduler
module
0.2.0
Repository: https://github.com/quintans/go-scheduler.git
Documentation: pkg.go.dev

# README

scheduler

Simple durable and distributed scheduling library for go.

About

Inspired by go-quartz and expanded to be able to run in a distributed mode, depending on the storage implementation.

How To

Job interface

Responsible for doing the work.

type Job interface {
    // Kind returns the type of the job.
    Kind() string
    // Execute is called by the Scheduler when a Trigger associated with the Job fires.
    // If a nil StoreTask is returned, it will be removed from the scheduler.
    // If an error is returned it will be rescheduled with a backoff.
    Execute(context.Context, *StoreTask) (*StoreTask, error)
}

Implemented examples:

  • PrintJob
  • ShellJob
  • CurlJob

These exist for demonstration purposes only.

Trigger interface

Responsible for computing the next run time.

// Triggers are the 'mechanism' by which Jobs are scheduled.
type Trigger interface {
    // NextFireTime returns the next time at which the Trigger is scheduled to fire.
    NextFireTime(prev time.Time) (time.Time, error)
}

Available implementations:

  • CronTrigger
  • SimpleTrigger

Scheduler interface

the orchestrator

// A Scheduler is the Jobs orchestrator.
// Schedulers are responsible for executing Jobs when their associated Triggers fire (when their scheduled time arrives).
type Scheduler interface {
	// RegisterJob registers the job and trigger
	// Fails with ErrJobAlreadyScheduled if job already registered.
	//
	// This should be used to register the jobs before starting the scheduler.
	RegisterJob(job Job, options ...RegisterJobOption) error
	// Start starts the scheduler
	Start(context.Context)
	// ScheduleJob will attempt to schedule a job, failing with ErrJobAlreadyScheduled if it was already scheduled by another process
	// and since the job is already registered it will be picked up by one of the concurrent processes.
	ScheduleJob(ctx context.Context, slug string, job Job, options ...ScheduleJobOption) error
	// GetJobSlugs gets the slugs of all the scheduled jobs
	GetJobSlugs(context.Context) ([]string, error)
	// GetScheduledJob gets the scheduled job metadata
	GetScheduledJob(ctx context.Context, slug string) (*ScheduledJob, error)
	// DeleteJob removes the job from the execution queue
	DeleteJob(ctx context.Context, slug string) error
	// Clear clears all the scheduled jobs
	Clear(context.Context) error
}

Available implementations:

  • StdScheduler

JobStore interface

where to store job tasks

// JobStore represents the store for the jobs to be executed
type JobStore interface {
	// Create schedules a new task
	Create(context.Context, *StoreTask) error
	// NextRun finds the next run time
	NextRun(context.Context) (*StoreTask, error)
	// Lock finds and locks the next task to be run
	Lock(context.Context, *StoreTask) (*StoreTask, error)
	// Reschedule releases the acquired lock and updates the data for the next run
	Reschedule(context.Context, *StoreTask) error
	// GetSlugs gets all the slugs
	GetSlugs(context.Context) ([]string, error)
	// Get gets a stored task
	Get(ctx context.Context, slug string) (*StoreTask, error)
	// Delete deletes a stored task
	Delete(ctx context.Context, slug string) error
	// Clear all the tasks
	Clear(context.Context) error
}

Available implementations:

  • Memory
  • PostgreSQL
  • Firestore

Example

Repeating

    store := memory.New()
    sched := scheduler.NewStdScheduler(store)

    cronTrigger, _ := trigger.NewCronTrigger("1/5 * * * * *")
    curlJob, err := scheduler.NewCurlJob(
        "curl-clock",
        http.MethodGet, 
        "http://worldclockapi.com/api/json/est/now", 
        "", 
        nil,
    )

    sched.RegisterJob(curlJob, scheduler.WithTrigger(cronTrigger))

    ctx, cancel := context.WithCancel(context.Background())
    sched.Start(ctx)
    delay, _ := cronTrigger.FirstDelay()
    // if the store already has this schedule, meaning it was already scheduled by another process, it will be ignored
    sched.ScheduleJob(ctx, "curl-now", curlJob, scheduler.WithDelay(delay))
    time.Sleep(time.Second * 2)
    cancel()

Once

    conn = ...
    store := postgres.New(conn)
    sched := scheduler.NewStdScheduler(store)

    cancelTxJob := scheduler.NewCancelTransactionJob()
    // no trigger means non repeatable
    sched.RegisterJob(cancelTxJob) 

    ctx, cancel := context.WithCancel(context.Background())
    sched.Start(ctx)
    // if the store already has this schedule, meaning it was already scheduled by another process, it will be ignored
    sched.ScheduleJob(ctx, "cancel-tx", cancelTxJob, scheduler.WithDelay(time.Second), scheduler.WithPayload([]byte("9bfadfd6-8e62-4c58-9b6e-636d666b6643")))
    time.Sleep(time.Second * 2)
    cancel()

# Packages

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author