首页 > 其他 > 详细

RabbitMQ事务性消息和确认模式

时间:2020-11-06 00:18:30      阅读:25      评论:0      收藏:0      [点我收藏+]

事务消息与数据库的事务类似,只是MQ的消息是要保证消息是否会全部发送成功,防止消息丢失的一种策略。

RabbitMQ有两种策略来解决这个问题:

1.通过AMQP的事务机制实现

2.使用发送者确认模式实现

1.事务

事务的实现主要是对信道(Channel)的设置,主要方法如下:

1. channel.txSelect()  声明启动事务模式

2.channel.txCommit() 提交事务

3.channel.txRollback()回滚事务

1.事务性消息发送

开启事务之后必须手动channel.txCommit();提交或者channel.txRollback();回滚。

package rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    
    public static Connection getConnection() throws Exception {
        // 创建连接工程,下面给出的是默认的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory.newConnection();
    }

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = getConnection();
            channel = connection.createChannel();
            /**
             * 声明一个队列。
             * 参数一:队列名称
             * 参数二:是否持久化
             * 参数三:是否排外  如果排外则这个队列只允许有一个消费者
             * 参数四:是否自动删除队列,如果为true表示没有消息也没有消费者连接自动删除队列
             * 参数五:队列的附加属性
             * 注意:
             * 1.声明队列时,如果已经存在则放弃声明,如果不存在则会声明一个新队列;
             * 2.队列名可以任意取值,但需要与消息接收者一致。
             * 3.下面的代码可有可无,一定在发送消息前确认队列名称已经存在RabbitMQ中,否则消息会发送失败。
             */
            channel.queueDeclare("myQueue", true, false, false,null);
            
            // 启动事务,必须用txCommit()或者txRollback()回滚
            channel.txSelect();

            // 假设这里处理业务逻辑
            String message = "hello,message!";
            /**
             * 发送消息到MQ
             * 参数一:交换机名称,为""表示不用交换机
             * 参数二:为队列名称或者routingKey.当指定了交换机就是routingKey
             * 参数三:消息的属性信息
             * 参数四:消息内容的字节数组
             */
            channel.basicPublish("", "myQueue", null, message.getBytes());
            
            // 提交事务
            channel.txCommit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (channel != null) {
                    // 回滚。如果未异常会提交事务,此时回滚无影响
                    channel.txRollback();
                    channel.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception ignore) {
                // ignore
            }
        }
    }
}

  测试可以注释掉提交事务的代码发现mq不会新增消息。

2.消费者事务测试

  经测试,自动确认模式下。即使事务不提交,也会读取到消息并从队列移除。也就是暂时得出的结论是事务对消费者无效。

package rabbitmq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Consumer {

    public static ConnectionFactory getConnectionFactory() {
        // 创建连接工程,下面给出的是默认的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory;
    }

    public static void main(String[] args) throws IOException, TimeoutException  {
        ConnectionFactory connectionFactory = getConnectionFactory();
        Connection newConnection = null;
        Channel createChannel = null;
        try {
            newConnection = connectionFactory.newConnection();
            createChannel = newConnection.createChannel();
            /**
             * 声明一个队列。
             * 参数一:队列名称
             * 参数二:是否持久化
             * 参数三:是否排外  如果排外则这个队列只允许有一个消费者
             * 参数四:是否自动删除队列,如果为true表示没有消息也没有消费者连接自动删除队列
             * 参数五:队列的附加属性
             * 注意:
             * 1.声明队列时,如果已经存在则放弃声明,如果不存在则会声明一个新队列;
             * 2.队列名可以任意取值,但需要与消息接收者一致。
             * 3.下面的代码可有可无,一定在发送消息前确认队列名称已经存在RabbitMQ中,否则消息会发送失败。
             */
            createChannel.queueDeclare("myQueue", true, false, false,null);
            
            /**
             * 开启事务
             * 消费者开启事务后,即使不提交也会获取到消息并且从队列删除。
             * 结论:
             *     事务对消费者没有任何影响
             */
            createChannel.txSelect();
            
            /**
             * 接收消息。会持续坚挺,不能关闭channel和Connection
             * 参数一:队列名称
             * 参数二:消息是否自动确认,true表示自动确认接收完消息以后会自动将消息从队列移除。否则需要手动ack消息
             * 参数三:消息接收者的标签,用于多个消费者同时监听一个队列时用于确认不同消费者。
             * 参数四:消息接收者
             */
            createChannel.basicConsume("myQueue", true, "", new DefaultConsumer(createChannel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    String string = new String(body, "UTF-8");
                    System.out.println("接收到d消息: -》 " + string);
                }
            });
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
        }
        
    }
}

  上面是自动确认模式的消费者,不受事务的影响。

