# README
Broker
The default brokers (Kafka and RabbitMQ) are included in this package; other brokers (GCP PubSub, STOMP/AMQ) can be found in the candi plugin.
Kafka
Register Kafka broker in service config
Modify configs/configs.go in your service:
package configs

import (
	"github.com/golangid/candi/broker"
	...
)

// LoadServiceConfigs loads the selected dependency configuration for this service
func LoadServiceConfigs(baseCfg *config.Config) (deps dependency.Dependency) {
	...
	// register Kafka as one of the brokers available to this service
	brokerDeps := broker.InitBrokers(
		broker.NewKafkaBroker(),
	)
	...
}
To use the Kafka consumer, set USE_KAFKA_CONSUMER=true in your environment variables and follow this example.
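The following is a minimal sketch of how a boolean flag such as USE_KAFKA_CONSUMER can be read from the environment. The helper name flagEnabled is hypothetical, and candi's own parsing may accept a different set of values; this only illustrates the standard-library approach.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// flagEnabled reports whether value is a true-like string as understood by
// strconv.ParseBool ("1", "t", "true", "TRUE", ...). Empty or unparsable
// values count as disabled. This is an illustration, not candi's parser.
func flagEnabled(value string) bool {
	enabled, err := strconv.ParseBool(value)
	return err == nil && enabled
}

func main() {
	// Prints whether the consumer flag is set in the current environment.
	fmt.Println("kafka consumer enabled:", flagEnabled(os.Getenv("USE_KAFKA_CONSUMER")))
}
```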
To use the Kafka publisher in your usecase, follow this example code:
package usecase

import (
	"context"

	"github.com/golangid/candi/candishared"
	"github.com/golangid/candi/codebase/factory/dependency"
	"github.com/golangid/candi/codebase/factory/types"
	"github.com/golangid/candi/codebase/interfaces"
)

type usecaseImpl struct {
	kafkaPub interfaces.Publisher
}

// NewUsecase takes the Kafka publisher from the registered brokers
func NewUsecase(deps dependency.Dependency) Usecase {
	return &usecaseImpl{
		kafkaPub: deps.GetBroker(types.Kafka).GetPublisher(),
	}
}

// UsecaseToPublishMessage publishes a message to "example-topic"
func (uc *usecaseImpl) UsecaseToPublishMessage(ctx context.Context) error {
	err := uc.kafkaPub.PublishMessage(ctx, &candishared.PublisherArgument{
		Topic: "example-topic",
		Data:  "hello world",
	})
	return err
}
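Because the usecase above depends on a publisher interface rather than a concrete Kafka client, it can be exercised without a running broker. The sketch below uses local stand-in types (publisher, publisherArgument, fakePublisher, and runExample are all hypothetical and are not candi's real types) to show an in-memory fake that records messages instead of sending them.

```go
package main

import (
	"context"
	"fmt"
)

// publisherArgument is a local stand-in for candishared.PublisherArgument,
// reduced to the two fields used in the example above.
type publisherArgument struct {
	Topic string
	Data  interface{}
}

// publisher is a local stand-in for interfaces.Publisher.
type publisher interface {
	PublishMessage(ctx context.Context, args *publisherArgument) error
}

// fakePublisher records every message instead of sending it to a broker.
type fakePublisher struct {
	sent []publisherArgument
}

func (f *fakePublisher) PublishMessage(_ context.Context, args *publisherArgument) error {
	f.sent = append(f.sent, *args)
	return nil
}

type usecaseImpl struct {
	kafkaPub publisher
}

func (uc *usecaseImpl) UsecaseToPublishMessage(ctx context.Context) error {
	return uc.kafkaPub.PublishMessage(ctx, &publisherArgument{
		Topic: "example-topic",
		Data:  "hello world",
	})
}

// runExample wires the usecase to the fake and publishes one message.
func runExample() (*fakePublisher, error) {
	fake := &fakePublisher{}
	uc := &usecaseImpl{kafkaPub: fake}
	return fake, uc.UsecaseToPublishMessage(context.Background())
}

func main() {
	fake, err := runExample()
	if err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Println(len(fake.sent), fake.sent[0].Topic) // prints "1 example-topic"
}
```

In a real service the same idea would apply to a fake that implements candi's interfaces.Publisher, assuming its method set matches the call shown in the example above.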
RabbitMQ
Register RabbitMQ broker in service config
Modify configs/configs.go in your service:
package configs

import (
	"github.com/golangid/candi/broker"
	...
)

// LoadServiceConfigs loads the selected dependency configuration for this service
func LoadServiceConfigs(baseCfg *config.Config) (deps dependency.Dependency) {
	...
	// register RabbitMQ as one of the brokers available to this service
	brokerDeps := broker.InitBrokers(
		broker.NewRabbitMQBroker(),
	)
	...
}
To use the RabbitMQ consumer, set USE_RABBITMQ_CONSUMER=true in your environment variables and follow this example.
To use the RabbitMQ publisher in your usecase, follow this example code:
package usecase

import (
	"context"

	"github.com/golangid/candi/broker"
	"github.com/golangid/candi/candishared"
	"github.com/golangid/candi/codebase/factory/dependency"
	"github.com/golangid/candi/codebase/factory/types"
	"github.com/golangid/candi/codebase/interfaces"
)

type usecaseImpl struct {
	rabbitmqPub interfaces.Publisher
}

// NewUsecase takes the RabbitMQ publisher from the registered brokers
func NewUsecase(deps dependency.Dependency) Usecase {
	return &usecaseImpl{
		rabbitmqPub: deps.GetBroker(types.RabbitMQ).GetPublisher(),
	}
}

// UsecaseToPublishMessage publishes a delayed message to "example-topic"
func (uc *usecaseImpl) UsecaseToPublishMessage(ctx context.Context) error {
	err := uc.rabbitmqPub.PublishMessage(ctx, &candishared.PublisherArgument{
		Topic: "example-topic",
		Data:  "hello world",
		Header: map[string]interface{}{
			// optional: delay consumption by the active consumer for 5 seconds (value in milliseconds)
			broker.RabbitMQDelayHeader: 5000,
		},
	})
	return err
}
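Since the delay header value is expressed in milliseconds, deriving it from a time.Duration avoids hard-coded numbers. The helpers below (delayMillis and delayHeader) are hypothetical and not part of candi; the header key is passed in as a parameter so the sketch stays self-contained, and in real code it would be broker.RabbitMQDelayHeader.

```go
package main

import (
	"fmt"
	"time"
)

// delayMillis converts a duration to whole milliseconds, truncating any
// sub-millisecond remainder.
func delayMillis(d time.Duration) int {
	return int(d / time.Millisecond)
}

// delayHeader builds a header map carrying the delay under headerKey.
// headerKey is an assumption of this sketch; pass the library's constant
// in real code.
func delayHeader(headerKey string, d time.Duration) map[string]interface{} {
	return map[string]interface{}{headerKey: delayMillis(d)}
}

func main() {
	// "demo-delay-key" is a placeholder key used only for this demo.
	fmt.Println(delayHeader("demo-delay-key", 5*time.Second)) // prints "map[demo-delay-key:5000]"
}
```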
# Functions
GenerateKeyDeleteRedisPubSubMessage delete redis key pubsub message pattern.
GetDefaultKafkaConfig construct default kafka config.
InitBrokers registers all brokers for publishing or consuming.
* for Kafka, pass NewKafkaBroker(...KafkaOptionFunc) as a param; it initializes the kafka broker configuration from env KAFKA_BROKERS, KAFKA_CLIENT_ID, KAFKA_CLIENT_VERSION
* for RabbitMQ, pass NewRabbitMQBroker(...RabbitMQOptionFunc) as a param; it initializes the rabbitmq broker configuration from env RABBITMQ_BROKER, RABBITMQ_CONSUMER_GROUP, RABBITMQ_EXCHANGE_NAME
KafkaSetBrokerHost set custom broker host.
KafkaSetConfig set custom sarama configuration.
KafkaSetPublisher set custom publisher.
KafkaSetWorkerType set worker type.
NewKafkaBroker sets up kafka configuration for publisher or consumer; pass no options for the default configuration (the default worker type is types.Kafka).
NewKafkaPublisher sets up only the kafka publisher with a client connection.
NewRabbitMQBroker sets up rabbitmq configuration for publisher or consumer; the default connection is taken from the RABBITMQ_BROKER environment variable (the default worker type is types.RabbitMQ).
NewRabbitMQPublisher sets up only the rabbitmq publisher with a client connection.
NewRedisBroker sets up redis for publishing messages (the default worker type is types.RedisSubscriber).
ParseRedisPubSubKeyTopic parse key to redis message.
RabbitMQSetBrokerHost set custom broker host.
RabbitMQSetChannel set custom channel configuration.
RabbitMQSetExchange set exchange.
RabbitMQSetPublisher set custom publisher.
RabbitMQSetWorkerType set worker type.
RedisSetConfigCommands set config commands.
RedisSetSubscribeChannels set channels.
RedisSetWorkerType set worker type.
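InitBrokers is described above as reading broker configuration from environment variables, so a startup check that reports unset variables can make misconfiguration easier to spot. The sketch below is an assumption-laden illustration: missingKeys is a hypothetical helper, and the source does not state whether candi treats each of these variables as mandatory.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// missingKeys returns the keys for which lookup reports no value or an
// empty value. lookup has the same shape as os.LookupEnv so it can be
// swapped for a stub in tests.
func missingKeys(lookup func(string) (string, bool), keys ...string) []string {
	var missing []string
	for _, key := range keys {
		if value, ok := lookup(key); !ok || value == "" {
			missing = append(missing, key)
		}
	}
	return missing
}

func main() {
	// Variable names are taken from the InitBrokers description.
	missing := missingKeys(os.LookupEnv,
		"KAFKA_BROKERS", "KAFKA_CLIENT_ID", "KAFKA_CLIENT_VERSION",
		"RABBITMQ_BROKER", "RABBITMQ_CONSUMER_GROUP", "RABBITMQ_EXCHANGE_NAME",
	)
	if len(missing) > 0 {
		fmt.Println("unset environment variables:", strings.Join(missing, ", "))
	}
}
```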
# Constants
RabbitMQDelayHeader header key, value in milliseconds.
RedisBrokerKey key constant.
# Structs
Broker model.
KafkaBroker configuration.
RabbitMQBroker broker.
RabbitMQPublisher rabbitmq.
RedisMessage messaging model for redis subscriber key.
# Type aliases
KafkaOptionFunc func type.
RabbitMQOptionFunc func type.
RedisOptionFunc func type.
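The option func types and the Set* functions listed in this document resemble Go's functional options pattern, where each option is a function that mutates the value being constructed. The sketch below illustrates that pattern with local stand-in names only (kafkaBroker, optionFunc, setBrokerHost, setWorkerType, and the default values are all hypothetical); candi's actual types, fields, and defaults may differ.

```go
package main

import "fmt"

// kafkaBroker is a local stand-in, not candi's KafkaBroker.
type kafkaBroker struct {
	hosts      []string
	workerType string
}

// optionFunc customizes a kafkaBroker while it is being constructed.
type optionFunc func(*kafkaBroker)

// setBrokerHost overrides the broker hosts.
func setBrokerHost(hosts []string) optionFunc {
	return func(b *kafkaBroker) { b.hosts = hosts }
}

// setWorkerType overrides the worker type.
func setWorkerType(workerType string) optionFunc {
	return func(b *kafkaBroker) { b.workerType = workerType }
}

// newKafkaBroker starts from placeholder defaults, then applies each
// option in the order given, so later options win.
func newKafkaBroker(opts ...optionFunc) *kafkaBroker {
	b := &kafkaBroker{
		hosts:      []string{"localhost:9092"}, // placeholder default
		workerType: "kafka",                    // placeholder default
	}
	for _, opt := range opts {
		opt(b)
	}
	return b
}

func main() {
	b := newKafkaBroker(setWorkerType("custom-kafka"))
	fmt.Println(b.hosts, b.workerType) // prints "[localhost:9092] custom-kafka"
}
```

Calling the constructor with no options keeps every default, which matches the "empty option param for default configuration" wording in the NewKafkaBroker description.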