写完【paxos 的直观解释】之后,网友都说疗效甚好,但是也会对这篇教程中一些环节提出疑问(有疑问说明真的看懂了 🤔),例如怎么把只能确定一个值的 paxos 应用到实际场景中。
既然 Talk is cheap,那么就 Show me the code,这次我们把教程中描述的内容直接用代码实现出来,希望能覆盖到教程中的涉及的每个细节。帮助大家理解 paxos 的运行机制。
这是一个基于 paxos,200 行代码的 kv 存储系统的简单实现,作为【paxos 的直观解释】这篇教程中的代码示例部分。Paxos 的原理本文不再介绍了,本文提到的数据结构使用【protobuf】定义,网络部分使用【grpc】定义。另外 200 行 go 代码实现 paxos 存储。
文中的代码可能做了简化, 完整代码实现在【paxoskv】这个项目中(naive 分支)。
跑一下:
git clone https://github.com/openacid/paxoskv.git
cd paxoskv
go test -v ./...
这个项目中除了 paxos 实现,用 3 个 test case 描述了 3 个 paxos 运行的例子,
【TestCase1SingleProposer】:无冲突运行。
【TestCase2DoubleProposer】:有冲突运行。
【Example_setAndGetByKeyVer】作为 key-val 使用。
测试代码描述了几个 paxos 运行例子的行为,运行测试可以确认 paxos 的实现符合预期。
本文中 protobuf 的数据结构定义如下:
service PaxosKV {
rpc Prepare (Proposer) returns (Acceptor) {}
rpc Accept (Proposer) returns (Acceptor) {}
}
message BallotNum {
int64 N = 1;
int64 ProposerId = 2;
}
message Value {
int64 Vi64 = 1;
}
message PaxosInstanceId {
string Key = 1;
int64 Ver = 2;
}
message Acceptor {
BallotNum LastBal = 1;
Value Val = 2;
BallotNum VBal = 3;
}
message Proposer {
PaxosInstanceId Id = 1;
BallotNum Bal = 2;
Value Val = 3;
}
// struct KVServer
Storage : map[string]Versions
func Accept(c context.Context, r *Proposer) (*Acceptor, error)
func Prepare(c context.Context, r *Proposer) (*Acceptor, error)
func getLockedVersion(id *PaxosInstanceId) *Version
// struct Proposer
func Phase1(acceptorIds []int64, quorum int) (*Value, *BallotNum, error)
func Phase2(acceptorIds []int64, quorum int) (*BallotNum, error)
func RunPaxos(acceptorIds []int64, val *Value) *Value
func rpcToAll(acceptorIds []int64, action string) []*Acceptor
func ServeAcceptors(acceptorIds []int64) []*grpc.Server
在存储端(Acceptor)也有几个概念:
last_rnd 是 Acceptor 记住的最后一次进行写前读取的 Proposer(客户端)是谁,以此来决定谁可以在后面真正把一个值写到存储中。 v 是最后被写入的值。 vrnd 跟 v 是一对, 它记录了在哪个 Round 中 v 被写入了。
原文中这些名词是参考了【paxos made simple】中的名称,但在【Leslie Lamport】后面的几篇 paper 中都换了名称,为了后续方便,在【paxoskv】的代码实现中也做了相应的替换:
rnd ==> Bal // 每一轮paxos的编号, BallotNum
vrnd ==> VBal // 在哪个Ballot中v被Acceptor 接受(voted)
last_rnd ==> LastBal
message Acceptor {
BallotNum LastBal = 1;
Value Val = 2;
BallotNum VBal = 3;
}
message Proposer {
PaxosInstanceId Id = 1;
BallotNum Bal = 2;
Value Val = 3;
}
message PaxosInstanceId {
string Key = 1;
int64 Ver = 2;
}
message BallotNum {
int64 N = 1;
int64 ProposerId = 2;
}
RPC 消息定义了 Proposer 和 Acceptor 之间的通讯。
在一个 paxos 系统中,至少要有 4 个消息:
Phase 1 的 Prepare-request,Prepare-reply
Phase 2 的 Accept-request,Accept-reply
如【slide-28】所描述的(原文中使用 rnd,这里使用 Bal,都是同一个概念):
Phase- 1(Prepare): request:
Bal: int
reply:
LastBal: int
Val: string
VBal: intPhase- 2(Accept):
request:
Bal: int
Val: string
reply:
LastBal: int
service PaxosKV {
rpc Prepare (Proposer) returns (Acceptor) {}
rpc Accept (Proposer) returns (Acceptor) {}
}
🚀
protoc \
--proto_path=proto \
--go_out=plugins=grpc:paxoskv \
paxoskv.proto
type Acceptor struct {
LastBal *BallotNum ...
Val *Value ...
VBal *BallotNum ...
...
}
type paxosKVClient struct {
cc *grpc.ClientConn
}
type PaxosKVClient interface {
Prepare(
ctx context.Context,
in *Proposer,
opts ...grpc.CallOption
) (*Acceptor, error)
Accept(
ctx context.Context,
in *Proposer,
opts ...grpc.CallOption
) (*Acceptor, error)
}
type PaxosKVServer interface {
Prepare(context.Context,
*Proposer) (*Acceptor, error)
Accept(context.Context,
*Proposer) (*Acceptor, error)
}
type Version struct {
mu sync.Mutex
acceptor Acceptor
}
type Versions map[int64]*Version
type KVServer struct {
mu sync.Mutex
Storage map[string]Versions
}
其中 Version 对应一个 key 的一次变化,也就是对应一个 paxos 实例,Versions 对应一个 key 的一系列变化,Storage 就是所有 key 的所有变化。
Acceptor,是这个系统里的 server 端,监听一个端口,等待 Proposer 发来的请求并处理,然后给出应答。
根据 paxos 的定义,Acceptor 的逻辑很简单:在【slide-28】中描述:
根据教程里的描述,为 KVServer 定义 handle Prepare-request 的代码:
func (s *KVServer) Prepare(
c context.Context,
r *Proposer) (*Acceptor, error) {
v := s.getLockedVersion(r.Id)
defer v.mu.Unlock()
reply := v.acceptor
if r.Bal.GE(v.acceptor.LastBal) {
v.acceptor.LastBal = r.Bal
}
return &reply, nil
}
func (s *KVServer) getLockedVersion(
id *PaxosInstanceId) *Version {
s.mu.Lock()
defer s.mu.Unlock()
key := id.Key
ver := id.Ver
rec, found := s.Storage[key]
if !found {
rec = Versions{}
s.Storage[key] = rec
}
v, found := rec[ver]
if !found {
// initialize an empty paxos instance
rec[ver] = &Version{
acceptor: Acceptor{
LastBal: &BallotNum{},
VBal: &BallotNum{},
},
}
v = rec[ver]
}
v.mu.Lock()
return v
}
handle Accept-request 的处理类似,在【slide-31】中描述:
func (s *KVServer) Accept(
c context.Context,
r *Proposer) (*Acceptor, error) {
v := s.getLockedVersion(r.Id)
defer v.mu.Unlock()
reply := Acceptor{
LastBal: &*v.acceptor.LastBal,
}
if r.Bal.GE(v.acceptor.LastBal) {
v.acceptor.LastBal = r.Bal
v.acceptor.Val = r.Val
v.acceptor.VBal = r.Bal
}
return &reply, nil
}
Acceptor 的逻辑到此完整了,再看 Proposer:
func (p *Proposer) Phase1(
acceptorIds []int64,
quorum int) (*Value, *BallotNum, error) {
replies := p.rpcToAll(acceptorIds, "Prepare")
ok := 0
higherBal := *p.Bal
maxVoted := &Acceptor{VBal: &BallotNum{}}
for _, r := range replies {
if !p.Bal.GE(r.LastBal) {
higherBal = *r.LastBal
continue
}
if r.VBal.GE(maxVoted.VBal) {
maxVoted = r
}
ok += 1
if ok == quorum {
return maxVoted.Val, nil, nil
}
}
return nil, &higherBal, NotEnoughQuorum
}
func (p *Proposer) rpcToAll(
acceptorIds []int64,
action string) []*Acceptor {
replies := []*Acceptor{}
for _, aid := range acceptorIds {
var err error
address := fmt.Sprintf("127.0.0.1:%d",
AcceptorBasePort+int64(aid))
conn, err := grpc.Dial(
address, grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := NewPaxosKVClient(conn)
ctx, cancel := context.WithTimeout(
context.Background(), time.Second)
defer cancel()
var reply *Acceptor
if action == "Prepare" {
reply, err = c.Prepare(ctx, p)
} else if action == "Accept" {
reply, err = c.Accept(ctx, p)
}
if err != nil {
continue
}
replies = append(replies, reply)
}
return replies
}
Proposer 运行的 Phase2 在【slide-30】中描述,比 Phase1 更简单:
在第 2 阶段 phase-2,Proposer X 将它选定的值写入到 Acceptor 中,这个值可能是它自己要写入的值,或者是它从某个 Acceptor 上读到的 v(修复)。
func (p *Proposer) Phase2(
acceptorIds []int64,
quorum int) (*BallotNum, error) {
replies := p.rpcToAll(acceptorIds, "Accept")
ok := 0
higherBal := *p.Bal
for _, r := range replies {
if !p.Bal.GE(r.LastBal) {
higherBal = *r.LastBal
continue
}
ok += 1
if ok == quorum {
return nil, nil
}
}
return &higherBal, NotEnoughQuorum
}
完整的 paxos 由 Proposer 负责,包括:如何选择一个值,使得一致性得以保证。如【slide-29】中描述的:
Proposer X 收到多数(quorum)个应答,就认为是可以继续运行的。如果没有联系到多于半数的 acceptor,整个系统就 hang 住了,这也是 paxos 声称的只能运行少于半数的节点失效。这时 Proposer 面临 2 种情况: 所有应答中都没有任何非空的 v,这表示系统之前是干净的,没有任何值已经被其他 paxos 客户端完成了写入(因为一个多数派读一定会看到一个多数派写的结果),这时 Proposer X 继续将它要写的值在 phase-2 中真正写入到多于半数的 Acceptor 中。 如果收到了某个应答包含被写入的 v 和 vrnd,这时,Proposer X 必须假设有其他客户端(Proposer)正在运行,虽然 X 不知道对方是否已经成功结束,但任何已经写入的值都不能被修改!所以 X 必须保持原有的值。于是 X 将看到的最大 vrnd 对应的 v 作为 X 的 phase-2 将要写入的值。 这时实际上可以认为 X 执行了一次(不知是否已经中断的)其他客户端(Proposer)的修复。
func (p *Proposer) RunPaxos(
acceptorIds []int64,
val *Value) *Value {
quorum := len(acceptorIds)/2 + 1
for {
p.Val = val
maxVotedVal, higherBal, err := p.Phase1(
acceptorIds, quorum)
if err != nil {
p.Bal.N = higherBal.N + 1
continue
}
if maxVotedVal != nil {
p.Val = maxVotedVal
}
// val == nil 是一个读操作,
// 没有读到voted值不需要Phase2
if p.Val == nil {
return nil
}
higherBal, err = p.Phase2(
acceptorIds, quorum)
if err != nil {
p.Bal.N = higherBal.N + 1
continue
}
return p.Val
}
}
Val=foo Val=bar ?
VBal=3 VBal=2 ?
------- ------- --
A0 A1 A2
if p.Val == nil {
return nil
}
prop := Proposer{
Id: &PaxosInstanceId{
Key: "foo",
Ver: 0,
},
Bal: &BallotNum{N: 0, ProposerId: 2},
}
// 写:
v := prop.RunPaxos(acceptorIds, &Value{Vi64: 5})
// 读:
v := prop.RunPaxos(acceptorIds, nil)
第1个例子是 paxos 无冲突的运行【slide-32】:
func TestCase1SingleProposer(t *testing.T) {
ta := require.New(t)
acceptorIds := []int64{0, 1, 2}
quorum := 2
// 启动3个Acceptor的服务
servers := ServeAcceptors(acceptorIds)
defer func() {
for _, s := range servers {
s.Stop()
}
}()
// 用要更新的key和version定义paxos 实例的id
paxosId := &PaxosInstanceId{
Key: "i",
Ver: 0,
}
var val int64 = 10
// 定义Proposer, 随便选个Proposer id 10.
var pidx int64 = 10
px := Proposer{
Id: paxosId,
Bal: &BallotNum{N: 0, ProposerId: pidx},
}
// 用左边2个Acceptor运行Phase1,
// 成功, 没有看到其他的ballot number
latestVal, higherBal, err := px.Phase1(
[]int64{0, 1}, quorum)
ta.Nil(err, "constitued a quorum")
ta.Nil(higherBal, "no other proposer is seen")
ta.Nil(latestVal, "no voted value")
// Phase1成功后, 因为没有看到其他voted的值,
// Proposer选择它自己的值进行后面的Phase2
px.Val = &Value{Vi64: val}
// Phase 2
higherBal, err = px.Phase2(
[]int64{0, 1}, quorum)
ta.Nil(err, "constitued a quorum")
ta.Nil(higherBal, "no other proposer is seen")
}
本文用到的代码在 paxoskv 项目的 naive 分支上:
【https://github.com/openacid/paxoskv/tree/naive】
【paxos made simple】:http://lamport.azurewebsites.net/pubs/pubs.html#paxos-simple
【Leslie Lamport】:http://www.lamport.org/
【protobuf】:https://developers.google.com/protocol-buffers
【install-protoc】:https://grpc.io/docs/protoc-installation/
【grpc】:https://grpc.io/
【paxos的直观解释】:https://blog.openacid.com/algo/paxos
【issue】:https://github.com/openacid/paxoskv/issues/new/choose
【paxoskv】:https://github.com/openacid/paxoskv/tree/naive
【TestCase1SinglePropose】:https://github.com/openacid/paxoskv/blob/naive/paxoskv/paxos_slides_case_test.go#L11
【TestCase2DoubleProposer】:https://github.com/openacid/paxoskv/blob/naive/paxoskv/paxos_slides_case_test.go#L57
【Example_setAndGetByKeyVer】:https://github.com/openacid/paxoskv/blob/naive/paxoskv/example_set_get_test.go
【Openraft】: https://github.com/datafuselabs/openraft
Databend 文档:https://databend.rs/
Twitter:https://twitter.com/Datafuse_Labs
Slack:https://datafusecloud.slack.com/
Wechat:Databend
GitHub :https://github.com/datafuselabs/databend