SOFA-JRaft 源码解析:快照机制

上一篇我们介绍了 JRaft 关于日志复制机制的设计与实现,其中提到了快照机制本质上也是一种对日志数据复制的优化手段,本文我们就对 JRaft 关于快照机制的设计与实现展开分析。在开始之前我们先聊聊为什么需要引入快照机制,思考以下两个问题:

  1. 因为日志数据需要落盘存储,当日志数据量大到磁盘空间无法容纳时,除了扩容是否还有其它的优化手段?
  2. 当一个新的节点加入 Raft 集群时需要重放集群之前接收到的所有指令以追赶上集群的数据状态,这一过程往往比较耗时和消费带宽,如何进行优化?

对于一个生产级别的 Raft 算法库而言必须能够解决好上述问题,而 Raft 协议也为解决上述问题提供了思路,即快照机制。该机制通过定期为本地的数据状态生成对应的快照文件,并删除对应的日志文件,从而降低对于磁盘空间的容量消耗。当一个新的节点加入集群时,不同于从 Leader 节点复制集群在此之前的所有日志文件,基于快照机制该节点只需要从 Leader 节点下载安装最新的快照文件即可。由于快照文件是对某一时刻数据状态的备份,相对于原生日志数据而言在容量上要小很多,所以既降低了本地磁盘空间占用,也降低了新加入节点从 Leader 节点同步历史数据的时间和网络开销,很好的解决了上面抛出的两个问题。

本文将对快照文件的生成和安装过程展开分析。在 Raft 集群中由于各个节点都需要在本地对日志文件进行存储,所以都有生成快照文件的诉求,而安装快照文件则面向于新加入集群的节点,这些节点需要通过从 Leader 节点下载安装快照文件以快速追赶上集群的数据状态。

生成快照

快照机制对于 JRaft 算法库而言是一个可选的功能,如果在启动 JRaft 节点时指定了快照路径 snapshotUri,则表明业务希望启用快照机制。JRaft 节点会在初始化期间(即执行 Node#init 方法)启动快照计时器 snapshotTimer,用于周期性生成快照(默认周期为 1 小时)。该计时器的具体执行逻辑由 NodeImpl#handleSnapshotTimeout 方法实现,该方法会判断当前节点是否处于活跃状态,如果是则会异步调用 NodeImpl#doSnapshot 方法执行生成快照的操作,实现如下:

1
2
3
4
5
6
7
8
9
10
11
private void doSnapshot(final Closure done) {
if (this.snapshotExecutor != null) {
// 调用 SnapshotExecutor 生成快照
this.snapshotExecutor.doSnapshot(done);
} else {
if (done != null) {
final Status status = new Status(RaftError.EINVAL, "Snapshot is not supported");
Utils.runClosureInThread(done, status);
}
}
}

该方法除了被快照计时器定时触发执行外,还可以被 Cli 服务手动触发执行。由上述实现可以看到,该方法只是简单的将请求委托给快照执行器 SnapshotExecutor 执行,方法 SnapshotExecutorImpl#doSnapshot 实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public void doSnapshot(final Closure done) {
boolean doUnlock = true;
this.lock.lock();
try {
// SnapshotExecutor 已被停止
if (this.stopped) {
Utils.runClosureInThread(done, new Status(RaftError.EPERM, "Is stopped."));
return;
}

// 正在安装快照
if (this.downloadingSnapshot.get() != null) {
Utils.runClosureInThread(done, new Status(RaftError.EBUSY, "Is loading another snapshot."));
return;
}

// 正在生成快照,不允许重复执行
if (this.savingSnapshot) {
Utils.runClosureInThread(done, new Status(RaftError.EBUSY, "Is saving another snapshot."));
return;
}

// 状态机调度器最后应用的 LogEntry 已经被快照,说明没有新的数据可以被快照
if (this.fsmCaller.getLastAppliedIndex() == this.lastSnapshotIndex) {
// There might be false positive as the getLastAppliedIndex() is being updated.
// But it's fine since we will do next snapshot saving in a predictable time.
doUnlock = false;
this.lock.unlock();
this.logManager.clearBufferedLogs();
Utils.runClosureInThread(done);
return;
}

// 可以被快照的数据量小于阈值,暂不生成快照
final long distance = this.fsmCaller.getLastAppliedIndex() - this.lastSnapshotIndex;
if (distance < this.node.getOptions().getSnapshotLogIndexMargin()) {
// If state machine's lastAppliedIndex value minus lastSnapshotIndex value is
// less than snapshotLogIndexMargin value, then directly return.
if (this.node != null) {
LOG.debug("Node {} snapshotLogIndexMargin={}, distance={}, so ignore this time of snapshot by snapshotLogIndexMargin setting.",
this.node.getNodeId(), distance, this.node.getOptions().getSnapshotLogIndexMargin());
}
doUnlock = false;
this.lock.unlock();
Utils.runClosureInThread(done);
return;
}

// 创建并初始化快照写入器,默认使用 LocalSnapshotWriter 实现类
final SnapshotWriter writer = this.snapshotStorage.create();
if (writer == null) {
Utils.runClosureInThread(done, new Status(RaftError.EIO, "Fail to create writer."));
reportError(RaftError.EIO.getNumber(), "Fail to create snapshot writer.");
return;
}

// 标记当前正在安装快照
this.savingSnapshot = true;
// 创建一个回调,用于感知异步快照生成状态
final SaveSnapshotDone saveSnapshotDone = new SaveSnapshotDone(writer, done, null);
if (!this.fsmCaller.onSnapshotSave(saveSnapshotDone)) {
// 往 Disruptor 队列投递事件失败
Utils.runClosureInThread(done, new Status(RaftError.EHOSTDOWN, "The raft node is down."));
return;
}
this.runningJobs.incrementAndGet();
} finally {
if (doUnlock) {
this.lock.unlock();
}
}

}

