# README
## Clickhouse Buffer

An easy-to-use, powerful and productive Go package for writing data to the Clickhouse columnar database.
### Install

For clickhouse-go v1:

```shell
$ go get -u github.com/zikwall/clickhouse-buffer
```

For clickhouse-go v2:

```shell
$ go get -u github.com/zikwall/clickhouse-buffer/v2
```
### Why

When using the Clickhouse database in real projects, you often end up reinventing the wheel:
writing your own queues and buffers that accumulate a certain amount of data, or collect it over a certain period of time,
and then send it to Clickhouse as one large batch.
This is because Clickhouse is designed to process new data most efficiently in batches, as its authors themselves recommend.
### Features

- non-blocking (recommended): the async write client uses implicit batching. Data are written asynchronously to the underlying buffer and are automatically sent to the server when the write buffer reaches the batch size (default 5000) or when the flush interval (default 1s) times out. The asynchronous write client is recommended for frequent periodic writes.
- blocking: data are written synchronously, directly to Clickhouse.

Client buffer engines:

- in-memory: uses native channels and slices
- redis: uses a Redis server as the queue and buffer
- retries: resends "broken" packets, or packets that were not sent for some other reason
### Usage

```go
import (
	"database/sql"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"

	cxnative "github.com/zikwall/clickhouse-buffer/v2/src/database/native"
	cxsql "github.com/zikwall/clickhouse-buffer/v2/src/database/sql"
)

// if you already have a connection to Clickhouse, you can just use the wrappers
var nativeConn driver.Conn // your existing native connection
var sqlConn *sql.DB        // or your existing database/sql connection

// with the native interface
ch := cxnative.NewClickhouseWithConn(nativeConn)
// or with the database/sql interface
ch := cxsql.NewClickhouseWithConn(sqlConn)

// if you don't want to create connections yourself,
// the package can do it for you; just call the constructor you need.
// ctx.StringSlice, ctx.String and ctx.Bool below read CLI flags;
// replace them with your own configuration values.

// with the native interface
ch, conn, err := cxnative.NewClickhouse(ctx, &clickhouse.Options{
	Addr: ctx.StringSlice("clickhouse-host"),
	Auth: clickhouse.Auth{
		Database: ctx.String("clickhouse-database"),
		Username: ctx.String("clickhouse-username"),
		Password: ctx.String("clickhouse-password"),
	},
	Settings: clickhouse.Settings{
		"max_execution_time": 60,
	},
	DialTimeout: 5 * time.Second,
	Compression: &clickhouse.Compression{
		Method: clickhouse.CompressionLZ4,
	},
	Debug: ctx.Bool("debug"),
})
// or with the database/sql interface
ch, conn, err := cxsql.NewClickhouse(ctx, &clickhouse.Options{
	Addr: ctx.StringSlice("clickhouse-host"),
	Auth: clickhouse.Auth{
		Database: ctx.String("clickhouse-database"),
		Username: ctx.String("clickhouse-username"),
		Password: ctx.String("clickhouse-password"),
	},
	Settings: clickhouse.Settings{
		"max_execution_time": 60,
	},
	DialTimeout: 5 * time.Second,
	Compression: &clickhouse.Compression{
		Method: clickhouse.CompressionLZ4,
	},
	Debug: ctx.Bool("debug"),
}, &cxsql.RuntimeOptions{})
```
### Create the main data streamer client and write data

```go
import (
	"time"

	cx "github.com/zikwall/clickhouse-buffer/v2"
	cxbuffer "github.com/zikwall/clickhouse-buffer/v2/src/buffer"
	cxmemory "github.com/zikwall/clickhouse-buffer/v2/src/buffer/memory"
	cxredis "github.com/zikwall/clickhouse-buffer/v2/src/buffer/redis"
)

// create the root client: flush interval 1000 ms, batch size 5000 rows
client := cx.NewClientWithOptions(ctx, ch,
	cx.DefaultOptions().SetFlushInterval(1000).SetBatchSize(5000),
)

// create a buffer engine: in-memory
buffer := cxmemory.NewBuffer(
	client.Options().BatchSize(),
)
// or Redis (redisClient is your *redis.Client)
buffer := cxredis.NewBuffer(
	ctx, redisClient, "bucket", client.Options().BatchSize(),
)

// create a new writer API: table name with columns
writeAPI := client.Writer(cxbuffer.View{
	Name:    "clickhouse_database.clickhouse_table",
	Columns: []string{"id", "uuid", "insert_ts"},
}, buffer)

// define your custom data structure
type MyCustomDataView struct {
	id       int
	uuid     string
	insertTS time.Time
}

// and implement the cxbuffer.Inline interface;
// values are listed in the same order as View.Columns
func (t *MyCustomDataView) Row() cxbuffer.RowSlice {
	return cxbuffer.RowSlice{t.id, t.uuid, t.insertTS.Format(time.RFC822)}
}

// write your data asynchronously
writeAPI.WriteRow(&MyCustomDataView{
	id: 1, uuid: "1", insertTS: time.Now(),
})
```
When using the non-blocking writer, you can track errors through a dedicated error channel:

```go
errorsCh := writeAPI.Errors()
go func() {
	for err := range errorsCh {
		// log with the standard library, or use your own logger
		log.Printf("clickhouse write error: %s", err)
	}
}()
```
### Using the blocking writer interface

```go
// create a new blocking writer API: table name with columns
writerBlocking := client.WriterBlocking(cxbuffer.View{
	Name:    "clickhouse_database.clickhouse_table",
	Columns: []string{"id", "uuid", "insert_ts"},
})

// synchronous (blocking) write of data directly to Clickhouse;
// Row() has a pointer receiver, so pass pointers as cxbuffer.Inline values
err := writerBlocking.WriteRow(ctx, []cxbuffer.Inline{
	&MyCustomDataView{id: 1, uuid: "1", insertTS: time.Now()},
	&MyCustomDataView{id: 2, uuid: "2", insertTS: time.Now()},
	&MyCustomDataView{id: 3, uuid: "3", insertTS: time.Now()},
}...)
```
### More

#### Retries

By default, packet resending is disabled. To enable it, call `(*Options).SetRetryIsEnabled(true)`.

Retry queue engines:

- in-memory, using channels (default)
- redis
- rabbitMQ
- kafka

You can implement your own queue engine by implementing the `Queueable` interface:

```go
type Queueable interface {
	Queue(packet *retryPacket)
	Retries() <-chan *retryPacket
}
```

and set it as the engine:

```go
cx.DefaultOptions().SetDebugMode(true).SetRetryIsEnabled(true).SetQueueEngine(CustomQueueable)
```
#### Logs

You can use your own logger by implementing the `Logger` interface and passing it in the options:

```go
type Logger interface {
	Log(message interface{})
	Logf(format string, v ...interface{})
}

// example with default options
cx.DefaultOptions().SetDebugMode(true).SetLogger(SomeLogger)
```
#### Tests

```shell
$ go test -v ./...
$ golangci-lint run --config ./.golangci.yml
```
#### Integration tests

```shell
$ export CLICKHOUSE_HOST=111.11.11.11:9000
$ export REDIS_HOST=111.11.11.11:6379
$ export REDIS_PASS=password_if_needed
$ go test -v ./... -tags=integration
```
### TODO

- buffer interfaces
- more retry buffer interfaces
- rewrite retry lib
- create binary app for streaming data to Clickhouse
  - client and server with HTTP interface
  - client and server with gRPC interface