github.com/oze4/workerpoolxt (module, package)
Version: 1.1.1
Repository: https://github.com/oze4/workerpoolxt.git
Documentation: pkg.go.dev

# README

workerpoolxt

Worker pool library that extends https://github.com/gammazero/workerpool

Synopsis


Hello World

  • Obligatory "as simple as it gets" example
package main

import (
    "context"
    "fmt"
    wpxt "github.com/oze4/workerpoolxt"
)

func main() {
    ctx := context.Background()
    numWorkers := 10

    wp := wpxt.New(ctx, numWorkers)

    wp.SubmitXT(wpxt.Job{
        Name: "My first job",
        Task: func(o wpxt.Options) wpxt.Response {
            return wpxt.Response{Data: "Hello, world!"}
        },
    })

    jobResults := wp.StopWaitXT()

    for _, jobresult := range jobResults {
        fmt.Println(jobresult)
    }
}

How we extend workerpool

Results

// ...
// ... pretend we submitted jobs here
// ...

results := wp.StopWaitXT() // -> []wpxt.Response

for _, result := range results {
    // If job failed, `result.Error != nil`
}

Error Handling

  • What if I encounter an error in one of my jobs?
  • How can I handle or check for errors/timeout?

Return Error From Job

// Just set the `Error` field on the `wpxt.Response` you return
wp.SubmitXT(wpxt.Job{
    Name: "How to handle errors",
    Task: func(o wpxt.Options) wpxt.Response {
        // Pretend we got an error doing something
        if theError != nil {
            return wpxt.Response{Error: theError}
        }
        // No error: return a normal response
        return wpxt.Response{Data: "ok"}
    },
})

Check For Errors In Response

// ... pretend we submitted a bunch of jobs
//
// StopWaitXT() returns []wpxt.Response
// Each response has an `Error` field, which is non-nil
// whether the job timed out or returned an error you set
// Check for it like
if someResponseFromSomeJob.Error != nil {
    // ....
}
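
Because a timeout and an error set by the task share the same `Error` field, it can help to tell them apart with `errors.Is`. The following is a minimal sketch and not part of workerpoolxt: `response` is a hypothetical stand-in for `wpxt.Response` (only the `Data` and `Error` fields used in this README are assumed), and `countFailures` and `sampleResults` are illustrative helpers.

```go
package main

import (
    "context"
    "errors"
    "fmt"
)

// response is a hypothetical stand-in for wpxt.Response; only the Data and
// Error fields used in this README are assumed.
type response struct {
    Data  interface{}
    Error error
}

// countFailures reports how many responses timed out and how many failed
// with some other error.
func countFailures(results []response) (timeouts, failures int) {
    for _, r := range results {
        switch {
        case r.Error == nil:
            // Job succeeded, nothing to count.
        case errors.Is(r.Error, context.DeadlineExceeded):
            timeouts++
        default:
            failures++
        }
    }
    return timeouts, failures
}

// sampleResults builds one success, one timeout, and one ordinary failure.
func sampleResults() []response {
    return []response{
        {Data: "ok"},
        {Error: context.DeadlineExceeded},
        {Error: errors.New("some_err")},
    }
}

func main() {
    timeouts, failures := countFailures(sampleResults())
    fmt.Printf("timeouts=%d failures=%d\n", timeouts, failures)
}
```

With the real library, the same loop would presumably range over the slice returned by `StopWaitXT()`.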

Context

  • A default context is required when creating a new workerpoolxt
  • You can override the default context per job

Default Context

myctx := context.Background() // Any `context.Context`
numWorkers := 10
wp := wpxt.New(myctx, numWorkers)

Per Job Context

Timeouts

defaultCtx := context.Background()
numWorkers := 10
wp := wpxt.New(defaultCtx, numWorkers)
timeout := time.Millisecond

myCtx, done := context.WithTimeout(context.Background(), timeout)
defer done()

wp.SubmitXT(wpxt.Job{
    Name: "my ctx job",
    Context: myCtx,
    Task: func(o wpxt.Options) wpxt.Response {
        // Simulate long running task
        time.Sleep(10 * time.Second)
        return wpxt.Response{Data: "I could be anything"}
    },
})
// > `Response.Error` will be `context.DeadlineExceeded`
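
To show why a slow task can surface `context.DeadlineExceeded`, here is a minimal standard-library sketch of racing a task against a context. It is not workerpoolxt's actual implementation, which this README does not describe. `runWithContext` and `demoTimeout` are hypothetical names used only for illustration.

```go
package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// runWithContext is a hypothetical helper: it runs task in a goroutine and
// returns ctx.Err() if the context finishes before the task does.
func runWithContext(ctx context.Context, task func() error) error {
    done := make(chan error, 1) // buffered so the goroutine never blocks on send
    go func() { done <- task() }()

    select {
    case err := <-done:
        return err
    case <-ctx.Done():
        return ctx.Err()
    }
}

// demoTimeout runs a slow task under a short deadline and returns the error.
func demoTimeout() error {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
    defer cancel()

    return runWithContext(ctx, func() error {
        time.Sleep(500 * time.Millisecond) // simulate a long-running task
        return nil
    })
}

func main() {
    err := demoTimeout()
    fmt.Println(errors.Is(err, context.DeadlineExceeded)) // prints "true"
}
```

Note that in this sketch the task goroutine keeps running after the deadline. A task that should stop early has to watch the context itself.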

Retry

  • Optional
  • Seamlessly retry failed jobs
// Give the job a 500ms timeout
timeoutctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()

wp.SubmitXT(wpxt.Job{
    // This job is configured to fail immediately,
    // therefore it will retry 5 times
    // (as long as we have not exceeded our job timeout)
    Retry: 5,
    // ^^^^^^
    Name: "I will retry 5 times",
    // Set the job's context (it carries the timeout)
    Context: timeoutctx,
    Task: func(o wpxt.Options) wpxt.Response {
        return wpxt.Response{Error: errors.New("some_err")}
    },
})
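
The interplay between a retry count and a job timeout can be sketched with the standard library alone. This is an illustration under stated assumptions, not the library's implementation: `retry` and `demoRetry` are hypothetical helpers, and the sketch assumes `Retry: 5` means one initial attempt plus up to five retries, which this README does not confirm.

```go
package main

import (
    "context"
    "errors"
    "fmt"
)

// retry is a hypothetical helper. It runs task once, then up to `retries`
// more times while the task keeps failing and ctx has not finished.
// It returns the number of attempts made and the last error seen.
func retry(ctx context.Context, retries int, task func() error) (int, error) {
    var err error
    for attempt := 1; attempt <= retries+1; attempt++ {
        if ctxErr := ctx.Err(); ctxErr != nil {
            return attempt - 1, ctxErr // context finished before this attempt
        }
        if err = task(); err == nil {
            return attempt, nil
        }
    }
    return retries + 1, err
}

// demoRetry runs a task that always fails, with 5 retries and no deadline.
func demoRetry() (int, error) {
    return retry(context.Background(), 5, func() error {
        return errors.New("some_err")
    })
}

func main() {
    attempts, err := demoRetry()
    fmt.Println(attempts, err) // prints "6 some_err"
}
```

Checking the context before each attempt is what keeps retries from continuing past the job timeout.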

Options

  • Help make jobs flexible

Default Options

myopts := map[string]interface{}{
    "myclient": &http.Client{},
}

wp := wpxt.New(context.Background(), 10)
wp.WithOptions(myopts)

wp.SubmitXT(wpxt.Job{
    Name: "myjob",
    Task: func(o wpxt.Options) wpxt.Response {
        // access options here
        client := o["myclient"]
        _ = client // ... do work with `client`
        return wpxt.Response{Data: "done"}
    },
})

Per Job Options

myhttpclient := &http.Client{}
myk8sclient := &kubernetes.Clientset{}

// This Job Only Needs an HTTP Client
wp.SubmitXT(wpxt.Job{
    Name: "This Job Only Needs an HTTP Client",
    Options: map[string]interface{}{
        "http": myhttpclient,
    },
    Task: func(o wpxt.Options) wpxt.Response {
        // access options here
        httpclient := o["http"]
        _ = httpclient // ... do work with `httpclient`
        return wpxt.Response{Data: "done"}
    },
})

// This Job Only Needs Kubernetes Clientset
wp.SubmitXT(wpxt.Job{
    Name: "This Job Only Needs Kubernetes Clientset",
    Options: map[string]interface{}{
        "kube": myk8sclient,
    },
    Task: func(o wpxt.Options) wpxt.Response {
        // access options here
        kubernetesclient := o["kube"]
        _ = kubernetesclient // ... do work with `kubernetesclient`
        return wpxt.Response{Data: "done"}
    },
})
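
Values read from the options map come back as `interface{}`, so a task usually needs a type assertion before using them. The sketch below shows the comma-ok form. `options` is a hypothetical stand-in for `wpxt.Options`, assumed to be a `map[string]interface{}` as the examples above suggest, and `httpClientFrom` and `demoOptions` are illustrative helpers rather than library functions.

```go
package main

import (
    "fmt"
    "net/http"
    "time"
)

// options is a hypothetical stand-in for wpxt.Options, assumed here to be a
// map[string]interface{}.
type options map[string]interface{}

// httpClientFrom retrieves the *http.Client stored under key. It reports
// false if the key is missing or holds a value of a different type.
func httpClientFrom(o options, key string) (*http.Client, bool) {
    client, ok := o[key].(*http.Client)
    return client, ok
}

// demoOptions looks up one key that exists and one that does not.
func demoOptions() (found, other bool) {
    o := options{"http": &http.Client{Timeout: 5 * time.Second}}
    _, found = httpClientFrom(o, "http")
    _, other = httpClientFrom(o, "kube")
    return found, other
}

func main() {
    found, other := demoOptions()
    fmt.Println(found, other) // prints "true false"
}
```

The comma-ok assertion avoids a panic when a job is submitted without the option it expects. The task can then return a `Response` with its `Error` field set instead.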

# Functions

New creates WorkerPoolXT.

# Structs

Job holds job data.
Response holds job results.
WorkerPoolXT extends `github.com/gammazero/workerpool`.

# Type aliases

Options holds miscellaneous options.