借助redisson实现延时队列
public class RedissonDelayQueue { private RedissonClient redissonClient; private RBlockingDeque<String> rBlockingDeque; private RDelayedQueue<String> rDelayedQueue; public void delaySend(String jsonObject, Long delay, TimeUnit timeUnit) { this.rDelayedQueue.offer(jsonObject, delay, timeUnit); } public RedissonDelayQueue() { Config config = new Config(); config.useClusterServers().setScanInterval(2000).addNodeAddress("redis://10.13.65.178:6390"); this.redissonClient = Redisson.create(config); this.rBlockingDeque = redissonClient.getBlockingDeque("MXZ_DELAY_QUEUE"); if (this.rBlockingDeque == null) { return; } this.rDelayedQueue = redissonClient.getDelayedQueue(rBlockingDeque); if (this.rDelayedQueue == null) { return; } this.startConsumerDelayQueue(); System.out.println("启动时间" + LocalDateTime.now()); } private void startConsumerDelayQueue() { Thread thread = new Thread(() -> { while (true) { try { String jsonObject = this.rBlockingDeque.take(); System.out.println("--> 延迟队列获取数据:{}" + jsonObject); } catch (InterruptedException e) { } } }); thread.setDaemon(true); thread.start(); } public static void main(String[] args) { RedissonDelayQueue queue = new RedissonDelayQueue(); queue.delaySend("one", 10l,TimeUnit.SECONDS); queue.delaySend("two", 20l,TimeUnit.SECONDS); } }
原文:https://www.cnblogs.com/juniorMa/p/14659856.html