SOFA-JRaft 源码解析:节点的启动过程

Raft 协议相信大家都不陌生,作为分布式共识算法的一员,旨在提供与 Paxos 共识算法相同的容错性和性能的前提下,追求更好的可理解性和工程可实现性。过去几年,围绕 Raft 协议涌现出了一系列各类语言的实现(参考 Raft 协议官网),这也充分证明了该算法相对于 Paxos 算法在理解和实现层面的友好性。

SOFA-JRaft 是一个基于 Raft 协议的 java 语言实现算法库,提供生产级别的稳定性、容错性,以及高性能,支持 MULTI-RAFT-GROUP,适用于高负载低延迟的场景。

本系列文章将从源码层面剖析 SOFA-JRaft 的设计与实现,区别于 SOFA:JRaftLab/ 项目中的一系列文章侧重于介绍 SOFA-JRaft 各模块的设计,本系列文章更加侧重于 SOFA-JRaft 各模块的实现,从实现层面再反观 SOFA-JRaft 的架构设计。

注:本系列文章如不做特殊说明,均使用 JRaft 指代 SOFA-JRaft,使用 Raft 指代 Raft 协议。

Leader 选举示例

在正式开始之前还是先引用一个 JRaft 的官方示例,演示如何基于 JRaft 实现分布式场景下的主节点选举。ElectionBootstrap 是整个示例的驱动类,对应的 main 函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public static void main(final String[] args) {
if (args.length < 4) {
System.out.println(
"Useage : java com.alipay.sofa.jraft.example.election.ElectionBootstrap {dataPath} {groupId} {serverId} {initConf}");
System.out.println(
"Example: java com.alipay.sofa.jraft.example.election.ElectionBootstrap /tmp/server1 election_test 127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083");
System.exit(1);
}
final String dataPath = args[0]; // 数据根路径
final String groupId = args[1]; // 组 ID
final String serverIdStr = args[2]; // 节点地址
final String initialConfStr = args[3]; // 初始节点列表

// 节点初始化参数设置
final ElectionNodeOptions electionOpts = new ElectionNodeOptions();
electionOpts.setDataPath(dataPath);
electionOpts.setGroupId(groupId);
electionOpts.setServerAddress(serverIdStr);
electionOpts.setInitialServerAddressList(initialConfStr);

final ElectionNode node = new ElectionNode();
// 注册监听器,监听当前节点竞选 leader 成功或 stepdown
node.addLeaderStateListener(new LeaderStateListener() {

final PeerId serverId = node.getNode().getLeaderId();
final String ip = serverId.getIp();
final int port = serverId.getPort();

@Override
public void onLeaderStart(long leaderTerm) {
System.out.println("[ElectionBootstrap] Leader's ip is: " + ip + ", port: " + port);
System.out.println("[ElectionBootstrap] Leader start on term: " + leaderTerm);
}

@Override
public void onLeaderStop(long leaderTerm) {
System.out.println("[ElectionBootstrap] Leader stop on term: " + leaderTerm);
}
});

// 初始化并启动节点
node.init(electionOpts);
}

由上述实现可以看出在 Leader 选举场景下启动一个 JRaft 节点需要指定 4 个参数,包括:

  • 数据存储根路径,用于存储日志、元数据,以及快照数据。
  • 组 ID,一个组可以看做是一个独立的 Raft 集群,JRaft 支持 MULTI-RAFT-GROUP。
  • 节点地址,即当前节点的 IP 和端口号。
  • 初始集群节点列表,即初始构成 JRaft 集群的节点列表。

ElectionNode 是整个启动示例的核心实现类,方法 ElectionNode#init 实现了初始化和启动单个 JRaft 节点的逻辑,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public boolean init(final ElectionNodeOptions opts) {
// 已经启动,避免重复启动
if (this.started) {
LOG.info("[ElectionNode: {}] already started.", opts.getServerAddress());
return true;
}

// 构造 Raft 节点配置对象
NodeOptions nodeOpts = opts.getNodeOptions();
if (nodeOpts == null) {
nodeOpts = new NodeOptions();
}

// 设置选举状态机
this.fsm = new ElectionOnlyStateMachine(this.listeners);
nodeOpts.setFsm(this.fsm);

// 初始化集群节点配置
final Configuration initialConf = new Configuration();
if (!initialConf.parse(opts.getInitialServerAddressList())) {
throw new IllegalArgumentException("Fail to parse initConf: " + opts.getInitialServerAddressList());
}
// Set the initial cluster configuration
nodeOpts.setInitialConf(initialConf);

// 创建数据存储路径,用于存储日志和元数据信息
final String dataPath = opts.getDataPath();
try {
FileUtils.forceMkdir(new File(dataPath));
} catch (final IOException e) {
LOG.error("Fail to make dir for dataPath {}.", dataPath);
return false;
}
// Set the data path
// Log, required
nodeOpts.setLogUri(Paths.get(dataPath, "log").toString());
// Metadata, required
nodeOpts.setRaftMetaUri(Paths.get(dataPath, "meta").toString());
// 对于 Leader 选举而言,无需启用快照机制
// nodeOpts.setSnapshotUri(Paths.get(dataPath, "snapshot").toString());

final String groupId = opts.getGroupId();
final PeerId serverId = new PeerId();
if (!serverId.parse(opts.getServerAddress())) {
throw new IllegalArgumentException("Fail to parse serverId: " + opts.getServerAddress());
}

// 创建并初始化 raft 节点,以 RPC 服务的形式运行
final RpcServer rpcServer = RaftRpcServerFactory.createRaftRpcServer(serverId.getEndpoint());
this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOpts, rpcServer);
this.node = this.raftGroupService.start();
if (this.node != null) {
this.started = true;
}
return this.started;
}

实现一个 JRaft 节点的初始化和启动过程主要分为两个步骤:

  1. 构造 Raft 节点配置 NodeOptions 对象;
  2. 初始化并启动节点,以 RPC 服务的方式运行。

构建 NodeOptions 配置对象的过程比较简单,这里主要说明一下状态机 StateMachine 选项。以典型的基于 JRaft 实现的 KV 数据库为例,当我们往数据库中写入一条数据时,对于 JRaft 而言就像是我们往 Leader 节点发送了一条指令。Leader 节点会将该指令封装成一条日志条目 LogEntry 记录到本地,并复制给集群中的其它 Follower 节点。当集群中过半数节点都完成了对该 LogEntry 的复制之后,Leader 节点认为可以提交该条目(即将该 LogEntry 的状态修改为 committed),并在未来的某个时刻将该 LogEntry 中的指令应用到本地存储介质中。

整个流程中封装指令为 LogEntry 对象,接着将 LogEntry 复制到大部分 Follower 节点,并提交该 LogEntry 的过程都是通用的,由 JRaft 负责实现。然而,对于指令的解析和应用则需要结合具体的应用场景,以 KV 数据库场景为例,需要解析出 LogEntry 中的指令,并依据指令类型决定对相应的 key 做下一步具体操作,是更新还是删除。JRaft 定义了一个 StateMachine 接口,并通过 StateMachine#onApply 方法将已经成功同步给集群中过半数节点的 LogEntry 对应的指令透传给用户,由用户去实现对指令的处理逻辑。

继续回到本示例,Leader 选举是 Raft 协议内置的功能,可以不涉及用户指令,所以上述示例对于 StateMachine 接口的实现类 ElectionOnlyStateMachine 也仅仅是简单打印了些日志,这里不再展开。

