A Look at Elasticsearch's MembershipAction
Preface
This article walks through Elasticsearch's MembershipAction, based on the 6.7.1 source.
MembershipAction
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java
    public class MembershipAction {

        private static final Logger logger = LogManager.getLogger(MembershipAction.class);

        public static final String DISCOVERY_JOIN_ACTION_NAME = "internal:discovery/zen/join";
        public static final String DISCOVERY_JOIN_VALIDATE_ACTION_NAME = "internal:discovery/zen/join/validate";
        public static final String DISCOVERY_LEAVE_ACTION_NAME = "internal:discovery/zen/leave";

        //......

        private final TransportService transportService;

        private final MembershipListener listener;

        public MembershipAction(TransportService transportService, MembershipListener listener,
                                Collection<BiConsumer<DiscoveryNode,ClusterState>> joinValidators) {
            this.transportService = transportService;
            this.listener = listener;

            transportService.registerRequestHandler(DISCOVERY_JOIN_ACTION_NAME, JoinRequest::new,
                ThreadPool.Names.GENERIC, new JoinRequestRequestHandler());
            transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME,
                () -> new ValidateJoinRequest(), ThreadPool.Names.GENERIC,
                new ValidateJoinRequestRequestHandler(transportService::getLocalNode, joinValidators));
            transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new,
                ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler());
        }

        public void sendLeaveRequest(DiscoveryNode masterNode, DiscoveryNode node) {
            transportService.sendRequest(node, DISCOVERY_LEAVE_ACTION_NAME, new LeaveRequest(masterNode),
                EmptyTransportResponseHandler.INSTANCE_SAME);
        }

        public void sendLeaveRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) {
            transportService.submitRequest(masterNode, DISCOVERY_LEAVE_ACTION_NAME, new LeaveRequest(node),
                EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
        }

        public void sendJoinRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) {
            transportService.submitRequest(masterNode, DISCOVERY_JOIN_ACTION_NAME, new JoinRequest(node),
                EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
        }

        /**
         * Validates the join request, throwing a failure if it failed.
         */
        public void sendValidateJoinRequestBlocking(DiscoveryNode node, ClusterState state, TimeValue timeout) {
            transportService.submitRequest(node, DISCOVERY_JOIN_VALIDATE_ACTION_NAME, new ValidateJoinRequest(state),
                EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
        }

        //......
    }
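- MembershipAction defines three kinds of requests: LeaveRequest, JoinRequest, and ValidateJoinRequest. It also defines a TransportRequestHandler for each of them: LeaveRequestRequestHandler, JoinRequestRequestHandler, and ValidateJoinRequestRequestHandler. The constructor registers each handler with the TransportService under its action name, and the send* methods issue the corresponding requests.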
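The registration step in the constructor can be pictured as a lookup table from action name to handler. The following is only a minimal sketch of that idea: HandlerRegistrySketch, its methods, and the string payloads are hypothetical stand-ins, not the TransportService API; only the three action names come from the source above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a transport layer: maps action names to handlers.
final class HandlerRegistrySketch {
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    // Registers one handler per action name; a second registration is rejected.
    void register(String action, Function<String, String> handler) {
        if (handlers.putIfAbsent(action, handler) != null) {
            throw new IllegalArgumentException("handler already registered for " + action);
        }
    }

    // Looks up the handler for the action and lets it process the payload.
    String dispatch(String action, String payload) {
        Function<String, String> handler = handlers.get(action);
        if (handler == null) {
            throw new IllegalStateException("no handler for " + action);
        }
        return handler.apply(payload);
    }

    // Mirrors the three registrations done in the MembershipAction constructor.
    static HandlerRegistrySketch membershipRegistry() {
        HandlerRegistrySketch registry = new HandlerRegistrySketch();
        registry.register("internal:discovery/zen/join", node -> "join:" + node);
        registry.register("internal:discovery/zen/join/validate", state -> "validate:" + state);
        registry.register("internal:discovery/zen/leave", node -> "leave:" + node);
        return registry;
    }

    public static void main(String[] args) {
        System.out.println(membershipRegistry().dispatch("internal:discovery/zen/leave", "node-1"));
    }
}
```

Keying handlers by action name is what lets a receiving node pick the right handler from the name carried in the request.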
TransportRequest
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/transport/TransportRequest.java
    public abstract class TransportRequest extends TransportMessage implements TaskAwareRequest {
        public static class Empty extends TransportRequest {
            public static final Empty INSTANCE = new Empty();
        }

        /**
         * Parent of this request. Defaults to {@link TaskId#EMPTY_TASK_ID}, meaning "no parent".
         */
        private TaskId parentTaskId = TaskId.EMPTY_TASK_ID;

        public TransportRequest() {
        }

        public TransportRequest(StreamInput in) throws IOException {
            parentTaskId = TaskId.readFromStream(in);
        }

        /**
         * Set a reference to task that created this request.
         */
        @Override
        public void setParentTask(TaskId taskId) {
            this.parentTaskId = taskId;
        }

        /**
         * Get a reference to the task that created this request. Defaults to {@link TaskId#EMPTY_TASK_ID}, meaning "there is no parent".
         */
        @Override
        public TaskId getParentTask() {
            return parentTaskId;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            parentTaskId = TaskId.readFromStream(in);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            parentTaskId.writeTo(out);
        }
    }
- TransportRequest extends TransportMessage and implements the TaskAwareRequest interface. It carries a parentTaskId, which readFrom and writeTo read and write after calling the parent implementations.
LeaveRequest
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java
    public static class LeaveRequest extends TransportRequest {

        private DiscoveryNode node;

        public LeaveRequest() {
        }

        private LeaveRequest(DiscoveryNode node) {
            this.node = node;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            node = new DiscoveryNode(in);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            node.writeTo(out);
        }
    }
- LeaveRequest extends TransportRequest and overrides readFrom and writeTo; besides calling the parent implementations, they read or write a DiscoveryNode.
JoinRequest
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java
    public static class JoinRequest extends TransportRequest {

        private DiscoveryNode node;

        public DiscoveryNode getNode() {
            return node;
        }

        public JoinRequest() {
        }

        private JoinRequest(DiscoveryNode node) {
            this.node = node;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            node = new DiscoveryNode(in);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            node.writeTo(out);
        }
    }
- JoinRequest extends TransportRequest and overrides readFrom and writeTo; besides calling the parent implementations, they read or write a DiscoveryNode.
ValidateJoinRequest
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java
    static class ValidateJoinRequest extends TransportRequest {
        private ClusterState state;

        ValidateJoinRequest() {}

        ValidateJoinRequest(ClusterState state) {
            this.state = state;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            this.state = ClusterState.readFrom(in, null);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            this.state.writeTo(out);
        }
    }
- ValidateJoinRequest extends TransportRequest and overrides readFrom and writeTo; besides calling the parent implementations, they read or write a ClusterState.
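All three request classes follow the same serialization pattern: call the parent method first, then read or write the subclass's own field in the same order. Here is a rough illustration using java.io streams instead of Elasticsearch's StreamInput and StreamOutput; BaseRequest, NodeRequest, and the string fields are hypothetical simplifications.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class RequestSerializationSketch {
    // Stand-in for TransportRequest: owns one field and serializes it.
    static class BaseRequest {
        String parentTaskId = "unset";

        void writeTo(DataOutputStream out) throws IOException {
            out.writeUTF(parentTaskId);
        }

        void readFrom(DataInputStream in) throws IOException {
            parentTaskId = in.readUTF();
        }
    }

    // Stand-in for LeaveRequest/JoinRequest: parent fields first, then its own.
    static class NodeRequest extends BaseRequest {
        String nodeId;

        NodeRequest() {
        }

        NodeRequest(String nodeId) {
            this.nodeId = nodeId;
        }

        @Override
        void writeTo(DataOutputStream out) throws IOException {
            super.writeTo(out);
            out.writeUTF(nodeId);
        }

        @Override
        void readFrom(DataInputStream in) throws IOException {
            super.readFrom(in);
            nodeId = in.readUTF();
        }
    }

    // Serializes the request to bytes and reads it back into a fresh instance.
    static NodeRequest roundTrip(NodeRequest original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                original.writeTo(out);
            }
            NodeRequest copy = new NodeRequest();
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.readFrom(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        NodeRequest request = new NodeRequest("node-1");
        request.parentTaskId = "task-7";
        NodeRequest copy = roundTrip(request);
        System.out.println(copy.parentTaskId + " / " + copy.nodeId);
    }
}
```

The no-argument constructor matters in this pattern: the receiving side creates an empty instance and then fills it through readFrom, which is why the request classes above keep one.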
TransportRequestHandler
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/transport/TransportRequestHandler.java
    public interface TransportRequestHandler<T extends TransportRequest> {

        /**
         * Override this method if access to the Task parameter is needed
         */
        default void messageReceived(final T request, final TransportChannel channel, Task task) throws Exception {
            messageReceived(request, channel);
        }

        void messageReceived(T request, TransportChannel channel) throws Exception;
    }
- The TransportRequestHandler interface declares a two-argument messageReceived method and also provides a three-argument default overload that takes a Task and simply delegates to the two-argument one.
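The default-method overload can be tried out in isolation. In this sketch the Handler interface is a simplified, hypothetical version of TransportRequestHandler in which channel and task are plain strings and the methods return a string so the delegation is observable.

```java
final class HandlerDefaultMethodSketch {
    // Simplified handler: implementers only have to provide the two-argument method.
    interface Handler<T> {
        // Overload with task access; by default it ignores the task and delegates.
        default String messageReceived(T request, String channel, String task) {
            return messageReceived(request, channel);
        }

        String messageReceived(T request, String channel);
    }

    // Implements only the abstract method, so the default overload applies.
    static final Handler<String> SIMPLE = (request, channel) -> channel + " handled " + request;

    // Overrides the three-argument overload because it needs the task.
    static final Handler<String> TASK_AWARE = new Handler<String>() {
        @Override
        public String messageReceived(String request, String channel, String task) {
            return channel + " handled " + request + " for " + task;
        }

        @Override
        public String messageReceived(String request, String channel) {
            throw new UnsupportedOperationException("task required");
        }
    };

    public static void main(String[] args) {
        System.out.println(SIMPLE.messageReceived("leave", "channel-1", "task-1"));
        System.out.println(TASK_AWARE.messageReceived("join", "channel-2", "task-2"));
    }
}
```

Callers can always invoke the three-argument form, and handlers that do not care about the task stay a single method long.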
LeaveRequestRequestHandler
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java
    private class LeaveRequestRequestHandler implements TransportRequestHandler<LeaveRequest> {

        @Override
        public void messageReceived(LeaveRequest request, TransportChannel channel) throws Exception {
            listener.onLeave(request.node);
            channel.sendResponse(TransportResponse.Empty.INSTANCE);
        }
    }
- LeaveRequestRequestHandler implements TransportRequestHandler; its messageReceived calls MembershipListener.onLeave and then sends back an empty response.
JoinRequestRequestHandler
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java
    private class JoinRequestRequestHandler implements TransportRequestHandler<JoinRequest> {

        @Override
        public void messageReceived(final JoinRequest request, final TransportChannel channel) throws Exception {
            listener.onJoin(request.getNode(), new JoinCallback() {
                @Override
                public void onSuccess() {
                    try {
                        channel.sendResponse(TransportResponse.Empty.INSTANCE);
                    } catch (Exception e) {
                        onFailure(e);
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    try {
                        channel.sendResponse(e);
                    } catch (Exception inner) {
                        inner.addSuppressed(e);
                        logger.warn("failed to send back failure on join request", inner);
                    }
                }
            });
        }
    }
- JoinRequestRequestHandler implements TransportRequestHandler; its messageReceived calls MembershipListener.onJoin with a JoinCallback that sends an empty response on success and sends the exception back on failure.
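The callback passed to onJoin has one subtle property: if sending the success response itself fails, the failure path is used as a fallback. A small sketch of that behaviour follows; RecordingChannel is a hypothetical test double, not a real TransportChannel, and its sendFailure never throws, unlike the real channel.

```java
import java.util.ArrayList;
import java.util.List;

final class JoinCallbackSketch {
    interface JoinCallback {
        void onSuccess();

        void onFailure(Exception e);
    }

    // Hypothetical channel that records what was sent and can be told to fail.
    static final class RecordingChannel {
        final List<String> sent = new ArrayList<>();
        private final boolean failOnSuccessResponse;

        RecordingChannel(boolean failOnSuccessResponse) {
            this.failOnSuccessResponse = failOnSuccessResponse;
        }

        void sendResponse() throws Exception {
            if (failOnSuccessResponse) {
                throw new Exception("channel closed");
            }
            sent.add("empty-response");
        }

        void sendFailure(Exception e) {
            sent.add("failure: " + e.getMessage());
        }
    }

    // Same shape as the anonymous JoinCallback: success falls back to failure.
    static JoinCallback callbackFor(RecordingChannel channel) {
        return new JoinCallback() {
            @Override
            public void onSuccess() {
                try {
                    channel.sendResponse();
                } catch (Exception e) {
                    onFailure(e);
                }
            }

            @Override
            public void onFailure(Exception e) {
                channel.sendFailure(e);
            }
        };
    }

    public static void main(String[] args) {
        RecordingChannel broken = new RecordingChannel(true);
        callbackFor(broken).onSuccess();
        System.out.println(broken.sent);
    }
}
```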
ValidateJoinRequestRequestHandler
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java
    static class ValidateJoinRequestRequestHandler implements TransportRequestHandler<ValidateJoinRequest> {
        private final Supplier<DiscoveryNode> localNodeSupplier;
        private final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators;

        ValidateJoinRequestRequestHandler(Supplier<DiscoveryNode> localNodeSupplier,
                                          Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators) {
            this.localNodeSupplier = localNodeSupplier;
            this.joinValidators = joinValidators;
        }

        @Override
        public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception {
            DiscoveryNode node = localNodeSupplier.get();
            assert node != null : "local node is null";
            joinValidators.stream().forEach(action -> action.accept(node, request.state));
            channel.sendResponse(TransportResponse.Empty.INSTANCE);
        }
    }
- ValidateJoinRequestRequestHandler implements TransportRequestHandler; its messageReceived runs each of the joinValidators in turn against the local node and the ClusterState carried by the request, then sends back an empty response. A validator signals rejection by throwing.
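Because the validators are plain BiConsumer instances, the first one that throws aborts validation and its exception becomes the reason for rejection. A minimal sketch under simplified assumptions: the node is a String, the cluster state is reduced to an Integer version, and both validators are invented for illustration.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.function.BiConsumer;

final class JoinValidatorSketch {
    // Hypothetical validator: the local node must have a name.
    static final BiConsumer<String, Integer> NODE_NAMED = (node, stateVersion) -> {
        if (node.isEmpty()) {
            throw new IllegalStateException("node has no name");
        }
    };

    // Hypothetical validator: the received state version must not be negative.
    static final BiConsumer<String, Integer> VERSION_VALID = (node, stateVersion) -> {
        if (stateVersion < 0) {
            throw new IllegalStateException("invalid state version " + stateVersion);
        }
    };

    // Runs every validator in order; the first exception stops the loop.
    static String validate(String localNode, Integer stateVersion,
                           Collection<BiConsumer<String, Integer>> validators) {
        try {
            validators.forEach(validator -> validator.accept(localNode, stateVersion));
            return "ok";
        } catch (IllegalStateException e) {
            return "rejected: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        Collection<BiConsumer<String, Integer>> validators = Arrays.asList(NODE_NAMED, VERSION_VALID);
        System.out.println(validate("node-1", 3, validators));
        System.out.println(validate("node-1", -1, validators));
    }
}
```

In the real handler the exception is not caught locally; it propagates out of messageReceived and travels back to the node that sent the validation request.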
MembershipAction.MembershipListener
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java
    public interface MembershipListener {
        void onJoin(DiscoveryNode node, JoinCallback callback);

        void onLeave(DiscoveryNode node);
    }

    public interface JoinCallback {
        void onSuccess();

        void onFailure(Exception e);
    }
- The MembershipListener interface declares onJoin and onLeave; onJoin additionally takes a JoinCallback. It is implemented by a private class of the same name inside ZenDiscovery.
ZenDiscovery.MembershipListener
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
    public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener {

        //......

        private class MembershipListener implements MembershipAction.MembershipListener {
            @Override
            public void onJoin(DiscoveryNode node, MembershipAction.JoinCallback callback) {
                handleJoinRequest(node, ZenDiscovery.this.clusterState(), callback);
            }

            @Override
            public void onLeave(DiscoveryNode node) {
                handleLeaveRequest(node);
            }
        }

        void handleJoinRequest(final DiscoveryNode node, final ClusterState state, final MembershipAction.JoinCallback callback) {
            if (nodeJoinController == null) {
                throw new IllegalStateException("discovery module is not yet started");
            } else {
                // we do this in a couple of places including the cluster update thread. This one here is really just best effort
                // to ensure we fail as fast as possible.
                onJoinValidators.stream().forEach(a -> a.accept(node, state));
                if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
                    MembershipAction.ensureMajorVersionBarrier(node.getVersion(), state.getNodes().getMinNodeVersion());
                }
                // try and connect to the node, if it fails, we can raise an exception back to the client...
                transportService.connectToNode(node);

                // validate the join request, will throw a failure if it fails, which will get back to the
                // node calling the join request
                try {
                    membership.sendValidateJoinRequestBlocking(node, state, joinTimeout);
                } catch (Exception e) {
                    logger.warn(() -> new ParameterizedMessage("failed to validate incoming join request from node [{}]", node),
                        e);
                    callback.onFailure(new IllegalStateException("failure when sending a validation request to node", e));
                    return;
                }
                nodeJoinController.handleJoinRequest(node, callback);
            }
        }

        private void handleLeaveRequest(final DiscoveryNode node) {
            if (lifecycleState() != Lifecycle.State.STARTED) {
                // not started, ignore a node failure
                return;
            }
            if (localNodeMaster()) {
                removeNode(node, "zen-disco-node-left", "left");
            } else if (node.equals(clusterState().nodes().getMasterNode())) {
                handleMasterGone(node, null, "shut_down");
            }
        }

        private void removeNode(final DiscoveryNode node, final String source, final String reason) {
            masterService.submitStateUpdateTask(
                    source + "(" + node + "), reason(" + reason + ")",
                    new NodeRemovalClusterStateTaskExecutor.Task(node, reason),
                    ClusterStateTaskConfig.build(Priority.IMMEDIATE),
                    nodeRemovalExecutor,
                    nodeRemovalExecutor);
        }

        private void handleMasterGone(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
            if (lifecycleState() != Lifecycle.State.STARTED) {
                // not started, ignore a master failure
                return;
            }
            if (localNodeMaster()) {
                // we might get this on both a master telling us shutting down, and then the disconnect failure
                return;
            }

            logger.info(() -> new ParameterizedMessage("master_left [{}], reason [{}]", masterNode, reason), cause);

            synchronized (stateMutex) {
                if (localNodeMaster() == false && masterNode.equals(committedState.get().nodes().getMasterNode())) {
                    // flush any pending cluster states from old master, so it will not be set as master again
                    pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason));
                    rejoin("master left (reason = " + reason + ")");
                }
            }
        }

        //......
    }
- ZenDiscovery.MembershipListener.onJoin calls handleJoinRequest. That method first runs the onJoinValidators, enforces the major-version barrier unless the state carries STATE_NOT_RECOVERED_BLOCK, connects to the joining node, and sends it a blocking ValidateJoinRequest; only then does it call nodeJoinController.handleJoinRequest(node, callback). onLeave calls handleLeaveRequest: if the local node is the master it calls removeNode; otherwise, if the leaving node is the current master, it calls handleMasterGone.
- removeNode calls masterService.submitStateUpdateTask, passing nodeRemovalExecutor as both the ClusterStateTaskExecutor and the ClusterStateTaskListener.
- handleMasterGone, when the local node is not the master and the departed node is the committed master, calls pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason)) and then rejoin.
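The branching in handleLeaveRequest is easy to misread, so here it is reduced to a pure function. This is only a sketch: the Action enum and the use of strings for nodes are hypothetical, while the order of the checks follows the source above.

```java
final class LeaveDecisionSketch {
    enum Action { IGNORE, REMOVE_NODE, MASTER_GONE }

    // Decides what a node does when it is told that leavingNode is leaving.
    static Action onLeave(boolean started, String localNode, String masterNode, String leavingNode) {
        if (!started) {
            return Action.IGNORE;          // not started, ignore the notification
        }
        if (localNode.equals(masterNode)) {
            return Action.REMOVE_NODE;     // we are the master: remove the node from the cluster state
        }
        if (leavingNode.equals(masterNode)) {
            return Action.MASTER_GONE;     // our master is leaving: fail pending states and rejoin
        }
        return Action.IGNORE;              // some other node left and we are not the master
    }

    public static void main(String[] args) {
        System.out.println(onLeave(true, "node-a", "node-a", "node-b"));
        System.out.println(onLeave(true, "node-b", "node-a", "node-a"));
        System.out.println(onLeave(true, "node-b", "node-a", "node-c"));
    }
}
```

Note the third case: a non-master node that hears about an ordinary node leaving does nothing itself and relies on the master to publish the updated cluster state.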
NodeJoinController.handleJoinRequest
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java
    public class NodeJoinController {

        //......

        /**
         * processes or queues an incoming join request.
         * <p>
         * Note: doesn't do any validation. This should have been done before.
         */
        public synchronized void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) {
            if (electionContext != null) {
                electionContext.addIncomingJoin(node, callback);
                checkPendingJoinsAndElectIfNeeded();
            } else {
                masterService.submitStateUpdateTask("zen-disco-node-join",
                    node, ClusterStateTaskConfig.build(Priority.URGENT),
                    joinTaskExecutor, new JoinTaskListener(callback, logger));
            }
        }

        //......
    }
- NodeJoinController.handleJoinRequest queues the join via electionContext.addIncomingJoin and then calls checkPendingJoinsAndElectIfNeeded when electionContext is not null (an election is in progress). Otherwise it calls masterService.submitStateUpdateTask, passing joinTaskExecutor as the ClusterStateTaskExecutor and a JoinTaskListener as the ClusterStateTaskListener.
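The "queue during an election, submit directly otherwise" behaviour can be sketched as follows. The body of checkPendingJoinsAndElectIfNeeded is not shown in this article, so the version here (finish the election once a required number of joins has accumulated) is purely an assumption for illustration, as are all class and field names.

```java
import java.util.ArrayList;
import java.util.List;

final class JoinQueueSketch {
    private final List<String> pendingJoins = new ArrayList<>(); // stands in for electionContext
    private final List<String> submitted = new ArrayList<>();    // stands in for master service tasks
    private final int requiredJoins;                             // hypothetical election threshold
    private boolean electionInProgress;

    JoinQueueSketch(int requiredJoins) {
        this.requiredJoins = requiredJoins;
    }

    synchronized void startElection() {
        electionInProgress = true;
    }

    // Queues the join while an election is running, otherwise submits it right away.
    synchronized void handleJoin(String node) {
        if (electionInProgress) {
            pendingJoins.add(node);
            checkPendingJoinsAndElectIfNeeded();
        } else {
            submitted.add(node);
        }
    }

    // Assumed behaviour: once enough joins are queued, flush them and end the election.
    private void checkPendingJoinsAndElectIfNeeded() {
        if (pendingJoins.size() >= requiredJoins) {
            submitted.addAll(pendingJoins);
            pendingJoins.clear();
            electionInProgress = false;
        }
    }

    synchronized List<String> submitted() {
        return new ArrayList<>(submitted);
    }

    synchronized List<String> pending() {
        return new ArrayList<>(pendingJoins);
    }

    public static void main(String[] args) {
        JoinQueueSketch controller = new JoinQueueSketch(2);
        controller.startElection();
        controller.handleJoin("node-a");
        controller.handleJoin("node-b");
        controller.handleJoin("node-c");
        System.out.println(controller.submitted());
    }
}
```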
MasterService.submitStateUpdateTask
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java
    public class MasterService extends AbstractLifecycleComponent {

        //......

        public <T> void submitStateUpdateTask(String source, T task,
                                              ClusterStateTaskConfig config,
                                              ClusterStateTaskExecutor<T> executor,
                                              ClusterStateTaskListener listener) {
            submitStateUpdateTasks(source, Collections.singletonMap(task, listener), config, executor);
        }

        public <T> void submitStateUpdateTasks(final String source,
                                               final Map<T, ClusterStateTaskListener> tasks, final ClusterStateTaskConfig config,
                                               final ClusterStateTaskExecutor<T> executor) {
            if (!lifecycle.started()) {
                return;
            }
            final ThreadContext threadContext = threadPool.getThreadContext();
            final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(true);
            try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
                threadContext.markAsSystemContext();

                List<Batcher.UpdateTask> safeTasks = tasks.entrySet().stream()
                    .map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue(), supplier), executor))
                    .collect(Collectors.toList());
                taskBatcher.submitTasks(safeTasks, config.timeout());
            } catch (EsRejectedExecutionException e) {
                // ignore cases where we are shutting down..., there is really nothing interesting
                // to be done here...
                if (!lifecycle.stoppedOrClosed()) {
                    throw e;
                }
            }
        }

        class Batcher extends TaskBatcher {

            Batcher(Logger logger, PrioritizedEsThreadPoolExecutor threadExecutor) {
                super(logger, threadExecutor);
            }

            @Override
            protected void onTimeout(List<? extends BatchedTask> tasks, TimeValue timeout) {
                threadPool.generic().execute(
                    () -> tasks.forEach(
                        task -> ((UpdateTask) task).listener.onFailure(task.source,
                            new ProcessClusterEventTimeoutException(timeout, task.source))));
            }

            @Override
            protected void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary) {
                ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) batchingKey;
                List<UpdateTask> updateTasks = (List<UpdateTask>) tasks;
                runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummary));
            }

            class UpdateTask extends BatchedTask {
                final ClusterStateTaskListener listener;

                UpdateTask(Priority priority, String source, Object task, ClusterStateTaskListener listener,
                           ClusterStateTaskExecutor<?> executor) {
                    super(priority, source, executor, task);
                    this.listener = listener;
                }

                @Override
                public String describeTasks(List<? extends BatchedTask> tasks) {
                    return ((ClusterStateTaskExecutor<Object>) batchingKey).describeTasks(
                        tasks.stream().map(BatchedTask::getTask).collect(Collectors.toList()));
                }
            }
        }

        protected class TaskInputs {
            public final String summary;
            public final List<Batcher.UpdateTask> updateTasks;
            public final ClusterStateTaskExecutor<Object> executor;

            TaskInputs(ClusterStateTaskExecutor<Object> executor, List<Batcher.UpdateTask> updateTasks, String summary) {
                this.summary = summary;
                this.executor = executor;
                this.updateTasks = updateTasks;
            }

            public boolean runOnlyWhenMaster() {
                return executor.runOnlyOnMaster();
            }

            public void onNoLongerMaster() {
                updateTasks.forEach(task -> task.listener.onNoLongerMaster(task.source()));
            }
        }

        //......
    }
- submitStateUpdateTasks wraps each task in a Batcher.UpdateTask and submits the list through taskBatcher.submitTasks. Batcher extends TaskBatcher; its run method casts the batching key back to the ClusterStateTaskExecutor and calls runTasks with a TaskInputs. TaskInputs.runOnlyWhenMaster delegates to executor.runOnlyOnMaster(), and onNoLongerMaster calls task.listener.onNoLongerMaster(task.source()) for every task.
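In the code above the executor doubles as the batching key, so tasks submitted with the same executor can be handed to it together. The internals of TaskBatcher are not shown here, so the following is only a toy model of that grouping idea; BatchExecutor, submit, and runAll are hypothetical names, and there are no priorities, timeouts, or threads.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TaskBatchingSketch {
    // Hypothetical executor that processes a whole batch of tasks at once.
    interface BatchExecutor {
        String execute(List<String> tasks);
    }

    // Pending tasks grouped by the executor instance that will run them.
    private final Map<BatchExecutor, List<String>> pending = new LinkedHashMap<>();

    void submit(String task, BatchExecutor executor) {
        pending.computeIfAbsent(executor, key -> new ArrayList<>()).add(task);
    }

    // Runs one batch per executor, in first-submission order, and clears the queue.
    List<String> runAll() {
        List<String> results = new ArrayList<>();
        for (Map.Entry<BatchExecutor, List<String>> entry : pending.entrySet()) {
            results.add(entry.getKey().execute(entry.getValue()));
        }
        pending.clear();
        return results;
    }

    public static void main(String[] args) {
        BatchExecutor joins = tasks -> "join" + tasks;
        BatchExecutor removals = tasks -> "remove" + tasks;
        TaskBatchingSketch batcher = new TaskBatchingSketch();
        batcher.submit("node-a", joins);
        batcher.submit("node-x", removals);
        batcher.submit("node-b", joins);
        System.out.println(batcher.runAll());
    }
}
```

Batching is why execute on a ClusterStateTaskExecutor takes a List of tasks: several joins or removals can be folded into one cluster state update.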
ClusterStateTaskExecutor
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java
    public interface ClusterStateTaskExecutor<T> {

        ClusterTasksResult<T> execute(ClusterState currentState, List<T> tasks) throws Exception;

        default boolean runOnlyOnMaster() {
            return true;
        }

        default void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
        }

        default String describeTasks(List<T> tasks) {
            return String.join(", ", tasks.stream().map(t -> (CharSequence)t.toString()).filter(t -> t.length() > 0)::iterator);
        }

        //......
    }
- ClusterStateTaskExecutor declares the execute method and provides default implementations of runOnlyOnMaster, clusterStatePublished, and describeTasks. It has many implementations, among them JoinTaskExecutor and NodeRemovalClusterStateTaskExecutor.
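The describeTasks default method uses a compact idiom: a stream's iterator method reference is passed where String.join expects an Iterable. The standalone snippet below copies that expression into a static helper (the class name is made up) to show what it produces.

```java
import java.util.Arrays;
import java.util.List;

final class DescribeTasksSketch {
    // Joins the non-empty toString() values of the tasks with ", ".
    static <T> String describeTasks(List<T> tasks) {
        return String.join(", ",
            tasks.stream().map(t -> (CharSequence) t.toString()).filter(t -> t.length() > 0)::iterator);
    }

    public static void main(String[] args) {
        System.out.println(describeTasks(Arrays.asList("node-a left", "", "node-b left")));
    }
}
```

Tasks whose toString() is empty are dropped from the summary, which keeps marker tasks without a description out of the log line.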
JoinTaskExecutor
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java
    // visible for testing
    public static class JoinTaskExecutor implements ClusterStateTaskExecutor<DiscoveryNode> {

        private final AllocationService allocationService;

        private final ElectMasterService electMasterService;

        private final Logger logger;

        private final int minimumMasterNodesOnLocalNode;

        public JoinTaskExecutor(Settings settings, AllocationService allocationService, ElectMasterService electMasterService,
                                Logger logger) {
            this.allocationService = allocationService;
            this.electMasterService = electMasterService;
            this.logger = logger;
            minimumMasterNodesOnLocalNode = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
        }

        @Override
        public ClusterTasksResult<DiscoveryNode> execute(ClusterState currentState, List<DiscoveryNode> joiningNodes) throws Exception {
            final ClusterTasksResult.Builder<DiscoveryNode> results = ClusterTasksResult.builder();

            final DiscoveryNodes currentNodes = currentState.nodes();
            boolean nodesChanged = false;
            ClusterState.Builder newState;

            if (joiningNodes.size() == 1 && joiningNodes.get(0).equals(FINISH_ELECTION_TASK)) {
                return results.successes(joiningNodes).build(currentState);
            } else if (currentNodes.getMasterNode() == null && joiningNodes.contains(BECOME_MASTER_TASK)) {
                assert joiningNodes.contains(FINISH_ELECTION_TASK)
                    : "becoming a master but election is not finished " + joiningNodes;
                // use these joins to try and become the master.
                // Note that we don't have to do any validation of the amount of joining nodes - the commit
                // during the cluster state publishing guarantees that we have enough
                newState = becomeMasterAndTrimConflictingNodes(currentState, joiningNodes);
                nodesChanged = true;
            } else if (currentNodes.isLocalNodeElectedMaster() == false) {
                logger.trace("processing node joins, but we are not the master. current master: {}", currentNodes.getMasterNode());
                throw new NotMasterException("Node [" + currentNodes.getLocalNode() + "] not master for join request");
            } else {
                newState = ClusterState.builder(currentState);
            }

            DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes());

            assert nodesBuilder.isLocalNodeElectedMaster();

            Version minClusterNodeVersion = newState.nodes().getMinNodeVersion();
            Version maxClusterNodeVersion = newState.nodes().getMaxNodeVersion();
            // we only enforce major version transitions on a fully formed clusters
            final boolean enforceMajorVersion = currentState.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false;
            // processing any joins
            for (final DiscoveryNode node : joiningNodes) {
                if (node.equals(BECOME_MASTER_TASK) || node.equals(FINISH_ELECTION_TASK)) {
                    // noop
                } else if (currentNodes.nodeExists(node)) {
                    logger.debug("received a join request for an existing node [{}]", node);
                } else {
                    try {
                        if (enforceMajorVersion) {
                            MembershipAction.ensureMajorVersionBarrier(node.getVersion(), minClusterNodeVersion);
                        }
                        MembershipAction.ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion);
                        // we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
                        // we have to reject nodes that don't support all indices we have in this cluster
                        MembershipAction.ensureIndexCompatibility(node.getVersion(), currentState.getMetaData());
                        nodesBuilder.add(node);
                        nodesChanged = true;
                        minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
                        maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
                    } catch (IllegalArgumentException | IllegalStateException e) {
                        results.failure(node, e);
                        continue;
                    }
                }
                results.success(node);
            }
            if (nodesChanged) {
                newState.nodes(nodesBuilder);
                return results.build(allocationService.reroute(newState.build(), "node_join"));
            } else {
                // we must return a new cluster state instance to force publishing. This is important
                // for the joining node to finalize its join and set us as a master
                return results.build(newState.build());
            }
        }

        //......

        @Override
        public boolean runOnlyOnMaster() {
            // we validate that we are allowed to change the cluster state during cluster state processing
            return false;
        }

        @Override
        public void clusterStatePublished(ClusterChangedEvent event) {
            electMasterService.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state());
        }
    }
- JoinTaskExecutor.execute builds a new ClusterState from joiningNodes, recording a per-node success or failure after the version and index compatibility checks. If the set of nodes changed, it calls allocationService.reroute(newState.build(), "node_join") to reroute shard allocation; otherwise it still returns a new ClusterState instance so that the state is published.
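The per-node bookkeeping in execute can be sketched without any Elasticsearch types. In this simplified model nodes are strings, the three ensure* checks are collapsed into one hypothetical compatibility predicate, and the rerouted flag merely records whether a reroute would have been requested.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;

final class JoinExecuteSketch {
    static final class Result {
        final Set<String> nodes;            // node set of the new state
        final Map<String, String> failures; // rejected node -> reason
        final boolean rerouted;             // true only if the node set changed

        Result(Set<String> nodes, Map<String, String> failures, boolean rerouted) {
            this.nodes = nodes;
            this.failures = failures;
            this.rerouted = rerouted;
        }
    }

    static Result execute(Set<String> currentNodes, List<String> joiningNodes, Predicate<String> compatible) {
        Set<String> newNodes = new TreeSet<>(currentNodes); // always a new instance
        Map<String, String> failures = new LinkedHashMap<>();
        boolean nodesChanged = false;
        for (String node : joiningNodes) {
            if (currentNodes.contains(node)) {
                continue; // join request for an existing node: success, nothing to do
            }
            if (!compatible.test(node)) {
                failures.put(node, "incompatible node"); // rejected, other joins still proceed
                continue;
            }
            newNodes.add(node);
            nodesChanged = true;
        }
        return new Result(newNodes, failures, nodesChanged);
    }

    public static void main(String[] args) {
        Set<String> current = new TreeSet<>(Arrays.asList("node-a"));
        Result result = execute(current, Arrays.asList("node-a", "node-b", "node-old"),
            node -> !node.equals("node-old"));
        System.out.println(result.nodes + " failures=" + result.failures.keySet() + " rerouted=" + result.rerouted);
    }
}
```

One rejected node does not fail the batch: its failure is recorded individually and the remaining joins are still applied.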
NodeRemovalClusterStateTaskExecutor
elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
    public static class NodeRemovalClusterStateTaskExecutor
        implements ClusterStateTaskExecutor<NodeRemovalClusterStateTaskExecutor.Task>, ClusterStateTaskListener {

        private final AllocationService allocationService;
        private final ElectMasterService electMasterService;
        private final Consumer<String> rejoin;
        private final Logger logger;

        public static class Task {

            private final DiscoveryNode node;
            private final String reason;

            public Task(final DiscoveryNode node, final String reason) {
                this.node = node;
                this.reason = reason;
            }

            public DiscoveryNode node() {
                return node;
            }

            public String reason() {
                return reason;
            }

            @Override
            public String toString() {
                return node + " " + reason;
            }
        }

        public NodeRemovalClusterStateTaskExecutor(
                final AllocationService allocationService,
                final ElectMasterService electMasterService,
                final Consumer<String> rejoin,
                final Logger logger) {
            this.allocationService = allocationService;
            this.electMasterService = electMasterService;
            this.rejoin = rejoin;
            this.logger = logger;
        }

        @Override
        public ClusterTasksResult<Task> execute(final ClusterState currentState, final List<Task> tasks) throws Exception {
            final DiscoveryNodes.Builder remainingNodesBuilder = DiscoveryNodes.builder(currentState.nodes());
            boolean removed = false;
            for (final Task task : tasks) {
                if (currentState.nodes().nodeExists(task.node())) {
                    remainingNodesBuilder.remove(task.node());
                    removed = true;
                } else {
                    logger.debug("node [{}] does not exist in cluster state, ignoring", task);
                }
            }

            if (!removed) {
                // no nodes to remove, keep the current cluster state
                return ClusterTasksResult.<Task>builder().successes(tasks).build(currentState);
            }

            final ClusterState remainingNodesClusterState = remainingNodesClusterState(currentState, remainingNodesBuilder);

            final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.<Task>builder().successes(tasks);
            if (electMasterService.hasEnoughMasterNodes(remainingNodesClusterState.nodes()) == false) {
                final int masterNodes = electMasterService.countMasterNodes(remainingNodesClusterState.nodes());
                rejoin.accept(LoggerMessageFormat.format("not enough master nodes (has [{}], but needed [{}])",
                                                         masterNodes, electMasterService.minimumMasterNodes()));
                return resultBuilder.build(currentState);
            } else {
                ClusterState ptasksDisassociatedState = PersistentTasksCustomMetaData.disassociateDeadNodes(remainingNodesClusterState);
                return resultBuilder.build(allocationService.disassociateDeadNodes(ptasksDisassociatedState, true, describeTasks(tasks)));
            }
        }

        // visible for testing
        // hook is used in testing to ensure that correct cluster state is used to test whether a
        // rejoin or reroute is needed
        ClusterState remainingNodesClusterState(final ClusterState currentState, DiscoveryNodes.Builder remainingNodesBuilder) {
            return ClusterState.builder(currentState).nodes(remainingNodesBuilder).build();
        }

        @Override
        public void onFailure(final String source, final Exception e) {
            logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
        }

        @Override
        public void onNoLongerMaster(String source) {
            logger.debug("no longer master while processing node removal [{}]", source);
        }
    }
- NodeRemovalClusterStateTaskExecutor implements both ClusterStateTaskExecutor and ClusterStateTaskListener. Its execute method removes the given nodes from currentState and builds remainingNodesClusterState. If no node was actually removed, it returns currentState unchanged. If the remaining nodes still satisfy hasEnoughMasterNodes, it calls allocationService.disassociateDeadNodes; otherwise it invokes the Consumer named rejoin and returns currentState.
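The three possible outcomes of the removal executor can be shown with plain sets. This is a sketch under a stated assumption: "enough master nodes" is modelled as the count of remaining master-eligible nodes being at least minimumMasterNodes, which simplifies whatever ElectMasterService.hasEnoughMasterNodes really checks. All names below are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class NodeRemovalSketch {
    enum Outcome { UNCHANGED, REJOIN, NODES_REMOVED }

    static Outcome execute(Set<String> currentNodes, Set<String> masterEligible,
                           List<String> leavingNodes, int minimumMasterNodes) {
        Set<String> remaining = new HashSet<>(currentNodes);
        boolean removed = false;
        for (String node : leavingNodes) {
            if (remaining.remove(node)) {
                removed = true; // node was part of the cluster state
            }
        }
        if (!removed) {
            return Outcome.UNCHANGED; // nothing to remove, keep the current state
        }
        long masterNodes = remaining.stream().filter(masterEligible::contains).count();
        if (masterNodes < minimumMasterNodes) {
            return Outcome.REJOIN; // assumed quorum check failed: step down and rejoin
        }
        return Outcome.NODES_REMOVED; // publish the state without the dead nodes
    }

    public static void main(String[] args) {
        Set<String> nodes = new HashSet<>(Arrays.asList("master-1", "master-2", "data-1"));
        Set<String> masters = new HashSet<>(Arrays.asList("master-1", "master-2"));
        System.out.println(execute(nodes, masters, Arrays.asList("data-1"), 2));
        System.out.println(execute(nodes, masters, Arrays.asList("master-2"), 2));
    }
}
```

In the rejoin case the real executor returns currentState rather than the reduced state, since a master without a quorum should not publish further changes.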
Summary
- MembershipAction defines three kinds of requests: LeaveRequest, JoinRequest, and ValidateJoinRequest. It also defines a TransportRequestHandler for each of them: LeaveRequestRequestHandler, JoinRequestRequestHandler, and ValidateJoinRequestRequestHandler.
- LeaveRequestRequestHandler's messageReceived calls MembershipListener.onLeave; JoinRequestRequestHandler's messageReceived calls MembershipListener.onJoin; ValidateJoinRequestRequestHandler's messageReceived runs each of the joinValidators in turn.
- The MembershipListener interface declares onJoin and onLeave, with onJoin taking a JoinCallback; it is implemented by a class of the same name in ZenDiscovery. ZenDiscovery.MembershipListener.onJoin calls handleJoinRequest, which validates the join and then calls nodeJoinController.handleJoinRequest(node, callback). onLeave calls handleLeaveRequest, which calls removeNode if the local node is the master and handleMasterGone if the leaving node is the current master. removeNode calls masterService.submitStateUpdateTask with nodeRemovalExecutor as both the ClusterStateTaskExecutor and the ClusterStateTaskListener. handleMasterGone calls pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason)) and then rejoin.
- NodeJoinController.handleJoinRequest calls electionContext.addIncomingJoin when electionContext is not null; otherwise it calls masterService.submitStateUpdateTask with joinTaskExecutor as the ClusterStateTaskExecutor and a JoinTaskListener as the ClusterStateTaskListener.
- MasterService.submitStateUpdateTasks wraps each task in a Batcher.UpdateTask and submits the list through taskBatcher.submitTasks. Batcher extends TaskBatcher; its run method calls runTasks with a TaskInputs. TaskInputs.runOnlyWhenMaster delegates to executor.runOnlyOnMaster(), and onNoLongerMaster calls task.listener.onNoLongerMaster(task.source()).
- ClusterStateTaskExecutor declares execute and provides default implementations of runOnlyOnMaster, clusterStatePublished, and describeTasks; its implementations include JoinTaskExecutor and NodeRemovalClusterStateTaskExecutor. JoinTaskExecutor.execute builds a new ClusterState from joiningNodes and, if the nodes changed, calls allocationService.reroute(newState.build(), "node_join") to reroute shard allocation. NodeRemovalClusterStateTaskExecutor implements both ClusterStateTaskExecutor and ClusterStateTaskListener; its execute method removes the given nodes from currentState, builds remainingNodesClusterState, and then calls allocationService.disassociateDeadNodes if hasEnoughMasterNodes holds, or the Consumer named rejoin otherwise.