如果是手动确认消息的消费者,在开启事务下,必须手动commit,否则不会移除消息。

如下:手动确认模式+事务的用法

package rabbitmq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Consumer {

    public static ConnectionFactory getConnectionFactory() {
        // 创建连接工程,下面给出的是默认的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory;
    }

    public static void main(String[] args) throws IOException, TimeoutException  {
        ConnectionFactory connectionFactory = getConnectionFactory();
        Connection newConnection = null;
        Channel createChannel = null;
        try {
            newConnection = connectionFactory.newConnection();
            createChannel = newConnection.createChannel();
            /**
             * 声明一个队列。
             * 参数一:队列名称
             * 参数二:是否持久化
             * 参数三:是否排外  如果排外则这个队列只允许有一个消费者
             * 参数四:是否自动删除队列,如果为true表示没有消息也没有消费者连接自动删除队列
             * 参数五:队列的附加属性
             * 注意:
             * 1.声明队列时,如果已经存在则放弃声明,如果不存在则会声明一个新队列;
             * 2.队列名可以任意取值,但需要与消息接收者一致。
             * 3.下面的代码可有可无,一定在发送消息前确认队列名称已经存在RabbitMQ中,否则消息会发送失败。
             */
            createChannel.queueDeclare("myQueue", true, false, false,null);
            
            /**
             * 开启事务
             * 消费者开启事务后,即使不提交也会获取到消息并且从队列删除。
             * 结论:
             *     如果是手动确认的消费者,开启事物的情况下必须ack之后再commit,否则不会从队列移除
             */
            createChannel.txSelect();
            
            /**
             * 接收消息。会持续坚挺,不能关闭channel和Connection
             * 参数一:队列名称
             * 参数二:消息是否自动确认,true表示自动确认接收完消息以后会自动将消息从队列移除。否则需要手动ack消息
             * 参数三:消息接收者的标签,用于多个消费者同时监听一个队列时用于确认不同消费者。
             * 参数四:消息接收者
             */
            createChannel.basicConsume("myQueue", false, "", new DefaultConsumer(createChannel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {

                    // 该消息是否已经被处理过,true表示已经处理过,false表示没有处理过
                    boolean redeliver = envelope.isRedeliver();
                    
                    String string = new String(body, "UTF-8");
                    // 获取消息的编号,根据编号确认消息
                    long deliveryTag = envelope.getDeliveryTag();
                    // 获取当前内部类中的通道
                    Channel channel = this.getChannel();
                    System.out.println("处理消息成功, 消息: " + string + "\t redeliver: " + redeliver);
                    
                    // 手动确认
                    channel.basicAck(deliveryTag, true);
                    
                    // 提交事务
                    channel.txCommit();
                }
            });
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
        }
        
    }
}

这里envelope.isRedeliver() 可以返回该消息是否已经被处理过。

2. 确认模式

Confirm发送方确认模式使用和事务类似,也是通过设置Channel进行发送方确认的。最终确保所有的消息全部发送成功。confirm确认模式要比事务快。

Confirm的三种实现方式:

方式一:channel.waitForConfirms()普通发送方确认模式;

方式二:channel.waitForConfirmsOrDie()批量确认模式;

方式三:channel.addConfirmListener()异步监听发送方确认模式;

1. 普通发送方确认模式

package rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    
    public static Connection getConnection() throws Exception {
        // 创建连接工程,下面给出的是默认的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory.newConnection();
    }

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            try {
                connection = getConnection();
            } catch (Exception e) {
                // ignore
            }
            channel = connection.createChannel();
            channel.queueDeclare("myQueue", true, false, false,null);
            
            // 启动发送者确认模式
            channel.confirmSelect();
    
            String message = "hello,message! confirmSelect";
            /**
             * 发送消息到MQ
             * 参数一:交换机名称,为""表示不用交换机
             * 参数二:为队列名称或者routingKey.当指定了交换机就是routingKey
             * 参数三:消息的属性信息
             * 参数四:消息内容的字节数组
             */
            channel.basicPublish("", "myQueue", null, message.getBytes());
            
            // 阻塞线程,等待服务器返回响应。该方法可以指定一个等待时间,发送成功返回true,否则返回false
            if (channel.waitForConfirms()) {
                System.out.print("发送成功");
            } else {
                // 返回false可以进行补发。重试几次发送或者利用redis+定时任务来完成补发
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // channel.waitForConfirms 可能返回超时异常
            // 可以进行补发。重试几次发送或者利用redis+定时任务来完成补发
        } finally {
            try {
                if (channel != null) {
                    channel.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception ignore) {
                // ignore
            }
        }
    }
}

  这里需要用confirmSelect() 开启确认模式,然后channel.waitForConfirms() 阻塞等待发送。返回false或者抛出InterruptedException中断异常都是发送失败。可以进行补发,可以用重试机制或者先存到redis,随后用定时任务发送。

2.批量确认模式

  这种用于确认一大批消息模式。但是一旦消息集合有一个没发送成功就会全部失败,需要全部进行补发。

package rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    
    public static Connection getConnection() throws Exception {
        // 创建连接工程,下面给出的是默认的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory.newConnection();
    }

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            try {
                connection = getConnection();
            } catch (Exception e) {
                // ignore
            }
            channel = connection.createChannel();
            channel.queueDeclare("myQueue", true, false, false,null);
            
            // 启动发送者确认模式
            channel.confirmSelect();
    
            String message = "hello,message! confirmSelect";
            /**
             * 发送消息到MQ
             * 参数一:交换机名称,为""表示不用交换机
             * 参数二:为队列名称或者routingKey.当指定了交换机就是routingKey
             * 参数三:消息的属性信息
             * 参数四:消息内容的字节数组
             */
            channel.basicPublish("", "myQueue", null, message.getBytes());
            channel.basicPublish("", "myQueue", null, message.getBytes());
            channel.basicPublish("", "myQueue", null, message.getBytes());
            channel.basicPublish("", "myQueue", null, message.getBytes());
            
            try {
                // 阻塞线程,等待服务器返回响应。该方法可以指定一个等待时间。该方法无返回值,只能根据抛出的异常进行判断。
                channel.waitForConfirmsOrDie();
            } catch (InterruptedException e) {
                // 可以进行补发。重试几次发送或者利用redis+定时任务来完成补发
            } catch (IOException e) {
                // 可以进行补发。重试几次发送或者利用redis+定时任务来完成补发                
            }
            System.out.print("发送成功");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (channel != null) {
                    channel.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception ignore) {
                // ignore
            }
        }
    }
}

  这种模式方法无返回值,只能根据异常进行判断。如果确认失败会抛出IOException和InterruptedException。源码如下:

void waitForConfirmsOrDie() throws IOException, InterruptedException;

3.异步Confirm模式

  异步模式的优点,就是执行效率高,不需要等待消息执行完,只需要监听消息即可。需要注意的是不可以关闭channel和connection。

package rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    
    public static Connection getConnection() throws Exception {
        // 创建连接工程,下面给出的是默认的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory.newConnection();
    }

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            try {
                connection = getConnection();
            } catch (Exception e) {
                // ignore
            }
            channel = connection.createChannel();
            channel.queueDeclare("myQueue", true, false, false,null);
            
            // 启动发送者确认模式
            channel.confirmSelect();
            
            /**
             * 发送消息到MQ
             * 参数一:交换机名称,为""表示不用交换机
             * 参数二:为队列名称或者routingKey.当指定了交换机就是routingKey
             * 参数三:消息的属性信息
             * 参数四:消息内容的字节数组
             */
            for (int i = 0; i< 500; i ++) {
                String message = "hello,message! confirmSelect " + i;
                channel.basicPublish("", "myQueue", null, message.getBytes());
            }
            
            //异步监听确认和未确认的消息
            channel.addConfirmListener(new ConfirmListener() {
                /**
                 * 消息没有确认的回调方法
                 * 参数一:没有确认的消息的编号
                 * 参数二: 是否没有确认多个
                 */
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println(String.format("确认消息,序号:%d,是否多个消息:%b", deliveryTag, multiple));
                }
                
                /**
                 * 消息确认后回调
                 * 参数一: 确认的消息的编号,从1开始递增
                 * 参数二: 当前消息是否同时确认了多个
                 */
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println(String.format("确认消息,序号:%d,是否多个消息:%b", deliveryTag, multiple));
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
//            try {
//                if (channel != null) {
//                    channel.close();
//                }
//                if (connection != null) {
//                    connection.close();
//                }
//            } catch (Exception ignore) {
                // ignore
//            }
        }
    }
}