JRaft 节点的初始化和启动过程由 RaftGroupService 类封装,JRaft 将其定义为一个框架类,用于封装 JRaft 节点的创建和启动过程。方法 RaftGroupService#start 实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public synchronized Node start(final boolean startRpcServer) {
if (this.started) {
return this.node;
}
// 校验节点地址的有效性,不允许设置 IP 为 0.0.0.0
if (this.serverId == null
|| this.serverId.getEndpoint() == null
|| this.serverId.getEndpoint().equals(new Endpoint(Utils.IP_ANY, 0))) {
throw new IllegalArgumentException("Blank serverId:" + this.serverId);
}

// 组 ID 不允许为空
if (StringUtils.isBlank(this.groupId)) {
throw new IllegalArgumentException("Blank group id" + this.groupId);
}

// 记录进程内的 RPC 服务地址列表
NodeManager.getInstance().addAddress(this.serverId.getEndpoint());

// 创建并初始化 raft 节点
this.node = RaftServiceFactory.createAndInitRaftNode(this.groupId, this.serverId, this.nodeOptions);
if (startRpcServer) {
// 启动节点
this.rpcServer.init(null);
} else {
LOG.warn("RPC server is not started in RaftGroupService.");
}
this.started = true;
LOG.info("Start the RaftGroupService successfully.");
return this.node;
}

其中创建并初始化 JRaft 节点的过程实际上是委托给了 Node#init 方法执行。Node 接口用于描绘一个 JRaft 节点,是整个 JRaft 算法库中最核心的类,本系列文章对于 JRaft 设计和实现的分析基本上都围绕着 Node 接口展开。我们将在本文的后面小节对 JRaft 节点的初始化过程进行展开分析。

整体架构设计

作为本系列文章的第一篇,在开始剖析源码之前,我打算用一小节的篇幅对 JRaft 的整体架构设计进行一个简单的介绍,这样能够让读者对 JRaft 的实现有一个整体层面的认知。

image

上图描绘了 JRaft 的整体架构设计,可以大致将 JRaft 的实现分为以下几个模块:

  • 数据存储模块 :包括日志数据存储、元数据存储,以及快照数据存储。
  • 日志复制模块 :JRaft 针对每个 Follower 节点都会为其创建并绑定一个复制器 Replicator 实例,Replicator 主要负责向对应的 Follower 节点复制数据、安装快照,以及维持心跳。ReplicatorGroup 负责管理一个 group 维度下的所有复制器 Replicator 实例。
  • 周期性任务计时器 :计时器在整个 Raft 协议的选主过程中起着至关重要的作用,JRaft 默认借鉴 Netty 的单层时间轮算法实现了一个高效的计时器,并应用到预选举、正式选举、周期性生成快照,以及 Leader 节点角色降级等多个计时场景。
  • 选票箱模块 :Raft 协议的共识决策采用多数投票机制,所以选票和选票箱是对这一机制的直观实现。
  • 集群配置管理模块 :Raft 作为一个服务于分布式应用的协议,免不了会遇到节点的上下线、网络分区等问题,所以需要对集群中的节点进行管理,以保证整个协议的正常运行。
  • 状态机调度模块 :前面我们已经对状态机 StateMachine 进行了一个简单的介绍,而状态机调度器 FSMCaller 相当于在 JRaft 集群与业务 StateMachine 实现之间建立了一座桥梁,用于调度业务 StateMachine 的运行。
  • CLI 模块 :CLI 即 Client CommandLine Service,是在 JRaft 节点提供的 RPC 服务中暴露的一系列用于管理 JRaft 集群的服务接口,例如增删节点、变更节点配置、重置节点配置,以及转移 Leader 节点等功能。

作为基于 JRaft 算法库实现的应用程序,我们通常可以基于 CLI 服务管理 JRaft 集群,实现 StateMachine 接口以感知 JRaft 集群的运行状态,以及调用 Node#apply 方法向 JRaft 集群提交指令。这些指令在被 JRaft 成功复制到过半数的节点上之后,最终会通过调用 StateMachine#onApply 方法透传给业务,由业务负责处理这些指令。

节点初始化

在对 JRaft 的整体架构设计有一个基本的认知之后,本文的最后我们来分析一下 JRaft 节点的初始化启动过程。前面我们通过 Leader 选举的例子演示了 JRaft 的基本使用,JRaft 的使用可以简单概括为以下四个步骤:

  1. 实现 StateMachine 接口,并创建状态机实例;
  2. 初始化 NodeOptions 配置对象,用于设置节点的运行参数;
  3. 基于 NodeOptions 配置对象创建并初始化 Node 实例;
  4. 启动节点,以 RPC 服务的方式运行。

其中步骤 1 和 2 都比较简单,通过前面的示例可以一目了然,本小节我们重点分析一下步骤 3 和 4,看看 JRaft 是如何初始化和启动一个 JRaft 参与节点的。我们从 Node#init 方法切入,该方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
public boolean init(final NodeOptions opts) {
Requires.requireNonNull(opts, "Null node options");
Requires.requireNonNull(opts.getRaftOptions(), "Null raft options");
Requires.requireNonNull(opts.getServiceFactory(), "Null jraft service factory");
this.serviceFactory = opts.getServiceFactory();
this.options = opts;
this.raftOptions = opts.getRaftOptions();
this.metrics = new NodeMetrics(opts.isEnableMetrics());
this.serverId.setPriority(opts.getElectionPriority());
this.electionTimeoutCounter = 0;

// 节点 IP 不允许设置为 0.0.0.0
if (this.serverId.getIp().equals(Utils.IP_ANY)) {
LOG.error("Node can't started from IP_ANY.");
return false;
}

// 正常在初始化 Node 之前需要调用 NodeManager#addAddress 方法记录当前节点地址
if (!NodeManager.getInstance().serverExists(this.serverId.getEndpoint())) {
LOG.error("No RPC server attached to, did you forget to call addService?");
return false;
}

// 创建并初始化延时任务调度器 TimerManager,负责 JRaft 内部的延时任务调度
this.timerManager = TIMER_FACTORY.getRaftScheduler(
this.options.isSharedTimerPool(),
this.options.getTimerPoolSize(), "JRaft-Node-ScheduleThreadPool");

// Init timers
final String suffix = getNodeId().toString();

// 创建正式选举计时器(周期: 1s ~ 2s)
String name = "JRaft-VoteTimer-" + suffix;
this.voteTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(),
TIMER_FACTORY.getVoteTimer(this.options.isSharedVoteTimer(), name)) {

@Override
protected void onTrigger() {
handleVoteTimeout();
}

@Override
protected int adjustTimeout(final int timeoutMs) {
return randomTimeout(timeoutMs);
}
};

// 创建预选举计时器(周期:1s ~ 2s)
name = "JRaft-ElectionTimer-" + suffix;
this.electionTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(),
TIMER_FACTORY.getElectionTimer(this.options.isSharedElectionTimer(), name)) {

@Override
protected void onTrigger() {
handleElectionTimeout();
}

@Override
protected int adjustTimeout(final int timeoutMs) {
return randomTimeout(timeoutMs);
}
};

// 创建角色降级计时器(周期:0.5s)
name = "JRaft-StepDownTimer-" + suffix;
this.stepDownTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs() >> 1,
TIMER_FACTORY.getStepDownTimer(this.options.isSharedStepDownTimer(), name)) {

@Override
protected void onTrigger() {
handleStepDownTimeout();
}
};

