github.com/nats-io/go-nats
Version: 1.7.2
Repository: https://github.com/nats-io/go-nats.git
Documentation: pkg.go.dev

# README

NATS - Go Client

A Go client for the NATS messaging system.

Installation

# Go client
go get github.com/nats-io/go-nats

# Server
go get github.com/nats-io/gnatsd

Basic Usage

import nats "github.com/nats-io/go-nats"

// Connect to a server
nc, _ := nats.Connect(nats.DefaultURL)

// Simple Publisher
nc.Publish("foo", []byte("Hello World"))

// Simple Async Subscriber
nc.Subscribe("foo", func(m *nats.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
})

// Simple Sync Subscriber
sub, err := nc.SubscribeSync("foo")
m, err := sub.NextMsg(timeout)

// Channel Subscriber
ch := make(chan *nats.Msg, 64)
sub, err := nc.ChanSubscribe("foo", ch)
msg := <- ch

// Unsubscribe
sub.Unsubscribe()

// Drain
sub.Drain()

// Requests
msg, err := nc.Request("help", []byte("help me"), 10*time.Millisecond)

// Replies
nc.Subscribe("help", func(m *nats.Msg) {
    nc.Publish(m.Reply, []byte("I can help!"))
})

// Drain connection (Preferred for responders)
// Close() not needed if this is called.
nc.Drain()

// Close connection
nc.Close()

Encoded Connections


nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
defer c.Close()

// Simple Publisher
c.Publish("foo", "Hello World")

// Simple Async Subscriber
c.Subscribe("foo", func(s string) {
    fmt.Printf("Received a message: %s\n", s)
})

// EncodedConn can Publish any raw Go type using the registered Encoder
type person struct {
     Name     string
     Address  string
     Age      int
}

// Go type Subscriber
c.Subscribe("hello", func(p *person) {
    fmt.Printf("Received a person: %+v\n", p)
})

me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street, San Francisco, CA"}

// Go type Publisher
c.Publish("hello", me)

// Unsubscribe
sub, err := c.Subscribe("foo", nil)
...
sub.Unsubscribe()

// Requests
var response string
err := c.Request("help", "help me", &response, 10*time.Millisecond)
if err != nil {
    fmt.Printf("Request failed: %v\n", err)
}

// Replying
c.Subscribe("help", func(subj, reply string, msg string) {
    c.Publish(reply, "I can help!")
})

// Close connection
c.Close()
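
To make the encoding step concrete, here is a minimal sketch of the round trip a typed value takes between a publisher and a typed handler. It assumes the JSON encoder behaves like the standard encoding/json package. The helper names encodePerson and decodePerson are hypothetical and are not part of the client API.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// person mirrors the struct used in the example above.
type person struct {
	Name    string
	Address string
	Age     int
}

// encodePerson (hypothetical helper) models the publish side: a Go value
// is turned into the bytes carried in the message payload.
func encodePerson(p *person) ([]byte, error) {
	return json.Marshal(p)
}

// decodePerson (hypothetical helper) models the receive side: the payload
// is turned back into the value a handler such as func(p *person) would get.
func decodePerson(data []byte) (*person, error) {
	p := &person{}
	if err := json.Unmarshal(data, p); err != nil {
		return nil, err
	}
	return p, nil
}

func main() {
	me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street, San Francisco, CA"}

	payload, err := encodePerson(me)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Printf("payload: %s\n", payload)

	got, err := decodePerson(payload)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("decoded: %+v\n", got)
}
```

Because both sides only agree on the payload bytes, a subscriber written in another language could consume the same message as long as it parses the same JSON shape.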

New Authentication (Nkeys and User Credentials)

This requires a server with version >= 2.0.0.

NATS servers have a new security and authentication mechanism to authenticate with user credentials and Nkeys. The simplest form is to use the helper method UserCredentials(credsFilepath).

nc, err := nats.Connect(url, nats.UserCredentials("user.creds"))

The helper method creates two callback handlers to present the user JWT and sign the nonce challenge from the server. The core client library never has direct access to your private key and simply performs the callback for signing the server challenge. The helper loads the credentials for each connect or reconnect and wipes the memory it used afterwards.

The helper can also take two entries, one for the JWT and one for the NKey seed file.

nc, err := nats.Connect(url, nats.UserCredentials("user.jwt", "user.nk"))

You can also set the callback handlers directly and manage challenge signing yourself.

nc, err := nats.Connect(url, nats.UserJWT(jwtCB, sigCB))
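
The callback arrangement described above can be sketched with the standard library alone. NKeys are built on Ed25519 signatures, so this example uses crypto/ed25519 directly instead of the nkeys package. The type signatureHandler and the functions newSigner, answerChallenge and verifyChallenge are hypothetical names for illustration, not the client's API. The point it shows is that the code playing the library's role only ever holds a callback, never the private key.

```go
package main

import (
	"crypto/ed25519"
	"crypto/rand"
	"fmt"
)

// signatureHandler is a hypothetical stand-in for a signing callback:
// it receives the server nonce and returns a signature over it.
type signatureHandler func(nonce []byte) ([]byte, error)

// newSigner generates a key pair and returns the public key together with
// a callback that closes over the private key. Callers of the callback
// never see the private key itself.
func newSigner() (ed25519.PublicKey, signatureHandler, error) {
	pub, priv, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		return nil, nil, err
	}
	sign := func(nonce []byte) ([]byte, error) {
		return ed25519.Sign(priv, nonce), nil
	}
	return pub, sign, nil
}

// answerChallenge plays the role of the client library: it holds only the
// callback and uses it to sign whatever nonce the server sent.
func answerChallenge(nonce []byte, sign signatureHandler) ([]byte, error) {
	return sign(nonce)
}

// verifyChallenge plays the role of the server, which knows the public key.
func verifyChallenge(pub ed25519.PublicKey, nonce, sig []byte) bool {
	return ed25519.Verify(pub, nonce, sig)
}

func main() {
	pub, sign, err := newSigner()
	if err != nil {
		fmt.Println("key generation failed:", err)
		return
	}

	nonce := []byte("nonce-from-server") // hypothetical challenge value
	sig, err := answerChallenge(nonce, sign)
	if err != nil {
		fmt.Println("signing failed:", err)
		return
	}
	fmt.Println("signature accepted:", verifyChallenge(pub, nonce, sig))
}
```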

Bare Nkeys are also supported. The nkey seed should be in a read-only file, e.g. seed.txt:

> cat seed.txt
# This is my seed nkey!
SUAGMJH5XLGZKQQWAWKRZJIGMOU4HPFUYLXJMXOO5NLFEO2OOQJ5LPRDPM

The helper function below loads and decodes the seed and signs the server nonce. It clears memory between invocations. Alternatively, you can use the low-level option and provide the public key and a signature callback yourself.

opt, err := nats.NkeyOptionFromSeed("seed.txt")
nc, err := nats.Connect(serverUrl, opt)

// Direct
nc, err := nats.Connect(serverUrl, nats.Nkey(pubNkey, sigCB))
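
As a rough illustration of the load-then-clear behavior described above, the sketch below extracts a seed from file contents shaped like the seed.txt sample and zeroes the buffer afterwards. It is not the library's implementation. The functions findSeed and wipe are hypothetical, and the parsing rules are assumptions taken from the sample file: lines starting with "#" are comments and the seed line starts with "S".

```go
package main

import (
	"bytes"
	"fmt"
)

// findSeed (hypothetical helper) returns the first non-comment line that
// starts with 'S', or nil if there is none. The returned slice aliases
// contents, so no extra copy of the secret is made.
func findSeed(contents []byte) []byte {
	for _, line := range bytes.Split(contents, []byte("\n")) {
		line = bytes.TrimSpace(line)
		if len(line) == 0 || line[0] == '#' {
			continue // skip blank lines and comments
		}
		if line[0] == 'S' {
			return line
		}
	}
	return nil
}

// wipe overwrites the buffer with zeros so the secret does not linger.
func wipe(b []byte) {
	for i := range b {
		b[i] = 0
	}
}

func main() {
	contents := []byte("# This is my seed nkey!\nSUAGMJH5XLGZKQQWAWKRZJIGMOU4HPFUYLXJMXOO5NLFEO2OOQJ5LPRDPM\n")

	seed := findSeed(contents)
	fmt.Println("seed found:", seed != nil)

	// A real signer would use the seed here to sign the server nonce.

	wipe(contents) // also clears seed, because seed aliases contents
	fmt.Println("first byte after wipe:", contents[0])
}
```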

TLS

// tls as a scheme will enable secure connections by default. This will also verify the server name.
nc, err := nats.Connect("tls://nats.demo.io:4443")

// If you are using a self-signed certificate, you need to have a tls.Config with RootCAs setup.
// We provide a helper method to make this case easier.
nc, err = nats.Connect("tls://localhost:4443", nats.RootCAs("./configs/certs/ca.pem"))

// If the server requires a client certificate, there is a helper function for that too:
cert := nats.ClientCert("./configs/certs/client-cert.pem", "./configs/certs/client-key.pem")
nc, err = nats.Connect("tls://localhost:4443", cert)

// You can also supply a complete tls.Config.
// Here pool is assumed to be an *x509.CertPool you have already populated,
// and opts.Host the host name of the server.

certFile := "./configs/certs/client-cert.pem"
keyFile := "./configs/certs/client-key.pem"
keyPair, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
    log.Fatalf("error parsing X509 certificate/key pair: %v", err)
}

config := &tls.Config{
    ServerName:   opts.Host,
    Certificates: []tls.Certificate{keyPair},
    RootCAs:      pool,
    MinVersion:   tls.VersionTLS12,
}

nc, err = nats.Connect("nats://localhost:4443", nats.Secure(config))
if err != nil {
    log.Fatalf("Got an error on Connect with Secure Options: %+v\n", err)
}

Using Go Channels (netchan)

nc, _ := nats.Connect(nats.DefaultURL)
ec, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
defer ec.Close()

type person struct {
     Name     string
     Address  string
     Age      int
}

recvCh := make(chan *person)
ec.BindRecvChan("hello", recvCh)

sendCh := make(chan *person)
ec.BindSendChan("hello", sendCh)

me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street"}

// Send via Go channels
sendCh <- me

// Receive via Go channels
who := <- recvCh

Wildcard Subscriptions


// "*" matches any token, at any level of the subject.
nc.Subscribe("foo.*.baz", func(m *nats.Msg) {
    fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
})

nc.Subscribe("foo.bar.*", func(m *nats.Msg) {
    fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
})

// ">" matches any length of the tail of a subject, and can only be the last token
// E.g. 'foo.>' will match 'foo.bar', 'foo.bar.baz', 'foo.foo.bar.bax.22'
nc.Subscribe("foo.>", func(m *nats.Msg) {
    fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
})

// Matches all of the above
nc.Publish("foo.bar.baz", []byte("Hello World"))
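
The matching rules above can be written down as a small function. This is a sketch for reasoning about which subscriptions a subject reaches, not the server's matching code. The name subjectMatches is hypothetical, and it assumes subjects are well-formed dot-separated tokens.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches (hypothetical helper) reports whether subject matches
// pattern. "*" matches exactly one token; ">" is only valid as the last
// pattern token and matches one or more remaining tokens.
func subjectMatches(pattern, subject string) bool {
	pt := strings.Split(pattern, ".")
	st := strings.Split(subject, ".")
	for i, p := range pt {
		if p == ">" {
			// Must be the final pattern token and have at least one
			// subject token left to consume.
			return i == len(pt)-1 && len(st) > i
		}
		if i >= len(st) {
			return false // pattern is longer than the subject
		}
		if p != "*" && p != st[i] {
			return false // literal token mismatch
		}
	}
	// Without ">", both sides must have the same number of tokens.
	return len(pt) == len(st)
}

func main() {
	subject := "foo.bar.baz"
	for _, pattern := range []string{"foo.*.baz", "foo.bar.*", "foo.>", "foo.*"} {
		fmt.Printf("%-10s matches %s: %v\n", pattern, subject, subjectMatches(pattern, subject))
	}
}
```

Note that under these rules "foo.>" does not match the bare subject "foo", because ">" needs at least one token to consume.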

Queue Groups

// All subscriptions with the same queue name will form a queue group.
// Each message will be delivered to only one subscriber per queue group,
// using queuing semantics. You can have as many queue groups as you wish.
// Normal subscribers will continue to work as expected.

nc.QueueSubscribe("foo", "job_workers", func(_ *nats.Msg) {
    received++
})
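
The delivery rule described above (one member per queue group, every normal subscriber) can be modeled with a toy in-memory broker. Everything here is hypothetical and exists only to illustrate the semantics. It handles a single subject, and it picks group members round-robin purely so the output is deterministic. How the real server chooses a member is not specified in this document.

```go
package main

import "fmt"

// broker is a toy, single-subject stand-in for a server. It is not part
// of the NATS API.
type broker struct {
	plain  []func(msg string)            // normal subscribers
	groups map[string][]func(msg string) // queue name -> group members
	next   map[string]int                // per-group round-robin cursor
}

func newBroker() *broker {
	return &broker{
		groups: make(map[string][]func(msg string)),
		next:   make(map[string]int),
	}
}

// subscribe registers a normal subscriber.
func (b *broker) subscribe(h func(msg string)) {
	b.plain = append(b.plain, h)
}

// queueSubscribe adds a member to the named queue group.
func (b *broker) queueSubscribe(queue string, h func(msg string)) {
	b.groups[queue] = append(b.groups[queue], h)
}

// publish delivers msg to every normal subscriber and to exactly one
// member of each queue group.
func (b *broker) publish(msg string) {
	for _, h := range b.plain {
		h(msg)
	}
	for name, members := range b.groups {
		i := b.next[name] % len(members)
		members[i](msg)
		b.next[name] = i + 1
	}
}

func main() {
	b := newBroker()
	workers := make([]int, 3)
	for i := range workers {
		i := i
		b.queueSubscribe("job_workers", func(string) { workers[i]++ })
	}
	observed := 0
	b.subscribe(func(string) { observed++ })

	for n := 0; n < 9; n++ {
		b.publish("job")
	}
	fmt.Println("per-worker counts:", workers)
	fmt.Println("normal subscriber saw:", observed)
}
```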

Advanced Usage


// Flush connection to server, returns when all messages have been processed.
nc.Flush()
fmt.Println("All clear!")

// FlushTimeout specifies a timeout value as well.
err := nc.FlushTimeout(1*time.Second)
if err == nil {
    fmt.Println("All clear!")
} else {
    fmt.Println("Flush timed out!")
}

// Auto-unsubscribe after MAX_WANTED messages received
const MAX_WANTED = 10
sub, err := nc.SubscribeSync("foo")
sub.AutoUnsubscribe(MAX_WANTED)

// Multiple connections
nc1, _ := nats.Connect("nats://host1:4222")
nc2, _ := nats.Connect("nats://host2:4222")

nc1.Subscribe("foo", func(m *nats.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
})

nc2.Publish("foo", []byte("Hello World!"))

Clustered Usage


var servers = "nats://localhost:1222, nats://localhost:1223, nats://localhost:1224"

nc, err := nats.Connect(servers)

// Optionally set ReconnectWait and MaxReconnect attempts.
// This example means 10 seconds total per backend.
nc, err = nats.Connect(servers, nats.MaxReconnects(5), nats.ReconnectWait(2 * time.Second))

// Optionally disable randomization of the server pool
nc, err = nats.Connect(servers, nats.DontRandomize())

// Setup callbacks to be notified on disconnects, reconnects and connection closed.
nc, err = nats.Connect(servers,
	nats.DisconnectHandler(func(nc *nats.Conn) {
		fmt.Printf("Got disconnected!\n")
	}),
	nats.ReconnectHandler(func(nc *nats.Conn) {
		fmt.Printf("Got reconnected to %v!\n", nc.ConnectedUrl())
	}),
	nats.ClosedHandler(func(nc *nats.Conn) {
		fmt.Printf("Connection closed. Reason: %q\n", nc.LastError())
	}),
)

// When connecting to a mesh of servers with auto-discovery capabilities,
// you may need to provide a username/password or token in order to connect
// to any server in that mesh when authentication is required.
// Instead of providing the credentials in the initial URL, you will use
// new option setters:
nc, err = nats.Connect("nats://localhost:4222", nats.UserInfo("foo", "bar"))

// For token based authentication:
nc, err = nats.Connect("nats://localhost:4222", nats.Token("S3cretT0ken"))

// You can even pass the two at the same time in case one of the servers
// in the mesh requires a token instead of a user name and password.
nc, err = nats.Connect("nats://localhost:4222",
    nats.UserInfo("foo", "bar"),
    nats.Token("S3cretT0ken"))

// Note that if credentials are specified in the initial URLs, they take
// precedence over the credentials specified through the options.
// For instance, in the connect call below, the client library will use
// the user "my" and password "pwd" to connect to localhost:4222, however,
// it will use username "foo" and password "bar" when (re)connecting to
// a different server URL that it got as part of the auto-discovery.
nc, err = nats.Connect("nats://my:pwd@localhost:4222", nats.UserInfo("foo", "bar"))
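
The precedence rule in the comments above can be expressed as a short standard-library sketch. It is not the client's code. The functions splitServers and credentialsFor are hypothetical, and the second URL in main stands in for a server learned through auto-discovery.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// splitServers (hypothetical helper) breaks a comma-separated server list
// into individual URLs, trimming surrounding whitespace.
func splitServers(list string) []string {
	var out []string
	for _, s := range strings.Split(list, ",") {
		if s = strings.TrimSpace(s); s != "" {
			out = append(out, s)
		}
	}
	return out
}

// credentialsFor (hypothetical helper) returns the user name and password
// to use for serverURL: credentials embedded in the URL win, otherwise
// the values supplied through options apply.
func credentialsFor(serverURL, optUser, optPass string) (string, string, error) {
	u, err := url.Parse(serverURL)
	if err != nil {
		return "", "", err
	}
	if u.User != nil {
		pass, _ := u.User.Password()
		return u.User.Username(), pass, nil
	}
	return optUser, optPass, nil
}

func main() {
	fmt.Println(splitServers("nats://localhost:1222, nats://localhost:1223, nats://localhost:1224"))

	for _, s := range []string{
		"nats://my:pwd@localhost:4222", // initial URL with credentials
		"nats://localhost:4223",        // assumed discovered server, no credentials
	} {
		user, pass, err := credentialsFor(s, "foo", "bar")
		if err != nil {
			fmt.Println("bad URL:", err)
			continue
		}
		fmt.Printf("%s -> user=%s pass=%s\n", s, user, pass)
	}
}
```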

Context support (+Go 1.7)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

nc, err := nats.Connect(nats.DefaultURL)

// Request with context
msg, err := nc.RequestWithContext(ctx, "foo", []byte("bar"))

// Synchronous subscriber with context
sub, err := nc.SubscribeSync("foo")
msg, err = sub.NextMsgWithContext(ctx)

// Encoded Request with context
c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
type request struct {
	Message string `json:"message"`
}
type response struct {
	Code int `json:"code"`
}
req := &request{Message: "Hello"}
resp := &response{}
err = c.RequestWithContext(ctx, "foo", req, resp)
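
For readers new to context-based cancellation, the following sketch shows the general pattern behind a context-aware blocking receive: wait for either a message or the context's deadline, whichever comes first. It uses a plain Go channel in place of a subscription and is not how the client implements NextMsgWithContext. The names nextMsgWithContext and receiveWithin are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// nextMsgWithContext (hypothetical helper) blocks until a message arrives
// on ch or ctx is done, and returns ctx.Err() in the latter case.
func nextMsgWithContext(ctx context.Context, ch <-chan string) (string, error) {
	select {
	case m := <-ch:
		return m, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// receiveWithin (hypothetical helper) wraps the receive in a context that
// expires after timeout.
func receiveWithin(ch <-chan string, timeout time.Duration) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return nextMsgWithContext(ctx, ch)
}

func main() {
	ch := make(chan string, 1)
	ch <- "hello"

	// A message is already waiting, so this returns immediately.
	m, err := receiveWithin(ch, 2*time.Second)
	fmt.Printf("msg=%q err=%v\n", m, err)

	// The channel is now empty, so this waits until the deadline passes.
	m, err = receiveWithin(ch, 50*time.Millisecond)
	fmt.Printf("msg=%q err=%v\n", m, err)
}
```

Sharing one context across several calls, as in the snippets above, means they all draw from the same deadline rather than each getting a fresh timeout.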

License

Unless otherwise noted, the NATS source files are distributed under the Apache Version 2.0 license found in the LICENSE file.

# Packages

No description provided by the author

# Functions

ClientCert is a helper option to provide the client certificate from a file.
ClosedHandler is an Option to set the closed handler.
Connect will attempt to connect to the NATS system.
Dialer is an Option to set the dialer which will be used when attempting to establish a connection.
DisconnectHandler is an Option to set the disconnected handler.
DiscoveredServersHandler is an Option to set the new servers handler.
DontRandomize is an Option to turn off randomizing the server pool.
DrainTimeout is an Option to set the timeout for draining a connection.
EncoderForType will return the registered Encoder for the encType.
ErrorHandler is an Option to set the async error handler.
FlusherTimeout is an Option to set the write (and flush) timeout on a connection.
GetDefaultOptions returns default configuration options for the client.
MaxPingsOutstanding is an Option to set the maximum number of ping requests that can go un-answered by the server before closing the connection.
MaxReconnects is an Option to set the maximum number of reconnect attempts.
Name is an Option to set the client name.
NewEncodedConn will wrap an existing Connection and utilize the appropriate registered encoder.
NewInbox will return an inbox string which can be used for directed replies from subscribers.
Nkey will set the public Nkey and the signature callback to sign the server nonce.
NkeyOptionFromSeed will load an nkey pair from a seed file.
NoEcho is an Option to turn off messages echoing back from a server.
NoReconnect is an Option to turn off reconnect behavior.
PingInterval is an Option to set the period for client ping commands.
ReconnectBufSize sets the buffer size of messages kept while busy reconnecting.
ReconnectHandler is an Option to set the reconnected handler.
ReconnectWait is an Option to set the wait time between reconnect attempts.
RegisterEncoder will register the encType with the given Encoder.
RootCAs is a helper option to provide the RootCAs pool from a list of filenames.
Secure is an Option to enable TLS secure connections that skip server verification by default.
SetCustomDialer is an Option to set a custom dialer which will be used when attempting to establish a connection.
SyncQueueLen will set the maximum queue len for the internal channel used for SubscribeSync().
Timeout is an Option to set the timeout for Dial on a connection.
Token is an Option to set the token to use when a token is not included directly in the URLs and when a token handler is not provided.
TokenHandler is an Option to set the token handler to use when a token is not included directly in the URLs and when a token is not set.
UseOldRequestStyle is an Option to force usage of the old Request style.
UserCredentials is a convenience function that takes a filename for a user's JWT and a filename for the user's private Nkey seed.
UserInfo is an Option to set the username and password to use when not included directly in the URLs.
UserJWT will set the callbacks to retrieve the user's JWT and the signature callback to sign the server nonce.

# Constants

The different types of subscription types.
AUTHORIZATION_ERR is for when nats server user authorization has failed.
The different types of subscription types.
No description provided by the author
No description provided by the author
No description provided by the author
Indexed names into the Registered Encoders.
Default Constants.
8k.
Default Constants.
Default Constants.
Default Constants.
Default Constants.
8MB.
Default Constants.
Pending Limits.
Pending Limits.
Default Constants.
Default Constants.
No description provided by the author
No description provided by the author
No description provided by the author
Indexed names into the Registered Encoders.
InboxPrefix is the prefix for all inbox subjects.
No description provided by the author
Indexed names into the Registered Encoders.
Default Constants.
The different types of subscription types.
PERMISSIONS_ERR is for when nats server subject authorization has failed.
No description provided by the author
Default Constants.
STALE_CONNECTION is for detection and proper handling of stale connections.
The different types of subscription types.
Default Constants.

# Variables

# Structs

A Conn represents a bare connection to a nats-server.
EncodedConn is the preferred way to interface with NATS.
Msg is a structure used by Subscribers and PublishMsg().
Options can be used to create a customized connection.
Tracks various stats received and sent on this connection, including counts for messages and bytes.
A Subscription represents interest in a given subject.

# Interfaces

CustomDialer can be used to specify any dialer, not necessarily a *net.Dialer.
Encoder interface is for all registered encoders.
Handler is a specific callback used for Subscribe.

# Type aliases

AuthTokenHandler is used to generate a new token.
ConnHandler is used for asynchronous events such as disconnected and closed connections.
ErrHandler is used to process asynchronous errors encountered while processing inbound messages.
MsgHandler is a callback function that processes messages delivered to asynchronous subscribers.
Option is a function on the options for a connection.
SignatureHandler is used to sign a nonce from the server while authenticating with nkeys.
Status represents the state of the connection.
SubscriptionType is the type of the Subscription.
UserJWTHandler is used to fetch and return the account signed JWT for this user.