Categorygithub.com/harpy-wings/signal-flow
modulepackage
1.1.0
Repository: https://github.com/harpy-wings/signal-flow.git
Documentation: pkg.go.dev

# README

signal-flow

Go Report Card Coverage Go Reference build workflow codecov Reliability Rating Security Rating

Signal-Flow is a library for handling AMQP operations, including retrying and reconnecting strategies. It facilitates message emission and retrieval from AMQP brokers, incorporating flow control signals. It is designed generically to support various data formats.

HLD:

                Queue
                ↧ 1:n (Foreach)
+―――――――――――――――――――+
| SignalFlow        | 
| Foreach           |
| Emit              | ↦ Exchange
+―――――――――――――――――――+ 1:n (Emit)


Quick Start

Install Signal-Flow:

go get -u github.com/harpy-wing/signal-flow

Then, use it in your code:

package main

import (
  "log"
  "time"

  signalflow "github.com/harpy-wings/signal-flow"
)

func main() {
  // Define a custom data structure.
  type Telegraph struct {
    Text string `json:"text"`
  }

  // Initialize Signal-Flow with necessary configurations.
  SF, err := signalflow.New[Telegraph](
    signalflow.OptionWithHost("amqp://guest:guest@localhost:5672/"), // Set AMQP server host.
    signalflow.OptionWithQueueName("qt1"),                           // Specify the queue name.
    signalflow.OptionWithExchangeName("ext1"),                       // Specify the exchange name.
  )
  if err != nil {
    panic(err)
  }

  // Consume messages from the queue.
  SF.Foreach(func(m Telegraph) error {
    log.Println(m.Text)
    return nil
  })

  // Emit a message to the exchange.
  err = SF.Emit(Telegraph{
    Text: "Hello Gopher 1!",
  })
  if err != nil {
    panic(err)
  }
}

See _example for more.

More

Goals

The aim of this library is to create a very easy-to-use interface for AMQP brokers. Maintenance is limited to improving documentation, adding more end-to-end test functionality, fixing raised issues, and adding benchmark tests.

Supported Go Versions

The library is developed by Go 1.22.1; however, the CI tests run on different Go versions >= 1.19. Check the Actions for more details.

Why You Should Use Signal-Flow?

  • Increase test coverage by mocking the Signal-Flow library.
  • Avoid dealing with deadlocks, parallelism, and flow control of AMQP.
  • No need to handle retrying the connection or recovering the channel once it disconnects.
  • Reduce boilerplate code in your project.

Interface

type SignalFlow[Message any] interface {
  Foreach(fn func(Message) error) error
  Emit(msg Message) error
  EmitR(msg Message, routingKey string) error
}

Foreach(fn func(Message) error) error

Foreach executes the provided fn function for every message received. If the fn function returns nil, the message is acknowledged (Ack); otherwise, it is not.

Emit(msg Message) error

Emit sends a message to the preset exchange with the preset routing-key. If it fails, it returns an error.

EmitR(msg Message, routingKey string)

Emit sends a message to the preset exchange with the given routing-key. If it fails, it returns an error.

Best Practices

Don't Use Singleton Pattern

Inside an application, there could be multiple modules consuming or producing different types of messages. Make sure to create separate Signal-Flow clients for each module.

Implement Idempotence

Acknowledgment of a message is asynchronous, so you might receive the same message from the RMQ Broker multiple times due to unexpected network errors. Ensure your message processing is idempotent.

Advanced Configuration

The following Options are available for Signal-Flow configuration:

func New[Message any](ops ...Option) (SignalFlow[Message], error) 

OptionWithHost(v string)

Sets the host of the RMQ Broker for Signal-Flow. This option is required.

OptionWithCodec(c Codec)

The Signal-Flow library defaults to using the JSON Codec. However, you can utilize other Codecs defined in the codec directory or define your own Codec. It just needs to implement the Codec interface.