// 创建快照周期性生成计时器(周期:1h)
name = "JRaft-SnapshotTimer-" + suffix;
this.snapshotTimer = new RepeatedTimer(name, this.options.getSnapshotIntervalSecs() * 1000,
TIMER_FACTORY.getSnapshotTimer(this.options.isSharedSnapshotTimer(), name)) {

private volatile boolean firstSchedule = true;

@Override
protected void onTrigger() {
handleSnapshotTimeout();
}

@Override
protected int adjustTimeout(final int timeoutMs) {
if (!this.firstSchedule) {
return timeoutMs;
}

// Randomize the first snapshot trigger timeout
this.firstSchedule = false;
if (timeoutMs > 0) {
int half = timeoutMs / 2;
return half + ThreadLocalRandom.current().nextInt(half);
} else {
return timeoutMs;
}
}
};

// 创建集群节点配置管理器
this.configManager = new ConfigurationManager();

// 初始化 Task 处理相关的 disruptor 队列,用于异步处理业务调用 Node#apply 方法向集群提交的 Task 列表
this.applyDisruptor = DisruptorBuilder.<LogEntryAndClosure>newInstance() //
.setRingBufferSize(this.raftOptions.getDisruptorBufferSize()) //
.setEventFactory(new LogEntryAndClosureFactory()) //
.setThreadFactory(new NamedThreadFactory("JRaft-NodeImpl-Disruptor-", true)) //
.setProducerType(ProducerType.MULTI) //
.setWaitStrategy(new BlockingWaitStrategy()) //
.build();
this.applyDisruptor.handleEventsWith(new LogEntryAndClosureHandler());
this.applyDisruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(getClass().getSimpleName()));
this.applyQueue = this.applyDisruptor.start();
if (this.metrics.getMetricRegistry() != null) {
this.metrics.getMetricRegistry().register("jraft-node-impl-disruptor", new DisruptorMetricSet(this.applyQueue));
}

// 创建状态机调度器
this.fsmCaller = new FSMCallerImpl();

// 初始化日志数据存储模块
if (!initLogStorage()) {
LOG.error("Node {} initLogStorage failed.", getNodeId());
return false;
}

// 初始化元数据存储模块
if (!initMetaStorage()) {
LOG.error("Node {} initMetaStorage failed.", getNodeId());
return false;
}

// 初始化状态机调度器
if (!initFSMCaller(new LogId(0, 0))) {
LOG.error("Node {} initFSMCaller failed.", getNodeId());
return false;
}

// 创建并初始化选票箱 BallotBox,每个节点绑定一个选票箱
this.ballotBox = new BallotBox();
final BallotBoxOptions ballotBoxOpts = new BallotBoxOptions();
ballotBoxOpts.setWaiter(this.fsmCaller);
// closureQueue 在初始化 FSMCaller 时创建,相互共用
ballotBoxOpts.setClosureQueue(this.closureQueue);
if (!this.ballotBox.init(ballotBoxOpts)) {
LOG.error("Node {} init ballotBox failed.", getNodeId());
return false;
}

// 初始化快照数据存储模块
if (!initSnapshotStorage()) {
LOG.error("Node {} initSnapshotStorage failed.", getNodeId());
return false;
}

// 对日志数据进行一致性校验
final Status st = this.logManager.checkConsistency();
if (!st.isOk()) {
LOG.error("Node {} is initialized with inconsistent log, status={}.", getNodeId(), st);
return false;
}

// 初始化集群节点配置,优先从日志中恢复
this.conf = new ConfigurationEntry();
this.conf.setId(new LogId());
// if have log using conf in log, else using conf in options
if (this.logManager.getLastLogIndex() > 0) {
checkAndSetConfiguration(false);
} else {
this.conf.setConf(this.options.getInitialConf());
// 以初始节点中的最大优先级初始化 targetPriority,用于控制当前节点是否继续发起预选举
this.targetPriority = getMaxPriorityOfNodes(this.conf.getConf().getPeers());
}

// 如果初始集群列表不为空,则需要校验其有效性,即 peers 不为空,且不能和 learners 有交集
if (!this.conf.isEmpty()) {
Requires.requireTrue(this.conf.isValid(), "Invalid conf: %s", this.conf);
} else {
LOG.info("Init node {} with empty conf.", this.serverId);
}

// TODO RPC service and ReplicatorGroup is in cycle dependent, refactor it
// 创建复制器 Replicator 管理组
this.replicatorGroup = new ReplicatorGroupImpl();
// 创建 RPC 客户端
this.rpcService = new DefaultRaftClientService(this.replicatorGroup);
final ReplicatorGroupOptions rgOpts = new ReplicatorGroupOptions();
rgOpts.setHeartbeatTimeoutMs(heartbeatTimeout(this.options.getElectionTimeoutMs()));
rgOpts.setElectionTimeoutMs(this.options.getElectionTimeoutMs());
rgOpts.setLogManager(this.logManager);
rgOpts.setBallotBox(this.ballotBox);
rgOpts.setNode(this);
rgOpts.setRaftRpcClientService(this.rpcService);
rgOpts.setSnapshotStorage(this.snapshotExecutor != null ? this.snapshotExecutor.getSnapshotStorage() : null);
rgOpts.setRaftOptions(this.raftOptions);
rgOpts.setTimerManager(this.timerManager);

// Adds metric registry to RPC service.
this.options.setMetricRegistry(this.metrics.getMetricRegistry());

// 初始化 RPC 客户端
if (!this.rpcService.init(this.options)) {
LOG.error("Fail to init rpc service.");
return false;
}
// 初始化复制器管理组
this.replicatorGroup.init(new NodeId(this.groupId, this.serverId), rgOpts);

// 创建并初始化只读服务,用于支持线性一致性读
this.readOnlyService = new ReadOnlyServiceImpl();
final ReadOnlyServiceOptions rosOpts = new ReadOnlyServiceOptions();
rosOpts.setFsmCaller(this.fsmCaller);
rosOpts.setNode(this);
rosOpts.setRaftOptions(this.raftOptions);
if (!this.readOnlyService.init(rosOpts)) {
LOG.error("Fail to init readOnlyService.");
return false;
}

// 切换节点角色为 FOLLOWER
this.state = State.STATE_FOLLOWER;

if (LOG.isInfoEnabled()) {
LOG.info("Node {} init, term={}, lastLogId={}, conf={}, oldConf={}.",
getNodeId(), this.currTerm, this.logManager.getLastLogId(false), this.conf.getConf(), this.conf.getOldConf());
}

// 如果启用了快照生成机制,则启动周期性快照生成任务
if (this.snapshotExecutor != null && this.options.getSnapshotIntervalSecs() > 0) {
LOG.debug("Node {} start snapshot timer, term={}.", getNodeId(), this.currTerm);
this.snapshotTimer.start();
}

// 尝试角色降级,主要用于初始化本地状态,并启动预选举计时器
if (!this.conf.isEmpty()) {
stepDown(this.currTerm, false, new Status());
}

if (!NodeManager.getInstance().add(this)) {
LOG.error("NodeManager add {} failed.", getNodeId());
return false;
}

// Now the raft node is started , have to acquire the writeLock to avoid race conditions
this.writeLock.lock();
// 如果当前集群只有自己一个节点,则尝试选举自己为主节点
if (this.conf.isStable() && this.conf.getConf().size() == 1 && this.conf.getConf().contains(this.serverId)) {
// The group contains only this server which must be the LEADER, trigger the timer immediately.
electSelf();
} else {
this.writeLock.unlock();
}

