A look at storm nimbus's mkAssignments
Preface
This article takes a look at storm nimbus's mkAssignments.
Nimbus.mkAssignments
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
```java
void doRebalance(String topoId, StormBase stormBase) throws Exception {
    RebalanceOptions rbo = stormBase.get_topology_action_options().get_rebalance_options();
    StormBase updated = new StormBase();
    updated.set_topology_action_options(null);
    updated.set_component_debug(Collections.emptyMap());

    if (rbo.is_set_num_executors()) {
        updated.set_component_executors(rbo.get_num_executors());
    }

    if (rbo.is_set_num_workers()) {
        updated.set_num_workers(rbo.get_num_workers());
    }
    stormClusterState.updateStorm(topoId, updated);
    updateBlobStore(topoId, rbo, ServerUtils.principalNameToSubject(rbo.get_principal()));
    idToExecutors.getAndUpdate(new Dissoc<>(topoId)); // remove the executors cache to let it recompute.
    mkAssignments(topoId);
}

private void mkAssignments() throws Exception {
    mkAssignments(null);
}
```
- doRebalance calls stormClusterState.updateStorm(topoId, updated) to update the topology's data in zk
- Both doRebalance and the no-arg mkAssignments delegate to the mkAssignments(String scratchTopoId) method
mkAssignments(String scratchTopoId)
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
```java
private void mkAssignments(String scratchTopoId) throws Exception {
    try {
        if (!isReadyForMKAssignments()) {
            return;
        }
        // get existing assignment (just the topologyToExecutorToNodePort map) -> default to {}
        // filter out ones which have a executor timeout
        // figure out available slots on cluster. add to that the used valid slots to get total slots. figure out how many executors
        // should be in each slot (e.g., 4, 4, 4, 5)
        // only keep existing slots that satisfy one of those slots. for rest, reassign them across remaining slots
        // edge case for slots with no executor timeout but with supervisor timeout... just treat these as valid slots that can be
        // reassigned to. worst comes to worse the executor will timeout and won't assign here next time around

        IStormClusterState state = stormClusterState;
        //read all the topologies
        Map<String, StormBase> bases;
        Map<String, TopologyDetails> tds = new HashMap<>();
        synchronized (submitLock) {
            // should promote: only fetch storm bases of topologies that need scheduling.
            bases = state.topologyBases();

            for (Iterator<Entry<String, StormBase>> it = bases.entrySet().iterator(); it.hasNext(); ) {
                Entry<String, StormBase> entry = it.next();
                String id = entry.getKey();
                try {
                    tds.put(id, readTopologyDetails(id, entry.getValue()));
                } catch (KeyNotFoundException e) {
                    //A race happened and it is probably not running
                    it.remove();
                }
            }
        }
        List<String> assignedTopologyIds = state.assignments(null);
        Map<String, Assignment> existingAssignments = new HashMap<>();
        for (String id : assignedTopologyIds) {
            //for the topology which wants rebalance (specified by the scratchTopoId)
            // we exclude its assignment, meaning that all the slots occupied by its assignment
            // will be treated as free slot in the scheduler code.
            if (!id.equals(scratchTopoId)) {
                Assignment currentAssignment = state.assignmentInfo(id, null);
                if (!currentAssignment.is_set_owner()) {
                    TopologyDetails td = tds.get(id);
                    if (td != null) {
                        currentAssignment.set_owner(td.getTopologySubmitter());
                        state.setAssignment(id, currentAssignment, td.getConf());
                    }
                }
                existingAssignments.put(id, currentAssignment);
            }
        }

        // make the new assignments for topologies
        lockingMkAssignments(existingAssignments, bases, scratchTopoId, assignedTopologyIds, state, tds);
    } catch (Exception e) {
        this.mkAssignmentsErrors.mark();
        throw e;
    }
}
```
- readTopologyDetails reads the TopologyDetails for each topology
- The existing assignments are fetched via state.assignments(Runnable callback) and state.assignmentInfo(String stormId, Runnable callback); the topology being rebalanced (scratchTopoId) is excluded, so all slots it occupies are treated as free by the scheduler
- lockingMkAssignments is then called to compute the new assignments for the topologies
lockingMkAssignments
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
```java
private void lockingMkAssignments(Map<String, Assignment> existingAssignments, Map<String, StormBase> bases,
                                  String scratchTopoId, List<String> assignedTopologyIds, IStormClusterState state,
                                  Map<String, TopologyDetails> tds) throws Exception {
    Topologies topologies = new Topologies(tds);

    synchronized (schedLock) {
        Map<String, SchedulerAssignment> newSchedulerAssignments =
            computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId);

        Map<String, Map<List<Long>, List<Object>>> topologyToExecutorToNodePort =
            computeTopoToExecToNodePort(newSchedulerAssignments, assignedTopologyIds);
        Map<String, Map<WorkerSlot, WorkerResources>> newAssignedWorkerToResources =
            computeTopoToNodePortToResources(newSchedulerAssignments);
        int nowSecs = Time.currentTimeSecs();
        Map<String, SupervisorDetails> basicSupervisorDetailsMap = basicSupervisorDetailsMap(state);
        //construct the final Assignments by adding start-times etc into it
        Map<String, Assignment> newAssignments = new HashMap<>();
        //......

        //tasks figure out what tasks to talk to by looking at topology at runtime
        // only log/set when there's been a change to the assignment
        for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
            String topoId = entry.getKey();
            Assignment assignment = entry.getValue();
            Assignment existingAssignment = existingAssignments.get(topoId);
            TopologyDetails td = topologies.getById(topoId);
            if (assignment.equals(existingAssignment)) {
                LOG.debug("Assignment for {} hasn't changed", topoId);
            } else {
                LOG.info("Setting new assignment for topology id {}: {}", topoId, assignment);
                state.setAssignment(topoId, assignment, td.getConf());
            }
        }

        //grouping assignment by node to see the nodes diff, then notify nodes/supervisors to synchronize its owned assignment
        //because the number of existing assignments is small for every scheduling round,
        //we expect to notify supervisors at almost the same time
        Map<String, String> totalAssignmentsChangedNodes = new HashMap<>();
        for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
            String topoId = entry.getKey();
            Assignment assignment = entry.getValue();
            Assignment existingAssignment = existingAssignments.get(topoId);
            totalAssignmentsChangedNodes.putAll(assignmentChangedNodes(existingAssignment, assignment));
        }
        notifySupervisorsAssignments(newAssignments, assignmentsDistributer, totalAssignmentsChangedNodes,
                                     basicSupervisorDetailsMap);

        Map<String, Collection<WorkerSlot>> addedSlots = new HashMap<>();
        for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
            String topoId = entry.getKey();
            Assignment assignment = entry.getValue();
            Assignment existingAssignment = existingAssignments.get(topoId);
            if (existingAssignment == null) {
                existingAssignment = new Assignment();
                existingAssignment.set_executor_node_port(new HashMap<>());
                existingAssignment.set_executor_start_time_secs(new HashMap<>());
            }
            Set<WorkerSlot> newSlots = newlyAddedSlots(existingAssignment, assignment);
            addedSlots.put(topoId, newSlots);
        }
        inimbus.assignSlots(topologies, addedSlots);
    }
}
```
- computeNewSchedulerAssignments is called first to compute newSchedulerAssignments
- It then iterates over newAssignments; whenever an assignment has changed, state.setAssignment(topoId, assignment, td.getConf()) writes the assignment to zk
- Finally it computes totalAssignmentsChangedNodes and calls notifySupervisorsAssignments, which pushes the assignments to the supervisors via the AssignmentDistributionService (see the sketch after this list)
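To make the "diff then notify" step concrete, here is a simplified sketch of the idea behind it. This is not Nimbus's actual assignmentChangedNodes implementation: the thrift Assignment structure is replaced by a plain executor-to-"node:port" string map, and the goal is only to show how comparing old and new assignments yields the set of nodes whose supervisors need to be notified.

```java
import java.util.*;

// Simplified sketch: find the nodes whose portion of a topology's assignment changed,
// so only those supervisors need to be pushed the new assignment.
public class ChangedNodesSketch {

    static Set<String> changedNodes(Map<String, String> oldExecToNodePort,
                                    Map<String, String> newExecToNodePort) {
        Set<String> changed = new HashSet<>();
        Set<String> allExecutors = new HashSet<>(oldExecToNodePort.keySet());
        allExecutors.addAll(newExecToNodePort.keySet());
        for (String exec : allExecutors) {
            String oldSlot = oldExecToNodePort.get(exec);
            String newSlot = newExecToNodePort.get(exec);
            if (!Objects.equals(oldSlot, newSlot)) {
                // an executor that moved, appeared or disappeared affects both
                // its old node (if any) and its new node (if any)
                if (oldSlot != null) {
                    changed.add(oldSlot.split(":")[0]);
                }
                if (newSlot != null) {
                    changed.add(newSlot.split(":")[0]);
                }
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<String, String> before = new HashMap<>();
        before.put("[1,3]", "node-a:6700");
        before.put("[4,6]", "node-b:6700");
        Map<String, String> after = new HashMap<>();
        after.put("[1,3]", "node-a:6700");  // unchanged -> node-a is not notified
        after.put("[4,6]", "node-c:6701");  // moved from node-b to node-c
        System.out.println(changedNodes(before, after)); // prints node-b and node-c (in some order)
    }
}
```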
computeNewSchedulerAssignments
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
```java
private Map<String, SchedulerAssignment> computeNewSchedulerAssignments(Map<String, Assignment> existingAssignments,
                                                                        Topologies topologies, Map<String, StormBase> bases,
                                                                        String scratchTopologyId)
    throws KeyNotFoundException, AuthorizationException, InvalidTopologyException, IOException {

    Map<String, Set<List<Integer>>> topoToExec = computeTopologyToExecutors(bases);

    Set<String> zkHeartbeatTopologies = topologies.getTopologies().stream()
                                                  .filter(topo -> !supportRpcHeartbeat(topo))
                                                  .map(TopologyDetails::getId)
                                                  .collect(Collectors.toSet());

    updateAllHeartbeats(existingAssignments, topoToExec, zkHeartbeatTopologies);

    Map<String, Set<List<Integer>>> topoToAliveExecutors = computeTopologyToAliveExecutors(existingAssignments, topologies,
                                                                                           topoToExec, scratchTopologyId);
    Map<String, Set<Long>> supervisorToDeadPorts = computeSupervisorToDeadPorts(existingAssignments, topoToExec,
                                                                                topoToAliveExecutors);
    Map<String, SchedulerAssignmentImpl> topoToSchedAssignment = computeTopologyToSchedulerAssignment(existingAssignments,
                                                                                                      topoToAliveExecutors);
    Set<String> missingAssignmentTopologies = new HashSet<>();
    for (TopologyDetails topo : topologies.getTopologies()) {
        String id = topo.getId();
        Set<List<Integer>> allExecs = topoToExec.get(id);
        Set<List<Integer>> aliveExecs = topoToAliveExecutors.get(id);
        int numDesiredWorkers = topo.getNumWorkers();
        int numAssignedWorkers = numUsedWorkers(topoToSchedAssignment.get(id));
        if (allExecs == null || allExecs.isEmpty() || !allExecs.equals(aliveExecs) || numDesiredWorkers > numAssignedWorkers) {
            //We have something to schedule...
            missingAssignmentTopologies.add(id);
        }
    }
    Map<String, SupervisorDetails> supervisors =
        readAllSupervisorDetails(supervisorToDeadPorts, topologies, missingAssignmentTopologies);
    Cluster cluster = new Cluster(inimbus, resourceMetrics, supervisors, topoToSchedAssignment, topologies, conf);
    cluster.setStatusMap(idToSchedStatus.get());

    schedulingStartTimeNs.set(Time.nanoTime());
    scheduler.schedule(topologies, cluster);
    //Get and set the start time before getting current time in order to avoid potential race with the longest-scheduling-time-ms gauge
    //......
    return cluster.getAssignments();
}
```
- computeTopologyToExecutors returns Map<String, Set<List<Integer>>> topoToExec, keyed by topologyId; each value is a Set<List<Integer>>, where a List<Integer> identifies an executor
- computeTopologyToAliveExecutors returns Map<String, Set<List<Integer>>> topoToAliveExecutors, keyed by topologyId, containing only the executors that are still alive
- computeSupervisorToDeadPorts returns Map<String, Set<Long>> supervisorToDeadPorts, keyed by supervisorId; each value is the Set<Long> of dead ports on that supervisor
- computeTopologyToSchedulerAssignment returns Map<String, SchedulerAssignmentImpl> topoToSchedAssignment, keyed by topologyId, with SchedulerAssignmentImpl values
- The worker and executor counts requested by each topology's configuration are then compared with the existing assignment to compute missingAssignmentTopologies (see the sketch after this list)
- readAllSupervisorDetails returns Map<String, SupervisorDetails> supervisors, i.e. the supervisors that are alive and can take assignments
- Finally a Cluster is constructed and scheduler.schedule(topologies, cluster) is invoked to do the scheduling
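The "needs scheduling" condition can be read in isolation. The following minimal sketch restates the check from the method above with simplified types (an executor is just a [startTask, endTask] pair) and hypothetical data:

```java
import java.util.*;

// Minimal sketch of the condition that puts a topology into missingAssignmentTopologies.
public class NeedsSchedulingSketch {

    static boolean needsScheduling(Set<List<Integer>> allExecs,
                                   Set<List<Integer>> aliveExecs,
                                   int numDesiredWorkers,
                                   int numAssignedWorkers) {
        // schedule when: nothing has been computed yet, some executors are not alive,
        // or fewer workers are assigned than the topology asked for
        return allExecs == null
            || allExecs.isEmpty()
            || !allExecs.equals(aliveExecs)
            || numDesiredWorkers > numAssignedWorkers;
    }

    public static void main(String[] args) {
        Set<List<Integer>> all = new HashSet<>(Arrays.asList(
            Arrays.asList(1, 3), Arrays.asList(4, 6)));
        Set<List<Integer>> alive = new HashSet<>(Collections.singletonList(
            Arrays.asList(1, 3)));  // the [4,6] executor has timed out

        System.out.println(needsScheduling(all, alive, 2, 2)); // true  -> reschedule
        System.out.println(needsScheduling(all, all, 2, 2));   // false -> nothing to do
    }
}
```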
DefaultScheduler.schedule
storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
```java
public void schedule(Topologies topologies, Cluster cluster) {
    defaultSchedule(topologies, cluster);
}

public static void defaultSchedule(Topologies topologies, Cluster cluster) {
    for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
        List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
        Set<ExecutorDetails> allExecutors = topology.getExecutors();

        Map<WorkerSlot, List<ExecutorDetails>> aliveAssigned =
            EvenScheduler.getAliveAssignedWorkerSlotExecutors(cluster, topology.getId());
        Set<ExecutorDetails> aliveExecutors = new HashSet<ExecutorDetails>();
        for (List<ExecutorDetails> list : aliveAssigned.values()) {
            aliveExecutors.addAll(list);
        }

        Set<WorkerSlot> canReassignSlots = slotsCanReassign(cluster, aliveAssigned.keySet());
        int totalSlotsToUse = Math.min(topology.getNumWorkers(), canReassignSlots.size() + availableSlots.size());

        Set<WorkerSlot> badSlots = null;
        if (totalSlotsToUse > aliveAssigned.size() || !allExecutors.equals(aliveExecutors)) {
            badSlots = badSlots(aliveAssigned, allExecutors.size(), totalSlotsToUse);
        }
        if (badSlots != null) {
            cluster.freeSlots(badSlots);
        }

        EvenScheduler.scheduleTopologiesEvenly(new Topologies(topology), cluster);
    }
}
```
- cluster.needsSchedulingTopologies() is used to iterate over the TopologyDetails that still need scheduling
- cluster.getAvailableSlots() returns the currently available slots
- EvenScheduler.getAliveAssignedWorkerSlotExecutors returns aliveAssigned, and slotsCanReassign is applied to its key set to obtain canReassignSlots
- totalSlotsToUse is computed as Math.min(topology.getNumWorkers(), canReassignSlots.size() + availableSlots.size()), and any badSlots are freed (a worked example follows this list)
- The actual assignment is then delegated to EvenScheduler.scheduleTopologiesEvenly(new Topologies(topology), cluster)
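A small worked example of the totalSlotsToUse calculation, using hypothetical numbers:

```java
// Hypothetical numbers illustrating the slot-count calculation in defaultSchedule.
public class TotalSlotsToUseSketch {
    public static void main(String[] args) {
        int numWorkers = 4;        // topology.getNumWorkers()
        int canReassignSlots = 2;  // slotsCanReassign(cluster, aliveAssigned.keySet()).size()
        int availableSlots = 3;    // cluster.getAvailableSlots().size()

        int totalSlotsToUse = Math.min(numWorkers, canReassignSlots + availableSlots);
        System.out.println(totalSlotsToUse); // 4 -> the topology gets all the workers it asked for

        // With a tighter cluster, e.g. only 1 free slot, min(4, 2 + 1) == 3:
        // the topology runs on fewer workers than requested until more slots become available.
        System.out.println(Math.min(numWorkers, canReassignSlots + 1)); // 3
    }
}
```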
EvenScheduler.scheduleTopologiesEvenly
storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
```java
public static void scheduleTopologiesEvenly(Topologies topologies, Cluster cluster) {
    for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
        String topologyId = topology.getId();
        Map<ExecutorDetails, WorkerSlot> newAssignment = scheduleTopology(topology, cluster);
        Map<WorkerSlot, List<ExecutorDetails>> nodePortToExecutors = Utils.reverseMap(newAssignment);

        for (Map.Entry<WorkerSlot, List<ExecutorDetails>> entry : nodePortToExecutors.entrySet()) {
            WorkerSlot nodePort = entry.getKey();
            List<ExecutorDetails> executors = entry.getValue();
            cluster.assign(nodePort, topologyId, executors);
        }
    }
}
```
- scheduleTopology(topology, cluster) computes newAssignment, which is then reversed into Map<WorkerSlot, List<ExecutorDetails>> nodePortToExecutors (see the sketch below)
- cluster.assign(nodePort, topologyId, executors) then performs the assignment slot by slot
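The reversal step mirrors what Utils.reverseMap does. The following sketch uses simplified types (executors and slots as plain strings) and hypothetical data purely to show the shape of the two maps; it is not the Storm implementation:

```java
import java.util.*;

// Sketch: flip an executor -> slot map into a slot -> executors map,
// so the scheduler can make one cluster.assign call per slot.
public class ReverseMapSketch {

    static Map<String, List<String>> reverse(Map<String, String> execToSlot) {
        Map<String, List<String>> slotToExecs = new HashMap<>();
        for (Map.Entry<String, String> e : execToSlot.entrySet()) {
            slotToExecs.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
        }
        return slotToExecs;
    }

    public static void main(String[] args) {
        Map<String, String> newAssignment = new HashMap<>();
        newAssignment.put("[1,1]", "node-a:6700");
        newAssignment.put("[2,2]", "node-b:6700");
        newAssignment.put("[3,3]", "node-a:6700");

        // prints something like {node-a:6700=[[1,1], [3,3]], node-b:6700=[[2,2]]}
        System.out.println(reverse(newAssignment));
    }
}
```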
EvenScheduler.scheduleTopology
storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
```java
private static Map<ExecutorDetails, WorkerSlot> scheduleTopology(TopologyDetails topology, Cluster cluster) {
    List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
    Set<ExecutorDetails> allExecutors = topology.getExecutors();
    Map<WorkerSlot, List<ExecutorDetails>> aliveAssigned = getAliveAssignedWorkerSlotExecutors(cluster, topology.getId());
    int totalSlotsToUse = Math.min(topology.getNumWorkers(), availableSlots.size() + aliveAssigned.size());

    List<WorkerSlot> sortedList = sortSlots(availableSlots);
    if (sortedList == null) {
        LOG.error("No available slots for topology: {}", topology.getName());
        return new HashMap<ExecutorDetails, WorkerSlot>();
    }

    //allow requesting slots number bigger than available slots
    int toIndex = (totalSlotsToUse - aliveAssigned.size())
                  > sortedList.size() ? sortedList.size() : (totalSlotsToUse - aliveAssigned.size());
    List<WorkerSlot> reassignSlots = sortedList.subList(0, toIndex);

    Set<ExecutorDetails> aliveExecutors = new HashSet<ExecutorDetails>();
    for (List<ExecutorDetails> list : aliveAssigned.values()) {
        aliveExecutors.addAll(list);
    }
    Set<ExecutorDetails> reassignExecutors = Sets.difference(allExecutors, aliveExecutors);
    Map<ExecutorDetails, WorkerSlot> reassignment = new HashMap<ExecutorDetails, WorkerSlot>();
    if (reassignSlots.size() == 0) {
        return reassignment;
    }

    List<ExecutorDetails> executors = new ArrayList<ExecutorDetails>(reassignExecutors);
    Collections.sort(executors, new Comparator<ExecutorDetails>() {
        @Override
        public int compare(ExecutorDetails o1, ExecutorDetails o2) {
            return o1.getStartTask() - o2.getStartTask();
        }
    });

    for (int i = 0; i < executors.size(); i++) {
        reassignment.put(executors.get(i), reassignSlots.get(i % reassignSlots.size()));
    }

    if (reassignment.size() != 0) {
        LOG.info("Available slots: {}", availableSlots.toString());
    }
    return reassignment;
}
```
- List<WorkerSlot> reassignSlots is derived from the sorted availableSlots
- reassignExecutors is computed as Sets.difference(allExecutors, aliveExecutors) and sorted by startTask
- Finally the reassignExecutors are iterated over and each executor receives the slot reassignSlots.get(i % reassignSlots.size()), i.e. slots are handed out round-robin (see the sketch after this list)
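Here is a minimal, self-contained sketch of that round-robin step with hypothetical executors and slots. It is not the real EvenScheduler code, just the i % reassignSlots.size() idea in isolation:

```java
import java.util.*;

// Sketch: executors still needing a slot are sorted by start task and
// dealt out over the reassignable slots in round-robin fashion.
public class RoundRobinSketch {

    public static void main(String[] args) {
        List<String> reassignSlots = Arrays.asList("node-a:6700", "node-b:6700");
        List<int[]> reassignExecutors = Arrays.asList(
            new int[]{5, 6}, new int[]{1, 2}, new int[]{7, 8}, new int[]{3, 4});

        // sort by start task, as EvenScheduler does before assigning
        reassignExecutors.sort(Comparator.comparingInt(e -> e[0]));

        Map<String, String> reassignment = new LinkedHashMap<>();
        for (int i = 0; i < reassignExecutors.size(); i++) {
            int[] exec = reassignExecutors.get(i);
            reassignment.put(Arrays.toString(exec),
                             reassignSlots.get(i % reassignSlots.size()));
        }

        // {[1, 2]=node-a:6700, [3, 4]=node-b:6700, [5, 6]=node-a:6700, [7, 8]=node-b:6700}
        System.out.println(reassignment);
    }
}
```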
ExecutorDetails
storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java
```java
public class ExecutorDetails {
    public final int startTask;
    public final int endTask;

    //......

    @Override
    public boolean equals(Object other) {
        if (!(other instanceof ExecutorDetails)) {
            return false;
        }

        ExecutorDetails executor = (ExecutorDetails) other;
        return (this.startTask == executor.startTask) && (this.endTask == executor.endTask);
    }
}
```
- Although it is named ExecutorDetails, what it encapsulates is the startTask and endTask information
- The scheduling process essentially assigns the task range marked by startTask and endTask to a slot for processing
- Note that equals is overridden to compare by startTask and endTask; this is what makes the set comparison operations used earlier work (see the sketch after this list)
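The following sketch (a simplified stand-in for ExecutorDetails, not the real class) shows why value-based equals/hashCode matters for the set difference used in scheduleTopology:

```java
import java.util.*;

// Sketch: two instances describing the same [startTask, endTask] range must compare
// equal, otherwise the set operations used during scheduling would treat every
// instance as distinct and nothing could be matched between "all" and "alive" sets.
public class ExecutorEqualitySketch {

    static final class Exec {
        final int startTask;
        final int endTask;

        Exec(int startTask, int endTask) {
            this.startTask = startTask;
            this.endTask = endTask;
        }

        @Override
        public boolean equals(Object other) {
            if (!(other instanceof Exec)) {
                return false;
            }
            Exec e = (Exec) other;
            return startTask == e.startTask && endTask == e.endTask;
        }

        @Override
        public int hashCode() {
            return 31 * startTask + endTask;
        }

        @Override
        public String toString() {
            return "[" + startTask + "," + endTask + "]";
        }
    }

    public static void main(String[] args) {
        Set<Exec> allExecutors = new HashSet<>(Arrays.asList(new Exec(1, 3), new Exec(4, 6)));
        Set<Exec> aliveExecutors = new HashSet<>(Collections.singletonList(new Exec(1, 3)));

        // same idea as Sets.difference(allExecutors, aliveExecutors) in scheduleTopology
        Set<Exec> reassign = new HashSet<>(allExecutors);
        reassign.removeAll(aliveExecutors);
        System.out.println(reassign); // [[4,6]] -- only works because equality is by task range
    }
}
```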
Cluster.assign
storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
```java
/**
 * Assign the slot to the executors for this topology.
 *
 * @throws RuntimeException if the specified slot is already occupied.
 */
public void assign(WorkerSlot slot, String topologyId, Collection<ExecutorDetails> executors) {
    assertValidTopologyForModification(topologyId);
    if (isSlotOccupied(slot)) {
        throw new RuntimeException(
            "slot: [" + slot.getNodeId() + ", " + slot.getPort() + "] is already occupied.");
    }

    TopologyDetails td = topologies.getById(topologyId);
    if (td == null) {
        throw new IllegalArgumentException(
            "Trying to schedule for topo "
            + topologyId
            + " but that is not a known topology "
            + topologies.getAllIds());
    }
    WorkerResources resources = calculateWorkerResources(td, executors);
    SchedulerAssignmentImpl assignment = assignments.get(topologyId);
    if (assignment == null) {
        assignment = new SchedulerAssignmentImpl(topologyId);
        assignments.put(topologyId, assignment);
    } else {
        for (ExecutorDetails executor : executors) {
            if (assignment.isExecutorAssigned(executor)) {
                throw new RuntimeException(
                    "Attempting to assign executor: "
                    + executor
                    + " of topology: "
                    + topologyId
                    + " to workerslot: "
                    + slot
                    + ". The executor is already assigned to workerslot: "
                    + assignment.getExecutorToSlot().get(executor)
                    + ". The executor must unassigned before it can be assigned to another slot!");
            }
        }
    }

    assignment.assign(slot, executors, resources);
    String nodeId = slot.getNodeId();
    double sharedOffHeapMemory = calculateSharedOffHeapMemory(nodeId, assignment);
    assignment.setTotalSharedOffHeapMemory(nodeId, sharedOffHeapMemory);
    updateCachesForWorkerSlot(slot, resources, sharedOffHeapMemory);
    totalResourcesPerNodeCache.remove(slot.getNodeId());
}
```
- After validation, the work is mainly delegated to assignment.assign(slot, executors, resources)
SchedulerAssignmentImpl.assign
storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
```java
/**
 * Assign the slot to executors.
 */
public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors, WorkerResources slotResources) {
    assert slot != null;
    for (ExecutorDetails executor : executors) {
        this.executorToSlot.put(executor, slot);
    }
    slotToExecutors.computeIfAbsent(slot, MAKE_LIST)
                   .addAll(executors);

    if (slotResources != null) {
        resources.put(slot, slotResources);
    } else {
        resources.remove(slot);
    }
}
```
- Assigns the slot to each ExecutorDetails (i.e. a startTask/endTask range), recording the mapping in both executorToSlot and slotToExecutors (see the sketch below)
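A minimal sketch of that bookkeeping with simplified types (plain strings instead of ExecutorDetails and WorkerSlot; this is not the real SchedulerAssignmentImpl): both directions of the mapping are kept in sync so later lookups are cheap either way.

```java
import java.util.*;

// Sketch: keep executor -> slot and slot -> executors consistent on every assign call.
public class AssignBookkeepingSketch {

    static final Map<String, String> executorToSlot = new HashMap<>();
    static final Map<String, List<String>> slotToExecutors = new HashMap<>();

    static void assign(String slot, Collection<String> executors) {
        for (String executor : executors) {
            executorToSlot.put(executor, slot);
        }
        slotToExecutors.computeIfAbsent(slot, k -> new ArrayList<>()).addAll(executors);
    }

    public static void main(String[] args) {
        assign("node-a:6700", Arrays.asList("[1,3]", "[4,6]"));
        assign("node-b:6700", Collections.singletonList("[7,9]"));

        // each executor maps to exactly one slot
        System.out.println(executorToSlot);
        // each slot lists every executor placed on it
        System.out.println(slotToExecutors);
    }
}
```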
Summary
- Nimbus.mkAssignments is responsible for scheduling and assignment; its main job is computing newSchedulerAssignments (a Map<String, SchedulerAssignment> keyed by topologyId), and it delegates the actual scheduling to scheduler.schedule(topologies, cluster)
- EvenScheduler.scheduleTopology is the core scheduling operation: it assigns slots to the ExecutorDetails that still need them. EvenScheduler uses a round-robin strategy, giving each reassignExecutor the slot reassignSlots.get(i % reassignSlots.size())
- ExecutorDetails has two fields, startTask and endTask; the assignment process is really about assigning these task ranges to slots
- After scheduling, lockingMkAssignments does two things: state.setAssignment(topoId, assignment, td.getConf()) stores the changed assignments in zk, and notifySupervisorsAssignments notifies the supervisors, which update their local in-memory assignments on receipt