Module: github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
Version: 1.8.0
Repository: https://github.com/azure/azure-sdk-for-go.git
Documentation: pkg.go.dev

# README

Azure Service Bus client module for Go

Azure Service Bus is a highly reliable cloud messaging service for providing real-time and fault-tolerant communication between distributed senders and receivers.

Use the Service Bus client module github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus in your application to:

  • Send messages to a Service Bus queue or topic
  • Receive messages from a Service Bus queue or subscription

Key links: Source code | API reference documentation | REST API documentation | Product documentation | Samples

If you used the pre-release azure-service-bus-go module, see the Migration guide.

Getting started

Prerequisites

  • Go, version 1.18 or higher - Install Go
  • Azure subscription - Create a free account
  • Service Bus namespace - Create a namespace
  • A Service Bus queue, topic, or subscription - See the Azure Service Bus documentation to create an entity in your Service Bus namespace. For example, create a Service Bus queue using the Azure portal, the Azure CLI, or other tools.

Install the package

Install the Azure Service Bus client module for Go with go get:

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus

Authenticate the client

The Service Bus Client can be created using a credential from the Azure Identity package, such as DefaultAzureCredential, or using a Service Bus connection string.

Using a service principal

package main

import (
  "context"

  "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
  "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func main() {
  // For more information about the DefaultAzureCredential, see:
  // https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#NewDefaultAzureCredential
  credential, err := azidentity.NewDefaultAzureCredential(nil)

  if err != nil {
    panic(err)
  }

  // The service principal specified by the credential needs to be added to the appropriate Service Bus roles for your
  // resource. For more information about Service Bus roles, see:
  // https://learn.microsoft.com/azure/service-bus-messaging/service-bus-managed-service-identity#azure-built-in-roles-for-azure-service-bus
  client, err := azservicebus.NewClient("<ex: myservicebus.servicebus.windows.net>", credential, nil)

  if err != nil {
    panic(err)
  }

  // Close the client, and any senders or receivers created from it, when done.
  defer client.Close(context.TODO())
}

Using a connection string

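package main

import (
  "context"

  "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func main() {
  // For instructions on how to get a Service Bus connection string, see:
  // https://learn.microsoft.com/azure/service-bus-messaging/service-bus-quickstart-portal#get-the-connection-string
  connectionString := "<your Service Bus connection string>"

  client, err := azservicebus.NewClientFromConnectionString(connectionString, nil)

  if err != nil {
    panic(err)
  }

  // Close the client, and any senders or receivers created from it, when done.
  defer client.Close(context.TODO())
}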

Key concepts

Once you've created a Client, you can interact with resources within a Service Bus namespace:

  • Queue: Allows for sending and receiving messages. Often used for point-to-point communication.
  • Topic: Similar to a queue, but splits sending and receiving into separate entities. Messages are sent to a topic and broadcast to its subscriptions, where receivers can consume them independently and in parallel.
  • Subscription: Consumes messages from a topic. Each subscription is independent, and receives a copy of each message sent to the topic. Rules and filters can be used to tailor which messages are received by a specific subscription.

For more information about these resources, see What is Azure Service Bus?.
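
To make the difference between a queue and a topic concrete, here is a toy in-memory model. It is not the Service Bus API and involves no networking: the `topic` and `subscription` types, the `subscribe`, `send`, and `received` methods, and the `isError` filter are all hypothetical names invented for illustration. The sketch shows each subscription keeping its own copy of a message, with an optional filter deciding which messages a subscription accepts.

```go
package main

import (
	"fmt"
	"strings"
)

// filter reports whether a subscription wants a given message.
// It is a hypothetical stand-in for Service Bus rules and filters.
type filter func(msg string) bool

// subscription holds its own independent copy of each accepted message.
type subscription struct {
	accept   filter
	messages []string
}

// topic broadcasts every sent message to all of its subscriptions.
type topic struct {
	subs map[string]*subscription
}

func newTopic() *topic {
	return &topic{subs: map[string]*subscription{}}
}

// subscribe registers a named subscription. A nil filter accepts everything.
func (t *topic) subscribe(name string, accept filter) {
	t.subs[name] = &subscription{accept: accept}
}

// send delivers a copy of msg to every subscription whose filter accepts it.
func (t *topic) send(msg string) {
	for _, s := range t.subs {
		if s.accept == nil || s.accept(msg) {
			s.messages = append(s.messages, msg)
		}
	}
}

// received returns the messages currently held by the named subscription.
func (t *topic) received(name string) []string {
	return t.subs[name].messages
}

// isError is an example filter that accepts only messages prefixed "error:".
func isError(msg string) bool {
	return strings.HasPrefix(msg, "error:")
}

func main() {
	t := newTopic()
	t.subscribe("audit", nil)
	t.subscribe("errors-only", isError)

	t.send("info: started")
	t.send("error: disk full")

	fmt.Println("audit:", t.received("audit"))             // audit: [info: started error: disk full]
	fmt.Println("errors-only:", t.received("errors-only")) // errors-only: [error: disk full]
}
```

A queue, by contrast, behaves like a topic with exactly one unfiltered subscription: each message is consumed once rather than copied to several independent readers.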

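Using a Client you can do the following:

  • Send messages using a Sender, created with Client.NewSender. See the Send messages example.
  • Receive messages using a Receiver, created with Client.NewReceiverForQueue or Client.NewReceiverForSubscription. See the Receive messages example.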

The queues, topics, and subscriptions should be created prior to using this library.

Examples

The following examples cover common tasks using Azure Service Bus:

Send messages

Once you've created a Client, you can create a Sender by calling Client.NewSender with the name of a queue or topic. The Sender allows you to send messages.

NOTE: To create an azservicebus.Client, see the Authenticate the client section, which covers both a service principal and a Service Bus connection string.

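Send a single message. Here, sender is the *azservicebus.Sender returned by client.NewSender:

err := sender.SendMessage(context.TODO(), &azservicebus.Message{
  Body: []byte("hello world!"),
}, nil)

if err != nil {
  panic(err)
}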

Send multiple messages using a batch

You can also send messages in batches, which can be more efficient than sending them individually.

// Create a message batch. It will automatically be sized for the Service Bus
// namespace's maximum message size.
currentMessageBatch, err := sender.NewMessageBatch(context.TODO(), nil)

if err != nil {
  panic(err)
}

messagesToSend := []*azservicebus.Message{
  // any messages that you'd want to send would be here, or sourced from
  // somewhere else.
}

for i := 0; i < len(messagesToSend); i++ {
  // Add a message to our message batch. This can be called multiple times.
  err = currentMessageBatch.AddMessage(messagesToSend[i], nil)

  if errors.Is(err, azservicebus.ErrMessageTooLarge) {
    if currentMessageBatch.NumMessages() == 0 {
      // This means the message itself is too large to be sent, even on its own.
      // This will require intervention from the user.
      panic("Single message is too large to be sent in a batch.")
    }

    fmt.Printf("Message batch is full. Sending it and creating a new one.\n")

    // send what we have since the batch is full
    err := sender.SendMessageBatch(context.TODO(), currentMessageBatch, nil)

    if err != nil {
      panic(err)
    }

    // Create a new batch and retry adding this message to our batch.
    newBatch, err := sender.NewMessageBatch(context.TODO(), nil)

    if err != nil {
      panic(err)
    }

    currentMessageBatch = newBatch

    // rewind the counter and attempt to add the message again (this batch
    // was full so it didn't go out with the previous SendMessageBatch call).
    i--
  } else if err != nil {
    panic(err)
  }
}

// check if any messages are remaining to be sent.
if currentMessageBatch.NumMessages() > 0 {
  err := sender.SendMessageBatch(context.TODO(), currentMessageBatch, nil)

  if err != nil {
    panic(err)
  }
}
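
The fill, send, and retry loop above can be hard to reason about without a live namespace. The sketch below reproduces the same control flow against a toy in-memory batch so it can be run locally. Everything in it is hypothetical: `toyBatch`, `errTooLarge`, and `sendAll` are stand-ins rather than SDK types, and message size is approximated by `len(body)`, whereas a real batch accounts for the encoded message size.

```go
package main

import (
	"errors"
	"fmt"
)

// errTooLarge plays the role of azservicebus.ErrMessageTooLarge in this toy model.
var errTooLarge = errors.New("message too large for batch")

// toyBatch is a size-limited container (a hypothetical stand-in for a message batch).
type toyBatch struct {
	maxBytes int
	used     int
	msgs     [][]byte
}

// add appends body, or returns errTooLarge if it would exceed maxBytes.
func (b *toyBatch) add(body []byte) error {
	if b.used+len(body) > b.maxBytes {
		return errTooLarge
	}
	b.used += len(body)
	b.msgs = append(b.msgs, body)
	return nil
}

// sendAll packs bodies into batches of at most maxBytes and hands each
// batch to send, in order.
func sendAll(bodies [][]byte, maxBytes int, send func(batch [][]byte) error) error {
	current := &toyBatch{maxBytes: maxBytes}

	for i := 0; i < len(bodies); i++ {
		err := current.add(bodies[i])

		if errors.Is(err, errTooLarge) {
			if len(current.msgs) == 0 {
				// The message does not fit even in an empty batch.
				return fmt.Errorf("message %d cannot be sent: %w", i, err)
			}

			// The batch is full: send it, start a new one, and retry this message.
			if err := send(current.msgs); err != nil {
				return err
			}
			current = &toyBatch{maxBytes: maxBytes}
			i--
		} else if err != nil {
			return err
		}
	}

	// Flush whatever is left in the final, partially filled batch.
	if len(current.msgs) > 0 {
		return send(current.msgs)
	}
	return nil
}

func main() {
	bodies := [][]byte{[]byte("aaaa"), []byte("bbbb"), []byte("cccc")}

	var sizes []int
	err := sendAll(bodies, 10, func(batch [][]byte) error {
		sizes = append(sizes, len(batch))
		return nil
	})

	fmt.Println(sizes, err) // [2 1] <nil>
}
```

With a 10-byte limit, the first two 4-byte bodies share a batch, the third triggers a send and is retried against a fresh batch, and the final flush sends it on its own.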

Receive messages

Once you've created a Client you can create a Receiver, which will allow you to receive messages.

NOTE: To create an azservicebus.Client, see the Authenticate the client section, which covers both a service principal and a Service Bus connection string.

receiver, err := client.NewReceiverForQueue(
  "<queue>",
  nil,
)
// or
// client.NewReceiverForSubscription("<topic>", "<subscription>", nil)

if err != nil {
  panic(err)
}

// ReceiveMessages respects the passed in context, and will gracefully stop
// receiving when 'ctx' is cancelled.
ctx, cancel := context.WithTimeout(context.TODO(), 60*time.Second)
defer cancel()

messages, err := receiver.ReceiveMessages(ctx,
  // The number of messages to receive. Note this is merely an upper
  // bound. It is possible to get fewer messages (or zero), depending
  // on the contents of the remote queue or subscription and network
  // conditions.
  1,
  nil,
)

if err != nil {
  panic(err)
}

for _, message := range messages {
  // The message body is a []byte. For this example we're assuming that the body
  // is a string, converted to bytes, but any []byte payload is valid.
  var body []byte = message.Body
  fmt.Printf("Message received with body: %s\n", string(body))

  // For more information about settling messages, see:
  // https://learn.microsoft.com/azure/service-bus-messaging/message-transfers-locks-settlement#settling-receive-operations
  err = receiver.CompleteMessage(context.TODO(), message, nil)

  if err != nil {
    var sbErr *azservicebus.Error

    if errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeLockLost {
      // The message lock has expired. This isn't fatal for the client, but it does mean
      // that this message can be received by another Receiver (or potentially this one).
      fmt.Printf("Message lock expired\n")

      // You can extend the message lock by calling Receiver.RenewMessageLock(msg) before the
      // message lock has expired.
      continue
    }

    panic(err)
  }

  fmt.Printf("Received and completed the message\n")
}
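
The comments above mention renewing a message lock before it expires. One way to arrange that is a background goroutine that renews on a fixed interval while the message is being processed. The sketch below shows only that pattern and uses the standard library alone: `renewWhile`, `demo`, and the `renew` and `process` callbacks are hypothetical names, and in real code the `renew` callback would wrap a call such as Receiver.RenewMessageLock. The interval is an assumption; it should be comfortably shorter than the lock duration configured on your queue or subscription.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// renewWhile runs process while calling renew every interval in the background.
// Renewal stops as soon as process returns. If a renewal fails, the context
// passed to process is cancelled so that it can stop early.
// It returns the number of successful renewals and the error from process.
func renewWhile(
	ctx context.Context,
	interval time.Duration,
	renew func(context.Context) error,
	process func(context.Context) error,
) (int, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var wg sync.WaitGroup
	renewals := 0

	wg.Add(1)
	go func() {
		defer wg.Done()

		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if err := renew(ctx); err != nil {
					cancel() // the lock is likely lost; tell process to stop
					return
				}
				renewals++
			}
		}
	}()

	err := process(ctx)

	cancel()
	wg.Wait() // after this, reading renewals is safe

	return renewals, err
}

// demo simulates 100ms of work with a renewal every 20ms.
func demo() (int, error) {
	renew := func(ctx context.Context) error {
		return nil // stand-in for a real lock renewal call
	}

	process := func(ctx context.Context) error {
		select {
		case <-time.After(100 * time.Millisecond): // simulated message handling
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	return renewWhile(context.Background(), 20*time.Millisecond, renew, process)
}

func main() {
	n, err := demo()
	fmt.Printf("renewed %d time(s), err=%v\n", n, err)
}
```

The exact renewal count depends on timing, so the example does not promise a specific number. The point of the design is that renewal and processing share one cancellable context, so a failed renewal stops the work instead of letting it continue on a message that another receiver may already have picked up.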

Dead letter queue

The dead letter queue is a sub-queue. Each queue or subscription has its own dead letter queue. Dead letter queues store messages that have been explicitly dead lettered using the Receiver.DeadLetterMessage function.

Opening a dead letter queue is a configuration option when creating a Receiver.

NOTE: To create an azservicebus.Client, see the Authenticate the client section, which covers both a service principal and a Service Bus connection string.

deadLetterReceiver, err := client.NewReceiverForQueue("<queue>",
  &azservicebus.ReceiverOptions{
    SubQueue: azservicebus.SubQueueDeadLetter,
  })
// or
// client.NewReceiverForSubscription("<topic>", "<subscription>",
//   &azservicebus.ReceiverOptions{
//     SubQueue: azservicebus.SubQueueDeadLetter,
//   })

if err != nil {
  panic(err)
}

To see some example code for receiving messages using the Receiver, see the Receive messages example.

Troubleshooting

Logging

This module uses the classification-based logging implementation in azcore. To enable console logging for all SDK modules, set the environment variable AZURE_SDK_GO_LOGGING to all.

Use the azcore/log package to control log event output or to enable logs for azservicebus only. For example:

import (
  "fmt"

  azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
  "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

// print log output to stdout
azlog.SetListener(func(event azlog.Event, s string) {
  fmt.Printf("[%s] %s\n", event, s)
})

// pick the set of events to log
azlog.SetEvents(
  // EventConn is used whenever we create a connection or any links (that is, receivers or senders).
  azservicebus.EventConn,
  // EventAuth is used when we're doing authentication/claims negotiation.
  azservicebus.EventAuth,
  // EventReceiver represents operations that happen on receivers.
  azservicebus.EventReceiver,
  // EventSender represents operations that happen on senders.
  azservicebus.EventSender,
  // EventAdmin is used for operations in the azservicebus/admin.Client
  azservicebus.EventAdmin,
)
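
To illustrate what "classification-based" means here, the following toy model shows a listener that only sees entries whose classification has been selected. It is a sketch of the idea and not azcore's implementation: the `logger` type, its `setListener`, `setEvents`, and `write` methods, and the `eventConn`, `eventSender`, and `eventAuth` constants are hypothetical names chosen for this example.

```go
package main

import "fmt"

// event classifies a log entry (hypothetical stand-in for a log event type).
type event string

const (
	eventConn   event = "Conn"
	eventSender event = "Sender"
	eventAuth   event = "Auth"
)

// logger delivers entries to a listener, optionally filtered by classification.
type logger struct {
	listener func(event, string)
	allowed  map[event]bool // nil means no filter: every event is delivered
}

// setListener registers the function that receives log entries.
func (l *logger) setListener(fn func(event, string)) {
	l.listener = fn
}

// setEvents restricts delivery to the given classifications.
func (l *logger) setEvents(events ...event) {
	l.allowed = make(map[event]bool, len(events))
	for _, e := range events {
		l.allowed[e] = true
	}
}

// write forwards the entry to the listener if its classification is selected.
func (l *logger) write(e event, msg string) {
	if l.listener == nil {
		return
	}
	if l.allowed != nil && !l.allowed[e] {
		return
	}
	l.listener(e, msg)
}

// demo selects two classifications and returns the lines the listener saw.
func demo() []string {
	var out []string

	l := &logger{}
	l.setListener(func(e event, s string) {
		out = append(out, fmt.Sprintf("[%s] %s", e, s))
	})
	l.setEvents(eventConn, eventSender)

	l.write(eventConn, "connection opened")
	l.write(eventAuth, "token refreshed") // not selected, so it is dropped
	l.write(eventSender, "message sent")

	return out
}

func main() {
	for _, line := range demo() {
		fmt.Println(line)
	}
	// Prints:
	// [Conn] connection opened
	// [Sender] message sent
}
```

Filtering by classification keeps the listener simple: it formats and writes whatever it is handed, while the choice of which parts of the client to observe is made in one place.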

Next steps

See the examples for using this library to send and receive messages to or from Service Bus queues, topics, and subscriptions.

Contributing

If you'd like to contribute to this library, please read the contributing guide to learn more about how to build and test the code.

# Packages

Package admin provides `Client`, which can create and manage Queues, Topics and Subscriptions.

# Functions

NewClient creates a new Client for a Service Bus namespace, using a TokenCredential.
NewClientFromConnectionString creates a new Client for a Service Bus namespace using a connection string.

# Constants

CodeConnectionLost means our connection was lost and all retry attempts failed.
CodeLockLost means that the lock token you have for a message has expired.
CodeTimeout means the service timed out during an operation.
CodeUnauthorizedAccess means the credentials provided are not valid for use with a particular entity, or have expired.
EventAdmin is used for operations in the azservicebus/admin.Client.
EventAuth is used when we're doing authentication/claims negotiation.
EventConn is used whenever we create a connection or any links (that is, receivers or senders).
EventReceiver represents operations that happen on Receivers.
EventSender represents operations that happen on Senders.
MessageStateActive indicates the message is active.
MessageStateDeferred indicates the message is deferred.
MessageStateScheduled indicates the message is scheduled.
ReceiveModePeekLock will lock messages as they are received and can be settled using the Receiver's (Complete|Abandon|DeadLetter|Defer)Message functions.
ReceiveModeReceiveAndDelete will delete messages as they are received.
SubQueueDeadLetter targets the dead letter queue for a queue or subscription.
SubQueueTransfer targets the transfer dead letter queue for a queue or subscription.

# Variables

ErrMessageTooLarge is returned when a message cannot fit into a batch when using MessageBatch.AddMessage() or if the message is being sent on its own and is too large for the link.

# Structs

AbandonMessageOptions contains optional parameters for Receiver.AbandonMessage.
AddAMQPAnnotatedMessageOptions contains optional parameters for the AddAMQPAnnotatedMessage function.
AddMessageOptions contains optional parameters for the AddMessage function.
AMQPAnnotatedMessage represents the AMQP message, as received from Service Bus.
AMQPAnnotatedMessageBody represents the body of an AMQP message.
AMQPAnnotatedMessageHeader carries standard delivery details about the transfer of a message.
AMQPAnnotatedMessageProperties represents the properties of an AMQP message.
CancelScheduledMessagesOptions contains optional parameters for the CancelScheduledMessages function.
Client provides methods to create Sender and Receiver instances to send and receive messages from Service Bus.
ClientOptions contains options for the `NewClient` and `NewClientFromConnectionString` functions.
CompleteMessageOptions contains optional parameters for the CompleteMessage function.
DeadLetterOptions describes the reason and error description for dead lettering a message using `Receiver.DeadLetterMessage()`.
DeferMessageOptions contains optional parameters for Receiver.DeferMessage.
GetSessionStateOptions contains optional parameters for the GetSessionState function.
Message is a message with a body and commonly used properties.
No description provided by the author
MessageBatchOptions contains options for the `Sender.NewMessageBatch` function.
NewSenderOptions contains optional parameters for Client.NewSender.
PeekMessagesOptions contains options for the `Receiver.PeekMessages` function.
ReceiveDeferredMessagesOptions contains optional parameters for the ReceiveDeferredMessages function.
ReceivedMessage is a message received by a Receiver created with Client.NewReceiverForQueue or Client.NewReceiverForSubscription.
ReceiveMessagesOptions are options for the ReceiveMessages function.
Receiver receives messages using pull based functions (ReceiveMessages).
ReceiverOptions contains options for the `Client.NewReceiverForQueue` or `Client.NewReceiverForSubscription` functions.
RenewMessageLockOptions contains optional parameters for the RenewMessageLock function.
RenewSessionLockOptions contains optional parameters for the RenewSessionLock function.
ScheduleAMQPAnnotatedMessagesOptions contains optional parameters for the ScheduleAMQPAnnotatedMessages function.
ScheduleMessagesOptions contains optional parameters for the ScheduleMessages function.
SendAMQPAnnotatedMessageOptions contains optional parameters for the SendAMQPAnnotatedMessage function.
No description provided by the author
SendMessageBatchOptions contains optional parameters for the SendMessageBatch function.
SendMessageOptions contains optional parameters for the SendMessage function.
SessionReceiver is a Receiver that handles sessions.
SessionReceiverOptions contains options for the `Client.AcceptSessionForQueue/Subscription` or `Client.AcceptNextSessionForQueue/Subscription` functions.
SetSessionStateOptions contains optional parameters for the SetSessionState function.

# Type aliases

Code is an error code that consuming code can use to handle errors programmatically.
Error represents a Service Bus specific error.
MessageState represents the current state of a message (Active, Scheduled, Deferred).
NewWebSocketConnArgs are passed to your web socket creation function (ClientOptions.NewWebSocketConn).
ReceiveMode represents the lock style to use for a receiver - either `PeekLock` or `ReceiveAndDelete`.
RetryOptions controls how often operations are retried from this client and any Receivers and Senders created from this client.
SubQueue allows you to target a subqueue of a queue or subscription.