最近有一个需求,基于消息队列对数据消费,并根据多次消费的结果对数据进行重新组装,如果在指定时间内,需要的数据全部到达,则进行数据组装以及后续逻辑。简单的说,设置一个超时时间,如果在该时间内由MQ中消费到完整的数据则直接处理,否则进入其他流程。
针对这种场景使用了延迟任务来实现,以此为契机对延迟任务相关的技术做了个简单了解...
延迟任务是一种指定任务在未来某个时间点或一定时间后执行的方式。通常情况下,延迟任务可以通过设置任务的执行时间或延迟时间来实现。
延迟任务可以用于异步操作、定时任务和任务调度等场景。例如,在用户注册后发送欢迎邮件或者在用户下单后发送订单确认短信,可以通过延迟任务来实现异步操作。定时检查服务器状态、定时备份数据等任务,也可以通过延迟任务来实现定时任务。在某个时间点触发某个任务、在某个时间段内重复执行某个任务等,可以通过延迟任务来实现任务调度。
延迟任务通常使用队列或者定时器来实现。在队列中,任务会被添加到一个等待队列中,等待队列中的任务会在指定的时间点或延迟时间后被取出执行。在定时器中,任务会被添加到一个定时器中,定时器会在指定的时间点触发任务执行。
总之,延迟任务是一种非常实用的技术,可以帮助我们更好地管理系统中的异步操作、定时任务和任务调度等场景。
异步操作:延迟任务可以用于异步操作,例如在用户注册后发送欢迎邮件或者在用户下单后发送订单确认短信。通过使用延迟任务,可以将这些操作推迟到后台处理,从而提高系统的响应速度和并发能力。
定时任务:延迟任务可以用于定时任务,例如定时检查服务器状态、定时备份数据等。通过使用延迟任务,可以在指定的时间点自动触发任务,避免手动操作的繁琐和容易出错。
任务调度:延迟任务可以用于任务调度,例如在某个时间点触发某个任务、在某个时间段内重复执行某个任务等。通过使用延迟任务,可以方便地进行任务调度,提高系统的可靠性和稳定性。
其中前三种Timer、DelayQueue、ScheduledThreadPoolExecutor实现比较简单,只不过只适用于单体应用,任务数据都在内存中,在系统崩溃后数据丢失;后两张实现相对复杂,并且需要依赖于第三方应用,在系统整体结构上更加复杂且消耗更多资源,但能支持分布式系统,且有较高的容错性。
定义延迟任务对象:
@Getter
public class DelayTask implements Serializable{
private static final long serialVersionUID = -5062977578344039366L;
private long delaySeconds;
private TaskExecute taskExecute;
public DelayTask(long delaySeconds, TaskExecute taskExecute) {
this.delaySeconds = delaySeconds;
this.taskExecute = taskExecute;
}
/**
*
*/
public void execute(){
taskExecute.run();
}
public interface TaskExecute extends Runnable, Serializable {
}
}
调度器:
public interface ScheduleTrigger {
/**
* 延迟任务调度
* @param delayTask
*/
void schedule(DelayTask delayTask);
}
public class JavaTrigger implements ScheduleTrigger{
private Timer timer;
public JavaTimer(){
this.timer = new Timer();
}
/**
*
* @param delayTask
*/
public void schedule(DelayTask delayTask){
timer.schedule(buildTimerTask(delayTask.getTaskExecute()), toMillis(delayTask.getDelaySeconds()));
}
private TimerTask buildTimerTask(Runnable runnable){
return new TimerTask() {
@Override
public void run() {
runnable.run();
}
};
}
}
public class DelayQueueTrigger implements ScheduleTrigger{
private DelayQueue< Task > queue = new DelayQueue< >();
public DelayQueueTrigger() {
Thread thread = new Thread(() - > {
while (true) {
try {
Task task = queue.take();
if(task != null)
task.execute();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
thread.setDaemon(true);
thread.start();
}
/**
* @param delayTask
*/
public void schedule(DelayTask delayTask){
if( delayTask instanceof Task ){
queue.put((Task) delayTask);
}
}
}
class Task extends DelayTask implements Delayed{
private long execTime;
public Task(long delaySeconds, TaskExecute taskExecute) {
super(delaySeconds, taskExecute);
this.execTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(delaySeconds);
}
/**
* 轮询执行该方法判断是否满足执行条件(<=0)
* 同时该返回作为等待时长
* @param unit the time unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return this.execTime - System.currentTimeMillis(); // ms
}
public long getExecTime() {
return execTime;
}
@Override
public int compareTo(Delayed other) {
if(this.getExecTime() == ((Task)other).getExecTime()){
return 0;
}
return this.getExecTime() > ((Task)other).getExecTime() ? 1: -1;
}
}
public class ScheduledExecutorTrigger implements ScheduleTrigger{
private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
public void schedule(DelayTask delayTask){
executorService.schedule(delayTask.getTaskExecute(), delayTask.getDelaySeconds(), TimeUnit.SECONDS);
}
}
notify-keyspace-events Ex
public class RedisTimer{
private static final String EXPIRATION_KEY = "REDIS_EXPIRATION_KEY";
@Configuration
@Import(RedisAutoConfiguration.class)
public static class Config{
@Bean(name = "redisTemplate")
public RedisTemplate< Object, Object > redisTemplate(RedisConnectionFactory factory) {
RedisTemplate< Object, Object > template = new RedisTemplate< >();
RedisSerializer< String > keySerializer = new StringRedisSerializer();
RedisSerializer< Object > valueSerializer = new ObjectRedisSerializer();
template.setConnectionFactory(factory);
template.setKeySerializer(keySerializer);
template.setValueSerializer(valueSerializer);
return template;
}
/**
* 消息监听器容器bean
* @param connectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer container(LettuceConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
@Bean
public RedisKeyExpirationListener redisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer){
RedisKeyExpirationListener redisKeyExpirationListener = new RedisKeyExpirationListener(redisMessageListenerContainer);
redisKeyExpirationListener.setContext(context());
return redisKeyExpirationListener;
}
@Bean
public Context context(){
return new Context();
}
@Bean
public RedisTrigger redisTrigger(RedisTemplate redisTemplate){
return new RedisTrigger(redisTemplate, context());
}
class ObjectRedisSerializer implements RedisSerializer{
@Override
public byte[] serialize(Object o) throws SerializationException {
return SerializeUtils.serialize(o);
}
@Override
public Object deserialize(byte[] bytes) throws SerializationException {
return SerializeUtils.deserialize(bytes);
}
}
}
public static class RedisTrigger implements ScheduleTrigger{
private RedisTemplate redisTemplate;
private Context context;
public RedisTrigger(RedisTemplate redisTemplate, Context context){
this.redisTemplate = redisTemplate;
this.context = context;
}
public void schedule(DelayTask delayTask){
context.put(EXPIRATION_KEY, delayTask);
redisTemplate.opsForValue().set(EXPIRATION_KEY, delayTask, delayTask.getDelaySeconds(), TimeUnit.SECONDS);
}
}
@Slf4j
public static class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
private Context context;
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
/**
* 这里没法拿到过期值
* @param message never {@literal null}.
*/
@SneakyThrows
@Override
public void doHandleMessage(Message message) {
try {
String topic = new String(message.getChannel(), "utf-8");
String key = new String(message.getBody(), "utf-8");
if (EXPIRATION_KEY.equals(key)) {
Object object = context.get(EXPIRATION_KEY);
if( object instanceof DelayTask ){
log.info("redis key[{}] 过期回调", key);
((DelayTask) object).execute();
}
}
} catch (Exception e) {
log.error("处理Redis延迟任务异常:{}", e.getMessage() ,e);
}
}
public void setContext(Context context) {
this.context = context;
}
}
public static class Context{
private Map< String,Object > context = new ConcurrentHashMap< >();
public void put(String key, Object value){
context.put(key, value);
}
public Object get(String key){
return context.get(key);
}
}
}
什么时候消息进入死信?
public class RabbitTimer{
@Configuration
@Import(RabbitAutoConfiguration.class)
public static class Config{
static final String TTL_EXCHANGE_FOR_SCHEDULE = "TTL_EXCHANGE_FOR_SCHEDULE";
static final String TTL_QUEUE_FOR_SCHEDULE = "TTL_QUEUE_FOR_SCHEDULE";
static final String TTL_ROUTING_KEY_FOR_SCHEDULE = "TTL_ROUTING_KEY_FOR_SCHEDULE";
static final String COMMON_QUEUE_FOR_SCHEDULE = "COMMON_QUEUE_FOR_SCHEDULE";
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable(TTL_QUEUE_FOR_SCHEDULE).build();
}
@Bean
public Exchange ttlExchange(){
return ExchangeBuilder.directExchange(TTL_EXCHANGE_FOR_SCHEDULE).build();
}
@Bean
public Binding ttlBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with(TTL_ROUTING_KEY_FOR_SCHEDULE).noargs();
}
@Bean
public Queue commonQueue(){
return QueueBuilder.durable(COMMON_QUEUE_FOR_SCHEDULE)
.deadLetterExchange(TTL_EXCHANGE_FOR_SCHEDULE)
.deadLetterRoutingKey(TTL_ROUTING_KEY_FOR_SCHEDULE)
.build();
}
@Bean
public TtlMessageConsumer ttlMessageConsumer(){
return new TtlMessageConsumer();
}
@Bean
public RabbitTrigger rabbitTrigger(RabbitTemplate rabbitTemplate){
return new RabbitTrigger(rabbitTemplate);
}
}
@Slf4j
@RabbitListener(queues=TTL_QUEUE_FOR_SCHEDULE)
public static class TtlMessageConsumer{
@RabbitHandler
public void handle(byte [] message){
Object deserialize = SerializeUtils.deserialize(message);
if( deserialize instanceof DelayTask ){
((DelayTask) deserialize).execute();
}
}
}
public static class RabbitTrigger implements ScheduleTrigger{
@Autowired
private RabbitTemplate rabbitTemplate;
public RabbitTrigger(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void schedule(DelayTask delayTask){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration( String.valueOf(TimeUnit.SECONDS.toMillis(delayTask.getDelaySeconds())));
Message message = new Message(SerializeUtils.serialize(delayTask), messageProperties);
rabbitTemplate.send(COMMON_QUEUE_FOR_SCHEDULE, message);
}
}
}
public class NettyTrigger implements ScheduleTrigger {
HashedWheelTimer timer = new HashedWheelTimer(200,
TimeUnit.MILLISECONDS,
100); // 时间轮中的槽数
/**
*
*/
@Override
public void schedule(DelayTask delayTask){
TimerTask task = timeout - > delayTask.execute();
//
timer.newTimeout(task, delayTask.getDelaySeconds(), TimeUnit.SECONDS);
}
}
测试:
ScheduleTrigger.schedule(DelayTask delayTask);
通过几个简单的示例了解延迟队列的实现方式,可以根据实际业务场景以及应用架构做出合理的选择。
全部0条评论
快来发表一下你的评论吧 !