package
0.0.12
Repository: https://github.com/fankane/go-utils.git
Documentation: pkg.go.dev

# README

memory

介绍

memory是一个自研的基于内存的消息队列,支持按topic发送和消费数据

  • 生产者可以给任意topic队列发送消息,可以多个生产者
  • 一个消费者同时只能消费一个topic的数据,可以同时有多个消费者(消费不同topic即可)
  • 支持配置消息最大数量,以及消息最大占用内存
  • 可获取整个队列里面消息条数和占用内存大小,也可或者指定topic里面的消息条数和占用内存大小
  • 如果服务重启,尚未消费的数据会丢失,处理方式
    • 1)可以获取队列里面还有多少条数据未消费,配合服务优雅升级时,处理完剩余消息
    • 2)可以服务重启前,备份队列数据到文件,启动的时候,从文件加载到内存

流程图

avatar

  • 轮询逻辑
    • 轮询是否有到了执行时间的消息时,查询队列里面最早需要执行的消息,如果没到时间,计算还需要多久,然后再次查询
    • 如果等待的时间里面,有新消息加入,判断是否比原来的预期时间更早,则更新等待时间。

使用样例

  1. 在入口,比如 main.go 里面隐式导入 memory 包路径
import _ "github.com/fankane/go-utils/plugin/queue/memory"
  1. 在运行文件根目录下的 system_plugin.yaml 文件(没有则新建一个)里面添加如下内容
plugins:
  queue:  # 插件类型
    memory:
      buffer_size: 1000   #队列缓冲区,不填默认 1000,超过缓冲区大小的,到了消费的时间,入消费队列会阻塞
      max_size: 10240000  # 占用内存上限,单位B,不填默认无限制
      max_len: 3000       # 堆积消息数量上限,不填默认无限制
      load_at_begin: true         #启动时加载数据,默认false
      load_file: "./backup"       #当load_at_begin=true时必填
  1. 在需要使用的地方,直接使用
// 如果不以插件形式使用,则需要手动调用一次 InitQueue()

// 发送消息到队列
NewProducer().SendMessage(topic, []byte("hello"), Delay(time.Second)) //发送消息到队列,延迟1秒消费

// 消费数据
RegisterHandler(topic, func(ctx context.Context, value []byte) error {
    fmt.Println("消费数据:", string(value))
    return nil
})