Module and package: github.com/nikolayk812/pgx-outbox
Version: 0.0.2
Repository: https://github.com/nikolayk812/pgx-outbox.git
Documentation: pkg.go.dev

# README

pgx-outbox

This is a simple Go implementation of the transactional outbox pattern, which solves the dual-write problem, for PostgreSQL using the jackc/pgx driver.

More advanced options are described in the article "Revisiting the Outbox Pattern" by Gunnar Morling.

The motivation behind this library is to provide a generic, extensible implementation that avoids boilerplate code in projects.

Note: this is not a general-purpose Postgres queue, even though the internal implementation is based on a table with a queue-like structure.

How to use

1. Add a database migration to your project:

CREATE TABLE IF NOT EXISTS outbox_messages
(
    id           BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,

    broker       TEXT                                NOT NULL,
    topic        TEXT                                NOT NULL,
    metadata     JSONB,
    payload      JSONB                               NOT NULL,

    created_at   TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
    published_at TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_outbox_messages_published_at_null ON outbox_messages (published_at) WHERE published_at IS NULL;

The outbox table name can be customized, but the table structure should remain exactly the same.

2. Add outbox.Writer to the repository layer:

type repo struct {
	pool *pgxpool.Pool
	
	writer outbox.Writer
	messageMapper types.ToMessageFunc[User]
}

To map a domain model, e.g. User, to an outbox message, implement a types.ToMessageFunc function in the service layer and pass it to the repository, either in the New function or as a repository method parameter.
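
A minimal sketch of such a mapper follows. It is not taken from the library: Message is a local stand-in for types.Message (its fields are assumed to mirror the outbox table columns), User is a hypothetical domain model, and the broker and topic names are made up for illustration. The function has the shape types.ToMessageFunc[User] is assumed to have, namely a User in and a message plus an error out.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Message is a local stand-in for types.Message so the sketch compiles
// without the library; its fields are assumed to mirror the table columns.
type Message struct {
	Broker   string
	Topic    string
	Metadata map[string]string
	Payload  []byte
}

// User is a hypothetical domain model.
type User struct {
	ID    string `json:"id"`
	Email string `json:"email"`
}

// userToMessage has the assumed shape of types.ToMessageFunc[User]:
// it takes a domain value and returns an outbox message or an error.
func userToMessage(u User) (Message, error) {
	if u.ID == "" {
		return Message{}, errors.New("user ID is empty")
	}

	// The payload column is JSONB, so the user is serialized as JSON.
	payload, err := json.Marshal(u)
	if err != nil {
		return Message{}, fmt.Errorf("json.Marshal: %w", err)
	}

	return Message{
		Broker:   "sns",           // hypothetical broker name
		Topic:    "users_created", // hypothetical topic name
		Metadata: map[string]string{"event_type": "user_created"},
		Payload:  payload,
	}, nil
}

func main() {
	msg, err := userToMessage(User{ID: "42", Email: "jane@example.com"})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s %s %s\n", msg.Broker, msg.Topic, msg.Payload)
}
```

Keeping the mapper in the service layer means the repository only needs to know that it receives a function returning a message, not how a User becomes one.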

Call the writer.Write method in the repository methods that should produce outbox messages.

func (r *repo) CreateUser(ctx context.Context, user User) (u User, txErr error) {
	// begin a transaction; the deferred func commits or rolls back depending on txErr
	tx, err := r.pool.Begin(ctx)
	if err != nil {
		return u, fmt.Errorf("pool.Begin: %w", err)
	}
	defer func() {
		if txErr != nil {
			_ = tx.Rollback(ctx)
			return
		}
		txErr = tx.Commit(ctx)
	}()

	// INSERT INTO users table under the hood
	user, err = r.createUser(ctx, tx, user)
	if err != nil {
		return u, fmt.Errorf("createUser: %w", err)
	}

	message, err := r.messageMapper(user)
	if err != nil {
		return u, fmt.Errorf("messageMapper: %w", err)
	}

	// INSERT INTO outbox_messages table under the hood, in the same transaction
	if _, err := r.writer.Write(ctx, tx, message); err != nil {
		return u, fmt.Errorf("writer.Write: %w", err)
	}

	return user, nil
}

See outbox.Writer example in repo.go of the 01_sns directory.

3. Add outbox.Forwarder to a cron job:

forwarder, err := outbox.NewForwarderFromPool("outbox_messages", pool, publisher)

stats, err := forwarder.Forward(ctx, 10)
slog.Info("forwarded", "stats", stats)

where pool is a pgxpool.Pool and publisher is an implementation of outbox.Publisher.
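
If no cron scheduler is available, the same call can be driven by an in-process ticker. The following is only a sketch under stated assumptions: the forwarder interface is a local stand-in for outbox.Forwarder, the second argument of Forward is assumed to be a per-run message limit, and the stats return value is simplified to a plain count. fakeForwarder and runDemo are hypothetical helpers that let the loop run without a database.

```go
package main

import (
	"context"
	"fmt"
	"log/slog"
	"time"
)

// forwarder is a local stand-in for outbox.Forwarder; the real method
// returns a stats value, simplified here to a count of forwarded messages.
type forwarder interface {
	Forward(ctx context.Context, limit int) (int, error)
}

// runForwarder calls Forward on every tick until ctx is cancelled.
// A failed run is logged and retried on the next tick.
func runForwarder(ctx context.Context, f forwarder, interval time.Duration, limit int) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			n, err := f.Forward(ctx, limit)
			if err != nil {
				slog.Error("forward failed", "error", err)
				continue
			}
			slog.Info("forwarded", "count", n)
		}
	}
}

// fakeForwarder counts calls so the loop can be exercised without a database.
type fakeForwarder struct{ calls int }

func (f *fakeForwarder) Forward(_ context.Context, limit int) (int, error) {
	f.calls++
	return limit, nil
}

// runDemo runs the loop for a short, fixed time and reports how many
// times Forward was called.
func runDemo() int {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	f := &fakeForwarder{}
	runForwarder(ctx, f, 10*time.Millisecond, 10)
	return f.calls
}

func main() {
	fmt.Println("forward runs:", runDemo())
}
```

In a real service the context would typically be cancelled on shutdown rather than by a timeout, and the interval would be chosen according to how much publishing delay is acceptable.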

This library provides a reference publisher implementation for AWS SNS in the sns module.

publisher, err := outboxSns.NewPublisher(awsSnsCli, messageTransformer{})

where messageTransformer is an implementation of the outboxSns.MessageTransformer interface, for example:

func (mt messageTransformer) Transform(message types.Message) (*awsSns.PublishInput, error) {
	// region and accountID are fields of messageTransformer
	topicARN := fmt.Sprintf("arn:aws:sns:%s:%s:%s", mt.region, mt.accountID, message.Topic)

	return &awsSns.PublishInput{
		Message:  aws.String(string(message.Payload)),
		TopicArn: &topicARN,
	}, nil
}

See outbox.Forwarder example in main.go of the 01_sns directory.
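
For brokers other than SNS, or for local development, a custom publisher can be passed to the forwarder instead. The sketch below is an assumption-laden illustration, not the library's API: Publisher is a local interface with the shape outbox.Publisher is assumed to have (a single Publish method), Message is a stand-in for types.Message, and writerPublisher and publishToString are hypothetical names. It "publishes" each message as one JSON line to an io.Writer.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
)

// Message is a local stand-in for types.Message.
type Message struct {
	Broker  string
	Topic   string
	Payload []byte
}

// Publisher is the assumed shape of outbox.Publisher: one method that
// delivers a single message and reports failure through the error.
type Publisher interface {
	Publish(ctx context.Context, message Message) error
}

// writerPublisher writes each message as one JSON line to w.
type writerPublisher struct {
	w io.Writer
}

// Compile-time check that writerPublisher satisfies Publisher.
var _ Publisher = writerPublisher{}

func (p writerPublisher) Publish(ctx context.Context, m Message) error {
	if err := ctx.Err(); err != nil {
		return err
	}

	// The payload is already JSON (the column is JSONB), so it is embedded
	// as a raw value; invalid JSON makes Marshal return an error.
	line, err := json.Marshal(struct {
		Topic   string          `json:"topic"`
		Payload json.RawMessage `json:"payload"`
	}{m.Topic, m.Payload})
	if err != nil {
		return fmt.Errorf("json.Marshal: %w", err)
	}

	_, err = fmt.Fprintln(p.w, string(line))
	return err
}

// publishToString publishes one message into a buffer and returns the output.
func publishToString(m Message) (string, error) {
	var buf bytes.Buffer
	if err := (writerPublisher{w: &buf}).Publish(context.Background(), m); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	out, err := publishToString(Message{Topic: "users", Payload: []byte(`{"id":"42"}`)})
	if err != nil {
		panic(err)
	}
	fmt.Print(out)
}
```

Returning a non-nil error from Publish is what signals to the caller that the message was not delivered and should stay unpublished.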

Examples

1. SNS

Source code and instructions for the example are located in the examples/01_sns directory.

Alternatives

# Interfaces

Forwarder reads unpublished messages from the outbox table, publishes them and then marks them as published.
Publisher publishes outbox messages to a message broker.
Reader reads outbox unpublished messages from a single outbox table.
Tx is a transaction interface to support both pgx.Tx and *sql.Tx.
Writer writes outbox messages to a single outbox table.
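
The read, publish, mark-as-published flow described for Forwarder can be sketched with in-memory stand-ins. Everything here is hypothetical and simplified: the reader and publisher interfaces, the Ack method name, and the stop-at-first-failure policy are assumptions made for illustration, not the library's actual implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Message is a simplified stand-in for an outbox row.
type Message struct {
	ID    int64
	Topic string
}

// reader and publisher are hypothetical minimal interfaces.
type reader interface {
	Read(ctx context.Context, limit int) ([]Message, error)
	Ack(ctx context.Context, ids []int64) error
}

type publisher interface {
	Publish(ctx context.Context, m Message) error
}

// forward reads up to limit unpublished messages, publishes them in order,
// and acknowledges only those that were published. It stops at the first
// publish failure so later messages are not delivered out of order.
func forward(ctx context.Context, r reader, p publisher, limit int) (int, error) {
	msgs, err := r.Read(ctx, limit)
	if err != nil {
		return 0, fmt.Errorf("read: %w", err)
	}

	var published []int64
	var pubErr error
	for _, m := range msgs {
		if pubErr = p.Publish(ctx, m); pubErr != nil {
			break
		}
		published = append(published, m.ID)
	}

	if len(published) > 0 {
		if err := r.Ack(ctx, published); err != nil {
			return 0, fmt.Errorf("ack: %w", err)
		}
	}
	if pubErr != nil {
		return len(published), fmt.Errorf("publish: %w", pubErr)
	}
	return len(published), nil
}

// memOutbox keeps messages and their published flags in memory.
type memOutbox struct {
	msgs      []Message
	published map[int64]bool
}

func (o *memOutbox) Read(_ context.Context, limit int) ([]Message, error) {
	var out []Message
	for _, m := range o.msgs {
		if !o.published[m.ID] && len(out) < limit {
			out = append(out, m)
		}
	}
	return out, nil
}

func (o *memOutbox) Ack(_ context.Context, ids []int64) error {
	for _, id := range ids {
		o.published[id] = true
	}
	return nil
}

// flakyPublisher fails once for the message with failID, then succeeds.
type flakyPublisher struct{ failID int64 }

func (p *flakyPublisher) Publish(_ context.Context, m Message) error {
	if m.ID == p.failID {
		p.failID = 0
		return errors.New("broker unavailable")
	}
	return nil
}

// demo runs forward twice and returns how many messages each run published.
func demo() (first, second int) {
	ctx := context.Background()
	box := &memOutbox{
		msgs:      []Message{{1, "a"}, {2, "b"}, {3, "c"}},
		published: map[int64]bool{},
	}
	pub := &flakyPublisher{failID: 2}
	first, _ = forward(ctx, box, pub, 10)  // publishes 1, fails on 2
	second, _ = forward(ctx, box, pub, 10) // retries 2, then publishes 3
	return first, second
}

func main() {
	fmt.Println(demo())
}
```

Because a message is marked only after it has been published, a failure between the two steps leads to the message being published again on the next run, so consumers should be prepared for duplicates.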
