# README
hstreamdb-go
Go Client for HStreamDB
Note that the current release is not suitable for production use - APIs are not yet stable and the package has not been thoroughly tested in real-life use.
Content
Installation
Go 1.19 or later is required.
Add the package to your project dependencies (go.mod).
go get github.com/hstreamdb/hstreamdb-go
Example Usage
Connect to HServer
import (
"log"
"github.com/hstreamdb/hstreamdb-go/hstream"
)
func main() {
serverUrl := "hstream://localhost:6580,localhost:6581,localhost:6582"
client, err := hstream.NewHStreamClient(serverUrl)
if err != nil {
log.Fatalf("Creating client error: %s", err)
}
// do sth.
client.Close()
}
Work with Streams
import (
"log"
"github.com/hstreamdb/hstreamdb-go/hstream"
)
func main() {
// -------------- connect to server first --------------------
// Create a new stream with 1 replica, 5 shards, set the data retention to 1800s.
err := client.CreateStream("testStream",
hstream.WithReplicationFactor(1),
hstream.WithShardCount(5),
hstream.EnableBacklog(1800))
if err != nil {
log.Fatalf("Creating stream error: %s", err)
}
// List all streams
streams, err := client.ListStreams()
if err != nil {
log.Fatalf("Listing streams error: %s", err)
}
for _, stream := range streams {
log.Printf("Stream: %+v\n", stream)
}
// Delete stream
err := client.DeleteStream("testStream",
hstream.EnableForceDelete,
hstream.EnableIgnoreNoneExist)
if err != nil {
log.Fatalf("Delete stream error: %s", err)
}
}
Write to Stream
Write RawRecord
import (
"log"
"strconv"
"github.com/hstreamdb/hstreamdb-go/hstream"
"github.com/hstreamdb/hstreamdb-go/hstream/Record"
)
func main() {
//------------- connect to server and create related stream first --------------------
producer, err := client.NewProducer("testStream")
if err != nil {
log.Fatalf("Create producer error: %s", err)
}
defer producer.Stop()
res := make([]hstream.AppendResult, 0, 100)
for i := 0; i < 100; i++ {
rawRecord, err := Record.NewHStreamRawRecord("key-1", []byte("test-value"+strconv.Itoa(i)))
if err != nil {
log.Fatalf("Creating rawRecord error: %s", err)
}
r := producer.Append(rawRecord)
res = append(res, r)
}
for _, r := range res {
resp, err := r.Ready()
if err != nil {
log.Printf("Append error: %s", err)
} else {
log.Printf("Append response: %s", resp)
}
}
}
Write HRecord
import (
"log"
"strconv"
"github.com/hstreamdb/hstreamdb-go/hstream"
"github.com/hstreamdb/hstreamdb-go/hstream/Record"
)
func main() {
//------------- connect to server and create related stream first --------------------
producer, err := client.NewProducer("testStream")
if err != nil {
log.Fatalf("Create producer error: %s", err)
}
defer producer.Stop()
payload := map[string]interface{}{
"key1": "value1",
"key2": 123,
"key3": struct {
name string
age int
}{
name: "John",
age: 30,
},
}
hRecord, err := Record.NewHStreamHRecord("testStream", payload)
if err != nil {
log.Fatalf("Creating hRecord error: %s", err)
}
value := producer.Append(hRecord)
if resp, err := value.Ready(); err != nil {
log.Printf("Append error: %s", err)
} else {
log.Printf("Append response: %s", resp)
}
}
Batch Writter
import (
"fmt"
"log"
"sync"
"github.com/hstreamdb/hstreamdb-go/hstream"
"github.com/hstreamdb/hstreamdb-go/hstream/Record"
)
func main() {
//------------- connect to server and create related stream first --------------------
producer, err := client.NewBatchProducer("testStream",
// optional: set batch size and max batch bytes trigger
hstream.WithBatch(10, 1000),
// optional: set timeout trigger
hstream.TimeOut(-1),
// optional: set client compression
hstream.WithCompression(compression.Zstd),
// optional: set flow control
hstream.WithFlowControl(80 * 1024 * 1024))
defer producer.Stop()
keys := []string{"test-key1", "test-key2", "test-key3"}
rids := sync.Map{}
wg := sync.WaitGroup{}
wg.Add(3)
for _, key := range keys {
go func(key string) {
result := make([]hstream.AppendResult, 0, 100)
for i := 0; i < 100; i++ {
rawRecord, _ := Record.NewHStreamRawRecord("key-1", []byte(fmt.Sprintf("test-value-%s-%d", key, i)))
r := producer.Append(rawRecord)
result = append(result, r)
}
rids.Store(key, result)
wg.Done()
}(key)
}
wg.Wait()
rids.Range(func(key, value interface{}) bool {
k := key.(string)
res := value.([]hstream.AppendResult)
for idx, r := range res {
resp, err := r.Ready()
if err != nil {
log.Printf("write error: %s\n", err.Error())
}
log.Printf("[key: %s]: record[%d]=%s\n", k, idx, resp.String())
}
return true
})
}
Work with Subscriptions
import (
"log"
"github.com/hstreamdb/hstreamdb-go/hstream"
)
func main() {
// -------------- connect to server and create related stream first --------------------
// Create a new subscription
streamName := "testStream"
subId := "SubscriptionId"
err := client.CreateSubscription(subId, streamName,
hstream.WithAckTimeout(60),
hstream.WithOffset(hstream.LATEST))
// List all subscriptions
subs, err := client.ListSubscriptions()
if err != nil {
log.Fatalf("Listing subscriptions error: %s", err)
}
for _, sub := range subs {
log.Printf("Subscription: %+v", sub)
}
}
Consume from Subscription
import (
"log"
"github.com/hstreamdb/hstreamdb-go/hstream"
"github.com/hstreamdb/hstreamdb-go/hstream/Record"
)
func main() {
// ------- connect to server and create related stream and subscription first -------
streamName := "testStream"
subId := "SubscriptionId"
consumer := client.NewConsumer("consumer-1", subId)
defer consumer.Stop()
dataCh := consumer.StartFetch()
fetchRes := make([]Record.RecordId, 0, 100)
for res := range dataCh {
if res.Err != nil {
log.Printf("Fetch error: %s\n", res.Err)
continue
}
for _, record := range res.Result {
rid := record.GetRecordId()
log.Printf("receive recordId: %s\n", rid.String())
fetchRes = append(fetchRes, rid)
record.Ack()
}
if len(fetchRes) == 100 {
break
}
}
}
Work with ShardReader
import (
"log"
"github.com/hstreamdb/hstreamdb-go/hstream"
"github.com/hstreamdb/hstreamdb-go/hstream/Record"
)
func main() {
// ------- connect to server and create related stream first -------
shards, err := client.ListShards(streamName)
if err != nil {
log.Fatalf("List Shards error: %s", err)
}
readerId := "reader"
reader, err := client.NewShardReader(streamName, readerId, shards[0].ShardId,
hstream.WithShardOffset(hstream.EarliestShardOffset),
hstream.WithReaderTimeout(100),hstream.WithMaxRecords(10))
if err != nil {
log.Fatalf("Create shard reader error: %s", err)
}
defer client.DeleteShardReader(shards[0].ShardId, readerId)
defer reader.Close()
// ------- make sure that data has been written to the target shard ------
readRecords := make([]Record.ReceivedRecord, 0, totalRecords)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
for {
res, err := reader.Read(ctx)
s.NoError(err)
readRecords = append(readRecords, res...)
if len(readRecords) >= totalRecords {
break
}
}
}
# Packages
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author