作者:王子源
指多个对象间存在一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。这种模式有时又称作发布 - 订阅模式、模型 - 视图模式,它是对象行为型模式。
在观察者模式中,有以下几个角色。
主题也叫被观察者(Subject):
观察者(Observer):
观察者接受到消息后,即进行更新操作,对接收到的信息进行处理。
具体的被观察者(ConcreteSubject):
定义被观察者自己的业务逻辑,同时定义对哪些事件进行通知。
具体的观察者(ConcreteObserver):
具体的观察者,每个观察者接收到消息后的处理反应是不同的,每个观察者都有自己的处理逻辑。
Spring 事件机制是观察者模式的实现。ApplicationContext 中事件处理是由 ApplicationEvent 类和 ApplicationListener 接口来提供的。如果一个 Bean 实现了 ApplicationListener 接口,并且已经发布到容器中去,每次 ApplicationContext 发布一个 ApplicationEvent 事件,这个 Bean 就会接到通知。ApplicationEvent 事件的发布需要显示触发,要么 Spring 触发,要么我们编码触发。spring 内置事件由 spring 触发。我们先来看一下,如何自定义 spring 事件,并使其被监听和发布。
2.1.1 事件
事件,ApplicationEvent,该抽象类继承了 EventObject,EventObject 是 JDK 中的类,并建议所有的事件都应该继承自 EventObject。
public abstract class ApplicationEvent extends EventObject {
private static final long serialVersionUID = 7099057708183571937L;
private final long timestamp = System.currentTimeMillis();
public ApplicationEvent(Object source) {
super(source);
}
public final long getTimestamp() {
return this.timestamp;
}
}
2.1.2 监听器
ApplicationListener,是一个接口,该接口继承了 EventListener 接口。EventListener 接口是 JDK 中的,建议所有的事件监听器都应该继承 EventListener。
@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
void onApplicationEvent(E var1);
}
2.1.3 事件发布器
ApplicationEventPublisher,ApplicationContext 继承了该接口,在 ApplicationContext 的抽象实现类 AbstractApplicationContext 中做了实现下面我们来看一下
org.springframework.context.support.AbstractApplicationContext#publishEvent(java.lang.Object, org.springframework.core.ResolvableType)
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
Assert.notNull(event, "Event must not be null");
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent) {
applicationEvent = (ApplicationEvent) event;
}
else {
applicationEvent = new PayloadApplicationEvent<>(this, event);
if (eventType == null) {
eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();
}
}
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
}
else {
//获取当前注入的发布器,执行发布方法
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
}
else {
this.parent.publishEvent(event);
}
}
}
我们可以看到,AbstractApplicationContext 中 publishEvent 方法最终执行发布事件的是 ApplicationEventMulticaster#multicastEvent 方法,下面我们再来一起看一下 multicastEvent 方法
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
Executor executor = getTaskExecutor();
//拿到所有的监听器,如果异步执行器不为空,异步执行
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
}
else {
//执行监听方法
invokeListener(listener, event);
}
}
}
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
ErrorHandler errorHandler = getErrorHandler();
if (errorHandler != null) {
try {
doInvokeListener(listener, event);
}
catch (Throwable err) {
errorHandler.handleError(err);
}
}
else {
doInvokeListener(listener, event);
}
}
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
try {
listener.onApplicationEvent(event);
}
catch (ClassCastException ex) {
String msg = ex.getMessage();
if (msg == null || matchesClassCastMessage(msg, event.getClass())) {
Log logger = LogFactory.getLog(getClass());
if (logger.isTraceEnabled()) {
logger.trace("Non-matching event type for listener: " + listener, ex);
}
}
else {
throw ex;
}
}
}
上面介绍了非 spring 内置的事件发布和监听执行流程。总结一下
上面我们讲到了 spring 事件的发布,那么 spring 事件发布之后,spring 是如何根据事件找到事件对应的监听器呢?我们一起来探究一下。
spring 的容器初始化过程想必大家都已十分了解,这里就不过多赘述,我们直接看 refresh 方法在 refresh 方法中,有这样两个方法,initApplicationEventMulticaster() 和 registerListeners()
我们先来看 initApplicationEventMulticaster() 方法
2.2.1 initApplicationEventMulticaster()
org.springframework.context.support.AbstractApplicationContext#initApplicationEventMulticaster
public static final String APPLICATION_EVENT_MULTICASTER_BEAN_NAME = "applicationEventMulticaster";
protected void initApplicationEventMulticaster() {
//获得beanFactory
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
//BeanFactory中是否有ApplicationEventMulticaster
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
this.applicationEventMulticaster =
beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
if (logger.isTraceEnabled()) {
logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
}
}
else {
//如果BeanFactory中不存在,就创建一个SimpleApplicationEventMulticaster
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
if (logger.isTraceEnabled()) {
logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
"[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
}
}
}
上述代码我们可以看出,spring 先从 BeanFactory 中获取 applicationEventMulticaster 如果为空,则直接创建 SimpleApplicationEventMulticaster
2.2.2 registerListeners()
org.springframework.context.support.AbstractApplicationContext#registerListeners
registerListeners 是将各种实现了 ApplicationListener 的监听器注册到 ApplicationEventMulticaster 事件广播器中
protected void registerListeners() {
for (ApplicationListener<?> listener : getApplicationListeners()) {
getApplicationEventMulticaster().addApplicationListener(listener);
}
String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
for (String listenerBeanName : listenerBeanNames) {
//把监听器注册到事件发布器上
getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
}
Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
this.earlyApplicationEvents = null;
if (!CollectionUtils.isEmpty(earlyEventsToProcess)) {
//如果内置监听事件集合不为空
for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
//执行spring内置的监听方法
getApplicationEventMulticaster().multicastEvent(earlyEvent);
}
}
}
这里解释一下 earlyApplicationListeners
earlyApplicationListeners 的本质还是 ApplicationListener。Spring 单例 Ban 的实例化是在 Refresh 阶段实例化的,那么用户自定义的一些 ApplicationListener 组件自然也是在这个阶段才初始化,但是 Spring 容器启动过程中,在 Refresh 完成之前还有很多事件:如 Spring 上下文环境准备等事件,这些事件又是 Spring 容器启动必须要监听的。所以 Spring 定义了一个 earlyApplicationListeners 集合,这个集合中的 Listener 在 factories 文件中定义好,在容器 Refresh 之前预先实例化好,然后就可以监听 Spring 容器启动过程中的所有事件。
当 registerListeners 方法执行完成,我们的监听器已经添加到多播器 SimpleApplicationEventMulticaster 中了,并且 earlyEvent 早期事件也已经执行完毕。但是我们发现,如果自定义了一个监听器去监听 spring 内置的事件,此时并没有被执行,那我们注册的监听器是如何被执行的呢?答案在 finishRefresh 方法中。
2.2.3 finishRefresh
org.springframework.context.support.AbstractApplicationContext#finishRefresh
protected void finishRefresh() {
clearResourceCaches();
initLifecycleProcessor();
getLifecycleProcessor().onRefresh();
//容器中的类全部初始化完毕后,触发刷新事件
publishEvent(new ContextRefreshedEvent(this));
LiveBeansView.registerApplicationContext(this);
}
如果我们想要实现在 spring 容器中所有 bean 创建完成后做一些扩展功能,我们就可以实现 ApplicationListener 这样我们就可以实现其功能了。至此,Spring 中同步的事件监听发布模式我们就讲解完了,当然 Spring 还支持异步的消息监听执行机制。
2.2.4 spring 中异步的监听执行机制
我们回过头来看一下 ApplicationEventMulticaster#pushEvent 方法
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
Assert.notNull(event, "Event must not be null");
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent) {
applicationEvent = (ApplicationEvent) event;
}
else {
applicationEvent = new PayloadApplicationEvent<>(this, event);
if (eventType == null) {
eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();
}
}
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
}
else {
//获取当前注入的发布器,执行发布方法
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
}
else {
this.parent.publishEvent(event);
}
}
最终执行发布事件的是 ApplicationEventMulticaster#multicastEvent 方法,下面我们再来一起看一下 multicastEvent 方法
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
Executor executor = getTaskExecutor();
//拿到所有的监听器,如果异步执行器不为空,异步执行
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
}
else {
//执行监听方法
invokeListener(listener, event);
}
}
}
可以看到,异步事件通知主要依靠 SimpleApplicationEventMulticaster 类中的 Executor 去实现的,如果这个变量不配置的话默认事件通知是同步的, 否则就是异步通知了,要实现同时支持同步通知和异步通知就得从这里下手;我们上文已经分析过了在 initApplicationEventMulticaster 方法中有这样一段代码
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
this.applicationEventMulticaster =
beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
if (logger.isTraceEnabled()) {
logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
}
}
如果 BeanFactory 中已经有了 SimpleApplicationEventMulticaster 则不会重新创建,那么我们可以再 spring 中注册一个 SimpleApplicationEventMulticaster 并且向其中注入对应的 Executor 这样我们就可以得到一个异步执行监听的 SimpleApplicationEventMulticaster 了,我们的通知就会通过 Executor 异步执行。这样可以大大提高事件发布的效率。
在 springboot 项目中我们可以增加一个配置类来实现
@Configuration
@EnableAsync
public class Config {
@Bean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME)
public SimpleApplicationEventMulticaster simpleApplicationEventMulticaster(){
SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
simpleApplicationEventMulticaster.setTaskExecutor(taskExecutor());
return simpleApplicationEventMulticaster;
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(300);
executor.setThreadNamePrefix("thread-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
}
spring 项目中我们也可以增加如下 xml 配置
<!--定义事件异步处理-->
<bean id="commonTaskExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- 线程池维持处于Keep-alive状态的线程数量。如果设置了allowCoreThreadTimeOut为true,该值可能为0。
并发线程数,想达到真正的并发效果,最好对应CPU的线程数及核心数 -->
<property name="corePoolSize" value="5" />
<!-- 最大线程池容量 -->
<property name="maxPoolSize" value="20" />
<!-- 超过最大线程池容量后,允许的线程队列数 -->
<property name="queueCapacity" value="100" />
<!-- 线程池维护线程所允许的空闲时间 .单位毫秒,默认为60s,超过这个时间后会将大于corePoolSize的线程关闭,保持corePoolSize的个数 -->
<property name="keepAliveSeconds" value="300" />
<!-- 允许核心线程超时: false(默认值)不允许超时,哪怕空闲;true则使用keepAliveSeconds来控制等待超时时间,最终corePoolSize的个数可能为0 -->
<property name="allowCoreThreadTimeOut" value="true" />
<!-- 线程池对拒绝任务(无线程可用)的处理策略 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
<!-- java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中 -->
<!-- java.util.concurrent.ThreadPoolExecutor$AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 -->
</property>
</bean>
<!--名字必须是applicationEventMulticaster,因为AbstractApplicationContext默认找个-->
<bean id="applicationEventMulticaster" class="org.springframework.context.event.SimpleApplicationEventMulticaster">
<!--注入任务执行器 这样就实现了异步调用-->
<property name="taskExecutor" ref="commonTaskExecutor"></property>
</bean>
本文主要讲解了观察者模式在 spring 中的应用及事件监听机制,JDK 也有实现提供事件监听机制 Spring 的事件机制也是基于 JDK 来扩展的。Spring 的事件机制默认是同步阻塞的,想要提升对应的效率要考虑异步事件。