一、消息队列相关概念
二、安装rabbitmq
三、配置rabbitmq
四、运行时参数配置
五、rabbitmq集群
一、消息队列相关概念
消息中间件:
AMQP:高级消息队列协议
MQ是消费-生产者模型的一个典型的代表 //
一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息
无需即时返回且耗时的操作提取出来,进行了异步处理,
而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
消息生产者--->中间件 //中间件负责消息转发
1.AMQP组件:
1.Server:接受client请求,实现AMQP消息队列和路由功能的进程
2.VirtualHost,类似于权限控制组。一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host
3.Exchange,接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。
ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout、headers和Topic,不同类型的Exchange路由的行为是不一样的。
4.Message Queue:消息队列,用于存储还未被消费者消费的消息。
5.Message: 由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等。而Body是真正需要传输的APP数据。
6.Binding:Binding联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。
7.Connection:连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。
8.Channel:信道,仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection。
9.Command:AMQP的命令,客户端通过Command完成与AMQP服务器的交互来实现自身的逻辑。例如在RabbitMQ中,客户端可以通过publish命令发送消息,txSelect开启一个事务,txCommit提交一个事务。
图一:AMQP逻辑模型
图二:AMQP协议模型
1.Modle Layer,位于协议最高层,主要定义了一些供客户端调用的命令。client可以利用这些命令实现收发消息等。
2.session Layer,主要负责将客户端的命令发送给服务器,在将服务器端的应答返回给客户端,主要为客户端与服务器之间通信提供可靠性、同步机制和错误处理。
3.Transport Layer,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示。
图三:
broker:的基本组件
Exchange:交换机,决定了消息路由规则;
Queue:消息队列;
Channel:进行消息读写的通道;
Bind:绑定了Queue和Exchange,意即为符合什么样路由规则的消息,将会放置入哪一个消息队列;
virtualhost:虚拟主机,实现隔离,每一个虚拟主机内部都可以有以上三个组件。
中间件的实现:
Qpid,ActiveMQ(apahce)//java
RabbitMQ //erlang语言开发是AMQP协议的实现
Kafka //火热,重量级
0MQ/ZeroMQ //
http://blog.csdn.net/lovesomnus/article/details/52223288 对比
Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定器,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
2.消息队列的使用过程如下:
(1)客户端连接到消息队列服务器,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
(5)客户端投递消息到exchange。
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
exchange类型:
1.完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。
2.对key进行模式匹配后进行投递的叫做Topic交换机,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。
3.还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。
消息队列持久化包括3个部分:
(1)exchange持久化,在声明时指定durable => 1
(2)queue持久化,在声明时指定durable => 1
(3)消息持久化,在投递时指定delivery_mode => 2(1是非持久化)
如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。
3.消息队列的声明:
在Rabbit MQ中,无论是producter发送消息还是consumer接受消息,都首先需要声明一个MessageQueue。问题是生产者声明还是消费者声明呢?要解决这个问题,首先需要明确:
a)消费者是无法订阅或者获取不存在的MessageQueue中信息的。
b)消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。
因此如果是消费者去声明Queue,就有可能会出现在声明Queue之前,生产者已发送的消息被丢弃的隐患。如果应用能够通过消息重发的机制允许消息丢失,则使用此方案没有任何问题。但是如果不能接受该方案,这就需要无论是生产者还是消费者,在发送或者接受消息前,都需要去尝试建立消息队列。这里有一点需要明确,如果客户端尝试建立一个已经存在的消息队列,Rabbit MQ不会做任何事情,并返回客户端建立成功的。
如果一个消费者在一个信道中正在监听某一个队列的消息,Rabbit MQ是不允许该消费者在同一个channel去声明其他队列的。Rabbit MQ中,可以通过queue.declare命令声明一个队列,可以设置该队列以下属性:
a)Exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。
b)Auto-delete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
c)Durable:持久化,这个会在后面作为专门一个章节讨论。
d)其他选项,例如如果用户仅仅想查询某一个队列是否已存在,如果不存在,不想建立该队列,仍然可以调用queue.declare,只不过需要将参数passive设为true,传给queue.declare,如果该队列已存在,则会返回true;如果不存在,则会返回Error,但是不会创建新的队列。
4.生产者发送消息:
在AMQP模型中,Exchange是接受生产者消息并将消息路由到消息队列的关键组件。ExchangeType和Binding决定了消息的路由规则。所以生产者想要发送消息,首先必须要声明一个Exchange和该Exchange对应的Binding。可以通过 ExchangeDeclare和BindingDeclare完成。在Rabbit MQ中,声明一个Exchange需要三个参数:ExchangeName,ExchangeType和Durable。ExchangeName是该Exchange的名字,该属性在创建Binding和生产者通过publish推送消息时需要指定。ExchangeType,指Exchange的类型,在RabbitMQ中,有三种类型的Exchange:direct ,fanout和topic,不同的Exchange会表现出不同路由行为。Durable是该Exchange的持久化属性,这个会在消息持久化章节讨论。声明一个Binding需要提供一个QueueName,ExchangeName和BindingKey。下面我们就分析一下不同的ExchangeType表现出的不同路由规则。
生产者在发送消息时,都需要指定一个RoutingKey和Exchange,Exchange在接到该RoutingKey以后,会判断该ExchangeType:
a)如果是Direct类型,则会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。
图:Direct模型
b)如果是 Fanout 类型,则会将消息发送给所有与该 Exchange 定义过 Binding 的所有 Queues 中去,其实是一种广播行为。
图:Fanout模型
c)如果是Topic类型,则会按照正则表达式,对RoutingKey与BindingKey进行匹配,如果匹配成功,则发送到对应的Queue中
图:Topic模型
5.消费者订阅消息
在RabbitMQ中消费者有2种方式获取队列中的消息:
a)一种是通过basic.consume命令,订阅某一个队列中的消息,channel会自动在处理完上一条消息之后,接收下一条消息。(同一个channel消息处理是串行的)。除非关闭channel或者取消订阅,否则客户端将会一直接收队列的消息。
b)另外一种方式是通过basic.get命令主动获取队列中的消息,但是绝对不可以通过循环调用basic.get来代替basic.consume,这是因为basic.get RabbitMQ在实际执行的时候,是首先consume某一个队列,然后检索第一条消息,然后再取消订阅。如果是高吞吐率的消费者,最好还是建议使用basic.consume。
如果有多个消费者同时订阅同一个队列的话,RabbitMQ是采用循环的方式分发消息的,每一条消息只能被一个订阅者接收。例如,有队列Queue,其中ClientA和ClientB都Consume了该队列,MessageA到达队列后,被分派到ClientA,ClientA回复服务器收到响应,服务器删除MessageA;再有一条消息MessageB抵达队列,服务器根据“循环推送”原则,将消息会发给ClientB,然后收到ClientB的确认后,删除MessageB;等到再下一条消息时,服务器会再将消息发送给ClientA。
这里我们可以看出,消费者再接到消息以后,都需要给服务器发送一条确认命令,这个即可以在handleDelivery里显示的调用basic.ack实现,也可以在Consume某个队列的时候,设置autoACK属性为true实现。这个ACK仅仅是通知服务器可以安全的删除该消息,而不是通知生产者,与RPC不同。 如果消费者在接到消息以后还没来得及返回ACK就断开了连接,消息服务器会重传该消息给下一个订阅者,如果没有订阅者就会存储该消息。
既然RabbitMQ提供了ACK某一个消息的命令,当然也提供了Reject某一个消息的命令。当客户端发生错误,调用basic.reject命令拒绝某一个消息时,可以设置一个requeue的属性,如果为true,则消息服务器会重传该消息给下一个订阅者;如果为false,则会直接删除该消息。当然,也可以通过ack,让消息服务器直接删除该消息并且不会重传。
6.持久化:
Rabbit MQ默认是不持久队列、Exchange、Binding以及队列中的消息的,这意味着一旦消息服务器重启,所有已声明的队列,Exchange,Binding以及队列中的消息都会丢失。通过设置Exchange和MessageQueue的durable属性为true,可以使得队列和Exchange持久化,但是这还不能使得队列中的消息持久化,这需要生产者在发送消息的时候,将delivery mode设置为2,只有这3个全部设置完成后,才能保证服务器重启不会对现有的队列造成影响。这里需要注意的是,只有durable为true的Exchange和durable为ture的Queues才能绑定,否则在绑定时,RabbitMQ都会抛错的。持久化会对RabbitMQ的性能造成比较大的影响,可能会下降10倍不止。
7.事务
对事务的支持是AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,因为consume命令本身没有任何Response返回,所以即使服务器崩溃,没有持久化该消息,生产者也无法获知该消息已经丢失。如果此时使用事务,即通过txSelect()开启一个事务,然后发送消息给服务器,然后通过txCommit()提交该事务,即可以保证,如果txCommit()提交了,则该消息一定会持久化,如果txCommit()还未提交即服务器崩溃,则该消息不会服务器就收。当然Rabbit MQ也提供了txRollback()命令用于回滚某一个事务。
8.Confirm机制
使用事务固然可以保证只有提交的事务,才会被服务器执行。但是这样同时也将客户端与消息服务器同步起来,这背离了消息队列解耦的本质。Rabbit MQ提供了一个更加轻量级的机制来保证生产者可以感知服务器消息是否已被路由到正确的队列中——Confirm。如果设置channel为confirm状态,则通过该channel发送的消息都会被分配一个唯一的ID,然后一旦该消息被正确的路由到匹配的队列中后,服务器会返回给生产者一个Confirm,该Confirm包含该消息的ID,这样生产者就会知道该消息已被正确分发。对于持久化消息,只有该消息被持久化后,才会返回Confirm。Confirm机制的最大优点在于异步,生产者在发送消息以后,即可继续执行其他任务。而服务器返回Confirm后,会触发生产者的回调函数,生产者在回调函数中处理Confirm信息。如果消息服务器发生异常,导致该消息丢失,会返回给生产者一个nack,表示消息已经丢失,这样生产者就可以通过重发消息,保证消息不丢失。Confirm机制在性能上要比事务优越很多。但是Confirm机制,无法进行回滚,就是一旦服务器崩溃,生产者无法得到Confirm信息,生产者其实本身也不知道该消息吃否已经被持久化,只有继续重发来保证消息不丢失,但是如果原先已经持久化的消息,并不会被回滚,这样队列中就会存在两条相同的消息,系统需要支持去重。
9.RabbitMQ的设计和消息队列的声明周期
RabbitMQ完全实现了AMQP协议,类似于一个邮箱服务。Exchange负责根据ExchangeType和RoutingKey将消息投递到对应的消息队列中,消息队列负责在消费者获取消息前暂存消息。
RabbitMQ中的MessageQueue主要由两部分组成:
AMQQueue,主要负责实现AMQP协议的逻辑功能。
BackingQueue用来存储消息
BackingQueue详解:
图:BackingQueue
RabbitMQ中BackingQueue又由5个子队列组成:Q1、Q2、Delta、Q3和Q4。RabbitMQ中的消息一旦进入队列,不是固定不变的,它会随着系统的负载在队列中不断流动,消息的状态不断发生变化。RabbitMQ中的消息一共有5种状态:
a)Alpha:消息的内容和消息索引都保存在内存中;
b)Beta:消息内容保存在磁盘上,消息索引保存在内存中;
c)Gamma:消息内容保存在磁盘上,消息索引在磁盘和内存都有;
d)Delta:消息内容和索引都在磁盘上;
注意:对于持久化的消息,消息内容和消息索引都必须先保存到磁盘上,才会处于上述状态中的一种,而Gamma状态的消息只有持久化的消息才会有该状态
BackingQueue中5个子队列中的消息状态,Q1和Q4对应的是Alpha状态;Q2和Q3是Beta状态;Delta对应的是Delta状态。
上述就是 RabbitMQ 的多层队列结构的设计,我们可以看出从 Q1 到 Q4 ,基本经历的是由 RAM 到 DISK,再到 RAM 的设计。这样的设计的好处就是当队列负载很高的情况下,能够通过将一部分消息由磁盘保存来节省内存空间,当负载降低的时候,这部分消息又渐渐回到内存,被消费者获取,使得整个队列有很好的弹性。
引起消息流动主要有两方面的因素:其一是消费者获取消息;其二是由于内存不足,引起消息的换出到磁盘上( Q1-.>Q2 、 Q2->Delta 、 Q3->Delta 、 Q4->Q3 )。
RabbitMQ 在系统运行时会根据消息传输的速度计算一个当前内存中能够保存的最大消息数量( Target_RAM_Count ),当内存中的消息数量大于该值时,就会引起消息的流动。进入队列的消息,一般会按着 Q1->Q2->Delta->Q3->Q4 的顺序进行流动,但是并不是每条消息都一定会经历所有的状态,这个取决于当时系统的负载状况。
当消费者获取消息时,首先会从 Q4 队列中获取消息,如果 Q4 获取成功,则返回,如果 Q4 为空,则尝试从 Q3 获取消息;首先,系统会判断 Q3 队列是否为空,如果为空,则直接返回队列为空,即此时队列中无消息(后续会论证)。如果不为空,则取出 Q3 的消息,然后判断此时 Q3 和 Delta 队列的长度,如果都为空,则可认为 Q2 、 Delta 、 Q3 和 Q4 全部为空 (后续说明 ) ,此时将 Q1 中消息直接转移到 Q4 中,下次直接从 Q4 中获取消息。如果 Q3 为空, Delta 不空,则将 Delta 中的消息转移到 Q3 中;如果 Q3 非空,则直接下次从 Q3 中获取消息。在将 Delta 转移到 Q3 的过程中, RabbitMQ 是按照索引分段读取的,首先读取某一段,直到读到的消息非空为止,然后判断读取的消息个数与 Delta 中的消息个数是否相等,如果相等,则断定此时 Delta 中已无消息,则直接将 Q2 和刚读到的消息一并放入 Q3 中。如果不相等,则仅将此次读到的消息转移到Q3 中。这就是消费者引起的消息流动过程。
图:状态转换
由于内存不足引起的消息换出。消息换出的条件是内存中保存的消息数量 + 等待 ACK 的消息的数量 >Target_RAM_Count 。当条件触发时,系统首先会判断如果当前进入等待 ACK 的消息的速度大于进入队列的消息的速度时,会先处理等待 ACK 的消息。步骤基本上 Q1->Q2 或者 Q3 移动,取决于 Delta 队列是否为空。 Q4->Q3 移动, Q2 和Q3 向 Delta 移动。
最后,我们来分析一下前面遗留的两个问题,一个是为什么 Q3 队列为空即可认定整个队列为空。试想如果 Q3 为空,Delta 不空,则在 Q3 取出最后一条消息时, Delta 上的消息就会被转移到 Q3 上,与 Q3 空矛盾。如果 Q2 不空,则在 Q3 取出最后一条消息,如果 Delta 为空时,会将 Q2 的消息并入 Q3 ,与 Q3 为空矛盾。如果 Q1 不空,则在 Q3 取出最后一条消息,如果 Delta 和 Q3 均为空时,则将 Q1 的消息转移到 Q4 中,与 Q4 为空矛盾。这也解释了另外一个问题,即为什么 Q3 和Delta 为空, Q2 就为空。
上述就是整个消息在 RabbitMQ 队列中流动过程。从上述流程可以看出,消息如果能够被尽早消费掉,就不需要经历持久化的过程,因为这样会加系统的开销。如果消息被消费的速度过慢, RabbitMQ 通过换出内存的方式,防止内存溢出。
二、安装rabbitmq
epel:rabbitmq-server
插件:rabbitmq-plugins {enable|disable|list}
rabbitmq_management,监听在15672端口//web界面的管理接口
yum install rabbitmq-server
/usr/sbin/rabbitmq-plugins //管理rabbitmq-plugins插件
/usr/sbin/rabbitmq-server
/usr/sbin/rabbitmqctl
rabbitmq-plugins list //查看插件
rabbitmq-plugins enable rabbitmq_management //启用该插件
[root@node1 ~]# rabbitmq-plugins list
[E]:明确启用的
[e]:被依赖启用
systemctl restart rabbitmq-server //启动插件后,会监听在某个端口,因此需要重启
http://192.168.5.113:15672/
guest:guest
页面介绍:
connections:client连接
Channels:频道。用户与queue关联的组件。//消费者与生产者之间的联系
Queues:队列,订阅的数据。
三、配置rabbitmq
配置方式:
环境变量:定义网络参数及配置文件路径等
配置文件:服务器各组件的权限、资源限制、插件及集群
运行时参数:集群的运行时参数设定。
/usr/lib/rabbitmq/bin/rabbitmq-defaults //默认配置
[root@localhost ~]# cat /usr/lib/rabbitmq/lib/rabbitmq_server-3.3.5/sbin/rabbitmq-defaults |egrep -v '(^#|^$|^[[:space:]]+)'
SYS_PREFIX= ERL_DIR= CLEAN_BOOT_FILE=start_clean SASL_BOOT_FILE=start_sasl CONFIG_FILE=${SYS_PREFIX}/etc/rabbitmq/rabbitmq LOG_BASE=${SYS_PREFIX}/var/log/rabbitmq MNESIA_BASE=${SYS_PREFIX}/var/lib/rabbitmq/mnesia ENABLED_PLUGINS_FILE=${SYS_PREFIX}/etc/rabbitmq/enabled_plugins PLUGINS_DIR="${RABBITMQ_HOME}/plugins" CONF_ENV_FILE=${SYS_PREFIX}/etc/rabbitmq/rabbitmq-env.conf
1.环境变量:/etc/rabbitmq/rabbitmq-env.conf
变量优先级:
shell环境变量->rabbitmq-env.conf->内置变量值
RABBITMQ_BASE(Not used) //安装目录,安装rabbitmq的安装和数据库和日志文件
RABBITMQ_CONFIG_FILE$RABBITMQ_HOME/etc/rabbitmq/rabbitmq //配置文件路径
RABBITMQ_MNESIA_BASE$RABBITMQ_HOME/var/lib/rabbitmq/mnesia
RABBITMQ_MNESIA_DIR$RABBITMQ_MNESIA_BASE/$RABBITMQ_NODENAME
RABBITMQ_LOG_BASE$RABBITMQ_HOME/var/log/rabbitmq
RABBITMQ_LOGS$RABBITMQ_LOG_BASE/$RABBITMQ_NODENAME.log //日志路径
RABBITMQ_NODE_IP_ADDRESS //监听的ip
RABBITMQ_SASL_LOGS$RABBITMQ_LOG_BASE/$RABBITMQ_NODENAME-sasl.log
RABBITMQ_PLUGINS_DIR$RABBITMQ_HOME/plugins //插件路径
RABBITMQ_PLUGINS_EXPAND_DIR$RABBITMQ_MNESIA_BASE/$RABBITMQ_NODENAME-plugins-expand
RABBITMQ_ENABLED_PLUGINS_FILE$RABBITMQ_HOME/etc/rabbitmq/enabled_plugins
RABBITMQ_PID_FILE$RABBITMQ_MNESIA_DIR.pid
2.配置文件:/etc/rabbitmq/rabbitmq-env.conf
auth_mechanisms:认证机制,是amqp还是明文认证,都是基于SASL的,默认使用amqp自己的认证机制
SASL认证
内置的机制:
PLAIN
AMQPLAIN //默认
RABBIT-CR-DEMO
default_user: guest
default_pass: guest
default_permission:默认权限,共有三个组件,分别定义权限。支持模式匹配
disk_free_limit:磁盘的最少预留空间,默认50M
rabbitmqctl environment| grep -i disk // {disk_free_limit,50000000},
heartbeat:client与broker之间是否建立持久链接//默认580ms
hipe_comple:使用高性能的hipe编译器 //erlang原生的编译器性能较差,用这个提高性能 false/true
log_levels:日志级别,默认info {none|info|warning|error}
tcp_listeners:监听的地址和端口
ssl_listeners:基于ssl通信协议监听的地址和端口,默认也是tcp_listeners的地址
vm_memory_high_watermark:定义最高使用多少内存 //40%的物理内存
rabbitmqctl environment| grep -i water // {vm_memory_high_watermark,0.4},
//多数都有默认值
四、运行时参数配置
rabbitmqctl -h
Commands: stop [<pid_file>] //停止整个erlang虚拟机 stop_app//停止虚拟机上的应用程序 http://192.168.5.113:15672/#/ 将不能访问,因为该管理控制台,也是其应用之一 start_app wait <pid_file> reset force_reset rotate_logs <suffix> //集群 join_cluster <clusternode> [--ram] cluster_status change_cluster_node_type disc | ram //磁盘和ram(内存)两种方式 forget_cluster_node [--offline] update_cluster_nodes clusternode sync_queue queue cancel_sync_queue queue set_cluster_name name //用户管理 add_user <username> <password> delete_user <username> change_password <username> <newpassword> clear_password <username> set_user_tags <username> <tag> ... list_users //权限管理&虚拟主机相关: add_vhost <vhostpath> delete_vhost <vhostpath> list_vhosts [<vhostinfoitem> ...] set_permissions [-p <vhostpath>] <user> <conf> <write> <read> clear_permissions [-p <vhostpath>] <username> list_permissions [-p <vhostpath>] list_user_permissions <username> //设置参数(运行时参数) set_parameter [-p <vhostpath>] <component_name> <name> <value> clear_parameter [-p <vhostpath>] <component_name> <key> list_parameters [-p <vhostpath>] //策略(运行时参数) set_policy [-p <vhostpath>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern> <definition> clear_policy [-p <vhostpath>] <name> //策略 list_policies [-p <vhostpath>] //组件查看 list_queues [-p <vhostpath>] [<queueinfoitem> ...] list_exchanges [-p <vhostpath>] [<exchangeinfoitem> ...] list_bindings [-p <vhostpath>] [<bindinginfoitem> ...] list_connections [<connectioninfoitem> ...] list_channels [<channelinfoitem> ...] list_consumers [-p <vhostpath>] //broker状态查看 status//查看配置和参数,AMQP监听5672,集群监听:25672,rabbitmq-manage监听在:15672 report:生成状态报告 //rabbitmqctl report //环境变量查看 environment eval <expr> //执行erlang底层表达式 //关闭指定链接 close_connection <connectionpid> <explanation> //追踪 trace_on [-p <vhost>] trace_off [-p <vhost>] set_vm_memory_high_watermark <fraction> //运行时也可以修改,默认0.4
用户管理:
[root@node1 rabbitmq]# rabbitmqctl add_user test apss [root@node1 rabbitmq]# rabbitmqctl list_users Listing users ... guest[administrator] test[] [root@node1 rabbitmq]# rabbitmqctl set_user_tags test administrator [root@node1 rabbitmq]# rabbitmqctl list_users guest[administrator] test[administrator] [root@node1 rabbitmq]# rabbitmqctl set_user_tags test //删除tag
权限管理&虚拟主机相关:
[root@node1 rabbitmq]# rabbitmqctl list_permissions Listing permissions in vhost "/" ... guest.*.*.* //一个主机上有三个组件{exchange,bingding,queue} 配置文件 写权限 读权限 [root@node1 rabbitmq]# rabbitmqctl add_vhost /myhost2 [root@node1 rabbitmq]# rabbitmqctl add_vhost /myhost2/test2 [root@node1 rabbitmq]# rabbitmqctl delete_vhost /myhost2/test2 [root@node1 rabbitmq]# rabbitmqctl list_vhosts [root@node1 rabbitmq]# rabbitmqctl list_permissions [root@node1 rabbitmq]# rabbitmqctl list_user_permissions guest [root@node1 rabbitmq]# rabbitmqctl set_permissions -p /myhost2 test ".*" " .*" ".*" [root@node1 rabbitmq]# rabbitmqctl list_user_permissions test Listing permissions for user "test" ... /myhost2.* .*.* ...done. [root@node1 rabbitmq]# rabbitmqctl add_user wolf wolf [root@node1 rabbitmq]# rabbitmqctl set_permissions -p /myhost2 wolf ".*" ".*" ".*" [root@node1 rabbitmq]# rabbitmqctl set_user_tags wolf administrator [root@node1 rabbitmq]# http://192.168.5.113:15672/#/exchanges //只有/myhost2 //wolf只对myhost2是管理员权限 使用wolf用户登录。对/ 的授权,需要对/myhosts 另外授权。
Reseful风格配置查看:
http://192.168.5.113:15672/api/exchanges http://192.168.5.113:15672/api/overview http://192.168.5.113:15672/api/queues 队列内容
SASL:
简单验证安全层 (Simple Authentication Security Layer, SASL)
SASL 库称为 libsasl。
SASL 为应用程序和共享库的开发者提供了用于验证、数据完整性检查和加密的机制。
开发者可通过 SASL 对通用 API 进行编码。
SASL 特别适用于使用 IMAP、SMTP、ACAP 和 LDAP 协议的应用程序,因为这些协议全都支持 SASL。
五、rabbitmq集群
rabbitmq非常关键,需要进行高可用
HA:
RABBITMQ Cluster: //将每一个node上持有的信息,同其他节点进行同步
[] -- []
\ /
[]
LB:
/[]
=>[haproxy]-- []
\[]
//健康状态监测和
实验1:rabbitmq的HA集群
1.环境:
node1 :192.168.5.113
node2 :192.168.5.134 //都已经安装和启动rabbitmq-server ,并且都已经enable rabbitmq_management
vim /etc/hosts 两个都一样
192.168.5.113node1
192.168.5.136node2
systemctl cluster_status 看到的都是
[root@localhost ~]# rabbitmqctl cluster_status //默认主机名是localhost,不是的话修改过来
Cluster status of node rabbit@localhost ... [{nodes,[{disc,[rabbit@localhost]}]}, {running_nodes,[rabbit@localhost]}, {cluster_name,<<"rabbit@localhost">>}, {partitions,[]}] ...done.
node1加入node2的集群或者node2加入node1的集群都可以,只要把集群名改一致即可
2.node2和node1的配置
node1:
hostnamectl set-hostname node1
systemctl start rabbitmq-server
rabbitmq-plugins enable rabbitmq_management
[root@node1 ~]# rabbitmqctl cluster_status //修改完名字需要重新登录,否则结果仍是"rabbit@localhost"
{cluster_name,<"rabbit@node1">>}
scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/
node2:
hostnamectl set-hostname node2
systemctl start rabbitmq-server
rabbitmq-plugins enable rabbitmq_management
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl cluster_status //验证
[root@node112 rabbitmq]# rabbitmqctl cluster_status
Cluster status of node rabbit@node112 ...
[{nodes,[{disc,[rabbit@node1,rabbit@node2]}]}]
...done.
rabbitmqctl start_app //加入后就可以直接查看
3.注意事项
注意:解析主机名和实际主机名一致
cookie和hostname和/etc/hosts要匹配。
时间同步、各节点启用rabbitmq_manager插件
在master节点:
复制其cookie至其他各节点,要注意保持其权限为400
/var/lib/rabbitmq/.erlang.cookie
在各从节点上:
停止应用:
rabbitmqctl stop_app
加入集群:
rabbitmqctl join_cluster CLUSTER_NAME
启用应用:
rabbitmqctl start_app
http://192.168.5.113:15672 //两个node正常运行
rabbitmqctl list_vhosts //两个节点上将一样
相关命令:
join_cluster <clusternode> [--ram]
cluster_status
change_cluster_node_type disc | ram
forget_cluster_node [--offli ne]
update_cluster_nodes clusternode
sync_queue queue //同步某个队列
cancel_sync_queue queue
set_cluster_name name
实验2:基于haproxy的LB集群
基于haproxy的LB集群:
listen rabbitmq_test :5672 {
mode tcp
status enable
balance roundrobin
server rabbit01 IP:PORT check inter 5672
server rabbit02 IP:PORT check inter 5000
}
参考博客:https://www.cnblogs.com/qixuejia/p/5997054.html
环境变量参考:
http://blog.csdn.net/zdq0394123/article/details/14517161
https://www.cnblogs.com/zhen-rh/p/6884297.html
原文:http://blog.51cto.com/hmtk520/2050847