上述实现在开始生成快照之前会经过一系列的校验,如果校验通过则会创建并初始化快照写入器 SnapshotWriter 实例,并向状态机调度器发布一个 SNAPSHOT_SAVE 事件用于异步生成快照文件,同时会绑定一个 SaveSnapshotDone 回调以感知异步快照生成的状态。

方法 FSMCallerImpl#doSnapshotSave 实现了对于该事件的处理逻辑,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private void doSnapshotSave(final SaveSnapshotClosure done) {
Requires.requireNonNull(done, "SaveSnapshotClosure is null");
final long lastAppliedIndex = this.lastAppliedIndex.get();
// 构造快照元数据信息,封装当前被状态机应用的 LogEntry 的 logIndex 和 term 值,以及对应的集群节点配置信息
final RaftOutter.SnapshotMeta.Builder metaBuilder = RaftOutter.SnapshotMeta.newBuilder() //
.setLastIncludedIndex(lastAppliedIndex) //
.setLastIncludedTerm(this.lastAppliedTerm);
final ConfigurationEntry confEntry = this.logManager.getConfiguration(lastAppliedIndex);
if (confEntry == null || confEntry.isEmpty()) {
LOG.error("Empty conf entry for lastAppliedIndex={}", lastAppliedIndex);
Utils.runClosureInThread(done, new Status(RaftError.EINVAL,
"Empty conf entry for lastAppliedIndex=%s", lastAppliedIndex));
return;
}
for (final PeerId peer : confEntry.getConf()) {
metaBuilder.addPeers(peer.toString());
}
for (final PeerId peer : confEntry.getConf().getLearners()) {
metaBuilder.addLearners(peer.toString());
}
if (confEntry.getOldConf() != null) {
for (final PeerId peer : confEntry.getOldConf()) {
metaBuilder.addOldPeers(peer.toString());
}
for (final PeerId peer : confEntry.getOldConf().getLearners()) {
metaBuilder.addOldLearners(peer.toString());
}
}
// 记录快照元数据
final SnapshotWriter writer = done.start(metaBuilder.build());
if (writer == null) {
done.run(new Status(RaftError.EINVAL, "snapshot_storage create SnapshotWriter failed"));
return;
}
// 调用状态机 StateMachine#onSnapshotSave 方法生成快照
this.fsm.onSnapshotSave(writer, done);
}

上述方法会以当前已被状态机应用的最新 LogEntry 的 logIndex 和 term 值,以及当前集群的节点配置信息,构造快照元数据信息,并记录到 SaveSnapshotDone 回调中,最后调用状态机 StateMachine#onSnapshotSave 方法由业务负责生成快照数据。

这里我们以 CounterStateMachine 为例了解一下业务是如何生成快照文件的,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void onSnapshotSave(final SnapshotWriter writer, final Closure done) {
final long currVal = this.value.get();
// 异步将数据落盘
Utils.runInThread(() -> {
final CounterSnapshotFile snapshot = new CounterSnapshotFile(writer.getPath() + File.separator + "data");
if (snapshot.save(currVal)) {
// 记录快照文件名,及其元数据信息
if (writer.addFile("data")) {
done.run(Status.OK());
} else {
done.run(new Status(RaftError.EIO, "Fail to add file to writer"));
}
} else {
done.run(new Status(RaftError.EIO, "Fail to save counter snapshot %s", snapshot.getPath()));
}
});
}

生成快照由于涉及到文件 IO,所以相对而言是一个重量级的操作,JRaft 针对是否将相关逻辑实现为异步给到的建议为:

