Guava EventBus 发布 / 订阅源码分析
April 15 2016
如果想了解观察者模式的实现或应用场景,Google Guava EventBus 所提供的发布/订阅功能非常简洁优雅的实现这一模式,且对于使用者不需要依赖复杂的接口或实现。其整体设计非常值得学习和参考,本文将从其源码分析其实现细节。
示例代码
以下使用 EventBus 的简要示例代码,针对不同的 Mouse Event 类型,会触发相应 Event handler 方法:
// 定义事件实体
public class ClickEvent {}
public class MouseMoveEvent {}
// 定义事件监听者
public class MouseListener {
@Subscribe
public void handleClickEvent(ClickEvent event) {
println("click event trigger");
}
@Subscribe
public void handleMouseMoveEvent(MouseMoveEvent event) {
println("mouse move event trigger");
}
}
EventBus bus = new EventBus();
// 注册订阅者
bus.register(new MouseListener());
// 发送事件给相应事件订阅者
bus.post(new ClickEvent());
bus.post(new MouseMoveEvent());
// Output:
click event trigger
mouse move event trigger
更加详细的使用说明可以查看 EventBusExplained。
实现细节
从代码示例来看,用到 EventBus 的主要行为为 register
和 post
,分别是注册订阅者和事件发送:
EventBus.register
调用 EventBus.register
将事件类型与所有订阅该事件的订阅者进行关联,该方法通过调用 SubscriberRegistry.register
来执行注册。
当注册完成后,并由全局 subscribers
存储关联关系:
/**
* All registered subscribers, indexed by event type.
*/
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap();
那么 SubscriberRegistry.register
是如何实现注册的呢?
STEP1:
如下代码,首先通过 findAllSubscribers
查找 listener 实例标识为 @Subscribe
的方法,并通过 Map 将方法参数类型(即事件类型)与对应的方法实例(由 Subscriber
包装 Method Object)进行关联。
void register(Object listener) {
// 这里的 Multimap 实则为 Map<Class<?>, Collection<Subscriber>> 结构
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
STEP2:
接着再将该 listener 所有的事件类型与对应的 Subscriber
关系增量到全局的 subscribers
变量中。以下 for
相当于将 listenerMethods
及全局的 subscribers
进行 Map 合并操作:
for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> eventMethodsInListener = entry.getValue();
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
eventSubscribers =
MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
}
eventSubscribers.addAll(eventMethodsInListener);
}
}
至此已经完成对监听器的注册,当具体事件类型触发时,可以根据事件类型从全局的 subscribers
中获取该事件类型所有订阅者。
另外,以上注册过程中使用了 ConcurrentMap
和 CopyOnWriteArraySet
进行数据操作,实则为了避免在多线程并发执行 register
时导致的线程安全问题,通过以下链接可以了解具体信息:
EventBus.post
当执行相应事件 post(event)
时,先根据事件类型获取到所有 Subscriber
,再通过 Dispatcher
将事件通知到对应 Subscriber
,最终由 Subscriber
实例执行其 Method
的 invoke
方法,即完成对具体 Listener 中方法的调用。
EventBus 提供了三种 Dispatcher
事件分发机制:
PerThreadQueuedDispatcher
如不指定默认该 dispatcher,通过队列及 ThreadLocal
来保证在同一个线程上发送的事件能够按发布的顺序分发给所有订阅者。
首先将需要分发的事件和对应的订阅者列表添加至当前线程的队列中:
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));
这里使用了 ThreadLocal<Boolean> dispatching
状态来标识当前线程是否分发完成,以避免重入分发。(什么是可重入?)
if (!dispatching.get()) {
dispatching.set(true);
标识为”分发中”状态后,再通过两层循环,先后遍历事件队列和订阅者,将相应事件送达到订阅者中。
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
LegacyAsyncDispatcher
在使用 AsyncEventBus
,则默认使用该分发器。与 PerThreadQueuedDispatcher
区别在于通过维护全局事件分发队列,使事件发布及订阅者能在多线程下并行执行。但无法完全保证在多线程下按事件发布顺序进行分发。
ImmediateDispatcher
遍历 subscribers
直接进行事件分发。
最后,由对应 Subscriber
接收到相应事件,并完成对订阅方法的调用:
final void dispatchEvent(final Object event) {
// 如果是 AsyncEventBus 则可以支持通过指定 executor 来异步执行 invoke
executor.execute(
new Runnable() {
@Override
public void run() {
try {
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}