Categorygithub.com/jjonline/share-mod-lib/queue
modulepackage
1.0.0
Repository: https://github.com/jjonline/share-mod-lib.git
Documentation: pkg.go.dev

# README

Queue 队列

一、说明

Queue队列为生产 -> 消费模型的简单实现,即:producer -> consumer(worker),一般分为生产端和消费端。

当前已实现开发测试用memory驱动和可用于生产的redis类型驱动。

⚠️ memory类型驱动仅可用于开发调试

由于多个独立进程间内存隔离,以及进程退出后进程所属内存销毁的原因,memory方案在进程退出后未消费的队列数据会丢失,故而仅能用于开发调试环境,且生产端和消费端只能在同一进程。

二、使用示例

完整使用示例查看 example 目录代码结构

step1、实现任务类

任务类即按任务类iface规则实现的结构体,也是队列投递任务和实际执行任务的单元。

package tasks

import (
    "fmt"
    "github.com/jjonline/go-mod-librar/queue"
)

// 定义的任务类struct,需完整实现 queue.TaskIFace
type TestTask struct {
    // 单个job最大执行时长、最大重试次数、多次重试之间间隔时长等设置
    // 这里使用默认设置,若需要自定义参数,自定义方法实现即可
    queue.DefaultTaskSetting
}

func (t TestTask) Name() string {
    return "test_task"
}

func (t TestTask) Execute(ctx context.Context, job *queue.RawBody) error {
    // 队列实际执行的入口方法,请注意处理 context.Context 内部用于超时控制 
    fmt.Println(job.ID)
    return nil
}

step2、消费者端注册启动

// 初始化队列Queue对象,生产者、消费者均通过该对象操作
// 重要:生产者、消费者均需要实例化
service := queue.New(
    queue.Redis, // 队列底层驱动器类型,详见包内常量
    redisClient, // 队列底层驱动client实例
    logger, // 实现 queue.Logger 接口的日志实例,用于记录日志
    5, // 单个队列最大并发消费协程数
)

// 注册单个任务类
_ = service.BootstrapOne(&tasks.TestTask{})

// 也可以这样批量注册任务类
// _ = service.Bootstrap([]*queue.TaskIFace)

// 启动消费端进程,注意传递上下文context用于控制进程优雅控制
idleCloser := make(chan struct{})

// 接收退出信号
quitChan := make(chan os.Signal)
signal.Notify(
    quitChan,
    syscall.SIGINT,  // 用户发送INTR字符(Ctrl+C)触发
    syscall.SIGTERM, // 结束程序
    syscall.SIGHUP,  // 终端控制进程结束(终端连接断开)
    syscall.SIGQUIT, // 用户发送QUIT字符(Ctrl+/)触发
)

go func() {
    // wait exit signal
    <-quitChan

    logger.Info("receive exit signal")

    // shutdown worker daemon with timeout context
    timeoutCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    
    // graceful shutdown by signal
    if err := queueService.ShutDown(timeoutCtx); nil != err {
        logger.Warn("violence shutdown by signal: " + err.Error())
    } else {
        logger.Info("graceful shutdown by signal")
    }

    // closer close
    close(idleCloser)
}()

// start worker daemon
if err := queueService.Start(); nil != err && err != queue.ErrQueueClosed {
    logger.Info("queue started failed: " + err.Error())
    close(idleCloser)
} else {
    logger.Info("queue worker started")
}

<-idleCloser
logger.Info("queue worker quit, daemon exited")

step3、生产者端投递job任务

// 初始化队列Queue对象,生产者、消费者均通过该对象操作
// 生产者&&消费者处于同一进程则可共用,不同进程则需要各自独立实例化
service := queue.New(
    queue.Redis, // 队列底层驱动器类型,详见包内常量
    redisClient, // 队列底层驱动client实例
    logger, // 实现 queue.Logger 接口的日志实例,用于记录日志
)

// 单个任务类:若生产者端和消费者端分处不同进程,生产者端任务类也需要执行注册

// 投递一条普通队列任务
service.Dispatch(&tasks.TestTask{}, "job执行时的参数")

// 投递一条延迟队列任务(指定执行时刻)
// 指定执行时刻,如果时刻是过去则立即执行
service.DelayAt(&tasks.TestTask{}, "job执行时的参数", time.Time类型的延迟到将来时刻)

