# README
RabbitBus
This repository is a fork of https://github.com/rhinof/grabbit. RabbitBus separated transactional database provider from source code. We'll manage a separate project to use database providers as driver to store transactional information.
RabbitMQ based service bus for GoLang
How to use
Install library in your existing go project
go get github.com/logiqbits/go-rabbitbus
Examples
Pattern wise messaging examples
Command-Reply
Service bus builder
func createServiceBus(conn, serviceName string) gbus.Bus {
return builder.
New().
Bus(conn).
WithPolicies(&policy.Durable{}).
WithConfirms().
Build(serviceName)
}
Define command and reply structure, connections and service name strings
type Command1 struct{}
func (Command1) SchemaName() string {
return "cmd1"
}
type Reply1 struct{}
func (Reply1) SchemaName() string {
return "reply1"
}
connection := "amqp://guest:guest@localhost"
commandServiceName := "svc.cmd"
replyServiceName := "svc.cmd.reply"
Command service
commandService := createServiceBus(connection, commandServiceName)
commandService.HandleMessage(Reply1{}, func(invocation gbus.Invocation, message *gbus.BusMessage) error {
log.Println("[Received Reply]")
return nil
})
commandService.Start()
defer commandService.Shutdown()
commandService.Send(context.Background(), replyServiceName, gbus.NewBusMessage(Command1{}))
Reply service
replyService := createServiceBus(connection, replyServiceName)
replyService.HandleMessage(Command1{}, func(invocation gbus.Invocation, message *gbus.BusMessage) error {
log.Println("[Received Command]")
log.Println("[Dispatching Reply]")
return invocation.Reply(context.Background(), gbus.NewBusMessage(Reply1{}))
})
replyService.Start()
defer replyService.Shutdown()
Pub-Sub
Define event
type Event1 struct {
Data string
}
func (Event1) SchemaName() string {
return "Event1"
}
Subscriber
connection := "amqp://guest:guest@localhost"
const eventName = "service.event.customname"
bus := builder.
New().
Bus(connection).
WithPolicies(&policy.Durable{}).
WithConfirms().
Build(eventName)
eventHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) error {
log.Println(message.Payload)
return nil
}
bus.HandleEvent("test_exchange", "test_topic", Event1{}, eventHandler)
bus.Start()
defer bus.Shutdown()
Publisher
connection := "amqp://guest:guest@localhost"
const eventName = "service.event.customname"
bus := builder.
New().
Bus(connection).
WithPolicies(&policy.Durable{}).
WithConfirms().
Build(eventName)
err := b.Publish(context.Background(), "test_exchange", "test_topic", gbus.NewBusMessage(&Event1{Data: time.Now().String()}))
if err != nil {
log.Fatal(err)
}
bus.Start()
defer bus.Shutdown()
RPC
Defining service names, connection string, service bus builder method
const (
serverServiceName = "serverServiceName"
invokerServiceName = "invokerServiceName"
)
type RpcRequest struct {
Data string
}
func (RpcRequest) SchemaName() string {
return "logiqbits.rpc.request"
}
type RpcResponse struct {
Data string
}
func (RpcResponse) SchemaName() string {
return "logiqbits.rpc.response"
}
func createRpcBus(conn, svcName string) gbus.Bus {
return builder.
New().
Bus(conn).
WithPolicies(&policy.Durable{}).
WithConfirms().
WithDeadlettering("dead-logiqbits-rabbitbus").
PurgeOnStartUp().
Build(svcName)
}
Server
connection := "amqp://guest:guest@localhost"
handler := func(invocation gbus.Invocation, message *gbus.BusMessage) error {
req, ok := message.Payload.(*RpcRequest)
if !ok {
log.Fatalln("failed to parse request body")
}
return invocation.Reply(context.Background(), gbus.NewBusMessage(RpcResponse{
Data: fmt.Sprintf("Hello %s", req.Data),
}))
}
serverService := createRpcBus(connection, serverServiceName)
serverService.HandleMessage(RpcRequest{}, handler)
serverService.Start()
defer serverService.Shutdown()
Invoker
invokerService := createRpcBus(connection, invokerServiceName)
invokerService.Start()
defer invokerService.Shutdown()
log.Println("Sending RPC")
reply, err := invokerService.RPC(
context.Background(),
serverServiceName,
gbus.NewBusMessage(RpcRequest{Data: "Mr. Jack"}),
gbus.NewBusMessage(RpcResponse{}),
5*time.Second)
if err != nil {
log.Fatal(err)
}
log.Println(reply.Payload) // should be 'Hello Mr. Jack'