package
10.30.0+incompatible
Repository: https://github.com/status-im/status-go.git
Documentation: pkg.go.dev

# README

pubsub

A type-safe, generic publish-subscribe pattern implementation for Go.

Overview

The pubsub package provides a thread-safe, type-safe implementation of the publish-subscribe pattern using Go generics. It allows different parts of your application to communicate through typed events without direct coupling.

Key Features

  • Type Safety: Uses Go generics to ensure type safety at compile time
  • Thread Safe: All operations are protected by appropriate mutexes
  • Non-blocking: Publishers never block, events are dropped if subscribers can't keep up
  • Automatic Cleanup: Channels are automatically closed when unsubscribing
  • Multiple Event Types: Single publisher can handle multiple different event types
  • Buffer Control: Configurable buffer sizes for subscriber channels

API Reference

Core Functions

NewPublisher() *Publisher

Creates a new publisher instance that can handle multiple event types.

Publish[T any](p *Publisher, val T)

Publishes a value of type T to all subscribers of that type. This operation is non-blocking.

Subscribe[T any](p *Publisher, bufferSize uint) (chan T, UnsubFn)

Subscribes to events of type T and returns a channel to receive events and an unsubscribe function.

Publisher Methods

Close()

Closes the publisher and all its subscriptions, cleaning up resources.

Usage Examples

Basic Usage

package main

import (
    "fmt"
    "time"
    "github.com/status-im/status-go/pkg/pubsub"
)

func main() {
    // Create a publisher
    publisher := pubsub.NewPublisher()
    defer publisher.Close()

    // Define event types
    type UserEvent struct {
        UserID   string
        Action   string
        Timestamp time.Time
    }

    type SystemEvent struct {
        Level   string
        Message string
    }

    // Subscribe to UserEvent
    userCh, unsubUser := pubsub.Subscribe[UserEvent](publisher, 10)
    defer unsubUser()

    // Subscribe to SystemEvent
    systemCh, unsubSystem := pubsub.Subscribe[SystemEvent](publisher, 5)
    defer unsubSystem()

    // Start listening for events
    go func() {
        for event := range userCh {
            fmt.Printf("User event: %+v\n", event)
        }
    }()

    go func() {
        for event := range systemCh {
            fmt.Printf("System event: %+v\n", event)
        }
    }()

    // Publish events
    pubsub.Publish(publisher, UserEvent{
        UserID:    "user123",
        Action:    "login",
        Timestamp: time.Now(),
    })

    pubsub.Publish(publisher, SystemEvent{
        Level:   "info",
        Message: "Server started",
    })
}

Working with Different Types

// Integer events
intCh, unsubInt := pubsub.Subscribe[int](publisher, 100)
defer unsubInt()

// String events
stringCh, unsubString := pubsub.Subscribe[string](publisher, 50)
defer unsubString()

// Custom struct events
type CustomEvent struct {
    ID   int
    Data string
}

customCh, unsubCustom := pubsub.Subscribe[CustomEvent](publisher, 25)
defer unsubCustom()

// Publish different types
pubsub.Publish(publisher, 42)
pubsub.Publish(publisher, "hello world")
pubsub.Publish(publisher, CustomEvent{ID: 1, Data: "test"})

Multiple Subscribers

// Multiple subscribers for the same event type
ch1, unsub1 := pubsub.Subscribe[UserEvent](publisher, 10)
ch2, unsub2 := pubsub.Subscribe[UserEvent](publisher, 10)
ch3, unsub3 := pubsub.Subscribe[UserEvent](publisher, 10)

defer unsub1()
defer unsub2()
defer unsub3()

// All three subscribers will receive the same event
pubsub.Publish(publisher, UserEvent{UserID: "user1", Action: "login"})

Buffer Management

// Unbuffered channel (bufferSize = 0)
// Events will be dropped if subscriber can't keep up
ch, unsub := pubsub.Subscribe[UserEvent](publisher, 0)
defer unsub()

// Large buffer for high-throughput scenarios
highThroughputCh, unsubHigh := pubsub.Subscribe[UserEvent](publisher, 1000)
defer unsubHigh()

Best Practices

1. Always Clean Up

Always call the unsubscribe function returned by Subscribe to prevent memory leaks:

ch, unsub := pubsub.Subscribe[MyEvent](publisher, 10)
defer unsub() // Important!

2. Choose Appropriate Buffer Sizes

  • Use smaller buffers for low-frequency events or events that are processed quickly
  • Use larger buffers for high-frequency events or events that take longer to process
  • Use unbuffered channels (0) when you want to drop events if subscribers can't keep up
  • For events that require heavy processing, consider doing the work in a separate goroutine to release the channel quickly:
ch, unsub := pubsub.Subscribe[HeavyEvent](publisher, 10)
defer unsub()

for event := range ch {
    // Process the event in a separate goroutine to avoid blocking the channel
    go func(e HeavyEvent) {
        // Do heavy processing here
        processHeavyEvent(e)
    }(event)
}

3. Handle Channel Closure

Always check if channels are closed in your event processing loops:

for event := range ch {
    // Process event
}
// Channel is closed when loop exits

4. Publisher Lifecycle

Always close the publisher when you're done with it:

publisher := pubsub.NewPublisher()
defer publisher.Close()

Thread Safety

All operations in the pubsub package are thread-safe:

  • Multiple goroutines can publish events simultaneously
  • Multiple goroutines can subscribe/unsubscribe simultaneously
  • Event delivery is safe across concurrent operations

Error Handling

The pubsub package is designed to be robust:

  • Publishing to a closed publisher is safe (events are ignored)
  • Subscribing to a closed publisher is safe (returns closed channel)
  • Unsubscribing multiple times is safe (idempotent)
  • Events are dropped if subscriber channels are full (non-blocking behavior)

Examples in Tests

For more comprehensive examples, see the test file pubsub_test.go which demonstrates:

  • Working with different data types (int, string, struct, slice, map, channel, function, interface)
  • Multiple subscribers
  • Unsubscription behavior
  • Concurrent operations