# README
BPS
Multi-backend abstraction for message processing and pubsub queues for Go and Ruby.
Documentation
Check auto-generated documentation:
Install
# go:
go get -u github.com/bsm/bps
go get -u github.com/bsm/bps/kafka
go get -u github.com/bsm/bps/nats
go get -u github.com/bsm/bps/stan
# ruby:
bundle add 'bps-kafka'
bundle add 'bps-nats'
bundle add 'bps-stan'
Backends: Go
Backends: Ruby
Publishing: Go
package main
import (
"context"
"fmt"
"github.com/bsm/bps"
)
func main() {
ctx := context.Background()
pub := bps.NewInMemPublisher()
defer pub.Close()
topicA := pub.Topic("topic-a")
topicB := pub.Topic("topic-b")
_ = topicA.Publish(ctx, &bps.PubMessage{
Data: []byte("message-1"),
})
_ = topicB.Publish(ctx, &bps.PubMessage{
Data: []byte("message-2"),
})
_ = topicA.Publish(ctx, &bps.PubMessage{
Data: []byte("message-2"),
})
fmt.Println(len(topicA.(*bps.InMemPubTopic).Messages()))
fmt.Println(len(topicB.(*bps.InMemPubTopic).Messages()))
}
Publishing: Ruby
require 'bps/kafka'
pub = BPS::Publisher.resolve('kafka://localhost:9092')
top = pub.topic('topic')
top.publish('foo')
top.publish('bar')
pub.close
To seed multiple brokers, use:
BPS::Publisher.resolve('kafka://10.0.0.1,10.0.0.2,10.0.0.3:9092')
If your brokers are on different ports, try:
BPS::Publisher.resolve('kafka://10.0.0.1%3A9092,10.0.0.2%3A9093,10.0.0.3%3A9094')
Subscribing: Go
package main
import (
"context"
"fmt"
"github.com/bsm/bps"
)
func main() {
ctx := context.Background()
pub := bps.NewInMemPublisher()
defer pub.Close()
topicA := pub.Topic("topic-a")
topicB := pub.Topic("topic-b")
_ = topicA.Publish(ctx, &bps.PubMessage{
Data: []byte("message-1"),
})
_ = topicB.Publish(ctx, &bps.PubMessage{
Data: []byte("message-2"),
})
_ = topicA.Publish(ctx, &bps.PubMessage{
Data: []byte("message-2"),
})
fmt.Println(len(topicA.(*bps.InMemPubTopic).Messages()))
fmt.Println(len(topicB.(*bps.InMemPubTopic).Messages()))
}
# Functions
GenClientID generates random client ID.
IgnoreSubscriptionErrors configures subscription to silently ignore errors.
NewInMemPublisher returns an initialised publisher.
NewInMemSubscriber returns new subscriber, that consumes messages from seeded data.
NewInMemSubTopic returns new seeded in-memory subscriber topic handle.
NewPublisher inits to a publisher via URL.
NewSubscriber inits to a subscriber via URL.
RegisterPublisher registers a new protocol with a scheme and a corresponding PublisherFactory.
RegisterSubscriber registers a new protocol with a scheme and a corresponding SubscriberFactory.
SafeHandler wraps a handler with a mutex to synchronize access.
StartAt configures subscription start position.
WithErrorHandler configures subscription error handler.
# Constants
PositionNewest tells to start consuming messages from the newest available (published AFTER subscribing).
PositionOldest tells to start consuming messages from the oldest available (published BEFORE subscribing).
# Structs
InMemPublisher is an in-memory publisher implementation which can be used for tests.
InMemPubTopic is an in-memory implementation of a Topic.
InMemSubscriber is a subscriber, that consumes messages from seeded data.
InMemSubTopic is a subscriber topic handle, that consumes messages from seeded data.
PubMessage represents a single message for publishing.
SubOptions holds subscription options.
# Interfaces
Handler defines a message handler.
Publisher defines the main publisher interface.
PubTopic is a publisher handle to a topic.
SubMessage defines a subscription message details.
Subscriber defines the main subscriber interface.
Subscription defines a subscription-manager interface.
SubTopic defines a subscriber topic handle.
# Type aliases
HandlerFunc is a func-based handler adapter.
PublisherFactory constructs a publisher from a URL.
RawSubMessage is an adapter for raw slice of bytes that behaves as a SubMessage.
StartPosition defines starting position to consume messages.
SubOption defines a single subscription option.
SubscriberFactory constructs a subscriber from a URL.