https://github.com/go-redsync/redsync is a distributed Redis lock implemented in Go. It supports a quorum mechanism and, via the delegate pattern, works with both the https://github.com/redis/go-redis client and the https://github.com/gomodule/redigo client. Let's first see how it is used, then walk through its source code.
package main

import (
	goredislib "github.com/go-redis/redis/v8"
	"github.com/go-redsync/redsync/v4"
	"github.com/go-redsync/redsync/v4/redis/goredis/v8"
)

func main() {
	// Create a Redis client connection.
	client := goredislib.NewClient(&goredislib.Options{
		Addr: "localhost:6379",
	})
	/*
		// Or create a cluster-mode client connection (note the field is Addrs).
		client := goredislib.NewClusterClient(&goredislib.ClusterOptions{
			Addrs: []string{"localhost:6379"},
		})
	*/
	// Wrap the client in a redsync connection pool.
	pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)
	// Create a redsync instance.
	rs := redsync.New(pool)
	// The same key name yields the same mutex across processes.
	mutexname := "my-global-mutex"
	// Create the key-based mutex.
	mutex := rs.NewMutex(mutexname)
	// Lock the key.
	if err := mutex.Lock(); err != nil {
		panic(err)
	}
	// Extend the lock's expiry.
	if ok, err := mutex.Extend(); err != nil || !ok {
		panic("extend failed")
	}
	// Release the mutex.
	if ok, err := mutex.Unlock(); !ok || err != nil {
		panic("unlock failed")
	}
}
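The example above drives a single Redis node, so the quorum is trivially 1. Redlock semantics only appear with several independent nodes, and since redsync.New is variadic this is just a matter of passing more pools. A minimal sketch, assuming three standalone instances on placeholder ports (redsyncredis is the interface package github.com/go-redsync/redsync/v4/redis):
// Three independent Redis nodes give a quorum of 2 (3/2 + 1).
// The addresses are placeholders; any independent instances work.
clients := []*goredislib.Client{
	goredislib.NewClient(&goredislib.Options{Addr: "localhost:6379"}),
	goredislib.NewClient(&goredislib.Options{Addr: "localhost:6380"}),
	goredislib.NewClient(&goredislib.Options{Addr: "localhost:6381"}),
}
pools := make([]redsyncredis.Pool, 0, len(clients))
for _, c := range clients {
	pools = append(pools, goredis.NewPool(c))
}
rs := redsync.New(pools...)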
As you can see, the core flow breaks down into the following steps:
1. Get a Redis connection pool
2. Wrap it into a redsync connection pool
3. Create the redsync instance
4. Create a key-based mutex
5. Lock the key
6. Extend the lock
7. Release the mutex
Let's analyze the source code following these steps.
The first step is ordinary redis-client creation, already shown above; the second wraps that client into a redsync pool:
// Wrap the client in a redsync connection pool
pool := goredis.NewPool(client)
The wrapper uses the delegate pattern, in redis/goredis/v8/goredis.go:
// NewPool returns a Goredis-based pool implementation.
func NewPool(delegate redis.UniversalClient) redsyncredis.Pool {
return &pool{delegate}
}
type pool struct {
delegate redis.UniversalClient
}
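The rest of that file follows the same delegate shape: the pool hands out connections whose methods forward to the wrapped client. A minimal sketch of the idea (the conn fields and the elided details are assumptions; the real file carries more methods and nil-error handling):
// Sketch: each redis.Conn method is a thin forwarder to the go-redis client.
type conn struct {
	delegate redis.UniversalClient // the wrapped go-redis client
}

func (c *conn) Get(name string) (string, error) {
	// Forward GET to the delegate; context plumbing elided.
	return c.delegate.Get(context.Background(), name).Result()
}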
The redsync instance is created in redsync.go; passing in multiple Redis connection pools is exactly what turns this into Redlock:
func New(pools ...redis.Pool) *Redsync {
return &Redsync{
pools: pools,
}
}
// Redsync provides a simple method for creating distributed mutexes using multiple Redis connection pools.
type Redsync struct {
pools []redis.Pool
}
Creating the key-based mutex just initializes the parameters the lock needs, also in redsync.go. Note the quorum of len(pools)/2 + 1: with five pools, three successful acquisitions are required.
// NewMutex returns a new distributed mutex with given name.
func (r *Redsync) NewMutex(name string, options ...Option) *Mutex {
m := &Mutex{
name: name,
expiry: 8 * time.Second,
tries: 32,
delayFunc: func(tries int) time.Duration {
return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond
},
genValueFunc: genValue,
driftFactor: 0.01,
timeoutFactor: 0.05,
quorum: len(r.pools)/2 + 1,
pools: r.pools,
}
for _, o := range options {
o.Apply(m)
}
return m
}
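The Option values let callers override these defaults; redsync v4 ships functional options such as WithExpiry and WithTries:
// Override the 8-second default expiry and the 32 default tries.
mutex := rs.NewMutex("my-global-mutex",
	redsync.WithExpiry(20*time.Second),
	redsync.WithTries(5),
)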
// A Mutex is a distributed mutual exclusion lock.
type Mutex struct {
	name   string        // the Redis key
	expiry time.Duration // lock TTL

	tries     int       // maximum acquisition attempts
	delayFunc DelayFunc // backoff between attempts

	driftFactor   float64 // clock-drift allowance, as a fraction of expiry
	timeoutFactor float64 // per-attempt Redis timeout, as a fraction of expiry

	quorum int // len(pools)/2 + 1

	genValueFunc func() (string, error) // produces the random owner token
	value        string                 // this holder's token
	until        time.Time              // deadline until which the lock is valid

	pools []redis.Pool
}
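The value field is what ties a lock to its owner; genValue, the default genValueFunc, produces a random token. A sketch consistent with the library's approach, 16 bytes from crypto/rand, base64-encoded (treat the exact byte count as an assumption):
// genValue-style token: 16 random bytes from crypto/rand, base64-encoded.
func genValue() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return base64.StdEncoding.EncodeToString(b), nil
}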
The locking source lives in mutex.go. Every operation is wrapped in two versions, with and without a context:
func (m *Mutex) Lock() error {
return m.LockContext(nil)
}
Locking first tries to acquire on every node; if it succeeds on more than half of them the lock is considered acquired, otherwise the nodes already acquired are released:
func (m *Mutex) LockContext(ctx context.Context) error {
	if ctx == nil {
		ctx = context.Background()
	}
	value, err := m.genValueFunc()
	if err != nil {
		return err
	}
	for i := 0; i < m.tries; i++ {
		if i != 0 {
			// Wait a random delay before retrying, unless the context is done.
			select {
			case <-ctx.Done():
				return ErrFailed
			case <-time.After(m.delayFunc(i)):
			}
		}
		start := time.Now()
		// Try to acquire on every pool, bounded by a timeout derived from the expiry.
		n, err := func() (int, error) {
			ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
			defer cancel()
			return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
				return m.acquire(ctx, pool, value)
			})
		}()
		if n == 0 && err != nil {
			return err
		}
		now := time.Now()
		// Remaining validity: expiry minus elapsed time minus the drift allowance.
		until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
		if n >= m.quorum && now.Before(until) {
			m.value = value
			m.until = until
			return nil
		}
		// Quorum not reached: release whatever was acquired, then retry.
		func() (int, error) {
			ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
			defer cancel()
			return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
				return m.release(ctx, pool, value)
			})
		}()
	}
	return ErrFailed
}
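To make the validity computation concrete: with the defaults, expiry is 8s and driftFactor is 0.01, so the drift allowance is 80ms. If the quorum round took 50ms, the lock can be trusted for roughly 8s - 50ms - 80ms ≈ 7.87s; until records that deadline, and the acquisition only counts if quorum was reached while validity time still remained.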
If locking fails, it retries after a random delay. The fan-out to the nodes is factored into a helper: it spawns one goroutine per pool, collects the results over an unbuffered channel, and returns the success count upstream for the quorum decision:
func (m *Mutex) actOnPoolsAsync(actFn func(redis.Pool) (bool, error)) (int, error) {
	type result struct {
		Node   int
		Status bool
		Err    error
	}
	ch := make(chan result)
	// Fan out: one goroutine per pool.
	for node, pool := range m.pools {
		go func(node int, pool redis.Pool) {
			r := result{Node: node}
			r.Status, r.Err = actFn(pool)
			ch <- r
		}(node, pool)
	}
	// Fan in: count successes, collect errors.
	n := 0
	var taken []int
	var err error
	for range m.pools {
		r := <-ch
		if r.Status {
			n++
		} else if r.Err != nil {
			err = multierror.Append(err, &RedisError{Node: r.Node, Err: r.Err})
		} else {
			taken = append(taken, r.Node)
			err = multierror.Append(err, &ErrNodeTaken{Node: r.Node})
		}
	}
	if len(taken) >= m.quorum {
		return n, &ErrTaken{Nodes: taken}
	}
	return n, err
}
The single-node acquire logic, also in mutex.go:
func (m *Mutex) acquire(ctx context.Context, pool redis.Pool, value string) (bool, error) {
	conn, err := pool.Get(ctx)
	// ... error handling and deferred conn.Close elided ...
	reply, err := conn.SetNX(m.name, value, m.expiry)
	return reply, err
}
The SetNX call is delegated to the go-redis client in redis/goredis/v8/goredis.go, where noErrNil normalizes go-redis's redis.Nil "no result" sentinel into a plain nil error:
func (c *conn) SetNX(name string, value string, expiry time.Duration) (bool, error) {
	ok, err := c.delegate.SetNX(name, value, expiry).Result()
	return ok, noErrNil(err)
}
Releasing a node executes a Lua script that first checks whether the lock is ours (the stored value matches), and deletes the key only if so:
func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) {
	conn, err := pool.Get(ctx)
	if err != nil {
		return false, err
	}
	defer conn.Close()
	status, err := conn.Eval(deleteScript, m.name, value)
	if err != nil {
		return false, err
	}
	return status != int64(0), nil
}
var deleteScript = redis.NewScript(1, `
	if redis.call("GET", KEYS[1]) == ARGV[1] then
		return redis.call("DEL", KEYS[1])
	else
		return 0
	end
`)
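The GET-compare-DEL sequence must run as one atomic script: issued as separate commands, the key could expire and be re-acquired by another client between the GET and the DEL, deleting someone else's lock. For intuition, the script's semantics can be exercised directly with go-redis's Eval (the key and value here are placeholders):
// Returns 1 if the stored value matched and the key was deleted, 0 otherwise.
res, err := client.Eval(context.Background(),
	`if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("DEL", KEYS[1]) else return 0 end`,
	[]string{"my-global-mutex"}, "some-random-value").Result()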
Unlocking the mutex mirrors locking and sits on the same underlying helper:
func (m *Mutex) Unlock() (bool, error) {
return m.UnlockContext(nil)
}
// UnlockContext unlocks m and returns the status of unlock.
func (m *Mutex) UnlockContext(ctx context.Context) (bool, error) {
n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
return m.release(ctx, pool, m.value)
})
if n < m.quorum {
return false, err
}
return true, nil
}
Finally, extension. If the local work runs long and cannot finish within the lock's expiry, the lock must be extended (mutex.go):
// Extend resets the mutex's expiry and returns the status of expiry extension.
func (m *Mutex) Extend() (bool, error) {
return m.ExtendContext(nil)
}
func (m *Mutex) ExtendContext(ctx context.Context) (bool, error) {
	start := time.Now()
	n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
		return m.touch(ctx, pool, m.value, int(m.expiry/time.Millisecond))
	})
	if n < m.quorum {
		return false, err
	}
	now := time.Now()
	until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
	if now.Before(until) {
		m.until = until
		return true, nil
	}
	return false, ErrExtendFailed
}
This too executes a Lua script: check whether the lock is ours and, if so, reset the expiry with PEXPIRE:
func (m *Mutex) touch(ctx context.Context, pool redis.Pool, value string, expiry int) (bool, error) {
	conn, err := pool.Get(ctx)
	if err != nil {
		return false, err
	}
	defer conn.Close()
	status, err := conn.Eval(touchScript, m.name, value, expiry)
	if err != nil {
		return false, err
	}
	return status != int64(0), nil
}
var touchScript = redis.NewScript(1, `
	if redis.call("GET", KEYS[1]) == ARGV[1] then
		return redis.call("PEXPIRE", KEYS[1], ARGV[2])
	else
		return 0
	end
`)
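A common pattern built on Extend is a watchdog goroutine that keeps renewing the lock while the protected work is in flight. A minimal sketch, not part of redsync itself (the 4s interval, half the default 8s expiry, is an arbitrary choice):
// Renew the lock periodically until done is closed.
done := make(chan struct{})
go func() {
	ticker := time.NewTicker(4 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			if ok, err := mutex.Extend(); !ok || err != nil {
				return // lost the lock; stop renewing
			}
		}
	}
}()
// ... do the protected work, then:
close(done)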
Beyond these, there is one more function, which checks whether the currently held lock is still valid: if the key can be read and its value equals ours, the lock is still in effect.
func (m *Mutex) Valid() (bool, error) {
return m.ValidContext(nil)
}
func (m *Mutex) ValidContext(ctx context.Context) (bool, error) {
n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
return m.valid(ctx, pool)
})
return n >= m.quorum, err
}
func (m *Mutex) valid(ctx context.Context, pool redis.Pool) (bool, error) {
	conn, err := pool.Get(ctx)
	if err != nil {
		return false, err
	}
	defer conn.Close()
	reply, err := conn.Get(m.name)
	if err != nil {
		return false, err
	}
	return m.value == reply, nil
}
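A plausible use of Valid is as a guard before committing side effects, so that work performed after a silent lock loss is not persisted. A sketch, where the commit step is a hypothetical business function:
// Verify we still hold the lock before committing results.
if ok, err := mutex.Valid(); !ok || err != nil {
	return fmt.Errorf("lock no longer held, aborting commit: %v", err)
}
commitResults() // hypothetical business function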
So, do we actually need Redlock? That depends on the business scenario and the resources available: if you can afford multiple independent Redis nodes and the availability requirement is high, Redlock is worth using. Is it bulletproof? Not quite. First, the lock value is a random string; random values can, however improbably, collide, and a collision breaks mutual exclusion. Redlock raises availability through its majority quorum, guarding against a single node dying, but if a majority of the nodes go down, the lock can no longer be acquired at all.