github.com/danube-messaging/danube-go
Category: module, package
Version: 0.2.3
Repository: https://github.com/danube-messaging/danube-go.git
Documentation: pkg.go.dev

# README

Danube-go client

The Go client library for interacting with the Danube messaging broker platform.

Danube is an open-source distributed messaging platform written in Rust. Consult the documentation for the supported concepts and the platform architecture.

Example usage

Check out the example files.

Start the Danube server

Use the instructions from the documentation to run the Danube broker/cluster.

Create Producer

client := danube.NewClient().ServiceURL("127.0.0.1:6650").Build()

ctx := context.Background()
topic := "/default/test_topic"
producerName := "test_producer"

producer, err := client.NewProducer(ctx).
    WithName(producerName).
    WithTopic(topic).
    Build()
if err != nil {
    log.Fatalf("unable to initialize the producer: %v", err)
}

if err := producer.Create(ctx); err != nil {
    log.Fatalf("Failed to create producer: %v", err)
}
log.Printf("The Producer %s was created", producerName)

payload := fmt.Sprintln("Hello Danube")

// Convert the string to bytes
bytesPayload := []byte(payload)

// User-defined attributes can be sent along with the payload; here they are nil
messageID, err := producer.Send(ctx, bytesPayload, nil)
if err != nil {
    log.Fatalf("Failed to send message: %v", err)
}
log.Printf("The Message with id %v was sent", messageID)
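
The producer example passes nil for the user-defined attributes. As a minimal sketch of the non-nil case, the snippet below prepares a payload together with an attribute map. It assumes the attributes are plain string key/value pairs (a map[string]string); buildMessage is a hypothetical helper for illustration and is not part of danube-go.

```go
package main

import "fmt"

// buildMessage is a hypothetical helper (not part of danube-go). It converts a
// text payload to bytes and returns a defensive copy of the attributes.
func buildMessage(text string, attrs map[string]string) ([]byte, map[string]string) {
	// Copy the map so later changes by the caller do not affect the message.
	out := make(map[string]string, len(attrs))
	for k, v := range attrs {
		out[k] = v
	}
	return []byte(text), out
}

func main() {
	payload, attrs := buildMessage("Hello Danube", map[string]string{"source": "readme"})
	fmt.Printf("payload=%q attributes=%v\n", payload, attrs)

	// With a live producer, and assuming Send accepts a map[string]string as
	// its third argument, the values would be passed along as:
	//   messageID, err := producer.Send(ctx, payload, attrs)
}
```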

Create Consumer

client := danube.NewClient().ServiceURL("127.0.0.1:6650").Build()

ctx := context.Background()
topic := "/default/test_topic"
consumerName := "test_consumer"
subscriptionName := "test_subscription"
subType := danube.Exclusive

consumer, err := client.NewConsumer(ctx).
    WithConsumerName(consumerName).
    WithTopic(topic).
    WithSubscription(subscriptionName).
    WithSubscriptionType(subType).
    Build()
if err != nil {
    log.Fatalf("Failed to initialize the consumer: %v", err)
}

// Request to subscribe to the topic and create the resources on the Danube Broker
if err := consumer.Subscribe(ctx); err != nil {
    log.Fatalf("Failed to subscribe: %v", err)
}
log.Printf("The Consumer %s was created", consumerName)

// Request to receive messages
stream, err := consumer.Receive(ctx)
if err != nil {
    log.Fatalf("Failed to receive messages: %v", err)
}

// Consume the messages from the Go channel
for msg := range stream {
    fmt.Printf("Received message: %+v\n", string(msg.GetPayload()))

    // Acknowledge the message
    if _, err := consumer.Ack(ctx, msg); err != nil {
        log.Fatalf("Failed to acknowledge message: %v", err)
    }
}
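
The receive loop above runs until the channel is closed and exits the process on the first failed acknowledgement. A long-running service may prefer to stop on context cancellation and return errors to the caller. The sketch below shows that pattern against an in-memory channel; the message type, consume, and demo are hypothetical stand-ins, not danube-go types.

```go
package main

import (
	"context"
	"fmt"
)

// message is a hypothetical stand-in for the message type delivered by the client.
type message struct {
	ID      uint64
	Payload []byte
}

// consume drains stream until it is closed or ctx is cancelled, calling ack
// for every message. It returns the number of acknowledged messages.
func consume(ctx context.Context, stream <-chan message, ack func(message) error) (int, error) {
	acked := 0
	for {
		select {
		case <-ctx.Done():
			return acked, ctx.Err()
		case msg, ok := <-stream:
			if !ok {
				// The channel was closed: nothing more to receive.
				return acked, nil
			}
			fmt.Printf("Received message: %s\n", msg.Payload)
			if err := ack(msg); err != nil {
				return acked, fmt.Errorf("ack message %d: %w", msg.ID, err)
			}
			acked++
		}
	}
}

// demo feeds two messages through consume with an ack that always succeeds.
func demo() (int, error) {
	stream := make(chan message, 2)
	stream <- message{ID: 1, Payload: []byte("Hello Danube")}
	stream <- message{ID: 2, Payload: []byte("Goodbye Danube")}
	close(stream)
	return consume(context.Background(), stream, func(message) error { return nil })
}

func main() {
	n, err := demo()
	fmt.Println("acknowledged:", n, "error:", err)
}
```

In a real program the ack callback would wrap the consumer's acknowledgement call, and the context would typically be cancelled on a shutdown signal.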

Contribution

Improvements and new features are in progress. Please feel free to contribute or report any issues you encounter.

Use the latest DanubeApi.proto file

Make sure proto/DanubeApi.proto is the latest version from the Danube project.

If it is not, replace the file and add the following line near the top of the file, immediately after the package danube; declaration:

option go_package = "github.com/danube-messaging/danube-go/proto";

To generate the Go gRPC code, install the following packages:

go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

Then generate the Go code from the proto file:

protoc --proto_path=./proto --go_out=./proto --go-grpc_out=./proto --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative proto/DanubeApi.proto

# Functions

Convert Protobuf Schema to Schema.
Convert Protobuf TypeSchema to SchemaType.
NewClient initializes a new DanubeClientBuilder.
NewConfigDispatchStrategy creates a new ConfigDispatchStrategy instance.
NewLookupService creates a new instance of LookupService.
NewReliableDispatchStrategy creates a new reliable ConfigDispatchStrategy instance.
NewReliableOptions creates a new ReliableOptions instance.
NewSchema creates a new Schema instance with the specified name, type, and optional JSON schema data.
Convert Protobuf Schema to JSON.
WithConnectionTimeout configures the connection timeout for the connection.
WithKeepAliveInterval configures the keepalive interval for the connection.

# Constants

Exclusive - only one consumer can subscribe to a specific subscription.
FailOver - similar to exclusive subscriptions, but multiple consumers can subscribe, and one actively receives messages.
Shared - multiple consumers can subscribe, messages are delivered round-robin.
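
The three subscription types differ in who may subscribe and who receives each message. The toy in-memory model below sketches those rules. All names are hypothetical and local to the example; the real dispatching happens on the broker, and the choice of the first subscriber as the active FailOver consumer is an assumption made for illustration.

```go
package main

import "fmt"

// subType is a local, hypothetical mirror of the three subscription types.
// It is not the library's own definition.
type subType int

const (
	exclusive subType = iota
	shared
	failOver
)

// subscription is a toy in-memory model of one subscription.
type subscription struct {
	kind      subType
	consumers []string
	next      int // index of the next consumer for round-robin delivery
}

// subscribe adds a consumer, rejecting a second one on an exclusive subscription.
func (s *subscription) subscribe(name string) error {
	if s.kind == exclusive && len(s.consumers) > 0 {
		return fmt.Errorf("subscription is exclusive, %q rejected", name)
	}
	s.consumers = append(s.consumers, name)
	return nil
}

// dispatch returns the consumer that receives the next message.
func (s *subscription) dispatch() (string, error) {
	if len(s.consumers) == 0 {
		return "", fmt.Errorf("no consumers")
	}
	if s.kind == shared {
		// Shared: rotate through all consumers.
		c := s.consumers[s.next%len(s.consumers)]
		s.next++
		return c, nil
	}
	// Exclusive and FailOver: one active consumer (assumed here to be the first).
	return s.consumers[0], nil
}

func main() {
	s := &subscription{kind: shared}
	_ = s.subscribe("consumer-a")
	_ = s.subscribe("consumer-b")
	for i := 0; i < 3; i++ {
		c, _ := s.dispatch()
		fmt.Println(c)
	}
}
```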

# Structs

ConfigDispatchStrategy represents the dispatch strategy for a topic.
Consumer represents a message consumer that subscribes to a topic and receives messages.
ConsumerBuilder is a builder for creating a new Consumer instance.
DanubeClient is the main client for interacting with the Danube messaging system.
DanubeClientBuilder is used for configuring and creating a DanubeClient instance.
LookupResult holds the result of a topic lookup.
Producer represents a message producer that is responsible for sending messages to a specific partitioned or non-partitioned topic on a message broker.
ProducerBuilder is a builder for creating a new Producer instance.
ReliableOptions represents configuration options for reliable dispatch strategy.
Schema represents the structure of data, including its type and associated schema data.

# Type aliases

DialOption is a function that configures gRPC dial options.
RetentionPolicy represents the retention policy for messages in the topic.
SchemaType represents the type of schema used for data serialization and validation.
The type of subscription (e.g., EXCLUSIVE, SHARED, FAILOVER).
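
DialOption, together with WithConnectionTimeout and WithKeepAliveInterval, suggests the functional options pattern: each option is a function that adjusts a configuration value. The sketch below shows the pattern on a plain struct. Every name in it (connConfig, option, withConnectionTimeout, withKeepAliveInterval, newConnConfig) and the default durations are hypothetical; the library's DialOption configures gRPC dial options rather than this struct.

```go
package main

import (
	"fmt"
	"time"
)

// connConfig is a hypothetical configuration struct used only in this sketch.
type connConfig struct {
	connectionTimeout time.Duration
	keepAliveInterval time.Duration
}

// option is a function that modifies a connConfig.
type option func(*connConfig)

// withConnectionTimeout returns an option that sets the connection timeout.
func withConnectionTimeout(d time.Duration) option {
	return func(c *connConfig) { c.connectionTimeout = d }
}

// withKeepAliveInterval returns an option that sets the keepalive interval.
func withKeepAliveInterval(d time.Duration) option {
	return func(c *connConfig) { c.keepAliveInterval = d }
}

// newConnConfig starts from arbitrary example defaults and applies the
// options in the order they were given.
func newConnConfig(opts ...option) connConfig {
	cfg := connConfig{
		connectionTimeout: 10 * time.Second,
		keepAliveInterval: 30 * time.Second,
	}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	cfg := newConnConfig(withConnectionTimeout(5 * time.Second))
	fmt.Println(cfg.connectionTimeout, cfg.keepAliveInterval)
}
```

The benefit of this design is that callers set only what they need, and new options can be added later without changing existing call sites.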