【导读】本文介绍了 PG 前后端协议的项目实践。
本文基于3.0版本协议,主要以图示方式更为形象的描述其中比较重要的链接、查询等协议,详尽的文字说明可参考官方文档《Chapter 53. Frontend/Backend Protocol》(https://www.postgresql.org/docs/current/protocol.html)或中文文档《第 53 章 前端/后端协议》(http://www.postgres.cn/docs/14/protocol.html)。
总体流程
协议大致可以分为 startup、normal两个阶段,startup阶段即建立链接阶段,包含ssl、认证、链接参数等;normal阶段即为发起SQL查询等的阶段。
startup阶段流程
(1)一般客户端在建立链接时会先询问服务端是否开启SSL加密,若开启则服务器回复'S',客户端则进行SSL握手等,若不开启则回复'N'。
(2)客户端向服务器发送 StartupMessage 启动包,服务端判断是否需要认证,若需要则发送AuthenticationRequest信息,若不需要认证或已认证则可发送AuthenticationOk,认证失败响应ErrorResponse。
PG支持多种方式认证如:AuthenticationKerberosV5、AuthenticationCleartextPassword、AuthenticationMD5Password等,后面示例将采用常见的AuthenticationMD5Password方式进行演示。
(3)认证通过后服务端一般会发送三条消息:ParameterStatus如版本号、编码格式等;BackendKeyData包括当前链接进程id、取消链接的密钥;ReadForQuery 客户端收到这条消息后表示启动成功,前端现在可以发出命令到服务端。
(4)消息报文格式
启动消息报文格式
如上图,启动报文格式一般包含消息长度4个字节(包含自身)、协议类型编号4个字节、参数键值对等。
此部分主要讲解两种查询协议Simple Query、 Extended Query,其他协议先不做讲解,感兴趣可查阅文档学习。
此两种协议消息报文格式一般为:
Query消息协议报文
Simple Query流程
如上图:
(1)客户端向服务端发起Query请求;
(2)服务端收到后立即处理,并将结果分三种类型消息返回:
RowDescription:表字段信息描述,包括字段名、字段类型等信息;
DataRow:数据信息,每条信息仅包含一行数据,如查询数据有10条,则会发送10条DataRow类型消息;
CommandComplete:表示本次查询完成,并返回处理了多少条数据,如select到10条数据,消息体为 SELECT 10;
(3)服务端向客户端发送ReadyForQuery,表示可以接收下一条命令;该消息类型中包含三种消息类型标识:
'I':表示操作不在事物内;
'T':表示操作处在事物中;
'E':在失败的事物中;
(4)消息报文示例
Simple Query消息报文
相比于Simple Query,使用 Extended Query 时,发送请求的过程被分为若干步骤,准备步骤的结果可以被多次复用以提高效率;另外,还可以获得额外的特性, 比如可以把数据值作为独立的参数提供而不是必须把它们直接插入一个查询字符串。通常包括 Parse、Describe、Bind 和 Execute等。
交互流程如下图,图示已经比较清晰,不在文字赘述。
Extended Query流程
消息报文格式
各阶段消息报文格式示意
以上基本把链接、两种类型查询协议讲解完毕,主要以流程图示方式直观展示,还是那句话详尽的文字描述及理论性概念请阅读官方文档,不必在此赘述。图示部分参考《Postgres for the wire》。
Talk is cheap. Show me the code.
相较于理论学习,实操能帮助大家更好的理解协议交互过程及方式,印象更加深刻。
pgproto3 是从开源Go版本PG驱动 pgx 中抽取的协议层库,站在巨人的肩膀,我们可以很容易的实现整个通信过程。
说明:demo实现服务端处理逻辑,demo中许多异常处理、代码规范、函数封装等都不完善,仅作为简单的演示流程,实际工程代码远比此复杂,考虑的场景也更为丰富。我们分别使用psql、pgx驱动作为客户端链接测试。
首先开启一个服务端tcp监听端口,并循环接收客户端链接,代码大致如下:
package main
import (
"bufio"
"fmt"
"log"
"net"
)
func main() {
//pg 协议是在TCP/IP和Unix 域套接字上实现的,因此先开启一个tcp端口 监听客户端请求
ln, err := net.Listen("tcp", "127.0.0.1:5432")
if err != nil {
log.Fatal(err)
}
log.Println("Listening on", ln.Addr())
for {
c := new(ClientConn)
conn, err := ln.Accept()
if err != nil {
log.Fatal(err)
}
log.Println("Accepted connection from", conn.RemoteAddr())
go func() {
tcpConn := conn.(*net.TCPConn)
c.c = tcpConn
//初始化reader
c.rb = bufio.NewReaderSize(c.c, 8 * 1024)
c.startup()
}()
}
}
ClientConn结构体简单的定义为:
type ClientConn struct {
rb *bufio.Reader
c net.Conn
}
下面分析启动startup()流程代码,首先定义两个接收消息的处理方法:一个判断startup message消息类型的方法,返回消息类别;一个处理一般格式的消息,返回数组:
//读取startup消息结构体
func (c *ClientConn) receiveStartupMessage() (pgproto3.FrontendMessage, error) {
header := make([]byte, 4)
if _, err := io.ReadFull(c.rb, header); err != nil {
return nil, err
}
//根据消息报文格式可知 前4位为消息体的长度
msgLen := int(binary.BigEndian.Uint32(header) - 4)
msg := make([]byte, msgLen)
//获取消息编码
if _, err := io.ReadFull(c.rb, msg); err != nil {
return nil, err
}
code := binary.BigEndian.Uint32(msg)
switch code {
//ProtocolVersionNumber
case 196608:
startMessage := &pgproto3.StartupMessage{}
if err := startMessage.Decode(msg); err != nil {
return nil, err
}
return startMessage, nil
//sslRequestNumber 询问是否为ssl协议类型
case 80877103:
sslRequest := &pgproto3.SSLRequest{}
if err := sslRequest.Decode(msg); err != nil {
return nil, err
}
return sslRequest, nil
//cancelRequestCode 取消协议类型
case 80877102:
cancelRequest := &pgproto3.CancelRequest{}
if err := cancelRequest.Decode(msg); err != nil {
return nil, err
}
return cancelRequest, nil
default:
log.Fatal("unknown startup message")
return nil, errors.New("unknown startup message")
}
}
// 读取一般格式协议报文
func (c *ClientConn) readNormalMsg() ([]byte, error) {
msgType := make([]byte, 1)
if _, err := io.ReadFull(c.rb, msgType); err != nil {
return nil, err
}
// 后面四个字节为长度,包括自己
msgLength := make([]byte, 4)
if _, err := io.ReadFull(c.rb, msgLength); err != nil {
return nil, err
}
msgLen := binary.BigEndian.Uint32(msgLength)
// 获取请求的具体信息
msg := make([]byte, msgLen-4)
if _, err := io.ReadFull(c.rb, msg); err != nil {
return nil, err
}
data := append(msgType, msg...)
return data, nil
}
然后定义一个处理startup message的方法:
func (c *ClientConn) handleStartupMessage(startupMessage *pgproto3.StartupMessage) error {
// 可以从客户端获取用户名和数据库名称等信息
username := startupMessage.Parameters["user"]
log.Println("username: ",username)
log.Println("database: ",startupMessage.Parameters["database"])
// 进行用户验证
auth := make([]byte, 0)
// 发送一个 authRequest,
// 如果是AuthenticationCleartextPassword{} 接收到的auth为明文密码
// 这里使用 MD5 加密要求; 前端必须返回一个 MD5 加密的密码进行验证
// salt 为随机生成的 4 个字节,这里我们写死几个数
salt := [4]byte{109,65,109,65}
authRequest := pgproto3.AuthenticationMD5Password{Salt: salt}
n,err :=c.c.Write(authRequest.Encode(nil))
fmt.Println(n, err)
// 读取客户端发来的密码
// 格式: 'p' + len + 'password' + '0'
// 长度 = len + password + 1
auth, err = c.readNormalMsg()
//psql中会先断开链接,输入密码后再重新建立链接,因此会出现EOF读取错误
//正常退出即可,等待下一次链接进行处理即可
if err !=nil {
return err
}
if auth[0] != 'p' {
return errors.New("received is not a password packet" + string(auth[0]))
}
// 去掉第一个 'p' 和最后一个结束符,中间的为认证信息
auth = auth[1 : len(auth)-1]
log.Println("客户端收到md5密码: ", auth)
//假定我们服务端密码为 password
pwd := "password"
//构造加密后的密码
//客户端首先将用户输入的密码进行一次MD5加密,其中用户名作为salt
//然后将服务器发送过来的4位的随机数md5Salt作为salt再进行一次MD5加密,
//并将结果作为认证信息再次发送给服务器端。
res := "md5" + fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%x", md5.Sum([]byte(pwd + username))) +
string([]byte{109,65,109,65}))))
if res!=string(auth){
//返回给客户端对应错误信息 这里就不处理了
log.Println("密码认证失败")
}
//认证成功 给客户端写回成功信息
authOK := &pgproto3.AuthenticationOk{}
c.c.Write(authOK.Encode(nil))
//写回服务端部分参数信息
parameters := map[string]string{
"client_encoding": "UTF8",
"DateStyle": "ISO, YMD",
"server_version": "version: PostgreSQL 14.2",
}
// 发送 ParameterStatus
for k, v := range parameters {
parameterStatus := &pgproto3.ParameterStatus{Name: k, Value: v}
c.c.Write(parameterStatus.Encode(nil))
}
// 发送 ReadyForQuery 表示一切准备就绪。"I"表示空闲(没有在事务中)
c.writeReadyForQuery('I')
return nil
}
//发送ReadyForQuery消息
func (c *ClientConn) writeReadyForQuery(status byte) {
readyForQuery := &pgproto3.ReadyForQuery{TxStatus: status}
c.c.Write(readyForQuery.Encode(nil))
}
startup()处理方法:
func (c *ClientConn) startup() {
m, err := c.receiveStartupMessage()
if err != nil {
log.Fatal(err)
return
}
switch m.(type) {
case *pgproto3.CancelRequest:
c.c.Close()
break
case *pgproto3.SSLRequest:
//直接模拟不支持ssl类型 写N
data := []byte{'N'}
c.c.Write(data)
// 完成 SSL 确认后需要正式接收 StartupMessage
m, err := c.receiveStartupMessage()
if err != nil {
return
}
msg, ok := m.(*pgproto3.StartupMessage)
// 如果接收到的包不为启动包则报错
if !ok {
return
}
// 接收完 SSLRequest 包后接收 StartupMessage
c.handleStartupMessage(msg)
case *pgproto3.StartupMessage:
//处理链接请求
c.handleStartupMessage(m.(*pgproto3.StartupMessage))
default:
log.Fatal("received is not a expected packet")
break
}
}
以上即是启动过程的整体代码,下面我们启动验证整个启动过程。
debug调试
服务端启动输出如下日志,等待客户端链接:
2022/04/04 21:31:35 Listening on 127.0.0.1:5432
shell中输入链接命令:
psql -h 127.0.0.1 -U root -p5432 -d testdb
首先看下客户端发送的消息体:
SSLRequest阶段协议调试
收到的第一个消息总长度为8,消息编码内容长度为4,并且确实为SSLRequest类型消息。
StartupMessage调试
当写回'N'后收到客户端发来的StartupMessage。
认证阶段调试
可以从启动包获取用户名等信息,并且写回需要md5方式认证,此时需要说明一点:psql链接端会断开此链接,等用户输入密码后重新建立链接,因此会收到一条EOF消息,正常退出循环即可,等待下次链接接入。
此时客户端需要用户输入密码,输入代码内置的密码 password:
客户端输入密码
此时服务端收到开头为'p'的消息,表示密码验证,最后一位为结束符:
收到md5加密密码信息
服务端依据md5规则计算预置密码的密文,并做比较,注意计算时涉及两次md5加密,分别以用户名和服务端加盐字段拼接,详见如下:
密码计算验证
之后服务端向客户端发送ParameterStatus、ReadyForQuery消息,完成psql与服务端建立链接。(这里没发送BackendKeyData消息)
此时psql显示可以正常执行命令:
链接建立完成
首先实现一个循环处理消息的方法,目前仅处理Simple Query类型的 'Q' 消息,然后直接定义返回数据消息,演示消息返回:
func (c *ClientConn) run() {
for {
data, err := c.readNormalMsg()
if err != nil {
return
}
//处理数据
cmd := data[0]
data = data[1:]
switch cmd {
//http://www.postgres.cn/docs/12/protocol-message-formats.html
//在简单查询模式中,检索出来的值的格式总是文本,除非给出的命令是在一个使用BINARY选项声明的游标上FETCH。
case 'Q': /* simple query */
if len(data) > 0 && data[len(data)-1] == 0 {
data = data[:len(data)-1]
dataStr := hack.String(data)
}
log.Println("获取的客户端查询语句:", dataStr)
//mock字段描述
buf := (&pgproto3.RowDescription{Fields: []pgproto3.FieldDescription{
{
Name: []byte("name"),//字段名称
TableOID: 0,
TableAttributeNumber: 0,
DataTypeOID: 25,//字段数据类型
DataTypeSize: -1,
TypeModifier: -1,
Format: 0,//0表示text 1表示二进制格式
},
}}).Encode(nil)
//mock结果返回
buf = (&pgproto3.DataRow{Values: [][]byte{[]byte("tom")}}).Encode(buf)
buf = (&pgproto3.CommandComplete{CommandTag: []byte("SELECT 1")}).Encode(buf)
buf = (&pgproto3.ReadyForQuery{TxStatus: 'I'}).Encode(buf)
c.c.Write(buf)
default:
log.Fatal("command %d not supported now", cmd)
}
}
}
并把方法加入main处理流程:
......
go func() {
tcpConn := conn.(*net.TCPConn)
c.c = tcpConn
//初始化reader
c.rb = bufio.NewReaderSize(c.c, 8 * 1024)
c.startup()
//添加命令处理方法
c.run()
}()
......
debug调试
客户端发起模拟查询命令:
模拟查询
服务端接收消息并解析内容,处理过程类似密码接收,不再赘述,可以从消息体总获取客户端的命令,下面模拟服务端数据返回:
接收消息
分别模拟返回RowDescription、DataRow、CommandComplete以及ReadyForQuery消息,为了方便消息体内容均写死返回;可以看到发送的字节均是按照固定消息协议格式发送:
数据消息发送
写回客户端后展示如下:
客户端展示如下
psql默认采用Simple Query方式执行命令,而pgx驱动默认使用Extended Query方式执行,因此选用pgx作为扩展查询的测试客户端。
首先增加处理Parse、Describe、Bind、Execute、Sync消息协议代码,同样为便于演示将返回信息写死:
case 'P': /* parse */
parse := &pgproto3.Parse{}
parse.Decode(data)
parseComplete := &pgproto3.ParseComplete{}
c.c.Write(parseComplete.Encode(nil))
case 'B': /* bind */
bind := &pgproto3.Bind{}
bind.Decode(data)
bindComplete := &pgproto3.BindComplete{}
c.c.Write(bindComplete.Encode(nil))
case 'E': /* execute */
execute := &pgproto3.Execute{}
execute.Decode(data)
buf := (&pgproto3.DataRow{Values: [][]byte{[]byte("tom")}}).Encode(nil)
buf = (&pgproto3.CommandComplete{CommandTag: []byte("SELECT 1")}).Encode(buf)
c.c.Write(buf)
case 'C': /* close */
close := &pgproto3.Close{}
close.Decode(data)
c.c.Close()
case 'D': /* describe */
desc := &pgproto3.Describe{}
desc.Decode(data)
parameterDes := &pgproto3.ParameterDescription{}
c.c.Write(parameterDes.Encode(nil))
buf := (&pgproto3.RowDescription{Fields: []pgproto3.FieldDescription{
{
Name: []byte("name"),//字段名称
TableOID: 0,
TableAttributeNumber: 0,
DataTypeOID: 25,//字段数据类型
DataTypeSize: -1,
TypeModifier: -1,
Format: 0,//0表示text 1表示二进制格式
},
}}).Encode(nil)
c.c.Write(buf)
case 'S': /* sync */
sync := &pgproto3.Sync{}
sync.Decode(data)
c.writeReadyForQuery('I')
pgx链接查询代码:
package main
import (
"context"
"fmt"
"github.com/jackc/pgx/v4"
"os"
)
func main() {
/** pg数据库链接测试**/
urlExample := "postgres://root:password@127.0.0.1:5432/testdb"
conn, err := pgx.Connect(context.Background(), urlExample)
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
os.Exit(1)
}
defer conn.Close(context.Background())
var name string
err = conn.QueryRow(context.Background(), "select * from test;").Scan(&name)
if err != nil {
fmt.Fprintf(os.Stderr, "QueryRow failed: %v\n", err)
os.Exit(1)
}
fmt.Println(name)
}
debug调试
startup阶段与psql大同小异,此处略过,直接进行扩展查询协议调试。
打入断点,运行pgx 链接查询代码,首先进入Parse阶段,因代码查询中没有指定变量参数,所以收到的parse结构体信息为空字符,同时写回ParseComplete消息:
Parse阶段
接着第一次进入Describe阶段,写回ParameterDescription、RowDescription,因没有参数信息,这里使用一个空的ParameterDescription:
第一次Describe阶段
然后进入第一次Sync阶段,写回ReadyForQuery:
第一次Sync阶段
然后进入Bind阶段,按照名称绑定查询方法,写回BindComplete消息:
Bind阶段
然后进入第二次Describe阶段,此时写回RowDescription即可,demo中未做区分因此也写回了两种消息:
第二次Describe阶段
然后进入Execute阶段,因demo写回的都是空数据,所以此处收到的Portal参数为空,正常为绑定的名称;此阶段直接写回数据,DataRow以及CommandComplete;如下:
Execute阶段
最后进入第二次Sync阶段,同样写回ReadyForQuery,告诉客户端可以执行其他命令:
第二次Sync阶段
回到pgx链接代码,此时获取到服务端返回数据 name = "tom",至此整个扩展查询流程结束:
pgx客户端收到查询结果
全文以图示及demo调试的方式详细阐述了PG前后端通信协议过程,主要包含两个查询子协议,希望对理解协议本身有所帮助;其余协议过程后续有需要再补充~
转自:
zhuanlan.zhihu.com/p/493045524
- EOF -
Go 开发大全
参与维护一个非常全面的Go开源技术资源库。日常分享 Go, 云原生、k8s、Docker和微服务方面的技术文章和行业动态。
关注后获取
回复 Go 获取6万star的Go资源库
分享、点赞和在看
支持我们分享更多好文章,谢谢!