github.com/vvatanabe/dynamomq (module and package)
Version: 0.11.0
Repository: https://github.com/vvatanabe/dynamomq.git
Documentation: pkg.go.dev

# README

Implementing message queueing with Amazon DynamoDB in Go.

Current Status

This project is under active development and is currently at major version 0. Please be aware that the public API and exported methods may change.

Motivation

DynamoDB is a key-value and document database that delivers single-digit millisecond performance at any scale. It’s a serverless and fully managed service that you can use for mobile, web, gaming, ad tech, IoT, and other applications that need low-latency data access at a large scale.

There are many queuing implementations that offer persistence, single-message processing, and distributed computing. Some popular queuing solutions are Amazon SQS, Amazon MQ, Apache ActiveMQ, RabbitMQ, and Kafka. Those services handle various queuing features and functions with several different characteristics, such as methods of implementation, scaling, and performance.

However, most of those queuing systems cannot easily change the order of the items after they arrive in the queue. Discussed implementation with DynamoDB can change the order in the queue or cancel items before processing.

Quoted from the official AWS blog post: Implementing Priority Queueing with Amazon DynamoDB

Features

  • Redelivery: Redeliver messages whose processing did not complete successfully, up to a specified number of times.
  • Concurrent Execution: Process concurrently using multiple goroutines.
  • Dead Letter Queue: Move messages that exceed the maximum number of redeliveries to the dead letter queue.
  • Graceful Shutdown: Complete processing of messages before shutting down the consumer process.
  • FIFO (First In, First Out): Retrieve messages from the message queue on a first-in, first-out basis.
  • Consumer Process Scaling: Scale out by running multiple consumer processes without duplicating message retrieval from the same message queue.
  • Visibility Timeout: DynamoMQ sets a visibility timeout, a period of time during which DynamoMQ prevents all consumers from receiving and processing the message.
  • Delay Queues: Delay the delivery of new messages to consumers for a set number of seconds.
  • Deduplication: Deduplicate messages within the message queue.
  • Randomized Exponential Backoff: Prevent overlapping redelivery timing.
  • Batch Message Processing: Send and delete multiple messages in bulk to/from the message queue.
  • Message Compression
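Randomized exponential backoff spreads out redelivery so that consumers retrying the same failure do not all wake up at once. The exact formula DynamoMQ uses is not described here; the sketch below assumes the widely used "full jitter" variant, in which the backoff window doubles per attempt up to a cap and the actual delay is drawn uniformly from that window. The helpers backoffDelay and newRNG, and the base and cap values, are hypothetical.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// newRNG returns a seeded random source so the sketch's output is reproducible.
func newRNG(seed int64) *rand.Rand {
	return rand.New(rand.NewSource(seed))
}

// backoffDelay is a hypothetical helper (not part of DynamoMQ). It computes a
// "full jitter" delay: the window starts at base, doubles for each attempt,
// is capped at maxDelay, and the returned delay is uniform in [0, window).
func backoffDelay(attempt int, base, maxDelay time.Duration, rng *rand.Rand) time.Duration {
	window := base
	for i := 0; i < attempt && window < maxDelay; i++ {
		window *= 2
	}
	if window > maxDelay {
		window = maxDelay
	}
	if window <= 0 {
		return 0
	}
	return time.Duration(rng.Int63n(int64(window)))
}

func main() {
	rng := newRNG(1)
	for attempt := 0; attempt < 5; attempt++ {
		d := backoffDelay(attempt, 100*time.Millisecond, 5*time.Second, rng)
		fmt.Printf("retry %d: wait %v\n", attempt+1, d)
	}
}
```

Drawing from the whole window, rather than adding a small jitter to a fixed delay, is what keeps concurrent retries from lining up.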

Installation

Requires Go version 1.21 or greater.

DynamoMQ CLI

This package can be installed as a CLI with the go install command:

$ go install github.com/vvatanabe/dynamomq/cmd/dynamomq@latest

DynamoMQ Library

This package can be installed as a library with the go get command:

$ go get -u github.com/vvatanabe/dynamomq@latest

Set Up DynamoMQ

Required IAM Policy

Please refer to dynamomq-iam-policy.json or dynamomq-iam-policy.tf

Create Table with AWS CLI

$ aws dynamodb create-table --cli-input-json file://dynamomq-table.json

Please refer to dynamomq-table.json.

Create Table with Terraform

Please refer to dynamomq-table.tf.

Authentication and access credentials

DynamoMQ's CLI and library configure AWS Config with credentials obtained from external configuration sources. This setup allows for flexible and secure management of access credentials. The following are the default sources for configuration:

Environment Variables

  • AWS_REGION - Specifies the AWS region.
  • AWS_PROFILE - Identifies the AWS profile to be used.
  • AWS_ACCESS_KEY_ID - Your AWS access key.
  • AWS_SECRET_ACCESS_KEY - Your AWS secret key.
  • AWS_SESSION_TOKEN - Session token for temporary credentials.

Shared Configuration and Credentials Files

These files provide a common location for storing AWS credentials and configuration settings, enabling consistent credential management across different AWS tools and applications.

Usage for DynamoMQ CLI

The dynamomq command-line interface provides a range of commands to interact with your DynamoDB-based message queue. Below are the available commands and global flags that can be used with dynamomq.

Available Commands

  • completion: Generate the autocompletion script for the specified shell to ease command usage.
  • delete: Delete a message from the queue using its ID.
  • dlq: Retrieve the statistics for the Dead Letter Queue (DLQ), providing insights into failed message processing.
  • enqueue-test: Send test messages to the DynamoDB table with IDs A-101, A-202, A-303, and A-404; existing messages with these IDs will be overwritten.
  • fail: Simulate the failure of message processing, which will return the message to the queue for reprocessing.
  • get: Fetch a specific message from the DynamoDB table using the application domain ID.
  • help: Display help information about any command.
  • invalid: Move a message from the standard queue to the DLQ for manual review and correction.
  • ls: List all message IDs in the queue, limited to a maximum of 10 elements.
  • purge: Remove all messages from the DynamoMQ table, effectively clearing the queue.
  • qstat: Retrieve statistics for the queue, offering an overview of its current state.
  • receive: Receive a message from the queue; this operation will replace the current message ID with the retrieved one.
  • redrive: Move a message from the DLQ back to the standard queue for reprocessing.
  • reset: Reset the system information of a message, typically used in message recovery scenarios.

Global Flags

  • --endpoint-url: Override the default URL for commands with a specified endpoint URL.
  • -h, --help: Display help information for dynamomq.
  • --queueing-index-name: Specify the name of the queueing index to use (default is "dynamo-mq-index-queue_type-sent_at").
  • --table-name: Define the name of the DynamoDB table to contain the items (default is "dynamo-mq-table").

To get more detailed information about a specific command, use dynamomq [command] --help.

Example Usage

Here are a few examples of how to use the dynamomq commands:

# Generate autocompletion script for bash
dynamomq completion bash

# Delete a message with ID 'A-123'
dynamomq delete --id A-123

# Retrieve DLQ statistics
dynamomq dlq

# Enqueue test messages
dynamomq enqueue-test

# Get a message by ID
dynamomq get --id A-123

# List the first 10 message IDs in the queue
dynamomq ls

# Receive a message from the queue
dynamomq receive

# Reset system information of a message with ID
dynamomq reset --id A-123

Interactive Mode

The DynamoMQ CLI supports an Interactive Mode for an enhanced user experience. To enter the Interactive Mode, simply run the dynamomq command without specifying any subcommands.

Interactive Mode Commands

Once in Interactive Mode, you will have access to a suite of commands to manage and inspect your message queue:

  • qstat or qstats: Retrieves the queue statistics.
  • dlq: Retrieves the Dead Letter Queue (DLQ) statistics.
  • enqueue-test or et: Sends test messages to the DynamoDB table with IDs: A-101, A-202, A-303, and A-404; if a message with the same ID already exists, it will be overwritten.
  • purge: Removes all messages from the DynamoMQ table.
  • ls: Lists all message IDs, displaying a maximum of 10 elements.
  • receive: Receives a message from the queue and replaces the current ID with the received one.
  • id <id>: Switches the Interactive Mode to app mode, allowing you to perform various operations on a message identified by the provided app domain ID:
    • sys: Displays the system info data in a JSON format.
    • data: Prints the data as JSON for the current message record.
    • info: Prints all information regarding the Message record, including system_info and data in JSON format.
    • reset: Resets the system info of the message.
    • redrive: Moves a message from the DLQ back to the STANDARD queue.
    • delete: Deletes a message by its ID.
    • fail: Simulates the failed processing of a message by putting it back into the queue; the message will need to be received again.
    • invalid: Moves a message from the standard queue to the DLQ for manual fixing.

Usage for DynamoMQ Library

DynamoMQ Client

To begin using DynamoMQ, import the config package from the AWS SDK for Go v2 and the DynamoMQ library, along with the standard context package. These imports are required to interact with AWS services and to use DynamoMQ. The consumer example below additionally uses fmt, os, os/signal, syscall, and time from the standard library.

import (
	"context"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/vvatanabe/dynamomq"
)

The following code block initializes the DynamoMQ client. It loads the AWS configuration and creates a new DynamoMQ client with that configuration. Replace 'ExampleData' with your own data structure as needed.

ctx := context.Background()
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
	panic("failed to load AWS config: " + err.Error())
}
client, err := dynamomq.NewFromConfig[ExampleData](cfg)
if err != nil {
	panic("failed to create DynamoMQ client: " + err.Error())
}

Define the data structure that will be used with DynamoMQ. Here, 'ExampleData' is a struct that represents the message payload stored in DynamoDB; the dynamodbav tags set the attribute names used when it is marshaled.

type ExampleData struct {
	Data1 string `dynamodbav:"data_1"`
	Data2 string `dynamodbav:"data_2"`
	Data3 string `dynamodbav:"data_3"`
}

DynamoMQ Producer

The following snippet creates a DynamoMQ producer for the 'ExampleData' type. It then sends a message with predefined data to the queue.

producer := dynamomq.NewProducer[ExampleData](client)
_, err = producer.Produce(ctx, &dynamomq.ProduceInput[ExampleData]{
	Data: ExampleData{
		Data1: "foo",
		Data2: "bar",
		Data3: "baz",
	},
})
if err != nil {
	panic("failed to produce message: " + err.Error())
}

DynamoMQ Consumer

To consume messages, instantiate a DynamoMQ consumer for 'ExampleData' and start it in a new goroutine. The consumer processes messages until the process receives an interrupt (SIGINT) or SIGTERM signal; the example then calls Shutdown with a one-minute timeout so that messages already being processed can finish (graceful shutdown).

consumer := dynamomq.NewConsumer[ExampleData](client, &Counter[ExampleData]{})
go func() {
	// Use a local err to avoid a data race with err in the enclosing scope.
	if err := consumer.StartConsuming(); err != nil {
		fmt.Println(err)
	}
}()

// Block until SIGINT or SIGTERM is received.
done := make(chan os.Signal, 1)
signal.Notify(done, os.Interrupt, syscall.SIGTERM)

<-done

// Allow up to one minute for in-flight messages to finish.
shutdownCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

if err := consumer.Shutdown(shutdownCtx); err != nil {
	fmt.Println("failed to shut down consumer:", err)
}

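Here we define a 'Counter' type that implements the processing logic for consumed messages. Each time a message is processed, the counter is incremented and the message details are printed. Because the consumer can process messages on multiple goroutines, the counter uses sync/atomic so that concurrent increments are safe.

type Counter[T any] struct {
	Value atomic.Int64 // safe for concurrent use by multiple consumer goroutines
}

func (c *Counter[T]) Process(msg *dynamomq.Message[T]) error {
	v := c.Value.Add(1)
	fmt.Printf("value: %d, message: %v\n", v, msg)
	return nil
}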
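The concurrency and graceful shutdown behavior described above follows a common Go pattern: worker goroutines take messages, cancellation stops them from taking new ones, and a WaitGroup lets shutdown wait for in-flight messages under a deadline, much like consumer.Shutdown(ctx). The standard-library sketch below illustrates that pattern only; runWorkers, shutdown, and runDemo are hypothetical names, and this is not DynamoMQ's internal implementation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runWorkers starts n goroutines that take messages from jobs and process them
// until ctx is cancelled. A message that has already been taken is always
// processed to completion before the worker checks ctx again.
func runWorkers(ctx context.Context, n int, jobs <-chan string, process func(string)) *sync.WaitGroup {
	wg := &sync.WaitGroup{}
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return // stop taking new messages
				case msg := <-jobs:
					process(msg)
				}
			}
		}()
	}
	return wg
}

// shutdown cancels intake, then waits for in-flight work until waitCtx expires.
func shutdown(waitCtx context.Context, cancel context.CancelFunc, wg *sync.WaitGroup) error {
	cancel()
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-waitCtx.Done():
		return waitCtx.Err()
	}
}

// runDemo hands `messages` messages to `workers` goroutines, shuts down,
// and reports how many messages were processed.
func runDemo(messages, workers int) (int64, error) {
	var processed atomic.Int64
	ctx, cancel := context.WithCancel(context.Background())
	jobs := make(chan string) // unbuffered: a send completes only when a worker takes it
	wg := runWorkers(ctx, workers, jobs, func(string) {
		time.Sleep(10 * time.Millisecond) // simulated work
		processed.Add(1)
	})
	for i := 0; i < messages; i++ {
		jobs <- fmt.Sprintf("A-%d", i)
	}
	waitCtx, waitCancel := context.WithTimeout(context.Background(), time.Second)
	defer waitCancel()
	err := shutdown(waitCtx, cancel, wg)
	return processed.Load(), err
}

func main() {
	n, err := runDemo(5, 3)
	fmt.Println(n, err)
}
```

Because the channel is unbuffered, every send completes only once a worker has taken the message, so runDemo(5, 3) processes all five messages before shutdown returns.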

Software Design

State Machine

The state machine diagram below illustrates the key steps a message goes through as it traverses the system.

State Machine

Basic Flow

  1. SendMessage(): A user sends a message that is placed in the READY state in the queue.

  2. ReceiveMessage(): The message moves from READY to PROCESSING status as it is picked up for processing.

  3. DeleteMessage(): If processing is successful, the message is deleted from the queue.

Error Handling

  1. ChangeMessageVisibility(): If processing fails, the message is made visible again in the READY state for retry, and its visibility timeout is updated.

  2. MoveMessageToDLQ(): If the message exceeds the retry limit, it is moved to the Dead Letter Queue (DLQ). The DLQ is used to isolate problematic messages for later analysis.

Dead Letter Queue (DLQ)

  1. RedriveMessage(): Once the underlying issue has been resolved, a message can be returned from the DLQ to the standard queue. This is achieved through the Redrive operation.

  2. ReceiveMessage(): Messages in the DLQ are also moved from READY to PROCESSING status, similar to regular queue messages.

  3. DeleteMessage(): Once a message in the DLQ is successfully processed, it is deleted from the queue.

This design ensures that DynamoMQ maintains message reliability while enabling tracking and analysis of messages in the event of errors. The use of a DLQ minimizes the impact of failures while maintaining system resiliency.
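The transitions above can be encoded as a small lookup over (queue type, status), which makes invalid moves, such as receiving a message that is already PROCESSING, easy to reject. This is a hypothetical sketch: the names state and apply and the operation strings are illustrative, and the rules are inferred from the state machine description rather than taken from DynamoMQ's source. DynamoMQ exposes an InvalidStateTransitionError type for such cases.

```go
package main

import "fmt"

type queueType string
type status string

const (
	standard   queueType = "STANDARD"
	dlq        queueType = "DLQ"
	ready      status    = "READY"
	processing status    = "PROCESSING"
)

// state is a hypothetical pair of the attributes that drive the state machine.
type state struct {
	Queue  queueType
	Status status
}

// apply returns the state reached by op, or an error when op is not valid from s.
// DeleteMessage is not modeled: it removes the item rather than changing its state.
func apply(s state, op string) (state, error) {
	switch op {
	case "ReceiveMessage": // READY -> PROCESSING, in either queue
		if s.Status == ready {
			return state{s.Queue, processing}, nil
		}
	case "ChangeMessageVisibility": // PROCESSING -> READY, for a retry
		if s.Status == processing {
			return state{s.Queue, ready}, nil
		}
	case "MoveMessageToDLQ": // STANDARD -> DLQ
		if s.Queue == standard {
			return state{dlq, ready}, nil
		}
	case "RedriveMessage": // DLQ -> STANDARD
		if s.Queue == dlq {
			return state{standard, ready}, nil
		}
	}
	return s, fmt.Errorf("invalid transition: %s from %s/%s", op, s.Queue, s.Status)
}

func main() {
	s := state{standard, ready} // state right after SendMessage
	ops := []string{"ReceiveMessage", "ChangeMessageVisibility", "ReceiveMessage", "MoveMessageToDLQ", "RedriveMessage"}
	for _, op := range ops {
		next, err := apply(s, op)
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Printf("%-24s %s/%s -> %s/%s\n", op, s.Queue, s.Status, next.Queue, next.Status)
		s = next
	}
}
```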

Table Definition

The DynamoDB table for the DynamoMQ message queue system is designed to efficiently manage and track the status of messages. Here’s a breakdown of the table schema:

| Key | Attribute | Type | Example Value |
|---|---|---|---|
| PK | id | string | A-101 |
| | data | any | any |
| | receive_count | number | 1 |
| GSIPK | queue_type | string | STANDARD or DLQ |
| | version | number | 1 |
| | created_at | string | 2006-01-02T15:04:05.999999999Z07:00 |
| | updated_at | string | 2006-01-02T15:04:05.999999999Z07:00 |
| GSISK | sent_at | string | 2006-01-02T15:04:05.999999999Z07:00 |
| | received_at | string | 2006-01-02T15:04:05.999999999Z07:00 |
| | invisible_until_at | string | 2006-01-02T15:04:05.999999999Z07:00 |

PK (Primary Key) ID: A unique identifier for each message, such as 'A-101'. This is a string value that facilitates the retrieval and management of messages.

GSIPK (Global Secondary Index - Partition Key) queue_type: Used to categorize messages by queue_type, such as 'STANDARD' or 'DLQ' (Dead Letter Queue), allowing for quick access and operations on subsets of the queue.

GSISK (Global Secondary Index - Sort Key) sent_at: The timestamp when the message was sent to the queue. Facilitates the ordering of messages based on the time they were added to the queue, which is useful for implementing FIFO (First-In-First-Out) or other ordering mechanisms.
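To see why queue_type and sent_at make a useful index key, consider an in-memory analogue of querying the index: keep only items of the requested queue type, order them by sent_at, and take the oldest one that is currently visible. This sketch assumes an item is visible once invisible_until_at has passed, simplifies timestamps to Unix seconds, and uses a hypothetical item type and nextMessage function; it does not reproduce DynamoMQ's actual query or filter expressions, nor the semantics of its WithUseFIFO option.

```go
package main

import (
	"fmt"
	"sort"
)

// item mirrors a few attributes from the table definition (hypothetical type).
type item struct {
	ID               string
	QueueType        string // index partition key: "STANDARD" or "DLQ"
	SentAt           int64  // index sort key (Unix seconds, simplified)
	InvisibleUntilAt int64  // 0 means the item was never hidden
}

// nextMessage returns the oldest visible item of the given queue type,
// emulating an ascending query on (queue_type, sent_at).
func nextMessage(items []item, queueType string, now int64) (item, bool) {
	candidates := make([]item, 0, len(items))
	for _, it := range items {
		if it.QueueType == queueType {
			candidates = append(candidates, it)
		}
	}
	// Ascending sent_at yields first-in, first-out order.
	sort.SliceStable(candidates, func(i, j int) bool {
		return candidates[i].SentAt < candidates[j].SentAt
	})
	for _, it := range candidates {
		if now >= it.InvisibleUntilAt {
			return it, true
		}
	}
	return item{}, false
}

func main() {
	items := []item{
		{ID: "A-202", QueueType: "STANDARD", SentAt: 20},
		{ID: "A-101", QueueType: "STANDARD", SentAt: 10, InvisibleUntilAt: 100},
		{ID: "A-303", QueueType: "DLQ", SentAt: 5},
	}
	if it, ok := nextMessage(items, "STANDARD", 50); ok {
		fmt.Println(it.ID) // A-202: A-101 is older but still invisible
	}
}
```

Note that this sketch skips an in-flight message in favor of a newer visible one; a stricter ordering mode would need additional rules.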

Attributes: These are the various properties associated with each message:

  • data: This attribute holds the content of the message and can be of any type.
  • receive_count: A numerical count of how many times the message has been retrieved from the queue.
  • version: A number that can be used for optimistic locking and to ensure that the message is not being concurrently modified.
  • created_at: The date and time when the message was created, in ISO 8601 format.
  • updated_at: The date and time when the message was last updated, in ISO 8601 format.
  • received_at: The date and time when the message was last received from the queue, in ISO 8601 format.
  • invisible_until_at: The date and time at which the message becomes visible in the queue for processing again, in ISO 8601 format.
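The version attribute supports optimistic locking: a writer reads an item, then updates it only if version still holds the value it read, incrementing version in the same write. In DynamoDB such a check is expressed as a condition expression on the write, and DynamoMQ reports a failed version check with its ConditionalCheckFailedError type. The in-memory sketch below shows only the compare-and-increment logic; the store type, update method, and errVersionMismatch are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errVersionMismatch stands in for a failed condition check (hypothetical).
var errVersionMismatch = errors.New("conditional check failed: version mismatch")

type record struct {
	ReceiveCount int
	Version      int
}

// store is a hypothetical in-memory table keyed by message id.
type store struct {
	mu    sync.Mutex
	items map[string]record
}

// update applies fn only if the stored version equals expected, then
// increments the version, mimicking a conditional write.
func (s *store) update(id string, expected int, fn func(*record)) (record, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	r, ok := s.items[id]
	if !ok {
		return record{}, fmt.Errorf("id %q not found", id)
	}
	if r.Version != expected {
		return r, errVersionMismatch
	}
	fn(&r)
	r.Version++
	s.items[id] = r
	return r, nil
}

func main() {
	s := &store{items: map[string]record{"A-101": {Version: 1}}}
	incr := func(r *record) { r.ReceiveCount++ }

	// Two consumers both read version 1; only the first conditional write wins.
	first, err := s.update("A-101", 1, incr)
	fmt.Println(first, err) // {1 2} <nil>
	_, err = s.update("A-101", 1, incr)
	fmt.Println(errors.Is(err, errVersionMismatch)) // true
}
```

The losing writer would typically re-read the item and decide whether to retry, which is what prevents two consumers from both claiming the same message.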

Data Transition

This data transition diagram serves as a map for developers and operators to understand how messages flow through the DynamoMQ system, providing insight into the mechanisms of message processing, failure handling, and retries within a DynamoDB-backed queue.

Data Transition

Initial State

  • SendMessage(): A message is created with an initial status of 'READY'. It includes a unique id, arbitrary data, and a receive_count set to 0, indicating it has not yet been processed. The queue_type is 'STANDARD', and timestamps are recorded for creation, last update, and when added to the queue.

Processing

  • ReceiveMessage(): The message status changes to 'PROCESSING', the receive_count increments to reflect the number of times it's been retrieved, and the version number increases to facilitate optimistic locking. Timestamps are updated accordingly.

Retry Logic

  • ChangeMessageVisibility(): If processing fails, the message's visibility is updated to make it available for retry, and the receive_count is incremented. Timestamps are refreshed to reflect the most recent update.

Dead Letter Queue

  • MoveMessageToDLQ(): After the maximum number of retries is reached without successful processing, the message is moved to the DLQ. Its queue_type changes to 'DLQ', and receive_count is reset, indicating that it's ready for a fresh attempt or investigation.

Redrive Policy

  • RedriveMessage(): Once the issues are resolved, messages in the DLQ can be sent back to the standard queue for processing. The RedriveMessage() operation resets the receive_count, changes the queue_type back to 'STANDARD', and updates the timestamps.
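The attribute changes described above can be sketched as plain functions over a message record. This is a minimal, hypothetical model: field names follow the table definition, timestamps are simplified to Unix seconds, the initial version of 1 and the version increment on every update are assumptions, and ChangeMessageVisibility is left out. None of these functions are part of DynamoMQ's API.

```go
package main

import "fmt"

// message mirrors a subset of the table attributes (hypothetical type).
type message struct {
	ID               string
	QueueType        string
	Status           string
	ReceiveCount     int
	Version          int
	SentAt           int64
	ReceivedAt       int64
	InvisibleUntilAt int64
	UpdatedAt        int64
}

// send creates a READY message in the STANDARD queue.
func send(id string, now int64) message {
	return message{ID: id, QueueType: "STANDARD", Status: "READY", Version: 1, SentAt: now, UpdatedAt: now}
}

// receive marks the message PROCESSING and hides it for visibilityTimeout seconds.
func receive(m message, now, visibilityTimeout int64) message {
	m.Status = "PROCESSING"
	m.ReceiveCount++
	m.Version++
	m.ReceivedAt = now
	m.InvisibleUntilAt = now + visibilityTimeout
	m.UpdatedAt = now
	return m
}

// moveToDLQ switches the queue type to DLQ and resets receive_count.
func moveToDLQ(m message, now int64) message {
	m.QueueType, m.Status, m.ReceiveCount = "DLQ", "READY", 0
	m.Version++
	m.UpdatedAt = now
	return m
}

// redrive returns a DLQ message to the STANDARD queue and resets receive_count.
func redrive(m message, now int64) message {
	m.QueueType, m.Status, m.ReceiveCount = "STANDARD", "READY", 0
	m.Version++
	m.UpdatedAt = now
	return m
}

func main() {
	const maxReceives = 3 // hypothetical retry limit
	now := int64(0)
	m := send("A-101", now)
	// Processing never succeeds here, so each time the visibility timeout
	// expires the message is received again.
	for m.ReceiveCount < maxReceives {
		now = m.InvisibleUntilAt
		m = receive(m, now, 30)
	}
	m = moveToDLQ(m, now)
	fmt.Printf("%s %s receive_count=%d version=%d\n", m.QueueType, m.Status, m.ReceiveCount, m.Version) // DLQ READY receive_count=0 version=5
	m = redrive(m, now)
	fmt.Printf("%s %s receive_count=%d version=%d\n", m.QueueType, m.Status, m.ReceiveCount, m.Version) // STANDARD READY receive_count=0 version=6
}
```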

Authors

  • vvatanabe - Main contributor
  • Currently, there are no other contributors

License

This project is licensed under the MIT License. For detailed licensing information, refer to the LICENSE file included in the repository.

# Functions

NewConsumer creates a new Consumer instance with the specified client, message processor, and options.
NewFromConfig creates a new DynamoMQ client using the provided AWS configuration and any additional client options.
NewMessage creates a new instance of a Message with the provided data and initializes its timestamps.
NewProducer creates a new instance of a Producer, which is used to produce messages to a DynamoDB-based queue.
WithAWSBaseEndpoint is an option function to set a custom base endpoint for AWS services.
WithAWSDynamoDBClient is an option function to set a custom AWS DynamoDB client for the DynamoMQ client.
WithAWSRetryMaxAttempts is an option function to set the maximum number of retry attempts for AWS service calls.
WithConcurrency sets the number of concurrent workers for processing messages in the Consumer.
WithErrorLog sets a custom logger for the Consumer.
WithIDGenerator is an option function to set a custom ID generator for the Producer.
WithMaximumReceives sets the maximum number of times a message can be delivered to the Consumer.
WithOnShutdown adds functions to be called during the Consumer's shutdown process.
WithPollingInterval sets the polling interval for the Consumer.
WithQueueingIndexName is an option function to set the queue index name for the DynamoMQ client.
WithQueueType sets the type of queue (STANDARD or DLQ) for the Consumer.
WithRetryInterval sets the retry interval for failed messages in the Consumer.
WithTableName is an option function to set the table name for the DynamoMQ client.
WithUseFIFO is an option function to enable FIFO (First-In-First-Out) behavior for the DynamoMQ client.
WithVisibilityTimeout sets the visibility timeout for messages in the Consumer.

# Constants

QueueTypeDLQ represents a Dead Letter Queue, used for holding messages that failed to process.
QueueTypeStandard represents a standard queue.
StatusProcessing indicates that a message is currently being processed.
StatusReady indicates that a message is ready to be processed.

# Variables

ErrConsumerClosed is an error that indicates the Consumer has been closed.

# Structs

BuildingExpressionError represents an error during the building of a DynamoDB expression.
ChangeMessageVisibilityInput represents the input parameters for changing the visibility timeout of a specific message in a DynamoDB-based queue.
ChangeMessageVisibilityOutput represents the result of the operation to change the visibility of a message.
ClientImpl is a concrete implementation of the dynamomq.Client interface.
ClientOptions defines configuration options for the DynamoMQ client.
ConditionalCheckFailedError represents an error when a condition check on the 'version' attribute fails.
Consumer is a struct responsible for consuming messages from a DynamoDB-based queue.
ConsumerOptions contains configuration options for a Consumer instance.
DeleteMessageInput represents the input parameters for deleting a specific message from a DynamoDB-based queue.
DeleteMessageOutput represents the result of the delete message operation.
DynamoDBAPIError represents a generic error encountered when making a DynamoDB API call.
EmptyQueueError represents an error when an operation cannot proceed due to an empty queue.
GetDLQStatsInput represents the input parameters for obtaining statistical information about a DynamoDB-based Dead Letter Queue (DLQ).
GetDLQStatsOutput represents the output containing statistical information about the Dead Letter Queue (DLQ).
GetMessageInput represents the input parameters for retrieving a specific message from a DynamoDB-based queue.
GetMessageOutput represents the result of the operation to retrieve a message.
GetQueueStatsInput represents the input parameters for obtaining statistical information about a DynamoDB-based queue.
GetQueueStatsOutput represents the output containing statistical information about a DynamoDB-based queue.
IDDuplicatedError represents an error when a provided ID is duplicated in the system.
IDNotFoundError represents an error when a provided ID is not found in DynamoDB.
IDNotProvidedError represents an error when an ID is not provided where it is required.
InvalidStateTransitionError represents an error for invalid state transitions during operations.
ListMessagesInput represents the input parameters for listing messages from a DynamoDB-based queue.
ListMessagesOutput represents the result of the operation to list messages from the queue.
MarshalingAttributeError represents an error during the marshaling of DynamoDB attributes.
Message represents a message structure in a DynamoDB-based queue system.
MoveMessageToDLQInput represents the input parameters for moving a specific message from a DynamoDB-based queue to a Dead Letter Queue (DLQ).
MoveMessageToDLQOutput represents the result of the operation to move a message to the DLQ.
ProduceInput represents the input parameters for producing a message.
ProduceOutput represents the result of the produce operation.
Producer is a generic struct responsible for producing messages of any type T to a DynamoDB-based queue.
ProducerOptions holds configuration options for a Producer.
ReceiveMessageInput represents the input parameters for receiving a message from a DynamoDB-based queue.
ReceiveMessageOutput represents the result of a message receiving operation.
RedriveMessageInput represents the input parameters for restoring a specific message from a DynamoDB-based Dead Letter Queue (DLQ) back to the STANDARD queue.
RedriveMessageOutput represents the result of the operation to redrive a message from the DLQ.
ReplaceMessageInput represents the input parameters for replacing a specific message in a DynamoDB-based queue.
ReplaceMessageOutput represents the result of the operation to replace a message in the queue.
SendMessageInput represents the input parameters for sending a message to a DynamoDB-based queue.
SendMessageOutput represents the result of a message sending operation.
UnmarshalingAttributeError represents an error during the unmarshaling of DynamoDB attributes.

# Interfaces

Client is an interface for interacting with a DynamoDB-based message queue system.
MessageProcessor is an interface defining a method to process messages of a generic type T.

# Type aliases

MessageProcessorFunc is a functional type that implements the MessageProcessor interface.
QueueType represents the type of queue in a DynamoDB-based messaging system.
Status represents the state of a message in a DynamoDB-based queue.
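Judging by its description, MessageProcessorFunc follows the same adapter pattern as net/http's HandlerFunc: a function type whose method calls the function itself, so an ordinary function can satisfy the interface without a dedicated struct such as Counter. The self-contained sketch below reproduces the pattern with local stand-in types; Message, MessageProcessor, MessageProcessorFunc, deliver, and errEmptyPayload are defined here for illustration, and DynamoMQ's exact definitions should be checked in the package documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// errEmptyPayload is a hypothetical error returned by the example processor.
var errEmptyPayload = errors.New("empty payload")

// Message is a local stand-in for dynamomq.Message[T]; only ID and Data are modeled.
type Message[T any] struct {
	ID   string
	Data T
}

// MessageProcessor has the same shape as the interface described above.
type MessageProcessor[T any] interface {
	Process(msg *Message[T]) error
}

// MessageProcessorFunc lets an ordinary function satisfy MessageProcessor.
type MessageProcessorFunc[T any] func(msg *Message[T]) error

// Process calls the underlying function.
func (f MessageProcessorFunc[T]) Process(msg *Message[T]) error {
	return f(msg)
}

// deliver is a hypothetical stand-in for a consumer handing a message to a processor.
func deliver[T any](p MessageProcessor[T], msg *Message[T]) error {
	return p.Process(msg)
}

func main() {
	p := MessageProcessorFunc[string](func(msg *Message[string]) error {
		if msg.Data == "" {
			return errEmptyPayload
		}
		fmt.Println("processing", msg.ID, msg.Data)
		return nil
	})
	fmt.Println(deliver[string](p, &Message[string]{ID: "A-101", Data: "foo"}))
	fmt.Println(deliver[string](p, &Message[string]{ID: "A-202"}))
}
```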