Module: github.com/stevecallear/pram
Version: 0.2.1
Repository: https://github.com/stevecallear/pram.git
Documentation: pkg.go.dev

# README

pram

pram is a lightweight messaging framework using AWS SNS/SQS and Google Protobuf, with convention-based infrastructure creation.

Publisher

Publisher publishes messages to the appropriate topic. The topic ARN is resolved using the PublisherOptions.TopicARNFn function. A Registry instance can be used to resolve/create infrastructure by convention.

Published messages are wrapped with the prampb.Message type, then encoded to a base64 representation of the marshalled byte slice prior to being sent to SNS.

r := pram.NewRegistry(snsClient, sqsClient, pram.WithPrefixNaming("dev", "service"))
p := pram.NewPublisher(snsClient, pram.WithTopicRegistry(r))

err := p.Publish(context.Background(), &testpb.Message{Value: "value"})
if err != nil {
    log.Fatalln(err)
}
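The encoding step described above can be illustrated with the standard library alone. The following is a minimal sketch, not pram's implementation: encodeBody and decodeBody are hypothetical helper names, the byte slice stands in for an already marshalled message, and the use of base64.StdEncoding is an assumption, since the README does not say which base64 variant is used.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// encodeBody is a hypothetical helper: it turns marshalled message bytes
// into a string that is safe to use as a text message body.
// StdEncoding is an assumption; the library may use a different variant.
func encodeBody(marshalled []byte) string {
	return base64.StdEncoding.EncodeToString(marshalled)
}

// decodeBody is the hypothetical inverse, as a subscriber would apply it
// before unmarshalling.
func decodeBody(body string) ([]byte, error) {
	return base64.StdEncoding.DecodeString(body)
}

func main() {
	// Stand-in for marshalled protobuf bytes: field 1, length 5, "value".
	marshalled := []byte{0x0a, 0x05, 'v', 'a', 'l', 'u', 'e'}

	body := encodeBody(marshalled)
	fmt.Println(body)

	decoded, err := decodeBody(body)
	if err != nil {
		panic(err)
	}
	// Skip the two-byte field header to show the original string payload.
	fmt.Println(string(decoded[2:]))
}
```

Encoding to text keeps arbitrary binary payloads intact when they travel through a string message body.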

Metadata

Message metadata can be modified at the point of publish, for example to add a correlation ID.

err := p.Publish(context.Background(), m, pram.WithCorrelationID(correlationID))
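Options of this kind are commonly built with Go's functional options pattern. The sketch below shows the general shape only; the metadata and option types, and the withCorrelationID and buildMetadata functions, are hypothetical and are not taken from pram's source.

```go
package main

import "fmt"

// metadata is a hypothetical stand-in for a message metadata type.
type metadata struct {
	CorrelationID string
}

// option mutates metadata before a message is sent.
type option func(*metadata)

// withCorrelationID returns an option that sets the correlation ID.
func withCorrelationID(id string) option {
	return func(md *metadata) {
		md.CorrelationID = id
	}
}

// buildMetadata applies each option in order to an empty metadata value.
func buildMetadata(opts ...option) metadata {
	var md metadata
	for _, opt := range opts {
		opt(&md)
	}
	return md
}

func main() {
	md := buildMetadata(withCorrelationID("req-123"))
	fmt.Println(md.CorrelationID)
}
```

Because options are variadic, callers that do not need to touch the metadata can omit them entirely.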

Subscriber

Subscriber receives messages published to the appropriate queue. The queue URL is resolved using the SubscriberOptions.QueueURLFn function. A Registry instance can be used to resolve/create infrastructure by convention.

Handler

Each message subscription requires an implementation of pram.Handler to generate empty messages of the appropriate type and handle received messages. A one-to-one mapping between message types and handlers is assumed, with the message instance from Message guaranteed to be the input to Handle.

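type handler struct{}

// Message returns an empty message for the subscriber to unmarshal into.
func (h *handler) Message() proto.Message {
    return new(testpb.Message)
}

// Handle receives the instance returned by Message, populated with the received data.
func (h *handler) Handle(ctx context.Context, m proto.Message, md pram.Metadata) error {
    tm := m.(*testpb.Message)
    // handle the message
    log.Println(tm.Value)
    return nil
}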

Subscribe

A message subscription can be created using Subscribe. Each received message will spawn a new goroutine to execute the supplied handler.

By default, errors from receiving and handling messages are discarded. This behaviour can be changed using pram.WithErrorHandler.

r := pram.NewRegistry(snsClient, sqsClient, pram.WithPrefixNaming("dev", "service"))
s := pram.NewSubscriber(sqsClient, pram.WithQueueRegistry(r))

// Subscribe will block until the supplied context is cancelled
err := s.Subscribe(context.Background(), new(handler))
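The dispatch behaviour described above (one goroutine per received message, with errors discarded unless an error handler is supplied) can be pictured with a small standard-library sketch. This is an illustration under stated assumptions, not pram's receive loop: dispatch and failOn are hypothetical names, messages are plain strings, and waiting for all handlers to finish is a choice made here to keep the example deterministic.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// dispatch is a hypothetical stand-in for a subscriber's receive loop.
// It runs handle in a new goroutine for every message and passes any
// error to onError. A nil onError discards errors.
func dispatch(msgs []string, handle func(string) error, onError func(error)) {
	if onError == nil {
		onError = func(error) {} // default: discard
	}

	var wg sync.WaitGroup
	for _, m := range msgs {
		wg.Add(1)
		go func(m string) {
			defer wg.Done()
			if err := handle(m); err != nil {
				onError(err)
			}
		}(m)
	}
	wg.Wait()
}

// failOn returns a handler that fails for one specific message.
func failOn(bad string) func(string) error {
	return func(m string) error {
		if m == bad {
			return errors.New("cannot handle " + m)
		}
		return nil
	}
}

func main() {
	msgs := []string{"ok", "bad"}

	// Without an error handler the failure is silently dropped.
	dispatch(msgs, failOn("bad"), nil)

	// With an error handler the failure becomes visible.
	dispatch(msgs, failOn("bad"), func(err error) {
		fmt.Println("subscriber:", err)
	})
}
```

Since handlers run concurrently, an error handler that touches shared state needs its own synchronisation.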

While each call to Subscribe is blocking, a single subscriber can handle multiple message types by using goroutines.

r := pram.NewRegistry(snsClient, sqsClient, pram.WithPrefixNaming("dev", "service"))
s := pram.NewSubscriber(sqsClient, pram.WithQueueRegistry(r))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

wg := new(sync.WaitGroup)
for _, h := range []pram.Handler{new(handlerA), new(handlerB)} {
    wg.Add(1)

    go func(h pram.Handler) {
        defer wg.Done()

        err := s.Subscribe(ctx, h)
        if err != nil {
            log.Println(err)
        }
    }(h)
}

wg.Wait()

Registry

Registry is responsible for creating SNS/SQS infrastructure by convention. The adopted naming convention defines how messages will be routed.

By default each published/subscribed message will result in a single SNS topic and associated SQS queue. This effectively results in competing consumers for all message types, which will likely be inappropriate for all but the simplest implementations.

To support typical routing patterns, the registry should be configured to generate queues for each subscribing service. This convention can be applied using pram.WithPrefixNaming.

Example

Service 'a' publishes a message to the dev-package-Message SNS topic. All instances of service 'a' will publish to the same topic.

r := pram.NewRegistry(snsc, sqsc, pram.WithPrefixNaming("dev", "a"))
p := pram.NewPublisher(snsc, pram.WithTopicRegistry(r))

p.Publish(ctx, new(package.Message))

Service 'b' subscribes to the dev-package-Message topic using the dev-b-package-Message SQS queue. All instances of service 'b' will act as competing consumers.

r := pram.NewRegistry(snsc, sqsc, pram.WithPrefixNaming("dev", "b"))
s := pram.NewSubscriber(sqsc, pram.WithQueueRegistry(r))

s.Subscribe(ctx, new(handler))

Service 'c' subscribes to the same topic, but uses the dev-c-package-Message queue. All instances of service 'c' will act as competing consumers, but are served by a separate queue from that used by service 'b'.

r := pram.NewRegistry(snsc, sqsc, pram.WithPrefixNaming("dev", "c"))
s := pram.NewSubscriber(sqsc, pram.WithQueueRegistry(r))

s.Subscribe(ctx, new(handler))
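The names used in this example follow the prefix naming format documented for pram.WithPrefixNaming (topic: stage-package-Message, queue: stage-service-package-Message, error: stage-service-package-Message_error). The sketch below reproduces that format with plain string handling so the routing is easier to reason about. The function names are hypothetical, and replacing every dot with a hyphen for nested package names is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// messageName converts a protobuf full name such as "package.Message"
// into the hyphen-separated form "package-Message".
// Handling of deeper package paths is an assumption.
func messageName(fullName string) string {
	return strings.ReplaceAll(fullName, ".", "-")
}

// topicName is shared by every publisher and subscriber in a stage.
func topicName(stage, fullName string) string {
	return stage + "-" + messageName(fullName)
}

// queueName is specific to the subscribing service, so each service
// receives its own copy of every message.
func queueName(stage, service, fullName string) string {
	return stage + "-" + service + "-" + messageName(fullName)
}

// errorQueueName follows the documented "_error" suffix.
func errorQueueName(stage, service, fullName string) string {
	return queueName(stage, service, fullName) + "_error"
}

func main() {
	const msg = "package.Message"

	fmt.Println(topicName("dev", msg))           // dev-package-Message
	fmt.Println(queueName("dev", "b", msg))      // dev-b-package-Message
	fmt.Println(queueName("dev", "c", msg))      // dev-c-package-Message
	fmt.Println(errorQueueName("dev", "b", msg)) // dev-b-package-Message_error
}
```

The topic name omits the service, which is why services 'b' and 'c' share a topic, while the queue name includes it, which is why they do not compete with each other.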

Logging

Info level logs, such as infrastructure creation and message publish/receive, can be output by providing a pram.Logger implementation to pram.SetLogger. This can be used to understand the underlying AWS SDK calls being made. For example, the following configuration uses a standard library logger.

l := log.New(os.Stdout, "", log.Ldate|log.Ltime)
pram.SetLogger(l)

In the case of message publishing, registry and handler errors are returned immediately to the calling code, so they can be handled in the usual manner. For message subscriptions, however, the Subscribe function will only return an error if the required queue cannot be resolved. Handler and AWS SDK errors will not be returned. To log these errors, an error handler should be supplied when creating the subscriber.

s := pram.NewSubscriber(sqsClient, pram.WithQueueRegistry(reg), pram.WithErrorHandler(func(err error) {
    pram.Logf("subscriber: %v", err)
}))

# Packages

Package mocks is a generated GoMock package.

# Functions

Log logs the input to the configured logger.
Logf logs the input to the configured logger.
Marshal marshals the specified message.
MessageName returns the message name with hyphen separation, e.g.
NewPublisher returns a new publisher.
NewRegistry returns a new registry.
NewSubscriber returns a new subscriber.
SetLogger sets the logger.
Unmarshal unmarshals the specified message.
WithCorrelationID sets the message correlation id.
WithErrorHandler configures the subscriber to use the specified error handler func.
WithPrefixNaming configures the registry to use prefix naming to support complex message routing. It applies the following format, assuming a protobuf type name of package.Message: topic: stage-package-Message; queue: stage-service-package-Message; error: stage-service-package-Message_error.
WithQueueRegistry configures the subscriber to use the specified registry to resolve queues, creating them if they do not exist.
WithStore configures the registry to use the specified store.
WithTopicRegistry configures the publisher to use the specified registry to resolve topics, creating them if they do not exist.