通常情况下,每次 onSnapshotSave 被调用都应该阻塞状态机(同步调用)以保证用户可以捕获当前状态机的状态,如果想通过异步 snapshot 来提升性能,那么需要用户状态机支持快照读,并先同步读快照,再异步保存快照数据。

如果生成快照成功,我们需要调用 SnapshotWriter#addFile 方法将快照文件名和对应的元数据信息记录到快照元数据信息表中。这么做的目的除了能够让 JRaft 识别该快照文件,业务也可以在后续安装快照文件时读取到快照的元数据信息。

下面继续来看一下完成生成快照文件之后的逻辑,即回调 SaveSnapshotDone#run 方法,该方法以异步的方式将请求委托给 SaveSnapshotDone#continueRun 方法执行,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
void continueRun(final Status st) {
// 更新已经被快照的 logIndex 和 term 状态值,更新 LogManager 状态
final int ret = onSnapshotSaveDone(st, this.meta, this.writer);
if (ret != 0 && st.isOk()) {
st.setError(ret, "node call onSnapshotSaveDone failed");
}
if (this.done != null) {
Utils.runClosureInThread(this.done, st);
}
}

// com.alipay.sofa.jraft.storage.snapshot.SnapshotExecutorImpl#onSnapshotSaveDone
int onSnapshotSaveDone(final Status st, final SnapshotMeta meta, final SnapshotWriter writer) {
int ret;
this.lock.lock();
try {
ret = st.getCode();
// InstallSnapshot can break SaveSnapshot, check InstallSnapshot when SaveSnapshot
// because upstream Snapshot maybe newer than local Snapshot.
if (st.isOk()) {
// 已安装的快照相对于本次生成的快照数据要新
if (meta.getLastIncludedIndex() <= this.lastSnapshotIndex) {
ret = RaftError.ESTALE.getNumber();
if (this.node != null) {
LOG.warn("Node {} discards an stale snapshot lastIncludedIndex={}, lastSnapshotIndex={}.",
this.node.getNodeId(), meta.getLastIncludedIndex(), this.lastSnapshotIndex);
}
writer.setError(RaftError.ESTALE, "Installing snapshot is older than local snapshot");
}
}
} finally {
this.lock.unlock();
}

// 生成快照成功
if (ret == 0) {
// 记录快照元数据信息
if (!writer.saveMeta(meta)) {
LOG.warn("Fail to save snapshot {}.", writer.getPath());
ret = RaftError.EIO.getNumber();
}
}
// 生成快照失败
else {
if (writer.isOk()) {
writer.setError(ret, "Fail to do snapshot.");
}
}
// 关闭快照写入器
try {
writer.close();
} catch (final IOException e) {
LOG.error("Fail to close writer", e);
ret = RaftError.EIO.getNumber();
}
boolean doUnlock = true;
this.lock.lock();
try {
// 生成快照成功
if (ret == 0) {
// 更新最新快照对应的 logIndex 和 term 值
this.lastSnapshotIndex = meta.getLastIncludedIndex();
this.lastSnapshotTerm = meta.getLastIncludedTerm();
doUnlock = false;
this.lock.unlock();
// 更新 LogManager 状态,并将本地已快照的日志剔除
this.logManager.setSnapshot(meta); // should be out of lock
doUnlock = true;
this.lock.lock();
}
if (ret == RaftError.EIO.getNumber()) {
reportError(RaftError.EIO.getNumber(), "Fail to save snapshot.");
}
// 清除正在生成快照的标记
this.savingSnapshot = false;
this.runningJobs.countDown();
return ret;

} finally {
if (doUnlock) {
this.lock.unlock();
}
}
}

快照数据除了可以由本地生成,也可以是从 Leader 节点复制而来,如果从远端复制过来的快照数据相对于本地更新,则应该忽略本地生成快照文件的结果。SnapshotExecutor 定义了 SnapshotExecutorImpl#lastSnapshotIndexSnapshotExecutorImpl#lastSnapshotTerm 两个字段用于记录最近一次快照对应的 logIndex 和 term 值,所以当生成快照成功之后需要更新这两个状态值。此外,既然相应的数据已经被快照,则表示对应的原生日志文件可以从本地存储系统中删除,从而节省存储空间。这一过程由 LogManager#setSnapshot 方法实现,该方法会对本地的日志数据执行截断处理。

安装快照

通过上一篇对于日志复制机制的介绍可知,Leader 节点会给每个 Follower 或 Learner 节点创建一个复制器 Replicator 实例。实际上,Replicator 除了肩负向目标 Follower 或 Learner 节点复制日志数据外,还负责给目标节点安装快照文件。Replicator 定义了 Replicator#installSnapshot 方法,前面在分析日志复制实现时我们曾几次遇到过对于该方法的调用,对应的调用原因可以简单概括为:

