cover_image

Golang池化技术实现

Eagle1949 技术源泉
2023年06月30日 13:25

在golang中,经常会出现需要发送短信、通知或其它内容的需求,而这些事情不需要即时性且有点耗时。我们最常用的方法是使用一个goroutine来做这个事情。

例如:在订单发货后,我们需要给用户发送一个短信通知。

func SendSms(tmpl string,msg string){    // 发送短信代码}
func OrderShipment(){ // 订单发货逻辑 // 发货成功后,发送短信 go SendSms("发货模板","消息内容")}

上面的goroutine方法,可以满足大部分需求。但是有个缺陷,当发货量非常大时,需要开启的goroutine特别多,当我们需要限制开启的goroutine数量时,我们可以使用池化技术。


池化技术就是提前准备一些资源,在需要时可以重复使用这些预先准备的资源。池化技术的优点主要是提前准备和重复利用,控制和管理线程数和任务数的能力。


我们来做一个goroutine的例程池,这样可以控制goroutine的数量。

// ErrScheduleTimeout 在周期内池中没有空闲的资源返回超时错误var ErrScheduleTimeout = fmt.Errorf("schedule error: timed out")
// Pool 包含两种通道,sem控制worker的创建,work控制任务的执行type Pool struct { sem chan struct{} work chan func()}
// NewPool 创建具有给定大小的池,// size为池最大创建的例程数量,queueworker的数量,spawn为预先生成例程的数量func NewPool(size, queue, spawn int) *Pool { if spawn <= 0 && queue > 0 { panic("dead queue configuration detected") } if spawn > size { panic("spawn > workers") } p := &Pool{ sem: make(chan struct{}, size), work: make(chan func(), queue), } for i := 0; i < spawn; i++ { p.sem <- struct{}{} go p.worker(func() {}) }
return p}
// Schedule 调度任务func (p *Pool) Schedule(task func()) { p.schedule(task, nil)}
// ScheduleTimeout 调度任务// 它将返回错误如果在给定周期内没有资源可用func (p *Pool) ScheduleTimeout(timeout time.Duration, task func()) error { return p.schedule(task, time.After(timeout))}
func (p *Pool) schedule(task func(), timeout <-chan time.Time) error { select { case <-timeout: return ErrScheduleTimeout case p.work <- task: return nil case p.sem <- struct{}{}: go p.worker(task) return nil }}
func (p *Pool) worker(task func()) { defer func() { <-p.sem }()
task()
for task := range p.work { task() }}

使用池,下面是个例子。

// 先实例化pool := gopool.NewPool(128, 1024, 1)func OrderShipment(){    // 订单发货逻辑    // 发货成功后,发送短信    // 发送短信    pool.Schedule(func() {        SendSms("发货模板","消息内容")    })}

掌握它的原理,可根据场景应用到其它地方。


这个池的调度实现的很妙,妙处就在于在调度实现利用了golang 的select特性:如果任意某个通道可以进行,它就执行,其他被忽略;如果所有通道都不会执行,select 将阻塞,直到某个通道可以运行;在worker中,使用for range通道,直到通道被关闭,for循环才退出。

func (p *Pool) schedule(task func(), timeout <-chan time.Time) error { select {    case <-timeout:        return ErrScheduleTimeout    case p.work <- task: // 如果任务过多,被阻塞,往下执行        return nil    case p.sem <- struct{}{}: // 如果达到最大例程数,会被阻塞        go p.worker(task)        return nil    }}
func (p *Pool) worker(task func()) { // 返回前读取信号 defer func() { <-p.sem }()
task() // 直到p.work通道被关闭 for task := range p.work { task() }}


如果你对任务执行有时间要求,请使用ScheduleTimeout方法,它会在给定的时间段后如果没有资源可用就返回错误,你可以根据报错做业务处理。


继续滑动看下一个
技术源泉
向上滑动看下一个