// Codec is an interface for encoding and decoding messages.
// By default, two codecs (json, binary) are available under the codec package.
type Codec interface {
  // Encode encodes the message to an array of bytes.
  Encode(m any) ([]byte, error)

  // Decode retrieves the message from an array of bytes.
  Decode(any, []byte) error

  // ContentType returns the content type of the codec.
  // Example: "application/json"
  ContentType() string
}

OptionWithName(v string)

This function specifies a name for the consumer. By default, Signal-Flow picks up a random name starting with the sf_ prefix. However, it's better for the developer to provide a proper name so it can be easily identified for logging or in the RMQ management dashboard.

OptionWithQueueName(v string)

This option specifies a queue name for Signal-Flow. Once the queue name is specified, Signal-Flow initializes the consumer.

OptionWithExchangeName(v string)

This option specifies the exchange name for Signal-Flow. Once the exchange name is specified, Signal-Flow initializes the producer.

OptionWithRoutingKey(v string)

This option is used to specify the default routing key for the producer. If you set a routing key and the exchange name is empty, Signal-Flow initializes the producer for the default exchange and uses the routing key. You can pass the queue name here with the default routing key to emit a message to a specific queue.

OptionWithErrorHandler(fn func(error))

Since some processes happen in parallel, errors may occur while processing messages. This function allows you to handle those errors for better error handling.

OptionQueueDeclaration(req QueueDeclarationRequest)

By default, Signal-Flow does not create a queue if it does not exist when attempting to consume from it. If you want to declare a queue, you can use this option. Queue declaration occurs as soon as the connection is established and before the consumer initializes.

OptionExchangeDeclaration(req ExchangeDeclarationRequest)

By default, Signal-Flow does not create an exchange if it does not exist when trying to emit a message to it. If you want to declare an exchange, you can use this option. Exchange declaration occurs as soon as the connection is established and before the producer initializes.

OptionBinding(req BindingRequest)

You can bind an exchange to another exchange or queue using this option. It will be executed once the connection is established and before the consumer or producer initializes.

Note: You need to specify the destination type of the binding, whether it's a queue or an exchange.

OptionWithLogger(v logrus.FieldLogger)

This option allows you to set the logger for SignalFlow. SignalFlow will then utilize this logger for logging purposes, enabling you to specify your configuration for handling logs.

Contributing

Contributions are welcome. Please read the Contribution document and raise an issue before starting work.

Run the tests

Ensure that RMQP is available at amqp://guest:guest@localhost:5672/ or your Docker daemon is running. Then run the tests using:

go test -v ./...

Dependencies

Standard Library Imports

  • errors
  • math
  • math/rand
  • strconv
  • sync
  • time

SonarCloud

# Packages

No description provided by the author
No description provided by the author

# Functions

No description provided by the author
OptionBinding binds an exchange to another exchange or queue in the RMQ server.
OptionExchangeDeclaration declares an exchange in the RMQ server.
OptionQueueDeclaration declares a queue in the RMQ server.
OptionWithCodec specifies the Codec for encoding and decoding messages.
No description provided by the author
OptionWithExchangeName sets the exchange name for the signal flow client.
OptionWithQoS sets the Quality of Service (QoS) for the RMQ server.
OptionWithHostname sets the host URL of the Signal flow.
OptionWithLogger This option allows you to set the logger for SignalFlow.
OptionWithName sets the name of the Signal flow Client.
OptionWithQueueName sets the queue name for the signal flow client.
OptionWithRoutingKey specifies the routing key for the Producer once it emits messages.

# Constants

Binding an exchange to another exchange.
By default, we expect to bind an exchange to a queue.

# Variables

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Structs

BindingRequest contains the configuration for binding an exchange to another queue or exchange.
ExchangeDeclarationRequest contains the configuration for an exchange declaration request.
QueueDeclarationRequest contains the configuration for a queue declaration request.

# Interfaces

Codec is an interface for encoding and decoding messages.
SignalFlow is an interface for handling AMQP operations, including retrying and reconnecting strategies.

# Type aliases

No description provided by the author
No description provided by the author