作者 | 郭强,Apache SeaTunnel PPMCHazelcast IMDG 是一种开源分布式内存对象存储,支持多种数据结构。SeaTunnel Engine 使用 Hazelcast IMDG 将数据存储在 RAM 中,从而在集群中传播和复制数据。但由于它是分布式内存数据库,因此一旦宕机,内存中的数据就会丢失。为了解决这个问题,我们开发实现了一个 WAL (Write-Ahead Logging) 机制,从而可以将数据持久化写入第三方文件存储,以便在服务器宕机时可以恢复数据。WAL机制是将数据写入磁盘的机制。它的工作原理是在写入数据之前,先将数据写入日志文件中。当数据写入日志文件后,再将数据写入内存。这样,即使在写入数据时发生了宕机,也可以通过日志文件中的数据恢复数据。 它最重要的作用是数据恢复,一旦服务器崩溃,通过WAL日志,我们可以恢复崩溃之前的数据。这也意味如果写入WAL失败,整个操作将认为失败。 因此可以充分保证得数据的完整性和一致性,而它基本是数据库软件崩溃恢复的标准做法。对于我们来讲,大体流程则是,先写入磁盘,而后写入成功,再写入内存。如果宕机或者节点重启,则全量读取WAL文件进行数据恢复。由于 Hazelcast IMDG 是分布式内存数据库,因此每个节点都可以去进行数据操作,因此,这种情况下我们每个节点都有自己的 WAL 文件,而不是像主从架构那样,只有主节点有 WAL 文件。- 每个节点都有自己的WAL文件,每个节点都会将数据写入自己的WAL文件中。
- 当节点宕机时,会将全量读取所有的WAL文件,然后将数据恢复到内存中。
我们根据namespace来区分不同的WAL文件,每个namespace又会根据自己所在的节点创建对应的WAL文件。因此,它的文件目录可以抽象为一个树形结构,如下图所示:├── namespace
│ ├── cluster name 1
│ │ ├── business name 1
│ │ │ ├── node1
│ │ │ │ ├── wal file
│ │ │ ├── node 2
│ │ │ │ ├── wal file
│ ├── cluster name 2
│ │ ├── business name 2
│ │ │ ├── node1
│ │ │ │ ├── wal file
│ │ │ ├── node 2
│ │ │ │ ├── wal file
- namespace:区分不同的 SeaTunnel 存储业务
- cluster name:集群名称,用于区分不同的集群,每个集群都有自己的集群名称。
- business name:业务名称,用于区分不同的业务,每个业务都有自己的业务名称。
- node:节点,用于区分不同的节点,每个节点都有自己的节点名称,且不会产生冲突。
我们使用Disruptor框架来实现WAL的写入,它的写入流程如下:当用户去进行DDL操作的时候(需要注意,所有的DDL操作在我们的实际存储中都是追加,没有真正的物理删除),用户会去构建对应的WAL Data(包含操作指令,版本,序列化,namespce等一系列信息,以及对应的序列化信息等等), 然后提交消息到disruotor,这个过程是同步过程,当本次操作的数据进行消费,真正执行 append 命令后则返回,这个过程完成后会返回给用户一个成功的消息,即意味着本次操作执行成功。我们计划会有一个异步定时线程,去执行flush以及归档操作,当我们的数据达到一定的阈值后,我们会将数据进行归档,归档的过程是将数据进行压缩,然后写入到Storage-Data中,然后将WAL中的数据删除,同时创建新的WAL 文件。 我们进行过基准测试,双线程生产,单线程消费,当数据在byte级别的时候,单机可以达到上万次操作,在MB级别的时候,大概是不到一百的操作,其中byte级别的数据内存设置为512M,MB级别的数据内存设置为5G。但有待讨论,目前社区认为每次都是需要去进行fsync的,这个过程是需要消耗一定的时间的,所以我们需要在这个过程中进行优化。
我们的存储只有 K - V ,而 K 和 V 具体的类型并没有限制,他们可能是 Long 类型,Map类型,亦或者其它类型,但对于底层存储来讲,我们只需要将 K 和V 序列化成字节数组,然后将字节数组写入磁盘即可。DataEntry{
Object key;
String keyClass;
Object value;
String valueClass;
Boolean deleted;
Long timestamp;
}
而最终实际存储后的数据是DataEntry的序列化后的字节数组。
因此实际存储到磁盘的数据协议如下:
+--------------------------+---------------+
| Data Size (12) | data |
+--------------- ----------+---------------+
| record data length | Data (byte[]) |
+--------------------------+---------------+
当需要从存储中恢复的时候,我们会将存储中的数据读取出来,然后将其反序列化成DataEntry对象,然后将其放入内存中。进行数据的恢复。这个过程中会进行排序去重。当进行全量查询的时候,我们需要查询所有namespace的数据,这个时候,我们需要将所有的数据进行归档,但如果采用上面的做法,我们只查询各个节点所有的归档的WAL信息,然后进行查询即可。这个过程中会进行计算,归并操作,所有的数据都是有序的,我们需要根据数据顺序决定所有数据的最终值,举个例子:insert into data(K,V) values(1,1)
insert into data(K,V) values(1,2)
delete from data where K=1
insert into data(K,V) values(1,3)
update data set V=4 where K=1
上面这些记录一共会在存储中有五条记录,因此我们会根据版本来进行合并,在上面的语句中,如果我们查询,则实际返回的记录的是 K=1 V=4。而对于一个文件的数据归并来讲,这种过程其实更为简单,因为文件中的数据是有序的,因此我们只需要将文件中的数据读取出来,倒序排序后进行以下判断:delete set = new HashSet<>();// 存储当前已经删除的Key
data set = new HashSet<>();// 存储当前已经存在的Key
is in delete set? // 如果在删除集合中,那么直接跳过 由于倒序,因此最先put的数据一定是最新的
yes: end
no:
is deleted? // 如果是删除操作
yes:
add to delete set if not exist
end
no:
is in data set? // 如果在数据集合中,那么直接跳过 由于倒序,因此最先put的数据一定是最新的
yes: end
no:
add to data set
end
随着写入操作的执行,WAL 文件会变得越来越大。例如,如果将 key 为 1 类型为 Integer 的数据写入 100 次,那么 WAL 文件将包含 100 个相同的 key(类型为Integer) 为 1 的数据。但实际有效数据只有一条。 这也对我们每次恢复数据时的性能有影响。因此我们需要对 WAL 文件进行重写,将重复的数据进行合并。这个过程中,我们会将 WAL 文件中的数据读取出来,然后进行排序,然后进行去重,最后将去重后的数据写入新的 WAL 文件中。但这可能会有大量的内存占用(取决于具体的数据集)。一些建议的做法是,我们提供对应的CLI工具,用户可以通过CLI工具来进行WAL文件的重写。这个过程不会影响到正在运行的服务,因此可以在业务低峰期进行。我们可以对已经归档的数据进行排序去重,最后生成临时文件, 然后将临时文件重命名为原来的文件名,这样就完成了重写。分布式重写的效率更高,更快。但我们需要考虑的是,如果有多个节点同时进行重写,那么会有多个节点同时写入同一个文件,这样会导致文件的内容不一致,因此我们需要对重写的过程进行加锁,这样会导致重写的效率降低。单机重写速率略慢,但却可以保证在无竞争的状态下去进行,因此可以保证重写的正确性。WAL 文件的刷盘机制是一个比较重要的问题,如果 WAL 文件没有及时刷盘,那么在机器宕机的情况下,我们的数据就会丢失。但频繁的刷盘会导致性能的下降。 因此我们需要对 WAL 文件的刷盘机制进行优化。一般来说,我们会将 WAL 文件的刷盘机制分为两种:- 同步刷盘:每次写入数据后,都会调用 fsync() 方法,将数据刷盘。这样可以保证数据的安全性,但是会导致性能的下降。
- 周期刷盘,每隔一段时间,将数据刷盘一次。这样可以可以减少 fsync() 的调用次数,从而提高性能,但是也会存在窗口数据丢失的风险以及全量恢复时窗口数据可见性的问题。
- 基于数据量的刷盘:按照数据量来刷盘,比如每写入 1MB 的数据,就调用一次 fsync() 方法,但是也会存在窗口数据丢失的风险。
采用何种方式取决于具体的业务场景,我们是否需要还有待于讨论,但对我而言,针对不同的业务场景采用不同的方式才是比较合理的。Apache SeaTunnel(Incubating) 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台https://github.com/apache/incubator-seatunnelhttps://seatunnel.apache.org/https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelProApache SeaTunnel(Incubating) 下载地址:https://seatunnel.apache.org/download我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!https://github.com/apache/incubator-seatunnel/issueshttps://github.com/apache/incubator-seatunnel/pullsdev-subscribe@seatunnel.apache.orghttps://join.slack.com/t/apacheseatunnel/shared_invite/zt-1cmonqu2q-ljomD6bY1PQ~oOzfbxxXWQhttps://twitter.com/ASFSeaTunnel