分布式锁原则上是一种可以解决不同节点上的程序避免资源冲突的一种锁。实现一个分布式锁可以有很多中方式,下面以Redis为例,说明如何在Go中实现一个分布式锁。
首先你需要一个Redis客户端,这里可以选择go-redis库,这是一个功能齐全的redis客户端。安装这个库的命令是:
go get -u github.com/go-redis/redis
然后你可以使用下面的代码实现一个分布式锁:
package main
import (
"fmt"
"github.com/go-redis/redis"
"time"
)
var client = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
func Lock(key string, value string, expiration time.Duration) (bool, error) {
return client.SetNX(key, value, expiration).Result()
}
func Unlock(key string, value string) error {
val, err := client.Get(key).Result()
if err != nil {
return err
}
if val == value {
return client.Del(key).Err()
}
return fmt.Errorf("key %s is locked by another value %s", key, val)
}
func main() {
ok, err := Lock("my-key", "my-value", 10*time.Second)
if err != nil {
// handle error
}
if !ok {
fmt.Println("Lock failed")
} else {
fmt.Println("Lock success")
// do your job
Unlock("my-key", "my-value")
}
}
在上面的代码中,我们定义了两个函数:Lock和Unlock。Lock函数会尝试在Redis中设置一个键值对,并设置一个过期时间。如果这个键值对已经被其他地方设置了,那么SetNX函数会返回false,否则返回true。
Unlock函数则尝试删除这个键值对,但是在删除之前,会检查这个键值对的值是否符合输入的值,如果不符合,那么认为这个锁已经被其他地方获取,这时就不应该删除这个键值对。
进阶
上面例子,锁缺失两个重要的性质一个是可重入一个是如何实现到期自动续签逻辑。
在上面的代码中,我们定义了两个函数:Lock和Unlock。Lock函数会尝试在Redis中设置一个键值对,并设置一个过期时间。如果这个键值对已经被其他地方设置了,那么SetNX函数会返回false,否则返回true。
Unlock函数则尝试删除这个键值对,但是在删除之前,会检查这个键值对的值是否符合输入的值,如果不符合,那么认为这个锁已经被其他地方获取,这时就不应该删除这个键值对。
package main
import (
"sync"
"time"
"github.com/go-redis/redis"
"github.com/satori/go.uuid"
)
var client = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password set
DB: 0, // use default DB
})
type Lock struct {
key string
value string
expiration time.Duration
mu sync.Mutex
isLocked bool
count int
}
func NewLock(key string, expiration time.Duration) *Lock {
return &Lock{
key: key,
value: uuid.NewV4().String(),
expiration: expiration,
}
}
func (l *Lock) Lock() (bool, error) {
l.mu.Lock()
defer l.mu.Unlock()
if l.isLocked {
l.count++
return true, nil
}
ok, err := client.SetNX(l.key, l.value, l.expiration).Result()
if err != nil || !ok {
return false, err
}
l.isLocked = true
l.count++
go l.renew()
return true, nil
}
func (l *Lock) Unlock() error {
l.mu.Lock()
defer l.mu.Unlock()
if !l.isLocked {
return nil
}
l.count--
if l.count > 0 {
return nil
}
val, err := client.Get(l.key).Result()
if err != nil {
return err
}
if val != l.value {
return nil
}
l.isLocked = false
return client.Del(l.key).Err()
}
func (l *Lock) renew() {
ticker := time.NewTicker(l.expiration / 2)
for range ticker.C {
l.mu.Lock()
if !l.isLocked {
ticker.Stop()
l.mu.Unlock()
break
}
client.Expire(l.key, l.expiration)
l.mu.Unlock()
}
}
func main() {
lock := NewLock("my-key", 10*time.Second)
locked, err := lock.Lock()
if err != nil {
panic(err)
}
if !locked {
return
}
defer lock.Unlock()
// do something
}
这段代码中,Lock结构体中新加入了mu、isLocked和count字段,分别表示互斥锁、是否已经锁定还有重入次数。当再次获取锁的时候,已经锁定则重入次数增加,否则尝试获取锁。在unlock时,如果重入次数大于零,则直接减少重入次数而不释放锁。
同时加入了renew函数,这个函数会每过一段时间检查这个锁是否已经被释放,未被释放则续期,并在锁释放后停止续期。