【导读】一些场景下,为了保障服务稳定性会引入熔断机制。本文介绍了用 Go 语言自己实现熔断需要什么操作。
熔断是指在下游发生错误时上游主动关闭或限制对下游的请求。
总得来说三个状态的转换大致如下图:
https://github.com/rubyist/circuitbreaker
atomic.StoreInt32((*int32)(&b.state), int32(HALFOPEN))
type TripFunc func(Metricser) bool
type Metricser interface {
Fail() // records a failure
Succeed() // records a success
Timeout() // records a timeout
Failures() int64 // return the number of failures
Successes() int64 // return the number of successes
Timeouts() int64 // return the number of timeouts
ConseErrors() int64 // return the consecutive errors recently
ErrorRate() float64 // rate = (timeouts + failures) / (timeouts + failures + successes)
Samples() int64 // (timeouts + failures + successes)
Counts() (successes, failures, timeouts int64)
Reset()
}
type window struct {
sync.RWMutex
oldest int32 // oldest bucket index
latest int32 // latest bucket index
buckets []bucket // buckets this window holds
bucketTime time.Duration // time each bucket holds
bucketNums int32 // the numbe of buckets
inWindow int32 // the number of buckets in the window
allSuccess int64
allFailure int64
allTimeout int64
conseErr int64
}
type bucket struct {
failure int64
success int64
timeout int64
}
用环形队列实现动态统计。把一个连续的时间切成多个小份,每一个 bucket 保存 BucketTime 的统计数据,BucketTime * BucketNums 是统计的时间区间。
每 BucketTime,会有一个 bucket 过期
if w.inWindow == w.bucketNums {
// the lastest covered the oldest(latest == oldest)
oldBucket := &w.buckets[w.oldest]
atomic.AddInt64(&w.allSuccess, -oldBucket.Successes())
atomic.AddInt64(&w.allFailure, -oldBucket.Failures())
atomic.AddInt64(&w.allTimeout, -oldBucket.Timeouts())
w.oldest++
if w.oldest >= w.bucketNums {
w.oldest = 0
}
} else {
w.inWindow++
}
w.latest++
if w.latest >= w.bucketNums {
w.latest = 0
}
(&w.buckets[w.latest]).Reset()
type PanelStateChangeHandler func(key string, oldState, newState State, m Metricser)
转自:
blog.cyeam.com/framework/2020/03/01/circuitbreaker
- EOF -
Go 开发大全
参与维护一个非常全面的Go开源技术资源库。日常分享 Go, 云原生、k8s、Docker和微服务方面的技术文章和行业动态。
关注后获取
回复 Go 获取6万star的Go资源库
分享、点赞和在看
支持我们分享更多好文章,谢谢!