// 投递一条延迟队列任务(指定相对于当前的延迟时长)
// 指定相对于投递时刻需要延迟的时长
service.Delay(&tasks.TestTask{}, "job执行时的参数", time.Duration类型的时长)

四、重试次数 & 重试间隔 & 超时

队列保证每个job至少能被执行1次

3.1、重试次数

任务类定义实现的 MaxTries() int64 方法指定单个job能被重试的次数

注意:返回值若小于等于1则仅被执行1次

执行任务类失败或异常会触发重试

3.2、重试间隔

当任务类允许多次重试时,下一次重试可以并不是立即执行,通过RetryInterval() int64方法设置重试之前的等待时长间隔,单位:秒

注意:返回值若小于等于0则取值0表示立即重试

重试间隔 是配合 重试次数 起作用的,仅可多次重试的任务有效

3.3、超时

因goroutine无法从外部kill掉,超时控制通过context.Context上下文实现,需任务类自主实现超时控制的退出机制!

任务类通过嵌入DefaultTaskSetting则设置的最大超时时长为900秒,可通过任务类Timeout方法自定义超时时间。

3.4、约定

  1. 重试次数若小于等于1则取值1
  2. 重试间隔若小于等于0则取值0,0表示没有重试间隔
  3. 任务执行成功:Execute(job *RawBody) error返回nil
  4. 任务执行失败:Execute(job *RawBody) error返回error
  5. 任务执行异常:Execute(job *RawBody) error发生了panic
  • 提供有默认设置最大超时时间、最大重试次数、重试间隔的可嵌入结构体 queue.DefaultTaskSetting
  • 提供有默认设置最大重试次数、重试间隔而不设置超时时间可自定义超时的可嵌入结构体 queue.DefaultTaskSettingWithoutTimeout
  • 当然你也可以完全自定义任务类而不嵌入任何默认构件结构体

# Functions

FakeUniqueID 生成一个V4版本的uuid字符串,生成失败返回时间戳纳秒 UUID单机足以保障唯一,生成失败场景下纳秒时间戳也可以一定程度上保障单机唯一.
IFaceToString interface类型转string.
New 初始化一个队列 @param driver 队列实现底层驱动,可选值见上方14行附近位置的常量 @param conn driver对应底层驱动连接器句柄,具体类型参考 QueueIFace 实体类 @param logger 实现 Logger 接口的结构体实例的指针对象 @param concurrent 单个队列最大并发消费数.

# Constants

job任务执行时长极限预警值:15分钟.
默认最大重试次数:1次<即不重试>.
默认下次任务重试间隔:1分钟<即可多次执行任务失败后下一次尝试是在60秒后>.
queue队列支持的底层驱动名称常量 后续扩充mq、sqs、db等在此添加常量并实现 QueueIFace 接口予以关联.
queue队列支持的底层驱动名称常量 后续扩充mq、sqs、db等在此添加常量并实现 QueueIFace 接口予以关联.

# Variables

ErrAbortForWaitingPrevJobFinish 等待上一次任务执行结束退出.
ErrMaxAttemptsExceeded 尝试执行次数超限.
ErrQueueClosed 队列处于优雅关闭或关闭状态错误.

# Structs

DefaultTaskSetting 默认task设置struct:实现默认的最大尝试次数、尝试间隔时长、最大执行时长.
DefaultTaskSettingWithoutTimeout 默认task设置struct:实现默认的最大尝试次数、尝试间隔时长、最大执行时长.
No description provided by the author
No description provided by the author
Payload 存储于队列中的job任务结构.
Queue 队列struct.
RawBody 队列execute执行时传递给执行方法的参数Raw结构:job任务参数的包装器 - ID 内部标记队列任务的唯一ID,使用UUID生成.

# Interfaces

JobIFace 基于不同技术栈的队列任务Job实现契约.
Logger 日志接口定义.
QueueIFace 基于不同技术栈的队列实现契约.
TaskIFace 定义队列Job任务执行逻辑的契约(队列任务执行类).

# Type aliases

FailedJobHandler 失败任务记录|处理回调方法 @param *Payload 失败job的对象信息 @param error job任务失败的error报错信息.