Category: github.com/golang-queue/nsq
module / package
Version: 0.1.1
Repository: https://github.com/golang-queue/nsq.git
Documentation: pkg.go.dev

# README

NSQ

Badges: Run Testing | Go Report Card | codecov

NSQ (a realtime distributed messaging platform) as a backend for the Queue package.

screen

Setup

Start the NSQ lookup daemon (nsqlookupd):

nsqlookupd

Start the NSQ server (nsqd):

nsqd --lookupd-tcp-address=localhost:4160

Start the NSQ admin dashboard (nsqadmin):

nsqadmin --lookupd-http-address localhost:4161

Testing

go test -v ./...

Example

package main

import (
  "context"
  "encoding/json"
  "fmt"
  "log"
  "time"

  "github.com/golang-queue/nsq"
  "github.com/golang-queue/queue"
)

// job is the payload queued by this example; it is serialized as JSON.
type job struct {
  Message string
}

// Bytes returns the JSON encoding of the job.
// It panics if the job cannot be marshaled.
func (j *job) Bytes() []byte {
  encoded, marshalErr := json.Marshal(j)
  if marshalErr == nil {
    return encoded
  }
  panic(marshalErr)
}

// main queues taskN JSON-encoded jobs through an NSQ-backed queue and prints
// each message as the worker's run function hands it back on rets.
func main() {
  taskN := 100
  // Buffered to taskN so the run function can hand back every expected
  // result without waiting for the printing loop below.
  rets := make(chan string, taskN)

  // define the worker
  w := nsq.NewWorker(
    nsq.WithAddr("127.0.0.1:4150"),
    nsq.WithTopic("example"),
    nsq.WithChannel("foobar"),
    // concurrent job number
    nsq.WithMaxInFlight(10),
    nsq.WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
      // Decode into a value rather than a nil *job: a JSON "null" payload
      // would leave the pointer nil and the field access below would panic.
      var v job
      if err := json.Unmarshal(m.Bytes(), &v); err != nil {
        return err
      }
      rets <- v.Message
      return nil
    }),
  )

  // define the queue
  q := queue.NewPool(
    10,
    queue.WithWorker(w),
  )
  defer q.Release()

  // assign tasks in queue
  for i := 0; i < taskN; i++ {
    go func(i int) {
      if err := q.Queue(&job{
        Message: fmt.Sprintf("handle the job: %d", i+1),
      }); err != nil {
        // log.Fatal exits the process, so the deferred q.Release() above
        // does not run on this path.
        log.Fatal(err)
      }
    }(i)
  }

  // wait until all tasks done
  for i := 0; i < taskN; i++ {
    fmt.Println("message:", <-rets)
    time.Sleep(50 * time.Millisecond)
  }
}

# Functions

NewWorker creates a new Worker.
WithAddr sets the address of the NSQ server.
WithChannel sets the NSQ channel.
WithLogger sets a custom logger.
WithMaxInFlight sets the maximum number of messages to allow in flight (concurrency knob).
WithRunFunc sets the run function of the queue.
WithTopic sets the NSQ topic.

# Structs

No description provided by the author
Worker for NSQ.

# Interfaces

An Option configures a Worker.

# Type aliases

OptionFunc is a function that configures a queue.