package
0.10.4
Repository: https://github.com/useinsider/go-pkg.git
Documentation: pkg.go.dev

# README

Kinesis Package

The inskinesis package is a Go library for streaming data to Amazon Kinesis streams. This README gives an overview of its functionality and usage.

Introduction

The inskinesis package is designed to make it easier to stream data to Amazon Kinesis streams in your Go applications. It provides a simple interface for sending records to a Kinesis stream while handling batching, partitioning, and retries. This can be especially useful for applications that generate a high volume of data and need to send it to Kinesis efficiently.

Installation

To use the inskinesis package in your Go project, install it with Go modules by running the following command in your project directory:

go get github.com/useinsider/go-pkg/inskinesis

Getting Started

Here's a quick guide on how to get started with the inskinesis package:

  1. Import the package in your Go code:

    import "github.com/useinsider/go-pkg/inskinesis"
    
  2. Create a configuration for your Kinesis stream:

    config := inskinesis.Config{
        Region:                 "your-aws-region",
        StreamName:             "your-kinesis-stream-name",
        Partitioner:            nil, // Optionally provide a partitioner function
        MaxStreamBatchSize:     100, // Maximum size of each batch of records
        MaxStreamBatchByteSize: 1024 * 1024, // Maximum size in bytes for each batch
        MaxBatchSize:           500, // Maximum size of the log buffer
        MaxGroup:               10, // Maximum number of concurrent groups for sending records
    }
    
| Field | Default Value | Description |
|---|---|---|
| Region | N/A | The AWS region where the Kinesis stream is located. Required. |
| StreamName | N/A | The name of the Kinesis stream. Required. |
| Partitioner | UUID | An optional partitioner function used to determine the partition key for records. If not provided, a default UUID-based partitioner is used. |
| MaxStreamBatchSize | 100 | The maximum size of each batch of records to be sent to the stream. |
| MaxStreamBatchByteSize | 256 KB (2^18 bytes) | The maximum size (in bytes) of each batch of records. |
| MaxBatchSize | 500 | The maximum size of the log buffer for accumulating log records before batching. |
| MaxGroup | 1 | The maximum number of concurrent groups for sending records. If you want to send records concurrently, set this value to a number greater than 1. |
| RetryCount | 3 | The number of times to retry sending a batch of records to the stream. |
| RetryInterval | 100 ms | The interval between retries. |
| Verbose | false | Whether to enable verbose logging. |

N/A in the Default Value column marks a required field that has no default value.

  3. Create a Kinesis stream instance:

    stream, err := inskinesis.NewKinesis(config)
    if err != nil {
        // Handle the error
    }
    
  4. Send records to the Kinesis stream:

    // Send a single record
    stream.Put(yourRecord)
    
    // Send multiple records
    records := []interface{}{record1, record2, record3}
    for _, record := range records {
        stream.Put(record)
    }
    
  5. To ensure all records are sent and clean up resources, flush and stop streaming:

    stream.FlushAndStopStreaming()
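
The RetryCount and RetryInterval settings described in the table above suggest a fixed-interval retry around each batch send. The package's actual retry logic is not shown in this README, so the following is only a minimal sketch under stated assumptions: `sendWithRetry` and `errTransient` are hypothetical names, and the sketch assumes one initial attempt followed by up to RetryCount retries.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTransient is a hypothetical stand-in for a retryable send failure.
var errTransient = errors.New("connection reset by peer")

// sendWithRetry calls send once and then retries up to retryCount more times,
// sleeping for interval between attempts. It is an illustration only, not the
// package's implementation.
func sendWithRetry(send func() error, retryCount int, interval time.Duration) error {
	var err error
	for attempt := 0; attempt <= retryCount; attempt++ {
		if err = send(); err == nil {
			return nil
		}
		if attempt < retryCount {
			time.Sleep(interval)
		}
	}
	return fmt.Errorf("giving up after %d retries: %w", retryCount, err)
}

func main() {
	calls := 0
	// Hypothetical sender that fails twice before succeeding.
	flaky := func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	}

	// 3 retries and 100 ms mirror the documented defaults.
	err := sendWithRetry(flaky, 3, 100*time.Millisecond)
	fmt.Println("calls:", calls, "err:", err)
}
```

If every attempt fails, the last error is wrapped and returned, so the caller can still inspect it with `errors.Is`.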
    

Package Structure

The inskinesis package is organized as follows:

  • inskinesis package: The main package containing the StreamInterface, stream, and related functionality for streaming records to Kinesis.
  • PartitionerFunction: A customizable partitioning function for determining the partition key of records.
  • Various error handling and logging functionality.
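
As an illustration of the partitioning hook listed above, a custom partitioner might derive the partition key from a field of the record, so that related records share a key. Kinesis routes records with the same partition key to the same shard. The exact signature of PartitionerFunction is not shown in this README, so `partitionerFunc` below is a local stand-in that assumes a function from a record to a string key; `event` and `byUserID` are hypothetical names.

```go
package main

import "fmt"

// partitionerFunc is a local stand-in for the package's PartitionerFunction.
// Its signature is an assumption based on the description
// "maps a record to a partition key".
type partitionerFunc func(record interface{}) string

// event is a hypothetical record type used only for this sketch.
type event struct {
	UserID  string
	Payload string
}

// byUserID uses the record's UserID as the partition key and falls back to a
// fixed key for records of an unexpected type or with an empty UserID.
func byUserID(record interface{}) string {
	if e, ok := record.(event); ok && e.UserID != "" {
		return e.UserID
	}
	return "unknown"
}

func main() {
	var p partitionerFunc = byUserID
	fmt.Println(p(event{UserID: "user-42", Payload: "clicked"}))
	fmt.Println(p("not an event"))
}
```

Keying by a single field keeps one user's records in order on one shard, at the cost of uneven load if a few keys dominate the traffic. The default UUID-based partitioner spreads records evenly instead.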

Usage

The package provides a simple interface for streaming records to a Kinesis stream. You can customize the configuration based on your needs, including the region, stream name, batch sizes, and partitioning function.

Here's an example of how to use the package:

import "github.com/useinsider/go-pkg/inskinesis"

config := inskinesis.Config{
    Region:                 "your-aws-region",
    StreamName:             "your-kinesis-stream-name",
    Partitioner:            nil, // Optionally provide a partitioner function
    MaxStreamBatchSize:     100, // Maximum size of each batch of records
    MaxStreamBatchByteSize: 1024 * 1024, // Maximum size in bytes for each batch
    MaxBatchSize:           500,         // Maximum size of the log buffer
    MaxGroup:               10, // Maximum number of concurrent groups for sending records
}

stream, err := inskinesis.NewKinesis(config)
if err != nil {
    // Handle the error
}

// Send records to the stream
stream.Put(yourRecord)

// To ensure all records are sent and clean up resources, flush and stop streaming
stream.FlushAndStopStreaming()
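
To make the two batch limits in the configuration more concrete, the sketch below splits already-serialized records into batches bounded by a record count and a byte size. It is not the package's implementation. `splitBatches` is a hypothetical helper, and it assumes that MaxStreamBatchSize counts records while MaxStreamBatchByteSize counts payload bytes.

```go
package main

import "fmt"

// splitBatches groups records into batches holding at most maxCount records
// and at most maxBytes bytes each. A single record larger than maxBytes is
// placed in a batch of its own rather than dropped.
func splitBatches(records [][]byte, maxCount, maxBytes int) [][][]byte {
	var batches [][][]byte
	var current [][]byte
	size := 0
	for _, r := range records {
		full := len(current) >= maxCount || size+len(r) > maxBytes
		if len(current) > 0 && full {
			// Close the current batch before adding the next record.
			batches = append(batches, current)
			current, size = nil, 0
		}
		current = append(current, r)
		size += len(r)
	}
	if len(current) > 0 {
		batches = append(batches, current)
	}
	return batches
}

func main() {
	records := [][]byte{
		[]byte("aaaa"), []byte("bbbb"), []byte("cccc"),
		[]byte("dddd"), []byte("eeee"),
	}
	for i, b := range splitBatches(records, 3, 1024) {
		fmt.Printf("batch %d: %d records\n", i, len(b))
	}
}
```

Whichever limit is reached first closes the batch, so a small byte limit can produce batches well below the record-count limit.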

Error Handling

The inskinesis package provides error channels for receiving errors during streaming. You can use these channels to handle errors in your application gracefully. It's important to monitor the error channels to ensure the robustness of your data streaming process.

Here's an example of how to use the error channels:

go func() {
    // Receive errors until the channel is closed.
    for err := range stream.Error() {
        sentry.Error(err) // report with your own error tracker
    }
}()
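
A goroutine that only reads from the error channel keeps running for as long as the channel stays open. The README does not say whether the package closes the channel on shutdown, so the sketch below handles both cases: it stops when the channel is closed or when a separate done signal fires. `drainErrors` and `errPutFailed` are hypothetical names, and a plain channel stands in for `stream.Error()`.

```go
package main

import (
	"errors"
	"fmt"
)

// errPutFailed is a hypothetical error used to simulate a failed send.
var errPutFailed = errors.New("put failed")

// drainErrors passes every error received on errs to handle. It returns when
// errs is closed or when done is closed, whichever happens first.
func drainErrors(done <-chan struct{}, errs <-chan error, handle func(error)) {
	for {
		select {
		case err, ok := <-errs:
			if !ok {
				return // channel closed, nothing more to read
			}
			handle(err)
		case <-done:
			return // the application is shutting down
		}
	}
}

func main() {
	errs := make(chan error) // stand-in for the channel returned by stream.Error()
	done := make(chan struct{})
	finished := make(chan struct{})

	go func() {
		defer close(finished)
		drainErrors(done, errs, func(err error) {
			fmt.Println("stream error:", err)
		})
	}()

	errs <- errPutFailed // simulate one failure
	close(done)          // signal shutdown
	<-finished           // wait for the drain goroutine to exit
}
```

In an application, the shutdown signal would typically fire after `stream.FlushAndStopStreaming()` returns, so that errors raised during the final flush are still handled.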

Contributing

If you would like to contribute to the inskinesis package, please follow standard Go community guidelines for contributions. You can create issues, submit pull requests, and help improve the package for everyone.

# Functions

NewKinesis creates a new Kinesis stream.
NewMockKinesisInterface creates a new mock instance.
NewMockStreamInterface creates a new mock instance.
NewMockSubscribeToShardEventStreamEvent creates a new mock instance.
NewMockSubscribeToShardEventStreamReader creates a new mock instance.
PartitionerPointer returns a pointer to the PartitionerFunction value passed in.

# Structs

CustomRetryer retries on "connection reset by peer".
MockKinesisInterface is a mock of KinesisInterface interface.
MockKinesisInterfaceMockRecorder is the mock recorder for MockKinesisInterface.
MockStreamInterface is a mock of StreamInterface interface.
MockStreamInterfaceMockRecorder is the mock recorder for MockStreamInterface.
MockSubscribeToShardEventStreamEvent is a mock of SubscribeToShardEventStreamEvent interface.
MockSubscribeToShardEventStreamEventMockRecorder is the mock recorder for MockSubscribeToShardEventStreamEvent.
MockSubscribeToShardEventStreamReader is a mock of SubscribeToShardEventStreamReader interface.
MockSubscribeToShardEventStreamReaderMockRecorder is the mock recorder for MockSubscribeToShardEventStreamReader.

# Interfaces

StreamInterface defines the interface for a Kinesis stream.
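
Because the stream is exposed through an interface, application code can depend on a narrow interface of its own and be tested without AWS. The package ships generated mocks for this purpose; the sketch below uses a hand-written fake instead so that it stays self-contained. `recordSink`, `fakeSink`, and `publishAll` are hypothetical names, and the method set is an assumption inferred from the calls shown in this README rather than the actual definition of StreamInterface.

```go
package main

import "fmt"

// recordSink is a hypothetical, narrowed view of the stream containing only
// the calls used in this README. The real StreamInterface may differ.
type recordSink interface {
	Put(record interface{})
	FlushAndStopStreaming()
}

// fakeSink records what it receives instead of sending anything to Kinesis.
type fakeSink struct {
	records []interface{}
	stopped bool
}

func (f *fakeSink) Put(record interface{}) { f.records = append(f.records, record) }
func (f *fakeSink) FlushAndStopStreaming() { f.stopped = true }

// publishAll is example application code: it sends every record and then
// flushes and stops the stream.
func publishAll(s recordSink, records []interface{}) {
	for _, r := range records {
		s.Put(r)
	}
	s.FlushAndStopStreaming()
}

func main() {
	fake := &fakeSink{}
	publishAll(fake, []interface{}{"first", "second"})
	fmt.Println("records:", len(fake.records), "stopped:", fake.stopped)
}
```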

# Type aliases

PartitionerFunction is the common signature of all partitioners; it maps a record to a partition key.