Categorygithub.com/bsm/bps
modulepackage
0.2.5
Repository: https://github.com/bsm/bps.git
Documentation: pkg.go.dev

# README

BPS

Build Status GoDoc Go Report Card Gem Version

Multi-backend abstraction for message processing and pubsub queues for Go and Ruby.

Documentation

Check auto-generated documentation:

Install

# go:
go get -u github.com/bsm/bps
go get -u github.com/bsm/bps/kafka
go get -u github.com/bsm/bps/nats
go get -u github.com/bsm/bps/stan

# ruby:
bundle add 'bps-kafka'
bundle add 'bps-nats'
bundle add 'bps-stan'

Backends: Go

Backends: Ruby

Publishing: Go

package main

import (
	"context"
	"fmt"

	"github.com/bsm/bps"
)

func main() {
	ctx := context.Background()
	pub := bps.NewInMemPublisher()
	defer pub.Close()

	topicA := pub.Topic("topic-a")
	topicB := pub.Topic("topic-b")

	_ = topicA.Publish(ctx, &bps.PubMessage{
		Data: []byte("message-1"),
	})
	_ = topicB.Publish(ctx, &bps.PubMessage{
		Data: []byte("message-2"),
	})
	_ = topicA.Publish(ctx, &bps.PubMessage{
		Data: []byte("message-2"),
	})

	fmt.Println(len(topicA.(*bps.InMemPubTopic).Messages()))
	fmt.Println(len(topicB.(*bps.InMemPubTopic).Messages()))

}

Publishing: Ruby

require 'bps/kafka'

pub = BPS::Publisher.resolve('kafka://localhost:9092')
top = pub.topic('topic')

top.publish('foo')
top.publish('bar')

pub.close

To seed multiple brokers, use:

BPS::Publisher.resolve('kafka://10.0.0.1,10.0.0.2,10.0.0.3:9092')

If your brokers are on different ports, try:

BPS::Publisher.resolve('kafka://10.0.0.1%3A9092,10.0.0.2%3A9093,10.0.0.3%3A9094')

Subscribing: Go

package main

import (
	"context"
	"fmt"

	"github.com/bsm/bps"
)

func main() {
	ctx := context.Background()
	pub := bps.NewInMemPublisher()
	defer pub.Close()

	topicA := pub.Topic("topic-a")
	topicB := pub.Topic("topic-b")

	_ = topicA.Publish(ctx, &bps.PubMessage{
		Data: []byte("message-1"),
	})
	_ = topicB.Publish(ctx, &bps.PubMessage{
		Data: []byte("message-2"),
	})
	_ = topicA.Publish(ctx, &bps.PubMessage{
		Data: []byte("message-2"),
	})

	fmt.Println(len(topicA.(*bps.InMemPubTopic).Messages()))
	fmt.Println(len(topicB.(*bps.InMemPubTopic).Messages()))

}

# Functions

GenClientID generates random client ID.
IgnoreSubscriptionErrors configures subscription to silently ignore errors.
NewInMemPublisher returns an initialised publisher.
NewInMemSubscriber returns new subscriber, that consumes messages from seeded data.
NewInMemSubTopic returns new seeded in-memory subscriber topic handle.
NewPublisher inits to a publisher via URL.
NewSubscriber inits to a subscriber via URL.
RegisterPublisher registers a new protocol with a scheme and a corresponding PublisherFactory.
RegisterSubscriber registers a new protocol with a scheme and a corresponding SubscriberFactory.
SafeHandler wraps a handler with a mutex to synchronize access.
StartAt configures subscription start position.
WithErrorHandler configures subscription error handler.

# Constants

PositionNewest tells to start consuming messages from the newest available (published AFTER subscribing).
PositionOldest tells to start consuming messages from the oldest available (published BEFORE subscribing).

# Structs

InMemPublisher is an in-memory publisher implementation which can be used for tests.
InMemPubTopic is an in-memory implementation of a Topic.
InMemSubscriber is a subscriber, that consumes messages from seeded data.
InMemSubTopic is a subscriber topic handle, that consumes messages from seeded data.
PubMessage represents a single message for publishing.
SubOptions holds subscription options.

# Interfaces

Handler defines a message handler.
Publisher defines the main publisher interface.
PubTopic is a publisher handle to a topic.
SubMessage defines a subscription message details.
Subscriber defines the main subscriber interface.
Subscription defines a subscription-manager interface.
SubTopic defines a subscriber topic handle.

# Type aliases

HandlerFunc is a func-based handler adapter.
PublisherFactory constructs a publisher from a URL.
RawSubMessage is an adapter for raw slice of bytes that behaves as a SubMessage.
StartPosition defines starting position to consume messages.
SubOption defines a single subscription option.
SubscriberFactory constructs a subscriber from a URL.