OpenStack Rocky Series, Cinder (2): Creating a Volume with Cinder

The previous article covered how the Cinder services start up; this one walks through how OpenStack creates a volume through Cinder.

Looking at Cinder's api-paste.ini (the deployment uses the v3 API), we can see that the current API router lives in cinder/api/v3/router.py.


From router.py we can see that volume operations are mapped through the Routes mapper to handlers in cinder/api/v3/volumes.py.
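
The wiring in router.py looks roughly like this (a simplified sketch, not verbatim source):

# Simplified sketch of the v3 router wiring: the mapper sends
# /v3/{project_id}/volumes requests to the VolumeController
# defined in cinder/api/v3/volumes.py.
self.resources['volumes'] = volumes.create_resource(ext_mgr)
mapper.resource("volume", "volumes",
                controller=self.resources['volumes'],
                collection={'detail': 'GET', 'summary': 'GET'})

A POST to /v3/{project_id}/volumes therefore lands on VolumeController.create.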


Let's look at the source for creating a volume:

def create(self, req, body):
        ........
        ........
        new_volume = self.volume_api.create(context,
                                            size,
                                            volume.get('display_name'),
                                            volume.get('display_description'),
                                            **kwargs)

        retval = self._view_builder.detail(req, new_volume)

        return retval

Here self.volume_api.create() is called to create the volume. The self.volume_api attribute is inherited by the VolumeController from the v2 API, where __init__ sets it to cinder.volume.api.API(), so the create method being invoked is the create method of the API class in cinder/volume/api.py:

def create(self, context, size, name, description, snapshot=None,
               image_id=None, volume_type=None, metadata=None,
               availability_zone=None, source_volume=None,
               scheduler_hints=None,
               source_replica=None, consistencygroup=None,
               cgsnapshot=None, multiattach=False, source_cg=None,
               group=None, group_snapshot=None, source_group=None,
               backup=None):
         .........   
         .........
        create_what = {
            'context': context,
            'raw_size': size,
            'name': name,
            'description': description,
            'snapshot': snapshot,
            'image_id': image_id,
            'raw_volume_type': volume_type,
            'metadata': metadata or {},
            'raw_availability_zone': availability_zone,
            'source_volume': source_volume,
            'scheduler_hints': scheduler_hints,
            'key_manager': self.key_manager,
            'optional_args': {'is_quota_committed': False},
            'consistencygroup': consistencygroup,
            'cgsnapshot': cgsnapshot,
            'raw_multiattach': multiattach,
            'group': group,
            'group_snapshot': group_snapshot,
            'source_group': source_group,
            'backup': backup,
        }
        try:
            sched_rpcapi = (self.scheduler_rpcapi if (
                            not cgsnapshot and not source_cg and
                            not group_snapshot and not source_group)
                            else None)
            volume_rpcapi = (self.volume_rpcapi if (
                             not cgsnapshot and not source_cg and
                             not group_snapshot and not source_group)
                             else None)
            flow_engine = create_volume.get_flow(self.db,
                                                 self.image_service,
                                                 availability_zones,
                                                 create_what,
                                                 sched_rpcapi,
                                                 volume_rpcapi)
        except Exception:
            msg = _('Failed to create api volume flow.')
            LOG.exception(msg)
            raise exception.CinderException(msg)

This calls get_flow in the volume-create flow module (cinder/volume/flows/api/create_volume.py), passing in the arguments above. get_flow uses a TaskFlow linear flow and adds five tasks in order: ExtractVolumeRequestTask(), QuotaReserveTask(), EntryCreateTask(), QuotaCommitTask(), and VolumeCastTask():

def get_flow(db_api, image_service_api, availability_zones, create_what,
             scheduler_rpcapi=None, volume_rpcapi=None):
    """Constructs and returns the api entrypoint flow.

    This flow will do the following:

    1. Inject keys & values for dependent tasks.
    2. Extracts and validates the input keys & values.
    3. Reserves the quota (reverts quota on any failures).
    4. Creates the database entry.
    5. Commits the quota.
    6. Casts to volume manager or scheduler for further processing.
    """

    flow_name = ACTION.replace(":", "_") + "_api"
    api_flow = linear_flow.Flow(flow_name)

    api_flow.add(ExtractVolumeRequestTask(
        image_service_api,
        availability_zones,
        rebind={'size': 'raw_size',
                'availability_zone': 'raw_availability_zone',
                'volume_type': 'raw_volume_type',
                'multiattach': 'raw_multiattach'}))
    api_flow.add(QuotaReserveTask(),
                 EntryCreateTask(),
                 QuotaCommitTask())

    if scheduler_rpcapi and volume_rpcapi:
        # This will cast it out to either the scheduler or volume manager via
        # the rpc apis provided.
        api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db_api))

    # Now load (but do not run) the flow using the provided initial data.
    return taskflow.engines.load(api_flow, store=create_what)

TaskFlow calls each added task's execute method. TaskFlow is an important OpenStack component used to build workflows whose steps must run in a precise order; it is a large topic in its own right, so I won't go into it here.
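
For intuition, here is a minimal, runnable TaskFlow example (the task names and bodies are illustrative, not Cinder code) showing a linear flow in which a failure in a later task would trigger the earlier task's revert:

from taskflow import engines
from taskflow import task
from taskflow.patterns import linear_flow

class ReserveQuota(task.Task):
    default_provides = 'reservation'

    def execute(self, size):
        # Pretend to reserve quota for `size` GB.
        return 'reservation-for-%s-gb' % size

    def revert(self, size, *args, **kwargs):
        # Runs only if a task later in the flow fails.
        print('rolling back reservation for %s GB' % size)

class CreateEntry(task.Task):
    def execute(self, reservation):
        print('creating DB entry under %s' % reservation)

flow = linear_flow.Flow('demo_create_volume')
flow.add(ReserveQuota(), CreateEntry())
# Load (but do not run) the flow with its initial data, then run it.
engines.load(flow, store={'size': 1}).run()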

ExtractVolumeRequestTask validates the incoming parameters, extracts the individual fields, performs selections such as choosing the availability zone and image, and passes the results on to QuotaReserveTask.

QuotaReserveTask checks and reserves the quota.

EntryCreateTask mainly calls cinder.objects.volume.Volume.create() to create the record in the database.

QuotaCommitTask commits the quota reservation in the database (the reserve/commit pattern is sketched below).

VolumeCastTask hands the work off over RPC; the target is scheduler_rpcapi.
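
Conceptually, the quota handling in QuotaReserveTask and QuotaCommitTask follows Cinder's reserve/commit/rollback pattern, something like this simplified sketch (not the actual task bodies):

from cinder import quota

QUOTAS = quota.QUOTAS

# Reserve first; only commit() turns the reservation into real usage.
reservations = QUOTAS.reserve(context, volumes=1, gigabytes=size)
try:
    # ... EntryCreateTask creates the DB record here ...
    QUOTAS.commit(context, reservations)
except Exception:
    # On failure the reservation is released instead.
    QUOTAS.rollback(context, reservations)
    raise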


scheduler_rpcapi was already supplied when get_flow was called.


cinder-scheduler now receives the request sent by cinder-api; the sending code lives in cinder/scheduler/rpcapi.py:

def create_volume(self, ctxt, volume, snapshot_id=None, image_id=None,
                      request_spec=None, filter_properties=None,
                      backup_id=None):
        volume.create_worker()
        cctxt = self._get_cctxt()
        msg_args = {'snapshot_id': snapshot_id, 'image_id': image_id,
                    'request_spec': request_spec,
                    'filter_properties': filter_properties,
                    'volume': volume, 'backup_id': backup_id}
        if not self.client.can_send_version('3.10'):
            msg_args.pop('backup_id')
        return cctxt.cast(ctxt, 'create_volume', **msg_args)
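
Note the cast() at the end: in oslo.messaging a cast is fire-and-forget, which is why the API can return before the volume actually exists. A minimal sketch of such a client-side cast (the topic and arguments here are illustrative, and a configured transport_url is assumed):

from oslo_config import cfg
import oslo_messaging as messaging

transport = messaging.get_rpc_transport(cfg.CONF)
target = messaging.Target(topic='cinder-scheduler', version='3.0')
client = messaging.RPCClient(transport, target)

# cast() queues the message and returns immediately;
# call() would block waiting for a reply.
client.cast({}, 'create_volume', volume_id='some-volume-id')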

On the receiving side, the message is handled by cinder-scheduler's SchedulerManager in cinder/scheduler/manager.py:

    @objects.Volume.set_workers
    @append_operation_type()
    def create_volume(self, context, volume, snapshot_id=None, image_id=None,
                      request_spec=None, filter_properties=None,
                      backup_id=None):
        self._wait_for_scheduler()

        try:
            flow_engine = create_volume.get_flow(context,
                                                 self.driver,
                                                 request_spec,
                                                 filter_properties,
                                                 volume,
                                                 snapshot_id,
                                                 image_id,
                                                 backup_id)
        except Exception:
            msg = _("Failed to create scheduler manager volume flow")
            LOG.exception(msg)
            raise exception.CinderException(msg)

        with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
            flow_engine.run()

cinder-scheduler likewise uses a TaskFlow flow to drive volume creation. Let's look at the tasks that this get_flow assembles:

def get_flow(context, driver_api, request_spec=None,
             filter_properties=None,
             volume=None, snapshot_id=None, image_id=None, backup_id=None):

    create_what = {
        'context': context,
        'raw_request_spec': request_spec,
        'filter_properties': filter_properties,
        'volume': volume,
        'snapshot_id': snapshot_id,
        'image_id': image_id,
        'backup_id': backup_id,
    }

    flow_name = ACTION.replace(":", "_") + "_scheduler"
    scheduler_flow = linear_flow.Flow(flow_name)

    # This will extract and clean the spec from the starting values.
    scheduler_flow.add(ExtractSchedulerSpecTask(
        rebind={'request_spec': 'raw_request_spec'}))

    # This will activate the desired scheduler driver (and handle any
    # driver related failures appropriately).
    scheduler_flow.add(ScheduleCreateVolumeTask(driver_api))

    # Now load (but do not run) the flow using the provided initial data.
    return taskflow.engines.load(scheduler_flow, store=create_what)

ExtractSchedulerSpecTask again extracts and massages the request parameters for the steps that follow.

ScheduleCreateVolumeTask's execute does two things: 1. it calls driver_api to schedule the volume creation; 2. if that fails, it records the failure through the message API.

def execute(self, context, request_spec, filter_properties, volume):
        try:
            self.driver_api.schedule_create_volume(context, request_spec,
                                                   filter_properties)
        except Exception as e:
            self.message_api.create(
                context,
                message_field.Action.SCHEDULE_ALLOCATE_VOLUME,
                resource_uuid=request_spec['volume_id'],
                exception=e)
            with excutils.save_and_reraise_exception(
                    reraise=not isinstance(e, exception.NoValidBackend)):
                try:
                    self._handle_failure(context, request_spec, e)
                finally:
                    common.error_out(volume, reason=e)

Here driver_api is the scheduler_driver that was set up when SchedulerManager was initialized.


From this we can see that driver_api is cinder.scheduler.filter_scheduler.FilterScheduler:

def schedule_create_volume(self, context, request_spec, filter_properties):
        backend = self._schedule(context, request_spec, filter_properties)

        if not backend:
            raise exception.NoValidBackend(reason=_("No weighed backends "
                                                    "available"))

        backend = backend.obj
        volume_id = request_spec['volume_id']

        updated_volume = driver.volume_update_db(
            context, volume_id,
            backend.host,
            backend.cluster_name,
            availability_zone=backend.service[‘availability_zone‘])
        self._post_select_populate_filter_properties(filter_properties,
                                                     backend)

        # context is not serializable
        filter_properties.pop('context', None)

        self.volume_rpcapi.create_volume(context, updated_volume, request_spec,
                                         filter_properties,
                                         allow_reschedule=True)

self._schedule selects a backend based on the supplied parameters (relevant when multiple backends are configured).
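
The idea behind _schedule is the usual filter-and-weigh pattern; here is a conceptual sketch (illustrative data shapes, not the FilterScheduler code):

def pick_backend(backends, requested_gb):
    # Filter: drop backends that cannot hold the requested size.
    eligible = [b for b in backends if b['free_capacity_gb'] >= requested_gb]
    if not eligible:
        return None  # the real scheduler raises NoValidBackend
    # Weigh: prefer the backend with the most free space, which is
    # roughly what the default CapacityWeigher does.
    return max(eligible, key=lambda b: b['free_capacity_gb'])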

Finally self.volume_rpcapi.create_volume is called to create the volume; self.volume_rpcapi is an instance of volume_rpcapi.VolumeAPI():


def create_volume(self, ctxt, volume, request_spec, filter_properties,
                      allow_reschedule=True):
        cctxt = self._get_cctxt(volume.service_topic_queue)
        cctxt.cast(ctxt, 'create_volume',
                   request_spec=request_spec,
                   filter_properties=filter_properties,
                   allow_reschedule=allow_reschedule,
                   volume=volume)

cinder-scheduler then casts the create_volume message over RPC to cinder-volume, where it is received by the VolumeManager:

    @objects.Volume.set_workers
    def create_volume(self, context, volume, request_spec=None,
                      filter_properties=None, allow_reschedule=True):
        """Creates the volume."""
        utils.log_unsupported_driver_warning(self.driver)

        self._set_resource_host(volume)

        self._update_allocated_capacity(volume)
        # We lose the host value if we reschedule, so keep it here
        original_host = volume.host

        context_elevated = context.elevated()
        if filter_properties is None:
            filter_properties = {}

        if request_spec is None:
            request_spec = objects.RequestSpec()

        try:
            # NOTE(flaper87): Driver initialization is
            # verified by the task itself.
            flow_engine = create_volume.get_flow(
                context_elevated,
                self,
                self.db,
                self.driver,
                self.scheduler_rpcapi,
                self.host,
                volume,
                allow_reschedule,
                context,
                request_spec,
                filter_properties,
                image_volume_cache=self.image_volume_cache,
            )
        except Exception:
            msg = _("Create manager volume flow failed.")
            LOG.exception(msg, resource={'type': 'volume', 'id': volume.id})
            raise exception.CinderException(msg)

        snapshot_id = request_spec.get('snapshot_id')
        source_volid = request_spec.get('source_volid')

        if snapshot_id is not None:
            # Make sure the snapshot is not deleted until we are done with it.
            locked_action = "%s-%s" % (snapshot_id, 'delete_snapshot')
        elif source_volid is not None:
            # Make sure the volume is not deleted until we are done with it.
            locked_action = "%s-%s" % (source_volid, 'delete_volume')
        else:
            locked_action = None

        def _run_flow():
            # This code executes create volume flow. If something goes wrong,
            # flow reverts all job that was done and reraises an exception.
            # Otherwise, all data that was generated by flow becomes available
            # in flow engine's storage.
            with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
                flow_engine.run()

        # NOTE(dulek): Flag to indicate if volume was rescheduled. Used to
        # decide if allocated_capacity should be incremented.
        rescheduled = False

        try:
            if locked_action is None:
                _run_flow()
            else:
                with coordination.COORDINATOR.get_lock(locked_action):
                    _run_flow()
        finally:
            try:
                flow_engine.storage.fetch('refreshed')
            except tfe.NotFound:
                # If there's no vol_ref, then flow is reverted. Lets check out
                # if rescheduling occurred.
                try:
                    rescheduled = flow_engine.storage.get_revert_result(
                        create_volume.OnFailureRescheduleTask.make_name(
                            [create_volume.ACTION]))
                except tfe.NotFound:
                    pass

            if rescheduled:
                # NOTE(geguileo): Volume was rescheduled so we need to update
                # volume stats because the volume wasn‘t created here.
                # Volume.host is None now, so we pass the original host value.
                self._update_allocated_capacity(volume, decrement=True,
                                                host=original_host)

        # Shared targets is only relevant for iSCSI connections.
        # We default to True to be on the safe side.
        volume.shared_targets = (
            self.driver.capabilities.get('storage_protocol') == 'iSCSI' and
            self.driver.capabilities.get('shared_targets', True))
        # TODO(geguileo): service_uuid won't be enough on Active/Active
        # deployments. There can be 2 services handling volumes from the same
        # backend.
        volume.service_uuid = self.service_uuid
        volume.save()

        LOG.info("Created volume successfully.", resource=volume)
        return volume.id

The code above again uses TaskFlow; this time the flow (built in cinder/volume/flows/manager/create_volume.py) contains the following tasks:

ExtractVolumeRefTask fetches the volume's details from the database.

OnFailureRescheduleTask does nothing in execute, but its revert is meaningful: if a later step fails, it performs part of the rollback and triggers rescheduling.

ExtractVolumeSpecTask extracts the spec information.

NotifyVolumeActionTask broadcasts the notification that volume creation has started.
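
The "broadcast" is a notification on the message bus that consumers such as Ceilometer can pick up; Cinder emits it through a helper along these lines (simplified sketch, argument list abbreviated):

from cinder.volume import utils as volume_utils

# Emits a 'volume.create.start' notification to the notification bus.
volume_utils.notify_about_volume_usage(context, volume, 'create.start')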

def get_flow(context, manager, db, driver, scheduler_rpcapi, host, volume,
             allow_reschedule, reschedule_context, request_spec,
             filter_properties, image_volume_cache=None):

    flow_name = ACTION.replace(":", "_") + "_manager"
    volume_flow = linear_flow.Flow(flow_name)

    # This injects the initial starting flow values into the workflow so that
    # the dependency order of the tasks provides/requires can be correctly
    # determined.
    create_what = {
        'context': context,
        'filter_properties': filter_properties,
        'request_spec': request_spec,
        'volume': volume,
    }

    volume_flow.add(ExtractVolumeRefTask(db, host, set_error=False))

    retry = filter_properties.get('retry', None)

    # Always add OnFailureRescheduleTask and we handle the change of volume's
    # status when reverting the flow. Meanwhile, no need to revert process of
    # ExtractVolumeRefTask.
    do_reschedule = allow_reschedule and request_spec and retry
    volume_flow.add(OnFailureRescheduleTask(reschedule_context, db, driver,
                                            scheduler_rpcapi, do_reschedule))

    LOG.debug("Volume reschedule parameters: %(allow)s "
              "retry: %(retry)s", {'allow': allow_reschedule, 'retry': retry})

    volume_flow.add(ExtractVolumeSpecTask(db),
                    NotifyVolumeActionTask(db, "create.start"),
                    CreateVolumeFromSpecTask(manager,
                                             db,
                                             driver,
                                             image_volume_cache),
                    CreateVolumeOnFinishTask(db, "create.end"))

    # Now load (but do not run) the flow using the provided initial data.
    return taskflow.engines.load(volume_flow, store=create_what)

In CreateVolumeFromSpecTask a different method is called depending on the create_type passed in; let's take a raw volume as the example (create_type is 'raw'); the dispatch code is shown below.

CreateVolumeOnFinishTask broadcasts that volume creation has finished.

        if create_type == 'raw':
            model_update = self._create_raw_volume(volume, **volume_spec)
        elif create_type == 'snap':
            model_update = self._create_from_snapshot(context, volume,
                                                      **volume_spec)
        elif create_type == 'source_vol':
            model_update = self._create_from_source_volume(
                context, volume, **volume_spec)
        elif create_type == 'image':
            model_update = self._create_from_image(context,
                                                   volume,
                                                   **volume_spec)
        elif create_type == 'backup':
            model_update, need_update_volume = self._create_from_backup(
                context, volume, **volume_spec)
            volume_spec.update({'need_update_volume': need_update_volume})
        else:
            raise exception.VolumeTypeNotFound(volume_type_id=create_type)

    def _create_raw_volume(self, volume, **kwargs):
        try:
            ret = self.driver.create_volume(volume)
        finally:
            self._cleanup_cg_in_volume(volume)
        return ret

Here self.driver is set up when VolumeManager is initialized; as the code shows, the driver class is read from the configuration file:

self.configuration = config.Configuration(volume_backend_opts,
                                                  config_group=service_name)
        self._set_tpool_size(
            self.configuration.backend_native_threads_pool_size)
        self.stats = {}
        self.service_uuid = None

        if not volume_driver:
            # Get from configuration, which will get the default
            # if its not using the multi backend
            volume_driver = self.configuration.volume_driver
        if volume_driver in MAPPING:
            LOG.warning("Driver path %s is deprecated, update your "
                        "configuration to the new path.", volume_driver)
            volume_driver = MAPPING[volume_driver]

This is set in the configuration file: for the LVM backend used here, cinder.conf sets volume_driver = cinder.volume.drivers.lvm.LVMVolumeDriver.

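The dotted path is turned into a live driver object with oslo.utils; a simplified sketch of the mechanism (the real VolumeManager also passes db, host, and other arguments):

from oslo_utils import importutils

# 'configuration' stands in for the backend's Configuration object.
driver = importutils.import_object(
    'cinder.volume.drivers.lvm.LVMVolumeDriver',
    configuration=configuration)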

   

def create_volume(self, volume):
        """Creates a logical volume."""
        mirror_count = 0
        if self.configuration.lvm_mirrors:
            mirror_count = self.configuration.lvm_mirrors

        self._create_volume(volume['name'],
                            self._sizestr(volume['size']),
                            self.configuration.lvm_type,
                            mirror_count)

def _create_volume(self, name, size, lvm_type, mirror_count, vg=None):
        vg_ref = self.vg
        if vg is not None:
            vg_ref = vg

        vg_ref.create_volume(name, size, lvm_type, mirror_count)

So what is self.vg? Searching the codebase shows where it comes from; for the exact moment it is initialized, see the cinder-volume startup section in the previous article on Cinder service startup.

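self.vg is an instance of the brick LVM helper in cinder/brick/local_dev/lvm.py; the LVM driver creates it roughly like this (simplified sketch, argument list abbreviated):

from cinder.brick.local_dev import lvm as brick_lvm

# Wraps one LVM volume group (default name: cinder-volumes) and shells
# out to the lvm2 tools for create/delete/extend operations.
self.vg = brick_lvm.LVM(self.configuration.volume_group,
                        root_helper,
                        lvm_type=self.configuration.lvm_type,
                        executor=self._execute)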

With that, we can see the low-level code that actually creates the volume: it invokes lvcreate.

def create_volume(self, name, size_str, lv_type='default', mirror_count=0):
        """Creates a logical volume on the object's VG.

        :param name: Name to use when creating Logical Volume
        :param size_str: Size to use when creating Logical Volume
        :param lv_type: Type of Volume (default or thin)
        :param mirror_count: Use LVM mirroring with specified count

        """

        if lv_type == 'thin':
            pool_path = '%s/%s' % (self.vg_name, self.vg_thin_pool)
            cmd = LVM.LVM_CMD_PREFIX + ['lvcreate', '-T', '-V', size_str, '-n',
                                        name, pool_path]
        else:
            cmd = LVM.LVM_CMD_PREFIX + ['lvcreate', '-n', name, self.vg_name,
                                        '-L', size_str]

        if mirror_count > 0:
            cmd.extend(['--type=mirror', '-m', mirror_count, '--nosync',
                        '--mirrorlog', 'mirrored'])
            terras = int(size_str[:-1]) / 1024.0
            if terras >= 1.5:
                rsize = int(2 ** math.ceil(math.log(terras) / math.log(2)))
                # NOTE(vish): Next power of two for region size. See:
                #             http://red.ht/U2BPOD
                cmd.extend(['-R', str(rsize)])

        try:
            self._execute(*cmd,
                          root_helper=self._root_helper,
                          run_as_root=True)
        except putils.ProcessExecutionError as err:
            LOG.exception('Error creating Volume')
            LOG.error('Cmd     :%s', err.cmd)
            LOG.error('StdOut  :%s', err.stdout)
            LOG.error('StdErr  :%s', err.stderr)
            LOG.error('Current state: %s',
                      self.get_all_volume_groups(self._root_helper))
            raise
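
So for a plain thick-provisioned 1 GB volume on the default cinder-volumes VG, the executed command boils down to something like env LC_ALL=C lvcreate -n volume-<uuid> cinder-volumes -L 1g (the env LC_ALL=C prefix comes from LVM.LVM_CMD_PREFIX).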

What follows from here is a series of callbacks and notifications.
