# README
qtask
提供稳定,可靠的任务管理服务。支持两种任务:
-
实时任务 将任务放入消息队列,同时备份到DB。发生异常后从DB恢复到队列。
-
延时任务 将任务放入DB,达到延时时间后从DB拉取放入消息队列。发生异常后从DB恢复到队列。
特性:
√ 实时任务
√ 延时任务
√ 自动备份
√ 自动恢复
√ 过期清理
√ 一键安装
√ 代码集成
√ 基于 hydra 构建
一. 创建任务
任务分为实时任务和延时任务
1.实时任务
import "github.com/micro-plat/qtask"
// 创建实时任务,将任务放入指定的消息队列,并备份到DB
// taskID:任务编号
// invoke:执行函数句柄
taskID, invoke, err:=qtask.Create(c,"订单绑定任务",map[string]interface{}{
"order_no":"8973097380045"
},3600,"ORDER:BIND")
//立即执行
err:=invoke(c)
2.延迟任务
import "github.com/micro-plat/qtask"
//创建延时任务,将任务保存到数据库(状态为等待处理),超时后放入消息队列
// taskID:任务编号
taskID, err:=qtask.Delay(c,"订单绑定任务",map[string]interface{}{
"order_no":"8973097380045"
},60,3600,"ORDER:BIND", qtask.WithDeadline(86400))
3.参数说明
- qtask.WithDeadline 任务截止执行时间,单位:秒,默认值604800秒(7天)
- qtask.WithDeleteDeadline任务删除截止时间,单位:秒。未设置时,任务执行成功立即删除;未执行成功,则为任务添加后的604800秒(7天)后
- qtask.WithMaxCount 任务最多执行次数,默认为100次
- qtask.WithOrderNO 外部业务单号
二. 处理任务
1. 消息队列配置
hydra.Conf.Vars().Queue().Redis("queue", "192.168.0.111:6379"))
2. 添加、注册任务
//添加队列ORDER:BIND的处理任务
hydra.Conf.MQC(mqc.WithRedis("queue")).Queue(queue.NewQueue("ORDER:BIND", "/order/bind"))
//注册/order/bind服务
hydra.S.CRON("/order/bind", OrderBind) //业务处理服务
3. 编写处理代码
import "github.com/micro-plat/qtask"
func OrderBind(ctx hydra.IContext) (r interface{}) {
//检查输入参数...
//业务处理前调用,修改任务状态为处理中(超时前未调用qtask.Finish,任务会被重新放入队列)
qtask.Processing(ctx,ctx.Request().GetInt64("task_id"))
//处理业务逻辑...
//业务处理成功,修改任务状态为完成(任务不再放入队列),并修改删除截止时间
qtask.Finish(ctx,ctx.Request().GetInt64("task_id"))
}
三、创建数据表
当前应用flowserver
中引用了qtask:
:~/work/bin$ flowserver db install --debug
安装到数据库 [OK]
查看数据库,则已创建好qtask所需的数据表(mysql:2张表,oracle:1张表)
四、其它
1. 配置
import "github.com/micro-plat/qtask"
qtask.Config(qtask.WithDBName("oracle"), //数据库配置
qtask.WithQueueName("redis")) //队列配置
2. 数据库
- 使用 mysql 数据库
go install
- 使用 oracle 数据库
go install -tags "oracle"
# Packages
No description provided by the author
# Functions
BindFlow 绑定自动流程(调用后将自动扫描数据库后补队列数据,并定期清除过期数据).
Config 配置数据库,消息队列的配置名称,请通过hydra.OnReadyByInsert修改配置参数.
Create 创建实时任务,将任务信息保存到数据库并发送消息队列c:hydra.IContext,db.IDBExecuter,db.IDBTranstaskName:任务名称input:任务参数,序列化为json后存储,一般只允许传入关键参数。系统会在此输入参数中增加一个参数"task_id",业务流程需使用"task_id"修改任务为处理中或完结任务intervalTimeout:任务单次执行的超时时长,即下次执行时间距离上次执行时间的秒数,任务被放入消息队列、被标记为正在处理等都会自动调整下次执行时间mq: 消息队列名称.
Delay 创建延迟任务,将任务信息保存到数据库,超时后将任务取出放到消息队列c:hydra.IContext,db.IDBExecuter,db.IDBTranstaskName:任务名称input:任务关健参数,序列化为json后存储,一般只允许传入关键参数。系统会在此输入参数中增加一个参数"task_id",业务流程需使用"task_id"修改任务为处理中或完结任务 首次执行时间intervalTimeout:任务单次执行的超时时长,即下次执行时间距离上次执行时间的秒数,任务被放入消息队列、被标记为正在处理等都会自动调整下次执行时间mq: 消息队列名称调用此函数将延后下次被放入消息队列的时间.
Finish 任务完成任务终结,不再放入消息队列.
FinishByInput 任务完成任务终结,不再放入消息队列.
GetDBName 获取已配置的db节点名.
GetPlatName 获取队列对应的平台名称.
GetQueueName 获取已配置的queue节点名.
GetScanInterval 获取已配置的tScanInterval.
Processing 将任务修改为处理中。可以不调用,直接调用Finish完结任务,调用后系统自动延时,并累加任务处理次数任务被正式处理前调用此函数调用后当下次执行时间小于当前时间后会重新放入消息队列进行处理.
ProcessingByInput 将任务修改为处理中。可以不调用,直接调用Finish完结任务,调用后系统自动延时,并累加任务处理次数任务被正式处理前调用此函数调用后当下次执行时间小于当前时间后会重新放入消息队列进行处理.
WithDBName 数据库节点名称.
WithDeadline 秒数,设置任务截止时间.
WithDeleteDeadline 秒数,设置任务删除截止时间.
WithMaxCount 任务执行最大次数.
WithOrderNO 业务单号.
WithPlatName 设置消息队列对应平台的名称.
WithQueueName 消息队列节点名称.
WithScanInterval 秒数,后补时间间隔.