# README
协程池
无论协程创建、销毁都比较耗资源,还是同时存在大量的协程会让协程M:N模型调度器更累
功能
核心协程:作为核心,当提交任务到协程池中,核心协程数足够则直接创建协程,期间直到服务停止;
任务队列(同步队列):当核心协程数达到后,不会立即启用业余协程,而是将任务放到任务队列中;
业余协程:业余协程在无任务情况下,允许存活一段时间;当队列满了之后之间启用业务协程;
拒绝策略:当任务队列、业余协程忙中,将采用某种拒绝策略;
停止协程池:慢性通知--先通知池停止提交任务,所有协程后台将所有任务处理完;
主动通知--停止提交任务,等待一段时间后台所有协程处理所有任务,再判断是否还有协程在跑;
终止协程池:把所有协程当前手上的任务都处理好,直接退出;
使用
详情请见go_routine_pool_test.go
func init() {
log.SetGlobalLog("taskpool", true)
}
const TestSize = 100000
func TestGoRoutinePool(t *testing.T) {
q := NewChanTaskQueue(10000)
p := NewGoRoutinePool(2000, 4000, 3*time.Second, q, &CallerRunsPolicy{})
var wg sync.WaitGroup
wg.Add(TestSize)
start := time.Now()
log.Info("开始任务")
fmt.Println("开始任务?")
for i := 0; i < TestSize; i++ {
ret, err := p.Execute(&testR{name: strconv.Itoa(i)})
if err != nil {
log.Info("p.Execute", log.Any("ret", ret), log.ErrorField(err))
}
if i%15000 == 0 {
time.Sleep(1 * time.Second)
}
if p.queue.Size() == 10000 && p.largestPoolSize.Load() >= 3900 {
time.Sleep(1 * time.Second)
}
wg.Done()
}
log.Info("start")
wg.Wait()
log.Info("end")
if ret, err := p.AwaitTermination(3 * time.Second); err != nil {
log.Info("终止条件失败", log.Any("ret", ret), log.ErrorField(err))
}
defer log.Info("完成任务数量", log.Any("", p.completedTaskCount.Load()))
log.Info("总耗时:", log.Any("", time.Now().Sub(start)))
}
type testR struct {
name string
}
func (t *testR) Run() (bool, error) {
time.Sleep(1 * time.Second)
log.Info(" ->", log.Any("name", t.name))
return true, nil
}
设计
GoRoutine Worker
// GoRoutineWork 执行者
type GoRoutineWork struct {
core bool
allowCoreTimeOut bool //是否允许核心协程超时
status int
name string //工作者名称
completedTasks int64
keepAliveTime int64 //秒级时间
task Runnable
pool IGoRoutinePool
}
面向接口编程,提供扩展空间;
// ITaskPool 线程池:核心数量、最大数量、业余队列存活时间&时间单元、同步队列&队列满后执行策略
type IGoRoutinePool interface {
Execute(Runnable) (bool, error)
IsShutdown() bool
TryTerminate() (bool, error)
AwaitTermination(unit time.Duration) (bool, error)
GetTask(allowCoreThreadTimeOut bool) (Runnable, error) //从同步队列取任务
AddTask(Runnable) (bool, error) //增加任务
Queue() IGoRoutinePoolQueue
RemoveWorker(string, bool) (bool, error) //取出任务队列
AddCompletedTasks(int64) bool // 增加已经完成的任务数
}
队列主要存放Runnable实现类
// ITaskPoolQueue 线程安全 Put Pool
type IGoRoutinePoolQueue interface {
IsEmpty() bool
Size() int64
// 直接存任务,可能会失败
Add(r Runnable) (bool, error)
// 直接取任务,可能为空
Take() (Runnable, error)
// 尝试一定时间内放任务
Offer(r Runnable, unit time.Duration) (bool, error)
// 尝试一定时间取任务
Poll(unit time.Duration) (Runnable, error)
}
// 协程池满了,队列满了,要怎么拒绝任务呢?
type IGoRoutinePoolRejectedPolicy interface {
RejectedExecution(Runnable, IGoRoutinePool) (bool, error)
}
type Runnable interface {
Run() (bool, error)
}
# Functions
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
# Constants
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
# Variables
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
# Structs
No description provided by the author
No description provided by the author
DiscardOldestPolicy:把队列最老的丢了,再执行当前的.
DiscardPolicy:任务直接丢弃.
GoRoutineWork 执行者.
No description provided by the author
# Interfaces
ITaskPool 线程池:核心数量、最大数量、业余队列存活时间&时间单元、同步队列&队列满后执行策略.
ITaskPoolQueue 线程安全 Put Pool.
协程池满了,队列满了,要怎么拒绝任务呢?.
No description provided by the author