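9. SOFAJRaft Source Code Analysis: How Does a Follower Use Snapshots to Quickly Catch Up with the Leader's Log?
Preface
The snapshot mechanism mainly solves two problems:
- how a node newly added to a JRaft group can quickly catch up with the latest data
- how a Raft node that restarts after a failure can efficiently recover to the latest data
Snapshot source code analysis
Generating a snapshot file for a Raft node
To enable SOFAJRaft's snapshot mechanism, user code must set the "snapshotUri" property of the NodeOptions configuration class (the path where snapshot files are stored). Once this property is set, the node starts a timer task ("JRaft-SnapshotTimer") that takes snapshots automatically. The interval is given by the "snapshotIntervalSecs" property of NodeOptions and defaults to 3600 seconds. The timer is started as follows: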
NodeImpl#init
    this.snapshotTimer = new RepeatedTimer("JRaft-SnapshotTimer", this.options.getSnapshotIntervalSecs() * 1000) {

        @Override
        protected void onTrigger() {
            handleSnapshotTimeout();
        }
    };

    private void handleSnapshotTimeout() {
        this.writeLock.lock();
        try {
            if (!this.state.isActive()) {
                return;
            }
        } finally {
            this.writeLock.unlock();
        }
        // do_snapshot in another thread to avoid blocking the timer thread.
        // doSnapshot is therefore called asynchronously.
        Utils.runInThread(() -> doSnapshot(null));
    }

    private void doSnapshot(final Closure done) {
        if (this.snapshotExecutor != null) {
            this.snapshotExecutor.doSnapshot(done);
        } else {
            if (done != null) {
                final Status status = new Status(RaftError.EINVAL, "Snapshot is not supported");
                Utils.runClosureInThread(done, status);
            }
        }
    }
In the end this calls the doSnapshot method of the snapshot executor, so let's look at that next.
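The pattern in handleSnapshotTimeout (a cheap state check on the timer thread, with the slow work pushed to another thread) can be sketched with JDK classes only. This is a minimal illustration and not JRaft's RepeatedTimer: the class SnapshotTimerSketch, its fields, and the counter that stands in for the real snapshot work are all hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the snapshot timer; it does not use JRaft's RepeatedTimer.
final class SnapshotTimerSketch {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final AtomicInteger snapshotsTaken = new AtomicInteger();
    private volatile boolean active = true;

    // Fire handleSnapshotTimeout every intervalSecs seconds.
    void start(final int intervalSecs) {
        this.timer.scheduleAtFixedRate(this::handleSnapshotTimeout, intervalSecs, intervalSecs, TimeUnit.SECONDS);
    }

    void setActive(final boolean active) {
        this.active = active;
    }

    // Runs on the timer thread: check the node state, then hand the slow work to the worker.
    void handleSnapshotTimeout() {
        if (!this.active) {
            return;
        }
        // Incrementing a counter stands in for the real snapshot work.
        this.worker.execute(this.snapshotsTaken::incrementAndGet);
    }

    // Stop both executors, wait for queued work, and report how many snapshots ran.
    int shutdownAndCount() {
        this.timer.shutdownNow();
        this.worker.shutdown();
        try {
            this.worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return this.snapshotsTaken.get();
    }
}
```

Keeping the timer thread free matters because a snapshot can take much longer than one timer tick, and a blocked timer thread would delay every later trigger.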
SnapshotExecutorImpl#doSnapshot
    public void doSnapshot(final Closure done) {
        boolean doUnlock = true;
        this.lock.lock();
        try {
            // The executor has been stopped
            if (this.stopped) {
                Utils.runClosureInThread(done, new Status(RaftError.EPERM, "Is stopped."));
                return;
            }
            // A snapshot is being downloaded
            if (this.downloadingSnapshot.get() != null) {
                Utils.runClosureInThread(done, new Status(RaftError.EBUSY, "Is loading another snapshot."));
                return;
            }
            // A snapshot is being saved
            if (this.savingSnapshot) {
                Utils.runClosureInThread(done, new Status(RaftError.EBUSY, "Is saving another snapshot."));
                return;
            }
            // Compare the index last applied by the state machine with the last log index covered by the latest snapshot.
            // If the two are equal, no new data has been applied, so another snapshot would be pointless.
            if (this.fsmCaller.getLastAppliedIndex() == this.lastSnapshotIndex) {
                // There might be false positive as the getLastAppliedIndex() is being
                // updated. But it's fine since we will do next snapshot saving in a
                // predictable time.
                doUnlock = false;
                this.lock.unlock();
                this.logManager.clearBufferedLogs();
                Utils.runClosureInThread(done);
                return;
            }
            // Create a snapshot writer used to write the data
            final SnapshotWriter writer = this.snapshotStorage.create();
            if (writer == null) {
                Utils.runClosureInThread(done, new Status(RaftError.EIO, "Fail to create writer."));
                reportError(RaftError.EIO.getNumber(), "Fail to create snapshot writer.");
                return;
            }
            this.savingSnapshot = true;
            // Wraps the callback and the snapshot writer
            final SaveSnapshotDone saveSnapshotDone = new SaveSnapshotDone(writer, done, null);
            // Hand over to the state machine to save the snapshot
            if (!this.fsmCaller.onSnapshotSave(saveSnapshotDone)) {
                Utils.runClosureInThread(done, new Status(RaftError.EHOSTDOWN, "The raft node is down."));
                return;
            }
            this.runningJobs.incrementAndGet();
        } finally {
            if (doUnlock) {
                this.lock.unlock();
            }
        }
    }
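doSnapshot first runs several checks (stopped, already downloading a snapshot, already saving a snapshot, nothing new applied since the last snapshot) and then calls the onSnapshotSave method of the FSM caller to save the snapshot.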
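The order of these checks can be modelled as a small pure function. This is only a sketch under stated assumptions: SnapshotGuardSketch and its Decision enum are hypothetical names introduced for illustration, and the real method also handles locking, the writer, and the callbacks.

```java
// Hypothetical model of the guard checks in doSnapshot; not part of JRaft.
final class SnapshotGuardSketch {
    enum Decision { REJECT_STOPPED, REJECT_DOWNLOADING, REJECT_SAVING, SKIP_NOTHING_NEW, PROCEED }

    static Decision decide(final boolean stopped, final boolean downloading, final boolean saving,
                           final long lastAppliedIndex, final long lastSnapshotIndex) {
        if (stopped) {
            return Decision.REJECT_STOPPED;      // corresponds to EPERM
        }
        if (downloading) {
            return Decision.REJECT_DOWNLOADING;  // corresponds to EBUSY
        }
        if (saving) {
            return Decision.REJECT_SAVING;       // corresponds to EBUSY
        }
        if (lastAppliedIndex == lastSnapshotIndex) {
            // Nothing was applied since the last snapshot, so saving again is pointless.
            return Decision.SKIP_NOTHING_NEW;
        }
        return Decision.PROCEED;
    }
}
```

For example, a node whose state machine has applied index 10 and whose latest snapshot also ends at index 10 skips the snapshot, while one that has applied index 11 proceeds.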
FSMCallerImpl#onSnapshotSave
    public boolean onSnapshotSave(final SaveSnapshotClosure done) {
        // Publish the event so that ApplyTaskHandler processes it
        return enqueueTask((task, sequence) -> {
            task.type = TaskType.SNAPSHOT_SAVE;
            task.done = done;
        });
    }

The onSnapshotSave method of the FSM caller publishes the event to the Disruptor, where ApplyTaskHandler handles it.
The handler eventually calls doSnapshotSave:
    private void doSnapshotSave(final SaveSnapshotClosure done) {
        Requires.requireNonNull(done, "SaveSnapshotClosure is null");
        // Put the last applied index and term into metaBuilder
        final long lastAppliedIndex = this.lastAppliedIndex.get();
        final RaftOutter.SnapshotMeta.Builder metaBuilder = RaftOutter.SnapshotMeta.newBuilder() //
            .setLastIncludedIndex(lastAppliedIndex) //
            .setLastIncludedTerm(this.lastAppliedTerm);
        // Put the current configuration into metaBuilder
        final ConfigurationEntry confEntry = this.logManager.getConfiguration(lastAppliedIndex);
        if (confEntry == null || confEntry.isEmpty()) {
            LOG.error("Empty conf entry for lastAppliedIndex={}", lastAppliedIndex);
            Utils.runClosureInThread(done, new Status(RaftError.EINVAL, "Empty conf entry for lastAppliedIndex=%s",
                lastAppliedIndex));
            return;
        }
        for (final PeerId peer : confEntry.getConf()) {
            metaBuilder.addPeers(peer.toString());
        }
        if (confEntry.getOldConf() != null) {
            for (final PeerId peer : confEntry.getOldConf()) {
                metaBuilder.addOldPeers(peer.toString());
            }
        }
        // Pass the metadata to the done closure and obtain the writer
        final SnapshotWriter writer = done.start(metaBuilder.build());
        if (writer == null) {
            done.run(new Status(RaftError.EINVAL, "snapshot_storage create SnapshotWriter failed"));
            return;
        }
        // Ask the user's state machine instance to generate the snapshot
        this.fsm.onSnapshotSave(writer, done);
    }
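This method puts the last applied index and term, together with the current (and old) peer configuration, into metaBuilder and then calls onSnapshotSave on the state machine instance. The official Counter example (https://www.sofastack.tech/projects/sofa-jraft/counter-example/) shows how this is used.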
CounterStateMachine#onSnapshotSave
    public void onSnapshotSave(final SnapshotWriter writer, final Closure done) {
        final long currVal = this.value.get();
        // Write the data to disk asynchronously
        Utils.runInThread(() -> {
            final CounterSnapshotFile snapshot = new CounterSnapshotFile(writer.getPath() + File.separator + "data");
            if (snapshot.save(currVal)) {
                if (writer.addFile("data")) {
                    done.run(Status.OK());
                } else {
                    done.run(new Status(RaftError.EIO, "Fail to add file to writer"));
                }
            } else {
                done.run(new Status(RaftError.EIO, "Fail to save counter snapshot %s", snapshot.getPath()));
            }
        });
    }
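This method reads the current value and writes it to a file. Once the snapshot file has been saved, it calls done.run(status) on the closure that was passed in, which tells the caller whether saving succeeded or failed.
Because the callback passed in here is a SaveSnapshotDone instance, this invokes the run method of SaveSnapshotDone: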
SaveSnapshotDone
    public void run(final Status status) {
        Utils.runInThread(() -> continueRun(status));
    }

    void continueRun(final Status st) {
        // Validate the index, record the index and term, and mark snapshot saving as finished
        final int ret = onSnapshotSaveDone(st, this.meta, this.writer);
        if (ret != 0 && st.isOk()) {
            st.setError(ret, "node call onSnapshotSaveDone failed");
        }
        if (this.done != null) {
            Utils.runClosureInThread(this.done, st);
        }
    }
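The run method calls continueRun asynchronously, which in turn calls onSnapshotSaveDone. That method validates the index, records the snapshot index and term, and marks snapshot saving as finished. Only after this bookkeeping is the user's own done closure, if any, invoked with the final status.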
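The wrapping idea behind SaveSnapshotDone (run internal bookkeeping first, downgrade the status if the bookkeeping fails, then forward to the caller's callback) can be sketched as follows. Everything here is hypothetical and simplified: SaveDoneSketch, its Status type, and the bookkeeping supplier are illustrative names, and the sketch runs synchronously whereas the real closure hops to another thread.

```java
import java.util.function.Consumer;
import java.util.function.IntSupplier;

// Hypothetical model of a closure that wraps another closure; not JRaft's SaveSnapshotDone.
final class SaveDoneSketch {
    // Minimal status type for the sketch: code 0 means OK.
    static final class Status {
        int code;
        String msg;

        boolean isOk() {
            return this.code == 0;
        }

        void setError(final int code, final String msg) {
            this.code = code;
            this.msg = msg;
        }
    }

    // Returns a callback that runs the bookkeeping and then forwards to userDone (which may be null).
    static Consumer<Status> wrap(final IntSupplier bookkeeping, final Consumer<Status> userDone) {
        return st -> {
            final int ret = bookkeeping.getAsInt();
            // Only overwrite the status if the save itself succeeded but the bookkeeping failed.
            if (ret != 0 && st.isOk()) {
                st.setError(ret, "bookkeeping after snapshot save failed");
            }
            if (userDone != null) {
                userDone.accept(st);
            }
        };
    }
}
```

The point of the design is that the caller sees a single status covering both the state machine's file write and the executor's own post-processing.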
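Installing a snapshot
When JRaft sends log entries to a follower, it checks whether a snapshot needs to be sent instead. A snapshot lets the follower catch up with the leader's log quickly without replaying very old log entries, which both reduces network traffic and makes log synchronization more efficient.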
Replicator#sendEntries
    private boolean sendEntries(final long nextSendingIndex) {
        final AppendEntriesRequest.Builder rb = AppendEntriesRequest.newBuilder();
        // Fill the common fields of this Replicator into rb
        if (!fillCommonFields(rb, nextSendingIndex - 1, false)) {
            // unlock id in installSnapshot
            installSnapshot();
            return false;
        }
        // ... (omitted)
    }
When fillCommonFields returns false, installSnapshot is called to send an RPC request to the follower.
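To make the decision concrete, here is a simplified model of when a leader has to fall back to a snapshot. It rests on an assumption that is not shown in the code above: that plain log replication can only continue if the leader still knows the term of the entry just before nextSendingIndex, which is no longer the case once that entry has been compacted away. The class ReplicationChoiceSketch and its parameters are hypothetical.

```java
// Hypothetical, simplified model of the "entries or snapshot" decision; not JRaft's fillCommonFields.
final class ReplicationChoiceSketch {
    /**
     * @param nextSendingIndex  the next log index the leader wants to send to the follower
     * @param firstLogIndex     the oldest log index the leader still keeps (assumption)
     * @param lastSnapshotIndex the last log index covered by the leader's latest snapshot (assumption)
     */
    static boolean needsSnapshot(final long nextSendingIndex, final long firstLogIndex,
                                 final long lastSnapshotIndex) {
        final long prevLogIndex = nextSendingIndex - 1;
        if (prevLogIndex == 0) {
            // Replication starts from the very first entry, so no previous term is needed.
            return false;
        }
        if (prevLogIndex == lastSnapshotIndex) {
            // The term of the snapshot's last entry is still known from the snapshot metadata.
            return false;
        }
        // Entries before firstLogIndex were compacted, so their term can no longer be looked up.
        return prevLogIndex < firstLogIndex;
    }
}
```

With a leader that has compacted its log up to index 99, a follower that still needs index 5 has to receive a snapshot, while a follower that needs index 150 can be served from the log.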
Replicator#installSnapshot
    void installSnapshot() {
        // A snapshot is already being installed
        if (this.state == State.Snapshot) {
            LOG.warn("Replicator {} is installing snapshot, ignore the new request.", this.options.getPeerId());
            this.id.unlock();
            return;
        }
        boolean doUnlock = true;
        try {
            Requires.requireTrue(this.reader == null,
                "Replicator %s already has a snapshot reader, current state is %s", this.options.getPeerId(),
                this.state);
            // Open a SnapshotReader
            this.reader = this.options.getSnapshotStorage().open();
            // If snapshot storage is not enabled, report the error and return
            if (this.reader == null) {
                final NodeImpl node = this.options.getNode();
                final RaftException error = new RaftException(EnumOutter.ErrorType.ERROR_TYPE_SNAPSHOT);
                error.setStatus(new Status(RaftError.EIO, "Fail to open snapshot"));
                this.id.unlock();
                doUnlock = false;
                node.onError(error);
                return;
            }
            // Generate a URI that other nodes use to read the snapshot
            final String uri = this.reader.generateURIForCopy();
            if (uri == null) {
                final NodeImpl node = this.options.getNode();
                final RaftException error = new RaftException(EnumOutter.ErrorType.ERROR_TYPE_SNAPSHOT);
                error.setStatus(new Status(RaftError.EIO, "Fail to generate uri for snapshot reader"));
                releaseReader();
                this.id.unlock();
                doUnlock = false;
                node.onError(error);
                return;
            }
            // Load the snapshot metadata from file
            final RaftOutter.SnapshotMeta meta = this.reader.load();
            if (meta == null) {
                final String snapshotPath = this.reader.getPath();
                final NodeImpl node = this.options.getNode();
                final RaftException error = new RaftException(EnumOutter.ErrorType.ERROR_TYPE_SNAPSHOT);
                error.setStatus(new Status(RaftError.EIO, "Fail to load meta from %s", snapshotPath));
                releaseReader();
                this.id.unlock();
                doUnlock = false;
                node.onError(error);
                return;
            }
            // Fill in the request
            final InstallSnapshotRequest.Builder rb = InstallSnapshotRequest.newBuilder();
            rb.setTerm(this.options.getTerm());
            rb.setGroupId(this.options.getGroupId());
            rb.setServerId(this.options.getServerId().toString());
            rb.setPeerId(this.options.getPeerId().toString());
            rb.setMeta(meta);
            rb.setUri(uri);

            this.statInfo.runningState = RunningState.INSTALLING_SNAPSHOT;
            this.statInfo.lastLogIncluded = meta.getLastIncludedIndex();
            this.statInfo.lastTermIncluded = meta.getLastIncludedTerm();

            final InstallSnapshotRequest request = rb.build();
            this.state = State.Snapshot;
            // noinspection NonAtomicOperationOnVolatileField
            this.installSnapshotCounter++;
            final long monotonicSendTimeMs = Utils.monotonicMs();
            final int stateVersion = this.version;
            final int seq = getAndIncrementReqSeq();
            // Send the InstallSnapshotRequest
            final Future<Message> rpcFuture = this.rpcService.installSnapshot(this.options.getPeerId().getEndpoint(),
                request, new RpcResponseClosureAdapter<InstallSnapshotResponse>() {

                    @Override
                    public void run(final Status status) {
                        onRpcReturned(Replicator.this.id, RequestType.Snapshot, status, request, getResponse(), seq,
                            stateVersion, monotonicSendTimeMs);
                    }
                });
            addInflight(RequestType.Snapshot, this.nextIndex, 0, 0, seq, rpcFuture);
        } finally {
            if (doUnlock) {
                this.id.unlock();
            }
        }
    }
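Before the InstallSnapshotRequest is sent, several checks are made:
- whether the user has set the "snapshotUri" property of the NodeOptions configuration class; if it is not set, snapshots are not enabled and the returned reader is null
- whether a URI for fetching the snapshot can be generated
- whether the snapshot metadata can be loaded from file
If all of these checks pass, an InstallSnapshotRequest is sent to the follower, where the InstallSnapshotRequestProcessor handles it and eventually hands off to the handleInstallSnapshot method of NodeImpl, which carries the actual logic.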
NodeImpl#handleInstallSnapshot
    public Message handleInstallSnapshot(final InstallSnapshotRequest request, final RpcRequestClosure done) {
        // If there is no snapshot executor, return an error response: snapshots are not supported
        if (this.snapshotExecutor == null) {
            return RpcResponseFactory.newResponse(RaftError.EINVAL, "Not supported snapshot");
        }
        // Parse the serverId carried by the request into a PeerId
        final PeerId serverId = new PeerId();
        if (!serverId.parse(request.getServerId())) {
            LOG.warn("Node {} ignore InstallSnapshotRequest from {} bad server id.", getNodeId(),
                request.getServerId());
            return RpcResponseFactory.newResponse(RaftError.EINVAL, "Parse serverId failed: %s",
                request.getServerId());
        }

        this.writeLock.lock();
        try {
            // Check the state of the current node
            if (!this.state.isActive()) {
                LOG.warn("Node {} ignore InstallSnapshotRequest as it is not in active state {}.", getNodeId(),
                    this.state);
                return RpcResponseFactory.newResponse(RaftError.EINVAL, "Node %s:%s is not in active state, state %s.",
                    this.groupId, this.serverId, this.state.name());
            }
            // Compare the term carried by the request with the current node's term and reject a stale term
            if (request.getTerm() < this.currTerm) {
                LOG.warn("Node {} ignore stale InstallSnapshotRequest from {}, term={}, currTerm={}.", getNodeId(),
                    request.getPeerId(), request.getTerm(), this.currTerm);
                return InstallSnapshotResponse.newBuilder() //
                    .setTerm(this.currTerm) //
                    .setSuccess(false) //
                    .build();
            }
            // Check that the sender is a legitimate leader for this node
            checkStepDown(request.getTerm(), serverId);

            if (!serverId.equals(this.leaderId)) {
                LOG.error("Another peer {} declares that it is the leader at term {} which was occupied by leader {}.",
                    serverId, this.currTerm, this.leaderId);
                // Increase the term by 1 and make both leaders step down to minimize the
                // loss of split brain
                stepDown(request.getTerm() + 1, false, new Status(RaftError.ELEADERCONFLICT,
                    "More than one leader in the same term."));
                return InstallSnapshotResponse.newBuilder() //
                    .setTerm(request.getTerm() + 1) //
                    .setSuccess(false) //
                    .build();
            }
        } finally {
            this.writeLock.unlock();
        }
        final long startMs = Utils.monotonicMs();
        try {
            if (LOG.isInfoEnabled()) {
                LOG.info(
                    "Node {} received InstallSnapshotRequest from {}, lastIncludedLogIndex={}, "
                            + "lastIncludedLogTerm={}, lastLogId={}.", getNodeId(), request.getServerId(),
                    request.getMeta().getLastIncludedIndex(), request.getMeta().getLastIncludedTerm(),
                    this.logManager.getLastLogId(false));
            }
            // Install the snapshot
            this.snapshotExecutor.installSnapshot(request, InstallSnapshotResponse.newBuilder(), done);
            return null;
        } finally {
            this.metrics.recordLatency("install-snapshot", Utils.monotonicMs() - startMs);
        }
    }
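After a series of checks (snapshot support, a parsable serverId, an active node, a term that is not stale, and no conflicting leader), this method calls installSnapshot on the snapshot executor to perform the installation.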
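The term and leader checks are the heart of this validation, and a small model makes the outcomes easier to see. This is a sketch under assumptions: the class InstallSnapshotCheckSketch is hypothetical, leaders are represented as plain strings, and the effect of checkStepDown is reduced to "adopt the sender as leader when its term is higher or when no leader is known yet".

```java
// Hypothetical, simplified model of the term/leader checks; not JRaft's NodeImpl.
final class InstallSnapshotCheckSketch {
    enum Outcome { REJECT_STALE_TERM, REJECT_LEADER_CONFLICT, ACCEPT }

    /**
     * @param requestTerm term carried by the InstallSnapshotRequest
     * @param currTerm    term of the receiving node
     * @param sender      id of the node that sent the request (must not be null)
     * @param knownLeader leader currently known to the receiving node, or null if none
     */
    static Outcome check(final long requestTerm, final long currTerm, final String sender,
                         final String knownLeader) {
        if (requestTerm < currTerm) {
            // A request from an older term comes from a deposed leader.
            return Outcome.REJECT_STALE_TERM;
        }
        // Assumption: a higher term or an unknown leader makes the node accept the sender as leader.
        final String leader = (requestTerm > currTerm || knownLeader == null) ? sender : knownLeader;
        if (!sender.equals(leader)) {
            // Two different nodes claim leadership in the same term.
            return Outcome.REJECT_LEADER_CONFLICT;
        }
        return Outcome.ACCEPT;
    }
}
```

In the conflict case the real code answers with request.getTerm() + 1 so that both would-be leaders step down.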
SnapshotExecutorImpl#installSnapshot
    public void installSnapshot(final InstallSnapshotRequest request, final InstallSnapshotResponse.Builder response,
                                final RpcRequestClosure done) {
        final SnapshotMeta meta = request.getMeta();
        // Create a task object for downloading the snapshot
        final DownloadingSnapshot ds = new DownloadingSnapshot(request, response, done);
        // DON'T access request, response, and done after this point
        // as the retry snapshot will replace this one.
        // Register the snapshot download task
        if (!registerDownloadingSnapshot(ds)) {
            LOG.warn("Fail to register downloading snapshot");
            // This RPC will be responded by the previous session
            return;
        }
        Requires.requireNonNull(this.curCopier, "curCopier");
        try {
            // Block until the copy task completes
            this.curCopier.join();
        } catch (final InterruptedException e) {
            // Restore the interrupt flag. An interrupted copy job means that a newer snapshot
            // is being received and the download of the old one was stopped.
            Thread.currentThread().interrupt();
            LOG.warn("Install snapshot copy job was canceled.");
            return;
        }
        // Load the downloaded snapshot files
        loadDownloadingSnapshot(ds, meta);
    }
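This method calls registerDownloadingSnapshot to register the snapshot download, then calls join to block until the download completes, and finally calls loadDownloadingSnapshot to load the downloaded files.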
SnapshotExecutorImpl#loadDownloadingSnapshot
    void loadDownloadingSnapshot(final DownloadingSnapshot ds, final SnapshotMeta meta) {
        SnapshotReader reader;
        this.lock.lock();
        try {
            // Compare with the currently registered download; a mismatch means a newer snapshot is being received
            if (ds != this.downloadingSnapshot.get()) {
                // It is interrupted and response by other request, just return
                return;
            }
            Requires.requireNonNull(this.curCopier, "curCopier");
            reader = this.curCopier.getReader();
            // Check whether the copier finished successfully
            if (!this.curCopier.isOk()) {
                if (this.curCopier.getCode() == RaftError.EIO.getNumber()) {
                    reportError(this.curCopier.getCode(), this.curCopier.getErrorMsg());
                }
                Utils.closeQuietly(reader);
                ds.done.run(this.curCopier);
                Utils.closeQuietly(this.curCopier);
                this.curCopier = null;
                this.downloadingSnapshot.set(null);
                this.runningJobs.countDown();
                return;
            }
            Utils.closeQuietly(this.curCopier);
            this.curCopier = null;
            // Check whether the reader is in a valid state
            if (reader == null || !reader.isOk()) {
                Utils.closeQuietly(reader);
                this.downloadingSnapshot.set(null);
                ds.done.sendResponse(RpcResponseFactory.newResponse(RaftError.EINTERNAL,
                    "Fail to copy snapshot from %s", ds.request.getUri()));
                this.runningJobs.countDown();
                return;
            }
            this.loadingSnapshot = true;
            this.loadingSnapshotMeta = meta;
        } finally {
            this.lock.unlock();
        }
        // The snapshot was downloaded successfully; let the state machine install it
        final InstallSnapshotDone installSnapshotDone = new InstallSnapshotDone(reader);
        // Submit the snapshot load event to the state machine
        if (!this.fsmCaller.onSnapshotLoad(installSnapshotDone)) {
            LOG.warn("Fail to call fsm onSnapshotLoad");
            installSnapshotDone.run(new Status(RaftError.EHOSTDOWN, "This raft node is down"));
        }
    }
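After checking that this download has not been superseded, that the copier finished successfully, and that the reader is valid, the method calls onSnapshotLoad on the FSM caller to install the snapshot.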
FSMCallerImpl#onSnapshotLoad
    public boolean onSnapshotLoad(final LoadSnapshotClosure done) {
        return enqueueTask((task, sequence) -> {
            task.type = TaskType.SNAPSHOT_LOAD;
            task.done = done;
        });
    }
onSnapshotLoad publishes a task of type TaskType.SNAPSHOT_LOAD to the Disruptor queue. ApplyTaskHandler picks it up and eventually calls doSnapshotLoad to process it.
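Both onSnapshotSave and onSnapshotLoad follow the same shape: tag a task with a type, enqueue it, and let a single consumer dispatch on the type. The sketch below models that shape with a single-thread executor from the JDK instead of the Disruptor. It is an illustration only: FsmTaskQueueSketch and its methods are hypothetical, and recording the handler name stands in for the real doSnapshotSave and doSnapshotLoad work.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a typed task queue with one consumer; it does not use the Disruptor.
final class FsmTaskQueueSketch {
    enum TaskType { SNAPSHOT_SAVE, SNAPSHOT_LOAD }

    private final ExecutorService consumer = Executors.newSingleThreadExecutor();
    private final List<String> handled = Collections.synchronizedList(new ArrayList<>());

    // Returns false when the queue no longer accepts tasks, like a node that is shutting down.
    boolean enqueue(final TaskType type) {
        try {
            this.consumer.execute(() -> dispatch(type));
            return true;
        } catch (final RejectedExecutionException e) {
            return false;
        }
    }

    // Runs on the single consumer thread, so tasks are handled one at a time in submission order.
    private void dispatch(final TaskType type) {
        switch (type) {
            case SNAPSHOT_SAVE:
                this.handled.add("doSnapshotSave");
                break;
            case SNAPSHOT_LOAD:
                this.handled.add("doSnapshotLoad");
                break;
            default:
                break;
        }
    }

    // Stop accepting tasks, wait for queued ones, and return the handlers that ran.
    List<String> shutdownAndGetHandled() {
        this.consumer.shutdown();
        try {
            this.consumer.awaitTermination(5, TimeUnit.SECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return this.handled;
    }
}
```

A false return from enqueue corresponds to the branch in the callers above that reports EHOSTDOWN.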
FSMCallerImpl#doSnapshotLoad
    private void doSnapshotLoad(final LoadSnapshotClosure done) {
        // ... (omitted)
        if (!this.fsm.onSnapshotLoad(reader)) {
            done.run(new Status(-1, "StateMachine onSnapshotLoad failed"));
            final RaftException e = new RaftException(EnumOutter.ErrorType.ERROR_TYPE_STATE_MACHINE,
                RaftError.ESTATEMACHINE, "StateMachine onSnapshotLoad failed");
            setError(e);
            return;
        }
        // ... (omitted)
        done.run(Status.OK());
    }

doSnapshotLoad finally calls the onSnapshotLoad method implemented by the user's state machine. Here we take CounterStateMachine as an example:
CounterStateMachine#onSnapshotLoad
    public boolean onSnapshotLoad(final SnapshotReader reader) {
        if (isLeader()) {
            LOG.warn("Leader is not supposed to load snapshot");
            return false;
        }
        if (reader.getFileMeta("data") == null) {
            LOG.error("Fail to find data file in {}", reader.getPath());
            return false;
        }
        final CounterSnapshotFile snapshot = new CounterSnapshotFile(reader.getPath() + File.separator + "data");
        try {
            this.value.set(snapshot.load());
            return true;
        } catch (final IOException e) {
            LOG.error("Fail to load snapshot from {}", snapshot.getPath());
            return false;
        }
    }
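onSnapshotLoad refuses to run on a leader, verifies that the "data" file is present in the snapshot, then reads the file content and sets it on value. At that point the data has been loaded.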
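The save and load halves of the counter example can be exercised together without a Raft cluster. The sketch below is a stand-in for CounterSnapshotFile, whose implementation is not shown in this article: the class name CounterSnapshotFileSketch, the plain-text file format, and the roundTrip helper are all assumptions made for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical stand-in for the counter snapshot file; the text format is an assumption.
final class CounterSnapshotFileSketch {
    private final Path path;

    CounterSnapshotFileSketch(final Path path) {
        this.path = path;
    }

    // Write the counter value as decimal text; report failure through the return value.
    boolean save(final long value) {
        try {
            Files.write(this.path, Long.toString(value).getBytes(StandardCharsets.UTF_8));
            return true;
        } catch (final IOException e) {
            return false;
        }
    }

    // Read the counter value back; a missing or corrupt file surfaces as an IOException.
    long load() throws IOException {
        final String text = new String(Files.readAllBytes(this.path), StandardCharsets.UTF_8).trim();
        try {
            return Long.parseLong(text);
        } catch (final NumberFormatException e) {
            throw new IOException("Corrupt snapshot file " + this.path, e);
        }
    }

    // Usage example: save a value into a temporary snapshot directory and load it again.
    static long roundTrip(final long value) {
        try {
            final Path dir = Files.createTempDirectory("counter-snapshot");
            final CounterSnapshotFileSketch file = new CounterSnapshotFileSketch(dir.resolve("data"));
            if (!file.save(value)) {
                throw new IOException("Fail to save counter snapshot " + dir);
            }
            return file.load();
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Reporting a failed save as false and a failed load as an IOException mirrors how the two state machine callbacks above consume the file: save drives a status passed to done.run, while load is wrapped in a try/catch that returns false.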