Categorygithub.com/ankorstore/yokai-contrib/fxgcppubsub
modulepackage
1.4.1
Repository: https://github.com/ankorstore/yokai-contrib.git
Documentation: pkg.go.dev

# README

Yokai GCP Pub/Sub Module

ci go report codecov Deps PkgGoDev

Yokai module for GCP Pub/Sub.

Overview

This module provides to your Yokai application the possibility to publish and/or subscribe on a GCP Pub/Sub instance.

It also provides the support of Avro and Protobuf schemas.

Installation

First install the module:

go get github.com/ankorstore/yokai-contrib/fxgcppubsub

Then activate it in your application bootstrapper:

// internal/bootstrap.go
package internal

import (
	"github.com/ankorstore/yokai/fxcore"
	"github.com/ankorstore/yokai-contrib/fxgcppubsub"
)

var Bootstrapper = fxcore.NewBootstrapper().WithOptions(
	// load fxgcppubsub module
	fxgcppubsub.FxGcpPubSubModule,
	// ...
)

Configuration

Configuration reference:

# ./configs/config.yaml
modules:
  gcppubsub:
    project:
      id: ${GCP_PROJECT_ID}  # GCP project id
    healthcheck:
      topics:                # list of topics to check for the topics probe
        - some-topic         # refers to projects/${GCP_PROJECT_ID}/topics/some-topic
      subscriptions:         # list of subscriptions to check for the subscriptions probe
        - some-subscription  # refers to projects/${GCP_PROJECT_ID}/subscriptions/some-subscription

Publish

This module provides a high level Publisher that you can inject anywhere to publish messages on a topic.

If the topic is associated to an avro or protobuf schema, the publisher will automatically handle the message encoding.

This module also provides a pubsub.Client, that you can use for low level publishing.

Raw message

To publish a raw message (without associated schema) on a topic:

// publish on projects/${GCP_PROJECT_ID}/topics/some-topic
res, err := publisher.Publish(context.Backgound(), "some-topic", "some message")

Avro message

The publisher can accept any struct, and will automatically handle the avro encoding based on the following tags:

Considering this avro schema:

{
  "namespace": "Simple",
  "type": "record",
  "name": "Avro",
  "fields": [
    {
      "name": "StringField",
      "type": "string"
    },
    {
      "name": "FloatField",
      "type": "float"
    },
    {
      "name": "BooleanField",
      "type": "boolean"
    }
  ]
}

To publish a message on a topic associated to this avro schema:

// struct with tags, representing the message
type SimpleRecord struct {
    StringField  string  `avro:"StringField" json:"StringField"`
    FloatField   float32 `avro:"FloatField" json:"FloatField"`
    BooleanField bool    `avro:"BooleanField" json:"BooleanField"`
}

// publish on projects/${GCP_PROJECT_ID}/topics/some-topic
res, err := publisher.Publish(context.Backgound(), "some-topic", &SimpleRecord{
    StringField:  "some string",
    FloatField:   12.34,
    BooleanField: true,
})

Protobuf message

The publisher can accept any proto.Message, and will automatically handle the protobuf binary or json encoding.

Considering this protobuf schema:

syntax = "proto3";

package simple;

option go_package = "github.com/ankorstore/yokai-contrib/fxgcppubsub/testdata/proto";

message SimpleRecord {
    string string_field = 1;
    float float_field = 2;
    bool boolean_field = 3;
}

To publish a message on a topic associated to this protobuf schema:

// generated protobuf stub proto.Message, representing the message
type SimpleRecord struct {
    state         protoimpl.MessageState
    sizeCache     protoimpl.SizeCache
    unknownFields protoimpl.UnknownFields
    
    StringField  string  `protobuf:"bytes,1,opt,name=string_field,json=stringField,proto3" json:"string_field,omitempty"`
    FloatField   float32 `protobuf:"fixed32,2,opt,name=float_field,json=floatField,proto3" json:"float_field,omitempty"`
    BooleanField bool    `protobuf:"varint,3,opt,name=boolean_field,json=booleanField,proto3" json:"boolean_field,omitempty"`
}

// publish on projects/${GCP_PROJECT_ID}/topics/some-topic
res, err := publisher.Publish(context.Backgound(), "some-topic", &SimpleRecord{
    StringField:  "test proto",
    FloatField:   56.78,
    BooleanField: false,
})

Subscribe

This module provides a high level Subscriber that you can inject anywhere to subscribe messages from a subscription.

If the subscription's topic is associated to an avro or protobuf schema, the subscriber will offer a message from which you can handle the decoding.

This module also provides a pubsub.Client, that you can use for low level subscribing.

Raw message

