电子说
除了硬编码的异步编程处理方式,SpringBoot 框架还提供了 注解式
解决方案,以 方法体
为边界,方法体内部的代码逻辑全部按异步方式执行。
首先,使用 @EnableAsync
启用异步注解
@SpringBootApplication
@EnableAsync
public class StartApplication {
public static void main(String[] args) {
SpringApplication.run(StartApplication.class, args);
}
}
自定义线程池:
@Configuration
@Slf4j
public class ThreadPoolConfiguration {
@Bean(name = "defaultThreadPoolExecutor", destroyMethod = "shutdown")
public ThreadPoolExecutor systemCheckPoolExecutorService() {
return new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue(10000),
new ThreadFactoryBuilder().setNameFormat("default-executor-%d").build(),
(r, executor) -> log.error("system pool is full! "));
}
}
在异步处理的方法上添加注解 @Async
,当对 execute 方法
调用时,通过自定义的线程池 defaultThreadPoolExecutor
异步化执行 execute 方法
@Service
public class AsyncServiceImpl implements AsyncService {
@Async("defaultThreadPoolExecutor")
public Boolean execute(Integer num) {
System.out.println("线程:" + Thread.currentThread().getName() + " , 任务:" + num);
return true;
}
}
用 @Async 注解标记的方法,称为异步方法。在spring boot应用中使用 @Async 很简单:
事件机制在一些大型项目中被经常使用,Spring 专门提供了一套事件机制的接口,满足了架构原则上的解耦。
ApplicationContext
通过 ApplicationEvent
类和 ApplicationListener
接口进行事件处理。如果将实现 ApplicationListener
接口的 bean 注入到上下文中,则每次使用 ApplicationContext
发布 ApplicationEvent
时,都会通知该 bean。本质上,这是标准的观察者设计模式
。
ApplicationEvent 是由 Spring 提供的所有 Event 类的基类
首先,自定义业务事件子类,继承自 ApplicationEvent
,通过泛型注入业务模型参数类。相当于 MQ 的消息体。
public class OrderEvent extends AbstractGenericEvent {
public OrderEvent(OrderModel source) {
super(source);
}
}
然后,编写事件监听器。ApplicationListener
接口是由 Spring 提供的事件订阅者必须实现的接口,我们需要定义一个子类,继承 ApplicationListener
。相当于 MQ 的消费端
@Component
public class OrderEventListener implements ApplicationListener<OrderEvent> {
@Override
public void onApplicationEvent(OrderEvent event) {
System.out.println("【OrderEventListener】监听器处理!" + JSON.toJSONString(event.getSource()));
}
}
最后,发布事件,把某个事件告诉所有与这个事件相关的监听器。相当于 MQ 的生产端。
OrderModel orderModel = new OrderModel();
orderModel.setOrderId((long) i);
orderModel.setBuyerName("Tom-" + i);
orderModel.setSellerName("judy-" + i);
orderModel.setAmount(100L);
// 发布Spring事件通知
SpringUtils.getApplicationContext().publishEvent(new OrderEvent(orderModel));
加个餐:
[消费端]线程:http-nio-8090-exec-1,消费事件 {"amount":100.0,"buyerName":"Tom-1","orderId":1,"sellerName":"judy-1"}
[生产端]线程:http-nio-8090-exec-1,发布事件 1
[消费端]线程:http-nio-8090-exec-1,消费事件 {"amount":100.0,"buyerName":"Tom-2","orderId":2,"sellerName":"judy-2"}
[生产端]线程:http-nio-8090-exec-1,发布事件 2
[消费端]线程:http-nio-8090-exec-1,消费事件 {"amount":100.0,"buyerName":"Tom-3","orderId":3,"sellerName":"judy-3"}
[生产端]线程:http-nio-8090-exec-1,发布事件 3
上面是跑了个demo的运行结果,我们发现无论生产端还是消费端,使用了同一个线程 http-nio-8090-exec-1
,Spring 框架的事件机制默认是同步阻塞的。只是在代码规范方面做了解耦,有较好的扩展性,但底层还是采用同步调用方式。
那么问题来了,如果想实现异步调用,如何处理?
我们需要手动创建一个 SimpleApplicationEventMulticaster
,并设置 TaskExecutor
,此时所有的消费事件采用异步线程执行。
@Component
public class SpringConfiguration {
@Bean
public SimpleApplicationEventMulticaster applicationEventMulticaster(@Qualifier("defaultThreadPoolExecutor") ThreadPoolExecutor defaultThreadPoolExecutor) {
SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
simpleApplicationEventMulticaster.setTaskExecutor(defaultThreadPoolExecutor);
return simpleApplicationEventMulticaster;
}
}
我们看下改造后的运行结果:
[生产端]线程:http-nio-8090-exec-1,发布事件 1
[生产端]线程:http-nio-8090-exec-1,发布事件 2
[生产端]线程:http-nio-8090-exec-1,发布事件 3
[消费端]线程:default-executor-1,消费事件 {"amount":100.0,"buyerName":"Tom-2","orderId":2,"sellerName":"judy-2"}
[消费端]线程:default-executor-2,消费事件 {"amount":100.0,"buyerName":"Tom-1","orderId":1,"sellerName":"judy-1"}
[消费端]线程:default-executor-0,消费事件 {"amount":100.0,"buyerName":"Tom-3","orderId":3,"sellerName":"judy-3"}
SimpleApplicationEventMulticaster
这个我们自己实例化的 Bean 与系统默认的加载顺序如何?会不会有冲突?
查了下 Spring 源码,处理逻辑在 AbstractApplicationContext#initApplicationEventMulticaster
方法中,通过 beanFactory 查找是否有自定义的 Bean,如果没有,容器会自己 new 一个 SimpleApplicationEventMulticaster
对象注入到容器中。
代码地址:https://github.com/aalansehaiyang/wx-project
异步架构是互联网系统中一种典型架构模式,与同步架构相对应。而消息队列天生就是这种异步架构,具有超高吞吐量和超低时延。
消息队列异步架构的主要角色包括消息生产者、消息队列和消息消费者。
消息生产者就是主应用程序,生产者将调用请求封装成消息发送给消息队列。
消息队列的职责就是缓冲消息,等待消费者消费。根据消费方式又分为点对点模式
和发布订阅模式
两种。
消息消费者,用来从消息队列中拉取、消费消息,完成业务逻辑处理。
当然市面上消息队列框架非常多,常见的有RabbitMQ、Kafka、RocketMQ、ActiveMQ 和 Pulsar 等
不同的消息队列的功能特性会略有不同,但整体架构类似,这里就不展开了。
我们只需要记住一个关键点,借助消息队列这个中间件可以高效的实现异步编程。
全部0条评论
快来发表一下你的评论吧 !