# README
NSQ
NSQ as backend with Queue package (A realtime distributed messaging platform)
Setup
start the NSQ lookupd
nsqlookupd
start the NSQ server
nsqd --lookupd-tcp-address=localhost:4160
start the NSQ admin dashboard
nsqadmin --lookupd-http-address localhost:4161
Testing
go test -v ./...
Example
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/golang-queue/nsq"
"github.com/golang-queue/queue"
)
type job struct {
Message string
}
func (j *job) Bytes() []byte {
b, err := json.Marshal(j)
if err != nil {
panic(err)
}
return b
}
func main() {
taskN := 100
rets := make(chan string, taskN)
// define the worker
w := nsq.NewWorker(
nsq.WithAddr("127.0.0.1:4150"),
nsq.WithTopic("example"),
nsq.WithChannel("foobar"),
// concurrent job number
nsq.WithMaxInFlight(10),
nsq.WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
var v *job
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
return err
}
rets <- v.Message
return nil
}),
)
// define the queue
q := queue.NewPool(
10,
queue.WithWorker(w),
)
defer q.Release()
// assign tasks in queue
for i := 0; i < taskN; i++ {
go func(i int) {
if err := q.Queue(&job{
Message: fmt.Sprintf("handle the job: %d", i+1),
}); err != nil {
log.Fatal(err)
}
}(i)
}
// wait until all tasks done
for i := 0; i < taskN; i++ {
fmt.Println("message:", <-rets)
time.Sleep(50 * time.Millisecond)
}
}
# Functions
NewWorker for struc.
WithAddr setup the addr of NSQ.
WithChannel setup the channel of NSQ.
WithLogger set custom logger.
WithMaxInFlight Maximum number of messages to allow in flight (concurrency knob).
WithRunFunc setup the run func of queue.
WithTopic setup the topic of NSQ.
# Interfaces
An Option configures a mutex.
# Type aliases
OptionFunc is a function that configures a queue.