主要解决应用解耦,流量削峰,异步消息,实现高性能,可升缩,最终一致性的架构。
基于队列(点对点)与发布订阅(有多个消费者)
AMQP 高级消息队列模式,是一个异步消息传递所使用的应用层协议规范, 此协议的客户端和中间件可以无视消息来源,不受客户端,消息中间件,不同开发语言环境等条件限制。
1.下载并安装erlang,下载地址:http://www.erlang.org/download
2.配置erlang环境变量信息
新增环境变量ERLANG_HOME=erlang的安装地址
将%ERLANG_HOME%\bin加入到path中
3.下载并安装RabbitMQ,下载地址: http://www.rabbitmq.com/download.html
4. 使用
RabbitMQ 管理平台地址 http://127.0.0.1:15672
默认账号:guest/guest 用户可以自己创建新的账号
Virtual Hosts:
像mysql有数据库的概念并且可以指定用户对库和表等操作的权限。那 RabbitMQ呢?RabbitMQ也有类似的权限管理。在RabbitMQ中可以虚拟消息 服务器VirtualHost,每个VirtualHost相当月一个相对独立的RabbitMQ服 务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不 能互通。
5.windows访问
RabbitMQ Server 安装后无法访问http://127.0.0.1:15672/的原因及解决办法
然后输入以下内容
打开RabbitMQ节点
rabbitmqctl start_app
开启RabbitMQ管理模块的插件,并配置到RabbitMQ节点上
rabbitmq-plugins enable rabbitmq_management
关闭rabbitMQ节点
rabbitmqctl stop
输入完毕
我们来访问
http://127.0.0.1:15672/
相当与mysql里面可以建不同的库。
1. fanout 扇型交换机:将消息路由给绑定到它身上的交换机。
2. direct 直连交换机,根据消息所携带的routingKey将消息投递给指定的队列。
3. topic 主题交换机,根据消息的路由值,将消息投递给指定的队列。
4. header 头交换机,(用的不多)
https://www.rabbitmq.com/getstarted.html 队列模型官方文档
①一个生成者投递一个消费者,只能允许有一个消费者进行消费,如果是消费者集群,者进行均摊消费(取模算法)。(当集群的配置不一时,均摊不是很合理。)
②队列和消费者建立长连接
③自动应答与手动应答
自动应答:不在乎消费者付这个消息是否处理成功,都会告诉队列删除 该消息。如果消息处理失败的情况下,实现自动补偿。
手动应答:消息处理完业务逻辑,手动返回ack(通知)。告诉给队列服 务器是否删除消息
④推和取
推:消费者启动后,建立长连接,一旦生成者向队列投递消息,会立马推送给消费者。
取:生成者先投递消息到队列进行缓存,这时候消费者再去队列获取消息。
生成者投递消息到交换机,交换机根据不同的路由策略转发不同的队列服务器中。
交换机是没有存储功能的,如果交换机没有绑定队列,则消息会丢失。
以用户注册->发送邮件->发送短信 这个场景进行说明。应用解耦。
实现原理:
一个队列可以添加多个routingKey
和路由模式类似,只不过在队列绑定routingKey时可以使用通配符。
符号#:匹配零个或者多个词lazy.# 可以匹配lazy.irs或者lazy.irs.cor
符号*:只能匹配一个词lazy.* 可以匹配lazy.irs或者lazy.cor
采用应答模式,分为自动应答和手动应答。
自动应答:业务逻辑代码运行完自动提交。
手动应答:业务代码运行完手动设置sck告诉队列服务器。
在创建队列的时候可以设置消息是否持久化。
txSelect 将当前channel设置为transaction模式
txCommit 提交当前事务
txRollback 事务回滚
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ConfirmCallback: "+"相关数据:"+correlationData);
System.out.println("ConfirmCallback: "+"确认情况:"+ack);
System.out.println("ConfirmCallback: "+"原因:"+cause);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("ReturnCallback: "+"消息:"+message);
System.out.println("ReturnCallback: "+"回应码:"+replyCode);
System.out.println("ReturnCallback: "+"回应信息:"+replyText);
System.out.println("ReturnCallback: "+"交换机:"+exchange);
System.out.println("ReturnCallback: "+"路由键:"+routingKey);
}
});
return rabbitTemplate;
}
}
server:
port: 8080
spring:
#给项目来个名字
application:
name: rabbitmq-provider
rabbitmq:
####连接地址
host: 127.0.0.1
####端口号
port: 5672
####账号
username: guest
####密码
password: guest
### 地址
virtual-host: /admin_host
#消息确认配置项
#确认消息已发送到交换机(Exchange)
publisher-confirms: true
#确认消息已发送到队列(Queue)
publisher-returns: true
会产生四种情况:
①消息推送到server,但是在server里找不到交换机
②消息推送到server,找到交换机了,但是没找到队列
③消息推送到sever,交换机和队列啥都没找到
④消息推送成功
四种情况的返回结果:
①
2019-09-04 09:37:45.197 ERROR 8172 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange ‘non-existent-exchange‘ in vhost ‘JCcccHost‘, class-id=60, method-id=40)
ConfirmCallback: 相关数据:null
ConfirmCallback: 确认情况:false
ConfirmCallback: 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange ‘non-existent-exchange‘ in vhost ‘JCcccHost‘, class-id=60, method-id=40)
②
ReturnCallback: 消息:(Body:‘{createTime=2019-09-04 09:48:01, messageId=563077d9-0a77-4c27-8794-ecfb183eac80, messageData=message: lonelyDirectExchange test message }‘ MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
ReturnCallback: 回应码:312
ReturnCallback: 回应信息:NO_ROUTE
ReturnCallback: 交换机:lonelyDirectExchange
ReturnCallback: 路由键:TestDirectRouting
ConfirmCallback: 相关数据:null
ConfirmCallback: 确认情况:true
ConfirmCallback: 原因:null
③ 这里返回和①的情况是一致的
④
ConfirmCallback: 相关数据:null
ConfirmCallback: 确认情况:true
ConfirmCallback: 原因:null
1. connection 连接
无论是consumer 和 provider 和broker 通讯都需要建立连接,这个连接就是一条TCP连接,也就是connection。
2. 思考既然connection 可以完成所有通信的工作,为什么还要引入信道?
在高并发场景下如果有很多线程消费或者生产,这时需要频繁的销毁和建立TCP连接(建立TCP连接需要三次握手,销毁TCP连接需要四次握手),RabbitMQ采用在connection连接上建立虚拟的连接,也就是channel。 RabbitMQ 采用类似 NIO(Non-blocking I/O)的做法,选择 TCP 连接复用,不仅可以减少性能开销,同时也便于管理。
1. 报错
inequivalent arg ‘durable‘ for exchange ‘delay_exchange‘ in vhost ‘/‘: received ‘false‘ but current is ‘true‘
一旦创建了队列和交换机,就不能修改其标志了。例如,如果创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建。因此,最好仔细检查创建的标志。
2. 注意事项
在消费端最好也要对队列,交换机,等做初始化并绑定,不然如果队列,交换机等在生产者还没有投递消息,没有创建队列和交换机的时候,消费者启动会报错。
如果消费端程序报错,则一直重试直到消费成功为止,可以通过配置文件修改重试次数, 重试时间。
如果设置了手动应答,并且参数设置为以下参数,则此消费者的最大重试次数失效。
channel.basicReject(deliveryTag, true);
server:
port: 8081
spring:
#给项目来个名字
application:
name: rabbitmq-provider
rabbitmq:
####连接地址
host: 127.0.0.1
####端口号
port: 5672
####账号
username: guest
####密码
password: guest
### 地址
virtual-host: /admin_host
listener:
simple:
retry:
####开启消费者异常重试
enabled: true
####最大重试次数
max-attempts: 5
multiplier: 5 #应用于前一重试间隔的乘法器。
####重试间隔次数
initial-interval: 2000
####开启手动ack
acknowledge-mode: manual
1.冥等性问题。避免重复处理。
2.如何正确的选择重试机制。
情况一:如果消费者是调用第三方接口超时,或者第三方接口异常等,这个时候可以开启重试机制,但需要注意重复处理的问题。
情况二:如果消费者获取消息后,本身bug导致的异常,就没有必要重试了。
针对第二种情况可以这么处理:
如:商户注册成功需要发送短信和邮箱信息
传统情况下 注册-发送短信-发送邮箱
这里对这个流程进行解耦: 提高QPS(每秒处理的请求次数)
1.写入会员表成功。
2.写入消息记录表,并设置为未发送。
3.查询消息记录表,发送消息到broker同时更新为发送中。
4.假设消息接收成功,broker接收消息成功,并返回给生成者,这时候消息记录表更新为投递成功。
5.假设消息接收失败,可能失败的地方有两个处:Borker接收消息和返回消息都可能发生网络问题或者其他状况,导致生产者接收不到返回值;
6. 定时任务轮询,规定时间内没有发送成功消息再次按照步骤2进行投递,这个规定时间最多容忍2或者次查询轮询同一个条消息,如果依然接收不成功,那么则设置消息接收失败,这里可能是交换机或者队列绑定失败造成的,需要我们人工排查;
①确保生产者能够投递到mq服务器
如果投递到mq服务器,通过confirm机制进行重试。
②消费者消费失败了,生产者是不需要回滚事务。
解决办法:消费者采用手ack的方式,采用MQ进行补偿重试,注意MQ的冥等性问题。
③如果消费者消费成功,但是生产者confirm确认时失败。
原文:https://www.cnblogs.com/chenfei-java/p/12650033.html