return true;
}

整个 JRaft 节点的初始化过程执行了大量的工作,整体可以概括为以下几个方面:

  • 创建并初始化延时任务调度器 TimerManager,主要用于处理内部的延时任务(与周期性任务相区分)。
  • 创建计时器,用于执行周期性任务,包括:预选举计时器(electionTimer)、正式选举计时器(voteTimer)、角色降级计时器(stepDownTimer),以及快照周期性生成计时器(snapshotTimer)。
  • 创建集群节点配置管理器 ConfigurationManager,并初始化集群节点配置信息。
  • 初始化 Task 处理相关的 disruptor 队列,用于异步处理业务调用 Node#apply 方法向集群提交的 Task 列表。
  • 初始化日志数据存储模块,并对日志数据执行一致性校验。
  • 初始化元数据存储模块。
  • 初始化快照数据存储模块。
  • 创建并初始化状态机调度器 FSMCaller。
  • 创建并初始化选票箱 BallotBox。
  • 创建并初始化复制器管理组 ReplicatorGroup。
  • 创建并初始化 RPC 客户端 RaftClientService。
  • 创建并初始化只读服务 ReadOnlyService,用于支持线性一致性读。
  • 如果启用了快照生成机制,则启动周期性快照生成任务。
  • 如果初始集群节点不为空,则尝试执行角色降级(stepdown),以对本地状态进行初始化,并启动预选举计时器。
  • 如果集群只有当前这一个节点,则尝试选举自己为 Leader。

下面挑选几个稍微复杂一点的展开说明。

周期任务调度器

Raft 协议的运行依赖于超时机制,所以在实现层面需要提供对应的计时器,用于调度周期性任务。上面在初始化 JRaft 节点期间构造了一系列的计时器,包括:预选举计时器(electionTimer)、正式选举计时器(voteTimer)、角色降级计时器(stepDownTimer),以及周期性快照生成计时器(snapshotTimer)。本小节将分析这些计时器背后的实现,即 RepeatedTimer 类。

首先,我们先来体验一下 RepeatedTimer 的使用方式,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private static class TestRepeatedTimer extends RepeatedTimer {

public TestRepeatedTimer(String name, int timeoutMs) {
super(name, timeoutMs);
}

@Override
protected void onTrigger() {
System.out.println("on trigger");
}

@Override
protected int adjustTimeout(int timeoutMs) {
// 随机化计时周期
return RandomUtils.nextInt(timeoutMs);
}

}

final TestRepeatedTimer timer = new TestRepeatedTimer("test", (int) TimeUnit.SECONDS.toMillis(1L));
timer.start();

上述示例中我们通过继承 RepeatedTimer 抽象类定义了一个测试用的 TestRepeatedTimer 实现类,该实现类会周期性的往控制台打印“on trigger”字符串。方法 RepeatedTimer#onTrigger 是 RepeatedTimer 中声明的唯一一个抽象方法,我们需要通过该方法实现自己的周期性业务逻辑。上述示例中,我们还覆盖实现了 RepeatedTimer#adjustTimeout 方法,以实现在运行期间对计时周期进行随机化调整。最后,通过调用 RepeatedTimer#start 方法,我们可以启动该计时器。

下面对 RepeatedTimer 的运行机制进行分析。RepeatedTimer 定义了如下构造方法:

1
2
3
4
5
6
7
8
9
10
11
public RepeatedTimer(final String name, final int timeoutMs) {
this(name, timeoutMs, new HashedWheelTimer(new NamedThreadFactory(name, true), 1, TimeUnit.MILLISECONDS, 2048));
}

public RepeatedTimer(final String name, final int timeoutMs, final Timer timer) {
super();
this.name = name;
this.timeoutMs = timeoutMs;
this.stopped = true;
this.timer = Requires.requireNonNull(timer, "timer");
}

其中 Timer 是一个接口(定义如下),其功能是延迟指定时间执行提交的任务,即 TimerTask。

1
2
3
4
public interface Timer {
Timeout newTimeout(final TimerTask task, final long delay, final TimeUnit unit);
Set<Timeout> stop();
}

围绕 Timer 接口,JRaft 提供了 DefaultTimer 和 HashedWheelTimer 两个实现类,其中前者基于 JDK 内置的 ScheduledExecutorService 实现,后者则基于单层时间轮算法实现。相对而言,HashedWheelTimer 较 DefaultTimer 在性能和精度层面表现更优,所以 JRaft 将其作为默认 Timer 应用于 RepeatedTimer 中。

本小节重点关注 RepeatedTimer 的实现机制,关于 HashedWheelTimer 的设计和实现可以参考 Netty 相关的源码分析文章。接下来,我们从 RepeatedTimer#start 方法开始,该方法用于启动对应的计时器,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void start() {
this.lock.lock();
try {
// 计时器已经被销毁,不允许再被启动
if (this.destroyed) {
return;
}
// 计时器处于运行中,不需要再启动
if (!this.stopped) {
return;
}
this.stopped = false;
if (this.running) {
return;
}
// 标识计时器已经在运行
this.running = true;
// 调度
schedule();
} finally {
this.lock.unlock();
}
}

上述方法主要是设置一些本地状态标识,对于首次启动的计时器会调用 RepeatedTimer#schedule 方法开始调度执行周期性任务,该方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
private void schedule() {
// 正常来说,这里的 timeout 应该为 null,否则说明上一轮任务还未执行完毕,尝试取消运行
if (this.timeout != null) {
this.timeout.cancel();
}
// 创建一个新的任务
final TimerTask timerTask = timeout -> {
try {
RepeatedTimer.this.run();
} catch (final Throwable t) {
LOG.error("Run timer task failed, taskName={}.", RepeatedTimer.this.name, t);
}
};
// 提交给 Timer 延迟运行(仅运行一次),这里会调用 adjustTimeout 方法,用于调整计时周期
this.timeout = this.timer.newTimeout(timerTask, adjustTimeout(this.timeoutMs), TimeUnit.MILLISECONDS);
}

public void run() {
this.invoking = true;
try {
// 调用业务逻辑
onTrigger();
} catch (final Throwable t) {
LOG.error("Run timer failed.", t);
}
boolean invokeDestroyed = false;
this.lock.lock();
try {
this.invoking = false;
// 计时器被停止
if (this.stopped) {
this.running = false;
invokeDestroyed = this.destroyed;
}
// 本次任务调度完成,重新发起调度下一轮任务
else {
this.timeout = null; // 注意这里的 timeout 被置为 null
schedule();
}
} finally {
this.lock.unlock();
}
// 在计时器被停止时回调 onDestroy 方法
if (invokeDestroyed) {
onDestroy();
}
}

具体运行逻辑如上述代码注释,当一轮定时任务执行完成时,如果计时器未被停止,则会调用 RepeatedTimer#schedule 方法提交下一轮任务,以此实现周期性任务调度。不同于常规计时器始终按照相同的时间间隔调度任务,RepeatedTimer 定义了一个 RepeatedTimer#adjustTimeout 方法,以支持在运行期间对调度间隔进行动态调整。

这一机制对于 Raft 协议而言尤为重要,在 Raft 集群节点运行期间可能存在两个 Follower 节点同时发起 Leader 选举进程的情况,如果这两个 Follower 节点正好都得到半数投票,则本轮选举失败,需要在下一轮调度周期再次发起 Leader 选举请求。如果计时器始终按照相同的时间间隔进行调度,则这两个节点将会在未来相同的时刻再次发起 Leader 选举请求,如果不幸再次均分投票,则又拉长了集群的无 Leader 节点窗口,而通过动态调整调度间隔这么一个简单的策略则能够很好的避免此类问题。

