本文侧重 HDFS 消息定义, 读写流程, 默认读者了解 HDFS 基本概念和操作, 如有不了解, 可以翻阅其他资料.
之前(大概2023年年中)在使用 rust 操作 HDFS 文件时发现 rust 的 HDFS 客户端基本都依赖 java 的客户端, 在使用前需要先下好 HDFS java 依赖, 然后配置 JAVA_HOME 等环境变量; 使用还需要启动 jvm 通过 jni 交互, 臃肿又不方便, 所以当时就萌生了使用 rust 从头实现 HDFS 客户端想法, 毕竟我之前从头实现过 smtp, websocket, mysql binlog 协议, 对网络编程和协议解析都比较熟悉, HDFS 应该不会很难.
可等我真正从头实现却发现前方困难重重, 社区没原生 HDFS 客户端也情有可原. 第一个困难来自混乱不清的文档, 不管是官方还是社区, 英文还是中文; 基本都是对 HDFS 协议简单解释, 配上官方文档的一两个配图, 到了具体消息格式, 官方文档是过时的, 其他文档也没有, 只能参照 java 源码和其他语言实现. 第二个困难来自 HDFS 奇葩的消息定义, 3.x 版本的 NameNode 消息使用 protocol buffer 定义和编解码, 但又不使用标准的 grpc, 而是自己定义一套 header 规范, 导致读写消息时只能逐个实验. 第三个困难来自 HDFS 文档里的读写流程与消息定义不能完全对应, 要实现只能靠看源码和试验. 第四个困难来自搭建 HDFS 测试环境, 即使要部署一套单机测试环境, 也不是一件易事.
在我克服上述困难完成纯 rust 实现的 HDFS 客户端后, 我想应该从头梳理下 HDFS 协议消息体定义和流程, 留下比较准确的记录, 方便以后项目开发和社区贡献.
HDFS 里要完成文件读写需要 NameNode 和 DataNode 两种类型节点, NameNode 负责存放文件元数据, 因此在读写文件前客户端首先需要与 NameNode 通信, 获取文件位置, 获取到文件位置(即文件在哪些 DataNode) 后客户端与 DataNode 进行通信开始传输文件. 客户与两种节点的通信协议各不相同, 为了方便描述, 本文将客户端与 NameNode 直接的通信称为 HRpc, 客户端与 DataNode 的通信称为 DataTransfer.
在 HDFS 里文件会被切分成一个个块(Block), 读写时客户端也需要处理属于一个文件的多个Block, 详细过程会在下面的读写章节描述.
HRpc 流程图如下, 客户端创建好 tcp 连接后会先发送 Handshake 消息, 告诉服务端自己使用的协议版本认证协议等, 接着发送 RequestHeader 和 IpcContext 信息, 接着就可以按固定格式调用各个 rpc 方法和读取调用结果.
Handshake 定义如下, 编码时需要依次写入 hrpc
, version
, server_class
和 auth_protocol
即可, 其中 version 默认为 9, 其他两个字段填0即可.
#[derive(Debug, Clone)]
pub struct Handshake {
pub version: u8,
pub server_class: u8,
pub auth_protocol: u8,
}
发送 IpcContext 时首先需要写入大端序的u32,表示包总长度, 接着是encode_length_delimited[1]编码的RpcRequestHeaderProto[2] 和 IpcConnectionContextProto[3]
此消息不需要等待 NameNode 端响应, 可直接进行后面的 rpc 方法调用
rpc 请求与 ipc 请求类似, 不同的是, rpc 请求需要额外传输方法名和方法参数, 然后需要读取 NameNode 响应, NameNode 响应也由响应头和与方法对应的响应组成, 响应对应的请求和响应可以在HRpc[4]文档里找到.
如果调用出现错误, NameNode 会将 RpcResponseHeaderProto 里的 status 设置为 Error, 具体错误信息则放在 error_msg 字段.
下面以delete[5]方法为例说明 rpc 调用流程.
在上文我们提到 HDFS 对文件的读写其实是按块进行的, 这里为了方便, 我们将负责读写块的结构体/流程分别称为 BlockReader 和 BlockWriter.
客户端通过 HRpc 的 getFileInfo 和 getBlockLocations 方法获得文件的元数据和所有块位置信息, 接着可以按顺序依次读取每个块. 如果 HDFS 设置了多副本, 每个块下 locs
字段会有多个 DataNode 信息, 客户端可以通过这些 DataNode 信息自动选择最近或按其他逻辑选择合适的 location 开始读取.
客户端创建好连接后需要先发送 OpReadBlockProto[6], 将客户端ID,认证信息, 块信息和是否进行checksum校验等信息发送给 DataNode, 此外理论上 offset 可以指定读哪里开始读, 但设置似乎不生效. 如果 DataNode 返回成功响应, 客户端还需要再读取PacketHeaderProto[7]数据长度, checksum等消息, 接着DataNode会发送客户端要读取的数据.
如果要求发送 checksum(一个u32数值), DataNode 会在 BlockOpResponseProto 字段里设置对应的 checksum 方法和每多少字节计算一次 checksum, 也就是 bytes_per_checksum, 客户端需要根据公式 data_len.div_ceil(bytes_per_checksum) * 4
, 计算 checksum 数据长度.
相比于读, 写流程更复杂些, 写还分为 create 和 append 模式, 不过 append 仅在第一个 block 处理上和 create 有区别, append 需要将 offset 设置为已有数据长度, 而不是 0; 写入的时候可以不按 bytes_per_checksum 数量写入, 但如果开启 checksum, 每次写入时不论数据多少, 必须同时发送 checksum, 此外如果当前块offset + 写入字节数超过 bytes_per_checksum, 在 bytes_per_checksum 切分, 计算两次 checksum 发送, 否则会导致 server error. 最后, Client 还需要向 NameNode 更新块状态.
为了易读性, 后面的流程图省略了 HRpc 和 BlockReader 具体过程.
上述读写流程也只是提供大致视角, 如果你想了解更多 HDFS 细节可以阅读hdfs-client[8] 源码, 也欢迎使用 hdfs-client 提 pr 和 issue.
PS: 封面来自北京初雪晚上散步所拍
[1]
encode_length_delimited: https://docs.rs/prost/latest/prost/trait.Message.html#method.encode_length_delimited[2]
RpcRequestHeaderProto: https://docs.rs/hdfs-types/0.1.0/hdfs_types/common/struct.RpcRequestHeaderProto.html[3]
IpcConnectionContextProto: https://docs.rs/hdfs-types/0.1.0/hdfs_types/common/struct.IpcConnectionContextProto.html[4]
HRpc: https://docs.rs/hdfs-client/latest/hdfs_client/hrpc/struct.HRpc.html[5]
delete: https://docs.rs/hdfs-client/latest/hdfs_client/hrpc/struct.HRpc.html#method.delete[6]
OpReadBlockProto: https://docs.rs/hdfs-types/0.1.0/hdfs_types/hdfs/struct.OpReadBlockProto.html[7]
PacketHeaderProto: https://docs.rs/hdfs-types/0.1.0/hdfs_types/hdfs/struct.PacketHeaderProto.html[8]
hdfs-client: https://github.com/PrivateRookie/hdfs-client