# README
asq
Another Stupid task Queue
ASQ is a distributed task queue inprised by machinery and dingo with clean API.
Features
- Invoke functions with arbitary signature
- Task chain supported
- Delayed task supported
- Retry when error
- Supported brokers: redis (standalone and clustered)
Example
Here is a quick demo
package asq
import (
"context"
"database/sql"
"flag"
"fmt"
"testing"
"time"
"emperror.dev/errors"
"github.com/cheekybits/is"
"github.com/zigzed/asq/redis"
"github.com/zigzed/asq/task"
)
func TestMakeAppLink(t *testing.T) {
t1 := task.NewTask(nil, "a", 1)
t2 := task.NewTask(nil, "b", 2)
t3 := task.NewTask(nil, "c", 3)
app := App{}
x1 := app.makeTaskLink(t1, t2, t3)
x := x1
for n := 1; x != nil; n++ {
for i := 0; i < n*2; i++ {
fmt.Print(" ")
}
fmt.Printf("taskLink: %+v\n", x)
if len(x.OnSuccess) > 0 {
x = x.OnSuccess[0]
} else {
x = nil
}
}
}
func testA() error {
fmt.Printf(" in testA done\n")
return nil
}
func testB(i int) error {
fmt.Printf(" in testB(%d) done\n", i)
return nil
}
func testC(i int) (int, error) {
fmt.Printf(" in testC(%d) %d done\n", i, i*2)
return i * 2, nil
}
type simpleStruct struct {
A string
B int
C struct {
M string
N sql.NullString
P *int
}
}
func testD(v simpleStruct) error {
fmt.Printf(" in testD(%+v) done\n", v)
return nil
}
var testEv = 5
func testE(v *int) (*simpleStruct, error) {
testEv--
r := &simpleStruct{
A: "result_A",
B: testEv,
C: struct {
M string
N sql.NullString
P *int
}{
M: "result_M",
P: v,
},
}
if testEv > 0 {
fmt.Printf(" inTestE(%v) error\n", testEv)
return r, errors.Errorf("test error %d", testEv)
}
fmt.Printf(" inTestE(%v) done\n", testEv)
return r, nil
}
func testF() error {
panic(" testF panic")
}
func testG(a, b, c int) (int, int, int, error) {
fmt.Printf(" inTestG(%d, %d, %d) (%d, %d, %d)\n",
a, b, c, a*2, b*2, c*2)
return a * 2, b * 2, c * 2, nil
}
func testH() (time.Time, error) {
ts := time.Now()
fmt.Printf(" inTestH() %s", ts.Format("2006-01-02 15:04:05.000"))
return ts, nil
}
func testI(a string, b int) (int, string, error) {
fmt.Printf(" inTestI(%s, %d) (%d, %s)\n", a, b, b, a)
return b, a, nil
}
var interval = 50 * time.Millisecond
func TestAsq(t *testing.T) {
flag.Parse()
is := is.New(t)
app, err := NewAppFromRedis(redis.Option{
Addrs: []string{"127.0.0.1:6379"},
PollInterval: interval,
}, "test")
is.NoErr(err)
err = app.Register("testA", testA)
is.NoErr(err)
err = app.Register("testB", testB)
is.NoErr(err)
err = app.Register("testC", testC)
is.NoErr(err)
err = app.Register("testD", testD)
is.NoErr(err)
err = app.Register("testE", testE)
is.NoErr(err)
err = app.Register("testF", testF)
is.NoErr(err)
err = app.Register("testG", testG)
is.NoErr(err)
err = app.Register("testH", testH)
is.NoErr(err)
err = app.Register("testI", testI)
is.NoErr(err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
err = app.StartWorker(ctx, 2)
is.NoErr(err)
}()
_, err = app.SubmitTask(ctx, task.NewTask(nil, "testA"))
is.NoErr(err)
br()
_, err = app.SubmitTask(ctx, task.NewTask(nil, "testB", 3))
is.NoErr(err)
br()
ar, err := app.SubmitTask(ctx, task.NewTask(nil, "testC", 30))
is.NoErr(err)
is.NotNil(ar)
var (
valc int
errc error
)
ok, err := ar.Wait(ctx, &valc)
is.True(ok)
is.Equal(valc, 60)
is.NoErr(errc)
fmt.Printf("testC result: %+v, %+v\n", valc, errc)
br()
_, err = app.SubmitTask(ctx, task.NewTask(nil, "testD", simpleStruct{
A: "a",
B: 1,
C: struct {
M string
N sql.NullString
P *int
}{
M: "m",
N: sql.NullString{Valid: true, String: "n"},
P: new(int),
},
}))
is.NoErr(err)
br()
testEv = 5
v := 2
ar, err = app.SubmitTask(ctx,
task.NewTask(
task.NewTaskOption(3, 2*time.Second).WithResultExpired(2*time.Minute),
"testE",
&v))
is.NoErr(err)
is.NotNil(ar)
br()
var (
vale simpleStruct
)
ok, err = ar.Wait(ctx, &vale)
is.True(ok)
is.Err(err)
fmt.Printf("testE result: %v, %v, %v\n", vale, ok, err)
br()
testEv = 2
ar, err = app.SubmitTask(ctx,
task.NewTask(
task.NewTaskOption(3, 2*time.Second).WithResultExpired(2*time.Minute),
"testE",
&v))
ok, err = ar.Wait(ctx, &vale)
is.NoErr(err)
is.True(ok)
is.Equal(vale.A, "result_A")
is.Equal(vale.B, 0)
is.Equal(vale.C.M, "result_M")
is.NotNil(vale.C.P)
is.Equal(*vale.C.P, v)
fmt.Printf("testE result: %v, %v, %v\n", vale, ok, err)
br()
ar, err = app.SubmitTask(ctx,
task.NewTask(
nil,
"testF"))
ok, err = ar.Wait(ctx)
fmt.Printf("testF result: %v, %v\n", ok, err)
br()
_, err = app.SubmitTask(ctx,
task.NewTask(
nil,
"testA"))
is.NoErr(err)
br()
ar, err = app.SubmitTask(ctx,
task.NewTask(nil, "testC", 300),
task.NewTask(nil, "testB"))
is.NoErr(err)
ok, err = ar.Wait(ctx)
is.NoErr(err)
is.True(ok)
br()
ar, err = app.SubmitTask(ctx,
task.NewTask(nil, "testC", 500),
task.NewTask(nil, "testC"),
task.NewTask(nil, "testC"))
is.NoErr(err)
ok, err = ar.Wait(ctx, &valc)
is.NoErr(err)
is.True(ok)
is.Equal(valc, 4000)
br()
ar, err = app.SubmitTask(ctx,
task.NewTask(nil, "testA"),
task.NewTask(nil, "testC", 200),
task.NewTask(nil, "testC"))
is.NoErr(err)
ok, err = ar.Wait(ctx, &valc)
is.NoErr(err)
is.True(ok)
is.Equal(valc, 800)
br()
ar, err = app.SubmitTask(ctx,
task.NewTask(nil, "testG", 1, 2, 3))
is.NoErr(err)
var x, y, z int
ok, err = ar.Wait(ctx, &x, &y, &z)
is.NoErr(err)
is.True(ok)
is.Equal(x, 2)
is.Equal(y, 4)
is.Equal(z, 6)
br()
ar, err = app.SubmitTask(ctx,
task.NewTask(nil, "testG", 100, 200, 300),
task.NewTask(nil, "testG"),
task.NewTask(nil, "testG"))
is.NoErr(err)
ok, err = ar.Wait(ctx, &x, &y, &z)
is.NoErr(err)
is.True(ok)
is.Equal(x, 800)
is.Equal(y, 1600)
is.Equal(z, 2400)
br()
ar, err = app.SubmitTask(ctx,
task.NewTask(nil, "testH"))
is.NoErr(err)
var ts time.Time
ok, err = ar.Wait(ctx, &ts)
is.NoErr(err)
is.True(ok)
diff := time.Since(ts)
fmt.Printf("testH timing: %.6f\n", diff.Seconds())
br()
ar, err = app.SubmitTask(ctx,
task.NewTask(nil, "testI", "mm", 111))
is.NoErr(err)
var m string
var n int
ok, err = ar.Wait(ctx, &n, &m)
is.NoErr(err)
is.True(ok)
is.Equal(m, "mm")
is.Equal(n, 111)
br()
}
func br() {
fmt.Printf("\n")
}
# Packages
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
# Functions
No description provided by the author
No description provided by the author
No description provided by the author
# Structs
No description provided by the author
No description provided by the author
No description provided by the author
# Type aliases
No description provided by the author