# README
## nsq-go

Go package providing tools for building NSQ clients, servers and middleware.

### Motivations

We ran into production issues with the standard go-nsq package: our workers would deadlock, stop consuming messages, and hang in endless termination loops. After digging through the code to work out a fix, it became clear that a rewrite would be faster than untangling the state synchronization in go-nsq (multiple mutexes, channels, and atomic variables).
This package is designed to offer fewer features than the standard go-nsq package and to focus instead on simplicity and ease of maintenance and integration.

### Consumer

    package main

    import (
        "log"

        nsq "github.com/xenking/nsq-go"
    )

    func main() {
        // Create a new consumer, looking up nsqd nodes from the listed nsqlookupd
        // addresses, pulling messages from the 'world' channel of the 'hello' topic
        // with a maximum of 250 in-flight messages.
        consumer, err := nsq.StartConsumer(nsq.ConsumerConfig{
            Topic:   "hello",
            Channel: "world",
            Lookup: []string{
                "nsqlookup-001.service.local:4161",
                "nsqlookup-002.service.local:4161",
                "nsqlookup-003.service.local:4161",
            },
            MaxInFlight: 250,
        })
        if err != nil {
            log.Fatal(err)
        }

        // Consume messages. The consumer automatically connects to the nsqd nodes
        // it discovers and handles reconnections if something goes wrong.
        for msg := range consumer.Messages() {
            // Handle the message, then call msg.Finish or msg.Requeue.
            // ...
            msg.Finish()
        }
    }

### Producer

    package main

    import (
        "log"

        nsq "github.com/xenking/nsq-go"
    )

    func main() {
        // Start a new producer that publishes to the TCP endpoint of an nsqd node.
        // The producer automatically handles connections in the background.
        producer, err := nsq.StartProducer(nsq.ProducerConfig{
            Topic:   "hello",
            Address: "localhost:4150",
        })
        if err != nil {
            log.Fatal(err)
        }

        // Publish a message to the topic that this producer is configured for.
        // The method returns when the operation completes, potentially returning
        // an error if something went wrong.
        if err := producer.Publish([]byte("Hello World!")); err != nil {
            log.Print(err)
        }

        // Stop the producer. All in-flight requests are canceled and no more
        // messages can be published through this producer.
        producer.Stop()
    }
# Functions
NewConsumer configures a new consumer instance.
NewMessage is a helper for creating Message instances directly.
NewProducer configures a new producer instance.
ParseMessageID attempts to parse s, which should be a hexadecimal representation of an 8-byte message ID.
RateLimit consumes messages from the messages channel and limits the rate at which they are produced to the channel returned by this function.
ReadCommand reads a command from the buffered input r, returning it or an error if something went wrong.
ReadFrame reads a frame from the buffered input r, returning it or an error if something went wrong.
StartConsumer creates and starts consuming from NSQ right away.
StartProducer starts and returns a new producer p, configured with the variables from the config parameter, or returns a non-nil error if some of the configuration variables were invalid.
# Constants
CloseWait is the response sent to the CLS command.
FrameTypeError is the code for frames that carry error responses to commands.
FrameTypeMessage is the code for frames that carry messages.
FrameTypeResponse is the code for frames that carry success responses to commands.
Heartbeat is the response used by NSQ servers for health checks of the connections.
OK is returned for most successful responses.
# Structs
Auth represents the AUTH command.
Cls represents the CLS command.
Fin represents the FIN command.
Identify represents the IDENTIFY command.
Message is a frame type representing an NSQ message.
MPub represents the MPUB command.
Nop represents the NOP command.
Producer provides an abstraction over direct connections to nsqd nodes for sending messages.
ProducerConfig carries the different variables to tune a newly started producer.
ProducerRequest represents operations that are submitted to producers.
Pub represents the PUB command.
Rdy represents the RDY command.
Req represents the REQ command.
Sub represents the SUB command.
Touch represents the TOUCH command.
UnknownFrame is used to represent frames of unknown types for which the library has no special implementation.
# Type aliases
Error is a frame type representing error responses to commands.
FrameType is used to represent the different types of frames that a consumer may receive.
MessageID is used to represent NSQ message IDs.
Response is a frame type representing success responses to commands.
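
The Functions listing describes ParseMessageID as parsing a hexadecimal representation of an 8-byte message ID, which suggests a 64-bit integer behind MessageID. A minimal sketch of such a round trip is shown here. The `messageID` type, the `parseMessageID` function, the strict 16-character length check, and the zero-padded lowercase formatting are assumptions for illustration and may differ from the package's behavior.

```go
package main

import (
	"fmt"
	"strconv"
)

// messageID is a hypothetical 64-bit message identifier.
type messageID uint64

// parseMessageID parses a 16-character hexadecimal string into a messageID.
func parseMessageID(s string) (messageID, error) {
	if len(s) != 16 {
		return 0, fmt.Errorf("message ID %q: want 16 hex characters, got %d", s, len(s))
	}
	v, err := strconv.ParseUint(s, 16, 64)
	if err != nil {
		return 0, fmt.Errorf("message ID %q: %w", s, err)
	}
	return messageID(v), nil
}

// String formats the ID as 16 zero-padded lowercase hex characters.
func (id messageID) String() string {
	return fmt.Sprintf("%016x", uint64(id))
}

func main() {
	id, err := parseMessageID("000000000000002a")
	fmt.Println(uint64(id), id, err) // 42 000000000000002a <nil>
}
```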