sync.Cond
实现了一种条件变量同步原语,可以让一个 goroutine
集合在满足特定条件时被唤醒。
sync.Cond
典型的使用场景是 生产-消费者模式
,多个 goroutine
等待某个事件发生, 单个 goroutine
通知某个事件已发生。比如电商中的用户下单事件发生时,会通知到订单、用户、积分、优惠券、仓储等服务,如果是单个生产者对单个消费者,直接使用 互斥锁
或 channel
就可以。
为什么多个消费者模式不使用互斥锁或 channel 呢?
可以想象一个非常简单的场景: 有一个 goroutine
在异步接收数据,剩下的多个 goroutine
必须等待该 goroutine
接收完才能读取。在这种情况下,如果单纯使用 互斥锁
或 channel
,就只能有一个 goroutine
可以等待并读取到数据,其他的 goroutine
没办法读取。
当然我们可以通过折衷的方案来解决,例如 可以创建一个全局变量,用来标志这个 goroutine
数据是否接收完成,剩下的 goroutine
反复检查该全局变量,直到满足条件。或者 可以创建多个 channel
,每个 goroutine
阻塞在一个 channel
上面,接收数据的 goroutine
在数据接收完毕后,逐个通知。但是不论哪种方式,实现复杂度都大大增加了。
sync.Cond
提供了简洁优雅的方式来解决上述问题。
通过一个小例子展示 sync.Cond
的使用方法。
package main
import (
"fmt"
"sync"
"time"
)
// 条件变量
var done = false
// 数据读取操作
func read(name string, c *sync.Cond) {
c.L.Lock()
for !done {
// 等待生产者写入通知
c.Wait()
}
fmt.Println(name, "starts reading")
c.L.Unlock()
}
// 数据写入操作
func write(name string, c *sync.Cond) {
fmt.Println(name, "starts writing")
time.Sleep(100 * time.Millisecond)
c.L.Lock()
// 设置条件变量
done = true
c.L.Unlock()
fmt.Println(name, "wakes all")
// 通知所有消费者
c.Broadcast()
}
func main() {
// 创建对象时传入一个互斥锁
cond := sync.NewCond(&sync.Mutex{})
// 3 个消费者
go read("reader-1", cond)
go read("reader-2", cond)
go read("reader-3", cond)
// 1 个生产者
write("writer-1", cond)
time.Sleep(time.Second)
}
$ go run main.go
# 输出如下
writer-1 starts writing
writer-1 wakes all
reader-2 starts reading
reader-1 starts reading
reader-3 starts reading
从输出结果中可以看到,消费者刚开始时调用 Wait
方法阻塞,直到生产者 (write) 写入完成后调用 Broadcast
方法通知所有消费者 (read),然后所有消费者依次输出。
我们来探究一下 sync.Cond
的内部实现,文件路径为 $GOROOT/src/sync/cond.go
,笔者的 Go 版本为 go1.19 linux/amd64
。
Cond
对象表示同步条件变量,可以让 goroutins
等待或通知某个事件发生,Cond 对象一旦使用后,就不能再复制。
每一个 Cond
对象都持有一个对应的 Locker
接口 (通常是一个互斥锁或读写锁),当条件发生变化以及调用 Wait
方法时,必须持有对应的锁。
在简单的应用场景中,更好的选择是使用 channel
完成同步操作 (Go 的标准库设计理念是上层应用尽量使用 channel
作为同步原语),可以将两者的对应关系简单概况如下:
type Cond struct {
// 保证编译期间不会发生复制
noCopy noCopy
// 当访问或者修改条件时,必须持有 L
L Locker
// goroutine 链表
notify notifyList
// 保证运行期间不会发生复制
checker copyChecker
}
notifyList
对象表示一个 goroutine
链表数据结构。
// runtime/sema.go
type notifyList struct {
// 等待的 goroutine 索引,可以在没有获取锁的情况下原子性递增
wait uint32
// 已经通知到的 goroutine 索引
// 可以在没有获取锁的情况下进行读取操作,但是必须在获得锁的情况下进行写入操作
notify uint32
lock mutex
// 等待索引和已通知索引可以是环形队列结构
// 链表头指针
head *sudog
// 链表尾指针
tail *sudog
}
NewCond
方法创建一个 Cond
对象,参数为一个 Locker
接口。
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
Wait
方法 (阻塞调用) 会解锁 c.L
字段并且休眠当前 goroutine
,等到当前 goroutine
被唤醒后,Wait
方法在返回之前再对 c.L
字段加锁。
func (c *Cond) Wait() {
// 复制检测
c.checker.check()
// 等待索引 + 1
t := runtime_notifyListAdd(&c.notify)
// goroutine 休眠之前先解锁 (否则其他 goroutine 获取不到锁,会造成死锁问题)
c.L.Unlock()
// 等待唤醒,并传递等待索引
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
runtime_notifyListAdd
方法通过链接器链接到 notifyListAdd
方法,notifyListAdd
方法将当前调用方 goroutine
添加到通知链表中以便接收通知。
// runtime/sema.go
//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
// 等待索引计数 + 1
return atomic.Xadd(&l.wait, 1) - 1
}
runtime_notifyListWait
方法通过链接器链接到 notifyListWait
方法,如果在调用 notifyListAdd
方法之后已经发送过通知,notifyListWait
就会立即返回,否则就陷入阻塞。
// runtime/sema.go
//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
// 如果之前已经发送过通知,直接返回即可
if less(t, l.notify) {
unlock(&l.lock)
return
}
// 获取当前 goroutine 并追加到链表尾部
s := acquireSudog()
s.g = getg()
// 获取等待索引
s.ticket = t
// 将 goroutine 加入到链表中
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
// 休眠当前 goroutine
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
// 归还 goroutine
releaseSudog(s)
}
Signal
方法唤醒链表头部等待的 goroutine
。
func (c *Cond) Signal() {
// 复制检测
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
runtime_notifyListNotifyOne
方法通过链接器链接到 notifyListNotifyOne
方法,唤醒链表头部的 goroutine
。
// runtime/sema.go
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
// 如果已通知索引和等待索引相同
// 说明没有等待的 goroutine, 直接返回
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
// 获取已通知索引
t := l.notify
// 如果已通知索引和等待索引相同
// 说明没有等待的 goroutine, 直接返回
// 又是一个经典的双重检测
if t == atomic.Load(&l.wait) {
unlock(&l.lock)
return
}
// 已通知索引 + 1
atomic.Store(&l.notify, t+1)
// 根据已通知索引,找到对应的 goroutine 唤醒
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
// 唤醒满足条件的 goroutine
readyWithTime(s, 4)
return
}
}
}
为什么不直接唤醒链表的头部元素呢?
这就是 已通知索引 notify
字段存在的意义,因为获取等待索引和加入到链表两个步骤不是原子操作,这意味着在并发场景下,会出现顺序不一致的情况。
例如,goroutine
对应的等待索引为 2, 但是因为并发问题,加入到链表的时候,排到了第 3 个位置,如图所示:
不过不需要担心,我们可以根据 已通知索引 notify
字段,保证在发送单个通知时保证顺序的一致性,避免乱序可能带来的 先到的 goroutine 反而等待时间长
这类问题。
当前 notify
字段对应的 goroutine
通知后,会变更指针到下一个 goroutine
,如图所示:
从算法时间复杂度来分析,直接唤醒链表头部元素是 O(1)
, 通过 notify
字段唤醒是 O(N)
, 但是官方的注释中写道:
This scan looks linear but essentially always stops quickly.
所以即便出现乱序,notify
索引字段对应的 goroutine
也不会太靠后,所以不会产生太多的性能问题。
Broadcast
方法唤醒所有等待的 goroutine
。
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
runtime_notifyListNotifyAll
方法通过链接器链接到 notifyListNotifyAll
方法,唤醒所有等待的 goroutine
。
// runtime/sema.go
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
// 没有等待的 goroutine, 直接返回
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
// 既然是全部唤醒,也就不用担心上面提到的乱序问题了
// 直接遍历 goroutine 链表,逐个唤醒 goroutine
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
Cond.copyChecker
字段持有指向自身的指针,用来检测是否被复制,当指针值和实际地址值不一致时,说明发生了复制。
type copyChecker uintptr
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
// 临界区域重复检测,避免原子对比后的瞬间,值被复制
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}
check
方法的实现很有意思,里面有 3 个判断条件:
uintptr(*c) != uintptr(unsafe.Pointer(c))
比较 copyChecker 的指针值 (默认是 0)atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c)))
CAS 操作 copyChecker 的指针值uintptr(*c) != uintptr(unsafe.Pointer(c))
比较 copyChecker 的指针值为什么 CAS 操作
之后又重复比较了一次呢?主要是针对临界区的检测,因为可能会出现一种极端情况: CAS 操作
之后,copyChecker 瞬间被复制了。
noCopy
对象可以添加到具体的结构体中,实现 "首次使用之后,无法被复制" 的功能 (由编译器实现)。
noCopy.Lock
方法是一个空操作,由 go vet
工具链中的 -copylocks checker 参数指令使用。
type noCopy struct{}
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}
sync.Cond
不是一个常用的同步机制,在条件变量长时间无法满足时,sync.Cond
能够让出处理器的使用权,和单纯使用 for {}
进行无限等待相比,
可以提高 CPU 的利用率。但是使用时我们也需要注意以下问题:
Wait
方法在调用之前一定要完成加锁操作,否则程序会 panic
(因为方法内部会释放互斥锁)Signal
方法会唤醒链表 (队列) 最前面、等待最久的 goroutine
(通过等待索引字段保证顺序)Broadcast
方法会按照链表的顺序 (并不是先进先出,因为可能存在乱序问题) 唤醒所有等待的 goroutine
Go sync.Cond: https://geektutu.com/post/hpg-sync-cond.html
[2]Go 设计与实现: https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-sync-primitives/#cond
[3]Golang sync.Cond 条件变量源码分析: https://www.cyhone.com/articles/golang-sync-cond/