# README
SQS-Poller
SQS-Poller is a simple queue polling framework, designed specifically to work with AWS SQS.
Contents
- Installation
- Features
- Quick Start
- Using Middleware
- Handling Shutdowns
- The Playground
- Dependencies
- Testing
Installation
- Install sqspoller:
$ go get -u github.com/mirrorweb/sqspoller
- Import code:
import "github.com/mirrorweb/sqspoller"
Features
- Timeouts
- Polling intervals
- Polling back offs on empty responses
- Graceful shutdowns
- Middleware
- Remove messages from the queue with a simple delete API
Quick Start
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/sqs"
    "github.com/mirrorweb/sqspoller"
)

func main() {
    // create SQS client.
    sess := session.Must(session.NewSession())
    sqsClient := sqs.New(sess)

    // use client to create default Poller instance.
    poller := sqspoller.Default(sqsClient)

    // supply polling parameters.
    poller.ReceiveMessageParams(&sqs.ReceiveMessageInput{
        MaxNumberOfMessages: aws.Int64(1),
        QueueUrl:            aws.String("https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"),
    })

    // configure idle poll interval and handler timeout.
    poller.SetIdlePollInterval(30 * time.Second)
    poller.SetHandlerTimeout(120 * time.Second)

    // supply handler to handle new messages.
    poller.OnMessage(func(ctx context.Context, client *sqs.SQS, msgOutput *sqspoller.MessageOutput) error {
        msg := msgOutput.Messages[0]

        // do work on message.
        fmt.Println("GOT MESSAGE: ", msg)

        // delete message from queue.
        if _, err := msg.Delete(); err != nil {
            return err
        }
        return nil
    })

    // supply handler to handle errors returned from poll requests to SQS.
    // Returning a non-nil error will cause the poller to exit.
    poller.OnError(func(ctx context.Context, err error) error {
        // log error and exit poller.
        log.Println(err)
        return err
    })

    // run poller.
    if err := poller.Run(); err != nil {
        log.Fatal(err)
    }
}
Using Middleware
func main() {
    // sqsClient is an *sqs.SQS client, created as shown in Quick Start.
    poller := sqspoller.New(sqsClient)

    // IgnoreEmptyResponses stops empty message outputs from reaching the core
    // handler, so the handler is guaranteed to receive at least one message
    // in the message output.
    //
    // Note: the Default poller comes with this middleware.
    poller.Use(sqspoller.IgnoreEmptyResponses())

    // supply handler to handle new messages.
    poller.OnMessage(func(ctx context.Context, client *sqs.SQS, msgOutput *sqspoller.MessageOutput) error {
        // Messages is guaranteed to contain at least one message.
        msg := msgOutput.Messages[0]

        // delete message from queue.
        if _, err := msg.Delete(); err != nil {
            return err
        }
        return nil
    })
}
Handling Shutdowns
When shutting down the poller, there are three different modes of shutdown to choose from.
ShutdownNow
poller.ShutdownNow()
The ShutdownNow method cancels the context object immediately and exits the Run() function. It does not wait for any jobs to finish handling before exiting.
ShutdownGracefully
poller.ShutdownGracefully()
The ShutdownGracefully method waits for the handler to finish handling the current message before cancelling the context object and exiting the Run() function. If the handler blocks indefinitely, ShutdownGracefully never returns.
ShutdownAfter
poller.ShutdownAfter(30*time.Second)
The ShutdownAfter method attempts to shut down gracefully within the given time. If the handler cannot complete its current job within that time, the context object is cancelled when the time expires, allowing the Run() function to exit.
If the timeout expires before the poller can shut down gracefully, ShutdownAfter returns the error ErrShutdownGraceful.
// sqsClient, messageHandler and errorHandler are assumed to be defined
// elsewhere, for example as in Quick Start. This snippet also needs the
// os, os/signal and syscall packages.
func main() {
    poller := sqspoller.Default(sqsClient)

    poller.ReceiveMessageParams(&sqs.ReceiveMessageInput{
        MaxNumberOfMessages: aws.Int64(1),
        QueueUrl:            aws.String("https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"),
    })

    poller.OnMessage(messageHandler)
    poller.OnError(errorHandler)

    // run poller in a separate goroutine and wait for errors on channel.
    pollerErrors := make(chan error, 1)
    go func() {
        pollerErrors <- poller.Run()
    }()

    // listen for shutdown signals.
    shutdown := make(chan os.Signal, 1)
    signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)

    select {
    case err := <-pollerErrors:
        log.Fatal(err)
    case <-shutdown:
        if err := poller.ShutdownAfter(30 * time.Second); err != nil {
            log.Fatal(err)
        }
    }
}
The Playground
To see how you can experiment and play around with a local SQS and poller instance, take a look here.
Dependencies
- Go Version 1.13+
In case you are worried about dependency bloat, the core package relies on only two third-party modules:
- github.com/aws/aws-sdk-go v1.28.9 - What the framework is built for.
- github.com/google/uuid v1.1.1 - To generate reliable UUIDs for tracing.
The remaining dependencies in go.mod are test dependencies. These modules provide the functionality needed to test the framework effectively.
Testing
To run all the unit and integration tests, run make test.
Tests in the sqspoller_test.go file require Docker to be installed and running on your machine, because they spin up local SQS containers to test the framework against.