Replicator 期望给目标 Follower 节点复制日志数据时发现对应 logIndex 的数据已经变为快照文件,所以需要先给目标 Follower 节点安装快照。

方法 Replicator#installSnapshot 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
void installSnapshot() {
// 正在给目标 Follower 节点安装快照,无需重复执行
if (this.state == State.Snapshot) {
LOG.warn("Replicator {} is installing snapshot, ignore the new request.", this.options.getPeerId());
this.id.unlock();
return;
}
boolean doUnlock = true;
try {
Requires.requireTrue(this.reader == null,
"Replicator %s already has a snapshot reader, current state is %s", this.options.getPeerId(), this.state);
// 创建并初始化快照读取器,具体实现为 LocalSnapshotReader 类
this.reader = this.options.getSnapshotStorage().open();
if (this.reader == null) {
final NodeImpl node = this.options.getNode();
final RaftException error = new RaftException(EnumOutter.ErrorType.ERROR_TYPE_SNAPSHOT);
error.setStatus(new Status(RaftError.EIO, "Fail to open snapshot"));
this.id.unlock();
doUnlock = false;
node.onError(error);
return;
}
// 生一个快照访问地址
final String uri = this.reader.generateURIForCopy();
if (uri == null) {
final NodeImpl node = this.options.getNode();
final RaftException error = new RaftException(EnumOutter.ErrorType.ERROR_TYPE_SNAPSHOT);
error.setStatus(new Status(RaftError.EIO, "Fail to generate uri for snapshot reader"));
releaseReader();
this.id.unlock();
doUnlock = false;
node.onError(error);
return;
}
// 加载快照元数据信息
final RaftOutter.SnapshotMeta meta = this.reader.load();
if (meta == null) {
final String snapshotPath = this.reader.getPath();
final NodeImpl node = this.options.getNode();
final RaftException error = new RaftException(EnumOutter.ErrorType.ERROR_TYPE_SNAPSHOT);
error.setStatus(new Status(RaftError.EIO, "Fail to load meta from %s", snapshotPath));
releaseReader();
this.id.unlock();
doUnlock = false;
node.onError(error);
return;
}
// 构造安装快照请求
final InstallSnapshotRequest.Builder rb = InstallSnapshotRequest.newBuilder();
rb.setTerm(this.options.getTerm());
rb.setGroupId(this.options.getGroupId());
rb.setServerId(this.options.getServerId().toString());
rb.setPeerId(this.options.getPeerId().toString());
rb.setMeta(meta);
rb.setUri(uri);

this.statInfo.runningState = RunningState.INSTALLING_SNAPSHOT;
this.statInfo.lastLogIncluded = meta.getLastIncludedIndex();
this.statInfo.lastTermIncluded = meta.getLastIncludedTerm();

final InstallSnapshotRequest request = rb.build();
// 标记当前运行状态为正在给目标节点安装快照
this.state = State.Snapshot;
// noinspection NonAtomicOperationOnVolatileField
this.installSnapshotCounter++;
final long monotonicSendTimeMs = Utils.monotonicMs();
final int stateVersion = this.version;
// 递增请求序列
final int seq = getAndIncrementReqSeq();
// 向目标节点发送安装快照请求
final Future<Message> rpcFuture = this.rpcService.installSnapshot(
this.options.getPeerId().getEndpoint(),
request,
new RpcResponseClosureAdapter<InstallSnapshotResponse>() {

@Override
public void run(final Status status) {
onRpcReturned(Replicator.this.id, RequestType.Snapshot, status, request, getResponse(), seq, stateVersion, monotonicSendTimeMs);
}
});
// 标记当前请求为 in-flight
addInflight(RequestType.Snapshot, this.nextIndex, 0, 0, seq, rpcFuture);
} finally {
if (doUnlock) {
this.id.unlock();
}
}
}

上述方法的整体执行流程可以概括为:

  1. 如果当前正在给目标 Follower 或 Learner 节点安装快照文件,则直接返回;
  2. 否则,构造安装快照 InstallSnapshot RPC 请求对象,除了填充基本的状态数据外,其中还包含快照的远程访问地址,以及快照的元数据信息;
  3. 向目标节点发送安装快照的请求。

