mandatory
mandatory=true
,如果交换器无法根据自身的类型和路由键找到一个符合条件的队列,RabbitMQ会调用 Basic.Return
命令将消息返回给生产者,生产者通过调用 channel.addReturnListener
添加监听器接收返回结果
mandatory=false
,上述情形下,RabbitMQ 将消息直接丢弃
immediate
immediate=true
,如果交换器在消息路由到队列时发现队列上并不存在任何消费者,该消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过 Basic.Return
返回生产者
和mandatory相比,mandatory如果路由不到队列则返回消息,immediate如果队列中没有消费者则返回消息
备份交换器(Alternate Exchange)
AE可以将未被路由的消息存储到 RabbitMQ 中。简化了mandatory
+addReturnListener
的编程逻辑。
Map<String,Object> args = new HashMap<String,Object>();
args.put("alternate-exchange","myAe");
// 声明普通交换器(AE交换器作为备份交换器)
channel.exchangeDeclare("normalExchange","direct",true,false,args);
// 声明AE交换器
channel.exchangeDeclare("myAe","fanout",true,false,null);
// 普通队列 绑定 普通交换器
channel.queueBind("normalQueue","normalExchange","normalKey");
// 声明 未路由队列
channel.queueDeclare("unroutedQueue",true,false,false,null);
// 未路由队列 绑定 AE交换器
channel.queueBind("unroutedQueue","myAe","");
特殊情况
通过队列属性设置消息TTL
Map<String,Object> args = new HashMap<String,Object>();
args.put("x-message-ttl",6000); // 单位毫秒
channel.queueDeclare(queueName,durable,exclusive,autoDelete,args);
消息过期:一旦过期,从队列中抹去。因为消息在队列头部,RabbitMQ只需要定期从头部开始扫描是否有过期消息即可。
设置每条消息TTL
在 channel.basicPublish
方法中加入 expiration 参数,单位毫秒
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);
builder.expiration("60000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName,routingKey,mandatory,properties,"ttlTestMessage".getBytes());
消息过期:消息过期后,不会马上从队列中抹去,在即将投递到消费者之前判定。每条消息过期时间不同,删除所有过期消息势必要扫描整个队列,因此不如等到消息需要消费时再判定是否过期,若过期则删除。
设置队列的TTL
通过 channel.queueDeclare
方法中的 x-expires 参数可以控制队列被自动删除前处于未使用状态的时间
RabbitMQ 会确保在过期时间到达后将队列删除,在 RabbitMQ 重启后,过期时间会重置
当消息在一个队列中变成死信,会被重新发送到死信交换器(Dead-Letter-Exchange, DLX),绑定DLX的队列称为死信队列
死信原因:1.消息被拒绝; 2.消息过期; 3. 队列达到最大长度
绑定死信队列:在 channel.queueDeclare 方法中设置 x-dead-letter-exchange 参数为此队列添加 DLX
消息当被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,才能拿到消费
场景:
订单超时支付,延时队列做异常处理;
智能设备在指定时间进行工作,延时队列做指令推送;
用法:
具有高优先级的队列有高的优先权,优先级高的消息优先被消费
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.priority(5);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("exchange_priority","rk_priority",properties,("message").getBytes());
远程过程调用(Remote Procedure Call),通过网络从远程计算上请求服务。应用部署在A服务器上,想要调用B服务器上提供的函数或者方法,需要通过网络表达调用的语义和传达调用的数据。
RPC的协议包括:Java RMI、WebService的RPC、THrift、RestfulAPI等。
String callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build();
channel.basicPublish("","rpc_queue",props,message.getBytes());
RPC 处理流程:
持久化可以提高 RabbitMQ 的可靠性,防止在异常情况(重启、关闭、宕机)下数据丢失
持久化的各种情况
RabbitMQ 持久化分为3个部分:交换器的持久化、队列的持久化、消息的持久化
默认情况下,生产者不知道消息有没有正确到达服务器。因此引入事务和发送方确认机制。
事务机制
事务方法:
事务流程:
事务问题:事务机制会耗尽 RabbitMQ 的性能
发送方确认机制
发送方确认机制好处:相比于事务,它是异步非阻塞的。可以在等待信道返回确认的同时,继续发送下一条消息,当消息得到确认后,生产者可以通过回调方法处理确认消息。
发送方确认机制的优势在于不一定需要同步确认:
注意:批量确认提升了confirm效率,但是返回Basic.Nack或者超时,客户端需要将这一个批次的消息全部重发,会带来明显的重复消息数量。消息经常丢失时,批量confirm性能应该不升反降。
消息分发
消息顺序性
顺序性指的是消费者消费的消息和发布者发布的消息的顺序是一致的
顺序性打破的情况:
弃用 QueueingConsumer
消息传输保障等级
At most once:最多一次。消息可能丢失,但绝不会重复传输
At least once:最少一次。消息绝不会丢失,但可能重复传输
Exactly once:恰好一次。每条消息肯定会,有且传输一次
最少一次:需要考虑 事务、mandatory、持久化处理、autoAck
最多一次:无须考虑以上问题,随便发送与接收
恰好一次:RabbitMQ 目前无法保障。比如消费完Ack闪断,或者生产者发送消息到RabbitMQ,返回确认消息时网络闪断。
去重一般是通过业务客户端引入GUID实现
原文:https://www.cnblogs.com/hainingwyx/p/14623884.html