Ryu 源码解读
Ryu 源码解读
注册命令行解析
CONF.register_cli_opts([ cfg.ListOpt('app-lists', default=[], help='application module name to run'), cfg.MultiStrOpt('app', positional=True, default=[], help='application module name to run'), cfg.StrOpt('pid-file', default=None, help='pid file name'), cfg.BoolOpt('enable-debugger', default=False, help='don\'t overwrite Python standard threading library' '(use only for debugging)') ])
CONF=ConfigOpts(),ConfigOpts类的实例对象
可以在命令行或者配置文件中设置的配置选项配置选项管理器:API用于注册选项模式,分组选项,解析选项值和解锁选项值
对'config_file'和'config_dir'的支持
解析配置文件
CONF(args=args, prog=prog, project='ryu', version='ryu-manager %s' % version, default\_config\_files=['/usr/local/etc/ryu/ryu.conf’])
args:命令行参数(默认为sys.argv [1:])
param prog:程序的名称(默认为sys.argv [0] basename,不带扩展名.py) #Ryu中此处为None
project:顶层项目名称,用于定位配置文件
version:程序版本
default_config_files:默认使用的配置文件
没有指定应用程序,默认使用ofp_handler
app_lists = ['ryu.controller.ofp_handler']
创建AppManager实例对象
app_mgr = AppManager.get_instance()
#初始化:self.applications_cls={},self.applications={},self.context_cls={},self.context={}
通过app_mgr.load_apps加载应用
app_mgr.load_apps(app_lists)
load_apps | app_manager.py
cls = self.load_app(app_cls_name) #下一个标题有这个函数 self.applications_cls[app_cls_name] = cls #存放应用程序的实例对象 services = [] for key, context_cls in cls.context_iteritems(): #cls.context_iteritems()返回应用程序的_CONTEXTS键值对 v = self.contexts_cls.setdefault(key, context_cls) #存放依赖服务的实例对象 assert v == context_cls context_modules.append(context_cls.__module__) #存放服务的模块名称 if issubclass(context_cls, RyuApp): services.extend(get_dependent_services(context_cls)) ##将依赖的服务模块的依赖也添加到services列表中,其中还包括要监听的事件的模块 # we can't load an app that will be initiataed for # contexts. for i in get_dependent_services(cls): if i not in context_modules: services.append(i) if services: app_lists.extend([s for s in set(services) if s not in app_lists]) #服务添加到app_lists中
self.applications_cls存放的是还未初始化的类
cls.context_iteritems() :来自RyuApp的类方法,返回application的_CONTEXTS的迭代器
#ryu.app.ws_topology.py例子 _CONTEXTS = { 'wsgi': WSGIApplication, 'switches': switches.Switches, }
context_modules.append(context_cls.__module__) ,将已经存放到contexts_cls的模块加入到context_modules,防止重复添加
这个函数主要是为了获得app_lists中的_CONTEXT的类以及_CONTEXT类所依赖的类,需要监听的事件的类
将服务也添加到app_lists中,用于后续的create_contexts
get_dependent_services
def get_dependent_services(cls): services = [] for _k, m in inspect.getmembers(cls, _is_method): #返回实例对象的方法 if _has_caller(m): #判断方法是否有caller属性,即装饰器的函数 for ev_cls, c in m.callers.items(): #{事件:_Caller对象},_Caller属性:1.生成事件的阶段 2.生成事件的模块 service = getattr(sys.modules[ev_cls.__module__], '_SERVICE_NAME', None) #事件所在的模块实例 if service: # 避免注册自己事件的cls(比如ofp_handler) if cls.__module__ != service: services.append(service) #事件的模块放到services列表中 m = sys.modules[cls.__module__] services.extend(getattr(m, '_REQUIRED_APP', [])) services = list(set(services)) return services
load_app | app_manager.py
mod = utils.import_module(name) #动态导入模块 #返回加载应用的类:1)是否是类、2)是否是RyuApp子类、3)类名是否等于模块名 clses = inspect.getmembers(mod, lambda cls: (inspect.isclass(cls) and issubclass(cls, RyuApp) and mod.__name__ == cls.__module__)) if clses: return clses[0][1] #返回类的对象,clses[0][0]是类名 return None
app_mgr.create_contexts
contexts = app_mgr.create_contexts()
create_contexts() | app_manager.py
for key, cls in self.contexts_cls.items(): if issubclass(cls, RyuApp): # hack for dpset context = self._instantiate(None, cls) else: context = cls() #参考ws_topology.py中的‘wsgi’的WSGIApplication并不是其RyuAPP LOG.info('creating context %s', key) assert key not in self.contexts self.contexts[key] = context #创建好的应用程序存放到contexts中 return self.contexts
由上面可知contexts_cls{}存放的是未初始化的类,初始化结束的存放到contexts{}中
_instantiate(None,cls) | app_manager.py
def _instantiate(self, app_name, cls, *args, **kwargs): # for now, only single instance of a given module # Do we need to support multiple instances? # Yes, maybe for slicing. LOG.info('instantiating app %s of %s', app_name, cls.__name__) #如果跑带有_CONTEXT的应用程序,就能看到None if hasattr(cls, 'OFP_VERSIONS') and cls.OFP_VERSIONS is not None: ofproto_protocol.set_app_supported_versions(cls.OFP_VERSIONS) #查看应用程序的版本是否支持,不支持的话抛出异常 if app_name is not None: assert app_name not in self.applications app = cls(*args, **kwargs) #创建类实例对象 register_app(app) #注册应用程序实例 ,存到SERVICE_BRICKS中,SERVICE_BRICKS[app.name] = app。 assert app.name not in self.applications self.applications[app.name] = app #将应用程序保存到applications的字典中 return app
初次调用的时候传入的app_name = None,通过--verbose启动应用程序时候可以看到:
$ryu-manager ryu.app.ws_topology --verbose loading app ryu.app.ws_topology loading app ryu.controller.ofp_handler instantiating app None of Switches #初次调用的时候传入的app_name = None creating context switches creating context wsgi
register_app(app)主要是注册应用实例,存放到SERVICE_BRICKS{“应用名”:应用},同时注册应用程序里装饰器的handler。
self.applications存放的是已经初始化好的应用,注意此处存的是context中继承RyuApp的应用。
register_app(app) | handler.py
SERVICE_BRICKS[app.name] = app
之后调用register_instance
def register_instance(i): for _k, m in inspect.getmembers(i, inspect.ismethod): # LOG.debug('instance %s k %s m %s', i, _k, m) if _has_caller(m): #查看有无装饰器 for ev_cls, c in m.callers.items(): i.register_handler(ev_cls, m) #将事件的处理函数注册,在app_manager.py中有对应的函数
事件处理器注册到event_handlers {“事件”:[方法1,方法2,...]}
services.extend(app_mgr.instantiate_apps(**contexts))
def instantiate_apps(self, *args, **kwargs): for app_name, cls in self.applications_cls.items(): self._instantiate(app_name, cls, *args, **kwargs) #刚才调用这个函数是初始化_CONTEXT里的,现在是对应用程序的初始化 self._update_bricks() #注册事件监听器 self.report_bricks() #启动应用时添加”--verbose“可以查看到事件监听的信息,如:PROVIDES EventOFPPortStatus TO {'switches': set(['main']), 'monitor': set(['main'])} threads = [] for app in self.applications.values(): t = app.start() #调用应用程序的启动方法 if t is not None: app.set_main_thread(t) #设置当前应用程序的线程,如果不设置,就没办法在stop()函数里删除这个线程 threads.append(t) #将这个线程添加到线程列表里 return threads
self._instantiate(app_name, cls, args, *kwargs)
此处初始化的是启动的应用程序和非RyuApp的应用,刚才已经将应用程序的依赖已经初始化完成了。
app.start()调用应用程序的启动函数,如默认启动的应用程序OFPHandler的start()会将OpenFlowController启动
_update_bricks | app_manager.py
def _update_bricks(self): for i in SERVICE_BRICKS.values(): for _k, m in inspect.getmembers(i, inspect.ismethod): if not hasattr(m, 'callers'): #对不是装饰器的方法不处理 continue for ev_cls, c in m.callers.items(): if not c.ev_source: #ev_source为定义的模块,可以到handle.py中查看 continue brick = _lookup_service_brick_by_mod_name(c.ev_source) #取出定义事件的模块取出对应的SERVICE实例 if brick: brick.register_observer(ev_cls, i.name, c.dispatchers) #c.dispatchers代表事件产生的阶段,如MAIN_DISPATCHER # allow RyuApp and Event class are in different module for brick in SERVICE_BRICKS.values(): if ev_cls in brick._EVENTS: brick.register_observer(ev_cls, i.name, c.dispatchers)
注册监听器
基类里说明了_EVENTS:
""" A list of event classes which this RyuApp subclass would generate. This should be specified if and only if event classes are defined in a different python module from the RyuApp subclass is. """
_EVENTS是子类生成的事件列表,但是当且仅当事件类是在RyuApp子类的不同python模块中定义时才应该指定。
不知道理解有没有问题,查看Ryu的文档,自定义类看看register_observer | app_manager.py
注册事件监听器
def register_observer(self, ev_cls, name, states=None): states = states or set() ev_cls_observers = self.observers.setdefault(ev_cls, {}) ev_cls_observers.setdefault(name, set()).update(states) #将对应事件的状态更新到键值对中
report_bricks() | app_manager.py
@staticmethod def report_bricks(): for brick, i in SERVICE_BRICKS.items(): AppManager._report_brick(brick, i)
报告监听的事件信息,可以启动ws_topology.py,通过--verbose查看
BRICK switches PROVIDES EventLinkAdd TO {'WebSocketTopology': set([])} PROVIDES EventSwitchEnter TO {'WebSocketTopology': set([])} PROVIDES EventLinkDelete TO {'WebSocketTopology': set([])} PROVIDES EventSwitchLeave TO {'WebSocketTopology': set([])} PROVIDES EventHostAdd TO {'WebSocketTopology': set([])} CONSUMES EventSwitchRequest CONSUMES EventOFPStateChange CONSUMES EventHostRequest CONSUMES EventOFPPacketIn CONSUMES EventLinkRequest CONSUMES EventOFPPortStatus BRICK WebSocketTopology CONSUMES EventLinkAdd CONSUMES EventSwitchEnter CONSUMES EventLinkDelete CONSUMES EventSwitchLeave CONSUMES EventHostAdd BRICK ofp_event PROVIDES EventOFPPacketIn TO {'switches': set(['main'])} PROVIDES EventOFPPortStatus TO {'switches': set(['main'])} PROVIDES EventOFPStateChange TO {'switches': set(['main', 'dead'])} CONSUMES EventOFPSwitchFeatures CONSUMES EventOFPPortDescStatsReply CONSUMES EventOFPHello CONSUMES EventOFPErrorMsg CONSUMES EventOFPEchoRequest CONSUMES EventOFPPortStatus CONSUMES EventOFPEchoReply
app.start | app_manager.py
启动初始化后调用
self.threads.append(hub.spawn(self._event_loop))
当有新的事件来了,获取事件对应的handlers,然后再依次把事件放到handler中处理
不同的应用程序还有进行其他处理,如OFPHandler的start()会将OpenFlowController启动
webapp=wsgi.start_service(app_mgr)| manager.py
def start_service(app_mgr): for instance in app_mgr.contexts.values(): if instance.__class__ == WSGIApplication: #如果实例的类是WSGIApplication return WSGIServer(instance) return None
仅有依赖的服务模块中含有WSGIApplication的时候才启动WSGI服务的实例,提供web应用。
hub.joinall(services) | manager.py
将所有的应用作为任务,作为coroutine的task去执行,join使得程序必须等待所有的task都执行完成才可以退出程序
app_mgr.close() | manager.py
关闭程序,释放资源
相关推荐
class Singleton: def __new__: # 关键在于这,每一次实例化的时候,我们都只会返回这同一个instance对象 if not hasattr: cls.instance =