To subscribe on a subscription receiving raw messages (without associated schema):

// subscribe from projects/${GCP_PROJECT_ID}/subscriptions/some-subscription
err := subscriber.Subscribe(ctx, "some-subscription", func(ctx context.Context, m *message.Message) {
    fmt.Printf("%s", m.Data())
    
    m.Ack()
})

Avro message

The subscriber message can be decoded into any struct with the following tags:

Considering this avro schema:

{
  "namespace": "Simple",
  "type": "record",
  "name": "Avro",
  "fields": [
    {
      "name": "StringField",
      "type": "string"
    },
    {
      "name": "FloatField",
      "type": "float"
    },
    {
      "name": "BooleanField",
      "type": "boolean"
    }
  ]
}

To subscribe from a subscription associated to this avro schema:

// struct with tags, representing the message
type SimpleRecord struct {
    StringField  string  `avro:"StringField" json:"StringField"`
    FloatField   float32 `avro:"FloatField" json:"FloatField"`
    BooleanField bool    `avro:"BooleanField" json:"BooleanField"`
}

// subscribe from projects/${GCP_PROJECT_ID}/subscriptions/some-subscription
err := subscriber.Subscribe(ctx, "some-subscription", func(ctx context.Context, m *message.Message) {
    var rec SimpleRecord
    
    err = m.Decode(&rec)
    if err != nil {
        m.Nack()
    }
    
    fmt.Printf("%v", rec)
    
    m.Ack()
})

Protobuf message

The subscriber message can be decoded into any proto.Message, for protobuf binary or json encoding.

Considering this protobuf schema:

syntax = "proto3";

package simple;

option go_package = "github.com/ankorstore/yokai-contrib/fxgcppubsub/testdata/proto";

message SimpleRecord {
  string string_field = 1;
  float float_field = 2;
  bool boolean_field = 3;
}

To subscribe from a subscription associated to this protobuf schema:

// generated protobuf stub proto.Message, representing the message
type SimpleRecord struct {
    state         protoimpl.MessageState
    sizeCache     protoimpl.SizeCache
    unknownFields protoimpl.UnknownFields
    
    StringField  string  `protobuf:"bytes,1,opt,name=string_field,json=stringField,proto3" json:"string_field,omitempty"`
    FloatField   float32 `protobuf:"fixed32,2,opt,name=float_field,json=floatField,proto3" json:"float_field,omitempty"`
    BooleanField bool    `protobuf:"varint,3,opt,name=boolean_field,json=booleanField,proto3" json:"boolean_field,omitempty"`
}

// subscribe from projects/${GCP_PROJECT_ID}/subscriptions/some-subscription
err := subscriber.Subscribe(ctx, "some-subscription", func(ctx context.Context, m *message.Message) {
    var rec SimpleRecord
    
    err = m.Decode(&rec)
    if err != nil {
        m.Nack()
    }
    
    fmt.Printf("%v", rec)
    
    m.Ack()
})

Health Check

This module provides ready to use health check probes, to be used by the Health Check module:

Considering the following configuration:

# ./configs/config.yaml
app:
modules:
  gcppubsub:
    project:
      id: ${GCP_PROJECT_ID}   # GCP project id
    healthcheck:
      topics:                 # list of topics to check for the topics probe
        - some-topic          # refers to projects/${GCP_PROJECT_ID}/topics/some-topic
        - other-topic         # refers to projects/${GCP_PROJECT_ID}/topics/other-topic
      subscriptions:          # list of subscriptions to check for the subscriptions probe
        - some-subscription   # refers to projects/${GCP_PROJECT_ID}/subscriptions/some-subscription
        - other-subscription  # refers to projects/${GCP_PROJECT_ID}/subscriptions/other-subscription

To activate those probes, you just need to register them:

// internal/services.go
package internal

import (
	"github.com/ankorstore/yokai/fxhealthcheck"
	"github.com/ankorstore/yokai-contrib/fxgcppubsub/healthcheck"
	"go.uber.org/fx"
)

func ProvideServices() fx.Option {
	return fx.Options(
		// register the GcpPubSubTopicsProbe for some-topic and other-topic
		fxhealthcheck.AsCheckerProbe(healthcheck.NewGcpPubSubTopicsProbe),
		// register the GcpPubSubSubscriptionsProbe for some-subscription and other-subscription
		fxhealthcheck.AsCheckerProbe(healthcheck.NewGcpPubSubSubscriptionsProbe),
		// ...
	)
}

Notes:

  • if your application is interested only in publishing, activate the GcpPubSubTopicsProbe only
  • if it is interested only in subscribing, activate the GcpPubSubSubscriptionsProbe only

Testing

