rabbitmq作为成熟的企业消息中间件,实现了应用程序间接口调用的解耦,提高系统的吞吐量。
下面介绍下rabbitmq的一些基本概念:
在elong,我们开发了一套基于rabbitmq的消息系统,可以实现消息的可靠传输,提供了简单的restful api, 减少业务使用rabbitmq的学习成本。
serverIP : 服务器ip
Port:服务端口号
UserName: 用户名
PassWord: 密码
MaxPoolSize: 最大连接池大小
RequestedHeartbeat: 请求心跳检查时间(s)
RequestedConnectionTimeout: 请求连接超时时间
FailedLogBaseDir: 失败日志存储目录
ConnectionTimeOut: 连接保持时间(ms)
SendTimeOut: 发送超时(s)
ReceiveTimeOut: 接收超时时间(s)
SendLogBaseDir(发送日志目录)
消息发送客户端,提供发送消息的接口
流程图:
其中比较重要的是RabbitConnectPool(单例创建连接池),该类中比较重要的属性和方法
_max: //可以创建的最大连接数
_created: 已经使用的连接数
_used: 已经使用的连接数
_sendTimeOut: 发送连接请求超时时间
_receiveTimeOut: 接收连接成功的超时时间
_clientExpires: 连接到期时间
_connectionTimeOut: 连接超时
_qos:
重要的方法:
getSendingConnection(): 获取一个发送端的连接, 如果不是强制,就从连接池中获取连接,否则强制创建一个连接
getNextProxy:() 从连接池中过去连接(返回RabbitSendProxy), 如果超过最大连接数,则创建新连接, 否则加锁获取 proxy(pollProxy),如果返回为空,这等待,直到获取连接为止
pollProxy(): 获取连接, 从proxyqueue中poll,如果连接不可用,这_created–, 然后_used++, 如果创建条数 < 最大数, 这获取新连接(newProxy(), create++, _used++
returnToPool(): 返回到连接池,
getNewProxy(): 三次重试, getProxy(), 重试间隔0.1s
getProxy(): 通过工厂模式生成连接
主要属性:
isAvailable: 是否可用,默认true
createTime: 创建时间,
DisposeListener: 连接池关闭需要执行的接口
connectionTimeout: 连接超时时间, 当前时间-createTime <= connectionTimeout可用
receiveTimeOut: 接收超时时间
Connection: 最主要的类,com.rabbitmq.client.Connection 连接
qos: 服务器一次可以传输的消息条数
Channel : 管道,连接创建管道,进行数据传输
ConnectionFactory: 连接工厂,创建rabbitmq连接
主要操作:
isAvailable(): 连接是否可用
send(): 发送方法
流程:转换成byte数组->检查消息长度(小于64K) -> 缓存数据,等待确认->发送(basicPublish) -> 在接收到后删除缓存数据
下面说下如何保证数据一定能发送到rabbit queue中:
为了解决发送失败的问题,解决的思路无非是消息持久化,采用文件做持久化是比较好的选择。
具体的实现是消息失败后,放入blockingqueue作为数据换出的地方,定期从queue中读取数据存储文件,开启定时任务读取数据,重新send到queue中。
作用: 从rabbitmq中读取消息,通过http接口调用消费者
数据库如图:
jmg-server的流程:
消 息校验 -> 获取消息配置,找到消费者-> 判断没有正在处理 -> 消息还没有处理成功or 没有达到最大处理失败次数 – > 首次接收的消息入库- > 广播消息到接收方 -> 处理成功,记录messageLog,修改状态; 处理失败,发送到rabbitmq-server,等待下次处理.
采用集群的方式搭建, 通过nginx对外提供统一的url
集群中一些重要的概念:
network partition: 网络中断,一般是子网之间的设备中断,这样在不同子网的设备通信会出现问题
搭建集群:
abbitmq的集群是依附于erlang的集群来工作的,所以必须先构建起erlang的集群景象。Erlang的集群中各节点是经由过程一个magic cookie来实现的,这个cookie存放在 $home/.erlang.cookie 中(像我的root用户安装的就是放在我的root/.erlang.cookie中),文件是400的权限。所以必须包管各节点cookie对峙一致,不然节点之间就无法通信。
方案1: 普通集群
erlang 通过cookie来决定是否能和另外一个节点通信,通常的做法是在一个机器上生成cookie文件,拷贝到集群中的其他机器。
集群可以通过单逻辑broker的方式来连接多个机器。各机器间通过Erlang消息传递来通信,因此,集群内所有节点都必须有相同的Erlang cookie。集群内机器间的网络连接必须是可信的,且所有机器必须运行相同版本的Erlang和RabbitMQ。
虚拟机、交换机、用户和权限会自动镜像到集群内所有节点。队列可能位于单节点上,或者镜像到多个节点上。客户端连接到集群内任何节点都能看到集群内所有队列。
步骤
1
rabbit1$ rabbitmq-server -detached rabbit2$ rabbitmq-server -detached rabbit3$ rabbitmq-server -detached
2 加入以rabbit3为集群,集群名为rabbit@rabbit1,则需要在rabbit1和rabbit2上执行下面操作,加入rabbit@rabbit1,
rabbit2$ rabbitmqctl stop_app Stopping node rabbit@rabbit2 ...done. rabbit2$ rabbitmqctl join_cluster rabbit@rabbit1 Clustering node rabbit@rabbit2 with [rabbit@rabbit1] ...done. rabbit2$ rabbitmqctl start_app Starting node rabbit@rabbit2 ...done.
3 同样在rabbit3,上操作,加入rabbit@rabbit2
rabbit3$ rabbitmqctl stop_app Stopping node rabbit@rabbit3 ...done. rabbit3$ rabbitmqctl join_cluster rabbit@rabbit2 Clustering node rabbit@rabbit3 with rabbit@rabbit2 ...done. rabbit3$ rabbitmqctl start_app Starting node rabbit@rabbit3 ...done.
方案2:镜像队列
上述配置的RabbitMQ默认集群模式,但并不包管队列的高可用性,尽管互换机、绑定这些可以复制到 集群里的任何一个节点,然则队列内容不会复 制,固然该模式解决一项目组节点压力,但队列节点宕机直接导致该队列无法应用,只能守候重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内 容到集群里的每个节点,须要创建镜像队列。
Federation允许一个broker上的交换机接收发布到另一个broker(这个broker可能是单独的机器或者集群)上的交换机的消息。为了节点间能够通过AMQP(带上SSL选项)通信,组成federation的两个交换机之间必须授予适当的用户和权限。
组成federation的交换机之间通过单向点对点连接。缺省情况下,在federation连接上,消息仅仅被转发一次,但是这样可增加更多、更复杂的路由拓扑。
在federation连接上,有些消息可能不会被转发;如果一条消息到达federated交换机后不能被路由到某个队列,则它不会被转发。
你可以在Internet上通过federation连接各个broker来pub/sub消息。
方案3: shovel
相比federation,工作在更低一层,shovel简单从一个broker的一个queue中消费消息,并传递到下一个broker的exchange上
the shovel simply consumes messages from a queue on one broker, and forwards them to an exchange on another.
参考资料:
3 http://www.rabbitmq.com/documentation.html
原文:http://www.cnblogs.com/200911/p/4764779.html