github.com/interconnectedcloud/go-amqp
Version: 0.12.5
Repository: https://github.com/interconnectedcloud/go-amqp.git
Documentation: pkg.go.dev

# README

pack.ag/amqp

MIT licensed

pack.ag/amqp is an AMQP 1.0 client implementation for Go.

AMQP 1.0 is not compatible with AMQP 0-9-1 or 0-10, which are the most common AMQP protocols in use today. A list of AMQP 1.0 brokers and other AMQP 1.0 resources can be found at github.com/xinchen10/awesome-amqp.

This library aims to be stable and worthy of production usage, but the API is still subject to change. To conform with SemVer, the major version will remain 0 until the API is deemed stable. During this period breaking changes will be indicated by bumping the minor version. Non-breaking changes will bump the patch version.
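
The versioning policy above can be written as a small check. This is a minimal sketch of the stated rule only; the `version` type and the `parse` and `mayBreak` helpers are hypothetical names introduced for illustration and are not part of this package.

```go
package main

import "fmt"

// version is a hypothetical major.minor.patch triple used only in this sketch.
type version struct{ major, minor, patch int }

// parse reads a "major.minor.patch" string such as "0.12.5".
func parse(s string) (version, error) {
	var v version
	_, err := fmt.Sscanf(s, "%d.%d.%d", &v.major, &v.minor, &v.patch)
	return v, err
}

// mayBreak reports whether moving from prev to next can include breaking
// changes under the policy described above: while the major version is 0,
// a minor bump signals a breaking change and a patch bump does not.
func mayBreak(prev, next version) bool {
	if prev.major != next.major {
		return true
	}
	return prev.major == 0 && prev.minor != next.minor
}

func main() {
	const current = "0.12.5"
	from, err := parse(current)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	for _, candidate := range []string{"0.12.6", "0.13.0"} {
		to, err := parse(candidate)
		if err != nil {
			fmt.Println("parse error:", err)
			return
		}
		fmt.Printf("%s -> %s may break: %t\n", current, candidate, mayBreak(from, to))
	}
}
```

In practice this means that, while the major version is 0, a dependency constraint should pin the minor version if breaking changes are not acceptable.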

Install

go get -u pack.ag/amqp

Contributing

Contributions are welcome! Please see CONTRIBUTING.md.

Example Usage

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"pack.ag/amqp"
)

func main() {
	// Create client
	client, err := amqp.Dial("amqps://my-namespace.servicebus.windows.net",
		amqp.ConnSASLPlain("access-key-name", "access-key"),
	)
	if err != nil {
		log.Fatal("Dialing AMQP server:", err)
	}
	defer client.Close()

	// Open a session
	session, err := client.NewSession()
	if err != nil {
		log.Fatal("Creating AMQP session:", err)
	}

	ctx := context.Background()

	// Send a message
	{
		// Create a sender
		sender, err := session.NewSender(
			amqp.LinkTargetAddress("/queue-name"),
		)
		if err != nil {
			log.Fatal("Creating sender link:", err)
		}

		ctx, cancel := context.WithTimeout(ctx, 5*time.Second)

		// Send message
		err = sender.Send(ctx, amqp.NewMessage([]byte("Hello!")))
		if err != nil {
			log.Fatal("Sending message:", err)
		}

		sender.Close(ctx)
		cancel()
	}

	// Continuously read messages
	{
		// Create a receiver
		receiver, err := session.NewReceiver(
			amqp.LinkSourceAddress("/queue-name"),
			amqp.LinkCredit(10),
		)
		if err != nil {
			log.Fatal("Creating receiver link:", err)
		}
		defer func() {
			ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
			receiver.Close(ctx)
			cancel()
		}()

		for {
			// Receive next message
			msg, err := receiver.Receive(ctx)
			if err != nil {
				log.Fatal("Reading message from AMQP:", err)
			}

			// Accept message
			msg.Accept()

			fmt.Printf("Message received: %s\n", msg.GetData())
		}
	}
}
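
The receiver in the example above is created with amqp.LinkCredit(10). As a rough mental model, link credit caps how many messages may be delivered but not yet acknowledged. The sketch below illustrates that idea with a hypothetical `creditWindow` type built on a buffered channel; it is a simplification for illustration, not how this package or the AMQP 1.0 flow-control frames are implemented.

```go
package main

import "fmt"

// creditWindow is a hypothetical stand-in for link credit. It caps the number
// of delivered-but-unacknowledged messages at the channel's capacity.
type creditWindow struct {
	tokens chan struct{}
}

func newCreditWindow(credit int) *creditWindow {
	return &creditWindow{tokens: make(chan struct{}, credit)}
}

// tryDeliver consumes one unit of credit and reports whether a delivery
// is currently allowed.
func (w *creditWindow) tryDeliver() bool {
	select {
	case w.tokens <- struct{}{}:
		return true
	default:
		return false // no credit left; the sender has to wait
	}
}

// accept acknowledges one message, returning its credit to the window.
// It is a no-op when nothing is outstanding.
func (w *creditWindow) accept() {
	select {
	case <-w.tokens:
	default:
	}
}

func main() {
	w := newCreditWindow(2)
	fmt.Println("first delivery allowed: ", w.tryDeliver())
	fmt.Println("second delivery allowed:", w.tryDeliver())
	fmt.Println("third delivery allowed: ", w.tryDeliver())
	w.accept() // acknowledging one message frees one unit of credit
	fmt.Println("after accept allowed:   ", w.tryDeliver())
}
```

A larger credit value lets more messages be in flight at once, at the cost of more unacknowledged messages held by a slow consumer.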

Related Projects

| Project | Description |
| --- | --- |
| github.com/Azure/azure-event-hubs-go * | Library for interacting with Microsoft Azure Event Hubs. |
| github.com/Azure/azure-service-bus-go * | Library for interacting with Microsoft Azure Service Bus. |
| gocloud.dev/pubsub * | Library for portably interacting with Pub/Sub systems. |
| qpid-proton | AMQP 1.0 library using the Qpid Proton C bindings. |

* indicates that the project uses this library.

Feel free to send PRs adding additional projects. Listed projects are not limited to those that use this library as long as they are potentially useful to people who are looking at an AMQP library.

Other Notes

By default, this package depends only on the standard library. Building with the pkgerrors tag will cause errors to be created/wrapped by the github.com/pkg/errors library. This can be useful for debugging and when used in a project using github.com/pkg/errors.

# Functions

ConnConnectTimeout configures how long to wait for the server during connection establishment.
ConnContainerID sets the container-id to use when opening the connection.
ConnIdleTimeout specifies the maximum period between receiving frames from the peer.
ConnMaxFrameSize sets the maximum frame size that the connection will accept.
ConnMaxSessions sets the maximum number of channels.
ConnProperty sets an entry in the connection properties map sent to the server.
ConnSASLAnonymous enables SASL ANONYMOUS authentication for the connection.
ConnSASLPlain enables SASL PLAIN authentication for the connection.
ConnServerHostname sets the hostname sent in the AMQP Open frame and TLS ServerName (if not otherwise set).
ConnTLS toggles TLS negotiation.
ConnTLSConfig sets the tls.Config to be used during TLS negotiation.
Dial connects to an AMQP server.
LinkAddress sets the link address.
LinkAddressDynamic requests a dynamically created address from the server.
LinkBatching toggles batching of message disposition.
LinkBatchMaxAge sets the maximum time between the start of a disposition batch and sending the batch to the server.
LinkCredit specifies the maximum number of unacknowledged messages the sender can transmit.
LinkMaxMessageSize sets the maximum message size that can be sent or received on the link.
LinkName sets the name of the link.
LinkProperty sets an entry in the link properties map sent to the server.
LinkPropertyInt64 sets an entry in the link properties map sent to the server.
LinkReceiverSettle sets the requested receiver settlement mode.
LinkSelectorFilter sets a selector filter (apache.org:selector-filter:string) on the link source.
LinkSenderSettle sets the requested sender settlement mode.
LinkSourceAddress sets the source address.
LinkSourceCapabilities sets the source capabilities.
LinkSourceDurability sets the source durability policy.
LinkSourceExpiryPolicy sets the link expiration policy.
LinkSourceFilter is an advanced API for setting non-standard source filters.
LinkTargetAddress sets the target address.
LinkTargetDurability sets the target durability policy.
LinkTargetExpiryPolicy sets the link expiration policy.
New establishes an AMQP client connection over conn.
NewMessage returns a *Message with data as the payload.
SessionIncomingWindow sets the maximum number of unacknowledged transfer frames the server can send.
SessionMaxLinks sets the maximum number of links (Senders/Receivers) allowed on the session.
SessionOutgoingWindow sets the maximum number of unacknowledged transfer frames the client can send.
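
The Conn, Link, and Session functions listed above are passed as extra arguments when creating a connection, link, or session, as the amqp.Dial and session.NewReceiver calls in the example usage show. This style is commonly called the functional options pattern. The following is a minimal sketch of that pattern; `connConfig`, `connOption`, `withIdleTimeout`, `withContainerID`, and the default values are all hypothetical and are not this package's types or defaults.

```go
package main

import (
	"fmt"
	"time"
)

// connConfig and connOption are hypothetical stand-ins for illustration.
type connConfig struct {
	idleTimeout time.Duration
	containerID string
}

// connOption mutates a configuration and may reject invalid values.
type connOption func(*connConfig) error

func withIdleTimeout(d time.Duration) connOption {
	return func(c *connConfig) error {
		if d < 0 {
			return fmt.Errorf("idle timeout must not be negative, got %v", d)
		}
		c.idleTimeout = d
		return nil
	}
}

func withContainerID(id string) connOption {
	return func(c *connConfig) error {
		c.containerID = id
		return nil
	}
}

// newConfig starts from illustrative defaults and applies options in order.
func newConfig(opts ...connOption) (*connConfig, error) {
	c := &connConfig{idleTimeout: time.Minute}
	for _, opt := range opts {
		if err := opt(c); err != nil {
			return nil, err
		}
	}
	return c, nil
}

func main() {
	cfg, err := newConfig(withIdleTimeout(30*time.Second), withContainerID("example"))
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Printf("%+v\n", *cfg)
}
```

The design keeps the constructor signature stable: new settings can be added as new option functions without changing existing call sites.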

# Constants

Default connection options.
Default link options.
Default session options.
Only the existence and configuration of the terminus is retained durably.
No terminus state is retained durably.
In addition to the existence and configuration of the terminus, the unsettled state for durable messages is retained durably.
Error Conditions, grouped into AMQP Errors, Connection Errors, Session Errors, and Link Errors.
The expiry timer starts when the most recently associated connection is closed.
The expiry timer starts when the terminus is detached.
The terminus never expires.
The expiry timer starts when the most recently associated session is ended.
Receiver will spontaneously settle all incoming transfers.
Sender MAY send a mixture of settled and unsettled deliveries to the receiver.
Receiver will only settle after sending the disposition to the sender and receiving a disposition indicating settlement of the delivery from the sender.
Sender will send all deliveries settled to the receiver.
Sender will send all deliveries initially unsettled to the receiver.

# Variables

ErrConnClosed is propagated to Session and Senders/Receivers when Client.Close() is called or the server closes the connection without specifying an error.
ErrLinkClosed is returned by send and receive operations when Sender.Close() or Receiver.Close() is called.
ErrSessionClosed is propagated to Sender/Receivers when Session.Close() is called.
Errors.
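
The three "closed" errors above correspond to three nesting levels: connection, session, and link. A caller can use that to decide how much needs to be re-created before retrying. The sketch below uses local stand-in sentinel errors (`errConnClosed`, `errSessionClosed`, `errLinkClosed`) and a hypothetical `closedScope` helper; with the real package the exported variables would be compared instead, and whether they survive wrapping is an assumption that depends on how the error was produced.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the package's sentinel errors.
var (
	errConnClosed    = errors.New("connection closed")
	errSessionClosed = errors.New("session closed")
	errLinkClosed    = errors.New("link closed")
)

// closedScope names the layer that was closed, or returns "" when err is
// not one of the sentinel errors. errors.Is also matches wrapped errors.
func closedScope(err error) string {
	switch {
	case errors.Is(err, errConnClosed):
		return "connection"
	case errors.Is(err, errSessionClosed):
		return "session"
	case errors.Is(err, errLinkClosed):
		return "link"
	default:
		return ""
	}
}

func main() {
	// Wrapping with %w keeps the sentinel reachable for errors.Is.
	err := fmt.Errorf("receiving message: %w", errSessionClosed)
	fmt.Println("closed scope:", closedScope(err))
}
```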

# Structs

Client is an AMQP client connection.
DetachError is returned by a link (Receiver/Sender) when a detach frame is received.
Error is an AMQP error.
Message is an AMQP message.
MessageHeader carries standard delivery details about the transfer of a message.
MessageProperties is the defined set of properties for AMQP messages.
Receiver receives messages on a single AMQP link.
Sender sends messages on a single AMQP link.
Session is an AMQP session.

# Type aliases

Annotations keys must be of type string, int, or int64.
ArrayUByte allows encoding []uint8/[]byte as an array rather than binary data.
ConnOption is a function for configuring an AMQP connection.
Durability specifies the durability of a link.
ErrorCondition is one of the error conditions defined in the AMQP spec.
ExpiryPolicy specifies when the expiry timer of a terminus starts counting down from the timeout value.
LinkOption is a function for configuring an AMQP link.
ReceiverSettleMode specifies how the receiver will settle messages.
SenderSettleMode specifies how the sender will settle messages.
SessionOption is a function for configuring an AMQP session.
UUID is a 128-bit identifier as defined in RFC 4122.
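
To make the UUID entry concrete: a 128-bit identifier is 16 bytes, and RFC 4122 defines a canonical text form of five hyphen-separated hexadecimal groups (8-4-4-4-12 digits). The sketch below shows that formatting with a hypothetical local `uuid` type; it is not the package's UUID type and does not claim to match its methods.

```go
package main

import "fmt"

// uuid is a hypothetical 16-byte (128-bit) identifier used only in this sketch.
type uuid [16]byte

// String renders the canonical 8-4-4-4-12 hexadecimal form from RFC 4122.
// The %x verb prints each byte slice as lowercase hex with leading zeros.
func (u uuid) String() string {
	return fmt.Sprintf("%x-%x-%x-%x-%x", u[0:4], u[4:6], u[6:8], u[8:10], u[10:16])
}

func main() {
	u := uuid{
		0x12, 0x3e, 0x45, 0x67,
		0xe8, 0x9b,
		0x12, 0xd3,
		0xa4, 0x56,
		0x42, 0x66, 0x14, 0x17, 0x40, 0x00,
	}
	fmt.Println(u) // prints 123e4567-e89b-12d3-a456-426614174000
}
```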