异步编程的几种种实现方式(下)

电子说

1.3w人已加入

描述

五、 SpringBoot 注解 @Async

除了硬编码的异步编程处理方式,SpringBoot 框架还提供了 注解式 解决方案,以 方法体 为边界,方法体内部的代码逻辑全部按异步方式执行。

首先,使用 @EnableAsync 启用异步注解

@SpringBootApplication
@EnableAsync
public class StartApplication {

    public static void main(String[] args) {
        SpringApplication.run(StartApplication.class, args);
    }
}

自定义线程池:

@Configuration
@Slf4j
public class ThreadPoolConfiguration {

    @Bean(name = "defaultThreadPoolExecutor", destroyMethod = "shutdown")
    public ThreadPoolExecutor systemCheckPoolExecutorService() {

        return new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue(10000),
                new ThreadFactoryBuilder().setNameFormat("default-executor-%d").build(),
                (r, executor) -> log.error("system pool is full! "));
    }
}

在异步处理的方法上添加注解 @Async ,当对 execute 方法 调用时,通过自定义的线程池 defaultThreadPoolExecutor 异步化执行 execute 方法

@Service
public class AsyncServiceImpl implements AsyncService {

    @Async("defaultThreadPoolExecutor")
    public Boolean execute(Integer num) {
        System.out.println("线程:" + Thread.currentThread().getName() + " , 任务:" + num);
        return true;
    }

}

用 @Async 注解标记的方法,称为异步方法。在spring boot应用中使用 @Async 很简单:

  • 调用异步方法类上或者启动类加上注解 @EnableAsync
  • 在需要被异步调用的方法外加上 @Async
  • 所使用的 @Async 注解方法的类对象应该是Spring容器管理的bean对象;

六、Spring ApplicationEvent 事件

事件机制在一些大型项目中被经常使用,Spring 专门提供了一套事件机制的接口,满足了架构原则上的解耦。

ApplicationContext 通过 ApplicationEvent 类和 ApplicationListener 接口进行事件处理。如果将实现 ApplicationListener 接口的 bean 注入到上下文中,则每次使用 ApplicationContext 发布 ApplicationEvent 时,都会通知该 bean。本质上,这是标准的观察者设计模式

ApplicationEvent 是由 Spring 提供的所有 Event 类的基类

首先,自定义业务事件子类,继承自 ApplicationEvent,通过泛型注入业务模型参数类。相当于 MQ 的消息体。

public class OrderEvent extends AbstractGenericEvent {
    public OrderEvent(OrderModel source) {
        super(source);
    }
}

然后,编写事件监听器。ApplicationListener 接口是由 Spring 提供的事件订阅者必须实现的接口,我们需要定义一个子类,继承 ApplicationListener。相当于 MQ 的消费端

@Component
public class OrderEventListener implements ApplicationListener<OrderEvent> {
    @Override
    public void onApplicationEvent(OrderEvent event) {

        System.out.println("【OrderEventListener】监听器处理!" + JSON.toJSONString(event.getSource()));

    }
}

最后,发布事件,把某个事件告诉所有与这个事件相关的监听器。相当于 MQ 的生产端。

OrderModel orderModel = new OrderModel();
orderModel.setOrderId((long) i);
orderModel.setBuyerName("Tom-" + i);
orderModel.setSellerName("judy-" + i);
orderModel.setAmount(100L);
// 发布Spring事件通知
SpringUtils.getApplicationContext().publishEvent(new OrderEvent(orderModel));

加个餐:

[消费端]线程:http-nio-8090-exec-1,消费事件 {"amount":100.0,"buyerName":"Tom-1","orderId":1,"sellerName":"judy-1"}
[生产端]线程:http-nio-8090-exec-1,发布事件 1
[消费端]线程:http-nio-8090-exec-1,消费事件 {"amount":100.0,"buyerName":"Tom-2","orderId":2,"sellerName":"judy-2"}
[生产端]线程:http-nio-8090-exec-1,发布事件 2
[消费端]线程:http-nio-8090-exec-1,消费事件 {"amount":100.0,"buyerName":"Tom-3","orderId":3,"sellerName":"judy-3"}
[生产端]线程:http-nio-8090-exec-1,发布事件 3

上面是跑了个demo的运行结果,我们发现无论生产端还是消费端,使用了同一个线程 http-nio-8090-exec-1,Spring 框架的事件机制默认是同步阻塞的。只是在代码规范方面做了解耦,有较好的扩展性,但底层还是采用同步调用方式。

那么问题来了,如果想实现异步调用,如何处理?

我们需要手动创建一个 SimpleApplicationEventMulticaster,并设置 TaskExecutor,此时所有的消费事件采用异步线程执行。

@Component
public class SpringConfiguration {

    @Bean
    public SimpleApplicationEventMulticaster applicationEventMulticaster(@Qualifier("defaultThreadPoolExecutor") ThreadPoolExecutor defaultThreadPoolExecutor) {
        SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
        simpleApplicationEventMulticaster.setTaskExecutor(defaultThreadPoolExecutor);
        return simpleApplicationEventMulticaster;
    }

}

我们看下改造后的运行结果:

[生产端]线程:http-nio-8090-exec-1,发布事件 1
[生产端]线程:http-nio-8090-exec-1,发布事件 2
[生产端]线程:http-nio-8090-exec-1,发布事件 3
[消费端]线程:default-executor-1,消费事件 {"amount":100.0,"buyerName":"Tom-2","orderId":2,"sellerName":"judy-2"}
[消费端]线程:default-executor-2,消费事件 {"amount":100.0,"buyerName":"Tom-1","orderId":1,"sellerName":"judy-1"}
[消费端]线程:default-executor-0,消费事件 {"amount":100.0,"buyerName":"Tom-3","orderId":3,"sellerName":"judy-3"}

SimpleApplicationEventMulticaster 这个我们自己实例化的 Bean 与系统默认的加载顺序如何?会不会有冲突?

查了下 Spring 源码,处理逻辑在 AbstractApplicationContext#initApplicationEventMulticaster 方法中,通过 beanFactory 查找是否有自定义的 Bean,如果没有,容器会自己 new 一个 SimpleApplicationEventMulticaster 对象注入到容器中。

代码

代码地址:https://github.com/aalansehaiyang/wx-project

七、消息队列

异步架构是互联网系统中一种典型架构模式,与同步架构相对应。而消息队列天生就是这种异步架构,具有超高吞吐量和超低时延。

消息队列异步架构的主要角色包括消息生产者、消息队列和消息消费者。

代码

消息生产者就是主应用程序,生产者将调用请求封装成消息发送给消息队列。

消息队列的职责就是缓冲消息,等待消费者消费。根据消费方式又分为点对点模式发布订阅模式两种。

消息消费者,用来从消息队列中拉取、消费消息,完成业务逻辑处理。

当然市面上消息队列框架非常多,常见的有RabbitMQ、Kafka、RocketMQ、ActiveMQ 和 Pulsar 等

代码

不同的消息队列的功能特性会略有不同,但整体架构类似,这里就不展开了。

我们只需要记住一个关键点,借助消息队列这个中间件可以高效的实现异步编程。

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分