A Look at Storm's AssignmentDistributionService
Preface
This article takes a look at Storm's AssignmentDistributionService.
AssignmentDistributionService
storm-2.0.0/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
```java
/**
 * A service for distributing master assignments to supervisors, this service makes the assignments notification
 * asynchronous.
 *
 * <p>We support multiple working threads to distribute assignment, every thread has a queue buffer.
 *
 * <p>Master will shuffle its node request to the queues, if the target queue is full, we just discard the request,
 * let the supervisors sync instead.
 *
 * <p>Caution: this class is not thread safe.
 *
 * <pre>{@code
 * Working mode
 *                    +--------+         +-----------------+
 *                    | queue1 |   ==>   | Working thread1 |
 * +--------+ shuffle +--------+         +-----------------+
 * | Master |   ==>
 * +--------+         +--------+         +-----------------+
 *                    | queue2 |   ==>   | Working thread2 |
 *                    +--------+         +-----------------+
 * }
 * </pre>
 */
public class AssignmentDistributionService implements Closeable {
    //......
    private ExecutorService service;

    /**
     * Assignments request queue.
     */
    private volatile Map<Integer, LinkedBlockingQueue<NodeAssignments>> assignmentsQueue;

    /**
     * Add an assignments for a node/supervisor for distribution.
     * @param node node id of supervisor.
     * @param host host name for the node.
     * @param serverPort node thrift server port.
     * @param assignments the {@link org.apache.storm.generated.SupervisorAssignments}
     */
    public void addAssignmentsForNode(String node, String host, Integer serverPort, SupervisorAssignments assignments) {
        try {
            //For some reasons, we can not get supervisor port info, eg: supervisor shutdown,
            //Just skip for this scheduling round.
            if (serverPort == null) {
                LOG.warn("Discard an assignment distribution for node {} because server port info is missing.", node);
                return;
            }

            boolean success = nextQueue().offer(NodeAssignments.getInstance(node, host, serverPort, assignments), 5L, TimeUnit.SECONDS);
            if (!success) {
                LOG.warn("Discard an assignment distribution for node {} because the target sub queue is full.", node);
            }

        } catch (InterruptedException e) {
            LOG.error("Add node assignments interrupted: {}", e.getMessage());
            throw new RuntimeException(e);
        }
    }

    private LinkedBlockingQueue<NodeAssignments> nextQueue() {
        return this.assignmentsQueue.get(nextQueueId());
    }
}
```
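- Nimbus notifies supervisors of task assignment results by calling AssignmentDistributionService's addAssignmentsForNode
- addAssignmentsForNode wraps the SupervisorAssignments in a NodeAssignments and offers it to one of the queues in assignmentsQueue (the one returned by nextQueue()), waiting up to 5 seconds; if that queue is still full, the request is discarded with a warning and the supervisor is left to sync on its own. If the supervisor's serverPort is unknown, the node is simply skipped for this scheduling round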
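The producer side reduces to a small pattern: choose a queue, offer with a timeout, and drop the request if the queue stays full. Below is a minimal, self-contained sketch of that pattern. It uses plain Strings in place of NodeAssignments. The class name ShuffledQueues is hypothetical. The uniformly random queue choice is an assumption: nextQueueId() is not shown above, although prepare does seed a Random(47). Storm wraps an interrupt in a RuntimeException, whereas this sketch restores the interrupt flag and reports failure instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the "shuffle to a bounded queue, drop if full" pattern.
// Like the original class it is not thread safe (the dropped counter is unsynchronized).
class ShuffledQueues<T> {
    private final List<LinkedBlockingQueue<T>> queues = new ArrayList<>();
    private final Random random;
    private int dropped = 0;

    ShuffledQueues(int queueCount, int capacity, long seed) {
        for (int i = 0; i < queueCount; i++) {
            queues.add(new LinkedBlockingQueue<>(capacity));
        }
        this.random = new Random(seed);
    }

    // Assumption: the target queue is chosen uniformly at random.
    private LinkedBlockingQueue<T> nextQueue() {
        return queues.get(random.nextInt(queues.size()));
    }

    // Returns true if queued; false if the queue stayed full for the whole timeout
    // (the request is dropped and the receiver is expected to resync on its own).
    boolean add(T item, long timeout, TimeUnit unit) {
        try {
            boolean success = nextQueue().offer(item, timeout, unit);
            if (!success) {
                dropped++;
            }
            return success;
        } catch (InterruptedException e) {
            // Storm rethrows as RuntimeException; this sketch keeps the interrupt flag instead.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int totalQueued() {
        int total = 0;
        for (LinkedBlockingQueue<T> q : queues) {
            total += q.size();
        }
        return total;
    }

    int dropped() {
        return dropped;
    }

    public static void main(String[] args) {
        ShuffledQueues<String> queues = new ShuffledQueues<>(2, 1, 47);
        for (int i = 0; i < 4; i++) {
            queues.add("node-" + i, 10, TimeUnit.MILLISECONDS);
        }
        System.out.println("queued=" + queues.totalQueued() + ", dropped=" + queues.dropped());
    }
}
```

Dropping on overflow keeps Nimbus from blocking indefinitely on a slow supervisor. This is safe only because supervisors also sync assignments through another path.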
AssignmentDistributionService.getInstance
storm-2.0.0/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
```java
    /**
     * Factory method for initialize a instance.
     * @param conf config.
     * @return an instance of {@link AssignmentDistributionService}
     */
    public static AssignmentDistributionService getInstance(Map conf) {
        AssignmentDistributionService service = new AssignmentDistributionService();
        service.prepare(conf);
        return service;
    }

    /**
     * Function for initialization.
     *
     * @param conf config
     */
    public void prepare(Map conf) {
        this.conf = conf;
        this.random = new Random(47);

        this.threadsNum = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREADS), 10);
        this.queueSize = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE), 100);

        this.assignmentsQueue = new HashMap<>();
        for (int i = 0; i < threadsNum; i++) {
            this.assignmentsQueue.put(i, new LinkedBlockingQueue<NodeAssignments>(queueSize));
        }
        //start the thread pool
        this.service = Executors.newFixedThreadPool(threadsNum);
        this.active = true;
        //start the threads
        for (int i = 0; i < threadsNum; i++) {
            this.service.submit(new DistributeTask(this, i));
        }
        // for local cluster
        localSupervisors = new HashMap<>();
        if (ConfigUtils.isLocalMode(conf)) {
            isLocalMode = true;
        }
    }
```
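- getInstance creates a new AssignmentDistributionService and calls its prepare method to initialize it
- prepare creates threadsNum LinkedBlockingQueues (threadsNum comes from DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREADS, default 10), each with a capacity of DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE (default 100)
- It also creates a thread pool with Executors.newFixedThreadPool(threadsNum) and submits threadsNum DistributeTasks to it, one per queue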
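The initialization in prepare follows a simple layout: N bounded queues and a fixed pool of N threads, with each queue owned by exactly one worker. Below is a sketch under a few assumptions:
- Plain Strings stand in for NodeAssignments.
- The class name DistributionPool is hypothetical.
- intOrDefault only approximates what ObjectReader.getInt appears to do with a default value.
- The two key strings are assumed to match DaemonConfig's values, so verify them before reuse.

For brevity, the workers block on take(). Storm's workers instead poll and sleep.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of prepare(): N bounded queues, a fixed pool of N threads, one worker per queue.
class DistributionPool {
    // Assumed to match DaemonConfig's key strings; verify before relying on them.
    static final String THREADS_KEY = "nimbus.assignments.service.threads";
    static final String QUEUE_SIZE_KEY = "nimbus.assignments.service.thread.queue.size";

    final int threadsNum;
    final int queueSize;
    final Map<Integer, LinkedBlockingQueue<String>> queues = new HashMap<>();
    final AtomicInteger handled = new AtomicInteger();
    private final ExecutorService pool;

    DistributionPool(Map<String, Object> conf) {
        this.threadsNum = intOrDefault(conf, THREADS_KEY, 10);
        this.queueSize = intOrDefault(conf, QUEUE_SIZE_KEY, 100);
        for (int i = 0; i < threadsNum; i++) {
            queues.put(i, new LinkedBlockingQueue<>(queueSize));
        }
        this.pool = Executors.newFixedThreadPool(threadsNum);
        for (int i = 0; i < threadsNum; i++) {
            final int index = i;
            pool.execute(() -> drain(index)); // queue i is owned by exactly one worker
        }
    }

    // Approximates ObjectReader.getInt(value, default): null -> default, numbers or numeric strings -> int.
    static int intOrDefault(Map<String, Object> conf, String key, int defaultValue) {
        Object value = conf.get(key);
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        return Integer.parseInt(value.toString());
    }

    private void drain(int index) {
        LinkedBlockingQueue<String> queue = queues.get(index);
        try {
            while (!Thread.currentThread().isInterrupted()) {
                queue.take(); // simplification: Storm polls and sleeps 100 ms instead of blocking
                handled.incrementAndGet(); // sending to the supervisor is omitted here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // shutdownNow() ends the worker
        }
    }

    void close() {
        pool.shutdownNow();
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(THREADS_KEY, 2);
        DistributionPool service = new DistributionPool(conf);
        System.out.println(service.threadsNum + " queues of capacity " + service.queueSize);
        service.close();
    }
}
```

Giving each thread its own queue avoids contention between workers. The cost is that a busy queue cannot be helped by an idle worker.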
DistributeTask
storm-2.0.0/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
```java
    /**
     * Task to distribute assignments.
     */
    static class DistributeTask implements Runnable {
        private AssignmentDistributionService service;
        private Integer queueIndex;

        DistributeTask(AssignmentDistributionService service, Integer index) {
            this.service = service;
            this.queueIndex = index;
        }

        @Override
        public void run() {
            while (service.isActive()) {
                try {
                    NodeAssignments nodeAssignments = this.service.nextAssignments(queueIndex);
                    sendAssignmentsToNode(nodeAssignments);
                } catch (InterruptedException e) {
                    if (service.isActive()) {
                        LOG.error("Get an unexpected interrupt when distributing assignments to node, {}", e.getCause());
                    } else {
                        // service is off now just interrupt it.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }

        private void sendAssignmentsToNode(NodeAssignments assignments) {
            if (this.service.isLocalMode) {
                //local node
                Supervisor supervisor = this.service.localSupervisors.get(assignments.getNode());
                if (supervisor != null) {
                    supervisor.sendSupervisorAssignments(assignments.getAssignments());
                } else {
                    LOG.error("Can not find node {} for assignments distribution", assignments.getNode());
                    throw new RuntimeException("null for node " + assignments.getNode() + " supervisor instance.");
                }
            } else {
                // distributed mode
                try (SupervisorClient client = SupervisorClient.getConfiguredClient(service.getConf(), assignments.getHost(), assignments.getServerPort())) {
                    try {
                        client.getClient().sendSupervisorAssignments(assignments.getAssignments());
                    } catch (Exception e) {
                        //just ignore the exception.
                        LOG.error("Exception when trying to send assignments to node {}: {}", assignments.getNode(), e.getMessage());
                    }
                } catch (Throwable e) {
                    //just ignore any error/exception.
                    LOG.error("Exception to create supervisor client for node {}: {}", assignments.getNode(), e.getMessage());
                }
            }
        }
    }

    /**
     * Get an assignments from the target queue with the specific index.
     * @param queueIndex index of the queue
     * @return an {@link NodeAssignments}
     * @throws InterruptedException
     */
    public NodeAssignments nextAssignments(Integer queueIndex) throws InterruptedException {
        NodeAssignments target = null;
        while (true) {
            target = getQueueById(queueIndex).poll();
            if (target != null) {
                return target;
            }
            Time.sleep(100L);
        }
    }
```
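- During prepare, AssignmentDistributionService submits the DistributeTasks to the thread pool
- DistributeTask's run method loops for as long as the service is active. Each iteration takes a NodeAssignments from the task's own queue through nextAssignments, which polls the queue and sleeps 100 ms whenever it is empty, and then calls sendAssignmentsToNode to make the remote call
- In distributed mode, sendAssignmentsToNode calls client.getClient().sendSupervisorAssignments(assignments.getAssignments()) and only logs any exception; in local mode it calls sendSupervisorAssignments directly on the matching local Supervisor instance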
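The worker loop combines three behaviors: it polls and sleeps when idle, it logs and skips a failed send, and it exits when it is interrupted after the service has been deactivated. Below is a self-contained sketch of those behaviors. The class name PollingWorker and its helpers are hypothetical, and a Consumer<String> stands in for sendAssignmentsToNode. The close() method of the real service is not shown above, so the stop-then-interrupt shutdown used here is an assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch of DistributeTask: poll-and-sleep, swallow send failures, exit on shutdown.
class PollingWorker implements Runnable {
    private final LinkedBlockingQueue<String> queue;
    private final Consumer<String> sender; // stand-in for sendAssignmentsToNode
    private volatile boolean active = true;

    PollingWorker(LinkedBlockingQueue<String> queue, Consumer<String> sender) {
        this.queue = queue;
        this.sender = sender;
    }

    // Mirrors nextAssignments(): non-blocking poll, sleep 100 ms while the queue is empty.
    private String next() throws InterruptedException {
        while (true) {
            String item = queue.poll();
            if (item != null) {
                return item;
            }
            Thread.sleep(100L);
        }
    }

    @Override
    public void run() {
        while (active) {
            try {
                String item = next();
                try {
                    sender.accept(item);
                } catch (RuntimeException e) {
                    // One unreachable node must not kill the worker: log and move on.
                    System.err.println("send failed for " + item + ": " + e.getMessage());
                }
            } catch (InterruptedException e) {
                if (!active) {
                    Thread.currentThread().interrupt(); // shutting down: keep the flag, loop exits
                }
            }
        }
    }

    void stop() {
        active = false;
    }

    // Runs one worker over the given items, failing on one of them, then shuts it down.
    static List<String> runDemo(List<String> items, String failingItem) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(items);
        List<String> sent = new CopyOnWriteArrayList<>();
        PollingWorker worker = new PollingWorker(queue, item -> {
            if (item.equals(failingItem)) {
                throw new IllegalStateException("node unreachable");
            }
            sent.add(item);
        });
        Thread thread = new Thread(worker, "distribute-0");
        thread.start();
        try {
            while (!queue.isEmpty()) {
                Thread.sleep(10L);
            }
            worker.stop();
            thread.interrupt(); // wakes the worker if it is sleeping in next()
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println("sent: " + runDemo(Arrays.asList("node-a", "node-b", "node-c"), "node-b"));
    }
}
```

Polling with a fixed sleep bounds idle CPU use but adds up to 100 ms of latency per batch. A blocking take() would avoid that latency.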
Supervisor.launchSupervisorThriftServer
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
```java
    private void launchSupervisorThriftServer(Map<String, Object> conf) throws IOException {
        // validate port
        int port = getThriftServerPort();
        try {
            ServerSocket socket = new ServerSocket(port);
            socket.close();
        } catch (BindException e) {
            LOG.error("{} is not available. Check if another process is already listening on {}", port, port);
            throw new RuntimeException(e);
        }

        TProcessor processor = new org.apache.storm.generated.Supervisor.Processor(
            new org.apache.storm.generated.Supervisor.Iface() {
                @Override
                public void sendSupervisorAssignments(SupervisorAssignments assignments)
                    throws AuthorizationException, TException {
                    checkAuthorization("sendSupervisorAssignments");
                    LOG.info("Got an assignments from master, will start to sync with assignments: {}", assignments);
                    SynchronizeAssignments syn = new SynchronizeAssignments(getSupervisor(), assignments,
                                                                            getReadClusterState());
                    getEventManger().add(syn);
                }

                //......
            });
        this.thriftServer = new ThriftServer(conf, processor, ThriftConnectionType.SUPERVISOR);
        this.thriftServer.serve();
    }
```
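- In launchSupervisorThriftServer, the Supervisor first checks that its Thrift port is free. It then registers a TProcessor whose sendSupervisorAssignments implementation wraps the received SupervisorAssignments in a SynchronizeAssignments and adds it to the EventManager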
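The important detail in the handler is that the RPC thread does no synchronization work itself: it only hands a task to the EventManager and returns. Below is a sketch of that handoff under stated assumptions:
- A single-threaded executor stands in for Storm's EventManager, so updates run one at a time and in arrival order.
- AssignmentReceiver and applyLocally are hypothetical names.
- A String replaces SupervisorAssignments.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the RPC handler only enqueues; a single event thread applies updates in order.
class AssignmentReceiver implements AutoCloseable {
    // Assumption: a single-threaded executor stands in for Storm's EventManager.
    private final ExecutorService eventManager = Executors.newSingleThreadExecutor();
    private final List<String> applied = new CopyOnWriteArrayList<>();
    private final List<String> applyThreads = new CopyOnWriteArrayList<>();

    // Plays the role of sendSupervisorAssignments: wrap the payload in a task and return at once.
    void sendSupervisorAssignments(String assignments) {
        Runnable sync = () -> applyLocally(assignments); // stand-in for new SynchronizeAssignments(...)
        eventManager.execute(sync);
    }

    // Runs on the event thread, never on the caller's (RPC) thread.
    private void applyLocally(String assignments) {
        applyThreads.add(Thread.currentThread().getName());
        applied.add(assignments);
    }

    List<String> applied() {
        return new ArrayList<>(applied);
    }

    List<String> applyThreads() {
        return new ArrayList<>(applyThreads);
    }

    @Override
    public void close() {
        eventManager.shutdown(); // lets already-queued tasks finish
        try {
            if (!eventManager.awaitTermination(5, TimeUnit.SECONDS)) {
                eventManager.shutdownNow();
            }
        } catch (InterruptedException e) {
            eventManager.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        AssignmentReceiver receiver = new AssignmentReceiver();
        for (String a : Arrays.asList("v1", "v2", "v3")) {
            receiver.sendSupervisorAssignments(a);
        }
        receiver.close();
        System.out.println(receiver.applied() + " applied on " + receiver.applyThreads());
    }
}
```

Serializing the updates onto one thread keeps a burst of pushes from Nimbus from racing with each other against the local state.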
SynchronizeAssignments.run
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
```java
/**
 * A runnable which will synchronize assignments to node local and then worker processes.
 */
public class SynchronizeAssignments implements Runnable {
    //......

    @Override
    public void run() {
        // first sync assignments to local, then sync processes.
        if (null == assignments) {
            getAssignmentsFromMaster(this.supervisor.getConf(), this.supervisor.getStormClusterState(), this.supervisor.getAssignmentId());
        } else {
            assignedAssignmentsToLocal(this.supervisor.getStormClusterState(), assignments);
        }
        this.readClusterState.run();
    }

    private static void assignedAssignmentsToLocal(IStormClusterState clusterState, SupervisorAssignments assignments) {
        if (null == assignments) {
            //unknown error, just skip
            return;
        }
        Map<String, byte[]> serAssignments = new HashMap<>();
        for (Map.Entry<String, Assignment> entry : assignments.get_storm_assignment().entrySet()) {
            serAssignments.put(entry.getKey(), Utils.serialize(entry.getValue()));
        }
        clusterState.syncRemoteAssignments(serAssignments);
    }
}
```
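- run first syncs the assignments to local state: if assignments is null it fetches them with getAssignmentsFromMaster, otherwise it calls assignedAssignmentsToLocal. It then triggers this.readClusterState.run()
- assignedAssignmentsToLocal serializes each topology's Assignment and calls clusterState.syncRemoteAssignments(serAssignments)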
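The serialization step turns a map of topologyId to Assignment into a map of topologyId to bytes. Below is a sketch of that step. It assumes Utils.serialize behaves like plain Java serialization, which should be checked against the Storm source. Strings stand in for Assignment objects. AssignmentSerializer and its methods are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the serialization step in assignedAssignmentsToLocal.
class AssignmentSerializer {
    // Stand-in for Utils.serialize, assumed here to be plain Java serialization.
    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // topologyId -> assignment becomes topologyId -> bytes; null means "skip this sync", as in the original.
    static Map<String, byte[]> toSerialized(Map<String, ? extends Serializable> assignments) {
        if (assignments == null) {
            return null;
        }
        Map<String, byte[]> serialized = new HashMap<>();
        for (Map.Entry<String, ? extends Serializable> entry : assignments.entrySet()) {
            serialized.put(entry.getKey(), serialize(entry.getValue()));
        }
        return serialized;
    }

    public static void main(String[] args) {
        Map<String, String> assignments = new HashMap<>();
        assignments.put("topo-1", "worker on port 6700");
        Map<String, byte[]> serialized = toSerialized(assignments);
        System.out.println(deserialize(serialized.get("topo-1")));
    }
}
```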
StormClusterStateImpl.syncRemoteAssignments
storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
```java
    @Override
    public void syncRemoteAssignments(Map<String, byte[]> remote) {
        if (null != remote) {
            this.assignmentsBackend.syncRemoteAssignments(remote);
        } else {
            Map<String, byte[]> tmp = new HashMap<>();
            List<String> stormIds = this.stateStorage.get_children(ClusterUtils.ASSIGNMENTS_SUBTREE, false);
            for (String stormId : stormIds) {
                byte[] assignment = this.stateStorage.get_data(ClusterUtils.assignmentPath(stormId), false);
                tmp.put(stormId, assignment);
            }
            this.assignmentsBackend.syncRemoteAssignments(tmp);
        }
    }
```
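- When remote is not null, the serAssignments are written into assignmentsBackend (i.e., local memory)
- When remote is null, the assignment info is read from ZooKeeper and then written into memory; the ZooKeeper path is ClusterUtils.assignmentPath(stormId) (/assignments/{topologyId})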
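The fallback logic works as follows: use the pushed map if there is one, otherwise rebuild it by listing the children of /assignments and reading each node. Below is a sketch of that logic. StateStorage is a hypothetical minimal interface standing in for stateStorage.get_children and get_data. AssignmentsBackend and inMemory are also hypothetical names. Whether the real in-memory backend replaces or merges its contents is not shown above; this sketch assumes it replaces them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of syncRemoteAssignments: use the pushed map, or fall back to reading storage.
class AssignmentsBackend {
    // Hypothetical minimal view of the ZooKeeper-backed state storage.
    interface StateStorage {
        List<String> getChildren(String path);

        byte[] getData(String path);
    }

    static final String ASSIGNMENTS_SUBTREE = "/assignments";

    private final StateStorage storage;
    private final Map<String, byte[]> local = new HashMap<>();

    AssignmentsBackend(StateStorage storage) {
        this.storage = storage;
    }

    static String assignmentPath(String stormId) {
        return ASSIGNMENTS_SUBTREE + "/" + stormId;
    }

    synchronized void syncRemoteAssignments(Map<String, byte[]> remote) {
        Map<String, byte[]> source = remote;
        if (source == null) {
            // Nothing was pushed: rebuild the map from /assignments/{topologyId}.
            source = new HashMap<>();
            for (String stormId : storage.getChildren(ASSIGNMENTS_SUBTREE)) {
                source.put(stormId, storage.getData(assignmentPath(stormId)));
            }
        }
        // Assumption: the in-memory backend replaces its snapshot rather than merging.
        local.clear();
        local.putAll(source);
    }

    synchronized Map<String, byte[]> snapshot() {
        return Collections.unmodifiableMap(new HashMap<>(local));
    }

    // Toy storage over a flat path -> bytes map, for demos and tests.
    static StateStorage inMemory(Map<String, byte[]> nodes) {
        return new StateStorage() {
            @Override
            public List<String> getChildren(String path) {
                List<String> children = new ArrayList<>();
                for (String key : nodes.keySet()) {
                    if (key.startsWith(path + "/")) {
                        children.add(key.substring(path.length() + 1));
                    }
                }
                return children;
            }

            @Override
            public byte[] getData(String path) {
                return nodes.get(path);
            }
        };
    }

    public static void main(String[] args) {
        Map<String, byte[]> zk = new HashMap<>();
        zk.put("/assignments/topo-1", new byte[] {1, 2, 3});
        AssignmentsBackend backend = new AssignmentsBackend(inMemory(zk));
        backend.syncRemoteAssignments(null);
        System.out.println("topologies in memory: " + backend.snapshot().keySet());
    }
}
```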
ReadClusterState.run
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
```java
    @Override
    public synchronized void run() {
        try {
            List<String> stormIds = stormClusterState.assignments(null);
            Map<String, Assignment> assignmentsSnapshot = getAssignmentsSnapshot(stormClusterState);

            Map<Integer, LocalAssignment> allAssignments = readAssignments(assignmentsSnapshot);
            if (allAssignments == null) {
                //Something odd happened try again later
                return;
            }
            Map<String, List<ProfileRequest>> topoIdToProfilerActions = getProfileActions(stormClusterState, stormIds);

            HashSet<Integer> assignedPorts = new HashSet<>();
            LOG.debug("Synchronizing supervisor");
            LOG.debug("All assignment: {}", allAssignments);
            LOG.debug("Topology Ids -> Profiler Actions {}", topoIdToProfilerActions);
            for (Integer port : allAssignments.keySet()) {
                if (iSuper.confirmAssigned(port)) {
                    assignedPorts.add(port);
                }
            }
            HashSet<Integer> allPorts = new HashSet<>(assignedPorts);
            iSuper.assigned(allPorts);
            allPorts.addAll(slots.keySet());

            Map<Integer, Set<TopoProfileAction>> filtered = new HashMap<>();
            for (Entry<String, List<ProfileRequest>> entry : topoIdToProfilerActions.entrySet()) {
                String topoId = entry.getKey();
                if (entry.getValue() != null) {
                    for (ProfileRequest req : entry.getValue()) {
                        NodeInfo ni = req.get_nodeInfo();
                        if (host.equals(ni.get_node())) {
                            Long port = ni.get_port().iterator().next();
                            Set<TopoProfileAction> actions = filtered.get(port.intValue());
                            if (actions == null) {
                                actions = new HashSet<>();
                                filtered.put(port.intValue(), actions);
                            }
                            actions.add(new TopoProfileAction(topoId, req));
                        }
                    }
                }
            }

            for (Integer port : allPorts) {
                Slot slot = slots.get(port);
                if (slot == null) {
                    slot = mkSlot(port);
                    slots.put(port, slot);
                    slot.start();
                }
                slot.setNewAssignment(allAssignments.get(port));
                slot.addProfilerActions(filtered.get(port));
            }

        } catch (Exception e) {
            LOG.error("Failed to Sync Supervisor", e);
            throw new RuntimeException(e);
        }
    }
```
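- For each port, this calls the slot's setNewAssignment, which sets the Slot's AtomicReference<LocalAssignment> newAssignment; if a port has no Slot yet, one is created with mkSlot and started first
- The Slot's run method loops. On each iteration, stateMachineStep examines newAssignment and computes nextState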
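The handoff between ReadClusterState and a Slot is a single AtomicReference. One thread publishes the desired assignment, and the slot's own loop compares it with what is running and moves toward it. Below is a deliberately tiny two-state sketch of that idea. The real Slot state machine has more intermediate states. SlotSketch, State, step and launches are hypothetical names, and a String replaces LocalAssignment. step() is called directly here rather than from a loop, so the transitions are easy to follow.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical two-state sketch of the ReadClusterState -> Slot handoff.
class SlotSketch {
    enum State { EMPTY, RUNNING }

    // Written by the cluster-state thread, read by the slot's own loop.
    private final AtomicReference<String> newAssignment = new AtomicReference<>();
    // Only touched from step(), i.e. by the slot's own thread.
    private String currentAssignment;
    private State state = State.EMPTY;
    private int launches = 0;

    void setNewAssignment(String assignment) {
        newAssignment.set(assignment); // null means "nothing should run on this port"
    }

    // One iteration: compare the desired assignment with what is running and move toward it.
    State step() {
        String desired = newAssignment.get();
        if (desired == null) {
            currentAssignment = null; // stop whatever was running
            state = State.EMPTY;
        } else if (!desired.equals(currentAssignment)) {
            currentAssignment = desired; // (re)launch with the new assignment
            launches++;
            state = State.RUNNING;
        }
        return state;
    }

    String current() {
        return currentAssignment;
    }

    int launches() {
        return launches;
    }

    public static void main(String[] args) {
        SlotSketch slot = new SlotSketch();
        slot.setNewAssignment("topo-1@6700");
        System.out.println(slot.step() + " " + slot.current());
        slot.setNewAssignment(null);
        System.out.println(slot.step());
    }
}
```

Because only the latest value matters, repeated pushes of the same assignment are idempotent. The slot relaunches only when the desired assignment actually changes.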
Summary
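Nimbus notifies supervisors of task assignment results by calling AssignmentDistributionService's addAssignmentsForNode
- addAssignmentsForNode wraps the SupervisorAssignments in a NodeAssignments and offers it to one of the queues in assignmentsQueue. AssignmentDistributionService creates a thread pool with the configured number of threads (10 by default), along with the same number of queues and DistributeTasks
- Each DistributeTask loops, pulling NodeAssignments from its own queue and calling sendAssignmentsToNode to notify the supervisor
At startup, the Supervisor calls launchSupervisorThriftServer, which registers a processor that handles sendSupervisorAssignments; the processor wraps each received SupervisorAssignments in a SynchronizeAssignments and adds it to the EventManager
- When the EventManager processes a SynchronizeAssignments, it executes its run method, which calls assignedAssignmentsToLocal and then triggers this.readClusterState.run()
- assignedAssignmentsToLocal calls clusterState.syncRemoteAssignments(serAssignments) to update the assignment info in local memory. readClusterState.run() mainly updates each slot's newAssignment. The Slot's own polling loop then detects the state change and triggers the corresponding handling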