通常在一个jvm进程中,若想实现延迟逻辑,可以使用jdk自带的延迟队列DelayQueue来实现。DelayQueue中的元素PriorityQueue来实现的,DelayQueue中的元素会实现
public interface Delayed extends Comparable<Delayed> { /** * Returns the remaining delay associated with this object, in the * given time unit. * * @param unit the time unit * @return the remaining delay; zero or negative values indicate * that the delay has already elapsed */ long getDelay(TimeUnit unit); }
即可在DelayQueue进行poll操作时候获取最近需要的元素。但是这种延时队列是保存在内存中,所以一旦进程关闭或崩溃,队列中的数据都会丢失,所以只有配合持久化才可以保证数据不丢失。
那么如果在多进程条件下,如果要实现延迟队列,则需要一个统一的地方保存延迟元素,这个元素可以被称为任务,redis是一个不错的选择。Redisson实现了集群环境下延迟队列的实现。
引入reddison依赖
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.10.3</version> </dependency>
redis基本配置
private Config initRedissonConfig() { Config config = new Config(); config.useSingleServer() .setAddress("redis://" + host + ":" + port) .setTimeout(timeout) .setConnectionPoolSize(maxIdle) .setConnectionMinimumIdleSize(minIdle); return config; } @Bean(destroyMethod = "shutdown") public RedissonClient redissonClient() { Config config = initRedissonConfig(); return Redisson.create(config); }
定义redisson阻塞队列,注册相关bean
public class QueueConfig {
private final String queueName = "queue";
@Bean
public RBlockingQueue<String> rBlockingQueue(@Qualifier("redissonClient") RedissonClient redissonClient) {
return redissonClient.getBlockingQueue(queueName);
}
@Bean
public RDelayedQueue<String> rDelayedQueue(@Qualifier("redissonClient") RedissonClient redissonClient,
@Qualifier("rBlockingQueue") RBlockingQueue<String> blockQueue) {
return redissonClient.getDelayedQueue(blockQueue);
}
}
下面进行测试,TakeTask负责消费队列中的任务
public class TakeTask { @Resource(name = "rBlockingQueue") private RBlockingQueue<String> rBlockingQueue; @PostConstruct public void take() { new Thread(() -> { while (true) { try { String s = rBlockingQueue.take(); System.out.println(s); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }
在延时队列rDelayQueue中延迟添加任务,这里需要调用带参数的offerAsync的方法,延时添加。
@RestController @RequestMapping("/") public class TestController { @Resource(name = "rDelayedQueue") private RDelayedQueue<String> rDelayedQueue; @GetMapping("/offer") public void offer() { for (int i = 1; i <= 2; i++) { rDelayedQueue.offerAsync("task: " + i, 1, TimeUnit.SECONDS); } } }
由于延时队列持久化在redis中,所以机器宕机数据不会异常丢失,机器重启后,会正常消费队列中积累的任务。
对于jdk中的DelayQueue延时队列是采用zset来实现,每次add,会立即将元素添加到队列中,zset会根据指定的字段进行排序,维护一个优先队列,当进行take操作时候,取到头节点的数据一定是最大或者最小的,但是此时头节点不一定能取出来,需要多一步判断,这一步其实就是 public long getDelay(TimeUnit unit);要实现的方法,只有返回值大于0才会真正被取出来。redission的延时队列是异步延时加入的,也就是说并没有立刻加入队列中,而是在指定的延时时间delay之后才会加入,所以在take的时候是一定可以直接取出来队列中的元素。
原文:https://www.cnblogs.com/markytsai/p/13800743.html