最近粉丝量暴涨,up 主加更一篇长文。
必须宠粉,谢谢各位关注、分享、转发
Go 语言以其优雅的并发模型而闻名。我们一起考虑这样一种场景,协程 A 在执行过程中需要创建子协程 A1、A2、A3…An,协程 A 创建完子协程后就等待子协程退出,为了处理这三种场景,Go 提供了三种解决方案,并且这三种方案各有优劣:
Channel:使用 channel 控制子协程,优点是实现简单,清晰易懂。
WaitGroup:使用信号量机制控制子协程,优点是子协程个数可以动态调整。
Context:使用上下文控制子协程,优点是对子协程派生出来的孙子协程可以控制。
Go 语言其并发控制主要依靠 Channel、WaitGroup 和 Context 这三种机制,这三种解决方案的优点、缺点是相对而言的,要结合实际应用场景进行选择。
在这篇文章中,我们将深入探讨这三种并发控制机制的工作原理和使用方法。
Channel 是 Go 语言中的一种类型,可以用来传递类型化的数据。我们可以通过 make 函数来创建一个Channel:
ch := make(chan int)
在这个例子中,我们创建了一个可以传递 int 类型数据的 Channel。
Channel 支持两种操作:发送和接收。我们可以使用 <- 操作符来进行这两种操作:
ch <- 1 // 发送操作,将 1 发送到 Channel ch 中
value := <-ch // 接收操作,从 Channel ch 中接收数据,并存储到变量 value 中
Channel 一般用于协程之间的通信,不过 Channel 也可以用于并发控制,比如主协程启动 N 个子协程,主协程等待所有的子协程退出后再继续后续的流程,这种场景可以使用 Channel 轻松实现。
下面的程序通过创建 N 个 Channel 来管理 N 个子协程,每个子协程都有一个 Channel 用于跟父协程通信,父协程创建完所有的子协程后等待所有的的子协程结束。
package main
import (
"fmt"
"time"
)
func Process(ch chan int) {
//Do some work...
time.Sleep(time.Second)
// non-buffered channel 发送端,在发送时会一直阻塞,直到接收端已经接收了数据,否则会一直阻塞下去
// 管道中写入一个元素表示当前协程已结束,
ch <- 1
}
func main() {
// 创建一个10个元素的切片,元素类型为channel
channels := make([]chan int, 10)
for i := 0; i < 10; i++ {
// 切片中放入一个 channel
channels[i] = make(chan int)
// 启动协程,传一个管道用于通信
go Process(channels[i])
}
// 遍历切片,等待子协程结束
for i, ch := range channels {
// non-buffered channel 接收端也会一直阻塞,直接发送端已经发送了数据,否则会一直阻塞下去。
<-ch
fmt.Println("Routine ", i, " quit!")
}
}
// 打印输出
Routine 0 quit!
Routine 1 quit!
Routine 2 quit!
Routine 3 quit!
Routine 4 quit!
Routine 5 quit!
Routine 6 quit!
Routine 7 quit!
Routine 8 quit!
Routine 9 quit!
我们总结一下使用 channel 控制子协程的优点是实现简单,缺点是当需要大量创建子协程时就需要有相同数量的 channel,而且这样实现的话对于子协程继续派生出来的孙子协程是不方便控制的。
接下来我们继续看起来比 channel 优雅一下的 WaitGroup、Context ,它们在开源组件中使用的频率更高一些。
WaitGroup 是 Go 开发中经常使用的并发控制技术。WaitGroup 可理解为 Wait-Goroutine-Group,即等待一组 goroutine 执行结束。
WaitGroup 是 Go 语言中的一个类型,它提供了一种等待一组 goroutines 完成的简单方式。我们可以通过 Add(n)
方法来增加 WaitGroup 的计数,通过 Done()
方法来减少 WaitGroup 的计数,通过 Wait()
方法来阻塞当前 goroutine,直到 WaitGroup 的计数变为零。
var wg sync.WaitGroup
wg.Add(1)
go func() {
// 业务在子协程执行
wg.Done()
}()
wg.Wait()
在这个例子中,我们增加了 WaitGroup 的计数,然后启动了一个新的 goroutine 来执行任务。当任务完成后,我们调用 Done 方法来减少 WaitGroup 的计数。主 goroutine 通过 Wait 方法等待任务完成。
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func main() {
// 创建 10 个子协程并且 wg.Add(1)
for i := 0; i < 10; i++ {
wg.Add(1)
go exec(i)
}
wg.Wait()
fmt.Println("main exit!")
}
func exec(i int) {
// 使用 defer 保证子协程发生异常的时候也会正常执行 wg.Done()
defer func() {
wg.Done()
}()
// 类比执行业务
time.Sleep(time.Second * 2)
fmt.Printf("%d exec! \n", i)
}
// 打印输出,顺序每次随机
5 exec!
6 exec!
7 exec!
9 exec!
8 exec!
0 exec!
2 exec!
1 exec!
4 exec!
3 exec!
main exit!
信号量是 UNIX 提供的一种保护共享资源的机制,作用是防止多个线程同时访问某个资源。简单理解:
如果信号量 > 0 ,表示资源可用,获取信号量时系统会自动将信号量减 1。
如果信号量 = 0,表示资源暂不可用,获取信号量是,当前的线程会进入睡眠状态,当信号量变成 > 0 时将会被唤醒。
信号量 s 只能由两种特殊的操作来处理,这两种操作称为 P 和 V。
P(s):如果 s 是非零的,则 P 将 s 减 1,并且立即返回。如果 s 为零,那么就挂起这个线程,直到 s 变为非零,等到另一个执行 V(s) 操作的线程唤醒该线程。在唤醒之后,P 操作将 s 减1,并将控制返回给调用者。
V(s):V 操作将 s 加 1。如果有任何线程阻塞在 P 操作等待 s 变为非零,那么 V 操作会唤醒这些线程中的一个,然后该线程将 s 减 1,完成它的 P 操作。
WaitGroup 的实现依赖于 Go 语言的原子操作和内存模型,WaitGroup 实现中也使用了信号量。WaitGroup 内部有一个计数器,当我们调用 Add 方法时,计数器会增加;当我们调用 Done 方法时,计数器会减少;当我们调用 Wait 方法时,如果计数器不为零,当前 goroutine 会被阻塞。
在 Go 的底层信号量函数中:runtime_Semacquire(s *uint32)
函数会阻塞 goroutine 直到信号量 s 的值大于 0,然后原子性地减这个值,即 P 操作。runtime_Semrelease(s *uint32, lifo bool, skipframes int)
函数原子性增加信号量的值,然后通知被 runtime_Semacquire
阻塞的 goroutine,即 V 操作。
src/sync/waitgroup.go:WaitGroup
中定义:
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
//
// In the terminology of the Go memory model, a call to Done
// “synchronizes before” the return of any Wait call that it unblocks.
type WaitGroup struct {
noCopy noCopy
// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 32-bit
// compilers only guarantee that 64-bit fields are 32-bit aligned.
// For this reason on 32 bit architectures we need to check in state()
// if state1 is aligned or not, and dynamically "swap" the field order if
// needed.
state1 uint64
state2 uint32
}
因为 WaitGroup 需要等待一组操作完成之后再执行,所以需要等待所有操作完成之后才能继续执行。为了实现这个功能,WaitGroup 使用一个计数器 counter 来记录还有多少个操作没有完成,如果 counter 的值为 0,则表示所有操作已经完成。
WaitGroup 在所有任务都完成之后,需要唤醒所有处于等待的协程,此时记录有多少个协程处于等待状态。为实现这个功能,WaitGroup 使用了一个等待计数器 waiter 来记录当前有多少个协程正在等待操作完成。
WaitGroup 对于计数器和等待计数器的实现,是通过一个 64 位无符号整数来实现的,也就是 WaitGroup 结构体中的 state1,其中高 32 位保存了任务计数器 counter 的值,低 32 位保存了等待计数器 waiter 的值。当我们创建一个 WaitGroup 实例时,该实例的任务计数器和等待计数器都会被初始化为 0。
而且,等待协程需要等待所有任务完成之后才能继续执行,所以等待协程在任务未完成时会被阻塞。当任务全部完成后,会自动被唤醒。WaitGroup 使用 state2 用于实现信号量机制。通过调用 runtime_Semacquire()
和 runtime_Semrelease()
函数,可以在不阻塞线程的情况下进行等待和通知操作。
接下来继续介绍 Add()
,Done()
和 Wait()
方法的具体实现。
Add()
做了两件事:
把 delta 值累加到 counter 当中,因为 delta 可以是负值,也就是说 counter 可以变成 0 或 负值。
当 counter = 0 时,根据 waiter 数值释放等量的信号量,把等待的 goroutine 全部唤醒;如果 counter < 0,则触发 panic 。
// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait. Calls with a negative delta, or calls with a
// positive delta that start when the counter is greater than zero, may happen
// at any time.
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// See the WaitGroup example.
func (wg *WaitGroup) Add(delta int) {
// delta 的值可以为负数,Done方法便是通过Add(-1)来实现的
// statep: 为state1的地址 semap: 为state2的地址
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
if delta < 0 {
// Synchronize decrements with Wait.
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()
}
// 高 32 位的值 加上 delta,增加任务计数器的值
state := atomic.AddUint64(statep, uint64(delta)<<32)
// v: 取高32位数据,获取到待完成任务数
v := int32(state >> 32)
// 取低32位数据,获取到等待线程的值
w := uint32(state)
if race.Enabled && delta > 0 && v == int32(delta) {
// The first increment must be synchronized with Wait.
// Need to model this as a read, because there can be
// several concurrent wg.counter transitions from 0.
race.Read(unsafe.Pointer(semap))
}
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// v > 0: 说明还有待完成的任务数,此时不应该唤醒等待协程
// w = 0: 说明没有协程在等待,此时可以直接退出
if v > 0 || w == 0 {
return
}
// This goroutine has set counter to 0 when waiters > 0.
// Now there can't be concurrent mutations of state:
// - Adds must not happen concurrently with Wait,
// - Wait does not increment waiters if it sees counter == 0.
// Still do a cheap sanity check to detect WaitGroup misuse.
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Reset waiters count to 0.
// 此时 v = 0,所有任务都完成了,唤醒等待协程
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
Wait()
方法也做两件事:一是累加 waiter,二是阻塞等待信号量。
注意:这里还用到了 CAS 算法,保证了有多个 goroutine 同时执行 Wait()
方法也能正确累加 waiter。
// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
// statep: 为 state1 的地址 semap: 为 state2 的地址
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
race.Disable()
}
for {
// 加载state1的值
state := atomic.LoadUint64(statep)
// v: 取高32位数据,获取到待完成任务数
v := int32(state >> 32)
w := uint32(state)
// 没有任务待执行,全部都完成了
if v == 0 {
// Counter is 0, no need to wait.
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
// Increment waiters count.
// 增加 waiter 计数器的值
if atomic.CompareAndSwapUint64(statep, state, state+1) {
if race.Enabled && w == 0 {
// Wait must be synchronized with the first Add.
// Need to model this is as a write to race with the read in Add.
// As a consequence, can do the write only for the first waiter,
// otherwise concurrent Waits will race with each other.
race.Write(unsafe.Pointer(semap))
}
// 等待被唤醒
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
}
}
Done()
方法只完成一件事情:即把 counter 减 1。前面分析 Add()
方法可以接受负值,所以这里 Done()
方法只调用了 Add(-1)
。实际上这里正是最后一个完成的 goroutine 把等待的 waiter 唤醒的(感叹设计非常巧妙)。
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
Add()
操作必须早于 Wait()
, 否则会触发 panic。
Add()
设置的值必须与实际等待的 goroutine 的个数一致,不然会触发 panic。
context 是 Go 1.7 版本引入的一个新特性,它主要用于限制和传递可跨 API 边界的请求级变量,取消信号和超时时间,并包含一个并发安全的 map 用于携带数据。
context 翻译成 “上下文”,context 与 WaitGroup 最大的不同点是:context 对于派生的 goroutine 有更强的控制能力,它可以控制多级的 goroutine。
context 的 API 非常简单,标准库实现上干净、独立,接下来我们会从具体的使用场景和源码分析两个角度进行分析。
package main
import (
"context"
"fmt"
)
func exec1(ctx context.Context) {
ctx = context.WithValue(ctx, "k1", "v1")
exec2(ctx)
}
func exec2(ctx context.Context) {
fmt.Println(ctx.Value("k1").(string))
}
func main() {
ctx := context.Background()
exec1(ctx)
}
我们在 exec1
通过 WithValue(parent Context, key, val interface{}) Context
,赋值 k1 为v1,在其下层函数 exec2
通过ctx.Value(key interface{}) interface{}
获取 k1 的值,非常简单。
扩展:如果我们是在 exec2 里赋值,在 exec1 里面能够拿到这个值吗?
答案是不能,context 只能自上而下携带值,这个是需要特别注意的一点。
主动取消
package main
import (
"context"
"errors"
"fmt"
"sync"
"time"
)
func exec(ctx context.Context, wg *sync.WaitGroup) error {
defer wg.Done()
respC := make(chan int)
// 处理逻辑
go func() {
time.Sleep(time.Second * 5)
respC <- 10
}()
// 取消机制
select {
case <-ctx.Done():
fmt.Println("cancel")
return errors.New("cancel")
case r := <-respC:
fmt.Println(r)
return nil
}
}
func main() {
wg := new(sync.WaitGroup)
ctx, cancel := context.WithCancel(context.Background())
wg.Add(1)
go exec(ctx, wg)
time.Sleep(time.Second * 2)
// 触发取消
cancel()
// 等待goroutine退出
wg.Wait()
}
超时取消
package main
import (
"context"
"fmt"
"time"
)
func exec(ctx context.Context) {
execCtx, execCancel := context.WithTimeout(ctx, time.Second*4)
defer execCancel()
resp := make(chan struct{}, 1)
// 处理逻辑
go func() {
// 处理耗时
time.Sleep(time.Second * 10)
resp <- struct{}{}
}()
// 超时机制
select {
//case <-ctx.Done():
// fmt.Println("ctx timeout")
// fmt.Println(ctx.Err())
case <-execCtx.Done():
fmt.Println("execCtx timeout")
fmt.Println(execCtx.Err())
case v := <-resp:
fmt.Println("exec function handle done")
fmt.Printf("result: %v\n", v)
}
fmt.Println("exec finish")
return
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
exec(ctx)
}
注意:对于有多个超时时间的处理,可以把上述超时取消例子中的注释打开,会观察到,当处理两个 ctx 时,时间短的会优先触发,这种情况下,如果只判定一个 context 的 Done() 是可以的,但是一定要保证调用到两个cancel
函数。
execCtx timeout:
ctx timeout:
context 实际上只定义了接口,凡是实现了该接口的类都可以称为一种 context,官方中实现了几种常用的 context 分别用于不同的场景。
// A Context carries a deadline, a cancellation signal, and other values across
// API boundaries.
//
// Context's methods may be called by multiple goroutines simultaneously.
type Context interface {
// Deadline returns the time when work done on behalf of this context
// should be canceled. Deadline returns ok==false when no deadline is
// set. Successive calls to Deadline return the same results.
Deadline() (deadline time.Time, ok bool)
// Done returns a channel that's closed when work done on behalf of this
// context should be canceled. Done may return nil if this context can
// never be canceled. Successive calls to Done return the same value.
// The close of the Done channel may happen asynchronously,
// after the cancel function returns.
Done() <-chan struct{}
// If Done is not yet closed, Err returns nil.
// If Done is closed, Err returns a non-nil error explaining why:
// Canceled if the context was canceled
// or DeadlineExceeded if the context's deadline passed.
// After Err returns a non-nil error, successive calls to Err return the same error.
Err() error
// Value returns the value associated with this context for key, or nil
// if no value is associated with key. Successive calls to Value with
// the same key returns the same result.
Value(key any) any
}
context 接口只定义了 4 个方法:
Deadline() (deadline time.Time, ok bool)
该方法返回一个 deadline 和 标识是否已设置 deadline 的 bool 值,如果没有设置 deadline,则 ok == false,此时 deadline 为一个初始值的 time.Time 值。
Done() <-chan struct{}
Done() 方法返回一个用于探测 context 是否取消的 channel。当 context 取消时会自动将该 channel 关闭。
需要注意:对于不支持取消的 context ,该方法可能会返回 nil,例如 context.Background()
Err() error
Err() 方法描述 context 关闭的原因,关闭原因由 context 实现控制,不需要用户设置,比如 Deadline context,关闭原因可能是因为 deadline ,也可能是被提前主动关闭了。
因 deadline 关闭:context deadline exceeded。
因 主动 关闭:context canceled。
Value(key any) any
有一种 context 它不是用于控制呈树状分布的 goroutine,而是用于在树状分布的 goroutine 之间传递信息。
Value() 方法就是用于此种类型的 context,该方法根据 key 值查询 map 中的 value。
既然 context 都需要实现 Context 接口,那么包括不直接可见(非导出)的结构体,一共有几种 context 呢?
答案是 4 种。
定义如下:
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key any) any {
return nil
}
func (e *emptyCtx) String() string {
switch e {
case background:
return "context.Background"
case todo:
return "context.TODO"
}
return "unknown empty Context"
}
context 包中定义了一个公用的 emptyCtx 全局变量,名为 background,可以使用 context.Background()
获取它,还有一个 todo,可以使用 context.TODO()
,实现代码如下:
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
// Background returns a non-nil, empty Context. It is never canceled, has no
// values, and has no deadline. It is typically used by the main function,
// initialization, and tests, and as the top-level Context for incoming
// requests.
func Background() Context {
return background
}
// TODO returns a non-nil, empty Context. Code should use context.TODO when
// it's unclear which Context to use or it is not yet available (because the
// surrounding function has not yet been extended to accept a Context
// parameter).
func TODO() Context {
return todo
}
Background 和 TODO 是一模一样的,官方说:background 它通常由主函数、初始化和测试使用,并作为传入请求的顶级上下文;TODO 是当不清楚要使用哪个 Context 或尚不可用时,代码应使用 context.TODO,后续在在进行替换掉,归根结底就是语义不同而已。
context 包还提供四个方法创建不同的类型的 context,使用四个方法如果没有父 context ,则都需要传入 background ,即 background 作为其父节点:
WithCancel()
WithDeadline()
WithTimeOut()
WithValue()
context 包中除了 emptyCtx 以外,还有 cancelCtx,timerCtx 和 ValueCtx,共计四种。它们之间的关系如下:
withValue 内部主要就是调用 valueCtx 类:
// WithValue returns a copy of parent in which the value associated with key is
// val.
//
// Use context Values only for request-scoped data that transits processes and
// APIs, not for passing optional parameters to functions.
func WithValue(parent Context, key, val any) Context {
if parent == nil {
panic("cannot create context from nil parent")
}
if key == nil {
panic("nil key")
}
if !reflectlite.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}
valueCtx 类
valueCtx目的就是为 Context 携带键值对,因为它采用匿名接口的继承实现方式,它会继承父 Context,也就相当于嵌入 Context 当中。
// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
Context
key, val any
}
valueCtx 实现 String()
方法输出 Context 和携带的键值对信息,实现 Value 方法来存储键值对。
func (c *valueCtx) String() string {
return contextName(c.Context) + ".WithValue(type " +
reflectlite.TypeOf(c.key).String() +
", val " + stringify(c.val) + ")"
}
func (c *valueCtx) Value(key any) any {
if c.key == key {
return c.val
}
return value(c.Context, key)
}
func value(c Context, key any) any {
for {
switch ctx := c.(type) {
case *valueCtx:
if key == ctx.key {
return ctx.val
}
c = ctx.Context
case *cancelCtx:
if key == &cancelCtxKey {
return c
}
c = ctx.Context
case *timerCtx:
if key == &cancelCtxKey {
return &ctx.cancelCtx
}
c = ctx.Context
case *emptyCtx:
return nil
default:
return c.Value(key)
}
}
}
用图来理解:我们在调用 Context 中的 Value 方法时会层层向上调用直到最终的根节点,中间要是找到了 key 就会返回,否会就会找到最终的 emptyCtx 返回 nil。
这个函数执行步骤如下:
创建一个 cancelCtx 对象,作为子 context
然后调用 propagateCancel 构建父子 context 之间的关系,这样当父 context 被取消时,子 context 也会被取消。
返回子 context 对象和子树取消函数。
// WithCancel returns a copy of parent with a new Done channel. The returned
// context's Done channel is closed when the returned cancel function is called
// or when the parent context's Done channel is closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}
我们再分析一下 cancelCtx 这个类。
cancelCtx 类
// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
Context
mu sync.Mutex // protects following fields
done atomic.Value // of chan struct{}, created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
mu:是一个互斥锁,用来保证并发安全的,所以 context 是并发安全的。
done:用来做 context 的取消通知信号,之前的版本使用的是 chan struct{} 类型,现在用 atomic.Value 做锁优化。
children:key 是接口类型 canceler,目的就是存储实现当前 canceler 接口的子节点,当根节点发生取消时,遍历子节点发送取消信号。
error:当 context 取消时存储取消信息。
propagateCancel 方法(传播 Cancel)
// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
done := parent.Done()
if done == nil {
// 当前父 context 从来不会被取消,是一个空节点,直接返回即可。
return // parent is never canceled
}
// 提前判断一个父context是否被取消,如果取消了也不需要构建关联了,把当前子节点取消掉并返回
select {
case <-done:
// parent is already canceled
child.cancel(false, parent.Err())
return
default:
}
// 这里是找到可以"取消"的context
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
// 将当前节点挂到父节点的 childrn map 中,外面调用 cancel 时可以层层取消
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
// 没有找到可"取消"的父节点挂载,那么需要开一个 goroutine
atomic.AddInt32(&goroutines, +1)
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
cancel 方法
最后我们再来分析一下返回的 cancel 方法是如何实现,该方法会关闭上下文中的 Channel 并向所有的子上下文同步取消信号:
并且通过源码我们可以知道 cancel 方法是可以被重复调用,是幂等的。
// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
// 取消时传入的 error 信息不能为 nil, context 定义了默认 error:var Canceled = errors.New("context canceled"
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // 已经有错误信息,说明当前节点已经被取消过了,直接返回
}
c.err = err
// 用来关闭 channel,通知其他协程
d, _ := c.done.Load().(chan struct{})
if d == nil {
c.done.Store(closedchan)
} else {
close(d)
}
// 当前节点向下取消,遍历它的所有子节点,然后执行取消
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
// 子节点置空
c.children = nil
c.mu.Unlock()
// 把当前节点从父节点中移除,只有在外部父节点调用时才会传true,其他都是传 false,内部调用都会因为c.children = nil被剔除出去
if removeFromParent {
removeChild(c.Context, c)
}
}
先看 WithTimeout 方法,它内部就是调用的 WithDeadline 方法:
// WithTimeout returns WithDeadline(parent, time.Now().Add(timeout)).
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
所以我们进一步来看 withDeadline 的实现方式:
// WithDeadline returns a copy of the parent context with the deadline adjusted
// to be no later than d. If the parent's deadline is already earlier than d,
// WithDeadline(parent, d) is semantically equivalent to parent. The returned
// context's Done channel is closed when the deadline expires, when the returned
// cancel function is called, or when the parent context's Done channel is
// closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
// 不能为空 context 创建衍生 context
if parent == nil {
panic("cannot create context from nil parent")
}
// 判断当父 context 的结束时间早于要设置的时间,则不需要再去单独处理子节点的定时器了
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
// 创建一个timerCtx对象
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
// 将当前节点挂到父节点上
propagateCancel(parent, c)
// 获取过期时间
dur := time.Until(d)
// 当前时间已经过期了则直接取消
if dur <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(false, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
// 如果没被取消,则直接添加一个定时器,定时去取消
if c.err == nil {
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
从上面可以发现 withDeadline 相较于 withCancel 方法也就多了一个定时器去定时调用 cancel 方法,这个 cancel 方法在 timerCtx 类中进行了重写,我们先来看一下 timerCtx 类,它是基于 cancelCtx 的,多了两个字段:
timerCtx 类
// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to
// implement Done and Err. It implements cancel by stopping its timer then
// delegating to cancelCtx.cancel.
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
并且 timerCtx 也实现了 cancel 方法,我们发现内部也是调用了cancelCtx 的 cancel 方法执行取消:
func (c *timerCtx) cancel(removeFromParent bool, err error) {
// 调用 cancelCtx 的 cancel 方法取消掉子节点context
c.cancelCtx.cancel(false, err)
// 把 父 context 移除放到了这做
if removeFromParent {
// Remove this timerCtx from its parent cancelCtx's children.
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
// 停掉定时器,释放资源
if c.timer != nil {
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}
至此,源码部分我们终于看完了,回顾一下,是不是有很多感想?
Channel、WaitGroup 和 Context 是 Go 语言并发控制的三种主要机制,它们各有各的应用场景和优缺点。Channel 提供了一种在 goroutines 之间传递数据和同步的手段,WaitGroup 提供了一种等待一组 goroutines 完成的方式,而 Context 则提供了一种跨 API 边界操作和处理超时或取消请求的方式。理解并熟练掌握这三种机制,是高效使用 Go 语言进行并发编程的关键。
同时,我们也要注意,虽然这三种机制能够帮助我们控制并发,但是并不能避免所有的并发问题,例如数据竞态等问题。因此,我们在编写并发程序时,还需要采用其它的同步机制,例如锁,以及遵循一些并发编程的最佳实践,例如避免在 goroutines 之间共享状态,使用不可变的数据结构等。
希望这篇文章能够帮助你更深入地理解 Go 语言的并发控制机制,如果你有任何问题或建议,欢迎在下方留言。