Module: github.com/integration-system/go-amqp
Version: 0.14.0
Repository: https://github.com/integration-system/go-amqp.git
Documentation: pkg.go.dev

# README

github.com/integration-system/go-amqp

MIT licensed.

github.com/integration-system/go-amqp is an AMQP 1.0 client implementation for Go.

AMQP 1.0 is not compatible with AMQP 0-9-1 or 0-10, which are the most common AMQP protocols in use today. A list of AMQP 1.0 brokers and other AMQP 1.0 resources can be found at github.com/xinchen10/awesome-amqp.

This library aims to be stable and worthy of production usage, but the API is still subject to change. To conform with SemVer, the major version will remain 0 until the API is deemed stable. During this period breaking changes will be indicated by bumping the minor version. Non-breaking changes will bump the patch version.

Install

go get -u github.com/integration-system/go-amqp

Contributing

Contributions are welcome! Please see CONTRIBUTING.md.

Example Usage

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/integration-system/go-amqp"
)

func main() {
	// Create client
	client, err := amqp.Dial("amqps://my-namespace.servicebus.windows.net",
		amqp.ConnSASLPlain("access-key-name", "access-key"),
	)
	if err != nil {
		log.Fatal("Dialing AMQP server:", err)
	}
	defer client.Close()

	// Open a session
	session, err := client.NewSession()
	if err != nil {
		log.Fatal("Creating AMQP session:", err)
	}

	ctx := context.Background()

	// Send a message
	{
		// Create a sender
		sender, err := session.NewSender(
			amqp.LinkTargetAddress("/queue-name"),
		)
		if err != nil {
			log.Fatal("Creating sender link:", err)
		}

		ctx, cancel := context.WithTimeout(ctx, 5*time.Second)

		// Send message
		err = sender.Send(ctx, amqp.NewMessage([]byte("Hello!")))
		if err != nil {
			log.Fatal("Sending message:", err)
		}

		sender.Close(ctx)
		cancel()
	}

	// Continuously read messages
	{
		// Create a receiver
		receiver, err := session.NewReceiver(
			amqp.LinkSourceAddress("/queue-name"),
			amqp.LinkCredit(10),
		)
		if err != nil {
			log.Fatal("Creating receiver link:", err)
		}
		defer func() {
			ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
			receiver.Close(ctx)
			cancel()
		}()

		for {
			// Receive next message
			msg, err := receiver.Receive(ctx)
			if err != nil {
				log.Fatal("Reading message from AMQP:", err)
			}

			// Accept message
			msg.Accept(context.Background())

			fmt.Printf("Message received: %s\n", msg.GetData())
		}
	}
}
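
The receive loop above exits through log.Fatal, which terminates the process without running deferred calls such as receiver.Close. A minimal sketch of an alternative shape follows: the loop returns normally once its context is cancelled or times out. It uses only the standard library; messageSource, fakeSource, drain, and runDemo are hypothetical names introduced for illustration, and the sketch assumes that a Receive call returns the context's error once the context is done.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// messageSource is a hypothetical interface modeled loosely on a receiver's
// Receive method. It is not part of go-amqp.
type messageSource interface {
	Receive(ctx context.Context) (string, error)
}

// fakeSource hands out queued messages, then blocks until ctx is done.
type fakeSource struct{ queue []string }

func (f *fakeSource) Receive(ctx context.Context) (string, error) {
	if len(f.queue) == 0 {
		<-ctx.Done()
		return "", ctx.Err()
	}
	msg := f.queue[0]
	f.queue = f.queue[1:]
	return msg, nil
}

// drain receives messages until ctx is cancelled or times out. Context
// errors are treated as a normal shutdown; any other error is returned.
func drain(ctx context.Context, src messageSource) ([]string, error) {
	var got []string
	for {
		msg, err := src.Receive(ctx)
		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
			return got, nil
		}
		if err != nil {
			return got, err
		}
		got = append(got, msg)
	}
}

// runDemo drains a two-message source with a short timeout.
func runDemo() ([]string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	return drain(ctx, &fakeSource{queue: []string{"first", "second"}})
}

func main() {
	got, err := runDemo()
	fmt.Println(got, err)
}
```

Because drain returns instead of exiting the process, the caller's deferred cleanup gets a chance to run.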

Related Projects

| Project | Description |
|---|---|
| github.com/Azure/azure-event-hubs-go * | Library for interacting with Microsoft Azure Event Hubs. |
| github.com/Azure/azure-service-bus-go * | Library for interacting with Microsoft Azure Service Bus. |
| gocloud.dev/pubsub * | Library for portably interacting with Pub/Sub systems. |
| qpid-proton | AMQP 1.0 library using the Qpid Proton C bindings. |

* indicates that the project uses this library.

Feel free to send PRs adding additional projects. Listed projects are not limited to those that use this library as long as they are potentially useful to people who are looking at an AMQP library.

Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.opensource.microsoft.com.

When you submit a pull request, a CLA bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., status check, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

# Functions

ConnConnectTimeout configures how long to wait for the server during connection establishment.
ConnContainerID sets the container-id to use when opening the connection.
ConnIdleTimeout specifies the maximum period between receiving frames from the peer.
ConnMaxFrameSize sets the maximum frame size that the connection will accept.
ConnMaxSessions sets the maximum number of channels.
ConnOnUnexpectedDisconnect sets a callback that is invoked when the connection is closed for an unknown reason.
ConnProperty sets an entry in the connection properties map sent to the server.
ConnSASLAnonymous enables SASL ANONYMOUS authentication for the connection.
ConnSASLExternal enables SASL EXTERNAL authentication for the connection.
ConnSASLPlain enables SASL PLAIN authentication for the connection.
ConnSASLXOAUTH2 enables SASL XOAUTH2 authentication for the connection.
ConnServerHostname sets the hostname sent in the AMQP Open frame and TLS ServerName (if not otherwise set).
ConnTLS toggles TLS negotiation.
ConnTLSConfig sets the tls.Config to be used during TLS negotiation.
Dial connects to an AMQP server.
LinkAddressDynamic requests a dynamically created address from the server.
LinkBatching toggles batching of message disposition.
LinkBatchMaxAge sets the maximum time between the start of a disposition batch and sending the batch to the server.
LinkCredit specifies the maximum number of unacknowledged messages the sender can transmit.
LinkDetachOnDispositionError controls whether the link detaches on disposition errors (subject to some simple logic) or never detaches on them.
LinkMaxMessageSize sets the maximum message size that can be sent or received on the link.
LinkName sets the name of the link.
LinkProperty sets an entry in the link properties map sent to the server.
LinkPropertyInt32 sets an entry in the link properties map sent to the server.
LinkPropertyInt64 sets an entry in the link properties map sent to the server.
LinkReceiverSettle sets the requested receiver settlement mode.
LinkSelectorFilter sets a selector filter (apache.org:selector-filter:string) on the link source.
LinkSenderSettle sets the requested sender settlement mode.
LinkSourceAddress sets the source address.
LinkSourceCapabilities sets the source capabilities.
LinkSourceDurability sets the source durability policy.
LinkSourceExpiryPolicy sets the link expiration policy.
LinkSourceFilter is an advanced API for setting non-standard source filters.
LinkSourceTimeout sets the duration that an expiring source will be retained.
LinkTargetAddress sets the target address.
LinkTargetDurability sets the target durability policy.
LinkTargetExpiryPolicy sets the link expiration policy.
LinkTargetTimeout sets the duration that an expiring target will be retained.
LinkWithManualCredits enables manual credit management for this link.
New establishes an AMQP client connection over conn.
NewMessage returns a *Message with data as the payload.
SessionIncomingWindow sets the maximum number of unacknowledged transfer frames the server can send.
SessionMaxLinks sets the maximum number of links (Senders/Receivers) allowed on the session.
SessionOutgoingWindow sets the maximum number of unacknowledged transfer frames the client can send.
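
LinkCredit above is described as a cap on the number of unacknowledged messages the sender can transmit. The following is a minimal, standard-library-only sketch of that idea, not the library's implementation: creditWindow, trySend, and ack are hypothetical names, and real AMQP 1.0 flow control (credit issued and replenished by the receiver through flow frames) is simplified to a counter.

```go
package main

import "fmt"

// creditWindow models link credit as a simple counter: at most limit
// messages may be in flight (sent but not yet acknowledged) at once.
// Hypothetical type for illustration; it is not part of go-amqp.
type creditWindow struct {
	limit    int
	inFlight int
}

// trySend consumes one credit and reports whether a message may be sent.
func (w *creditWindow) trySend() bool {
	if w.inFlight >= w.limit {
		return false // no credit left; the sender has to wait
	}
	w.inFlight++
	return true
}

// ack acknowledges one in-flight message and frees its credit.
func (w *creditWindow) ack() {
	if w.inFlight > 0 {
		w.inFlight--
	}
}

func main() {
	w := &creditWindow{limit: 2}
	fmt.Println(w.trySend(), w.trySend(), w.trySend()) // third send has no credit
	w.ack()
	fmt.Println(w.trySend()) // credit is available again after one ack
}
```

In this model, a larger limit lets more messages be in flight before the receiver acknowledges any of them, at the cost of more unprocessed messages held by the receiver.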

# Constants

Default connection options.
Default link options.
Default session options.
Only the existence and configuration of the terminus is retained durably.
No terminus state is retained durably.
In addition to the existence and configuration of the terminus, the unsettled state for durable messages is retained durably.
Connection Errors.
Error Conditions.
Link Errors.
AMQP Errors.
Session Errors.
The expiry timer starts when the most recently associated connection is closed.
The expiry timer starts when the terminus is detached.
The terminus never expires.
The expiry timer starts when the most recently associated session is ended.
Receiver will spontaneously settle all incoming transfers.
Sender MAY send a mixture of settled and unsettled deliveries to the receiver.
Receiver will only settle after sending the disposition to the sender and receiving a disposition indicating settlement of the delivery from the sender.
Sender will send all deliveries settled to the receiver.
Sender will send all deliveries initially unsettled to the receiver.

# Variables

ErrConnClosed is propagated to Session and Senders/Receivers when Client.Close() is called or the server closes the connection without specifying an error.
ErrLinkClosed is returned by send and receive operations when Sender.Close() or Receiver.Close() is called.
ErrSessionClosed is propagated to Senders/Receivers when Session.Close() is called.
Errors.

# Structs

Client is an AMQP client connection.
DetachError is returned by a link (Receiver/Sender) when a detach frame is received.
Message is an AMQP message.
MessageHeader carries standard delivery details about the transfer of a message.
MessageProperties is the defined set of properties for AMQP messages.
Receiver receives messages on a single AMQP link.
Sender sends messages on a single AMQP link.
Session is an AMQP session.
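
The sentinel errors under Variables and the DetachError struct suggest two ways a caller might tell failures apart: comparing against a known error value, or inspecting an error type. The sketch below illustrates both with the standard errors package. errLinkClosed, detachError, its condition field, and classify are local stand-ins written for this example; they are assumptions, not the library's definitions.

```go
package main

import (
	"errors"
	"fmt"
)

// errLinkClosed stands in for a sentinel error such as ErrLinkClosed.
var errLinkClosed = errors.New("link closed")

// detachError stands in for an error type such as DetachError.
// The condition field is hypothetical.
type detachError struct{ condition string }

func (e *detachError) Error() string { return "link detached: " + e.condition }

// classify distinguishes a local close from a peer-initiated detach.
func classify(err error) string {
	var de *detachError
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errLinkClosed):
		return "closed locally"
	case errors.As(err, &de):
		return "detached by peer: " + de.condition
	default:
		return "other: " + err.Error()
	}
}

func main() {
	fmt.Println(classify(errLinkClosed))
	// Wrapping with %w keeps the underlying error visible to errors.As.
	wrapped := fmt.Errorf("receive: %w", &detachError{condition: "amqp:link:stolen"})
	fmt.Println(classify(wrapped))
}
```

errors.Is and errors.As also match errors that are returned directly rather than wrapped, so the same checks work either way.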

# Type aliases

Annotations keys must be of type string, int, or int64.
ConnOption is a function for configuring an AMQP connection.
Durability specifies the durability of a link.
No description provided by the author
No description provided by the author
ExpiryPolicy specifies when the expiry timer of a terminus starts counting down from the timeout value.
LinkOption is a function for configuring an AMQP link.
ReceiverSettleMode specifies how the receiver will settle messages.
SenderSettleMode specifies how the sender will settle messages.
SessionOption is a function for configuring an AMQP session.
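
ConnOption, LinkOption, and SessionOption are each described as a function for configuring a connection, link, or session. This is commonly known as the functional options pattern. A minimal sketch of it follows, using only the standard library; connConfig, connOption, withIdleTimeout, withMaxFrameSize, and newConnConfig are hypothetical names, and the default values are arbitrary choices for the example rather than the library's defaults.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// connConfig holds the settings that options mutate. Hypothetical type.
type connConfig struct {
	idleTimeout  time.Duration
	maxFrameSize uint32
}

// connOption is a function that configures a connConfig and may reject
// invalid values. It plays the role that ConnOption is described as having.
type connOption func(*connConfig) error

func withIdleTimeout(d time.Duration) connOption {
	return func(c *connConfig) error {
		if d < 0 {
			return errors.New("idle timeout must not be negative")
		}
		c.idleTimeout = d
		return nil
	}
}

func withMaxFrameSize(n uint32) connOption {
	return func(c *connConfig) error {
		if n < 512 { // AMQP 1.0 requires a max frame size of at least 512 bytes
			return errors.New("max frame size must be at least 512")
		}
		c.maxFrameSize = n
		return nil
	}
}

// newConnConfig starts from defaults (arbitrary here) and applies each
// option in order, stopping at the first error.
func newConnConfig(opts ...connOption) (*connConfig, error) {
	c := &connConfig{idleTimeout: time.Minute, maxFrameSize: 65536}
	for _, opt := range opts {
		if err := opt(c); err != nil {
			return nil, err
		}
	}
	return c, nil
}

func main() {
	cfg, err := newConnConfig(withIdleTimeout(30*time.Second), withMaxFrameSize(4096))
	if err != nil {
		fmt.Println("invalid option:", err)
		return
	}
	fmt.Println(cfg.idleTimeout, cfg.maxFrameSize)
}
```

The pattern keeps the constructor signature stable as options are added, which fits an API that is still subject to change.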