package
1.0.0
Repository: https://github.com/bilalislam/torc.git
Documentation: pkg.go.dev

# README

rabbitmq Wrapper

RabbitMq Wrapper is the a client API for RabbitMQ.

  • A wrapper over amqp exchanges and queues.
  • In memory retries for consuming messages when an error occured
  • CorrelationId and MessageId structure
  • Exchange Types With Direct, Fanout, Topic, ConsistentHashing, XDelayedMessage
  • Retry policy (immediately , interval)
  • Multiple consumers In a single process
  • Create goroutines and consume messages asynchronously
  • Disable consume messages asynchronously if you want
  • Retry to connect another node When RabbitMq Node is Down or Broken Connection
  • Add stack trace on the message header if the error occurred when the message is consumed
  • Some extra features while publishing message (will be added) Add support for rabbitmq-delayed-message-exchange plugin

To connect to a RabbitMQ broker...

	var rabbitClient=rabbit.NewRabbitMqClient([]string{"127.0.0.1","127.0.0.2"},"guest","guest","/virtualhost")

To connect to a RabbitMQ broker with retry policy

  • Consumer retries two times immediately if an error occured

     var rabbitClient=rabbit.NewRabbitMqClient([]string{"127.0.0.1","127.0.0.2"},"guest","guest","/virtualhost",
                                               rabbit.RetryCount(2,time.Duration(0)))
    
  • Create goroutines and consume messages asynchronously using PrefetchCount Prefix. Create as number of PrefetchCount as goroutines .

    	var rabbitClient=rabbit.NewRabbitMqClient([]string{"127.0.0.1","127.0.0.2"},"guest","guest","/virtualhost",
                    		rabbit.PrefetchCount(3))
                                                 
    

To send a message

    // Added
    

To consume a message

    onConsumed := func(message rabbit.Message) error {
    
    		var consumeMessage PersonV1
    		var err= json.Unmarshal(message.Payload, &consumeMessage)
    		if err != nil {
    			return err
    		}
    		fmt.Println(time.Now().Format("Mon, 02 Jan 2006 15:04:05 "), " Message:", consumeMessage)
    		return nil
    	}
    

    rabbitClient.AddConsumer("In.Person").
    SubscriberExchange("RoutinKey.*",rabbit.Direct ,"Person").
    HandleConsumer(onConsumed)

To Consume multiple messages

	onConsumed := func(message rabbit.Message) error {

		var consumeMessage PersonV1
		var err= json.Unmarshal(message.Payload, &consumeMessage)
		if err != nil {
			return err
		}
		fmt.Println(time.Now().Format("Mon, 02 Jan 2006 15:04:05 "), " Message:", consumeMessage)
		return nil
	}

	onConsumed2 := func(message rabbit.Message) error {

		var consumeMessage PersonV4
		var err= json.Unmarshal(message.Payload, &consumeMessage)
		if err != nil {
			return err
		}
		fmt.Println(time.Now().Format("Mon, 02 Jan 2006 15:04:05 "), " Message:", consumeMessage)
		return nil
	}
	rabbitClient.AddConsumer("In.Person3").
            SubscriberExchange("",rabbit.Fanout ,"ExchangeNamePerson").
            HandleConsumer(onConsumed)
            
    rabbitClient.AddConsumer("In.Person").
             SubscriberExchange("Person.*",rabbit.Direct ,"PersonV1").
             HandleConsumer(onConsumed2)
             

	rabbitClient.RunConsumers()

To Consume multiple exchange

    rabbitClient.AddConsumer("In.Lines").
    		SubscriberExchange("1", rabbit.ConsistentHashing,"OrderLineAdded").
    		SubscriberExchange("1", rabbit.ConsistentHashing,OrderLineCancelled).
    		WithSingleGoroutine(true).
    		HandleConsumer(onConsumed2)

Use rabbitmq_delayed_message_exchange plugin

     args := make(amqp.Table)
    args["x-delayed-type"] = "direct"

     rabbitClient.AddConsumer("In.Person").
			SubscriberExchangeWithArguments(
				"Person.*",
				rabbitmq.XDelayedMessage,
				"delay-messages",
				args,
			).
			HandleConsumer(onConsumed2)
			

# Functions

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Constants

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Variables

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Structs

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Interfaces

No description provided by the author

# Type aliases

No description provided by the author
No description provided by the author