Categorygithub.com/botchris/go-pubsub
modulepackage
0.1.4
Repository: https://github.com/botchris/go-pubsub.git
Documentation: pkg.go.dev

# README

Overview

The pubsub package provides a simple helper library for doing publish-subscribe asynchronous tasks in Golang, usually in a web or microservice. pubsub allows you to write publishers and subscribers, fully statically typed, and swap out Broker implementations (e.g. Memory, AWS SQS, etc) as required.

broker overview

What is a PubSub System?

A PubSub system is a messaging system that has, as its name implies, two components: Publisher of messages and subscriber to messages. In contrast to synchronous communication, the publisher doesn't have to wait for a message to be received, as well as the receiver doesn't have to be online to retrieve messages sent earlier. As such, a PubSub system acts like a buffer for asynchronous messaging.

Features

  • Multi-topic support, the same subscriber may listen for messages on multiple topics at the same time.
  • Hybrid message filtering, subscriber are free to decide whether they want to receive messages for a concrete type or not (content-based), or just receive everything that is pushed to a given topic/s (topic-based).
  • Pluggable providers.

Providers

Providers are concrete implementations of the Broker interface. Examples of providers could be messaging services such as Google's PubSub, Amazon's SNS or Nats.io. Broker interface acts as a generalization for suh services.

The pubsub package comes with a built-in memory provider: a simple Broker which allows communicating local process of your system by interchanging messages, which can be used as a simple "Message Bus" replacement.

Interceptors

The pubsub package provides a simple API to implement and install interceptors. Interceptor intercepts each message being published or being delivered to Subscribers. Users can use interceptors to do logging, metrics collection, and many other functionalities that can be shared across PubSub Providers.

To use interception capabilities you must simply instantiate a new Interceptor as follows:

provider := interceptor.New(myProvider, ...opts)

This "Interceptors" acts as a wrapper for the given provider by adding interception capabilities. Moreover, the interceptor itself can be used interchangeably as a Broker; it implements the Broker interface.

Publishing Interceptors

Allows to intercept each message before its handled by underlying Provider.

provider := interceptor.New(myProvider,
    interceptor.WithPublishInterceptor(myPublishingInterceptor),
)

Subscribing Interceptors

Provides a hook to intercept each message before it gets delivered to subscribers. In this case, interception occurs for each subscriber receiving a message. For instance, if two subscribers S1 and S2 receives the same message M, then interception logic will be triggered twice for the same message M.

provider := interceptor.New(myProvider,
    interceptor.WithSubscribeInterceptor(mySubscribingInterceptor),
)

Creating Interceptors

You can create your own publishing/subscribing interceptor by defining functions that follows the signatures:

type PublishInterceptor func(ctx context.Context, next PublishHandler) PublishHandler
type SubscribeInterceptor func(ctx context.Context, next SubscribeMessageHandler) SubscribeMessageHandler

See the printer interceptor for a more detailed example.

Example

package main

import (
  "context"
  "fmt"
  "time"

  "github.com/botchris/go-pubsub"
  "github.com/botchris/go-pubsub/provider/memory"
)

var Topic pubsub.Topic = "my-topic"

type MyMessage struct {
  Body string
}

func main() {
  ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  defer cancel()

  broker := memory.NewBroker(memory.NopSubscriberErrorHandler)

  s1 := pubsub.NewSubscriber(func(ctx context.Context, m MyMessage) error {
    fmt.Printf("[s1] -> %+v\n", m)

    return nil
  })

  s2 := pubsub.NewSubscriber(func(ctx context.Context, m *MyMessage) error {
    fmt.Printf("[s2] -> %+v\n", m)

    return nil
  })

  s3 := pubsub.NewSubscriber(func(ctx context.Context, m string) error {
    fmt.Printf("[s3] -> %+v\n", m)

    return nil
  })

  if err := broker.Subscribe(ctx, Topic, s1); err != nil {
    panic(err)
  }

  if err := broker.Subscribe(ctx, Topic, s2); err != nil {
    panic(err)
  }

  if err := broker.Subscribe(ctx, Topic, s3); err != nil {
    panic(err)
  }

  if err := broker.Publish(ctx, Topic, MyMessage{Body: "value(hello world)"}); err != nil {
    panic(err)
  }
  // Output:
  //  [s1] -> {Body:value(hello world)}

  if err := broker.Publish(ctx, Topic, &MyMessage{Body: "pointer(hello world)"}); err != nil {
    panic(err)
  }
  // Output:
  //  [s2] -> &{Body:pointer(hello world)}

  if err := broker.Publish(ctx, Topic, "string(hello world)"); err != nil {
    panic(err)
  }
  // Output:
  //  [s3] -> string(hello world)
}

# Packages

No description provided by the author
No description provided by the author
No description provided by the author

# Functions

NewSubscriber builds a new subscriber instance for the given function.

# Variables

List of known errors for subscriber signature validation process.
List of known errors for subscriber signature validation process.
List of known errors for subscriber signature validation process.
List of known errors for subscriber signature validation process.
List of known errors for subscriber signature validation process.
List of known errors for subscriber signature validation process.

# Structs

Subscriber represents a handling function capable of receiving messages.

# Interfaces

Broker pub-sub broker definition.

# Type aliases

Topic represents/identifies a particular Topic.