Package: github.com/ankorstore/yokai-contrib/fxgcppubsub
Version: 1.4.1
Repository: https://github.com/ankorstore/yokai-contrib.git
Documentation: pkg.go.dev

# README

Yokai GCP Pub/Sub Module

Yokai module for GCP Pub/Sub.

Overview

This module lets your Yokai application publish to and subscribe from GCP Pub/Sub.

It also supports Avro and Protobuf schemas.

Installation

First install the module:

go get github.com/ankorstore/yokai-contrib/fxgcppubsub

Then activate it in your application bootstrapper:

// internal/bootstrap.go
package internal

import (
	"github.com/ankorstore/yokai/fxcore"
	"github.com/ankorstore/yokai-contrib/fxgcppubsub"
)

var Bootstrapper = fxcore.NewBootstrapper().WithOptions(
	// load fxgcppubsub module
	fxgcppubsub.FxGcpPubSubModule,
	// ...
)

Configuration

Configuration reference:

# ./configs/config.yaml
modules:
  gcppubsub:
    project:
      id: ${GCP_PROJECT_ID}  # GCP project id
    healthcheck:
      topics:                # list of topics to check for the topics probe
        - some-topic         # refers to projects/${GCP_PROJECT_ID}/topics/some-topic
      subscriptions:         # list of subscriptions to check for the subscriptions probe
        - some-subscription  # refers to projects/${GCP_PROJECT_ID}/subscriptions/some-subscription
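
The comments in the configuration above indicate that short topic and subscription names are expanded into fully qualified resource names under the configured project. A minimal sketch of that naming convention follows; the helper names topicPath and subscriptionPath are hypothetical and are not part of this module's API:

```go
package main

import "fmt"

// topicPath builds the fully qualified topic name for a short topic ID.
// Hypothetical helper, shown only to illustrate the naming convention.
func topicPath(projectID, topicID string) string {
	return fmt.Sprintf("projects/%s/topics/%s", projectID, topicID)
}

// subscriptionPath builds the fully qualified subscription name for a short subscription ID.
// Hypothetical helper, shown only to illustrate the naming convention.
func subscriptionPath(projectID, subscriptionID string) string {
	return fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subscriptionID)
}

func main() {
	fmt.Println(topicPath("my-project", "some-topic"))
	fmt.Println(subscriptionPath("my-project", "some-subscription"))
}
```

With a project ID of my-project, the two calls in main print projects/my-project/topics/some-topic and projects/my-project/subscriptions/some-subscription.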

Publish

This module provides a high-level Publisher that you can inject anywhere to publish messages to a topic.

If the topic is associated with an Avro or Protobuf schema, the publisher automatically handles the message encoding.

This module also provides a pubsub.Client that you can use for low-level publishing.

Raw message

To publish a raw message (without associated schema) on a topic:

// publish on projects/${GCP_PROJECT_ID}/topics/some-topic
res, err := publisher.Publish(context.Background(), "some-topic", "some message")

Avro message

The publisher accepts any struct and automatically handles the Avro encoding, based on the avro struct tags shown in the example below.

Considering this avro schema:

{
  "namespace": "Simple",
  "type": "record",
  "name": "Avro",
  "fields": [
    {
      "name": "StringField",
      "type": "string"
    },
    {
      "name": "FloatField",
      "type": "float"
    },
    {
      "name": "BooleanField",
      "type": "boolean"
    }
  ]
}

To publish a message to a topic associated with this Avro schema:

// struct with tags, representing the message
type SimpleRecord struct {
    StringField  string  `avro:"StringField" json:"StringField"`
    FloatField   float32 `avro:"FloatField" json:"FloatField"`
    BooleanField bool    `avro:"BooleanField" json:"BooleanField"`
}

// publish on projects/${GCP_PROJECT_ID}/topics/some-topic
res, err := publisher.Publish(context.Background(), "some-topic", &SimpleRecord{
    StringField:  "some string",
    FloatField:   12.34,
    BooleanField: true,
})
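
To give a feel for what a JSON-encoded payload for this record could look like, here is a small sketch that relies only on the json struct tags and the standard library. It assumes the topic's schema uses JSON encoding; it is not the module's encoder, and the encodeJSON helper is hypothetical:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SimpleRecord mirrors the struct from the example above.
type SimpleRecord struct {
	StringField  string  `avro:"StringField" json:"StringField"`
	FloatField   float32 `avro:"FloatField" json:"FloatField"`
	BooleanField bool    `avro:"BooleanField" json:"BooleanField"`
}

// encodeJSON marshals the record with encoding/json.
// Hypothetical helper: the module performs its own schema-aware encoding.
func encodeJSON(rec *SimpleRecord) (string, error) {
	b, err := json.Marshal(rec)
	if err != nil {
		return "", err
	}

	return string(b), nil
}

func main() {
	out, err := encodeJSON(&SimpleRecord{
		StringField:  "some string",
		FloatField:   12.34,
		BooleanField: true,
	})
	if err != nil {
		fmt.Println("encoding failed:", err)

		return
	}

	// prints {"StringField":"some string","FloatField":12.34,"BooleanField":true}
	fmt.Println(out)
}
```

The field names in the payload match the field names declared in the Avro schema, which is why the json tags repeat them verbatim.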

Protobuf message

The publisher accepts any proto.Message and automatically handles the Protobuf binary or JSON encoding.

Considering this protobuf schema:

syntax = "proto3";

package simple;

option go_package = "github.com/ankorstore/yokai-contrib/fxgcppubsub/testdata/proto";

message SimpleRecord {
    string string_field = 1;
    float float_field = 2;
    bool boolean_field = 3;
}

To publish a message to a topic associated with this Protobuf schema:

// generated protobuf stub proto.Message, representing the message
type SimpleRecord struct {
    state         protoimpl.MessageState
    sizeCache     protoimpl.SizeCache
    unknownFields protoimpl.UnknownFields
    
    StringField  string  `protobuf:"bytes,1,opt,name=string_field,json=stringField,proto3" json:"string_field,omitempty"`
    FloatField   float32 `protobuf:"fixed32,2,opt,name=float_field,json=floatField,proto3" json:"float_field,omitempty"`
    BooleanField bool    `protobuf:"varint,3,opt,name=boolean_field,json=booleanField,proto3" json:"boolean_field,omitempty"`
}

// publish on projects/${GCP_PROJECT_ID}/topics/some-topic
res, err := publisher.Publish(context.Background(), "some-topic", &SimpleRecord{
    StringField:  "test proto",
    FloatField:   56.78,
    BooleanField: false,
})

Subscribe

This module provides a high-level Subscriber that you can inject anywhere to receive messages from a subscription.

If the subscription's topic is associated with an Avro or Protobuf schema, the subscriber passes your handler a message that you can decode into the matching type.

This module also provides a pubsub.Client that you can use for low-level subscribing.

Raw message

To subscribe to a subscription that receives raw messages (without associated schema):

// subscribe from projects/${GCP_PROJECT_ID}/subscriptions/some-subscription
err := subscriber.Subscribe(ctx, "some-subscription", func(ctx context.Context, m *message.Message) {
    fmt.Printf("%s", m.Data())
    
    m.Ack()
})

Avro message

The received message can be decoded into any struct that carries the avro struct tags shown in the example below.

Considering this avro schema:

{
  "namespace": "Simple",
  "type": "record",
  "name": "Avro",
  "fields": [
    {
      "name": "StringField",
      "type": "string"
    },
    {
      "name": "FloatField",
      "type": "float"
    },
    {
      "name": "BooleanField",
      "type": "boolean"
    }
  ]
}

To subscribe to a subscription associated with this Avro schema:

// struct with tags, representing the message
type SimpleRecord struct {
    StringField  string  `avro:"StringField" json:"StringField"`
    FloatField   float32 `avro:"FloatField" json:"FloatField"`
    BooleanField bool    `avro:"BooleanField" json:"BooleanField"`
}

// subscribe from projects/${GCP_PROJECT_ID}/subscriptions/some-subscription
err := subscriber.Subscribe(ctx, "some-subscription", func(ctx context.Context, m *message.Message) {
    var rec SimpleRecord

    // nack and stop if decoding fails, so the message is not acked afterwards
    if err := m.Decode(&rec); err != nil {
        m.Nack()

        return
    }

    fmt.Printf("%v", rec)

    m.Ack()
})
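
A handler should settle each message exactly once: nack and return when decoding fails, ack otherwise. The sketch below illustrates that control flow with a hypothetical fakeMessage type that stands in for the module's message type (it decodes JSON with the standard library, which is an assumption made only to keep the example self-contained):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// fakeMessage is a hypothetical stand-in for the module's message type.
// It records whether Ack or Nack was called.
type fakeMessage struct {
	data   []byte
	acked  bool
	nacked bool
}

// Decode unmarshals the payload as JSON (assumption for this sketch only).
func (m *fakeMessage) Decode(v any) error { return json.Unmarshal(m.data, v) }

func (m *fakeMessage) Ack()  { m.acked = true }
func (m *fakeMessage) Nack() { m.nacked = true }

type SimpleRecord struct {
	StringField string `json:"StringField"`
}

// handle nacks and returns early on a decoding error, and acks otherwise,
// so a message is never both nacked and acked.
func handle(m *fakeMessage) {
	var rec SimpleRecord

	if err := m.Decode(&rec); err != nil {
		m.Nack()

		return
	}

	m.Ack()
}

func main() {
	good := &fakeMessage{data: []byte(`{"StringField":"ok"}`)}
	bad := &fakeMessage{data: []byte(`not json`)}

	handle(good)
	handle(bad)

	fmt.Println(good.acked, good.nacked) // prints true false
	fmt.Println(bad.acked, bad.nacked)   // prints false true
}
```

Without the early return, the failing message would be nacked and then acked by the same handler call.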

Protobuf message

The received message can be decoded into any proto.Message, whether it uses Protobuf binary or JSON encoding.

Considering this protobuf schema:

syntax = "proto3";

package simple;

option go_package = "github.com/ankorstore/yokai-contrib/fxgcppubsub/testdata/proto";

message SimpleRecord {
  string string_field = 1;
  float float_field = 2;
  bool boolean_field = 3;
}

To subscribe to a subscription associated with this Protobuf schema:

// generated protobuf stub proto.Message, representing the message
type SimpleRecord struct {
    state         protoimpl.MessageState
    sizeCache     protoimpl.SizeCache
    unknownFields protoimpl.UnknownFields
    
    StringField  string  `protobuf:"bytes,1,opt,name=string_field,json=stringField,proto3" json:"string_field,omitempty"`
    FloatField   float32 `protobuf:"fixed32,2,opt,name=float_field,json=floatField,proto3" json:"float_field,omitempty"`
    BooleanField bool    `protobuf:"varint,3,opt,name=boolean_field,json=booleanField,proto3" json:"boolean_field,omitempty"`
}

// subscribe from projects/${GCP_PROJECT_ID}/subscriptions/some-subscription
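err := subscriber.Subscribe(ctx, "some-subscription", func(ctx context.Context, m *message.Message) {
    var rec SimpleRecord

    // nack and stop if decoding fails, so the message is not acked afterwards
    if err := m.Decode(&rec); err != nil {
        m.Nack()

        return
    }

    // pass a pointer: generated protobuf messages must not be copied
    fmt.Printf("%v", &rec)

    m.Ack()
})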

Health Check

This module provides ready-to-use health check probes for the Health Check module.

Considering the following configuration:

# ./configs/config.yaml
modules:
  gcppubsub:
    project:
      id: ${GCP_PROJECT_ID}   # GCP project id
    healthcheck:
      topics:                 # list of topics to check for the topics probe
        - some-topic          # refers to projects/${GCP_PROJECT_ID}/topics/some-topic
        - other-topic         # refers to projects/${GCP_PROJECT_ID}/topics/other-topic
      subscriptions:          # list of subscriptions to check for the subscriptions probe
        - some-subscription   # refers to projects/${GCP_PROJECT_ID}/subscriptions/some-subscription
        - other-subscription  # refers to projects/${GCP_PROJECT_ID}/subscriptions/other-subscription

To activate those probes, you just need to register them:

// internal/services.go
package internal

import (
	"github.com/ankorstore/yokai/fxhealthcheck"
	"github.com/ankorstore/yokai-contrib/fxgcppubsub/healthcheck"
	"go.uber.org/fx"
)

func ProvideServices() fx.Option {
	return fx.Options(
		// register the GcpPubSubTopicsProbe for some-topic and other-topic
		fxhealthcheck.AsCheckerProbe(healthcheck.NewGcpPubSubTopicsProbe),
		// register the GcpPubSubSubscriptionsProbe for some-subscription and other-subscription
		fxhealthcheck.AsCheckerProbe(healthcheck.NewGcpPubSubSubscriptionsProbe),
		// ...
	)
}

Notes:

  • if your application is interested only in publishing, activate the GcpPubSubTopicsProbe only
  • if it is interested only in subscribing, activate the GcpPubSubSubscriptionsProbe only

Testing

In test mode, the components provided by this module (such as the Publisher and the Subscriber) are configured to work against a pstest.Server. This avoids the need to spin up a real Pub/Sub instance or an emulator, and makes tests more portable.

This means that you can create topics, subscriptions and schemas locally only for your tests.

For example:

// internal/example/example_test.go
package example_test

import (
	"context"
	"testing"
	"time"
	
	"github.com/ankorstore/yokai-contrib/fxgcppubsub"
	"github.com/ankorstore/yokai-contrib/fxgcppubsub/message"
	"github.com/ankorstore/yokai-contrib/fxgcppubsub/reactor/ack"
	"github.com/ankorstore/yokai/fxconfig"
	"github.com/stretchr/testify/assert"
	"github.com/foo/bar/internal"
	"go.uber.org/fx"
	"go.uber.org/fx/fxtest"
)

func TestPubSub(t *testing.T) {
	t.Setenv("APP_ENV", "test")
	t.Setenv("APP_CONFIG_PATH", "testdata/config")
	t.Setenv("GCP_PROJECT_ID", "test-project")

	var publisher fxgcppubsub.Publisher
	var subscriber fxgcppubsub.Subscriber
	var supervisor ack.AckSupervisor

	ctx := context.Background()

	// test app
	internal.RunTest(
		t,
		// prepare test topic and subscription
		fxgcppubsub.PrepareTopicAndSubscription(fxgcppubsub.PrepareTopicAndSubscriptionParams{
			TopicID:        "test-topic",
			SubscriptionID: "test-subscription",
		}),
		fx.Populate(&publisher, &subscriber, &supervisor),
	)
	
	// publish to test-topic
	_, err := publisher.Publish(ctx, "test-topic", "test data")
	assert.NoError(t, err)

	// subscribe from test-subscription
	waiter := supervisor.StartAckWaiter("test-subscription")
	
	go subscriber.Subscribe(ctx, "test-subscription", func(ctx context.Context, m *message.Message) {
		assert.Equal(t, []byte("test data"), m.Data())

		m.Ack()
	})

	// wait for subscriber message ack
	_, err = waiter.WaitMaxDuration(ctx, time.Second)
	assert.NoError(t, err)
}

Notes:

  • you can prepare the test topics, subscriptions and schemas using the provided helpers
  • you can find tests involving avro and protobuf schemas in the module test examples