# README

RabbitMQ Client

Клиент для взаимодейсвия с RabbitMQ

Usage

Client

var rabbitClientConfiguration = mq.Config {
	User: "user",
	Address:  event.AddressConfiguration{
		IP:   "127.0.0.1",
		Port: "5672",
	},
	Password: "password",
}

func main() {
  client := NewRabbitClient()
  client.ReceiveConfiguration(rabbitClientConfiguration)
  client.Close()
}

Consumer

func callback(delivery Delivery) {
    defer func() { _ = delivery.Ack().Release() }() //обязательно вызывать Release, чтобы сообщения не копились
    fmt.Println(string(delivery.GetMessage()))
}

func errorHandler(err error) {
	fmt.Println(err)
}


func main() {
	mapConsumers := map[string]mq.ConsumerCfg{
		"exampleConsumer": mq.ByOneConsumerCfg{
			CommonConsumerCfg: mq.CommonConsumerCfg{
				QueueName:     "example.queue", //Название очереди
				PrefetchCount: 1000,
				DeadLetter:    true, //Включение Dead Letter Exchange
			},
			Callback:     callback,     //Функция обработки сообщений из очереди
			ErrorHandler: errorHandler, //Функция обработки ошибок очереди
		},
	}

  client := NewRabbitClient()
  client.ReceiveConfiguration(rabbitClientConfiguration,
     WithConsumers(mapConsumers),
  )
  client.Close()
}

Publisher

func main() {
	mapPublishers := map[string]mq.PublisherCfg{
		"examplePublisher": {
			ExchangeName: "example.exchange", //Название точки маршрутизации
			RoutingKey:   "example.queue",  //Ключ маршрутизации
		},
    }

  client := NewRabbitClient()
  client.ReceiveConfiguration(rabbitClientConfiguration,
     WithPublishers(mapPublishers),
  )
  client.GetPublisher("example").Publish(amqp.Publishing{Body: []byte("example")})
  client.Close()
}

Example

Пример показывает применение Dead Letter Exchange Использовано мануальное объявление declareCfg

const (
    //publisherExchange = "example.exchange"
    defaultQueueName = "exampleQ"
)

var rabbitClientConfiguration = structure.RabbitConfig {
    User: "user",
    Address: structure.AddressConfiguration{
        IP:   "127.0.0.1",
        Port: "5672",
    },
    Password: "password",
}

func callback(delivery mq.Delivery) {
    msg := string(delivery.GetMessage())
    if msg == "DLX" {
        log.Printf("Nack(false) Received a message: %s", msg)
        _ = delivery.Nack(false).Release()
        return
    }
    log.Printf("Ack         Received a message: %s", msg)
    _ = delivery.Ack().Release()
}

func errorHandler(err error) {
    fmt.Println(err)
}

func main() {
    mapConsumers := map[string]mq.ConsumerCfg{
        "exampleConsumer": mq.ByOneConsumerCfg{
            CommonConsumerCfg: mq.CommonConsumerCfg{
                QueueName:     defaultQueueName, //Название очереди
                PrefetchCount: 1000,
            },
            Callback:     callback, //Функция обработки сообщений из очереди
            ErrorHandler: errorHandler, //Функция обработки ошибок очереди
        },
    }
    
    mapPublishers := map[string]mq.PublisherCfg{
        "examplePublisher": {
            //ExchangeName: publisherExchange, //Название точки маршрутизации
            RoutingKey:   defaultQueueName, //Ключ маршрутизации
            DeadLetter:    true, //enable DLX
        },
    }
    
    declareCfg := mapPublishers["examplePublisher"].GetDefaultDeclarations()
    
    client := mq.NewRabbitClient()
    client.ReceiveConfiguration(rabbitClientConfiguration,
        mq.WithConsumers(mapConsumers),
        mq.WithPublishers(mapPublishers),
        mq.WithDeclares(declareCfg),
    )
    
    // Подключение происходит асинхронно, для данного примера проще просто подождать
    time.Sleep(200 *time.Millisecond)
    
    err := client.GetPublisher("examplePublisher").Publish(amqp.Publishing{Body: []byte("example")})
    if err != nil {
        log.Printf("%s: %s", "example", err)
    }
    err = client.GetPublisher("examplePublisher").Publish(amqp.Publishing{Body: []byte("DLX")})
    if err != nil {
        log.Printf("%s: %s", "DLX", err)
    }
    
    time.Sleep(500 * time.Millisecond)
    client.Close()
}

Implemented methods

  • NewRabbitClient()
  • (*RabbitMqClient) ReceiveConfiguration()
  • (*RabbitMqClient) Close()
  • (*RabbitMqClient) GetPublisher()
  • WithAwaitConsumersTimeout()
  • WithConsumers()
  • WithPublishers()
  • (*Delivery) GetMessage()
  • (d *Delivery) Ack()
  • (d *Delivery) Nack()
  • (*Delivery) Release()

NewRabbitClient() *rabbitMqClient

Возвращает пустой объект rabbitMqClient

client := NewRabbitClient()

(*RabbitMqClient) ReceiveConfiguration(rabbitConfig RabbitConfig, opts ...Option)

Инициализирует объект rabbitMqClient

client.ReceiveConfiguration(rabbitClientConfiguration,
	       WithPublishers(mapPublishers),
           WithConsumers(mapConsumers),
           WithDeclares(declareCfg),
        )

(*RabbitMqClient) Close()

Прекращает взаимодействие с Rabbit

client.Close()

(*RabbitMqClient) GetPublisher(name string) *publisher

Возвращает объект, публикующий сообщения в очередь

client.GetPublisher("example")

WithAwaitConsumersTimeout(timeout time.Duration) Option

Переопределяет время ожидания остановки работы потребителей сообщений

client.WithAwaitConsumersTimeout(5 * time.Second)

WithConsumers(consumers map[string]Consumer) Option

Инициализирует подписки на очереди, взаимодействие с сообщениями происходит через callback-функцию, принимающую на вход Delivery

WithConsumers(mapConsumers)

WithPublishers(publishers map[string]Publisher) Option

Инициализирует взаимодействия с очередями для публикаций сообщений

WithPublishers(mapPublishers)

func WithDeclares(declare DeclareCfg) Option

Инициализирует очереди, обменники и привязки

WithDeclares(declareCfg)

(*Delivery) GetMessage() []byte

Возвращает тело сообщения

delivery.GetMessage()

(d *Delivery) Ack() *Delivery

Изменяет флаг, указывающий что сообщение потребится из очереди

delivery.Ack()

(d *Delivery) Nack(requeue bool) *Delivery

Изменяет флаг, указывающий что сообщение вернется в очередь, если очередь поддерживает Dead Letter, то сообщение будет отправлено в очередь имя_очереди.DLX

delivery.Nack(false)

(d *Delivery) Release(ack, multiple, requeue bool) error

Потребляет сообщение из очереди или возвращает его обратно

delivery.Release()