openstack Rocky系列之Cinder:(一)Cinder服务启动
比较忙,很长世间没空看openstack源码,抽时间看了一下cinder的源码,贴下学习心得。本文简单写一下cinder的三个服务的启动,cinder-api, cinder-scheduler, 以及cinder-volume,三者启动都差不多
1、cinder-api
入口文件为/usr/bin/cinder-api,由此可知,入口为cinder.cmd.api文件中的main函数
#!/usr/bin/python2 # PBR Generated from u‘console_scripts‘ import sys from cinder.cmd.api import main if __name__ == "__main__": sys.exit(main())
main函数如下,主要关注14-16行即可
def main(): objects.register_all() gmr_opts.set_defaults(CONF) CONF(sys.argv[1:], project=‘cinder‘, version=version.version_string()) config.set_middleware_defaults() logging.setup(CONF, "cinder") python_logging.captureWarnings(True) utils.monkey_patch() gmr.TextGuruMeditation.setup_autorun(version, conf=CONF) rpc.init(CONF) launcher = service.process_launcher() server = service.WSGIService(‘osapi_volume‘) launcher.launch_service(server, workers=server.workers) launcher.wait()
14行创建一个 ProcessLauncher 对象,以便后续对app进行launch
15行创建 WSGIService 对象,名称为 osapi_volume
16行,launch对象调用launch_service方法对server进行处理,通过查看源码,是调用了ProcessLauncher对象的_start_child对服务进行处理
def launch_service(self, service, workers=1): """Launch a service with a given number of workers. :param service: a service to launch, must be an instance of :class:`oslo_service.service.ServiceBase` :param workers: a number of processes in which a service will be running """ _check_service_base(service) #对象类型校验 wrap = ServiceWrapper(service, workers) #对象包装 # Hide existing objects from the garbage collector, so that most # existing pages will remain in shared memory rather than being # duplicated between subprocesses in the GC mark-and-sweep. (Requires # Python 3.7 or later.) if hasattr(gc, ‘freeze‘): gc.freeze() LOG.info(‘Starting %d workers‘, wrap.workers) while self.running and len(wrap.children) < wrap.workers: self._start_child(wrap)
_start_child方法很简单,调用了os.fork()创建了一个子进程,如果创建子进程成功,再次调用_child_process方法
def _start_child(self, wrap): if len(wrap.forktimes) > wrap.workers: # Limit ourselves to one process a second (over the period of # number of workers * 1 second). This will allow workers to # start up quickly but ensure we don‘t fork off children that # die instantly too quickly. if time.time() - wrap.forktimes[0] < wrap.workers: LOG.info(‘Forking too fast, sleeping‘) time.sleep(1) wrap.forktimes.pop(0) wrap.forktimes.append(time.time()) pid = os.fork() if pid == 0: self.launcher = self._child_process(wrap.service) while True: self._child_process_handle_signal() status, signo = self._child_wait_for_exit_or_signal( self.launcher) if not _is_sighup_and_daemon(signo): self.launcher.wait() break self.launcher.restart() os._exit(status) LOG.debug(‘Started child %d‘, pid) wrap.children.add(pid) self.children[pid] = wrap return pid
def _child_process(self, service): self._child_process_handle_signal() # Reopen the eventlet hub to make sure we don‘t share an epoll # fd with parent and/or siblings, which would be bad eventlet.hubs.use_hub() # Close write to ensure only parent has it open os.close(self.writepipe) # Create greenthread to watch for parent to close pipe eventlet.spawn_n(self._pipe_watcher) # Reseed random number generator random.seed() launcher = Launcher(self.conf, restart_method=self.restart_method) launcher.launch_service(service) return launcher
_child_process方法中调用了eventlet,获取hub,并且创建一个线程,对进程进行观察,同时创建一个Launcher对象,对服务进行lanch,launch_service
def launch_service(self, service, workers=1): """Load and start the given service. :param service: The service you would like to start, must be an instance of :class:`oslo_service.service.ServiceBase` :param workers: This param makes this method compatible with ProcessLauncher.launch_service. It must be None, 1 or omitted. :returns: None """ if workers is not None and workers != 1: raise ValueError(_("Launcher asked to start multiple workers")) _check_service_base(service) service.backdoor_port = self.backdoor_port self.services.add(service) #关键
Launcher 对象的关键在于这个add方法,它将所有调用其进行launch的服务添加到Service()的service列表中,最终调用了添加的service的start()方法
def add(self, service): """Add a service to a list and create a thread to run it. :param service: service to run """ self.services.append(service) self.tg.add_thread(self.run_service, service, self.done) @staticmethod def run_service(service, done): """Service start wrapper. :param service: service to run :param done: event to wait on until a shutdown is triggered :returns: None """ try: service.start() except Exception: LOG.exception(‘Error starting thread.‘) raise SystemExit(1) else: done.wait()
这个service即最初提到的WSGIService 对象,查看一下其start方法
def start(self): """Start serving this service using loaded configuration. Also, retrieve updated port number in case ‘0‘ was passed in, which indicates a random port should be used. :returns: None """ if self.manager: self.manager.init_host() self.server.start() self.port = self.server.port
此时self.manager为None,关键执行步骤为self.server.start(),这个server为WSGIService 进行init的时候构造的对象
class WSGIService(service.ServiceBase): """Provides ability to launch API from a ‘paste‘ configuration.""" def __init__(self, name, loader=None): """Initialize, but do not start the WSGI server. :param name: The name of the WSGI server given to the loader. :param loader: Loads the WSGI application using the given name. :returns: None """ self.name = name self.manager = self._get_manager() self.loader = loader or wsgi.Loader(CONF) self.app = self.loader.load_app(name) self.host = getattr(CONF, ‘%s_listen‘ % name, "0.0.0.0") self.port = getattr(CONF, ‘%s_listen_port‘ % name, 0) self.use_ssl = getattr(CONF, ‘%s_use_ssl‘ % name, False) self.workers = (getattr(CONF, ‘%s_workers‘ % name, None) or processutils.get_worker_count()) if self.workers and self.workers < 1: worker_name = ‘%s_workers‘ % name msg = (_("%(worker_name)s value of %(workers)d is invalid, " "must be greater than 0.") % {‘worker_name‘: worker_name, ‘workers‘: self.workers}) raise exception.InvalidConfigurationValue(msg) setup_profiler(name, self.host) self.server = wsgi.Server(CONF, name, self.app, host=self.host, port=self.port, use_ssl=self.use_ssl) # 这里
def start(self): """Start serving a WSGI application. :returns: None """ # The server socket object will be closed after server exits, # but the underlying file descriptor will remain open, and will # give bad file descriptor error. So duplicating the socket object, # to keep file descriptor usable. self.dup_socket = self.socket.dup() if self._use_ssl: self.dup_socket = sslutils.wrap(self.conf, self.dup_socket) wsgi_kwargs = { ‘func‘: eventlet.wsgi.server, ‘sock‘: self.dup_socket, ‘site‘: self.app, ‘protocol‘: self._protocol, ‘custom_pool‘: self._pool, ‘log‘: self._logger, ‘log_format‘: self.conf.wsgi_log_format, ‘debug‘: False, ‘keepalive‘: self.conf.wsgi_keep_alive, ‘socket_timeout‘: self.client_socket_timeout } if self._max_url_len: wsgi_kwargs[‘url_length_limit‘] = self._max_url_len self._server = eventlet.spawn(**wsgi_kwargs)
至此,cinder-api启动顺利启动
2、cinder-scheduler
入口文件为/usr/bin/cinder-scheduler,则实际调用文件为cinder/cmd/scheduler.py下的main
#!/usr/bin/python2 # PBR Generated from u‘console_scripts‘ import sys from cinder.cmd.scheduler import main if __name__ == "__main__": sys.exit(main())
def main(): objects.register_all() gmr_opts.set_defaults(CONF) CONF(sys.argv[1:], project=‘cinder‘, version=version.version_string()) logging.setup(CONF, "cinder") python_logging.captureWarnings(True) utils.monkey_patch() gmr.TextGuruMeditation.setup_autorun(version, conf=CONF) server = service.Service.create(binary=‘cinder-scheduler‘) service.serve(server) service.wait()
实际启动服务的只有10,11,12行,通过Service对象的类方法create创建一个名server的service,然后用serve方法(实际调用launch对service进行处理),launch方法通过判断serve传进的worker参数来判断,传入的对象是process还是service,但是不管是service还是process,都是调用了launch_service这个接口,此处,同上述api所述,Launcher 对象的关键在于这个add方法,它将所有调用其进行launch的服务添加到Service()的service列表中,最终调用了添加的Service的start()方法。
@classmethod def create(cls, host=None, binary=None, topic=None, manager=None, report_interval=None, periodic_interval=None, periodic_fuzzy_delay=None, service_name=None, coordination=False, cluster=None, **kwargs): if not host: host = CONF.host if not binary: binary = os.path.basename(inspect.stack()[-1][1]) if not topic: topic = binary if not manager: subtopic = topic.rpartition(‘cinder-‘)[2] manager = CONF.get(‘%s_manager‘ % subtopic, None) if report_interval is None: report_interval = CONF.report_interval if periodic_interval is None: periodic_interval = CONF.periodic_interval if periodic_fuzzy_delay is None: periodic_fuzzy_delay = CONF.periodic_fuzzy_delay service_obj = cls(host, binary, topic, manager, report_interval=report_interval, periodic_interval=periodic_interval, periodic_fuzzy_delay=periodic_fuzzy_delay, service_name=service_name, coordination=coordination, cluster=cluster, **kwargs) return service_obj
def serve(server, workers=None): global _launcher if _launcher: raise RuntimeError(_(‘serve() can only be called once‘)) _launcher = service.launch(CONF, server, workers=workers)
def launch(conf, service, workers=1, restart_method=‘reload‘): """Launch a service with a given number of workers. :param conf: an instance of ConfigOpts :param service: a service to launch, must be an instance of :class:`oslo_service.service.ServiceBase` :param workers: a number of processes in which a service will be running :param restart_method: Passed to the constructed launcher. If ‘reload‘, the launcher will call reload_config_files on SIGHUP. If ‘mutate‘, it will call mutate_config_files on SIGHUP. Other values produce a ValueError. :returns: instance of a launcher that was used to launch the service """ if workers is not None and workers <= 0: raise ValueError(_("Number of workers should be positive!")) if workers is None or workers == 1: launcher = ServiceLauncher(conf, restart_method=restart_method) else: launcher = ProcessLauncher(conf, restart_method=restart_method) launcher.launch_service(service, workers=workers) return launcher
至此,cinder-scheduler启动完成
3、cinder-volume
cinder-volume的入口文件为/usr/bin/cinder-volume,由此可知真正的入口函数为cinder/cmd/volume.py中的main函数
#!/usr/bin/python2 # PBR Generated from u‘console_scripts‘ import sys from cinder.cmd.volume import main if __name__ == "__main__": sys.exit(main())
def _launch_services_win32(): if CONF.backend_name and CONF.backend_name not in CONF.enabled_backends: msg = _(‘The explicitly passed backend name "%(backend_name)s" is not ‘ ‘among the enabled backends: %(enabled_backends)s.‘) raise exception.InvalidInput( reason=msg % dict(backend_name=CONF.backend_name, enabled_backends=CONF.enabled_backends)) # We‘ll avoid spawning a subprocess if a single backend is requested. single_backend_name = (CONF.enabled_backends[0] if len(CONF.enabled_backends) == 1 else CONF.backend_name) if single_backend_name: launcher = service.get_launcher() _launch_service(launcher, single_backend_name) elif CONF.enabled_backends: # We‘re using the ‘backend_name‘ argument, requesting a certain backend # and constructing the service object within the child process. launcher = service.WindowsProcessLauncher() py_script_re = re.compile(r‘.*\.py\w?$‘) for backend in filter(None, CONF.enabled_backends): cmd = sys.argv + [‘--backend_name=%s‘ % backend] # Recent setuptools versions will trim ‘-script.py‘ and ‘.exe‘ # extensions from sys.argv[0]. if py_script_re.match(sys.argv[0]): cmd = [sys.executable] + cmd launcher.add_process(cmd) _notify_service_started() _ensure_service_started() launcher.wait() def _launch_services_posix(): launcher = service.get_launcher() for backend in filter(None, CONF.enabled_backends): _launch_service(launcher, backend) _ensure_service_started() launcher.wait() def main(): objects.register_all() gmr_opts.set_defaults(CONF) CONF(sys.argv[1:], project=‘cinder‘, version=version.version_string()) logging.setup(CONF, "cinder") python_logging.captureWarnings(True) priv_context.init(root_helper=shlex.split(utils.get_root_helper())) utils.monkey_patch() gmr.TextGuruMeditation.setup_autorun(version, conf=CONF) global LOG LOG = logging.getLogger(__name__) if not CONF.enabled_backends: LOG.error(‘Configuration for cinder-volume does not specify ‘ ‘"enabled_backends". Using DEFAULT section to configure ‘ ‘drivers is not supported since Ocata.‘) sys.exit(1) if os.name == ‘nt‘: # We cannot use oslo.service to spawn multiple services on Windows. # It relies on forking, which is not available on Windows. # Furthermore, service objects are unmarshallable objects that are # passed to subprocesses. _launch_services_win32() else: _launch_services_posix()
根据平台是windows还是linux,进行不同的调用,因为是在linux上部署,所有调用的函数为_launch_services_posix(),其中调用了_launch_service(launcher, backend)创建Service对象,同上述cinder-schedule的启动流程,后面不再累述了~