OpenStack Rocky Series, Cinder (Part 1): Cinder Service Startup

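I have been busy and had no time to read the OpenStack source for quite a while. I recently set aside some time to go through the Cinder source, and these are my notes. This post briefly covers how Cinder's three services start: cinder-api, cinder-scheduler, and cinder-volume. All three start in much the same way.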

1. cinder-api

The entry file is /usr/bin/cinder-api, which shows that the real entry point is the main function in cinder/cmd/api.py (module cinder.cmd.api).

#!/usr/bin/python2
# PBR Generated from u'console_scripts'

import sys

from cinder.cmd.api import main


if __name__ == "__main__":
    sys.exit(main())
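
The generated script above simply imports main from the module named in the package's console_scripts entry. As a rough sketch of how a "module:function" string is turned into a callable, the snippet below uses importlib. The helper name resolve_entry_point is hypothetical, and json:dumps stands in for cinder.cmd.api:main only so that the sketch runs without Cinder installed.

```python
import importlib


def resolve_entry_point(spec):
    """Resolve a 'package.module:function' string to the callable it names."""
    module_name, _, attr = spec.partition(":")
    module = importlib.import_module(module_name)
    return getattr(module, attr)


# For cinder-api the spec would be "cinder.cmd.api:main".
# json:dumps is a stand-in chosen purely for illustration.
func = resolve_entry_point("json:dumps")
print(func({"entry": "point"}))
```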

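The main function is shown below. The part to focus on is the three statements near the end that create the launcher, create the WSGI service, and launch it.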

def main():
    objects.register_all()
    gmr_opts.set_defaults(CONF)
    CONF(sys.argv[1:], project='cinder',
         version=version.version_string())
    config.set_middleware_defaults()
    logging.setup(CONF, "cinder")
    python_logging.captureWarnings(True)
    utils.monkey_patch()

    gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)

    rpc.init(CONF)
    launcher = service.process_launcher()
    server = service.WSGIService('osapi_volume')
    launcher.launch_service(server, workers=server.workers)
    launcher.wait()

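service.process_launcher() creates a ProcessLauncher object, which is used later to launch the app.

service.WSGIService('osapi_volume') creates a WSGIService object named osapi_volume.

launcher.launch_service(...) then hands the server to the launcher. Reading the oslo.service source shows that it calls the ProcessLauncher object's _start_child method, once per worker, to start the service.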

def launch_service(self, service, workers=1):
        """Launch a service with a given number of workers.

       :param service: a service to launch, must be an instance of
              :class:`oslo_service.service.ServiceBase`
       :param workers: a number of processes in which a service
              will be running
        """
        _check_service_base(service)  # validate the object type
        wrap = ServiceWrapper(service, workers)  # wrap the service object

        # Hide existing objects from the garbage collector, so that most
        # existing pages will remain in shared memory rather than being
        # duplicated between subprocesses in the GC mark-and-sweep. (Requires
        # Python 3.7 or later.)
        if hasattr(gc, 'freeze'):
            gc.freeze()

        LOG.info('Starting %d workers', wrap.workers)
        while self.running and len(wrap.children) < wrap.workers:
            self._start_child(wrap)

The _start_child method is simple: it calls os.fork() to create a child process, and inside the child (where fork returns 0) it calls _child_process.

def _start_child(self, wrap):
        if len(wrap.forktimes) > wrap.workers:
            # Limit ourselves to one process a second (over the period of
            # number of workers * 1 second). This will allow workers to
            # start up quickly but ensure we don't fork off children that
            # die instantly too quickly.
            if time.time() - wrap.forktimes[0] < wrap.workers:
                LOG.info('Forking too fast, sleeping')
                time.sleep(1)

            wrap.forktimes.pop(0)

        wrap.forktimes.append(time.time())

        pid = os.fork()
        if pid == 0:
            self.launcher = self._child_process(wrap.service)
            while True:
                self._child_process_handle_signal()
                status, signo = self._child_wait_for_exit_or_signal(
                    self.launcher)
                if not _is_sighup_and_daemon(signo):
                    self.launcher.wait()
                    break
                self.launcher.restart()

            os._exit(status)

        LOG.debug('Started child %d', pid)

        wrap.children.add(pid)
        self.children[pid] = wrap

        return pid

def _child_process(self, service):
        self._child_process_handle_signal()

        # Reopen the eventlet hub to make sure we don't share an epoll
        # fd with parent and/or siblings, which would be bad
        eventlet.hubs.use_hub()

        # Close write to ensure only parent has it open
        os.close(self.writepipe)
        # Create greenthread to watch for parent to close pipe
        eventlet.spawn_n(self._pipe_watcher)

        # Reseed random number generator
        random.seed()

        launcher = Launcher(self.conf, restart_method=self.restart_method)
        launcher.launch_service(service)
        return launcher
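
The fork-per-worker pattern used by _start_child can be tried in isolation. The following is a minimal sketch using only the standard library, assuming a POSIX system. The function names start_child, launch and wait_all are hypothetical, and the fork rate limiting, signal handling and pipe watching done by oslo.service are deliberately left out.

```python
import os
import time


def start_child(work):
    """Fork once; the child runs work() and exits, the parent gets the pid."""
    pid = os.fork()
    if pid == 0:
        # Child process: run the service body, then exit immediately so the
        # child never continues into the parent's code path.
        status = 0
        try:
            work()
        except Exception:
            status = 1
        os._exit(status)
    return pid


def launch(work, workers):
    """Start `workers` children and return the set of their pids."""
    children = set()
    while len(children) < workers:
        children.add(start_child(work))
    return children


def wait_all(children):
    """Reap every child and return a {pid: exit_code} mapping."""
    results = {}
    for pid in children:
        _, status = os.waitpid(pid, 0)
        results[pid] = os.WEXITSTATUS(status)
    return results


pids = launch(lambda: time.sleep(0.1), workers=2)
print(wait_all(pids))
```

As in the real code, the parent only records the child pids; all service work happens after the fork, inside the child.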

_child_process reinitializes the eventlet hub, spawns a greenthread that watches the pipe to the parent process, and creates a Launcher object whose launch_service method launches the service:

def launch_service(self, service, workers=1):
        """Load and start the given service.

        :param service: The service you would like to start, must be an
                        instance of :class:`oslo_service.service.ServiceBase`
        :param workers: This param makes this method compatible with
                        ProcessLauncher.launch_service. It must be None, 1 or
                        omitted.
        :returns: None

        """
        if workers is not None and workers != 1:
            raise ValueError(_("Launcher asked to start multiple workers"))
        _check_service_base(service)
        service.backdoor_port = self.backdoor_port
        self.services.add(service)   # the key step

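The key to the Launcher object is this add call. It appends every service passed to launch_service to the service list of a Services object and, through run_service, eventually calls the start() method of the added service.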

def add(self, service):
        """Add a service to a list and create a thread to run it.

        :param service: service to run
        """
        self.services.append(service)
        self.tg.add_thread(self.run_service, service, self.done)

@staticmethod
def run_service(service, done):
        """Service start wrapper.

        :param service: service to run
        :param done: event to wait on until a shutdown is triggered
        :returns: None

        """
        try:
            service.start()
        except Exception:
            LOG.exception('Error starting thread.')
            raise SystemExit(1)
        else:
            done.wait()
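
The add and run_service pair above boils down to: remember the service, run its start() in its own thread, then block that thread until shutdown is signalled. The sketch below mimics this with the standard library's threading module instead of the eventlet-based thread group that oslo.service uses. The stop method and the EchoService class are hypothetical and exist only for illustration.

```python
import threading


class Services:
    """Keeps a list of services and runs each one in its own thread."""

    def __init__(self):
        self.services = []
        self.threads = []
        self.done = threading.Event()

    def add(self, service):
        """Record the service and start a thread that runs it."""
        self.services.append(service)
        thread = threading.Thread(target=self.run_service,
                                  args=(service, self.done))
        thread.start()
        self.threads.append(thread)

    @staticmethod
    def run_service(service, done):
        """Start the service, then block until shutdown is requested."""
        service.start()
        done.wait()

    def stop(self):
        """Signal shutdown and wait for every service thread to finish."""
        self.done.set()
        for thread in self.threads:
            thread.join()


class EchoService:
    """Hypothetical service whose start() only records that it ran."""

    def __init__(self):
        self.started = threading.Event()

    def start(self):
        self.started.set()


services = Services()
svc = EchoService()
services.add(svc)
svc.started.wait(timeout=5)
print("started:", svc.started.is_set())
services.stop()
```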

The service here is the WSGIService object mentioned at the beginning. Its start method is:

def start(self):
        """Start serving this service using loaded configuration.

        Also, retrieve updated port number in case '0' was passed in, which
        indicates a random port should be used.

        :returns: None

        """
        if self.manager:
            self.manager.init_host()
        self.server.start()
        self.port = self.server.port

At this point self.manager is None, so the key step is self.server.start(). This server is the object built when WSGIService is initialized:

class WSGIService(service.ServiceBase):
    """Provides ability to launch API from a 'paste' configuration."""

    def __init__(self, name, loader=None):
        """Initialize, but do not start the WSGI server.

        :param name: The name of the WSGI server given to the loader.
        :param loader: Loads the WSGI application using the given name.
        :returns: None

        """
        self.name = name
        self.manager = self._get_manager()
        self.loader = loader or wsgi.Loader(CONF)
        self.app = self.loader.load_app(name)
        self.host = getattr(CONF, '%s_listen' % name, "0.0.0.0")
        self.port = getattr(CONF, '%s_listen_port' % name, 0)
        self.use_ssl = getattr(CONF, '%s_use_ssl' % name, False)
        self.workers = (getattr(CONF, '%s_workers' % name, None) or
                        processutils.get_worker_count())
        if self.workers and self.workers < 1:
            worker_name = '%s_workers' % name
            msg = (_("%(worker_name)s value of %(workers)d is invalid, "
                     "must be greater than 0.") %
                   {'worker_name': worker_name,
                    'workers': self.workers})
            raise exception.InvalidConfigurationValue(msg)
        setup_profiler(name, self.host)

        self.server = wsgi.Server(CONF,
                                  name,
                                  self.app,
                                  host=self.host,
                                  port=self.port,
                                  use_ssl=self.use_ssl)   # the server object is built here

The start method of wsgi.Server (oslo_service.wsgi) is as follows:

def start(self):
        """Start serving a WSGI application.

        :returns: None
        """
        # The server socket object will be closed after server exits,
        # but the underlying file descriptor will remain open, and will
        # give bad file descriptor error. So duplicating the socket object,
        # to keep file descriptor usable.

        self.dup_socket = self.socket.dup()

        if self._use_ssl:
            self.dup_socket = sslutils.wrap(self.conf, self.dup_socket)

        wsgi_kwargs = {
            'func': eventlet.wsgi.server,
            'sock': self.dup_socket,
            'site': self.app,
            'protocol': self._protocol,
            'custom_pool': self._pool,
            'log': self._logger,
            'log_format': self.conf.wsgi_log_format,
            'debug': False,
            'keepalive': self.conf.wsgi_keep_alive,
            'socket_timeout': self.client_socket_timeout
            }

        if self._max_url_len:
            wsgi_kwargs['url_length_limit'] = self._max_url_len

        self._server = eventlet.spawn(**wsgi_kwargs)

At this point, cinder-api has started successfully.
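
The WSGIService.start docstring mentions reading back the port after start in case 0 was configured. The sketch below illustrates that idea with the standard library's wsgiref server in place of eventlet.wsgi, so it is not the code path Cinder uses. The app function, QuietHandler and serve_once are hypothetical names.

```python
import http.client
import threading
from wsgiref.simple_server import WSGIRequestHandler, make_server


def app(environ, start_response):
    """A trivial WSGI application standing in for the loaded paste app."""
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"ok"]


class QuietHandler(WSGIRequestHandler):
    def log_message(self, format, *args):
        pass  # suppress per-request logging to keep the output clean


def serve_once():
    """Bind to a random free port, serve one request, return (port, body)."""
    # Port 0 asks the operating system for any free port.
    server = make_server("127.0.0.1", 0, app, handler_class=QuietHandler)
    port = server.server_port  # the real port is known only after binding
    thread = threading.Thread(target=server.handle_request)
    thread.start()
    conn = http.client.HTTPConnection("127.0.0.1", port, timeout=5)
    conn.request("GET", "/")
    body = conn.getresponse().read()
    conn.close()
    thread.join()
    server.server_close()
    return port, body


port, body = serve_once()
print(port, body)
```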

2. cinder-scheduler

The entry file is /usr/bin/cinder-scheduler, so the function actually called is main in cinder/cmd/scheduler.py.

#!/usr/bin/python2
# PBR Generated from u'console_scripts'

import sys

from cinder.cmd.scheduler import main


if __name__ == "__main__":
    sys.exit(main())

The main function in cinder/cmd/scheduler.py:

def main():
    objects.register_all()
    gmr_opts.set_defaults(CONF)
    CONF(sys.argv[1:], project='cinder',
         version=version.version_string())
    logging.setup(CONF, "cinder")
    python_logging.captureWarnings(True)
    utils.monkey_patch()
    gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)
    server = service.Service.create(binary='cinder-scheduler')
    service.serve(server)
    service.wait()

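Only the last three lines actually start the service. The create classmethod of Service builds a service object named server, and serve then processes it (internally it calls launch). launch looks at the workers argument passed in by serve to decide whether to use a ServiceLauncher (single process) or a ProcessLauncher (multiple worker processes), but either way the launch_service interface is called. From there the flow is the same as described for cinder-api: the Launcher's add call appends the service to the Services list and eventually calls the start() method of the added Service.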
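
Inside the create classmethod listed after this sketch, the topic defaults to the binary name and the manager option name is derived by stripping the cinder- prefix. That string handling can be tried on its own. The helper name derive_names is hypothetical, and the lookup of the option value in CONF is omitted.

```python
def derive_names(binary, topic=None):
    """Return (topic, manager_option_name) the way Service.create derives them."""
    topic = topic or binary
    # rpartition splits on the last 'cinder-'; element [2] is what follows it.
    subtopic = topic.rpartition("cinder-")[2]
    return topic, "%s_manager" % subtopic


print(derive_names("cinder-scheduler"))
print(derive_names("cinder-volume"))
```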

@classmethod
def create(cls, host=None, binary=None, topic=None, manager=None,
           report_interval=None, periodic_interval=None,
           periodic_fuzzy_delay=None, service_name=None,
           coordination=False, cluster=None, **kwargs):
        if not host:
            host = CONF.host
        if not binary:
            binary = os.path.basename(inspect.stack()[-1][1])
        if not topic:
            topic = binary
        if not manager:
            subtopic = topic.rpartition('cinder-')[2]
            manager = CONF.get('%s_manager' % subtopic, None)
        if report_interval is None:
            report_interval = CONF.report_interval
        if periodic_interval is None:
            periodic_interval = CONF.periodic_interval
        if periodic_fuzzy_delay is None:
            periodic_fuzzy_delay = CONF.periodic_fuzzy_delay
        service_obj = cls(host, binary, topic, manager,
                          report_interval=report_interval,
                          periodic_interval=periodic_interval,
                          periodic_fuzzy_delay=periodic_fuzzy_delay,
                          service_name=service_name,
                          coordination=coordination,
                          cluster=cluster, **kwargs)

        return service_obj

The serve function in cinder/service.py:

def serve(server, workers=None):
    global _launcher
    if _launcher:
        raise RuntimeError(_('serve() can only be called once'))

    _launcher = service.launch(CONF, server, workers=workers)

The launch function in oslo_service/service.py:

def launch(conf, service, workers=1, restart_method='reload'):
    """Launch a service with a given number of workers.

    :param conf: an instance of ConfigOpts
    :param service: a service to launch, must be an instance of
           :class:`oslo_service.service.ServiceBase`
    :param workers: a number of processes in which a service will be running
    :param restart_method: Passed to the constructed launcher. If 'reload', the
        launcher will call reload_config_files on SIGHUP. If 'mutate', it will
        call mutate_config_files on SIGHUP. Other values produce a ValueError.
    :returns: instance of a launcher that was used to launch the service
    """

    if workers is not None and workers <= 0:
        raise ValueError(_("Number of workers should be positive!"))

    if workers is None or workers == 1:
        launcher = ServiceLauncher(conf, restart_method=restart_method)
    else:
        launcher = ProcessLauncher(conf, restart_method=restart_method)
    launcher.launch_service(service, workers=workers)

    return launcher
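
The branch on workers in launch is the whole decision between running in-process and forking workers. A minimal sketch of just that decision is shown below. It returns the launcher class name as a string instead of constructing a real launcher, and the function name choose_launcher is hypothetical.

```python
def choose_launcher(workers=None):
    """Mirror the workers check in launch(), returning a class name only."""
    if workers is not None and workers <= 0:
        raise ValueError("Number of workers should be positive!")
    if workers is None or workers == 1:
        # Single process: the service runs inside the current process.
        return "ServiceLauncher"
    # More than one worker: child processes are forked.
    return "ProcessLauncher"


# cinder-scheduler calls serve(server) without workers, so workers is None.
print(choose_launcher())
print(choose_launcher(4))
```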

At this point, cinder-scheduler startup is complete.

3. cinder-volume

The entry file for cinder-volume is /usr/bin/cinder-volume, which shows that the real entry point is the main function in cinder/cmd/volume.py.

#!/usr/bin/python2
# PBR Generated from u'console_scripts'

import sys

from cinder.cmd.volume import main


if __name__ == "__main__":
    sys.exit(main())

The relevant functions in cinder/cmd/volume.py:

def _launch_services_win32():
    if CONF.backend_name and CONF.backend_name not in CONF.enabled_backends:
        msg = _('The explicitly passed backend name "%(backend_name)s" is not '
                'among the enabled backends: %(enabled_backends)s.')
        raise exception.InvalidInput(
            reason=msg % dict(backend_name=CONF.backend_name,
                              enabled_backends=CONF.enabled_backends))

    # We'll avoid spawning a subprocess if a single backend is requested.
    single_backend_name = (CONF.enabled_backends[0]
                           if len(CONF.enabled_backends) == 1
                           else CONF.backend_name)
    if single_backend_name:
        launcher = service.get_launcher()
        _launch_service(launcher, single_backend_name)
    elif CONF.enabled_backends:
        # We're using the 'backend_name' argument, requesting a certain backend
        # and constructing the service object within the child process.
        launcher = service.WindowsProcessLauncher()
        py_script_re = re.compile(r'.*\.py\w?$')
        for backend in filter(None, CONF.enabled_backends):
            cmd = sys.argv + ['--backend_name=%s' % backend]
            # Recent setuptools versions will trim '-script.py' and '.exe'
            # extensions from sys.argv[0].
            if py_script_re.match(sys.argv[0]):
                cmd = [sys.executable] + cmd
            launcher.add_process(cmd)
            _notify_service_started()

    _ensure_service_started()

    launcher.wait()


def _launch_services_posix():
    launcher = service.get_launcher()

    for backend in filter(None, CONF.enabled_backends):
        _launch_service(launcher, backend)

    _ensure_service_started()

    launcher.wait()


def main():
    objects.register_all()
    gmr_opts.set_defaults(CONF)
    CONF(sys.argv[1:], project='cinder',
         version=version.version_string())
    logging.setup(CONF, "cinder")
    python_logging.captureWarnings(True)
    priv_context.init(root_helper=shlex.split(utils.get_root_helper()))
    utils.monkey_patch()
    gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)
    global LOG
    LOG = logging.getLogger(__name__)

    if not CONF.enabled_backends:
        LOG.error('Configuration for cinder-volume does not specify '
                  '"enabled_backends". Using DEFAULT section to configure '
                  'drivers is not supported since Ocata.')
        sys.exit(1)

    if os.name == 'nt':
        # We cannot use oslo.service to spawn multiple services on Windows.
        # It relies on forking, which is not available on Windows.
        # Furthermore, service objects are unmarshallable objects that are
        # passed to subprocesses.
        _launch_services_win32()
    else:
        _launch_services_posix()

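Different functions are called depending on whether the platform is Windows or Linux. Since this deployment is on Linux, the function called is _launch_services_posix(), which calls _launch_service(launcher, backend) to create a Service object for each backend. The rest follows the same startup flow as cinder-scheduler above, so it is not repeated here.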
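
The POSIX path amounts to one launch per non-empty entry in enabled_backends, followed by a single wait. The sketch below shows only that loop shape. RecordingLauncher is a hypothetical stand-in for the launcher returned by service.get_launcher(), and the "cinder-volume:<backend>" label is made up for illustration; the real _launch_service builds a Service object, which is not reproduced here.

```python
class RecordingLauncher:
    """Hypothetical launcher that only records what it was asked to launch."""

    def __init__(self):
        self.launched = []

    def launch_service(self, name):
        self.launched.append(name)

    def wait(self):
        return list(self.launched)


def launch_services_posix(enabled_backends, launcher):
    """Launch one service per configured backend, skipping empty names."""
    # filter(None, ...) drops empty strings, as in _launch_services_posix.
    for backend in filter(None, enabled_backends):
        launcher.launch_service("cinder-volume:%s" % backend)
    return launcher.wait()


print(launch_services_posix(["lvm", "", "ceph"], RecordingLauncher()))
```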
