Ryu 源码解读

Ryu 源码解读

注册命令行解析

CONF.register_cli_opts([
    cfg.ListOpt('app-lists', default=[],
                help='application module name to run'),
    cfg.MultiStrOpt('app', positional=True, default=[],
                    help='application module name to run'),
    cfg.StrOpt('pid-file', default=None, help='pid file name'),
    cfg.BoolOpt('enable-debugger', default=False,
                help='don\'t overwrite Python standard threading library'
                '(use only for debugging)')
])

CONF=ConfigOpts(),ConfigOpts类的实例对象

可以在命令行或者配置文件中设置的配置选项
配置选项管理器:API用于注册选项模式,分组选项,解析选项值和解锁选项值
对'config_file'和'config_dir'的支持

解析配置文件

CONF(args=args, prog=prog,
 project='ryu', version='ryu-manager %s' % version,
 default\_config\_files=['/usr/local/etc/ryu/ryu.conf’])

args:命令行参数(默认为sys.argv [1:])
param prog:程序的名称(默认为sys.argv [0] basename,不带扩展名.py) #Ryu中此处为None
project:顶层项目名称,用于定位配置文件
version:程序版本
default_config_files:默认使用的配置文件

没有指定应用程序,默认使用ofp_handler

app_lists = ['ryu.controller.ofp_handler']

创建AppManager实例对象

app_mgr = AppManager.get_instance() #初始化:self.applications_cls={},self.applications={},self.context_cls={},self.context={}

通过app_mgr.load_apps加载应用

app_mgr.load_apps(app_lists)

load_apps | app_manager.py

cls = self.load_app(app_cls_name) #下一个标题有这个函数
self.applications_cls[app_cls_name] = cls #存放应用程序的实例对象
services = []
for key, context_cls in cls.context_iteritems():  #cls.context_iteritems()返回应用程序的_CONTEXTS键值对
    v = self.contexts_cls.setdefault(key, context_cls)  #存放依赖服务的实例对象
    assert v == context_cls
    context_modules.append(context_cls.__module__)  #存放服务的模块名称

    if issubclass(context_cls, RyuApp):
          services.extend(get_dependent_services(context_cls)) ##将依赖的服务模块的依赖也添加到services列表中,其中还包括要监听的事件的模块
# we can't load an app that will be initiataed for
# contexts.
for i in get_dependent_services(cls):
    if i not in context_modules:
        services.append(i)
if services:
    app_lists.extend([s for s in set(services)
                      if s not in app_lists])         #服务添加到app_lists中

self.applications_cls存放的是还未初始化的类
cls.context_iteritems() :来自RyuApp的类方法,返回application的_CONTEXTS的迭代器

#ryu.app.ws_topology.py例子
_CONTEXTS = {
      'wsgi': WSGIApplication,
      'switches': switches.Switches,
  }

context_modules.append(context_cls.__module__) ,将已经存放到contexts_cls的模块加入到context_modules,防止重复添加
这个函数主要是为了获得app_lists中的_CONTEXT的类以及_CONTEXT类所依赖的类,需要监听的事件的类
将服务也添加到app_lists中,用于后续的create_contexts

get_dependent_services

def get_dependent_services(cls):
    services = []
    for _k, m in inspect.getmembers(cls, _is_method): #返回实例对象的方法
        if _has_caller(m):  #判断方法是否有caller属性,即装饰器的函数
            for ev_cls, c in m.callers.items(): #{事件:_Caller对象},_Caller属性:1.生成事件的阶段 2.生成事件的模块
                service = getattr(sys.modules[ev_cls.__module__],
                                  '_SERVICE_NAME', None)  #事件所在的模块实例
                if service:
                    # 避免注册自己事件的cls(比如ofp_handler)
                    if cls.__module__ != service:
                        services.append(service)   #事件的模块放到services列表中

    m = sys.modules[cls.__module__]
    services.extend(getattr(m, '_REQUIRED_APP', []))
    services = list(set(services))
    return services

load_app | app_manager.py

mod = utils.import_module(name)   #动态导入模块
#返回加载应用的类:1)是否是类、2)是否是RyuApp子类、3)类名是否等于模块名
clses = inspect.getmembers(mod,
                           lambda cls: (inspect.isclass(cls) and
                                        issubclass(cls, RyuApp) and
                                        mod.__name__ ==
                                        cls.__module__))
if clses:
    return clses[0][1]  #返回类的对象,clses[0][0]是类名
return None

app_mgr.create_contexts

contexts = app_mgr.create_contexts()

create_contexts() | app_manager.py

for key, cls in self.contexts_cls.items():
    if issubclass(cls, RyuApp):
        # hack for dpset
        context = self._instantiate(None, cls)
    else:
        context = cls() #参考ws_topology.py中的‘wsgi’的WSGIApplication并不是其RyuAPP
    LOG.info('creating context %s', key)
    assert key not in self.contexts   
    self.contexts[key] = context   #创建好的应用程序存放到contexts中
return self.contexts

由上面可知contexts_cls{}存放的是未初始化的类,初始化结束的存放到contexts{}中

_instantiate(None,cls) | app_manager.py

def _instantiate(self, app_name, cls, *args, **kwargs):
    # for now, only single instance of a given module
    # Do we need to support multiple instances?
    # Yes, maybe for slicing.
    LOG.info('instantiating app %s of %s', app_name, cls.__name__)    #如果跑带有_CONTEXT的应用程序,就能看到None

    if hasattr(cls, 'OFP_VERSIONS') and cls.OFP_VERSIONS is not None:
        ofproto_protocol.set_app_supported_versions(cls.OFP_VERSIONS)  #查看应用程序的版本是否支持,不支持的话抛出异常

    if app_name is not None:
        assert app_name not in self.applications
    app = cls(*args, **kwargs)    #创建类实例对象
    register_app(app)     #注册应用程序实例 ,存到SERVICE_BRICKS中,SERVICE_BRICKS[app.name] = app。
    assert app.name not in self.applications
    self.applications[app.name] = app      #将应用程序保存到applications的字典中
    return app

初次调用的时候传入的app_name = None,通过--verbose启动应用程序时候可以看到:

$ryu-manager ryu.app.ws_topology --verbose 
loading app ryu.app.ws_topology
loading app ryu.controller.ofp_handler
instantiating app None of Switches          #初次调用的时候传入的app_name = None
creating context switches
creating context wsgi

register_app(app)主要是注册应用实例,存放到SERVICE_BRICKS{“应用名”:应用},同时注册应用程序里装饰器的handler。
self.applications存放的是已经初始化好的应用,注意此处存的是context中继承RyuApp的应用。

register_app(app) | handler.py

SERVICE_BRICKS[app.name] = app
之后调用register_instance

def register_instance(i):
    for _k, m in inspect.getmembers(i, inspect.ismethod):
        # LOG.debug('instance %s k %s m %s', i, _k, m)
        if _has_caller(m):                  #查看有无装饰器
            for ev_cls, c in m.callers.items():
                i.register_handler(ev_cls, m)   #将事件的处理函数注册,在app_manager.py中有对应的函数

事件处理器注册到event_handlers {“事件”:[方法1,方法2,...]}

services.extend(app_mgr.instantiate_apps(**contexts))

def instantiate_apps(self, *args, **kwargs):
        for app_name, cls in self.applications_cls.items():
            self._instantiate(app_name, cls, *args, **kwargs)    #刚才调用这个函数是初始化_CONTEXT里的,现在是对应用程序的初始化

        self._update_bricks() #注册事件监听器
        self.report_bricks()  #启动应用时添加”--verbose“可以查看到事件监听的信息,如:PROVIDES EventOFPPortStatus TO {'switches': set(['main']), 'monitor': set(['main'])}


        threads = []
        for app in self.applications.values():
            t = app.start() #调用应用程序的启动方法
            if t is not None:
                app.set_main_thread(t)  #设置当前应用程序的线程,如果不设置,就没办法在stop()函数里删除这个线程
                threads.append(t) #将这个线程添加到线程列表里
        return threads

self._instantiate(app_name, cls, args, *kwargs)
此处初始化的是启动的应用程序和非RyuApp的应用,刚才已经将应用程序的依赖已经初始化完成了。
app.start()调用应用程序的启动函数,如默认启动的应用程序OFPHandler的start()会将OpenFlowController启动

_update_bricks | app_manager.py

def _update_bricks(self):
    for i in SERVICE_BRICKS.values():
        for _k, m in inspect.getmembers(i, inspect.ismethod):
            if not hasattr(m, 'callers'):     #对不是装饰器的方法不处理
                continue
            for ev_cls, c in m.callers.items():
                if not c.ev_source:   #ev_source为定义的模块,可以到handle.py中查看
                    continue

                brick = _lookup_service_brick_by_mod_name(c.ev_source)   #取出定义事件的模块取出对应的SERVICE实例
                if brick:
                    brick.register_observer(ev_cls, i.name,
                                            c.dispatchers)      #c.dispatchers代表事件产生的阶段,如MAIN_DISPATCHER

                # allow RyuApp and Event class are in different module
                for brick in SERVICE_BRICKS.values():
                    if ev_cls in brick._EVENTS:
                        brick.register_observer(ev_cls, i.name,
                                                c.dispatchers)

注册监听器
基类里说明了_EVENTS:

"""
    A list of event classes which this RyuApp subclass would generate.
    This should be specified if and only if event classes are defined in
    a different python module from the RyuApp subclass is.
"""

_EVENTS是子类生成的事件列表,但是当且仅当事件类是在RyuApp子类的不同python模块中定义时才应该指定。

不知道理解有没有问题,查看Ryu的文档,自定义类看看

register_observer | app_manager.py

注册事件监听器

def register_observer(self, ev_cls, name, states=None):
        states = states or set()
        ev_cls_observers = self.observers.setdefault(ev_cls, {})
        ev_cls_observers.setdefault(name, set()).update(states) #将对应事件的状态更新到键值对中

report_bricks() | app_manager.py

@staticmethod
  def report_bricks():
      for brick, i in SERVICE_BRICKS.items():
          AppManager._report_brick(brick, i)

报告监听的事件信息,可以启动ws_topology.py,通过--verbose查看

BRICK switches
  PROVIDES EventLinkAdd TO {'WebSocketTopology': set([])}
  PROVIDES EventSwitchEnter TO {'WebSocketTopology': set([])}
  PROVIDES EventLinkDelete TO {'WebSocketTopology': set([])}
  PROVIDES EventSwitchLeave TO {'WebSocketTopology': set([])}
  PROVIDES EventHostAdd TO {'WebSocketTopology': set([])}
  CONSUMES EventSwitchRequest
  CONSUMES EventOFPStateChange
  CONSUMES EventHostRequest
  CONSUMES EventOFPPacketIn
  CONSUMES EventLinkRequest
  CONSUMES EventOFPPortStatus
BRICK WebSocketTopology
  CONSUMES EventLinkAdd
  CONSUMES EventSwitchEnter
  CONSUMES EventLinkDelete
  CONSUMES EventSwitchLeave
  CONSUMES EventHostAdd
BRICK ofp_event
  PROVIDES EventOFPPacketIn TO {'switches': set(['main'])}
  PROVIDES EventOFPPortStatus TO {'switches': set(['main'])}
  PROVIDES EventOFPStateChange TO {'switches': set(['main', 'dead'])}
  CONSUMES EventOFPSwitchFeatures
  CONSUMES EventOFPPortDescStatsReply
  CONSUMES EventOFPHello
  CONSUMES EventOFPErrorMsg
  CONSUMES EventOFPEchoRequest
  CONSUMES EventOFPPortStatus
  CONSUMES EventOFPEchoReply

app.start | app_manager.py

启动初始化后调用
self.threads.append(hub.spawn(self._event_loop))
当有新的事件来了,获取事件对应的handlers,然后再依次把事件放到handler中处理
不同的应用程序还有进行其他处理,如OFPHandler的start()会将OpenFlowController启动

webapp=wsgi.start_service(app_mgr)| manager.py

def start_service(app_mgr):
    for instance in app_mgr.contexts.values():
        if instance.__class__ == WSGIApplication:  #如果实例的类是WSGIApplication
            return WSGIServer(instance)

    return None

仅有依赖的服务模块中含有WSGIApplication的时候才启动WSGI服务的实例,提供web应用。

hub.joinall(services) | manager.py

将所有的应用作为任务,作为coroutine的task去执行,join使得程序必须等待所有的task都执行完成才可以退出程序

app_mgr.close() | manager.py

关闭程序,释放资源

相关推荐