数据存储

JRaft 的数据存储层主要包含对三类数据的存储:日志数据、元数据,以及快照数据。其中日志数据存储的也就是前面提及到的 LogEntry 数据,包含系统内部运行产生的日志,以及业务向集群提交 Task 所生成的日志,日志数据默认采用 RocksDB 进行存储;元数据用于记录当前节点的 currentTerm 值,以及投票 votedFor 信息;快照数据是对日志数据存储的一种优化手段,用于将那些已经被应用的日志进行压缩存储,以节省磁盘空间占用,同时缩短新接入节点同步集群数据的时间。

日志数据存储

Raft 节点在初始化期间会调用 NodeImpl#initLogStorage 方法初始化日志数据存储模块,该方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private boolean initLogStorage() {
Requires.requireNonNull(this.fsmCaller, "Null fsm caller");
// 实例化日志存储服务,基于 RocksDBLogStorage 实现类
this.logStorage = this.serviceFactory.createLogStorage(this.options.getLogUri(), this.raftOptions);
// 创建并初始化日志管理器
this.logManager = new LogManagerImpl();
final LogManagerOptions opts = new LogManagerOptions();
// 设置 LogEntry 编解码器工厂,默认使用 LogEntryV2CodecFactory
opts.setLogEntryCodecFactory(this.serviceFactory.createLogEntryCodecFactory());
// 设置日志存储服务
opts.setLogStorage(this.logStorage);
// 设置集群节点配置管理器
opts.setConfigurationManager(this.configManager);
// 设置状态机调度器
opts.setFsmCaller(this.fsmCaller);
opts.setNodeMetrics(this.metrics);
opts.setDisruptorBufferSize(this.raftOptions.getDisruptorBufferSize());
opts.setRaftOptions(this.raftOptions);
// 初始化 LogManager
return this.logManager.init(opts);
}

整个方法的主要逻辑在于创建和初始化 LogManager 实例。LogManager 是一个接口,由名称可以推断出它是一个日志管理器,基于 LogStorage 提供了对于日志数据的读写功能,定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public interface LogManager extends Lifecycle<LogManagerOptions>, Describer {

void addLastLogIndexListener(final LastLogIndexListener listener);
void removeLastLogIndexListener(final LastLogIndexListener listener);
void join() throws InterruptedException;
void appendEntries(final List<LogEntry> entries, StableClosure done);
void setSnapshot(final SnapshotMeta meta);
void clearBufferedLogs();
LogEntry getEntry(final long index);
long getTerm(final long index);
long getFirstLogIndex();
long getLastLogIndex();
long getLastLogIndex(final boolean isFlush);
LogId getLastLogId(final boolean isFlush);
ConfigurationEntry getConfiguration(final long index);
ConfigurationEntry checkAndSetConfiguration(final ConfigurationEntry current);
long wait(final long expectedLastLogIndex, final NewLogCallback cb, final Object arg);
boolean removeWaiter(final long id);
void setAppliedId(final LogId appliedId);
Status checkConsistency();

}

本文重点分析 JRaft 节点的初始化过程,所以不打算对 LogManager 接口中声明各个方法实现逐一展开分析,后续遇到对相应方法的调用时再结合上下文进行介绍。JRaft 针对 LogManager 接口提供了 LogManagerImpl 实现类,对应的 LogManager#init 方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public boolean init(final LogManagerOptions opts) {
this.writeLock.lock();
try {
if (opts.getLogStorage() == null) {
LOG.error("Fail to init log manager, log storage is null");
return false;
}
this.raftOptions = opts.getRaftOptions();
this.nodeMetrics = opts.getNodeMetrics();
this.logStorage = opts.getLogStorage();
this.configManager = opts.getConfigurationManager();

LogStorageOptions lsOpts = new LogStorageOptions();
lsOpts.setConfigurationManager(this.configManager);
lsOpts.setLogEntryCodecFactory(opts.getLogEntryCodecFactory());

// 初始化日志存储服务
if (!this.logStorage.init(lsOpts)) {
LOG.error("Fail to init logStorage");
return false;
}

// 基于日志初始化本地 logIndex 和 term 值
this.firstLogIndex = this.logStorage.getFirstLogIndex();
this.lastLogIndex = this.logStorage.getLastLogIndex();
this.diskId = new LogId(this.lastLogIndex, getTermFromLogStorage(this.lastLogIndex));
this.fsmCaller = opts.getFsmCaller();
// 创建对应的 Disruptor 队列,用于异步处理日志操作相关的事件
this.disruptor = DisruptorBuilder.<StableClosureEvent>newInstance() //
.setEventFactory(new StableClosureEventFactory()) //
.setRingBufferSize(opts.getDisruptorBufferSize()) //
.setThreadFactory(new NamedThreadFactory("JRaft-LogManager-Disruptor-", true)) //
.setProducerType(ProducerType.MULTI) //
// Use timeout strategy in log manager. If timeout happens, it will called reportError to halt the node.
.setWaitStrategy(new TimeoutBlockingWaitStrategy(
this.raftOptions.getDisruptorPublishEventWaitTimeoutSecs(), TimeUnit.SECONDS)) //
.build();
this.disruptor.handleEventsWith(new StableClosureEventHandler());
this.disruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(this.getClass().getSimpleName(),
(event, ex) -> reportError(-1, "LogManager handle event error")));
this.diskQueue = this.disruptor.start();
// ... metrics
} finally {
this.writeLock.unlock();
}
return true;
}

整个 LogManager 的初始化过程除了对本地变量进行赋值外,主要做了两件事情:

  1. 初始化日志存储服务 LogStorage 实例。
  2. 创建并启动一个 Disruptor 队列,用于异步处理日志操作相关的事件。

LogStorage 接口定义了与 LogEntry 存储相关的 API,包括读写、截断,以及获取 logIndex 和 term 等。JRaft 默认基于 RocksDB 存储引擎对 LogEntry 提供本地存储和读写,相应的实现类包括 RocksDBLogStorage 和 RocksDBSegmentLogStorage。

本文同样重点关注针对 LogStorage 的初始化过程,由 RocksDBLogStorage#init 方法实现,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public boolean init(final LogStorageOptions opts) {
Requires.requireNonNull(opts.getConfigurationManager(), "Null conf manager");
Requires.requireNonNull(opts.getLogEntryCodecFactory(), "Null log entry codec factory");
this.writeLock.lock();
try {
// 已经初始化过,避免重复初始化
if (this.db != null) {
LOG.warn("RocksDBLogStorage init() already.");
return true;
}
// LogEntry 解码器,默认使用 AutoDetectDecoder,支持 V1 和 V2 版本
this.logEntryDecoder = opts.getLogEntryCodecFactory().decoder();
// LogEntry 编码器,默认使用 V2Encoder
this.logEntryEncoder = opts.getLogEntryCodecFactory().encoder();
Requires.requireNonNull(this.logEntryDecoder, "Null log entry decoder");
Requires.requireNonNull(this.logEntryEncoder, "Null log entry encoder");
this.dbOptions = createDBOptions();
if (this.openStatistics) {
this.statistics = new DebugStatistics();
this.dbOptions.setStatistics(this.statistics);
}

// 设置 RocksDB WriteOptions
this.writeOptions = new WriteOptions();
this.writeOptions.setSync(this.sync);
// 设置 RocksDB ReadOptions
this.totalOrderReadOptions = new ReadOptions();
this.totalOrderReadOptions.setTotalOrderSeek(true);

// 打开本地存储引擎 RocksDB,并从本地 conf 日志中恢复集群节点配置和 firstLogIndex 数据
return initAndLoad(opts.getConfigurationManager());
} catch (final RocksDBException e) {
LOG.error("Fail to init RocksDBLogStorage, path={}.", this.path, e);
return false;
} finally {
this.writeLock.unlock();
}

}

