Goroutines和channel结构体使golang成为强大的并发语言。在第一部分中,我们探讨了如何构建一个协程池来优化golang的并发性能,即限制资源利用率。但那只是用一个简单的例子来演示我们如何去做。在本文,我们将构建一个健壮的解决方案,以便我们可以在任何应用程序中使用这个解决方案。在网上也有一些解决方案,使用了调度程序和复杂的结构。事实上,不需要那么复杂,我们可以使用一个共享的渠道分配任务的处理。看看本文是怎么实现的。代码结构
这里,我们创建了一个通用的workerpool包,它可以根据设置的并发度完成任务处理。workerpool
├── pool.go
├── task.go
└── worker.go
workerpool是根目录,Task是要处理的任务,worker是一个简单的处理任务的函数。pool是处理worker的创建和管理。代码实现
package workerpool
import (
"fmt"
)
type Task struct {
Err error
Data interface{}
f func(interface{}) error
}
func NewTask(f func(interface{}) error, data interface{}) *Task {
return &Task{f: f, Data: data}
}
func process(workerID int, task *Task) {
fmt.Printf("Worker %d processes task %v\n", workerID, task.Data)
task.Err = task.f(task.Data)
}
Task是一个简单的结构体,包含任务需要的所有字段,f是处理任务的函数、data是函数参数。process是执行具体任务的函数。我们将任务返回的错误存储在任务的Err字段里面。下面看看worker是如何处理任务的。
package workerpool
import (
"fmt"
"sync"
)
type Worker struct {
ID int
taskChan chan *Task
}
func NewWorker(channel chan *Task, ID int) *Worker {
return &Worker{
ID: ID,
taskChan: channel,
}
}
func (wr *Worker) Start(wg *sync.WaitGroup) {
fmt.Printf("Starting worker %d\n", wr.ID)
wg.Add(1)
go func() {
defer wg.Done()
for task := range wr.taskChan {
process(wr.ID, task)
}
}()
}
Worker也是一个简单的结构体。包含一个等待处理的任务通道,以及worker ID。Start方法遍历taskChan,并启动一个goroutine来处理任务。每个worker都并发地执行通道里面的任务。Worker Pool
Task和Worker结构体和相应的方法实现了,但还缺少如何创建Worker以及向通道中发送任务的内容。
package workerpool
import (
"fmt"
"sync"
"time"
)
type Pool struct {
Tasks []*Task
concurrency int
collector chan *Task
wg sync.WaitGroup
}
func NewPool(tasks []*Task, concurrency int) *Pool {
return &Pool{
Tasks: tasks,
concurrency: concurrency,
collector: make(chan *Task, 1000),
}
}
func (p *Pool) Run() {
for i := 1; i <= p.concurrency; i++ {
worker := NewWorker(p.collector, i)
worker.Start(&p.wg)
}
for i := range p.Tasks {
p.collector <- p.Tasks[i]
}
close(p.collector)
p.wg.Wait()
}
workerpool保存它需要处理的所有任务,并以并发数作为输入,生成数量相同的goroutines并发地完成任务。它有一个带缓存的通道collector,被所有的worker共享。
因此在运行workerpool时,会创建指定数量的worker来共同处理collector通道中的任务。使用waitgroup来同步所有的worker,下面来试试效果。
package main
import (
"fmt"
"time"
"github.com/Joker666/goworkerpool/workerpool"
)
func main() {
var allTask []*workerpool.Task
for i := 1; i <= 100; i++ {
task := workerpool.NewTask(func(data interface{}) error {
taskID := data.(int)
time.Sleep(100 * time.Millisecond)
fmt.Printf("Task %d processed\n", taskID)
return nil
}, i)
allTask = append(allTask, task)
}
pool := workerpool.NewPool(allTask, 5)
pool.Run()
}
在这里,我们创建了100个任务,并使用5个作为并发处理它们。看看输出:Worker 3 processes task 98
Task 92 processed
Worker 2 processes task 99
Task 98 processed
Worker 5 processes task 100
Task 99 processed
Task 100 processed
Took ===============> 2.0056295s
花了2秒完成100个任务的处理,如果将并发改为10的话处理时间大概1秒左右。
我们已经为workerpool实现了一个健壮的解决方案,可以处理并发性,存储错误到任务中。这个包是通用的,没有与特定的实现耦合。进一步扩展:在后台处理任务
实际上,我们可以进一步扩展我们的解决方案,这样,worker就可以一直在后台等待新的任务,我们可以把新的任务发送给它们处理。为此,Task保持原样,但是我们需要稍微修改一下Worker。我们添加quit通道和两个方法到Worker结构体中。StartBackgorund将启动一个无限循环从taskChan中读取任务并处理。如果从quit通道中读取到数据就退出函数。Stop方法向quit中写数据。因此Pool也需要做相应的改动。
type Pool struct {
Tasks []*Task
Workers []*Worker
concurrency int
collector chan *Task
runBackground chan bool
wg sync.WaitGroup
}
func (p *Pool) AddTask(task *Task) {
p.collector <- task
}
func (p *Pool) RunBackground() {
go func() {
for {
fmt.Print("⌛ Waiting for tasks to come in ...\n")
time.Sleep(10 * time.Second)
}
}()
for i := 1; i <= p.concurrency; i++ {
worker := NewWorker(p.collector, i)
p.Workers = append(p.Workers, worker)
go worker.StartBackground()
}
for i := range p.Tasks {
p.collector <- p.Tasks[i]
}
p.runBackground = make(chan bool)
<-p.runBackground
}
func (p *Pool) Stop() {
for i := range p.Workers {
p.Workers[i].Stop()
}
p.runBackground <- true
}
Pool结构体包含一个runBackground通道通知worker后台执行,AddTask方法可以随时向任务通道collector中添加任务。RunBackground方法通过一个goroutine在后台保持执行。如果任务通道没有数据就会等待。如果有一个真实的场景,它将与HTTP服务器一起运行并处理任务。我们会用一个无限循环来复制类似的行为,符合某个条件,就会停止。
...
pool := workerpool.NewPool(allTask, 5)
go func() {
for {
taskID := rand.Intn(100) + 20
if taskID%7 == 0 {
pool.Stop()
}
time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
task := workerpool.NewTask(func(data interface{}) error {
taskID := data.(int)
time.Sleep(100 * time.Millisecond)
fmt.Printf("Task %d processed\n", taskID)
return nil
}, taskID)
pool.AddTask(task)
}
}()
pool.RunBackground()
当我们运行main函数时,我们会看到一个随机的任务被插入,而workers在后台运行,其中一个worker会读取到这个任务。当它符合停止的条件时,它将最终停止。