例如:在订单发货后,我们需要给用户发送一个短信通知。
func SendSms(tmpl string,msg string){
// 发送短信代码
}
func OrderShipment(){
// 订单发货逻辑
// 发货成功后,发送短信
go SendSms("发货模板","消息内容")
}
上面的goroutine方法,可以满足大部分需求。但是有个缺陷,当发货量非常大时,需要开启的goroutine特别多,当我们需要限制开启的goroutine数量时,我们可以使用池化技术。
池化技术就是提前准备一些资源,在需要时可以重复使用这些预先准备的资源。池化技术的优点主要是提前准备和重复利用,控制和管理线程数和任务数的能力。
我们来做一个goroutine的例程池,这样可以控制goroutine的数量。
// ErrScheduleTimeout 在周期内池中没有空闲的资源返回超时错误
var ErrScheduleTimeout = fmt.Errorf("schedule error: timed out")
// Pool 包含两种通道,sem控制worker的创建,work控制任务的执行
type Pool struct {
sem chan struct{}
work chan func()
}
// NewPool 创建具有给定大小的池,
// size为池最大创建的例程数量,queue为worker的数量,spawn为预先生成例程的数量
func NewPool(size, queue, spawn int) *Pool {
if spawn <= 0 && queue > 0 {
panic("dead queue configuration detected")
}
if spawn > size {
panic("spawn > workers")
}
p := &Pool{
sem: make(chan struct{}, size),
work: make(chan func(), queue),
}
for i := 0; i < spawn; i++ {
p.sem <- struct{}{}
go p.worker(func() {})
}
return p
}
// Schedule 调度任务
func (p *Pool) Schedule(task func()) {
p.schedule(task, nil)
}
// ScheduleTimeout 调度任务
// 它将返回错误如果在给定周期内没有资源可用
func (p *Pool) ScheduleTimeout(timeout time.Duration, task func()) error {
return p.schedule(task, time.After(timeout))
}
func (p *Pool) schedule(task func(), timeout <-chan time.Time) error {
select {
case <-timeout:
return ErrScheduleTimeout
case p.work <- task:
return nil
case p.sem <- struct{}{}:
go p.worker(task)
return nil
}
}
func (p *Pool) worker(task func()) {
defer func() { <-p.sem }()
task()
for task := range p.work {
task()
}
}
使用池,下面是个例子。
// 先实例化
pool := gopool.NewPool(128, 1024, 1)
func OrderShipment(){
// 订单发货逻辑
// 发货成功后,发送短信
// 发送短信
pool.Schedule(func() {
SendSms("发货模板","消息内容")
})
}
掌握它的原理,可根据场景应用到其它地方。
这个池的调度实现的很妙,妙处就在于在调度实现利用了golang 的select特性:如果任意某个通道可以进行,它就执行,其他被忽略;如果所有通道都不会执行,select 将阻塞,直到某个通道可以运行;在worker中,使用for range通道,直到通道被关闭,for循环才退出。
func (p *Pool) schedule(task func(), timeout <-chan time.Time) error {
select {
case <-timeout:
return ErrScheduleTimeout
case p.work <- task: // 如果任务过多,被阻塞,往下执行
return nil
case p.sem <- struct{}{}: // 如果达到最大例程数,会被阻塞
go p.worker(task)
return nil
}
}
func (p *Pool) worker(task func()) {
// 返回前读取信号
defer func() { <-p.sem }()
task()
// 直到p.work通道被关闭
for task := range p.work {
task()
}
}
如果你对任务执行有时间要求,请使用ScheduleTimeout方法,它会在给定的时间段后如果没有资源可用就返回错误,你可以根据报错做业务处理。