private boolean initAndLoad(final ConfigurationManager confManager) throws RocksDBException {
this.hasLoadFirstLogIndex = false;
this.firstLogIndex = 1;
final List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
// 设置 RocksDB ColumnFamilyOptions
final ColumnFamilyOptions cfOption = createColumnFamilyOptions();
this.cfOptions.add(cfOption);
// Column family to store configuration log entry.
columnFamilyDescriptors.add(new ColumnFamilyDescriptor("Configuration".getBytes(), cfOption));
// Default column family to store user data log entry.
columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOption));

// 打开 RocksDB,并初始化对应的 ColumnFamily
openDB(columnFamilyDescriptors);
// 从 conf 中加载集群节点配置,以及 firstLogIndex 值,并从本地剔除 firstLogIndex 之前的 conf 和 data 数据
load(confManager);
// 模板方法
return onInitLoaded();
}

JRaft 在 RocksDB 中定义了两个 ColumnFamily,除了默认的 ColumnFamily 外,还定义了一个名为 Configuration 的 ColumnFamily 用于存储集群节点配置相关的 LogEntry 实例,而默认的 ColumnFamily 除了包含 Configuration 中的数据之外,还用于存储用户数据相关的 LogEntry 实例。本文如不做特殊说明,均使用 conf family 指代前者,使用 data family 指代后者。

上述方法中我们重点看一下对于 RocksDBLogStorage#load 方法的调用,该方法会从头遍历 conf family 中的数据,以从中加载之前集群节点的配置信息和 firstLogIndex 值。实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
private void load(final ConfigurationManager confManager) {
checkState();
// 按顺序从头开始遍历处理 RocksDB Conf ColumnFamily 中的数据
try (final RocksIterator it = this.db.newIterator(this.confHandle, this.totalOrderReadOptions)) {
it.seekToFirst();
while (it.isValid()) {
final byte[] ks = it.key();
final byte[] bs = it.value();

// key 的长度为 8,说明是一个 LogEntry 数据,LogEntry 数据的 key 是一个 long 型的 logIndex
if (ks.length == 8) {
// 基于解码器解码
final LogEntry entry = this.logEntryDecoder.decode(bs);
if (entry != null) {
// 仅处理 ENTRY_TYPE_CONFIGURATION 类型的 LogEntry
if (entry.getType() == EntryType.ENTRY_TYPE_CONFIGURATION) {
// 基于日志数据设置集群节点配置
final ConfigurationEntry confEntry = new ConfigurationEntry();
confEntry.setId(new LogId(entry.getId().getIndex(), entry.getId().getTerm()));
confEntry.setConf(new Configuration(entry.getPeers(), entry.getLearners()));
if (entry.getOldPeers() != null) {
confEntry.setOldConf(new Configuration(entry.getOldPeers(), entry.getOldLearners()));
}
if (confManager != null) {
confManager.add(confEntry);
}
}
} else {
LOG.warn("Fail to decode conf entry at index {}, the log data is: {}.", Bits.getLong(ks, 0), BytesUtil.toHex(bs));
}
}
// 不是 LogEntry,目前只能是 meta/firstLogIndex,用于记录 firstLogIndex 值
else {
if (Arrays.equals(FIRST_LOG_IDX_KEY, ks)) {
// 初始化 firstLogIndex
setFirstLogIndex(Bits.getLong(bs, 0));
// 剔除 [0, firstLogIndex) 之间的 conf 和 data 数据
truncatePrefixInBackground(0L, this.firstLogIndex);
} else {
LOG.warn("Unknown entry in configuration storage key={}, value={}.", BytesUtil.toHex(ks), BytesUtil.toHex(bs));
}
}
it.next();
}
}
}

具体执行过程如上述代码注释。JRaft 在从本地读取到 firstLogIndex 值之后,会启动一个后台线程,用于对本地记录的位于 firstLogIndex 之前的 LogEntry 进行剔除,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void truncatePrefixInBackground(final long startIndex, final long firstIndexKept) {
// delete logs in background.
Utils.runInThread(() -> {
this.readLock.lock();
try {
if (this.db == null) {
return;
}
// 模板方法
onTruncatePrefix(startIndex, firstIndexKept);
// 剔除 [startIndex, firstIndexKept) 之间的 conf 和 data 数据
this.db.deleteRange(this.defaultHandle, getKeyBytes(startIndex), getKeyBytes(firstIndexKept));
this.db.deleteRange(this.confHandle, getKeyBytes(startIndex), getKeyBytes(firstIndexKept));
} catch (final RocksDBException | IOException e) {
LOG.error("Fail to truncatePrefix {}.", firstIndexKept, e);
} finally {
this.readLock.unlock();
}
});
}

上述方法在启动初始化期间会将 [0, firstLogIndex) 之间的 LogEntry 从本地 RocksDB 中剔除。除此之外,方法 LogStorage#truncatePrefix 在执行时也是委托上述方法完成对从 firstLogIndex 到指定 logIndex 之间的日志数据进行剔除操作。

LogManager 在初始化期间还会创建并启动一个 Disruptor 队列,用于异步处理日志操作相关的事件,包括获取最新的 LogId、日志截断、重置日志数据存储服务,以及关闭日志管理器等。

方法 LogManagerImpl#offerEvent 定义了往该 Disruptor 消息队列发送消息的逻辑,而具体处理消息的逻辑则有 StableClosureEventHandler 类实现。StableClosureEventHandler 类实现自 EventHandler 接口,对应的 StableClosureEventHandler#onEvent 方法依据事件类型对消息实施分别处理,具体实现后续结合应用场景进行深入分析,这里不再展开。

JRaft 节点在初始化期间还会调用 LogManager#checkConsistency 方法对日志数据进行一致性校验,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public Status checkConsistency() {
this.readLock.lock();
try {
Requires.requireTrue(this.firstLogIndex > 0);
Requires.requireTrue(this.lastLogIndex >= 0);
// 未生成过快照,所以 firstLogIndex 应该是 1
if (this.lastSnapshotId.equals(new LogId(0, 0))) {
if (this.firstLogIndex == 1) {
return Status.OK();
}
return new Status(RaftError.EIO, "Missing logs in (0, %d)", this.firstLogIndex);
}
// 生成过快照,则需要保证快照与当前数据的连续性
else {
if (this.lastSnapshotId.getIndex() >= this.firstLogIndex - 1
&& this.lastSnapshotId.getIndex() <= this.lastLogIndex) {
return Status.OK();
}
return new Status(RaftError.EIO, "There's a gap between snapshot={%d, %d} and log=[%d, %d] ",
this.lastSnapshotId.toString(), this.lastSnapshotId.getTerm(), this.firstLogIndex, this.lastLogIndex);
}
} finally {
this.readLock.unlock();
}
}

校验的逻辑主要是确保快照数据与当前数据的连续性,不允许存在数据断层。

元数据存储

