github.com/mirrorweb/sqspoller
Type: module, package
Version: 0.4.8
Repository: https://github.com/mirrorweb/sqspoller.git
Documentation: pkg.go.dev

# README

SQS-Poller

SQS-Poller is a simple queue polling framework, designed specifically to work with AWS SQS.

Installation

  1. Install sqspoller:
$ go get -u github.com/mirrorweb/sqspoller
  2. Import code:
import "github.com/mirrorweb/sqspoller"

Features

  • Timeouts
  • Polling intervals
  • Polling back offs on empty responses
  • Graceful shutdowns
  • Middleware
  • Remove message from queue with simple delete API

Quick Start

package main

import (
	"context"
	"fmt"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
	"github.com/mirrorweb/sqspoller"
	"log"
	"time"
)

func main() {
	// create SQS client.
	sess := session.Must(session.NewSession())
	sqsClient := sqs.New(sess)

	// use client to create default Poller instance.
	poller := sqspoller.Default(sqsClient)

	// supply polling parameters.
	poller.ReceiveMessageParams(&sqs.ReceiveMessageInput{
		MaxNumberOfMessages: aws.Int64(1),
		QueueUrl:            aws.String("https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"),
	})

	// configure idle poll interval and handler timeout
	poller.SetIdlePollInterval(30 * time.Second)
	poller.SetHandlerTimeout(120 * time.Second)

	// supply handler to handle new messages
	poller.OnMessage(func(ctx context.Context, client *sqs.SQS, msgOutput *sqspoller.MessageOutput) error {
		msg := msgOutput.Messages[0]
		// do work on message
		fmt.Println("GOT MESSAGE: ", msg)
		// delete message from queue
		if _, err := msg.Delete(); err != nil {
			return err
		}
		return nil
	})

	// supply handler to handle errors returned from poll requests to SQS.
	// Returning a non-nil error will cause the poller to exit.
	poller.OnError(func(ctx context.Context, err error) error {
		// log error and exit poller.
		log.Println(err)
		return err
	})

	// Run poller.
	if err := poller.Run(); err != nil {
		log.Fatal(err)
	}
}

Using Middleware

func main() {
	// sqsClient is an *sqs.SQS client created as in the Quick Start.
	poller := sqspoller.New(sqsClient)

	// IgnoreEmptyResponses stops empty message outputs from reaching the core handler
	// and therefore the user can guarantee that there will be at least one message in
	// the message output.
	//
	// Note: Default poller comes with this middleware.
	poller.Use(sqspoller.IgnoreEmptyResponses())

	// supply handler to handle new messages
	poller.OnMessage(func(ctx context.Context, client *sqs.SQS, msgOutput *sqspoller.MessageOutput) error {
		// can guarantee messages will have length greater than or equal to one.
		msg := msgOutput.Messages[0]

		// delete message from queue
		if _, err := msg.Delete(); err != nil {
			return err
		}
		return nil
	})
}

Shutdown

When shutting down the poller, there are three different modes of shutdown to choose from.

ShutdownNow

 poller.ShutdownNow()

The ShutdownNow method cancels the context object immediately and exits the Run() function. It does not wait for any jobs to finish handling before exiting.

ShutdownGracefully

 poller.ShutdownGracefully()

The ShutdownGracefully method waits for the handler to finish handling the current message before cancelling the context object and exiting the Run() function. If the handler is blocked then ShutdownGracefully will not exit.

ShutdownAfter

 poller.ShutdownAfter(30*time.Second)

The ShutdownAfter method attempts to shut down gracefully within the given time. If the handler cannot complete its current job within that time, the context object is cancelled at that point, allowing the Run() function to exit.

If the timeout happens before the poller can shut down gracefully, ShutdownAfter returns the error ErrShutdownGraceful.

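func main() {
	// sqsClient, messageHandler and errorHandler are set up as in the Quick Start;
	// this snippet also needs the "os", "os/signal" and "syscall" imports.
	poller := sqspoller.Default(sqsClient)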

	poller.ReceiveMessageParams(&sqs.ReceiveMessageInput{
		MaxNumberOfMessages: aws.Int64(1),
		QueueUrl:            aws.String("https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"),
	})

	poller.OnMessage(messageHandler)
	poller.OnError(errorHandler)

	// run poller in a separate goroutine and wait for errors on channel
	pollerErrors := make(chan error, 1)
	go func() {
		pollerErrors <- poller.Run()
	}()

	// listen for shutdown signals
	shutdown := make(chan os.Signal, 1)
	signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)

	select {
	case err := <-pollerErrors:
		log.Fatal(err)
	case <-shutdown:
		if err := poller.ShutdownAfter(30 * time.Second); err != nil {
			log.Fatal(err)
		}
	}
}

The Playground

To see how you can experiment and play around with a local SQS and poller instance, take a look here.

Dependencies

  • Go Version 1.13+

In case you were worried about dependency bloat, the core package functions rely on only two third-party modules:

  • github.com/aws/aws-sdk-go v1.28.9 - What the framework is built for.
  • github.com/google/uuid v1.1.1 - To generate reliable UUIDs for tracing.

The rest of the dependencies in go.mod are test dependencies. These modules provide functionality to test the framework effectively.

Testing

To run all the unit and integration tests, run make test.

Tests in the sqspoller_test.go file require Docker to be installed and running on your machine, as the tests spin up local SQS containers to test the framework against.

# Functions

Default creates a new instance of the SQS Poller from an instance of sqs.SQS.
IgnoreEmptyResponses stops the data from being passed down to the inner message handler, if there is no message to be handled.
New creates a new instance of the SQS Poller from an instance of sqs.SQS.

# Constants

TrackingKey should be used to access the values on the context object of type *TrackingValue.

# Variables

ErrHandlerTimeout occurs when the MessageHandler times out before processing the message.
ErrNotRunnable occurs when there is an integrity issue in the system.
ErrNoErrorHandler occurs when the caller tries to run the poller before attaching an ErrorHandler.
ErrNoMessageHandler occurs when the caller tries to run the poller before attaching a MessageHandler.
ErrNoReceiveMessageParams occurs when the caller tries to run the poller before setting the ReceiveMessageParams.
ErrNotCloseable occurs when the caller tries to shut down a poller that is already stopped or in the process of shutting down.
ErrNotRunnable occurs when the caller tries to run the poller while the poller is already running or shutting down.
ErrRequestTimeout occurs when the poller times out while requesting a message from the SQS queue.
ErrShutdownGraceful occurs when the poller fails to shut down gracefully.
ErrShutdownNow occurs when the poller is suddenly shut down.

# Structs

Message is an individual message contained within a MessageOutput. It provides methods to remove itself from the SQS queue.
MessageOutput contains the SQS ReceiveMessageOutput and is passed down to the MessageHandler when the Poller is running.
Poller is an instance of the polling framework. It contains the SQS client and provides a simple API for polling an SQS queue.
TrackingValue represents the values stored on the context object. For each poll, the context object stores the time the message was received and a trace ID.

# Type aliases

ErrorHandler is a function which handles errors returned from sqs.ReceiveMessageWithContext. It is only invoked if the error is not nil.
MessageHandler is a function which handles the incoming SQS message.
Middleware is a function that wraps a MessageHandler to add functionality before or after the MessageHandler code.