关于 Replicator 请求的 pipeline 机制已在上一篇介绍过,这里不再重复介绍。下面来看一下 Follower 节点对于 InstallSnapshot 请求的处理过程,由 NodeImpl#handleInstallSnapshot 方法实现。该方法首先会完成一些基本的状态校验(具体实现与处理 AppendEntries 请求基本相同,不再展开),如果校验通过则调用 SnapshotExecutor#installSnapshot 方法执行安装快照文件逻辑,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void installSnapshot(final InstallSnapshotRequest request,
final InstallSnapshotResponse.Builder response,
final RpcRequestClosure done) {
// 从请求中获取快照元数据信息
final SnapshotMeta meta = request.getMeta();
// 新建一个下载快照的任务
final DownloadingSnapshot ds = new DownloadingSnapshot(request, response, done);
// DON'T access request, response, and done after this point
// as the retry snapshot will replace this one.
// 尝试注册当前任务,可能存在有其它任务正在运行的情况
if (!registerDownloadingSnapshot(ds)) {
LOG.warn("Fail to register downloading snapshot.");
// This RPC will be responded by the previous session
return;
}
Requires.requireNonNull(this.curCopier, "curCopier");
try {
// 等待从 Leader 复制快照数据完成
this.curCopier.join();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Install snapshot copy job was canceled.");
return;
}

// 加载刚刚从 Leader 复制过来的快照数据
loadDownloadingSnapshot(ds, meta);
}

安装快照文件的执行过程整体可以分为三个步骤:

  1. 尝试注册一个下载快照数据的 DownloadingSnapshot 任务;
  2. 从 Leader 节点下载快照文件到本地,并阻塞等待文件下载完成;
  3. 从本地加载下载回来的快照文件。

下面首先来看 步骤 1 ,注册一个新的下载快照文件任务时需要考虑可能会与一个正在执行的下载任务相互冲突,具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
boolean registerDownloadingSnapshot(final DownloadingSnapshot ds) {
DownloadingSnapshot saved = null;
boolean result = true;

this.lock.lock();
try {
// SnapshotExecutor 已被停止
if (this.stopped) {
LOG.warn("Register DownloadingSnapshot failed: node is stopped.");
ds.done.sendResponse(RpcFactoryHelper //
.responseFactory() //
.newResponse(InstallSnapshotResponse.getDefaultInstance(), RaftError.EHOSTDOWN, "Node is stopped."));
return false;
}
// 正在生成快照
if (this.savingSnapshot) {
LOG.warn("Register DownloadingSnapshot failed: is saving snapshot.");
ds.done.sendResponse(RpcFactoryHelper //
.responseFactory().newResponse(InstallSnapshotResponse.getDefaultInstance(), RaftError.EBUSY, "Node is saving snapshot."));
return false;
}

ds.responseBuilder.setTerm(this.term);
// 安装快照请求中的 term 值与当前节点的 term 值不匹配
if (ds.request.getTerm() != this.term) {
LOG.warn("Register DownloadingSnapshot failed: term mismatch, expect {} but {}.", this.term, ds.request.getTerm());
ds.responseBuilder.setSuccess(false);
ds.done.sendResponse(ds.responseBuilder.build());
return false;
}
// 需要安装的快照数据已经被快照
if (ds.request.getMeta().getLastIncludedIndex() <= this.lastSnapshotIndex) {
LOG.warn(
"Register DownloadingSnapshot failed: snapshot is not newer, request lastIncludedIndex={}, lastSnapshotIndex={}.",
ds.request.getMeta().getLastIncludedIndex(), this.lastSnapshotIndex);
ds.responseBuilder.setSuccess(true);
ds.done.sendResponse(ds.responseBuilder.build());
return false;
}
final DownloadingSnapshot m = this.downloadingSnapshot.get();
// null 表示当前没有正在进行中的安装快照操作
if (m == null) {
this.downloadingSnapshot.set(ds);
Requires.requireTrue(this.curCopier == null, "Current copier is not null");
// 从指定的 URI 下载快照数据
this.curCopier = this.snapshotStorage.startToCopyFrom(ds.request.getUri(), newCopierOpts());
if (this.curCopier == null) {
this.downloadingSnapshot.set(null);
LOG.warn("Register DownloadingSnapshot failed: fail to copy file from {}.", ds.request.getUri());
ds.done.sendResponse(RpcFactoryHelper //
.responseFactory() //
.newResponse(InstallSnapshotResponse.getDefaultInstance(),
RaftError.EINVAL, "Fail to copy from: %s", ds.request.getUri()));
return false;
}
this.runningJobs.incrementAndGet();
return true;
}

// A previous snapshot is under installing, check if this is the same snapshot and resume it,
// otherwise drop previous snapshot as this one is newer

// m 为正在安装快照的任务,ds 为当前任务

// 当前新注册的任务与正在执行的任务安装的是同一份快照数据
if (m.request.getMeta().getLastIncludedIndex() == ds.request.getMeta().getLastIncludedIndex()) {
// m is a retry
// Copy |*ds| to |*m| so that the former session would respond this RPC.
saved = m;
this.downloadingSnapshot.set(ds);
result = false;
}
// 正在执行的安装快照任务操作的数据更新,忽略当前任务
else if (m.request.getMeta().getLastIncludedIndex() > ds.request.getMeta().getLastIncludedIndex()) {
// |is| is older
LOG.warn("Register DownloadingSnapshot failed: is installing a newer one, lastIncludeIndex={}.",
m.request.getMeta().getLastIncludedIndex());
ds.done.sendResponse(RpcFactoryHelper //
.responseFactory() //
.newResponse(InstallSnapshotResponse.getDefaultInstance(), RaftError.EINVAL,
"A newer snapshot is under installing"));
return false;
}
// 当前安装快照任务操作的数据相对于正在执行的任务更新
else {
// 正在执行的任务已经进入了 loading 阶段
if (this.loadingSnapshot) {
LOG.warn("Register DownloadingSnapshot failed: is loading an older snapshot, lastIncludeIndex={}.",
m.request.getMeta().getLastIncludedIndex());
ds.done.sendResponse(RpcFactoryHelper //
.responseFactory() //
.newResponse(InstallSnapshotResponse.getDefaultInstance(),
RaftError.EBUSY, "A former snapshot is under loading"));
return false;
}
Requires.requireNonNull(this.curCopier, "curCopier");
// 停止当前正在执行的任务
this.curCopier.cancel();
LOG.warn(
"Register DownloadingSnapshot failed: an older snapshot is under installing, cancel downloading, lastIncludeIndex={}.",
m.request.getMeta().getLastIncludedIndex());
ds.done.sendResponse(RpcFactoryHelper //
.responseFactory() //
.newResponse(InstallSnapshotResponse.getDefaultInstance(),
RaftError.EBUSY, "A former snapshot is under installing, trying to cancel"));
return false;
}
} finally {
this.lock.unlock();
}
if (saved != null) {
// Respond replaced session
LOG.warn("Register DownloadingSnapshot failed: interrupted by retry installling request.");
saved.done.sendResponse(RpcFactoryHelper //
.responseFactory() //
.newResponse(InstallSnapshotResponse.getDefaultInstance(),
RaftError.EINTR, "Interrupted by the retry InstallSnapshotRequest"));
}
return result;
}