JRaft 节点在初始化期间会调用 NodeImpl#initMetaStorage 方法初始化元数据存储模块,这里的元数据包括 currentTerm 值和当前节点的 votedFor 信息。该方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private boolean initMetaStorage() {
// 实例化元数据存储服务,基于 LocalRaftMetaStorage 实现类
this.metaStorage = this.serviceFactory
.createRaftMetaStorage(this.options.getRaftMetaUri(), this.raftOptions);
RaftMetaStorageOptions opts = new RaftMetaStorageOptions();
opts.setNode(this);
// 初始化元数据存储服务
if (!this.metaStorage.init(opts)) {
LOG.error("Node {} init meta storage failed, uri={}.", this.serverId, this.options.getRaftMetaUri());
return false;
}
// 基于本地元数据恢复 currentTerm 和 votedFor 属性值
this.currTerm = this.metaStorage.getTerm();
this.votedId = this.metaStorage.getVotedFor().copy();
return true;
}

JRaft 定义了 RaftMetaStorage 接口用于抽象元数据存储服务,该接口的定义如下:

1
2
3
4
5
6
7
8
9
public interface RaftMetaStorage extends Lifecycle<RaftMetaStorageOptions>, Storage {

boolean setTerm(final long term);
long getTerm();
boolean setVotedFor(final PeerId peerId);
PeerId getVotedFor();
boolean setTermAndVotedFor(final long term, final PeerId peerId);

}

针对该接口,JRaft 提供了 LocalRaftMetaStorage 实现类,基于本地文件系统采用 protobuf 协议对元数据执行序列化之后进行存储。

LocalRaftMetaStorage 在初始化时(即执行 LocalRaftMetaStorage#init 方法期间)会从本地文件系统加载并反序列化元数据,以初始化 currentTerm 和 votedFor 属性值。运行期间对于这两个属性值的更改全部记录在内存中,并在关闭时(即执行 LocalRaftMetaStorage#shutdown 方法期间)将内存中的数据序列化后落盘。

快照数据存储

JRaft 节点在初始化期间会调用 NodeImpl#initSnapshotStorage 方法初始化快照数据存储。与日志数据存储模块的设计相类似,JRaft 针对快照数据存储模块同样采用了操作与存储相分离的策略,其中 SnapshotExecutor 主要负责生成和安装快照,而 SnapshotStorage 则主要负责针对快照文件的读写,以及从远端 Leader 节点拷贝快照数据。

方法 NodeImpl#initSnapshotStorage 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private boolean initSnapshotStorage() {
// 未设置 snapshotUri,说明不希望启动快照模块
if (StringUtils.isEmpty(this.options.getSnapshotUri())) {
LOG.warn("Do not set snapshot uri, ignore initSnapshotStorage.");
return true;
}
// 实例化快照执行器,用于处理快照相关操作
this.snapshotExecutor = new SnapshotExecutorImpl();
final SnapshotExecutorOptions opts = new SnapshotExecutorOptions();
opts.setUri(this.options.getSnapshotUri());
opts.setFsmCaller(this.fsmCaller);
opts.setNode(this);
opts.setLogManager(this.logManager);
opts.setAddr(this.serverId != null ? this.serverId.getEndpoint() : null);
opts.setInitTerm(this.currTerm);
opts.setFilterBeforeCopyRemote(this.options.isFilterBeforeCopyRemote());
// get snapshot throttle
opts.setSnapshotThrottle(this.options.getSnapshotThrottle());
// 初始化快照执行器
return this.snapshotExecutor.init(opts);
}

上述实现主要是用来创建和初始化 SnapshotExecutor,同时我们也可以看到快照机制对于 Raft 协议而言并不是必须的。如果一个应用并不会让 Raft 协议的运行产生大量的日志文件,或者对应的日志无法被压缩,则无需启动快照机制。JRaft 在初始化快照存储模块时会检查应用是否设置了 snapshotUri 参数,如果未设置则表明业务不希望启动快照机制。

SnapshotExecutor 的初始化过程主要是从本地加载最新的快照文件数据,对应的 SnapshotExecutorImpl#init 方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public boolean init(final SnapshotExecutorOptions opts) {
if (StringUtils.isBlank(opts.getUri())) {
LOG.error("Snapshot uri is empty.");
return false;
}
this.logManager = opts.getLogManager();
this.fsmCaller = opts.getFsmCaller();
this.node = opts.getNode();
this.term = opts.getInitTerm();
// 创建快照存储服务,基于 LocalSnapshotStorage 实现类
this.snapshotStorage = this.node.getServiceFactory()
.createSnapshotStorage(opts.getUri(), this.node.getRaftOptions());
if (opts.isFilterBeforeCopyRemote()) {
this.snapshotStorage.setFilterBeforeCopyRemote();
}
if (opts.getSnapshotThrottle() != null) {
this.snapshotStorage.setSnapshotThrottle(opts.getSnapshotThrottle());
}
// 初始化快照存储服务,主要工作是从本地删除除最后一次快照所生成的快照文件之外的其它快照数据文件
if (!this.snapshotStorage.init(null)) {
LOG.error("Fail to init snapshot storage.");
return false;
}
final LocalSnapshotStorage tmp = (LocalSnapshotStorage) this.snapshotStorage;
if (tmp != null && !tmp.hasServerAddr()) {
tmp.setServerAddr(opts.getAddr());
}
// 打开快照文件读取器
final SnapshotReader reader = this.snapshotStorage.open();
if (reader == null) {
return true;
}
// 加载快照元数据信息
this.loadingSnapshotMeta = reader.load();
if (this.loadingSnapshotMeta == null) {
LOG.error("Fail to load meta from {}.", opts.getUri());
Utils.closeQuietly(reader);
return false;
}
LOG.info("Loading snapshot, meta={}.", this.loadingSnapshotMeta);
this.loadingSnapshot = true;
this.runningJobs.incrementAndGet();
final FirstSnapshotLoadDone done = new FirstSnapshotLoadDone(reader);
// 加载最近一次的快照数据
Requires.requireTrue(this.fsmCaller.onSnapshotLoad(done));
try {
done.waitForRun();
} catch (final InterruptedException e) {
LOG.warn("Wait for FirstSnapshotLoadDone run is interrupted.");
Thread.currentThread().interrupt();
return false;
} finally {
Utils.closeQuietly(reader);
}
if (!done.status.isOk()) {
LOG.error("Fail to load snapshot from {}, FirstSnapshotLoadDone status is {}.", opts.getUri(), done.status);
return false;
}
return true;
}

