github.com/nats-io/jsm.go (module, package)
Version: 0.2.0
Repository: https://github.com/nats-io/jsm.go.git
Documentation: pkg.go.dev

# README

Overview

This is a Go based library to manage and interact with JetStream.

This package is the underlying library for the nats CLI, our Terraform provider, GitHub Actions and Kubernetes CRDs. It's essentially a direct wrapping of the JetStream API with few user-friendly features, and it requires deep technical knowledge of the JetStream internals.

For typical end users we suggest the nats.go package.

Initialization

This package is modeled as a Manager instance that receives a NATS Connection and sets default timeouts and validation for all interaction with JetStream.

Multiple Managers can be used in your application, each with its own timeouts and connection.

mgr, _ := jsm.New(nc, jsm.WithTimeout(10*time.Second))

This creates a Manager with a 10 second timeout when accessing the JetStream API. All examples below assume a manager was created as above.

Schema Registry

All the JetStream API messages and some events and advisories produced by the NATS Server have JSON Schemas associated with them. The api package has a Schema Registry and helpers to discover and interact with these.

The Schema Registry can be accessed from the CLI using the nats schemas command, where you can list, search and view schemas and validate data based on schemas.

Example Message

To retrieve the Stream State for a specific Stream one accesses the $JS.API.STREAM.INFO.<stream> API. It responds with data like the following:

{
  "type": "io.nats.jetstream.api.v1.stream_info_response",
  "config": {
    "name": "TESTING",
    "subjects": [
      "js.in.testing"
    ],
    "retention": "limits",
    "max_consumers": -1,
    "max_msgs": -1,
    "max_bytes": -1,
    "discard": "old",
    "max_age": 0,
    "max_msg_size": -1,
    "storage": "file",
    "num_replicas": 1,
    "duplicate_window": 120000000000
  },
  "created": "2020-10-09T12:40:07.648216464Z",
  "state": {
    "messages": 1,
    "bytes": 81,
    "first_seq": 1017,
    "first_ts": "2020-10-09T19:43:40.867729419Z",
    "last_seq": 1017,
    "last_ts": "2020-10-09T19:43:40.867729419Z",
    "consumer_count": 1
  }
}

Here the type of the message is io.nats.jetstream.api.v1.stream_info_response. The api package can help parse this into the correct format.

Message Schemas

Given a message kind one can retrieve the full JSON Schema as bytes:

schema, _ := api.Schema("io.nats.jetstream.api.v1.stream_info_response")

One can also retrieve it based on the content of a specific message:

schemaType, _ := api.SchemaTypeForMessage(m.Data)
schema, _ := api.Schema(schemaType)

Several other Schema-related helpers exist to search Schemas, find URLs and more. See the api Reference.

Parsing Message Content

JetStream produces metrics and advisories about message acknowledgements, API audits and more. Here we subscribe to the advisory subjects and print details of one specific message type.

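nc.Subscribe("$JS.EVENT.ADVISORY.>", func(m *nats.Msg) {
    // ParseMessage returns the schema type and the message parsed into its Go type
    kind, event, err := api.ParseMessage(m.Data)
    if err != nil {
        log.Printf("Could not parse message: %s", err)
        return
    }
    log.Printf("Received message of type %s", kind) // io.nats.jetstream.advisory.v1.api_audit

    // parsed messages are returned as pointers to their concrete types
    switch e := event.(type) {
    case *advisory.JetStreamAPIAuditV1:
        fmt.Printf("Audit event on subject %s from %s\n", e.Subject, e.Client.Name)
    }
})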

Above we gain full access to all contents of the message in its native format, but we need to know in advance what we will get. We can, however, render the messages as text in a generic way:

nc.Subscribe("$JS.EVENT.ADVISORY.>", func(m *nats.Msg) {
    kind, event, err := api.ParseMessage(m.Data)
    if err != nil {
        log.Printf("parsing failed: %s", err)
        return
    }

    if kind == "io.nats.unknown_message" {
        return // a message without metadata or of an unknown format was received
    }

    // only messages implementing api.Event can be rendered
    ne, ok := event.(api.Event)
    if !ok {
        log.Printf("event %q does not implement the Event interface", kind)
        return
    }

    err = api.RenderEvent(os.Stdout, ne, api.TextCompactFormat)
    if err != nil {
        log.Printf("display failed: %s", err)
    }
})

This will produce output like:

11:25:49 [JS API] $JS.API.STREAM.INFO.TESTING $G
11:25:52 [JS API] $JS.API.STREAM.NAMES $G
11:25:52 [JS API] $JS.API.STREAM.NAMES $G
11:25:53 [JS API] $JS.API.STREAM.INFO.TESTING $G

api.TextCompactFormat is one of several supported formats. The others are api.TextExtendedFormat for a full multi-line format, api.ApplicationCloudEventV1Format for CloudEvents v1 format and api.ApplicationJSONFormat for JSON.

API Validation

The data structures sent to JetStream can be validated before submission to NATS, which can speed up user feedback and provide better errors.

type SchemaValidator struct{}

func (v SchemaValidator) ValidateStruct(data any, schemaType string) (ok bool, errs []string) {
	s, err := api.Schema(schemaType)
	if err != nil {
		return false, []string{fmt.Sprintf("unknown schema type %s", schemaType)}
	}

	ls := gojsonschema.NewBytesLoader(s)
	ld := gojsonschema.NewGoLoader(data)
	result, err := gojsonschema.Validate(ls, ld)
	if err != nil {
		return false, []string{fmt.Sprintf("validation failed: %s", err)}
	}

	if result.Valid() {
		return true, nil
	}

	errors := make([]string, len(result.Errors()))
	for i, verr := range result.Errors() {
		errors[i] = verr.String()
	}

	return false, errors
}

This is an api.StructValidator implementation that uses JSON Schema to do deep validation of the structures sent to JetStream.

This can be used by the Manager to validate all API access.

mgr, _ := jsm.New(nc, jsm.WithAPIValidation(new(SchemaValidator)))
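
A validator does not have to be schema based. Assuming api.StructValidator only requires the ValidateStruct method shown above, the following sketch implements the same method shape using the standard library. The structValidator interface, the requireName type, the streamConfig struct and the example.stream_create_request type string are hypothetical stand-ins declared locally so the example runs without jsm.go, and the name rule is a simplified illustration rather than the library's own check.

```go
package main

import (
	"fmt"
	"strings"
)

// structValidator mirrors the method shape used by SchemaValidator above (local stand-in).
type structValidator interface {
	ValidateStruct(data any, schemaType string) (ok bool, errs []string)
}

// streamConfig is a hypothetical, trimmed-down request body.
type streamConfig struct {
	Name     string
	Subjects []string
}

// requireName rejects stream configurations without a usable name.
type requireName struct{}

// ValidateStruct checks only streamConfig values and lets everything else pass.
func (requireName) ValidateStruct(data any, schemaType string) (bool, []string) {
	cfg, ok := data.(streamConfig)
	if !ok {
		return true, nil // not a type this validator knows about
	}

	var errs []string
	if cfg.Name == "" {
		errs = append(errs, schemaType+": name is required")
	}
	if strings.ContainsAny(cfg.Name, ".*> ") {
		errs = append(errs, schemaType+": name contains invalid characters")
	}

	return len(errs) == 0, errs
}

func main() {
	var v structValidator = requireName{}

	ok, errs := v.ValidateStruct(streamConfig{Name: "ORDERS"}, "example.stream_create_request")
	fmt.Println(ok, errs)

	ok, errs = v.ValidateStruct(streamConfig{Name: "bad.name"}, "example.stream_create_request")
	fmt.Println(ok, errs)
}
```

A hand-written check like this is cheaper than full JSON Schema validation but only covers the rules you encode, so it trades coverage for speed and fewer dependencies.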

Build tag

This library provides a noexprlang build tag that disables expression matching for Stream and Consumer queries. The purpose of this build tag is to avoid using the github.com/expr-lang/expr module, which prevents the Go compiler's dead code elimination because it uses some types and functions of the reflect package.

# Packages

Package natscontext provides a way for sets of configuration options to be stored in named files and later retrieved either by name or, if no name is supplied, by accessing a chosen default context.

# Functions

AcknowledgeAll enables an acknowledgement mode where acknowledging message 100 will also ack the preceding messages.
AcknowledgeExplicit requires that every message received be acknowledged.
AcknowledgeNone disables message acknowledgement.
AckWait sets the time a delivered message might remain unacknowledged before redelivery is attempted.
APISubject returns API subject with prefix applied.
BackoffIntervals sets a series of intervals by which retries will be attempted for this consumer.
BackoffPolicy sets a consumer policy.
ConsumerDescription is a textual description of this consumer to provide additional context.
ConsumerName sets a name for the consumer, when creating a durable consumer use DurableName, using ConsumerName allows for creating named ephemeral consumers, else a random name will be generated.
ConsumerOverrideReplicas overrides the replica count inherited from the Stream with this value.
ConsumerQueryApiLevelMin limits results to assets requiring API Level above or equal to level.
ConsumerQueryExpression filters the consumers using the expr expression language. Using this option with a binary built with the `noexprlang` build tag will always return [ErrNoExprLangBuild].
ConsumerQueryInvert inverts the logic of filters, older than becomes newer than and so forth.
ConsumerQueryIsBound finds push consumers that are bound or pull consumers with waiting pulls.
ConsumerQueryIsPinned finds consumers with pinned clients on all their groups.
ConsumerQueryIsPull finds only Pull consumers.
ConsumerQueryIsPush finds only Push consumers.
ConsumerQueryLeaderServer finds clustered consumers where a certain node is the leader.
ConsumerQueryOlderThan finds consumers older than age.
ConsumerQueryReplicas finds consumers with a certain number of replicas or less.
ConsumerQueryWithDeliverySince finds only consumers that have had deliveries since ts.
ConsumerQueryWithFewerAckPending finds consumers with fewer pending messages.
ConsumerQueryWithFewerPending finds consumers with fewer unprocessed messages.
ConsumerQueryWithFewerWaiting finds consumers with fewer waiting pulls.
DeliverAllAvailable delivers messages starting with the first available in the stream.
DeliverBodies configures the consumer to deliver the headers and the bodies for each message.
DeliverGroup when set will only deliver messages to subscriptions matching that group.
DeliverHeadersOnly configures the consumer to only deliver existing headers and the `Nats-Msg-Size` header, no bodies.
DeliverLastPerSubject delivers the last message for each subject in a wildcard stream based on the filter subjects of the consumer.
DeliverySubject is the subject where a Push consumer will deliver its messages.
DurableName is the name given to the consumer, when not set an ephemeral consumer is created.
EventSubject returns Event subject with prefix applied.
FilterServerMetadata copies metadata with the server generated metadata removed.
FilterStreamBySubject filters the messages in a wildcard stream to those matching a specific subject.
IdleHeartbeat sets the time before an idle consumer will send an empty message with Status header 100 indicating the consumer is still alive.
InactiveThreshold is the idle time an ephemeral consumer allows before it is removed.
IsErrorResponse checks if the message holds a standard JetStream error.
IsInternalStream indicates if a stream is considered 'internal' by the NATS team, that is, it's a backing stream for KV, Object or MQTT state.
IsKVBucketStream determines if a stream is a KV bucket.
IsMQTTStateStream determines if a stream holds internal MQTT state.
IsNatsError checks if err is an ApiErr matching code.
IsObjectBucketStream determines if a stream is an Object bucket.
IsOKResponse checks if the message holds a standard JetStream OK response.
IsValidName verifies if n is a valid stream, template or consumer name.
LinearBackoffPeriods creates a backoff policy without any jitter suitable for use in a consumer backoff policy. The periods start from min and increase linearly until ~max.
LinearBackoffPolicy creates a backoff policy with linearly increasing steps between min and max.
MaxAckPending maximum number of messages without acknowledgement that can be outstanding, once this limit is reached message delivery will be suspended.
MaxDeliveryAttempts is the number of times a message will be attempted to be delivered.
MaxRequestBatch is the largest batch that can be specified when doing pulls against the consumer.
MaxRequestExpires is the longest pull request expire the server will allow.
MaxRequestMaxBytes sets the limit of max bytes a consumer may request.
MaxWaiting is the number of outstanding pulls that are allowed on any one consumer.
NewConsumerConfiguration generates a new configuration based on template modified by opts.
NewStreamConfiguration generates a new configuration based on template modified by opts.
NextSubject returns the subject used to retrieve the next message for pull-based Consumers, empty when not a pull-based consumer.
OverflowPriorityGroups sets the consumer to support overflow pull requests.
PagerFilterSubject sets a filter subject for the pager.
PagerSize is the size of pages to walk.
PagerStartDelta sets a starting time delta for the pager.
PagerStartId sets a starting stream sequence for the pager.
PagerTimeout is the time to wait for messages before it's assumed the end of the stream was reached.
ParseDuration parses durations with additional units over those from the standard Go parser.
ParseErrorResponse parses the JetStream response, if it's an error returns an error instance holding the message else nil.
ParseEvent parses event e and returns event as for example *api.ConsumerAckMetric, all unknown event schemas will be of type *UnknownMessage.
ParseJSMsgMetadata parses the reply subject metadata to determine message metadata. When given a message obtained using Direct Get APIs, several fields will be filled in but consumer-related ones will not, as there is no consumer involved in that case.
ParseJSMsgMetadataDirect parses the DIRECT GET headers into a MsgInfo, in this case all consumer related properties will not be filled in as there is no consumer involved.
ParseJSMsgMetadataReply parses the reply subject of a JetStream originated message.
ParsePubAck parses a stream publish response and returns an error if the publish failed or parsing failed.
PinnedClientPriorityGroups sets the consumer to be a pinned client priority consumer with a certain list of groups.
PushFlowControl enables flow control for push based consumers.
RateLimitBitsPerSecond limits message delivery to a rate in bits per second.
ReplayAsReceived delivers messages at the rate they were received at.
ReplayInstantly delivers messages to the consumer as fast as possible.
RestoreConfiguration overrides the configuration used to restore.
RestoreNotify notifies cb about progress of the restore operation.
SamplePercent configures sampling of a subset of messages expressed as a percentage.
ServerCidString takes a kind like server.CLIENT and returns a cid string similar to what the server would produce, e.g. cid:10.
ServerKindString takes a kind like server.CLIENT and returns a string describing it.
SnapshotChunkSize sets the size of messages holding data the server will send, good values are 64KB and 128KB.
SnapshotConsumers includes consumer configuration and state in backups.
SnapshotDebug enables logging using the standard go logging library.
SnapshotHealthCheck performs a health check prior to starting the snapshot.
SnapshotNotify notifies cb about progress of the snapshot operation.
StartAtSequence starts consuming messages at a specific sequence in the stream.
StartAtTime starts consuming messages at a specific point in time in the stream.
StartAtTimeDelta starts delivering messages at a past point in time.
StartWithLastReceived starts delivery at the last messages received in the stream.
StartWithNextReceived starts delivery at the next messages received in the stream.
StreamDescription is a textual description of this stream to provide additional context.
StreamQueryApiLevelMin limits results to assets requiring API Level above or equal to level.
StreamQueryClusterName limits results to servers within a cluster matched by a regular expression.
StreamQueryExpression filters the streams using the expr expression language. Using this option with a binary built with the `noexprlang` build tag will always return [ErrNoExprLangBuild].
StreamQueryFewerConsumersThan limits results to streams with c or fewer consumers.
StreamQueryIdleLongerThan limits results to streams that have not received messages for a period longer than p.
StreamQueryInvert inverts the logic of filters, older than becomes newer than and so forth.
StreamQueryLeaderServer finds clustered streams where a certain node is the leader.
StreamQueryOlderThan limits the results to streams older than p.
StreamQueryReplicas finds streams with a certain number of replicas or less.
StreamQueryServerName limits results to servers matching a regular expression.
StreamQuerySubjectWildcard limits results to streams with subject interest matching a standard NATS wildcard.
StreamQueryWithoutMessages limits results to streams with no messages.
SubjectIsSubsetMatch tests if a subject matches a standard nats wildcard.
WithAPIPrefix replaces API endpoints like $JS.API.STREAM.NAMES with prefix.STREAM.NAMES.
WithAPIValidation validates responses sent from the NATS server using a validator.
WithDomain sets a JetStream domain, incompatible with WithAPIPrefix().
WithEventPrefix replaces event subjects like $JS.EVENT.ADVISORY.API with prefix.ADVISORY.
WithPedanticRequests enables pedantic mode in certain API calls that would avoid the server changing user configurations during request handling.
WithProgress enables progress tracking.
WithTimeout sets a timeout for the requests.
WithTrace enables logging of JSON API requests and responses.

# Variables

DefaultConsumer is the configuration that will be used to create new Consumers in NewConsumer.
DefaultStream is a template configuration with StreamPolicy retention and 1 year maximum age.
DefaultStreamConfiguration is the configuration that will be used to create new Streams in NewStream.
DefaultWorkQueue is a template configuration with WorkQueuePolicy retention and 1 year maximum age.
ErrMemoryStreamNotSupported is an error indicating a memory stream was being snapshotted which is not supported.
ErrNoExprLangBuild warns that expression matching is disabled when compiling a go binary with the `noexprlang` build tag.
SampledDefaultConsumer is the configuration that will be used to create new Consumers in NewConsumer.

# Structs

Consumer represents a JetStream consumer.
MsgInfo holds metadata about a message that was received from JetStream.
Stream represents a JetStream Stream.
StreamNamesFilter limits the names being returned by the names API.

# Type aliases

ConsumerOption configures consumers.
Option is an option to configure the JetStream Manager.
PagerOption configures the stream pager.
StreamOption configures a stream.