注册新的快照文件下载任务的整体执行流程可以概括为:

  1. 如果当前 SnapshotExecutor 已被停止,则放弃注册新的任务;
  2. 否则,如果当前正在生成快照文件,则放弃注册新的任务;
  3. 否则,校验安装快照请求中指定的 term 值是否与当前节点的 term 值相匹配,如果不匹配则说明请求来源节点已经不再是 LEADER 角色,放弃为本次安装快照请求注册新的任务;
  4. 否则,校验本次需要安装的快照数据是否已在本地被快照,如果是则放弃注册新的任务;
  5. 否则,尝试为本次安装快照请求注册新的下载快照文件任务,并开始下载快照文件。

其中最后一步可能会遇到当前已有一个正在安装快照的任务在执行的情况,需要决策是让该任务继续执行,还是中断该任务并注册新的任务,具体决策过程如下:

  1. 如果当前待注册的任务与正在执行的任务安装的是同一份快照数据,则让正在执行的任务先响应,并标记待注册的任务为正在执行;
  2. 否则,如果正在执行的任务安装的快照文件相对于待注册的任务更新,则放弃注册;
  3. 否则,说明待注册的任务需要安装的快照文件更新,如果正在执行的任务已经进入了第三阶段(即已经从远程下载快照文件完成,并开始加载这些快照数据),则放弃注册,否则需要取消正在执行的任务。

一旦完成了下载快照文件任务的注册过程,则进入 步骤 2 ,开始从 Leader 节点下载快照文件,并阻塞等待下载过程的完成,这一步由 SnapshotStorage#startToCopyFrom 方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public SnapshotCopier startToCopyFrom(final String uri, final SnapshotCopierOptions opts) {
// 新建快照数据拷贝器,用于从 Leader 节点往本地拷贝快照数据
final LocalSnapshotCopier copier = new LocalSnapshotCopier();
copier.setStorage(this);
copier.setSnapshotThrottle(this.snapshotThrottle);
copier.setFilterBeforeCopyRemote(this.filterBeforeCopyRemote);
// 初始化
if (!copier.init(uri, opts)) {
LOG.error("Fail to init copier to {}.", uri);
return null;
}
// 开始异步拷贝数据
copier.start();
return copier;
}

下载快照文件的操作由拷贝器 LocalSnapshotCopier 完成,本质上是一个 RPC 交互的过程,这里不再展开。