3.消费者确认模式

  为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制(Message Acknowledgment)。消费者在声明队列时,可以指定noAck参数,当noAck=false时,rabbitMQ会等待消费者显式发回ack信号后从内存(和磁盘,如果是持久化消息)中删除消息。

  手动确认主要使用一些方法:

basicAck(); 用于肯定确认,multiple参数用于确认多个消息

basicRecover() 是路由不成功的消息可以使用recovery重新发送到队列中

basicReject() 是接收端告诉服务器这个消息我拒绝接受,可以设置是否回到队列中还是丢掉,而且一次只能一次拒绝一条消息。

basicNack() 可以一次拒绝N条消息,客户端可以设置basicNack()的multiple参数为true

例如:

package rabbitmq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Consumer {

    public static ConnectionFactory getConnectionFactory() {
        // 创建连接工程,下面给出的是默认的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory;
    }

    public static void main(String[] args) throws IOException, TimeoutException  {
        ConnectionFactory connectionFactory = getConnectionFactory();
        Connection newConnection = null;
        Channel createChannel = null;
        try {
            newConnection = connectionFactory.newConnection();
            createChannel = newConnection.createChannel();
            /**
             * 声明一个队列。
             * 参数一:队列名称
             * 参数二:是否持久化
             * 参数三:是否排外  如果排外则这个队列只允许有一个消费者
             * 参数四:是否自动删除队列,如果为true表示没有消息也没有消费者连接自动删除队列
             * 参数五:队列的附加属性
             * 注意:
             * 1.声明队列时,如果已经存在则放弃声明,如果不存在则会声明一个新队列;
             * 2.队列名可以任意取值,但需要与消息接收者一致。
             * 3.下面的代码可有可无,一定在发送消息前确认队列名称已经存在RabbitMQ中,否则消息会发送失败。
             */
            createChannel.queueDeclare("myQueue", true, false, false,null);
            
            /**
             * 接收消息。会持续坚挺,不能关闭channel和Connection
             * 参数一:队列名称
             * 参数二:消息是否自动确认,true表示自动确认接收完消息以后会自动将消息从队列移除。否则需要手动ack消息
             * 参数三:消息接收者的标签,用于多个消费者同时监听一个队列时用于确认不同消费者。
             * 参数四:消息接收者
             */
            createChannel.basicConsume("myQueue", false, "", new DefaultConsumer(createChannel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    
                    String string = new String(body, "UTF-8");
                    // 获取消息的编号,根据编号确认消息
                    long deliveryTag = envelope.getDeliveryTag();
                    // 获取当前内部类中的通道
                    Channel channel = this.getChannel();
                    
                    try {
                        // 模拟处理程序
                        if (deliveryTag % 2 == 0) {
                            // 模拟处理业务, 报错
                            int i = 10 / 0;
                        }
                        
                        // 手动确认消息,确认以后表示当前消息已经成功处理了,需要从队列中移除。这个方法应该在当前消息的处理程序全部处理完成后确认。
                        System.out.println("处理消息成功, 消息: " + string);
                        channel.basicAck(deliveryTag, true);
                    } catch (Exception e) {
                        System.out.println("处理消息失败,重新回到队列, 消息: " + string);
                        // 捕捉到异常,重新回到队列
                        channel.basicReject(deliveryTag, true);
                    }
                }
            });
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
        }
        
    }
}

  这里需要注意。如果一个消息设置了手动确认,就必须应答或者拒绝,否则会一直阻塞。

 

RabbitMQ事务性消息和确认模式

原文:https://www.cnblogs.com/qlqwjy/p/13934573.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!