Categorygithub.com/segmentio/nsq-go
modulepackage
1.2.8
Repository: https://github.com/segmentio/nsq-go.git
Documentation: pkg.go.dev

# README

nsq-go CircleCI Go Report Card GoDoc

Go package providing tools for building NSQ clients, servers and middleware.

Motivations

We ran into production issues with the standard nsq-go package where our workers would enter deadlock situations where they would stop consuming messages and just hang in endless termination loops. After digging through the code and trying to figure out how to address the problem it became clear that a rewrite was going to be faster than dealing with the amount of state synchronization that was done in nsq-go (multiple mutexes, channels and atomic variables involved).

This package is designed to offer less features than the standard nsq-go package and instead focus on simplicity and ease of maintenance and integration.

Consumer

package main

import (
    "github.com/segmentio/nsq-go"
)

func main() {
    // Create a new consumer, looking up nsqd nodes from the listed nsqlookup
    // addresses, pulling messages from the 'world' channel of the 'hello' topic
    // with a maximum of 250 in-flight messages.
    consumer, _ := nsq.StartConsumer(nsq.ConsumerConfig{
        Topic:   "hello",
        Channel: "world",
        Lookup:  []string{
            "nsqlookup-001.service.local:4161",
            "nsqlookup-002.service.local:4161",
            "nsqlookup-003.service.local:4161",
        },
        MaxInFlight: 250,
    })

    // Consume messages, the consumer automatically connects to the nsqd nodes
    // it discovers and handles reconnections if something goes wrong.
    for msg := range consumer.Messages() {
        // handle the message, then call msg.Finish or msg.Requeue
        // ...
        msg.Finish()
    }
}

Producer

package main

import (
    "github.com/segmentio/nsq-go"
)

func main() {
     // Starts a new producer that publishes to the TCP endpoint of a nsqd node.
     // The producer automatically handles connections in the background.
    producer, _ := nsq.StartProducer(nsq.ProducerConfig{
        Topic:   "hello",
        Address: "localhost:4150",
    })

    // Publishes a message to the topic that this producer is configured for,
    // the method returns when the operation completes, potentially returning an
    // error if something went wrong.
    producer.Publish([]byte("Hello World!"))

    // Stops the producer, all in-flight requests will be canceled and no more
    // messages can be published through this producer.
    producer.Stop()
}

Testing

Unit tests expect Consul and a set of NSQ daemons to be running. Start them using Docker Compose, and then run unit tests.

docker-compose up -d
go test -v -race ./...
docker-compose down

# Packages

No description provided by the author
No description provided by the author

# Functions

No description provided by the author
No description provided by the author
No description provided by the author
NewConsumer configures a new consumer instance.
NewMessage is a helper for creating Message instances directly.
NewProducer configures a new producer instance.
ParseMessageID attempts to parse s, which should be an hexadecimal representation of an 8 byte message ID.
RateLimit consumes messages from the messages channel and limits the rate at which they are produced to the channel returned by this function.
ReadCommand reads a command from the buffered input r, returning it or an error if something went wrong.
ReadFrame reads a frame from the buffer input r, returning it or an error if something went wrong.
StartConsumer creates and starts consuming from NSQ right away.
StartProducer starts and returns a new producer p, configured with the variables from the config parameter, or returning an non-nil error if some of the configuration variables were invalid.

# Constants

CloseWait is the response sent to the CLS command.
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
FrameTypeError is the code for frames that carry error responses to commands.
FrameTypeMessage is the code for frames that carry messages.
FrameTypeResponse is the code for frames that carry success responses to commands.
Heartbeat is the response used by NSQ servers for health checks of the connections.
No description provided by the author
OK is returned for most successful responses.

# Structs

Auth represents the AUTH command.
No description provided by the author
Cls represents the CLS command.
No description provided by the author
No description provided by the author
No description provided by the author
Fin represents the FIN command.
Identify represents the IDENTIFY command.
No description provided by the author
No description provided by the author
Message is a frame type representing a NSQ message.
MPub represents the MPUB command.
Nop represents the NOP command.
Producer provide an abstraction around using direct connections to nsqd nodes to send messages.
ProducerConfig carries the different variables to tune a newly started producer.
No description provided by the author
ProducerRequest are used to represent operations that are submitted to producers.
Pub represents the PUB command.
Rdy represents the RDY command.
Req represents the REQ command.
Sub represents the SUB command.
Touch represents the TOUCH command.
UnknownFrame is used to represent frames of unknown types for which the library has no special implementation.

# Interfaces

The Command interface is implemented by types that represent the different commands of the NSQ protocol.
The Frame interface is implemented by types that represent the different types of frames that a consumer may receive.

# Type aliases

Error is a frame type representing error responses to commands.
FrameType is used to represent the different types of frames that a consumer may receive.
MessageID is used to represent NSQ message IDs.
Response is a frame type representing success responses to commands.