完成了将快照文件从 Leader 节点下载到本地之后, 步骤 3 会尝试在本地加载并应用这些数据,这一步由 SnapshotExecutorImpl#loadDownloadingSnapshot 方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
void loadDownloadingSnapshot(final DownloadingSnapshot ds, final SnapshotMeta meta) {
SnapshotReader reader;
this.lock.lock();
try {
// 当前任务已经失效,说明有新的任务被注册
if (ds != this.downloadingSnapshot.get()) {
// It is interrupted and response by other request, just return
return;
}
Requires.requireNonNull(this.curCopier, "curCopier");
reader = this.curCopier.getReader();
// 从 leader 节点复制快照数据异常
if (!this.curCopier.isOk()) {
if (this.curCopier.getCode() == RaftError.EIO.getNumber()) {
reportError(this.curCopier.getCode(), this.curCopier.getErrorMsg());
}
Utils.closeQuietly(reader);
ds.done.run(this.curCopier);
Utils.closeQuietly(this.curCopier);
this.curCopier = null;
this.downloadingSnapshot.set(null);
this.runningJobs.countDown();
return;
}
Utils.closeQuietly(this.curCopier);
this.curCopier = null;
// 快照读取器状态异常
if (reader == null || !reader.isOk()) {
Utils.closeQuietly(reader);
this.downloadingSnapshot.set(null);
ds.done.sendResponse(RpcFactoryHelper //
.responseFactory() //
.newResponse(InstallSnapshotResponse.getDefaultInstance(),
RaftError.EINTERNAL, "Fail to copy snapshot from %s", ds.request.getUri()));
this.runningJobs.countDown();
return;
}

// 标记正在加载快照数据
this.loadingSnapshot = true;
this.loadingSnapshotMeta = meta;
} finally {
this.lock.unlock();
}

// 创建一个回调,用于感知异步快照加载状态
final InstallSnapshotDone installSnapshotDone = new InstallSnapshotDone(reader);
if (!this.fsmCaller.onSnapshotLoad(installSnapshotDone)) {
// 往 Disruptor 队列投递事件失败
LOG.warn("Fail to call fsm onSnapshotLoad.");
installSnapshotDone.run(new Status(RaftError.EHOSTDOWN, "This raft node is down"));
}
}

前面在分析快照文件的生成过程时已知具体生成快照数据的过程由业务负责完成,而此处加载快照数据的过程同样也需要业务实现,因为这些快照数据都是执行业务指令所生成的特定时刻的数据状态备份。JRaft 同样会向状态机调度器 FSMCaller 发布一个 SNAPSHOT_LOAD 类型的事件,并由 FSMCallerImpl#doSnapshotLoad 方法负责处理此类事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
private void doSnapshotLoad(final LoadSnapshotClosure done) {
Requires.requireNonNull(done, "LoadSnapshotClosure is null");
// 获取快照数据读取器
final SnapshotReader reader = done.start();
if (reader == null) {
done.run(new Status(RaftError.EINVAL, "open SnapshotReader failed"));
return;
}
// 获取快照元数据信息
final RaftOutter.SnapshotMeta meta = reader.load();
if (meta == null) {
done.run(new Status(RaftError.EINVAL, "SnapshotReader load meta failed"));
if (reader.getRaftError() == RaftError.EIO) {
final RaftException err = new RaftException(EnumOutter.ErrorType.ERROR_TYPE_SNAPSHOT,
RaftError.EIO, "Fail to load snapshot meta");
setError(err);
}
return;
}
final LogId lastAppliedId = new LogId(this.lastAppliedIndex.get(), this.lastAppliedTerm);
final LogId snapshotId = new LogId(meta.getLastIncludedIndex(), meta.getLastIncludedTerm());
// 本地已经应用的日志 logIndex 和 term 值相对于当前正在安装的快照更新,说明待加载的快照数据已经过期
if (lastAppliedId.compareTo(snapshotId) > 0) {
done.run(new Status(
RaftError.ESTALE,
"Loading a stale snapshot last_applied_index=%d last_applied_term=%d snapshot_index=%d snapshot_term=%d",
lastAppliedId.getIndex(), lastAppliedId.getTerm(), snapshotId.getIndex(), snapshotId.getTerm()));
return;
}
// 调用状态机 StateMachine#onSnapshotLoad 方法加载快照
if (!this.fsm.onSnapshotLoad(reader)) {
done.run(new Status(-1, "StateMachine onSnapshotLoad failed"));
final RaftException e = new RaftException(EnumOutter.ErrorType.ERROR_TYPE_STATE_MACHINE,
RaftError.ESTATEMACHINE, "StateMachine onSnapshotLoad failed");
setError(e);
return;
}
if (meta.getOldPeersCount() == 0) {
// Joint stage is not supposed to be noticeable by end users.
final Configuration conf = new Configuration();
for (int i = 0, size = meta.getPeersCount(); i < size; i++) {
final PeerId peer = new PeerId();
Requires.requireTrue(peer.parse(meta.getPeers(i)), "Parse peer failed");
conf.addPeer(peer);
}
this.fsm.onConfigurationCommitted(conf);
}
// 更新状态数据
this.lastAppliedIndex.set(meta.getLastIncludedIndex());
this.lastAppliedTerm = meta.getLastIncludedTerm();
done.run(Status.OK());
}

