首页 > 其他 > 详细

RabbitMQ发送端事务管理 —— 事务机制 和 确认机制

时间:2018-05-22 00:35:00      阅读:443      评论:0      收藏:0      [点我收藏+]

一、AMQP提供

事务机制,比较消耗性能

try {
    channel.txSelect();
    channel.basicPublish(EXCHANGE_NAME, "queue22", true, MessageProperties.PERSISTENT_TEXT_PLAIN,
            msg.getBytes());
    channel.addReturnListener(new ReturnListener() {
        public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4,
                byte[] arg5) throws IOException {
            System.out.println("返回的消息是:" + new String(arg5));
        }
    });
    channel.txCommit();
} catch (Exception e) {
    e.printStackTrace();
    channel.txRollback();
}

二、RabbitMQ提供

消息确认机制(效率比事务机制高)

try {
    channel.confirmSelect();
    channel.basicPublish(EXCHANGE_NAME, "queue2", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
    if(!channel.waitForConfirms()) {
        System.out.println("send message failed");
    }
} catch (Exception e) {
    e.printStackTrace();
}

但是确认模式,是每发送一条消息后就调用channel.waitForConfirms()方法,之后等待服务器的确认,这实际上是一种串行、同步等待的方式,事务机制和它一样(但在实际大数量量测试时,会发现确认机制只比事务机制效率高一点点。是因为同步等待的方式下,confirm机制发送一条消息需要通信交互的命令是2条:Basic.Publish和Basic.Ack;事务机制是3条:Basic.Publish、Tx.Commit/Tx.Commit-Ok或者Tx.Rollback/Tx.Rollback-Ok)

优化方案一:

批量confirm方法,比较简单,不写代码了,就是将channel.basicPublish用循环框起来,发送次数到达一定数量后,进行waitForConfirms()即可。缺点是,如果这批出现返回Basic.Nack或者超时情况,客户端需要将这一批重新发送,当消息经常丢失是,性能不升反降。

注意:要将发送出去的消息存入缓存 中,可以是ArrayList或者BlockingQueue之类的,然后适时的清空,或者重发里面的信息。

优化方案二:

异步confirm方法,提供一个回调方法,服务端确认了一条或者多条消息后客户端会回调这个方法进行处理

channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("Nack,SeqNo:"+deliveryTag+", multiple:"+multiple);
        if(multiple) {
            confirmSet.headSet(deliveryTag-1).clear();
        }else {
            confirmSet.remove(deliveryTag);
        }
    }
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("deliveryTag:"+deliveryTag+", multiple:"+multiple);
        if(multiple) {
            confirmSet.headSet(deliveryTag-1).clear();
        }else {
            confirmSet.remove(deliveryTag);
        }
        //注意这里需要添加处理消息重发的场景
    }
});
channel.basicPublish(EXCHANGE_NAME, "queue22", true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());

三、总结

1、事务机制和确认机制是互斥的,不能共存。

2、事务机制和确认机制确保的是消息能够正确地发送到RabbitMQ的交换器,如果些交换器没有匹配队列,那么消息也会丢失。

3、普通事务机制和普通confirm的方式吞吐量很低,但方式简单,不需要在客户端维护状态(这里指的是维护deliveryTag及缓存未确认的消息)。

  批量confirm方式的问题在于遇到RabbitMQ服务端返回Basicnack需要重发批量导致的性能降低。

  异步confirm方式和批量confirm一样需要在客户端维护状态。

RabbitMQ发送端事务管理 —— 事务机制 和 确认机制

原文:https://www.cnblogs.com/yifanSJ/p/9070039.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!