Category: github.com/xyctruth/final
module · package
0.0.0-20220122103129-5891a381dd39
Repository: https://github.com/xyctruth/final.git
Documentation: pkg.go.dev

# README

Final

Go Report Card codecov Build status LICENSE status

简介

Final 使用本地消息表实现最终一致性

使用

初始化

db, err := sql.Open("mysql", mysqlConnStr)
if err != nil {
  panic(err)
}

mqProvider := amqp.NewProvider(amqpConnStr)

bus := final.New("send_svc", db, mqProvider, final.DefaultOptions())
bus.Start()

更多选项配置请参见 options.go。

订阅

bus.Subscribe("topic1").Middleware(common.Middleware1, common.Middleware2).Handler(common.EchoHandler)

common.Middleware1、common.Middleware2 和 common.EchoHandler 的代码见 common.go。

发布

msg := common.GeneralMessage{Type: "simple message", Count: 100}
msgBytes, _ := msgpack.Marshal(msg)
err := bus.Publish("topic1", msgBytes, message.WithConfirm(true))
if err != nil {
  panic(err)
}

更多消息发布策略请参见 message_policy.go。

关联本地事务发布

database/sql

tx, _ := _example.NewDB().Begin()

/* returning an error rolls back the transaction; returning nil commits it */
err := bus.Transaction(tx, func(txBus *final.TxBus) error {
  // 本地业务
  _, err := tx.Exec("INSERT INTO local_business (remark) VALUE (?)", "sql local business")
  if err != nil {
    return err
  }
  
  // 发布消息
  msg := common.GeneralMessage{Type: "sql transaction message", Count: 100}
  msgBytes, _ := msgpack.Marshal(msg)
  
  err = txBus.Publish("topic1", msgBytes)
  if err != nil {
    return err
  }
  return nil
})

if err != nil {
  panic(err)
}

gorm

tx := _example.NewGormDB().Begin()

/* returning an error rolls back the transaction; returning nil commits it */
err = bus.Transaction(tx.Statement.ConnPool.(*sql.Tx), func(txBus *final.TxBus) error {
  // 本地业务
  result := tx.Create(&_example.LocalBusiness{
    Remark: "gorm local business",
  })
  if result.Error != nil {
    return result.Error
  }

  // 发布消息
  msg := common.GeneralMessage{Type: "gorm transaction message", Count: 100}
  msgBytes, _ := msgpack.Marshal(msg)
  err = txBus.Publish("topic1", msgBytes)
  if err != nil {
    return err
  }
  return nil
})

if err != nil {
  panic(err)
}

# Packages

No description provided by the author
No description provided by the author

# Functions

DefaultOptions 返回 Bus 的默认配置。
New 初始化 Bus。参数 db(sql.DB)是本地消息表使用的数据库;参数 mqProvider(mq.IProvider)是 mq 驱动实现,用于与消息队列交互,amqp 的实现见 amqp_provider.Provider。

# Constants

客户端发送消息后,消息在消息表中的默认状态:等待 mq 的 confirm ack。

# Structs

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Type aliases

No description provided by the author
No description provided by the author