首页 > 其他 > 详细

DelayQueue

时间:2020-10-21 12:11:31      阅读:36      评论:0      收藏:0      [点我收藏+]

场景

在电商环境中,如何处理用户进行预下单后,在一定时间内不进行支付,取消该订单?

延迟队列

经过网上的查询,得知JUC中有一个数据结构,叫做延迟队列DelayQueue。可以将订单存入该队列中,设定一定的时间,当时间到达后,过期的订单出队,进行业务操作。

存放在DelayQueue的元素需要实现Delayed接口

public class Message implements Delayed {
    private int id;
    // 延迟时长,这个是必须的属性因为要按照这个判断延时时长。
    private long executeTime;

     public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public long getExecuteTime() {
        return executeTime;
    }

    public void setExecuteTime(long executeTime) {
        this.executeTime = executeTime;
    }

    public Message(int id,long delayTime) {
        this.id = id;
        this.executeTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();
    }

    // 自定义实现比较方法返回 1 0 -1三个参数
    @Override
    public int compareTo(Delayed delayed) {
        Message msg = (Message) delayed;
        return this.executeTime > msg.executeTime ? 1
                : this.executeTime < msg.executeTime ? -1 : 0;
    }

    // 延迟任务是否到时就是按照这个方法判断如果返回的是负数则说明到期否则还没到期
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.executeTime - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

实现监听类,即消费消息队列中的消息内容,本质就是一个线程

public class Consumer implements Runnable {
    // 延时队列 ,消费者从其中获取消息进行消费
    private DelayQueue<Message> queue;

    public Consumer(DelayQueue<Message> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Message take = queue.take();
                System.out.println("消费消息id:" + take.getId());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

测试类

public class DelayQueueTest {
    public static void main(String[] args) {
        // 创建延时队列
        DelayQueue<Message> queue = new DelayQueue<Message>();
        // 启动消费线程 消费添加到延时队列中的消息,前提是任务到了延期时间
        ExecutorService exec = Executors.newFixedThreadPool(1);
        exec.execute(new Consumer(queue));
        exec.shutdown();

        // 添加延时消息,m1 延时3s
        Message m1 = new Message(1, 3000);
        // 添加延时消息,m2 延时10s
        Message m2 = new Message(2, 4000);
        //将延时消息放到延时队列中
        queue.offer(m2);
        queue.offer(m1);
        
        Message m3 = new Message(3, 7000);
        queue.offer(m3);
        Message m4 = new Message(4, 5000);
        queue.offer(m4);

    }
}

Spring

经过上面的例子,得知实现一个延迟队列必须要有消息的实体类Message、监听队列的线程Consumer和生成延迟队列的初始类。

在Spring中,如何将以上的类注入容器中?

  • 将生成延迟队列的初始类注入到Spring容器中
  • 在Spring装载完成相关类后,生成一个Consumer线程
  • Consumer线程监听队列中过期的消息,根据业务进行操作

将延迟队列初始化,并注入容器中

@Component
public class OrderQueue {
    private DelayQueue<OrderDelay> queue;

    public OrderQueue() {
        queue = new DelayQueue<OrderDelay>();
    }

    public DelayQueue<OrderDelay> getQueue() {
        return queue;
    }

    public void setQueue(DelayQueue<OrderDelay> queue) {
        this.queue = queue;
    }
}

实体类Message保持不变

Consumer类注入容器

@Service
public class Consumer {
    @Autowired
    private OrderQueue orderQueue;
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private OrderDetailMapper orderDetailMapper;


    //创建一个线程,监听订单队列
    //当每个订单超过规定时间后未支付,则删除相关数据
    @PostConstruct
    public void initThread() {
        //在这里开启线程,执行操作
        ChildThread ct = new ChildThread();
        ct.setName("orderDelayQueue-Thread");
        ct.setDaemon(true);
        ct.start();
    }

    //内部类
    private class ChildThread extends Thread {
        @Override
        public void run() {
            while (true) {
                Messagetake = null;
                try {
                    take = orderQueue.getQueue().take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                dealOrder(take.getId());
            }
        }
    }


    @Async("orderDelayQueue")
    public void dealOrder(Integer id) {
        //如果超时未支付,进行删除订单的操作
        int col = orderMapper.delete(new QueryWrapper<Order>().eq("id", id).eq("state", 4));
        if (col != 0) {
            //继续删除子单的相关内容
            orderDetailMapper.delete(new QueryWrapper<OrderDetails>().eq("order_id", id));
        }
    }

这里使用了@PostConstruct注解,即将会在依赖注入完成后被自动调用。在Spring装载的周期中Constructor >> @Autowired >> @PostConstruct。所以在Spring装载完成之后,再启动线程监听,就可以获取注入容器中的Mapper。

由于dealOrder是一个方法,必须注入线程池中,防止多线程阻塞,所以使用@Async注解。

总结

由于业务的并发量不大,使用延迟队列,比较轻量级。
其实在这个场景也可以使用一些消息中间件MQ,或者Redis之类的来进行处理。

DelayQueue

原文:https://www.cnblogs.com/dxyoung/p/13851258.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!