EventBus源码解析
前面一篇文章讲解了EventBus的使用,但是作为开发人员,不能只停留在仅仅会用的层面上,我们还需要弄清楚它的内部实现原理。所以本篇博文将分析EventBus的源码,看看究竟它是如何实现“发布/订阅”功能的。
事件注册
根据前一讲EventBus使用详解我们已经知道EventBus使用首先是需要注册的,注册事件的代码如下:
EventBus.getDefault().register(this);
EventBus对外提供了一个register方法来进行事件注册,该方法接收一个Object类型的参数,下面看下register方法的源码:
public void register(Object subscriber) { Class<?> subscriberClass = subscriber.getClass(); // 判断该类是否是匿名内部类 boolean forceReflection = subscriberClass.isAnonymousClass(); List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass, forceReflection); for (SubscriberMethod subscriberMethod : subscriberMethods) { subscribe(subscriber, subscriberMethod); } }
该方法首先获取获取传进来参数的Class对象,然后判断该类是否是匿名内部类。然后根据这两个参数通过subscriberMethodFinder.findSubscriberMethods方法获取所有的事件处理方法。
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass, boolean forceReflection) { String key = subscriberClass.getName(); List<SubscriberMethod> subscriberMethods; synchronized (METHOD_CACHE) { subscriberMethods = METHOD_CACHE.get(key); } if (subscriberMethods != null) { //缓存命中,直接返回 return subscriberMethods; } if (INDEX != null && !forceReflection) { // 如果INDEX不为空,并且subscriberClass为非匿名内部类, // 则通过findSubscriberMethodsWithIndex方法查找事件处理函数 subscriberMethods = findSubscriberMethodsWithIndex(subscriberClass); if (subscriberMethods.isEmpty()) { //如果结果为空,则使用findSubscriberMethodsWithReflection方法再查找一次 subscriberMethods = findSubscriberMethodsWithReflection(subscriberClass); } } else { //INDEX为空或者subscriberClass未匿名内部类,使用findSubscriberMethodsWithReflection方法查找 subscriberMethods = findSubscriberMethodsWithReflection(subscriberClass); } if (subscriberMethods.isEmpty()) { throw new EventBusException("Subscriber " + subscriberClass + " and its super classes have no public methods with the @Subscribe annotation"); } else { //存入缓存并返回 synchronized (METHOD_CACHE) { METHOD_CACHE.put(key, subscriberMethods); } return subscriberMethods; } }
通过名字我们就知道这个方法是获取subscriberClass类中所有的事件处理方法(即使用了@Subscribe的方法)。该方法首先会从缓存METHOD_CACHE中去获取事件处理方法,如果缓存中不存在,则需要通过findSubscriberMethodsWithIndex或者findSubscriberMethodsWithReflection方法获取所有事件处理方法,获取到之后先存入缓存再返回。
这个方法里面有个INDEX对象,我们看看它是个什么鬼:
/** Optional generated index without entries from subscribers super classes */ private static final SubscriberIndex INDEX; static { SubscriberIndex newIndex = null; try { Class<?> clazz = Class.forName("de.greenrobot.event.GeneratedSubscriberIndex"); newIndex = (SubscriberIndex) clazz.newInstance(); } catch (ClassNotFoundException e) { Log.d(EventBus.TAG, "No subscriber index available, reverting to dynamic look-up"); // Fine } catch (Exception e) { Log.w(EventBus.TAG, "Could not init subscriber index, reverting to dynamic look-up", e); } INDEX = newIndex; }
由上面代码可以看出EventBus会试图加载一个de.greenrobot.event.GeneratedSubscriberIndex类并创建对象赋值给INDEX,但是EventBus3.0 beta并没有为我们提供该类(可能后续版本会提供)。所以INDEX为null。
我们再返回findSubscriberMethods方法,我们知道INDEX已经为null了,所以必然会调用findSubscriberMethodsWithReflection方法查找所有事件处理函数:
private List<SubscriberMethod> findSubscriberMethodsWithReflection(Class<?> subscriberClass) { List<SubscriberMethod> subscriberMethods = new ArrayList<SubscriberMethod>(); Class<?> clazz = subscriberClass; HashSet<String> eventTypesFound = new HashSet<String>(); StringBuilder methodKeyBuilder = new StringBuilder(); while (clazz != null) { String name = clazz.getName(); // 如果查找的类是java、javax或者android包下面的类,则过滤掉 if (name.startsWith("java.") || name.startsWith("javax.") || name.startsWith("android.")) { // Skip system classes, this just degrades performance break; } // Starting with EventBus 2.2 we enforced methods to be public (might change with annotations again) // 通过反射查找所有该类中所有方法 Method[] methods = clazz.getDeclaredMethods(); for (Method method : methods) { int modifiers = method.getModifiers(); // 事件处理方法必须为public,这里过滤掉所有非public方法 if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) { Class<?>[] parameterTypes = method.getParameterTypes(); // 事件处理方法必须只有一个参数 if (parameterTypes.length == 1) { Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class); if (subscribeAnnotation != null) { String methodName = method.getName(); Class<?> eventType = parameterTypes[0]; methodKeyBuilder.setLength(0); methodKeyBuilder.append(methodName); methodKeyBuilder.append('>').append(eventType.getName()); String methodKey = methodKeyBuilder.toString(); if (eventTypesFound.add(methodKey)) { // Only add if not already found in a sub class // 只有在子类中没有找到,才会添加到subscriberMethods ThreadMode threadMode = subscribeAnnotation.threadMode(); subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode, subscribeAnnotation.priority(), subscribeAnnotation.sticky())); } } } else if (strictMethodVerification) { // 如果某个方法加了@Subscribe注解,并且不是1个参数,则抛出EventBusException异常 if (method.isAnnotationPresent(Subscribe.class)) { String methodName = name + "." + method.getName(); throw new EventBusException("@Subscribe method " + methodName + "must have exactly 1 parameter but has " + parameterTypes.length); } } } else if (strictMethodVerification) { // 如果某个方法加了@Subscribe注解,并且不是public修饰,则抛出EventBusException异常 if (method.isAnnotationPresent(Subscribe.class)) { String methodName = name + "." + method.getName(); throw new EventBusException(methodName + " is a illegal @Subscribe method: must be public, non-static, and non-abstract"); } } } // 会继续查找父类的方法 clazz = clazz.getSuperclass(); } return subscriberMethods; }
该方法主要作用就是找出subscriberClass类以及subscriberClass的父类中所有的事件处理方法(添加了@Subscribe注解,访问修饰符为public并且只有一个参数)。值得注意的是:如果子类与父类中同时存在了相同事件处理函数,则父类中的不会被添加到subscriberMethods。
好了,查找事件处理函数的过程已经完了,我们继续回到register方法中:
for (SubscriberMethod subscriberMethod : subscriberMethods) { subscribe(subscriber, subscriberMethod); }
找到事件处理函数后,会遍历找到的所有事件处理函数并调用subscribe方法将所有事件处理函数注册到EventBus中。
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) { Class<?> eventType = subscriberMethod.eventType; // 获取订阅了某种类型数据的 Subscription 。 使用了 CopyOnWriteArrayList ,这个是线程安全的, // CopyOnWriteArrayList 会在更新的时候,重新生成一份 copy,其他线程使用的是 // copy,不存在什么线程安全性的问题。 CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); Subscription newSubscription = new Subscription(subscriber, subscriberMethod); if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<Subscription>(); subscriptionsByEventType.put(eventType, subscriptions); } else { //如果已经被注册过了,则抛出EventBusException异常 if (subscriptions.contains(newSubscription)) { throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } // Starting with EventBus 2.2 we enforced methods to be public (might change with annotations again) // subscriberMethod.method.setAccessible(true); // Got to synchronize to avoid shifted positions when adding/removing concurrently // 根据优先级将newSubscription查到合适位置 synchronized (subscriptions) { int size = subscriptions.size(); for (int i = 0; i <= size; i++) { if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) { subscriptions.add(i, newSubscription); break; } } } //将处理事件类型添加到typesBySubscriber List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber); if (subscribedEvents == null) { subscribedEvents = new ArrayList<Class<?>>(); typesBySubscriber.put(subscriber, subscribedEvents); } subscribedEvents.add(eventType); // 如果该事件处理方法为粘性事件,即设置了“sticky = true”,则需要调用checkPostStickyEventToSubscription // 判断是否有粘性事件需要处理,如果需要处理则触发一次事件处理函数 if (subscriberMethod.sticky) { if (eventInheritance) { // Existing sticky events of all subclasses of eventType have to be considered. // Note: Iterating over all events may be inefficient with lots of sticky events, // thus data structure should be changed to allow a more efficient lookup // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>). Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet(); for (Map.Entry<Class<?>, Object> entry : entries) { Class<?> candidateEventType = entry.getKey(); if (eventType.isAssignableFrom(candidateEventType)) { Object stickyEvent = entry.getValue(); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } else { Object stickyEvent = stickyEvents.get(eventType); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } }
如果事件处理函数设置了“sticky = true”,则会调用checkPostStickyEventToSubscription处理粘性事件。
private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) { if (stickyEvent != null) { // If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state) // --> Strange corner case, which we don't take care of here. postToSubscription(newSubscription, stickyEvent, Looper.getMainLooper() == Looper.myLooper()); } }
如果存在粘性事件,则立即调用postToSubscription触发该事件的事件处理函数。postToSubscription函数后面讲post时会讲到。
至此,整个register过程就介绍完了。
总结一下,整个过程分为3步:
- 查找注册的类中所有的事件处理函数(添加了@Subscribe注解且访问修饰符为public的方法)
- 将所有事件处理函数注册到EventBus
- 如果有事件处理函数设置了“sticky = true”,则立即处理该事件
post事件
register过程讲完后,我们知道了EventBus如何找到我们定义好的事件处理函数。有了这些事件处理函数,当post相应事件的时候,EventBus就会触发订阅该事件的处理函数。具体post过程是怎样的呢?我们看看代码:
public void post(Object event) { PostingThreadState postingState = currentPostingThreadState.get(); List<Object> eventQueue = postingState.eventQueue; eventQueue.add(event); if (!postingState.isPosting) { // 标识post的线程是否是主线程 postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper(); postingState.isPosting = true; if (postingState.canceled) { throw new EventBusException("Internal error. Abort state was not reset"); } try { // 循环处理eventQueue中的每一个event对象 while (!eventQueue.isEmpty()) { postSingleEvent(eventQueue.remove(0), postingState); } } finally { // 处理完之后重置postingState的一些标识信息 postingState.isPosting = false; postingState.isMainThread = false; } } }
currentPostingThreadState是一个ThreadLocal类型,里面存储了PostingThreadState;
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() { @Override protected PostingThreadState initialValue() { return new PostingThreadState(); } }; /** For ThreadLocal, much faster to set (and get multiple values). */ final static class PostingThreadState { final List<Object> eventQueue = new ArrayList<Object>(); boolean isPosting; boolean isMainThread; Subscription subscription; Object event; boolean canceled; }
PostingThreadState包含了一个事件队列eventQueue和一些标志信息。eventQueue存放所有待post的事件对象。
我们再回到post方法,首先会将event对象添加到事件队列eventQueue中。然后判断是否有事件正在post,如果没有则会遍历eventQueue中每一个event对象,并且调用postSingleEvent方法post该事件。
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error { Class<?> eventClass = event.getClass(); boolean subscriptionFound = false; if (eventInheritance) { // 如果允许事件继承,则会调用lookupAllEventTypes查找所有的父类和接口类 List<Class<?>> eventTypes = lookupAllEventTypes(eventClass); int countTypes = eventTypes.size(); for (int h = 0; h < countTypes; h++) { Class<?> clazz = eventTypes.get(h); subscriptionFound |= postSingleEventForEventType(event, postingState, clazz); } } else { subscriptionFound = postSingleEventForEventType(event, postingState, eventClass); } if (!subscriptionFound) { if (logNoSubscriberMessages) { Log.d(TAG, "No subscribers registered for event " + eventClass); } if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) { // 如果post的事件没有被注册,则post一个NoSubscriberEvent事件 post(new NoSubscriberEvent(this, event)); } } }
如果允许事件继承,则会调用lookupAllEventTypes查找所有的父类和接口类。
private List<Class<?>> lookupAllEventTypes(Class<?> eventClass) { synchronized (eventTypesCache) { List<Class<?>> eventTypes = eventTypesCache.get(eventClass); if (eventTypes == null) { eventTypes = new ArrayList<Class<?>>(); Class<?> clazz = eventClass; while (clazz != null) { eventTypes.add(clazz); addInterfaces(eventTypes, clazz.getInterfaces()); clazz = clazz.getSuperclass(); } eventTypesCache.put(eventClass, eventTypes); } return eventTypes; } }
这个方法很简单,就是查找eventClass类的所有父类和接口,并将其保存到eventTypesCache中,方便下次使用。
我们再回到postSingleEvent方法。不管允不允许事件继承,都会执行postSingleEventForEventType方法post事件。
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) { CopyOnWriteArrayList<Subscription> subscriptions; synchronized (this) { subscriptions = subscriptionsByEventType.get(eventClass); } if (subscriptions != null && !subscriptions.isEmpty()) { for (Subscription subscription : subscriptions) { postingState.event = event; postingState.subscription = subscription; boolean aborted = false; try { postToSubscription(subscription, event, postingState.isMainThread); aborted = postingState.canceled; } finally { postingState.event = null; postingState.subscription = null; postingState.canceled = false; } if (aborted) { break; } } return true; } return false; }
在postSingleEventForEventType方法中,会已eventClass为key从subscriptionsByEventType对象中获取Subscription列表。在上面讲register的时候我们已经看到EventBus在register的时候会将Subscription列表存储在subscriptionsByEventType中。接下来会遍历subscriptions列表然后调用postToSubscription方法进行下一步处理。
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) { switch (subscription.subscriberMethod.threadMode) { case PostThread: // 如果该事件处理函数没有指定线程模型或者线程模型为PostThread // 则调用invokeSubscriber在post的线程中执行事件处理函数 invokeSubscriber(subscription, event); break; case MainThread: // 如果该事件处理函数指定的线程模型为MainThread // 并且当前post的线程为主线程,则调用invokeSubscriber在当前线程(主线程)中执行事件处理函数 // 如果post的线程不是主线程,将使用mainThreadPoster.enqueue该事件处理函数添加到主线程的消息队列中 if (isMainThread) { invokeSubscriber(subscription, event); } else { mainThreadPoster.enqueue(subscription, event); } break; case BackgroundThread: // 如果该事件处理函数指定的线程模型为BackgroundThread // 并且当前post的线程为主线程,则调用backgroundPoster.enqueue // 如果post的线程不是主线程,则调用invokeSubscriber在当前线程(非主线程)中执行事件处理函数 if (isMainThread) { backgroundPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break; case Async: //添加到异步线程队列中 asyncPoster.enqueue(subscription, event); break; default: throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode); } }
该方法主要是根据register注册的事件处理函数的线程模型在指定的线程中触发事件处理函数。在上一讲EventBus使用详解中已经讲过EventBus的线程模型相关概念了,不明白的可以回去看看。
mainThreadPoster、backgroundPoster和asyncPoster分别是HandlerPoster、BackgroundPoster和AsyncPoster的对象,其中HandlerPoster继承自Handle,BackgroundPoster和AsyncPoster继承自Runnable。
我们主要看看HandlerPoster。
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
在EventBus的构造函数中,我们看到mainThreadPoster初始化的时候,传入的是Looper.getMainLooper()。所以此Handle是运行在主线程中的。
mainThreadPoster.enqueue方法:
void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { queue.enqueue(pendingPost); if (!handlerActive) { handlerActive = true; if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } } } }
enqueue方法最终会调用sendMessage方法,所以该Handle的handleMessage方法会被调用。
@Override public void handleMessage(Message msg) { boolean rescheduled = false; try { long started = SystemClock.uptimeMillis(); while (true) { PendingPost pendingPost = queue.poll(); if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { handlerActive = false; return; } } } eventBus.invokeSubscriber(pendingPost); long timeInMethod = SystemClock.uptimeMillis() - started; if (timeInMethod >= maxMillisInsideHandleMessage) { if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } rescheduled = true; return; } } } finally { handlerActive = rescheduled; } }
在该方法中,最终还是会调用eventBus.invokeSubscriber调用事件处理函数。
BackgroundPoster和AsyncPoster继承自Runnable,并且会在enqueue方法中调用eventBus.getExecutorService().execute(this);具体run方法大家可以自己去看源码,最终都会调用eventBus.invokeSubscriber方法。我们看看eventBus.invokeSubscriber方法的源码:
void invokeSubscriber(PendingPost pendingPost) { Object event = pendingPost.event; Subscription subscription = pendingPost.subscription; PendingPost.releasePendingPost(pendingPost); if (subscription.active) { invokeSubscriber(subscription, event); } }
该方法会调用invokeSubscriber方法进一步处理:
void invokeSubscriber(Subscription subscription, Object event) { try { // 通过反射调用事件处理函数 subscription.subscriberMethod.method.invoke(subscription.subscriber, event); } catch (InvocationTargetException e) { handleSubscriberException(subscription, event, e.getCause()); } catch (IllegalAccessException e) { throw new IllegalStateException("Unexpected exception", e); } }
该方法最终会通过反射来调用事件处理函数。至此,整个post过程分析完了。
总结一下整个post过程,大致分为3步:
- 将事件对象添加到事件队列eventQueue中等待处理
- 遍历eventQueue队列中的事件对象并调用postSingleEvent处理每个事件
- 找出订阅过该事件的所有事件处理函数,并在相应的线程中执行该事件处理函数
取消事件注册
上面已经分析了EventBus的register和post过程,这两个过程是EventBus的核心。不需要订阅事件时需要取消事件注册:
/** Unregisters the given subscriber from all event classes. */ public synchronized void unregister(Object subscriber) { List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber); if (subscribedTypes != null) { for (Class<?> eventType : subscribedTypes) { unubscribeByEventType(subscriber, eventType); } typesBySubscriber.remove(subscriber); } else { Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass()); } }
取消事件注册很简单,只是将register过程注册到EventBus的事件处理函数移除掉。
到这里,EventBus源码我们已经分析完了,如有不对的地方还望指点。