聊聊storm supervisor的启动

本文主要研究一下storm supervisor的启动

Supervisor.launch

storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/Supervisor.java

/**
     * Launch the supervisor.
     *
     * <p>Startup sequence: clean the supervisor tmp dir, send one heartbeat synchronously and
     * schedule the recurring ones, create the event manager and the ReadClusterState, re-register
     * blob references for topologies that are already downloaded (so the localizer cleaner does
     * not clean things still in use), start the localizer cleaner and, when
     * Config.SUPERVISOR_ENABLE is true, schedule the periodic assignment sync, blob update and
     * health check tasks.
     *
     * @throws Exception on any startup error
     */
    public void launch() throws Exception {
        LOG.info("Starting Supervisor with conf {}", conf);
        String path = ConfigUtils.supervisorTmpDir(conf);
        FileUtils.cleanDirectory(new File(path));

        Localizer localizer = getLocalizer();

        SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, this);
        hb.run();
        // should synchronize supervisor so it doesn't launch anything after being down (optimization)
        Integer heartbeatFrequency = Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
        heartbeatTimer.scheduleRecurring(0, heartbeatFrequency, hb);

        this.eventManager = new EventManagerImp(false);
        this.readState = new ReadClusterState(this);

        Set<String> downloadedTopoIds = SupervisorUtils.readDownloadedTopologyIds(conf);
        // Read the persisted port -> assignment map once and use that same snapshot for both the
        // null check and the iteration; a second read could return a different (even null) map.
        Map<Integer, LocalAssignment> portToAssignments = localState.getLocalAssignmentsMap();
        if (portToAssignments != null) {
            // index the local assignments by topology id so each downloaded topology can find its owner
            Map<String, LocalAssignment> assignments = new HashMap<>();
            for (LocalAssignment la : portToAssignments.values()) {
                assignments.put(la.get_topology_id(), la);
            }
            for (String topoId : downloadedTopoIds) {
                LocalAssignment la = assignments.get(topoId);
                if (la != null) {
                    SupervisorUtils.addBlobReferences(localizer, topoId, conf, la.get_owner());
                } else {
                    LOG.warn("Could not find an owner for topo {}", topoId);
                }
            }
        }
        // do this after adding the references so we don't try to clean things being used
        localizer.startCleaner();

        UpdateBlobs updateBlobsThread = new UpdateBlobs(this);

        if ((Boolean) conf.get(Config.SUPERVISOR_ENABLE)) {
            // This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
            // to date even if callbacks don't all work exactly right
            eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(readState, eventManager));

            // Blob update thread. Starts with 30 seconds delay, every 30 seconds
            blobUpdateTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, eventManager));

            // supervisor health check
            eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(this));
        }
        LOG.info("Starting supervisor with id {} at host {}.", getId(), getHostName());
    }
  • supervisor launch的时候new了一个ReadClusterState

ReadClusterState

storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/ReadClusterState.java

/**
 * Creates the supervisor's cluster-state reader together with its per-port slots.
 *
 * <p>Copies the shared objects it needs from the supervisor and creates the ContainerLauncher.
 * It then creates one Slot per port listed in Config.SUPERVISOR_SLOTS_PORTS and kills any
 * worker ids returned by SupervisorUtils.supervisorWorkerIds that no slot claims. Next it asks
 * the localizer to clean up unused topologies, and finally it starts every slot. Failures in
 * the two cleanup steps are only logged as warnings; they do not abort construction.
 *
 * @param supervisor the supervisor this instance reads its configuration and shared state from
 * @throws Exception on any error creating the launcher or the slots
 */
public ReadClusterState(Supervisor supervisor) throws Exception {
        this.superConf = supervisor.getConf();
        this.stormClusterState = supervisor.getStormClusterState();
        this.syncSupEventManager = supervisor.getEventManger();
        this.assignmentVersions = new AtomicReference<Map<String, VersionedData<Assignment>>>(new HashMap<String, VersionedData<Assignment>>());
        this.assignmentId = supervisor.getAssignmentId();
        this.iSuper = supervisor.getiSupervisor();
        this.localizer = supervisor.getAsyncLocalizer();
        this.host = supervisor.getHostName();
        this.localState = supervisor.getLocalState();
        // same object as stormClusterState above; this is the one handed to every Slot in mkSlot
        this.clusterState = supervisor.getStormClusterState();
        // shared with every Slot created below (see mkSlot)
        this.cachedAssignments = supervisor.getCurrAssignment();
        
        this.launcher = ContainerLauncher.make(superConf, assignmentId, supervisor.getSharedContext());
        
        // One slot per configured port. A missing SUPERVISOR_SLOTS_PORTS entry would make
        // "ports" null and the loop below would throw a NullPointerException.
        @SuppressWarnings("unchecked")
        List<Number> ports = (List<Number>)superConf.get(Config.SUPERVISOR_SLOTS_PORTS);
        for (Number port: ports) {
            slots.put(port.intValue(), mkSlot(port.intValue()));
        }
        
        // Kill workers that no slot claims: start from the worker ids reported for this
        // supervisor (presumably the ones found on local disk - confirm) and remove every id
        // a slot already owns; whatever remains is treated as stale.
        try {
            Collection<String> workers = SupervisorUtils.supervisorWorkerIds(superConf);
            for (Slot slot: slots.values()) {
                String workerId = slot.getWorkerId();
                if (workerId != null) {
                    workers.remove(workerId);
                }
            }
            if (!workers.isEmpty()) {
                supervisor.killWorkers(workers, launcher);
            }
        } catch (Exception e) {
            LOG.warn("Error trying to clean up old workers", e);
        }

        //All the slots/assignments should be recovered now, so we can clean up anything that we don't expect to be here
        try {
            localizer.cleanupUnusedTopologies();
        } catch (Exception e) {
            LOG.warn("Error trying to clean up old topologies", e);
        }
        
        // Only start the slot threads after the cleanup above has run.
        for (Slot slot: slots.values()) {
            slot.start();
        }
    }

    /**
     * Builds the Slot that will manage the worker on {@code port}, wiring in this instance's
     * localizer, launcher, host, local/cluster state, iSupervisor and cached assignments.
     *
     * @param port the supervisor port the slot is responsible for
     * @return a new, not yet started, Slot
     * @throws Exception if the Slot cannot be created
     */
    private Slot mkSlot(int port) throws Exception {
        Slot created = new Slot(localizer, superConf, launcher, host, port,
                localState, clusterState, iSuper, cachedAssignments);
        return created;
    }
  • 这里读取SUPERVISOR_SLOTS_PORTS(supervisor.slots.ports),默认是[6700,6701,6702,6703]
  • 通过ContainerLauncher.make(superConf, assignmentId, supervisor.getSharedContext())创建ContainerLauncher
  • 根据slots的port配置调用mkSlot创建slot;随后kill掉本地存在但不属于任何slot的旧worker,并通过localizer清理不再使用的topology;最后挨个调用slot的start,启动slot线程

ContainerLauncher.make

storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/ContainerLauncher.java

/**
     * Factory that picks the container launcher matching the config and environment.
     *
     * <p>Local mode yields a LocalContainerLauncher (workers share {@code sharedContext}).
     * Otherwise, a RunAsUserContainerLauncher is returned when
     * Config.SUPERVISOR_RUN_WORKER_AS_USER is true, and a BasicContainerLauncher in every other case.
     *
     * @param conf the config
     * @param supervisorId the ID of the supervisor
     * @param sharedContext Used in local mode to let workers talk together without netty
     * @return the proper container launcher
     * @throws IOException on any error
     */
    public static ContainerLauncher make(Map<String, Object> conf, String supervisorId, IContext sharedContext) throws IOException {
        boolean localMode = ConfigUtils.isLocalMode(conf);
        if (!localMode) {
            boolean runAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
            if (runAsUser) {
                return new RunAsUserContainerLauncher(conf, supervisorId);
            }
            return new BasicContainerLauncher(conf, supervisorId);
        }
        return new LocalContainerLauncher(conf, supervisorId, sharedContext);
    }
  • 这里根据配置来创建ContainerLauncher的不同子类:local模式下创建的是LocalContainerLauncher;开启runAsUser(supervisor.run.worker.as.user)时创建的是RunAsUserContainerLauncher;其他情况创建的是BasicContainerLauncher

Slot

storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/Slot.java

/**
 * Body of the slot thread: keeps stepping the slot state machine until {@code done} is set.
 *
 * <p>Each iteration does the following:
 * <ul>
 * <li>snapshots the queued profile actions;</li>
 * <li>runs one stateMachineStep on the current state, updated with the latest new assignment
 * and those actions;</li>
 * <li>persists the current assignment if it changed;</li>
 * <li>deletes from the cluster state the profile requests that were filtered out, and removes
 * them from the local queue;</li>
 * <li>adopts the next state.</li>
 * </ul>
 * If a Throwable is caught that Utils.exceptionCauseIsInstanceOf does not attribute to an
 * InterruptedException, it is logged and the process exits with code 20. An interrupt simply
 * ends the loop.
 */
public void run() {
        try {
            while(!done) {
                // Snapshot the queued profile actions; "removed" starts as a copy and is trimmed
                // further down to the actions that should be dropped.
                Set<TopoProfileAction> origProfileActions = new HashSet<>(profiling.get());
                Set<TopoProfileAction> removed = new HashSet<>(origProfileActions);
                
                // One state-machine step on the current state, updated with the latest scheduled
                // assignment and the snapshotted profile actions.
                DynamicState nextState = 
                        stateMachineStep(dynamicState.withNewAssignment(newAssignment.get())
                                .withProfileActions(origProfileActions, dynamicState.pendingStopProfileActions), staticState);

                // State changes are always logged; with debug enabled every step is logged.
                if (LOG.isDebugEnabled() || dynamicState.state != nextState.state) {
                    LOG.info("STATE {} -> {}", dynamicState, nextState);
                }
                //Save the current state for recovery
                if (!equivalent(nextState.currentAssignment, dynamicState.currentAssignment)) {
                    LOG.info("SLOT {}: Changing current assignment from {} to {}", staticState.port, dynamicState.currentAssignment, nextState.currentAssignment);
                    saveNewAssignment(nextState.currentAssignment);
                }

                if (equivalent(nextState.newAssignment, nextState.currentAssignment) &&
                    nextState.currentAssignment != null && nextState.currentAssignment.get_owner() == null &&
                    nextState.newAssignment != null && nextState.newAssignment.get_owner() != null) {
                    //This is an odd case for a rolling upgrade where the user on the old assignment may be null,
                    // but not on the new one.  Although in all other ways they are the same.
                    // If this happens we want to use the assignment with the owner.
                    LOG.info("Updating assignment to save owner {}", nextState.newAssignment.get_owner());
                    saveNewAssignment(nextState.newAssignment);
                    nextState = nextState.withCurrentAssignment(nextState.container, nextState.newAssignment);
                }
                
                // clean up the profiler actions that are not being processed
                // NOTE(review): this filters against dynamicState (the state from *before* the
                // step), not nextState - confirm that this is intended.
                removed.removeAll(dynamicState.profileActions);
                removed.removeAll(dynamicState.pendingStopProfileActions);
                for (TopoProfileAction action: removed) {
                    try {
                        clusterState.deleteTopologyProfileRequests(action.topoId, action.request);
                    } catch (Exception e) {
                        LOG.error("Error trying to remove profiling request, it will be retried", e);
                    }
                }
                // Remove the cleaned-up actions from "profiling" with a compareAndSet retry loop so
                // that concurrent updates to it (presumably from other threads) are not lost.
                Set<TopoProfileAction> orig, copy;
                do {
                    orig = profiling.get();
                    copy = new HashSet<>(orig);
                    copy.removeAll(removed);
                } while (!profiling.compareAndSet(orig, copy));
                dynamicState = nextState;
            }
        } catch (Throwable e) {
            if (!Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
                LOG.error("Error when processing event", e);
                Utils.exitProcess(20, "Error when processing an event");
            }
        }
    }

    /**
     * Records this slot's current assignment, or clears it when {@code assignment} is null.
     *
     * <p>The change is applied in two places:
     * <ol>
     * <li>the persisted LocalState map (while holding the LocalState monitor);</li>
     * <li>the in-memory cachedCurrentAssignments map, which is shared by all slots. This update
     * builds a modified copy and retries compareAndSet until no concurrent writer has replaced
     * the map in between.</li>
     * </ol>
     *
     * @param assignment the assignment now running on this slot's port, or null if none
     */
    private void saveNewAssignment(LocalAssignment assignment) {
        synchronized(staticState.localState) {
            Map<Integer, LocalAssignment> assignments = staticState.localState.getLocalAssignmentsMap();
            if (assignments == null) {
                assignments = new HashMap<>();
            }
            if (assignment == null) {
                assignments.remove(staticState.port);
            } else {
                assignments.put(staticState.port, assignment);
            }
            staticState.localState.setLocalAssignmentsMap(assignments);
        }
        // The key never changes across retries, so compute it once. Long.valueOf replaces the
        // Long(long) constructor (deprecated since Java 9); map lookups use equals, so behavior is unchanged.
        final Long lport = Long.valueOf(staticState.port);
        Map<Long, LocalAssignment> update;
        Map<Long, LocalAssignment> orig;
        do {
            orig = cachedCurrentAssignments.get();
            update = new HashMap<>(orig);
            if (assignment == null) {
                update.remove(lport);
            } else {
                update.put(lport, assignment);
            }
        } while (!cachedCurrentAssignments.compareAndSet(orig, update));
    }

    /**
     * Performs one transition of the slot state machine.
     *
     * <p>Dispatches to the handler for {@code dynamicState.state} and returns whatever state that
     * handler decides the slot should be in next.
     *
     * @param dynamicState the slot's current (mutable-by-copy) state
     * @param staticState the slot's fixed configuration and collaborators
     * @return the next state
     * @throws IllegalStateException if the current state has no handler
     * @throws Exception propagated from the individual state handlers
     */
    static DynamicState stateMachineStep(DynamicState dynamicState, StaticState staticState) throws Exception {
        LOG.debug("STATE {}", dynamicState.state);
        DynamicState next;
        switch (dynamicState.state) {
            case EMPTY:
                next = handleEmpty(dynamicState, staticState);
                break;
            case RUNNING:
                next = handleRunning(dynamicState, staticState);
                break;
            case WAITING_FOR_WORKER_START:
                next = handleWaitingForWorkerStart(dynamicState, staticState);
                break;
            case KILL_AND_RELAUNCH:
                next = handleKillAndRelaunch(dynamicState, staticState);
                break;
            case KILL:
                next = handleKill(dynamicState, staticState);
                break;
            case WAITING_FOR_BASIC_LOCALIZATION:
                next = handleWaitingForBasicLocalization(dynamicState, staticState);
                break;
            case WAITING_FOR_BLOB_LOCALIZATION:
                next = handleWaitingForBlobLocalization(dynamicState, staticState);
                break;
            default:
                throw new IllegalStateException("Code not ready to handle a state of " + dynamicState.state);
        }
        return next;
    }
  • 不断循环stateMachineStep方法切换state
  • 当state是WAITING_FOR_BLOB_LOCALIZATION时,会触发handleWaitingForBlobLocalization

handleWaitingForBlobLocalization

storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/Slot.java

/**
     * State Transitions for WAITING_FOR_BLOB_LOCALIZATION state.
     * PRECONDITION: neither pendingLocalization nor pendingDownload is null.
     * PRECONDITION: The slot should be empty
     *
     * <p>Waits up to one second for the pending blob download.
     * <ul>
     * <li>If the download has not finished, the state is returned unchanged, so the caller's loop
     * comes back and waits again.</li>
     * <li>If it has finished but the scheduled assignment changed in the meantime,
     * localizer.releaseSlotFor is called for the pending localization, and the slot is prepared
     * for the new assignment.</li>
     * <li>Otherwise a container is launched for the pending assignment. The slot then moves to
     * WAITING_FOR_WORKER_START with the pending localization cleared.</li>
     * </ul>
     * Only TimeoutException is caught here. Any other exception from waiting on the download
     * (presumably an ExecutionException if the download failed) or from launching propagates to
     * the caller.
     * @param dynamicState current state
     * @param staticState static data
     * @return the next state
     * @throws Exception on any error
     */
    static DynamicState handleWaitingForBlobLocalization(DynamicState dynamicState, StaticState staticState) throws Exception {
        assert(dynamicState.pendingLocalization != null);
        assert(dynamicState.pendingDownload != null);
        assert(dynamicState.container == null);
        
        //Ignore changes to scheduling while downloading the topology blobs
        // We don't support canceling the download through the future yet,
        // so to keep everything in sync, just wait
        try {
            dynamicState.pendingDownload.get(1000, TimeUnit.MILLISECONDS);
            //Downloading of all blobs finished.
            if (!equivalent(dynamicState.newAssignment, dynamicState.pendingLocalization)) {
                //Scheduling changed
                staticState.localizer.releaseSlotFor(dynamicState.pendingLocalization, staticState.port);
                return prepareForNewAssignmentNoWorkersRunning(dynamicState, staticState);
            }
            // Assignment unchanged: start the worker container and wait for it to come up.
            Container c = staticState.containerLauncher.launchContainer(staticState.port, dynamicState.pendingLocalization, staticState.localState);
            return dynamicState.withCurrentAssignment(c, dynamicState.pendingLocalization).withState(MachineState.WAITING_FOR_WORKER_START).withPendingLocalization(null, null);
        } catch (TimeoutException e) {
            //We waited for 1 second loop around and try again....
            return dynamicState;
        }
    }
  • 这里通过staticState.containerLauncher.launchContainer去启动container

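ContainerLauncher.launchContainer

storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java

注意:下面这段代码使用_sharedContext创建LocalContainer,实际是local模式下LocalContainerLauncher的实现;非local模式下使用的BasicContainerLauncher同样是先setup再launch,只是创建的容器是BasicContainer(即下文BasicContainer.launch所分析的)。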

/**
     * Creates a LocalContainer for the given port and assignment.
     *
     * <p>The container is prepared with setup(), started with launch(), and then returned. The
     * {@code state} argument is not used by this implementation.
     *
     * @param port the supervisor port the worker will use
     * @param assignment the assignment the worker runs
     * @param state the supervisor's local state (unused here)
     * @return the launched container
     * @throws IOException on any error during setup or launch
     */
    @Override
    public Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
        final LocalContainer container = new LocalContainer(_conf, _supervisorId, port, assignment, _sharedContext);
        container.setup();
        container.launch();
        return container;
    }
  • launchContainer的时候,先调用setup,再调用launch方法

Container.setup

storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/Container.java

/**
     * Setup the container to run.  By default this creates the needed directories/links in the
     * local file system
     * PREREQUISITE: All needed blobs and topology, jars/configs have been downloaded and
     * placed in the appropriate locations
     *
     * <p>Steps, in order:
     * <ol>
     * <li>Verify that the required topology files exist; throw IllegalStateException if not.</li>
     * <li>Create the worker's pids, tmp and heartbeats directories.</li>
     * <li>Create and set up the worker artifacts directory for the assignment owner, but only if
     * it does not exist yet.</li>
     * <li>Write the log metadata and save the worker user.</li>
     * <li>Create the artifacts link and the blobstore links.</li>
     * </ol>
     * @throws IOException on any error
     * @throws IllegalStateException if the required topology files are missing
     */
    protected void setup() throws IOException {
        // NOTE(review): presumably asserts this container type is allowed to be set up/launched - confirm ContainerType semantics.
        _type.assertFull();
        if (!_ops.doRequiredTopoFilesExist(_conf, _topologyId)) {
            LOG.info("Missing topology storm code, so can't launch  worker with assignment {} for this supervisor {} on port {} with id {}", _assignment,
                    _supervisorId, _port, _workerId);
            throw new IllegalStateException("Not all needed files are here!!!!");
        } 
        LOG.info("Setting up {}:{}", _supervisorId, _workerId);

        // per-worker directories, all keyed by the worker id
        _ops.forceMkdir(new File(ConfigUtils.workerPidsRoot(_conf, _workerId)));
        _ops.forceMkdir(new File(ConfigUtils.workerTmpRoot(_conf, _workerId)));
        _ops.forceMkdir(new File(ConfigUtils.workerHeartbeatsRoot(_conf, _workerId)));
        
        // the artifacts dir is keyed by topology id and port rather than worker id, and is only
        // created and set up for the owner the first time
        File workerArtifacts = new File(ConfigUtils.workerArtifactsRoot(_conf, _topologyId, _port));
        if (!_ops.fileExists(workerArtifacts)) {
            _ops.forceMkdir(workerArtifacts);
            _ops.setupWorkerArtifactsDir(_assignment.get_owner(), workerArtifacts);
        }
    
        String user = getWorkerUser();
        writeLogMetadata(user);
        saveWorkerUser(user);
        createArtifactsLink();
        createBlobstoreLinks();
    }
  • setup主要做一些创建目录或链接的准备工作

BasicContainer.launch

storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/BasicContainer.java

/**
 * Launch the worker process for this container's assignment.
 *
 * <p>Steps:
 * <ol>
 * <li>Reset {@code _exitedEarly}.</li>
 * <li>Build the command with mkLaunchCommand, using the assignment's on-heap memory, the
 * topology's dist root and the derived java library path.</li>
 * <li>Build the process environment from Config.TOPOLOGY_ENVIRONMENT, with LD_LIBRARY_PATH
 * set to that same library path.</li>
 * <li>Start the process in the worker's root directory via launchWorkerProcess, with a
 * ProcessExitCallback attached.</li>
 * </ol>
 *
 * @throws IOException on any error
 */
public void launch() throws IOException {
        _type.assertFull();
        LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", _assignment,
                _supervisorId, _port, _workerId);
        String logPrefix = "Worker Process " + _workerId;
        ProcessExitCallback processExitCallback = new ProcessExitCallback(logPrefix);
        _exitedEarly = false;
        
        final WorkerResources resources = _assignment.get_resources();
        final int memOnheap = getMemOnHeap(resources);
        final String stormRoot = ConfigUtils.supervisorStormDistRoot(_conf, _topologyId);
        final String jlp = javaLibraryPath(stormRoot, _conf);
        
        List<String> commandList = mkLaunchCommand(memOnheap, stormRoot, jlp);

        Map<String, String> topEnvironment = new HashMap<String, String>();
        @SuppressWarnings("unchecked")
        Map<String, String> environment = (Map<String, String>) _topoConf.get(Config.TOPOLOGY_ENVIRONMENT);
        if (environment != null) {
            topEnvironment.putAll(environment);
        }
        // put after the topology environment, so it overrides any LD_LIBRARY_PATH set there
        topEnvironment.put("LD_LIBRARY_PATH", jlp);

        LOG.info("Launching worker with command: {}. ", Utils.shellCmd(commandList));

        String workerDir = ConfigUtils.workerRoot(_conf, _workerId);

        launchWorkerProcess(commandList, topEnvironment, logPrefix, processExitCallback, new File(workerDir));
    }

    /**
     * Launch the worker process (non-blocking) by delegating to
     * {@link SupervisorUtils#launchProcess}. The Process handle returned by launchProcess is
     * not kept.
     * 
     * @param command
     *            the command to run
     * @param env
     *            the environment to run the command
     * @param logPrefix
     *            the prefix to include in the logs
     * @param processExitCallback
     *            a callback for when the process exits
     * @param targetDir
     *            the working directory to run the command in
     * @throws IOException
     *             on any error
     */
    protected void launchWorkerProcess(List<String> command, Map<String, String> env, String logPrefix,
            ExitCodeCallback processExitCallback, File targetDir) throws IOException {
        SupervisorUtils.launchProcess(command, env, logPrefix, processExitCallback, targetDir);
    }
  • 这里通过mkLaunchCommand来构造worker的启动命令
  • 然后通过SupervisorUtils.launchProcess启动worker进程

mkLaunchCommand

storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/BasicContainer.java

/**
     * Create the command to launch the worker process
     *
     * <p>The returned list is two JVM command lines concatenated:
     * <ol>
     * <li><b>LogWriter JVM:</b> the java binary, the classpath, the topology logwriter childopts,
     * the common params, and the org.apache.storm.LogWriter main class.</li>
     * <li><b>Worker JVM:</b> java -server, followed by:
     * <ul>
     * <li>the common params;</li>
     * <li>the cluster worker childopts, then the topology worker childopts;</li>
     * <li>the GC childopts, which come from OR over the topology and cluster settings;</li>
     * <li>the profiler childopts;</li>
     * <li>the java.library.path, storm.conf.file, storm.options and java.io.tmpdir system
     * properties;</li>
     * <li>the classpath and the org.apache.storm.daemon.worker main class;</li>
     * <li>the args topologyId, supervisorId, port, workerId.</li>
     * </ul>
     * </li>
     * </ol>
     * Per the inline comment, the LogWriter launches the actual worker (presumably from the
     * trailing arguments - confirm in LogWriter).
     * @param memOnheap the on heap memory for the worker
     * @param stormRoot the root dist dir for the topology
     * @param jlp java library path for the topology
     * @return the command to run
     * @throws IOException on any error.
     */
    private List<String> mkLaunchCommand(final int memOnheap, final String stormRoot,
            final String jlp) throws IOException {
        final String javaCmd = javaCmd("java");
        // NOTE(review): concatIfNotNull presumably yields "" for an unset property (the sample
        // command shows "-Dstorm.conf.file=") - confirm.
        final String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
        final String stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
        final String workerTmpDir = ConfigUtils.workerTmpRoot(_conf, _workerId);
        
        List<String> classPathParams = getClassPathParams(stormRoot);
        List<String> commonParams = getCommonParams();
        
        List<String> commandList = new ArrayList<>();
        //Log Writer Command...
        commandList.add(javaCmd);
        commandList.addAll(classPathParams);
        commandList.addAll(substituteChildopts(_topoConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS)));
        commandList.addAll(commonParams);
        commandList.add("org.apache.storm.LogWriter"); //The LogWriter in turn launches the actual worker.

        //Worker Command...
        commandList.add(javaCmd);
        commandList.add("-server");
        commandList.addAll(commonParams);
        // cluster-wide worker childopts come first, then the topology's own worker childopts
        commandList.addAll(substituteChildopts(_conf.get(Config.WORKER_CHILDOPTS), memOnheap));
        commandList.addAll(substituteChildopts(_topoConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), memOnheap));
        // GC opts: presumably the topology setting if present, else the cluster one - confirm OR semantics
        commandList.addAll(substituteChildopts(OR(
                _topoConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS),
                _conf.get(Config.WORKER_GC_CHILDOPTS)), memOnheap));
        commandList.addAll(getWorkerProfilerChildOpts(memOnheap));
        commandList.add("-Djava.library.path=" + jlp);
        commandList.add("-Dstorm.conf.file=" + stormConfFile);
        commandList.add("-Dstorm.options=" + stormOptions);
        commandList.add("-Djava.io.tmpdir=" + workerTmpDir);
        commandList.addAll(classPathParams);
        commandList.add("org.apache.storm.daemon.worker");
        commandList.add(_topologyId);
        commandList.add(_supervisorId);
        commandList.add(String.valueOf(_port));
        commandList.add(_workerId);
        
        return commandList;
    }
  • 启动参数示例(下面只展示了worker进程部分的命令,即mkLaunchCommand中在LogWriter命令之后追加的那一段)
/usr/lib/jvm/java-1.8-openjdk/jre/bin/java -server -Dlogging.sensitivity=S3 -Dlogfile.name=worker.log -Dstorm.home=/apache-storm-1.2.2 -Dworkers.artifacts=/logs/workers-artifacts -Dstorm.id=DemoTopology-1-1539163962 -Dworker.id=f0f30bc3-11af-4f4f-b2dd-8cc92d8791bf -Dworker.port=6700 -Dstorm.log.dir=/logs -Dlog4j.configurationFile=/apache-storm-1.2.2/log4j2/worker.xml -DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector -Dstorm.local.dir=/data -Xmx768m -XX:+PrintGCDetails -Xloggc:artifacts/gc.log -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=artifacts/heapdump -Djava.library.path=/data/supervisor/stormdist/DemoTopology-1-1539163962/resources/Linux-amd64:/data/supervisor/stormdist/DemoTopology-1-1539163962/resources:/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -Dstorm.options=storm.local.hostname%3D192.168.99.100 -Djava.io.tmpdir=/data/workers/f0f30bc3-11af-4f4f-b2dd-8cc92d8791bf/tmp -cp /apache-storm-1.2.2/lib/*:/apache-storm-1.2.2/extlib/*:/conf:/data/supervisor/stormdist/DemoTopology-1-1539163962/stormjar.jar org.apache.storm.daemon.worker DemoTopology-1-1539163962 8dd6dc7f-95cb-49f9-9bd1-f0d638fe6fc6 6700 f0f30bc3-11af-4f4f-b2dd-8cc92d8791bf
  • 其中主类org.apache.storm.daemon.worker的源码位于storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/worker.clj(用Clojure实现)

SupervisorUtils.launchProcess

storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/SupervisorUtils.java

/**
     * Launch a new process as per {@link java.lang.ProcessBuilder} with a given
     * callback.
     *
     * <p>The child's stderr is merged into its stdout. When {@code logPrefix} or
     * {@code exitCodeCallback} is non-null, a one-shot task is run via Utils.asyncLoop. That task
     * first logs the process output (if {@code logPrefix} is set). It then waits for the process
     * and passes the exit code to {@code exitCodeCallback} (if set), or -1 if the wait is
     * interrupted.
     *
     * <p>NOTE(review): when {@code logPrefix} is null nothing reads the child's output. Per the
     * {@link java.lang.Process} documentation, a child that writes a lot may then block on a full
     * pipe buffer, so callers should pass a prefix for chatty processes.
     * @param command the command to be executed in the new process
     * @param environment the environment to be applied to the process. Can be
     *                    null.
     * @param logPrefix a prefix for log entries from the output of the process.
     *                  Can be null.
     * @param exitCodeCallback code to be called passing the exit code value
     *                         when the process completes
     * @param dir the working directory of the new process
     * @return the new process
     * @throws IOException if the process cannot be started
     * @see java.lang.ProcessBuilder
     */
    public static Process launchProcess(List<String> command,
                                        Map<String,String> environment,
                                        final String logPrefix,
                                        final ExitCodeCallback exitCodeCallback,
                                        File dir)
            throws IOException {
        ProcessBuilder builder = new ProcessBuilder(command);
        Map<String,String> procEnv = builder.environment();
        if (dir != null) {
            builder.directory(dir);
        }
        builder.redirectErrorStream(true);
        if (environment != null) {
            procEnv.putAll(environment);
        }
        final Process process = builder.start();
        if (logPrefix != null || exitCodeCallback != null) {
            Utils.asyncLoop(new Callable<Object>() {
                public Object call() {
                    if (logPrefix != null ) {
                        Utils.readAndLogStream(logPrefix,
                                process.getInputStream());
                    }
                    if (exitCodeCallback != null) {
                        try {
                            process.waitFor();
                            exitCodeCallback.call(process.exitValue());
                        } catch (InterruptedException ie) {
                            LOG.info("{} interrupted", logPrefix);
                            exitCodeCallback.call(-1);
                            // Restore the interrupt status instead of swallowing it, so the thread
                            // running this task can still observe the interruption.
                            Thread.currentThread().interrupt();
                        }
                    }
                    return null; // Run only once.
                }
            });
        }
        return process;
    }
  • 这里通过ProcessBuilder来启动进程

小结

  • storm的supervisor启动的时候,会创建ContainerLauncher以及根据SUPERVISOR_SLOTS_PORTS(supervisor.slots.ports)创建slots
  • slot线程会不断循环调用stateMachineStep推进状态机,在WAITING_FOR_BLOB_LOCALIZATION状态下使用ContainerLauncher的launchContainer创建Container并launch
  • container launch的时候通过SupervisorUtils.launchProcess(使用ProcessBuilder)启动worker进程

doc

相关推荐