FSMCaller 通过调用状态机 StateMachine#onSnapshotLoad 方法将快照数据透传给业务,并由业务决定如何在本地恢复快照所蕴含的状态数据。下面同样以 CounterStateMachine 状态机实现为例来了解下业务加载快照数据的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public boolean onSnapshotLoad(final SnapshotReader reader) {
// Leader 节点不应该执行安装快照的操作
if (isLeader()) {
LOG.warn("Leader is not supposed to load snapshot");
return false;
}
// 获取快照文件对应的元数据信息
if (reader.getFileMeta("data") == null) {
LOG.error("Fail to find data file in {}", reader.getPath());
return false;
}
final CounterSnapshotFile snapshot = new CounterSnapshotFile(reader.getPath() + File.separator + "data");
try {
// 加载快照数据,并更新数据值
this.value.set(snapshot.load());
return true;
} catch (final IOException e) {
LOG.error("Fail to load snapshot from {}", snapshot.getPath());
return false;
}
}

SnapshotExecutor 在向 FSMCaller 发布 SNAPSHOT_LOAD 事件时会设置一个 InstallSnapshotDone 回调,用于感知加载快照数据的状态,如果操作正常则该回调会更新 SnapshotExecutor 本地的状态数据,包括最新被快照的 LogEntry 对应的 logIndex 和 term 值等。此外,还会调用 LogManager#setSnapshot 方法对本地已被快照的日志文件执行截断处理,以节省存储空间。

最后来看一下复制器 Replicator 对于安装快照请求 InstallSnapshot 的响应处理过程,由 Replicator#onInstallSnapshotReturned 方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
static boolean onInstallSnapshotReturned(final ThreadId id,
final Replicator r,
final Status status,
final InstallSnapshotRequest request,
final InstallSnapshotResponse response) {
boolean success = true;
// 关闭快照数据读取器
r.releaseReader();
// noinspection ConstantConditions
do {
final StringBuilder sb = new StringBuilder("Node "). //
append(r.options.getGroupId()).append(":").append(r.options.getServerId()). //
append(" received InstallSnapshotResponse from ").append(r.options.getPeerId()). //
append(" lastIncludedIndex=").append(request.getMeta().getLastIncludedIndex()). //
append(" lastIncludedTerm=").append(request.getMeta().getLastIncludedTerm());
// 目标 Follower 节点运行异常
if (!status.isOk()) {
sb.append(" error:").append(status);
LOG.info(sb.toString());
notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status);
if (++r.consecutiveErrorTimes % 10 == 0) {
LOG.warn("Fail to install snapshot at peer={}, error={}", r.options.getPeerId(), status);
}
success = false;
break;
}
// 目标 Follower 节点拒绝本次安装快照的请求
if (!response.getSuccess()) {
sb.append(" success=false");
LOG.info(sb.toString());
success = false;
break;
}
// 目标 Follower 节点成功处理本次安装快照的请求,更新 nextIndex
r.nextIndex = request.getMeta().getLastIncludedIndex() + 1;
sb.append(" success=true");
LOG.info(sb.toString());
} while (false);
// We don't retry installing the snapshot explicitly.
// id is unlock in sendEntries
// 给目标节点安装快照失败,清空 inflight 请求,重新发送探针请求
if (!success) {
//should reset states
r.resetInflights();
r.state = State.Probe;
r.block(Utils.nowMs(), status.getCode());
return false;
}
r.hasSucceeded = true;
// 回调 CatchUpClosure
r.notifyOnCaughtUp(RaftError.SUCCESS.getNumber(), false);
if (r.timeoutNowIndex > 0 && r.timeoutNowIndex < r.nextIndex) {
r.sendTimeoutNow(false, false);
}
// id is unlock in _send_entriesheartbeatCounter
r.state = State.Replicate;
return true;
}

如果给目标 Follower 或 Learner 节点安装快照成功,则对应的复制器 Replicator 会更新下一个待发送的 LogEntry 索引值 Replicator#nextIndex 字段,并切换运行状态为 Replicate,接下去转为向目标 Follower 或 Learner 节点复制日志数据。

总结

本文介绍了 Raft 协议引入快照机制所要解决的问题,并分析了 JRaft 对于生成和安装快照文件的设计与实现。快照机制虽然在 JRaft 算法库中被定义为一个可选的功能,但是对于线上业务而言还是尽量建议启动快照机制,对于降低本地磁盘空间占用和网络开销,缩短新节点上线的初始化时间都能够起到积极的作用。

参考

  1. Raft Consensus Algorithm
  2. SOFA-JRaft 官网
  3. SOFA-JRaft:Snapshot 原理剖析