# README
go-bus
Universal pub-sub library for RabbitMQ and Kafka in Go
Dev version - only for tests
Installation
go get github.com/hetacode/go-bus
Consumer implementation
General implementation
// fakeLogger is only for tests - use your own implementation backed by your own logger
type fakeLogger struct{}

func (l *fakeLogger) Infof(message string, args ...interface{}) {
	log.Printf(message, args...)
}

func (l *fakeLogger) Errorf(message string, args ...interface{}) {
	log.Printf(message, args...)
}

func main() {
	eventsMapper := new(goeh.EventsMapper)
	eventsMapper.Register(new(TestEvent))

	// Buffered channel that receives the interrupt signal (needs the os/signal package)
	done := make(chan os.Signal, 1)
	signal.Notify(done, os.Interrupt)

	kind := gobus.RabbitMQServiceBusOptionsFanOutKind
	bus := gobus.NewRabbitMQServiceBus(eventsMapper, new(fakeLogger), &gobus.RabbitMQServiceBusOptions{
		Kind:     &kind,
		Exchange: "test-ex",
		Queue:    "test-queue",
		Server:   "amqp://rabbit:5672",
	})

	// Consume in the background so main can wait for the shutdown signal
	go func() {
		msgCh, errCh := bus.Consume()
		for {
			select {
			case msg := <-msgCh:
				log.Printf("Do something with received event: %+v", msg)
			case err := <-errCh:
				if err != nil {
					panic(err)
				}
			}
		}
	}()

	<-done
	log.Printf("the end")
}
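The comment on fakeLogger asks for a real logger implementation. The following is a minimal sketch of one built on the standard library, assuming the bus only needs the two methods shown above (Infof and Errorf). The names Logger, stdLogger, newStdLogger and formatLine are hypothetical and are not part of go-bus.

```go
package main

import (
	"errors"
	"fmt"
	"log"
	"os"
)

// Logger mirrors the two methods that fakeLogger implements in the README.
// The interface name is an assumption made for this sketch.
type Logger interface {
	Infof(message string, args ...interface{})
	Errorf(message string, args ...interface{})
}

// stdLogger (hypothetical) writes leveled messages through a *log.Logger.
type stdLogger struct {
	out *log.Logger
}

func newStdLogger(out *log.Logger) *stdLogger {
	return &stdLogger{out: out}
}

// formatLine prefixes the formatted message with its level.
func formatLine(level, message string, args ...interface{}) string {
	return level + " " + fmt.Sprintf(message, args...)
}

func (l *stdLogger) Infof(message string, args ...interface{}) {
	l.out.Print(formatLine("INFO", message, args...))
}

func (l *stdLogger) Errorf(message string, args ...interface{}) {
	l.out.Print(formatLine("ERROR", message, args...))
}

func main() {
	// The logger writes to stderr with a prefix and timestamps.
	var logger Logger = newStdLogger(log.New(os.Stderr, "go-bus ", log.LstdFlags))
	logger.Infof("connected to %s", "amqp://rabbit:5672")
	logger.Errorf("publish failed: %v", errors.New("timeout"))
}
```

A value such as newStdLogger(...) could then be passed in place of new(fakeLogger), provided the library's logger parameter accepts these two methods.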
Bind queue to multiple exchanges
A consumer queue can be bound to multiple exchanges.
Important! In this case, at most one routing key (zero or one) can be attached between each exchange and the queue.
kind := gobus.RabbitMQServiceBusOptionsFanOutKind
bus := gobus.NewRabbitMQServiceBus(eventsMapper, new(fakeLogger), &gobus.RabbitMQServiceBusOptions{
	Kind:       &kind,
	Exchange:   "test-ex1|test-ex2", // exchanges separated by "|"
	Queue:      "test-queue",
	RoutingKey: "", // or "routing-key"
	Server:     "amqp://rabbit:5672",
})
Bind queue to exchange with multiple routing keys
A consumer queue can be bound with multiple routing keys only when it is bound to a single exchange.
kind := gobus.RabbitMQServiceBusOptionsFanOutKind
bus := gobus.NewRabbitMQServiceBus(eventsMapper, new(fakeLogger), &gobus.RabbitMQServiceBusOptions{
	Kind:       &kind,
	Exchange:   "test-ex", // one exchange
	Queue:      "test-queue",
	RoutingKey: "routing-key1|routing-key2", // routing keys separated by "|"
	Server:     "amqp://rabbit:5672",
})
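The two binding modes above can be pictured as a list of (exchange, routing key) pairs derived from the pipe-separated option values. The sketch below is only an illustration of that rule, not the library's actual parsing code. The binding type and the expandBindings helper are hypothetical, and rejecting the combination of several exchanges with several routing keys is an assumption based on the "zero or one routing key" note.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// binding is a hypothetical pair describing one exchange-to-queue binding.
type binding struct {
	Exchange   string
	RoutingKey string
}

// expandBindings splits the "|"-separated exchange and routing key values
// and returns one binding per allowed combination.
func expandBindings(exchange, routingKey string) ([]binding, error) {
	exchanges := strings.Split(exchange, "|")
	// An empty routing key yields a single empty key, i.e. "no routing key".
	keys := strings.Split(routingKey, "|")
	if len(exchanges) > 1 && len(keys) > 1 {
		return nil, errors.New("multiple exchanges allow at most one routing key")
	}
	var out []binding
	for _, ex := range exchanges {
		for _, key := range keys {
			out = append(out, binding{Exchange: ex, RoutingKey: key})
		}
	}
	return out, nil
}

func main() {
	// Several exchanges, no routing key: one binding per exchange.
	multiEx, _ := expandBindings("test-ex1|test-ex2", "")
	fmt.Printf("%+v\n", multiEx)

	// One exchange, several routing keys: one binding per key.
	multiKey, _ := expandBindings("test-ex", "routing-key1|routing-key2")
	fmt.Printf("%+v\n", multiKey)

	// Several exchanges combined with several keys is rejected in this sketch.
	_, err := expandBindings("test-ex1|test-ex2", "a|b")
	fmt.Println(err)
}
```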
Producer implementation
// fakeLogger is only for tests - use your own implementation backed by your own logger
type fakeLogger struct{}

func (l *fakeLogger) Infof(message string, args ...interface{}) {
	log.Printf(message, args...)
}

func (l *fakeLogger) Errorf(message string, args ...interface{}) {
	log.Printf(message, args...)
}

func main() {
	eventsMapper := new(goeh.EventsMapper)
	eventsMapper.Register(new(TestEvent))

	kind := gobus.RabbitMQServiceBusOptionsFanOutKind
	bus := gobus.NewRabbitMQServiceBus(eventsMapper, new(fakeLogger), &gobus.RabbitMQServiceBusOptions{
		Kind:     &kind,
		Exchange: "test-ex",
		Server:   "amqp://rabbit:5672",
	})

	// Publish nine events; the loop increments n so that it terminates
	for n := 1; n < 10; n++ {
		bus.Publish(&TestEvent{
			EventData: &goeh.EventData{ID: strconv.Itoa(n)},
			FullName:  fmt.Sprintf("Janusz %d", n),
		})
	}
	log.Printf("the end")
}
Retry option
The retry mechanism can be added to both the Kafka and RabbitMQ implementations of the service bus. When the function that sends an event returns an error, the library tries to send the event again after a delay.
type RetryOptions struct {
	Attempts int
	Delay    time.Duration
}
RetryOptions can be added to either RabbitMQServiceBusOptions or KafkaServiceBusOptions.
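To make the retry behaviour concrete, here is a minimal sketch of a retry loop driven by the RetryOptions struct. It is not the library's implementation. The sendWithRetry helper and the errUnavailable value are hypothetical, and the sketch assumes that Attempts counts the extra tries made after the first failed send.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// RetryOptions is copied from the README.
type RetryOptions struct {
	Attempts int
	Delay    time.Duration
}

// errUnavailable is a hypothetical error used to simulate a failing broker.
var errUnavailable = errors.New("broker unavailable")

// sendWithRetry calls send once and, on error, retries up to opts.Attempts
// more times, sleeping opts.Delay before each retry. It returns the number
// of calls made and the last error (nil on success).
func sendWithRetry(send func() error, opts RetryOptions) (int, error) {
	attempts := opts.Attempts
	if attempts < 0 {
		attempts = 0 // treat negative values as "no retries"
	}
	var err error
	for try := 0; try <= attempts; try++ {
		if try > 0 {
			time.Sleep(opts.Delay)
		}
		if err = send(); err == nil {
			return try + 1, nil
		}
	}
	return attempts + 1, err
}

func main() {
	calls := 0
	// flaky fails on the first two calls and succeeds on the third.
	flaky := func() error {
		calls++
		if calls < 3 {
			return errUnavailable
		}
		return nil
	}
	n, err := sendWithRetry(flaky, RetryOptions{Attempts: 5, Delay: 10 * time.Millisecond})
	fmt.Println(n, err) // prints "3 <nil>"
}
```

A fixed delay keeps the sketch close to the two fields the struct exposes; whether the library applies the delay before or after each failed attempt is not stated in this README.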
# Functions
NewKafkaServiceBus creates a new instance; eventsMapper is used only in consumer mode.
NewRabbitMQServiceBus creates a new instance of the queue.
# Constants
RabbitMQServiceBusOptionsFanOutKind fanout kind of rabbitmq.
RabbitMQServiceBusOptionsTopicKind topic kind of rabbitmq.
# Structs
KafkaServiceBus implementation of service bus.
KafkaServiceBusOptions configuration struct for kafka service bus.
RabbitMQServiceBus implementation of service bus.
RabbitMQServiceBusOptions struct with configuration for rabbitmq service bus.
# Interfaces
ServiceBus general abstraction for bus.