关于加载快照数据的执行过程(即 FSMCaller#onSnapshotLoad 方法的实现)我们将在后面介绍 JRaft 快照机制的文章中针对性的介绍,这里暂且跳过。

状态机调度器

前面我们曾简单介绍过 StateMachine 接口,JRaft 通过该接口抽象描述了 Raft 协议中引入的状态机。这也是 JRaft 向业务透传自己运行状态的核心接口,业务可以通过该接口捕获 JRaft 的运行事件。除了最核心的应用 LogEntry 中的指令外,还包括当前节点作为 LEADER 或 FOLLOWER 角色的启停事件、集群节点配置变更、快照加载与存储,以及集群运行错误与停机等。

StateMachine 接口定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface StateMachine {

void onApply(final Iterator iter);
void onShutdown();
void onSnapshotSave(final SnapshotWriter writer, final Closure done);
boolean onSnapshotLoad(final SnapshotReader reader);
void onLeaderStart(final long term);
void onLeaderStop(final Status status);
void onError(final RaftException e);
void onConfigurationCommitted(final Configuration conf);
void onStopFollowing(final LeaderChangeContext ctx);
void onStartFollowing(final LeaderChangeContext ctx);

}

那么 JRaft 是如何将这些事件通知到业务的呢?具体点来说,通知到业务实现的状态机的呢?这就是状态机调度器 FSMCaller 所做的工作。该接口定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface FSMCaller extends Lifecycle<FSMCallerOptions>, Describer {

void addLastAppliedLogIndexListener(final LastAppliedLogIndexListener listener);
boolean onCommitted(final long committedIndex);
boolean onSnapshotLoad(final LoadSnapshotClosure done);
boolean onSnapshotSave(final SaveSnapshotClosure done);
boolean onLeaderStop(final Status status);
boolean onLeaderStart(final long term);
boolean onStartFollowing(final LeaderChangeContext ctx);
boolean onStopFollowing(final LeaderChangeContext ctx);
boolean onError(final RaftException error);
long getLastAppliedIndex();
void join() throws InterruptedException;

}

从 StateMachine 和 FSMCaller 接口的定义上是不是可以看出有一种相互呼应的感觉呢。简而言之,JRaft 通过调用 FSMCaller 中声明的方法实现将内部运行状态透传给业务,而 FSMCaller 在本地则基于 Disruptor 消息队列以事件的形式缓存这些内部状态,并通过异步的方式回调 StateMachine 接口声明的相应方法,这就是 FSMCaller 整体的运行逻辑。

本小节重点介绍 FSMCaller 的初始化过程和整体执行流程,具体实现细节层面先不展开,留到后面结合具体场景再深入分析。

JRaft 节点在初始化期间会调用 NodeImpl#initFSMCaller 方法对 FSMCaller 进行初始化,该方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private boolean initFSMCaller(final LogId bootstrapId) {
if (this.fsmCaller == null) {
LOG.error("Fail to init fsm caller, null instance, bootstrapId={}.", bootstrapId);
return false;
}
// 创建封装 Closure 的队列,基于 LinkedList 实现
this.closureQueue = new ClosureQueueImpl();
final FSMCallerOptions opts = new FSMCallerOptions();
opts.setAfterShutdown(status -> afterShutdown());
opts.setLogManager(this.logManager);
opts.setFsm(this.options.getFsm());
opts.setClosureQueue(this.closureQueue);
opts.setNode(this);
opts.setBootstrapId(bootstrapId);
opts.setDisruptorBufferSize(this.raftOptions.getDisruptorBufferSize());
// 初始化状态机调度器
return this.fsmCaller.init(opts);
}

FSMCaller 在初始化期间(即执行 FSMCallerImpl#init 方法)除了完成一些属性的赋值工作外,主要是创建和启动了一个 Disruptor 队列,用于异步处理各种状态机事件:

1
2
3
4
5
6
7
8
9
10
this.disruptor = DisruptorBuilder.<ApplyTask>newInstance() //
.setEventFactory(new ApplyTaskFactory()) //
.setRingBufferSize(opts.getDisruptorBufferSize()) //
.setThreadFactory(new NamedThreadFactory("JRaft-FSMCaller-Disruptor-", true)) //
.setProducerType(ProducerType.MULTI) //
.setWaitStrategy(new BlockingWaitStrategy()) //
.build();
this.disruptor.handleEventsWith(new ApplyTaskHandler());
this.disruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(getClass().getSimpleName()));
this.taskQueue = this.disruptor.start();

方法 FSMCallerImpl#enqueueTask 用于往该 Disruptor 队列写入具体的状态机事件,而 FSMCallerImpl 之于 FSMCaller 接口中声明的方法在实现层面基本上都是简单的调用了该方法。这里以 FSMCallerImpl#onCommitted 方法为例,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public boolean onCommitted(final long committedIndex) {
return enqueueTask((task, sequence) -> {
task.type = TaskType.COMMITTED;
task.committedIndex = committedIndex;
});
}

private boolean enqueueTask(final EventTranslator<ApplyTask> tpl) {
if (this.shutdownLatch != null) {
// Shutting down
LOG.warn("FSMCaller is stopped, can not apply new task.");
return false;
}
// 尝试将事件发布到消息队列中
if (!this.taskQueue.tryPublishEvent(tpl)) {
setError(new RaftException(ErrorType.ERROR_TYPE_STATE_MACHINE,
new Status(RaftError.EBUSY, "FSMCaller is overload.")));
return false;
}
return true;
}

那么 FSMCaller 又是怎么处理这些消息队列中的事件的呢?熟悉 Disruptor 的同学应该都知道这个时候应该去看 FSMCaller 是如何实现 EventHandler 接口的。FSMCaller 针对 EventHandler 接口定义了 ApplyTaskHandler 实现类:

1
2
3
4
5
6
7
8
9
10
private class ApplyTaskHandler implements EventHandler<ApplyTask> {
// max committed index in current batch, reset to -1 every batch
private long maxCommittedIndex = -1;

@Override
public void onEvent(
final ApplyTask event, final long sequence, final boolean endOfBatch) throws Exception {
this.maxCommittedIndex = runApplyTask(event, this.maxCommittedIndex, endOfBatch);
}
}

ApplyTaskHandler 通过调用 FSMCallerImpl#runApplyTask 方法对 Disruptor 消息队列中缓存的状态机事件进行处理。该方法本质上是一个事件分发器,基于具体的状态机事件类型调用对应的 do* 方法实现对事件的处理操作。

方法 FSMCallerImpl#runApplyTask 的实现比较直观,不再展开,关于各个 do* 方法的实现将留到后续结合具体场景展开分析。

选票箱

投票机制是 Raft 协议运行的基础,JRaft 在实现上为每个节点都设置了一个选票箱 BallotBox 实例,用于对 LogEntry 是否提交进行仲裁。

JRaft 节点在初始化期间会创建并初始化自己的选票箱,具体过程比较简单,实现如下:

1
2
3
4
5
6
7
8
9
this.ballotBox = new BallotBox();
final BallotBoxOptions ballotBoxOpts = new BallotBoxOptions();
ballotBoxOpts.setWaiter(this.fsmCaller);
// closureQueue 在初始化 FSMCaller 时创建,相互共用
ballotBoxOpts.setClosureQueue(this.closureQueue);
if (!this.ballotBox.init(ballotBoxOpts)) {
LOG.error("Node {} init ballotBox failed.", getNodeId());
return false;
}

这里需要注意的一点是,BallotBox 中持有的 ClosureQueue 实例是在前面介绍的 NodeImpl#initFSMCaller 中创建的,所以 FSMCaller 和 BallotBox 对象持有的 ClosureQueue 实例是同一个。BallotBox 负责往 ClosureQueue 中写数据,而 FSMCaller 则负责从 ClosureQueue 中读数据。

总结

本文我们通过一个 Leader 选举的示例介绍了 JRaft 算法库的基本使用,并对 JRaft 的整体架构设计和节点的初始化过程进行了分析。总的来说,JRaft 在模块划分上还是比较清晰的,不过也有值得吐槽的一点,例如 Node 类的实现太重,是否可以通过类似状态模式一类的思想重构一下?

参考

  1. Raft Consensus Algorithm
  2. SOFA-JRaft 官网
  3. SOFAJRaft:生产级高性能 Java 实现
  4. SOFAJRaft:生产级 Raft 算法库存储模块剖析