In test mode:

are all configured to work against a pstest.Server, avoiding the need to spin up any Pub/Sub real (or emulator) instance, for better tests portability.

This means that you can create topics, subscriptions and schemas locally only for your tests.

For example:

// internal/example/example_test.go
package example_test

import (
	"context"
	"testing"
	"time"
	
	"github.com/ankorstore/yokai-contrib/fxgcppubsub"
	"github.com/ankorstore/yokai-contrib/fxgcppubsub/message"
	"github.com/ankorstore/yokai-contrib/fxgcppubsub/reactor/ack"
	"github.com/ankorstore/yokai/fxconfig"
	"github.com/stretchr/testify/assert"
	"github.com/foo/bar/internal"
	"go.uber.org/fx"
	"go.uber.org/fx/fxtest"
)

func TestPubSub(t *testing.T) {
	t.Setenv("APP_ENV", "test")
	t.Setenv("APP_CONFIG_PATH", "testdata/config")
	t.Setenv("GCP_PROJECT_ID", "test-project")

	var publisher fxgcppubsub.Publisher
	var subscriber fxgcppubsub.Subscriber
	var supervisor ack.AckSupervisor

	ctx := context.Background()

	// test app
	internal.RunTest(
		t,
		// prepare test topic and subscription
		fxgcppubsub.PrepareTopicAndSubscription(fxgcppubsub.PrepareTopicAndSubscriptionParams{
			TopicID:        "test-topic",
			SubscriptionID: "test-subscription",
		}),
		fx.Populate(&publisher, &subscriber, &supervisor),
	)
	
	// publish to test-topic
	_, err := publisher.Publish(ctx, "test-topic", "test data")
	assert.NoError(t, err)

	// subscribe from test-subscription
	waiter := supervisor.StartAckWaiter("test-subscription")
	
	go subscriber.Subscribe(ctx, "test-subscription", func(ctx context.Context, m *message.Message) {
		assert.Equal(t, []byte("test data"), m.Data())

		m.Ack()
	})

	// wait for subscriber message ack
	_, err = waiter.WaitMaxDuration(ctx, time.Second)
	assert.NoError(t, err)
}

Notes:

  • you can prepare the test topics, subscriptions and schemas using the provided helpers
  • you can find tests involving avro and protobuf schemas in the module test examples

# Packages

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Functions

AsPubSubTestServerReactor registers a [Reactor] into Fx.
AsPubSubTestServerReactors registers a list of [Reactor] into Fx.
NewDefaultPublisher returns a new DefaultPublisher instance.
NewDefaultSubscriber returns a new DefaultSubscriber instance.
NewFxGcpPubSubClient returns a [pubsub.Client].
NewFxGcpPubSubPublisher returns a [Publisher].
NewFxGcpPubSubSchemaClient returns a [pubsub.SchemaClient].
NewFxGcpPubSubSubscriber returns a [Subscriber].
NewFxGcpPubSubTestServer returns a [pstest.Server].
PrepareSchema prepares a pub/sub schema.
PrepareTopic prepares a pub/sub topic.
PrepareTopicAndSubscription prepares a pub/sub topic and an associated subscription.
PrepareTopicAndSubscriptionWithSchema prepares a pub/sub topic and an associated subscription with a schema.
PrepareTopicWithSchema prepares a pub/sub topic with a schema.

# Constants

ModuleName is the module name.

# Variables

FxGcpPubSubModule is the [Fx] GCP pub/sub module.

# Structs

DefaultPublisher is the default Publisher implementation.
DefaultSubscriber is the default Subscriber implementation.
FxGcpPubSubClientParam allows injection of the required dependencies in [NewFxGcpPubSubClient].
FxGcpPubSubPublisherParam allows injection of the required dependencies in [NewFxGcpPubSubPublisher].
FxGcpPubSubSchemaClientParam allows injection of the required dependencies in [NewFxGcpPubSubSchemaClient].
FxGcpPubSubSubscriberParam allows injection of the required dependencies in [NewFxGcpPubSubPublisher].
No description provided by the author
PrepareSchemaParams represents the parameters used in PrepareSchema.
PrepareTopicAndSubscriptionParams represents the parameters used in PrepareTopicAndSubscription.
PrepareTopicAndSubscriptionWithSchemaParams represents the parameters used in PrepareTopicAndSubscriptionWithSchema.
PrepareTopicParams represents the parameters used in PrepareTopic.
PrepareTopicWithSchemaParams represents the parameters used in PrepareTopicWithSchema.

# Interfaces

Publisher is the interface for high level publishers.
Reactor is the interface for pub/sub test server reactors.
Subscriber is the interface for high level subscribers.