Android每周一轮子:EventBus
前言
开篇要说声sorry,限于各种原因,Okhttp的下篇和OKIO要delay 了,本周先来一个简单一些的。
EventBus 是一个基于观察者模式的事件发布/订阅框架,开发者可以通过极少的代码去实现多个模块之间的通信,而不需要以层层传递接口的形式去单独构建通信桥梁。从而降低因多重回调导致的模块间强耦合,同时避免产生大量内部类。其可以很好的应用于Activity之间,Fragment之间,后台线程之间的通信,避免使用intent或者handler所带来的复杂度。其缺点则是可能会造成接口的膨胀。特别是当程序要求大量形式各异的通知,而没有做出良好的抽象时,代码中会包含大量的接口,接口数量的增长又会带来命名、注释等等一大堆问题。本质上说观察者要求从零开始实现事件的产生、分发与处理过程,这就要求参与者必须对整个通知过程有着良好的理解。当程序代码适量时,这是一个合理的要求,然而当程序太大时,这将成为一种负担。
EventBus基于观察者模式的Android事件分发总线。
EventBus基本使用
1.定义消息事件MessageEvent,也就是创建事件类型
public class MessageEvent { public final String message; public MessageEvent(String message) { this.message = message; } }
2.注册观察者并订阅事件
选择要订阅该事件的订阅者(subscriber),Activity即在onCreate()加入,调用EventBus的register方法,注册。
EventBus.getDefault().register(this);
在不需要接收事件发生时可以
EventBus.getDefault().unregister(this);
在订阅者里需要用注解关键字 @Subscribe
来告诉EventBus使用什么方法处理event。
@Subscribe public void onMessageEvent(MessageEvent event) { Toast.makeText(this, event.message, Toast.LENGTH_SHORT).show(); }
注意方法只能被public修饰,在EventBus3.0之后该方法名字就可以自由的取了,之前要求只能是onEvent().
3.发送事件
通过EventBus的post方法,发出我们要传递的事件。
EventBus.getDefault().post(new MessageEvent("HelloEveryone"));
这样选择的Activity就会接收到该事件,并且触发onMessageEvent方法。
EventBus源码解析
了解了对于EventBus的基础使用,解析来,我们针对其基础使用的调用流程,来了解EventBus的实现流程和源码细节。
注册观察者
EventBus.getDefault().register(this);
- getDefault()
EventBus.getDefault()是一个单例,实现如下:
public static EventBus getDefault() { if (defaultInstance == null) { synchronized (EventBus.class) { if (defaultInstance == null) { defaultInstance = new EventBus(); } } } return defaultInstance; }
保证了App单个进程中只会有一个EventBus实例。
- register(Object subscriber)
public void register(Object subscriber) { Class<?> subscriberClass = subscriber.getClass(); List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass); synchronized (this) { for (SubscriberMethod subscriberMethod : subscriberMethods) { subscribe(subscriber, subscriberMethod); } } }
register方法中,首先获得订阅实例的类,然后调用SubscriberMethodFinder实例的findSubscriberMethods
方法来找到该类中订阅的相关方法,然后对这些方法调用订阅方法。注册的过程涉及到两个问题,一个是如何查找注册方法?另一个是如何将这些方法进行存储,方便后面的调用?
SubscriberMethodFinder是如何从实例中查找到相关的注册方法的呢?
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) { //根据类信息丛缓存中查找订阅方法 List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass); if (subscriberMethods != null) { return subscriberMethods; } //查找注册方法 if (ignoreGeneratedIndex) { subscriberMethods = findUsingReflection(subscriberClass); } else { subscriberMethods = findUsingInfo(subscriberClass); } //将得到的订阅方法加入到缓存中 if (subscriberMethods.isEmpty()) { throw new EventBusException("Subscriber " + subscriberClass + " and its super classes have no public methods with the @Subscribe annotation"); } else { METHOD_CACHE.put(subscriberClass, subscriberMethods); return subscriberMethods; } }
首先从缓存的方法中,通过Class作为Key进行查找,如何查找内容为空,则会调用findUsingReflection或者findUsingInfo来从相关类中查找,得到注册的方法列表之后,将其添加到缓存之中。
缓存的数据结构如下:
Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
订阅方法
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) { //获取订阅方法要监听的事件类型 Class<?> eventType = subscriberMethod.eventType; Subscription newSubscription = new Subscription(subscriber, subscriberMethod); //根据事件类型查找相应的订阅者 CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); //如果不存在该事件类型,则创建,如果已经包含该订阅者,抛出异常 if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<>(); subscriptionsByEventType.put(eventType, subscriptions); } else { if (subscriptions.contains(newSubscription)) { throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } //获得该事件类型的订阅者列表,根据其优先级确定当前插入者的位置 int size = subscriptions.size(); for (int i = 0; i <= size; i++) { if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) { subscriptions.add(i, newSubscription); break; } } //在该注册者中加入对应的监听事件类型 List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber); if (subscribedEvents == null) { subscribedEvents = new ArrayList<>(); typesBySubscriber.put(subscriber, subscribedEvents); } subscribedEvents.add(eventType); //黏性事件处理 if (subscriberMethod.sticky) { if (eventInheritance) { Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet(); for (Map.Entry<Class<?>, Object> entry : entries) { Class<?> candidateEventType = entry.getKey(); if (eventType.isAssignableFrom(candidateEventType)) { Object stickyEvent = entry.getValue(); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } else { Object stickyEvent = stickyEvents.get(eventType); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } }
subscribe方法的执行流程是先根据事件类型,判断该注册者是否已经进行过注册,如果未注册将其中的方法进行保存,以事件类型为键保存一份,然后以注册者实例为键保存一份。
发送事件
对于事件的发送,调用的是post函数
- post(Object event)
public void post(Object event) { //获取当前线程的Event队列,并将其添加到队列中 PostingThreadState postingState = currentPostingThreadState.get(); List<Object> eventQueue = postingState.eventQueue; eventQueue.add(event); //如果当前PostingThreadState不是在post 中 if (!postingState.isPosting) { postingState.isMainThread = isMainThread(); postingState.isPosting = true; if (postingState.canceled) { throw new EventBusException("Internal error. Abort state was not reset"); } try { //遍历事件队列,调用postSingleEvent方法 while (!eventQueue.isEmpty()) { postSingleEvent(eventQueue.remove(0), postingState); } } finally { postingState.isPosting = false; postingState.isMainThread = false; } } }
post方法中,首先从当前的PostingThreadState中获取当前的事件队列,然后将要post的事件添加到其中,之后判断当前的线程是否处在post中,如果不在,那么则会遍历事件队列,调用postSingleEvent
将其中的事件抛出。
currentPostingThreadState是一个ThreadLocal类型的,里面存储了PostingThreadState。
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() { @Override protected PostingThreadState initialValue() { return new PostingThreadState(); } }
PostingThreadState包含了一个eventQueue和一些标志位。类具体结构如下。
final static class PostingThreadState { final List<Object> eventQueue = new ArrayList<>(); boolean isPosting; boolean isMainThread; Subscription subscription; Object event; boolean canceled; }
- postSingleEvent
postSingleEvent的具体实现如下。
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error { Class<?> eventClass = event.getClass(); boolean subscriptionFound = false; if (eventInheritance) { List<Class<?>> eventTypes = lookupAllEventTypes(eventClass); int countTypes = eventTypes.size(); for (int h = 0; h < countTypes; h++) { Class<?> clazz = eventTypes.get(h); subscriptionFound |= postSingleEventForEventType(event, postingState, clazz); } } else { subscriptionFound = postSingleEventForEventType(event, postingState, eventClass); } if (!subscriptionFound) { if (logNoSubscriberMessages) { logger.log(Level.FINE, "No subscribers registered for event " + eventClass); } if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) { post(new NoSubscriberEvent(this, event)); } } }
通过lookupAllEventTypes(eventClass)
得到当前eventClass的Class,以及父类和接口的Class类型,而后逐个调用postSingleEventForEventType方法。事件派发的核心方法在postSingleEventForEventType方法中。
- postSingleEventForEventType
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) { CopyOnWriteArrayList<Subscription> subscriptions; synchronized (this) { subscriptions = subscriptionsByEventType.get(eventClass); } if (subscriptions != null && !subscriptions.isEmpty()) { for (Subscription subscription : subscriptions) { postingState.event = event; postingState.subscription = subscription; boolean aborted = false; try { postToSubscription(subscription, event, postingState.isMainThread); aborted = postingState.canceled; } finally { postingState.event = null; postingState.subscription = null; postingState.canceled = false; } if (aborted) { break; } } return true; } return false; }
从subscriptionsByEventType中拿到订阅了eventClass的订阅者列表 ,遍历,调用postToSubscription方法,逐个将事件抛出。
- postToSubscription
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) { //根据订阅者方法的线程模型进行不同的处理 switch (subscription.subscriberMethod.threadMode) { case POSTING: invokeSubscriber(subscription, event); break; case MAIN: if (isMainThread) { invokeSubscriber(subscription, event); } else { mainThreadPoster.enqueue(subscription, event); } break; case MAIN_ORDERED: if (mainThreadPoster != null) { mainThreadPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break; case BACKGROUND: if (isMainThread) { backgroundPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break; case ASYNC: asyncPoster.enqueue(subscription, event); break; default: throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode); } }
根据threadMode去判断应该在哪个线程去执行该方法,而invokeSubscriber方法内通过反射调用函数。
MainThread
首先去判断当前如果是UI线程,则直接调用;否则, mainThreadPoster.enqueue(subscription, event)
BackgroundThread
如果当前非UI线程,则直接调用;如果是UI线程,则调用backgroundPoster.enqueue方法。
Async
调用asyncPoster.enqueue方法
接下来会针对这几种广播方式展开分析
- invokeSubscriber
void invokeSubscriber(Subscription subscription, Object event) { try { subscription.subscriberMethod.method.invoke(subscription.subscriber, event); } catch (InvocationTargetException e) { handleSubscriberException(subscription, event, e.getCause()); } catch (IllegalAccessException e) { throw new IllegalStateException("Unexpected exception", e); } }
通过反射的方式,直接调用订阅该事件方法。
- mainThreadPoster.enqueue
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
mainThreadPoster 通过mainThreadSupport.createPoster创建。
public Poster createPoster(EventBus eventBus) { return new HandlerPoster(eventBus, looper, 10); }
返回HandlerPoster实例。
通过Subscription和Event实例构造出PendingPost,然后将其加入到PendingPostQueue之中,然后调用sendMessage,其handleMessage函数将会被回调。
public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { queue.enqueue(pendingPost); if (!handlerActive) { handlerActive = true; if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } } } }
消息处理
@Override public void handleMessage(Message msg) { boolean rescheduled = false; try { long started = SystemClock.uptimeMillis(); while (true) { PendingPost pendingPost = queue.poll(); if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { handlerActive = false; return; } } } eventBus.invokeSubscriber(pendingPost); long timeInMethod = SystemClock.uptimeMillis() - started; if (timeInMethod >= maxMillisInsideHandleMessage) { if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } rescheduled = true; return; } } } finally { handlerActive = rescheduled; } }
当得到消息之后,开启循环,从队列中取PendingPost,调用invokeSubscriber方法执行。
void invokeSubscriber(PendingPost pendingPost) { Object event = pendingPost.event; Subscription subscription = pendingPost.subscription; PendingPost.releasePendingPost(pendingPost); if (subscription.active) { invokeSubscriber(subscription, event); } }
这里调用了releasePendingPost
static void releasePendingPost(PendingPost pendingPost) { pendingPost.event = null; pendingPost.subscription = null; pendingPost.next = null; synchronized (pendingPostPool) { // Don't let the pool grow indefinitely if (pendingPostPool.size() < 10000) { pendingPostPool.add(pendingPost); } } }
为了避免对象的重复创建,在PendingPost中维护了一个PendingPost列表,方便进行对象的复用。
List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
对于对象的创建,可以通过其obtainPendingPost方法来获得。
- asyncPoster.enqueue
public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); queue.enqueue(pendingPost); eventBus.getExecutorService().execute(this); }
将PendingPost添加到PendingPost队列中,线程池会从队列中取数据,然后执行。
@Override public void run() { PendingPost pendingPost = queue.poll(); if(pendingPost == null) { throw new IllegalStateException("No pending post available"); } eventBus.invokeSubscriber(pendingPost); }
- backgroundPoster.enqueue
相比于asyncPoster,backgroundPoster可以保证添加进来的数据是顺序执行的,通过同步锁和信号量的方式来保证,只有一个线程是在活跃从事件队列中取事件,然后执行。
public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { queue.enqueue(pendingPost); if (!executorRunning) { executorRunning = true; eventBus.getExecutorService().execute(this); } } }
public void run() { try { try { while (true) { PendingPost pendingPost = queue.poll(1000); if (pendingPost == null) { synchronized (this) { pendingPost = queue.poll(); if (pendingPost == null) { executorRunning = false; return; } } } eventBus.invokeSubscriber(pendingPost); } } catch (InterruptedException e) { } } finally { executorRunning = false; } }
函数扫描
在register方法中对于订阅方法的查找,调用的方法是SubscriberMethodFinder的findSubscriberMethods方法,对于其中方法的查找有两种方式,一个是findUsingInfo
,一个是findUsingReflection
。
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) { //获取FindState实例 FindState findState = prepareFindState(); findState.initForSubscriber(subscriberClass); //从当前类中查找,然后跳到其父类,继续查找相应方法 while (findState.clazz != null) { findUsingReflectionInSingleClass(findState); findState.moveToSuperclass(); } return getMethodsAndRelease(findState); }
首先,会获得一个FindState实例,其用来保存查找过程中的一些中间变量和最后结果,首先找当前类中的注册方法,然后跳到其父类之中,其父类会自动过滤掉Java,Android中的相应类,然后继续查找。
查找的核心实现在方法findUsingReflectionInSingleClass中。
private void findUsingReflectionInSingleClass(FindState findState) { Method[] methods; try { // 获取该类中的所有方法,不包括继承的方法 methods = findState.clazz.getDeclaredMethods(); } catch (Throwable th) { methods = findState.clazz.getMethods(); findState.skipSuperClasses = true; } //遍历获取的方法,判断添加规则为是否为public函数,其参数是否只有一个,获取其注解,然后调用checkAdd, //在加入到订阅方法之前 for (Method method : methods) { int modifiers = method.getModifiers(); if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) { Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length == 1) { Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class); if (subscribeAnnotation != null) { Class<?> eventType = parameterTypes[0]; if (findState.checkAdd(method, eventType)) { ThreadMode threadMode = subscribeAnnotation.threadMode(); findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode, subscribeAnnotation.priority(), subscribeAnnotation.sticky())); } } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { //多于一个参数 } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { //非public,abstract,非静态的 } } }
按照如下扫描规则,对类中的函数进行扫描
扫描规则:1.函数非静态,抽象函数 2.函数为public;3.函数仅单个参数;4.函数拥有@Subscribe
的注解;
在符合了以上规则之后,还不能够直接将其加入到函数的队列之中,还需要对方法进行校验。
boolean checkAdd(Method method, Class<?> eventType) { Object existing = anyMethodByEventType.put(eventType, method); if (existing == null) { return true; } else { if (existing instanceof Method) { if (!checkAddWithMethodSignature((Method) existing, eventType)) { throw new IllegalStateException(); } anyMethodByEventType.put(eventType, this); } return checkAddWithMethodSignature(method, eventType); } } //函数签名校验,来进行 private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) { methodKeyBuilder.setLength(0); methodKeyBuilder.append(method.getName()); methodKeyBuilder.append('>').append(eventType.getName()); String methodKey = methodKeyBuilder.toString(); Class<?> methodClass = method.getDeclaringClass(); Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass); if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) { // Only add if not already found in a sub class return true; } else { // Revert the put, old class is further down the class hierarchy subscriberClassByMethodKey.put(methodKey, methodClassOld); return false; } }
为扫描到的函数做校验,在校验后,释放自己持有的资源。第一层校验在checkAdd函数中,如果当前尚未有函数监听过当前事件,就直接跳过第二层检查。第二层检查为完整的函数签名的检查,将函数名与监听事件类名拼接作为函数签名,如果当前subscriberClassByMethodKey
中不存在相同methodKey时,返回true,检查结束;若存在相同methodKey时,说明子类重写了父类的监听函数,此时应当保留子类的监听函数而忽略父类。由于扫描是由子类向父类的顺序,故此时应当保留methodClassOld而忽略methodClass。
上述的方式是通过在运行期通过注解处理的方式进行的,效率是比较慢的,在EventBus最新版中引入了在编译器通过注解处理器,在编译器生成方法索引的方式进行,以此来提升效率。
粘性事件处理
粘性事件的设计初衷是,在事件的发出早于观察者的注册,EventBus将粘性事件存储起来,在观察者注册后,将其发出。通过其内部的一个数据结构:
Map<Class<?>, Object> stickyEvents
保存每个Event类型的最近一次post出的event
public void postSticky(Object event) { synchronized (stickyEvents) { stickyEvents.put(event.getClass(), event); } // Should be posted after it is putted, in case the subscriber wants to remove immediately post(event); }
将粘性事件保存在stickyEvents,而后post出,此时如果存在已经注册的观察者,则情况同普通事件情况相同;如尚无注册的观察者,在postSingleEvent函数中将时间转化为一个NoSubscriberEvent事件发出,可由EventBus消耗并处理。待观察者注册时,从stickyEvents中将事件取出,重新分发给注册的观察者。
if (subscriberMethod.sticky) { if (eventInheritance) { Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet(); for (Map.Entry<Class<?>, Object> entry : entries) { Class<?> candidateEventType = entry.getKey(); if (eventType.isAssignableFrom(candidateEventType)) { Object stickyEvent = entry.getValue(); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } else { Object stickyEvent = stickyEvents.get(eventType); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } }
在对于粘性事件处理这段代码中,首先判断是否监听Event的子类,而后调用checkPostStickyEventToSubscription将黏性事件发出,在checkPostStickyEventToSubscription中,判空后按一半事件的post流程将事件传递给观察者。
private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) { if (stickyEvent != null) { postToSubscription(newSubscription, stickyEvent, isMainThread()); } }
小结
轮子的每周一篇,已经到了第四周了,下周是对OkHttp的更细致的一个剖析,然后是对于OkIO的剖