modulepackage
0.0.0-20230209124253-5cfe7a909806
Repository: https://github.com/obgnail/mysql-river.git
Documentation: pkg.go.dev
# README
Mysql River

introduction
解析 mysql binlog,提供简单易用的同步方案。
内置三个电池:
- TraceLog:将 binlog 实时翻译为 sql 语句。
- ElasticSearchSync:将 binlog 的数据同步到 es 中。
- KafkaBroker:将 binlog 的数据同步到 Kafka 中,和 MySQL 彻底解耦。
feather
mysql-river 内置 auto position saver 和 auto health checker 两个功能:
- auto position saver:自动记录 river 的处理进展,将其保存为 master.info 文件。当 river 挂掉重启后依旧可以恢复进展,不必担心数据丢失。
- auto health checker:提供健康检测接口。当 river 的进展和 mysql binlog 的进展差值超过阈值时,触发对应函数。
health check rule
对比 master.info
(file-pos) 和 canal.GetMasterPos()
(db-pos) 的 position 信息,当触发规则时,调用对应函数。可以对接自动告警功能。
- 当获取 db-pos 失败时, 健康状态为 red
- 当 db-pos 跟 file-pos 相差在阈值内, 健康状态为 green
- 当 db-pos 跟 file-pos 相关在阈值外时, 健康状态为 yellow
- 当 db-pos 跟 上次记录的 db-pos 没有变化时,且file-pos 跟 上次记录的 file-pos 没有变化时,且 db-pos 跟 file-pos 相等时 健康状态为 green
- 当 db-pos 跟 上次记录的 db-pos 没有变化时,且file-pos 跟 上次记录的 file-pos 没有变化时,且 db-pos 大于 file-pos 时 健康状态为 red
- 当 db-pos 跟 上次记录的 db-pos 没有变化时,且file-pos 跟 上次记录的 file-pos 有变化时, 健康状态为 green
- 当 db-pos 跟 上次记录的 db-pos 有变化时, 且 file-pos 跟 上次记录的 file-pos 没有变化时, 健康状态为 red
- 当 db-pos 跟 上次记录的 db-pos 有变化时, 且 file-pos 跟 上次记录的 file-pos 有变化时, 健康状态为 green
Usage
只需实现 Handler 接口:
- OnEvent:核心函数。river 会自动解析 mysql binlog 文件,将 20+ 种 event 归纳为 insert、update、delete、ddl、gtid、xid、rotate、table_changed 几种。
- OnAlert:auto health check 不通过时自动调用此函数,可以对接自动告警功能。
- OnClose:river 发生不可恢复错误时,自动调用此函数,可以用此关闭 handler 或对接自动告警功能。
type Handler interface {
String() string
OnEvent(event *EventData) error
OnAlert(msg *StatusMsg) error
OnClose(river *River) // OnEvent、OnAlert抛出的error会触发OnClose
}
type EventData struct {
// insert、update、delete、ddl、gtid、xid、rotate、table_changed
EventType string `json:"event_type"`
ServerID uint32 `json:"server_id"`
LogName string `json:"log_name"`
LogPos uint32 `json:"log_pos"`
Db string `json:"db"`
Table string `json:"table"`
SQL string `json:"sql"` // 仅当EventType为ddl有值
GTIDSet string `json:"gtid_set"`
Primary []string `json:"primary"` // 主键字段;EventType为insert、update、delete时有值
Before map[string]interface{} `json:"before"` // 变更前数据, insert 类型的 before 为空
After map[string]interface{} `json:"after"` // 变更后数据, delete 类型的 after 为空
Timestamp uint32 `json:"timestamp"` // 事件时间
}
type StatusMsg struct {
Status HealthStatus
LastStatus HealthStatus
Reason []string // 发生告警时的消息(可能有多条不通过)
FilePos *mysql.Position
DBPos *mysql.Position
CheckInterval time.Duration
PosThreshold int
}
example
package main
import (
"fmt"
"github.com/obgnail/mysql-river/river"
"time"
)
var config = &river.Config{
MySQLConfig: &river.MySQLConfig{
Host: "127.0.0.1",
Port: 3306,
User: "root",
Password: "root",
},
PosAutoSaverConfig: &river.PosAutoSaverConfig{
SaveDir: "./",
SaveInterval: 3 * time.Second,
},
HealthCheckerConfig: &river.HealthCheckerConfig{
CheckPosThreshold: 3000,
CheckInterval: 5 * time.Second,
},
}
func main() {
err := river.New(config).
SetHandler(river.NopCloserAlerter(func(event *river.EventData) error {
fmt.Println(event.EventType, event.LogName, event.LogPos, event.Before, event.After)
return nil
})).
Sync(river.FromFile) // 从 master.info 文件开始解析
PanicIfError(err)
}
Built-in battery
trace log
var config = &river.Config{
MySQLConfig: &river.MySQLConfig{
Host: "127.0.0.1",
Port: 3306,
User: "root",
Password: "root",
},
PosAutoSaverConfig: &river.PosAutoSaverConfig{
SaveDir: "./",
SaveInterval: 3 * time.Second,
},
HealthCheckerConfig: &river.HealthCheckerConfig{
CheckPosThreshold: 3000,
CheckInterval: 5 * time.Second,
},
}
func main() {
traceConfig := &trace_log.Config{
DBs: []string{"testdb01"},
EntireFields: false,
ShowTxMsg: true,
Highlight: true,
}
handler := trace_log.New(traceConfig)
err := river.New(config).SetHandler(handler).Sync(river.FromDB) // 从最新位置开始解析
PanicIfError(err)
}
elastic search sync
var config = &river.Config{
MySQLConfig: &river.MySQLConfig{
Host: "127.0.0.1",
Port: 3306,
User: "root",
Password: "root",
},
PosAutoSaverConfig: &river.PosAutoSaverConfig{
SaveDir: "./",
SaveInterval: 3 * time.Second,
},
HealthCheckerConfig: &river.HealthCheckerConfig{
CheckPosThreshold: 3000,
CheckInterval: 5 * time.Second,
},
}
func main() {
handlerConfig := &elasticsearch.EsHandlerConfig{
Host: "127.0.0.1",
Port: 9200,
User: "",
Password: "",
BulkSize: 128,
FlushInterval: time.Second,
SkipNoPkTable: true,
Rules: []*elasticsearch.Rule{
elasticsearch.NewDefaultRule("testdb01", "user"),
},
}
handler := elasticsearch.New(handlerConfig)
err := river.New(config).SetHandler(handler).Sync(river.FromDB)
PanicIfError(err)
}
kafka broker
因为引入了 kafka 这个组件,谁也不能保证 kafka 不会挂掉,进而引入了bbolt,系统会自动在指定位置生成 kafka_offset.bolt
。该文件会自动记录所有 partition 的 offset,并且在下次启动 river 的时候自动加载在此文件并自动进行偏移处理。
故,此机制是透明的。
var config = &river.Config{
MySQLConfig: &river.MySQLConfig{
Host: "127.0.0.1",
Port: 3306,
User: "root",
Password: "root",
},
PosAutoSaverConfig: &river.PosAutoSaverConfig{
SaveDir: "./",
SaveInterval: 3 * time.Second,
},
HealthCheckerConfig: &river.HealthCheckerConfig{
CheckPosThreshold: 3000,
CheckInterval: 5 * time.Second,
},
}
func main() {
kafkaConfig := &kafka.Config{
Addrs: []string{"127.0.0.1:9092"},
Topic: "binlog",
OffsetStoreDir: "./",
Offset: nil,
UseOldestOffset: false,
}
handler, err := kafka.New(kafkaConfig)
PanicIfError(err)
go handler.Consume(func(msg *sarama.ConsumerMessage) error {
fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n",
msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
return nil
})
err = river.New(config).SetHandler(handler).Sync(river.FromFile)
PanicIfError(err)
}
# Functions
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author