一、简介
RabbitMQ,它是干嘛用的呐?它是用来发消息的,消息队列,那它跟我们之前的学习的python的线程queue和进程的queue有什么区别呢?其实他们干的事情都是一样的。先来说说我们之前学习的python的queue吧。
- 线程queue:只是用于多个线程之间,进行数据同步交互的。
- 进程queue:只是用户父进程与子进程进行交互,或者属于同一父进程下的多个子进程进行交互。
如果是两个独立的程序,即便是python 程序,两个完全独立的python程序也依然是不用这个python的这个线程或者进程queue来通信的。
那么问题来了,我现在两个独立的python程序,或者python跟Java程序,或者跟PHP程序,或者两台独立机器之间的也涉及到生产者消费者模型,这个时候用python的线程queue和进程queue就通信不了了。那怎么办呢?这个时候我们只能搞一个中间代理,这个中间代理就是RabbitMQ。
二、消息发送方式如图:
三、rabbitMQ安装
windows下直接到官网下载即可 地址:http://www.rabbitmq.com/install-windows-manual.html
安装完毕后在windows的服务中即出现了rabbitMQ的服务,如果没有启动建议设置为自动随机启动。
四、rabbitMQ工作原理如下:
五、rabbitMQ基本应用示例
1、 生产者(Producer)主要工作步骤如下:
建立socket->声明管道->声明queue->通过一个空的exchange发送内容至queue(不能直接发送到队列)->关闭连接
import pika #通过这个实例先建立一个socket connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) #声明一个管道 channel = connection.channel() #声明queue channel.queue_declare(queue="basic_1") #这边给queue起名字叫"basic_1" #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange="", routing_key="basic_1", #queue的名字 body="hello world") #body是你发送的内容 print("[x] Sent ‘hello world‘") #直接关闭连接 connection.close()
2、消费者(Consumer)主要工作步骤如下:
创建socket连接->创建管道->声明queue->声明回调函数callback ->消费的消息->开启消费
## 消费者有可能在其他的机器上 import pika #建立一个socket连接 connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) #创建一个管道 channel = connection.channel() #You may ask why we declare the queue again ? we have already declared it in our previous code. # We could avoid that if we were sure that the queue already exists. For example if send.py program #was run before. But we‘re not yet sure which program to run first. In such cases it‘s a good # practice to repeat declaring the queue in both programs. channel.queue_declare(queue="basic_1") def callback(ch,method,properites,body): print("--->",ch,method,properites) print(" [x] Received %r" % body)
‘‘‘ ch:是send端管道的内存对象的地址
method:指的send端的是发给谁,发给哪个Q的一些信息,一般不怎么用
properites:send端的属性,这边至的send端发过来给recive端的属性
body:是send端发过来的消息
‘‘‘
channel.basic_consume(#消费的消息 callback, #如果收到消息,就调用callback函数来处理消息 queue="basic_1",#queue的名字 no_ack=True) print(‘ [*] Waiting for messages. To exit press CTRL+C‘) #这个start只要一启动,就一直运行,它不止收一条,而是永远收下去,没有消息就在这边卡住 channel.start_consuming()
总结:
1、consumers中如果不声明queue的话,则如果consumers先启动,则会报错。如果是producer先启动,consumers后启动则不报错。但是如果说consumer声明了,consumer先启动就不会报错。如果是producers先启动的话,则忽略。
2、所有的socket传输都是bytes类型。
3、消费者和生产者不一定在同一台机器上,在其他机器上运行也是可以的。
4、consumers启动以后会一直运行下去,它会永远的收下去。producers可以运行多次,只要运行一次,consumers就会接收一次。
六、消息分发轮询
1、一个生产者对应多个消费者是采用轮询机制,公平的依次发给每一个消费者,每个消费者消费1个。
2、一个生产者对应多个消费者,生产者发送多次消息,是采用轮询的机制,公平的分给每一个消费者。
3、消费者代码中no_ack=True,一般情况下是不加的,保证了连接断开,消息就会转给下一个消费者。当添加了之后,代表消费者拿到数据的时候,ranbbitMQ即删除数据,如果此时消费者出现异常,则会产生数据丢失的情况(删除数据需要在消费者的callback函数中添加下面一段:
channel.basic_ack(delivery_tag=method.delivery_tag)
)。
4、RabbitMQ判断如果socket断了,就知道连接断了,消息就会转给下一个消费者。
5、消费者的启动顺序,代表着是第几个消费者。
七、RabbitMQ 消息持久化
1、队列持久化 :此时当rabbitMQ重启后,队列存在,但队列中数据消失
在生产者和消费者的队列声明时,均变为:
#声明队列queue
channel.queue_declare(queue=‘hello‘,durable=True)
2、消息持久化:此时当rabbitMQ重启后,队列存在,但队列中数据仍存在,即消息持久化
首先需要队列持久化,然后在生产端消息发布消息时修改为:
#发送数据
channel.basic_publish(exchange=‘‘,routing_key=‘hello5‘,
body=‘hello,you are welcomed!111222‘,
properties=pika.BasicProperties(
delivery_mode=2,
)
)
即可实现消息持久化,即当rabbitMQ重启后,该消息未消费时,仍将继续在队列中。
八、消息公平分发
如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。
|
channel.basic_qos(prefetch_count = 1 ) |
注意了,这种公平指的是你消费者有多大本事,就干多少活,你消费者处理的越慢,我就分发的少,你消费者处理的越多,处理的快,我就多发点消息。我server端给客户端发消息的时候,先检查一下,你现在还有多少消息,你如果处理的消息超过1条,我就不给你发了,就是你当前消息没有处理完毕,我就不给你发消息了,没有消息,我就给你发。
九、exchange的类型(广播模式)
之前的例子都基本都是队列级别的 1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,先来说说exchange的官方说明:
An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.
Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息
- fanout:所有bind到此exchange的queue都可以接收消息(纯广播的,所有消费者都能收到消息)
- direct:通过routingKey和exchange决定的那个唯一的queue可以接收消息
- topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
- headers:通过headers 来决定把消息发给哪些queue(这个很少用,一般情况下,我们用不到)
9.1 fanout广播模式
说明:fanout这种模式是所有绑定exchange的queue都可以接收到消息。exchange=>转换器
生产者(fanout_publiser)
说明:跟之前写的不同,生产者这边并没有声明queue,因为生产者是以广播的形式,所以这边不需要声明queue
import pika #创建socket连接 connection = pika.BlockingConnection(pika.ConnectionParameters (host=‘localhost‘)) #创建管道 channel = connection.channel() #声明exchange,且exchange的名字是logs,exchange的类型为fanout channel.exchange_declare(exchange=‘logs‘,exchange_type="fanout") #发送的消息 message = "info:hello world" #生产一个消息 channel.basic_publish( exchange="logs", routing_key=‘‘, body=message ) print("[X] Send {0}".format(message)) #关闭连接 connection.close()
消费者(fanout_consumer)
说明:消费者这边要声明一个唯一的queue_name的对象,并且从对象中获取queue名
import pika #创建一个socket connection = pika.BlockingConnection(pika.ConnectionParameters( host="localhost")) #创建一个管道 channel = connection.channel() #声明exchange,exchange的名字logs,类型是fanout广播模式 channel.exchange_declare(exchange="logs", exchange_type="fanout") #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除,result是queue的对象 result = channel.queue_declare(exclusive=True) #exclusive=>排他的,唯一的 #获取queue名 queue_name = result.method.queue #绑定exchange channel.queue_bind(exchange="logs", queue=queue_name) print(‘ [*] Waiting for logs. To exit press CTRL+C‘) #声明回调函数 def callback(ch,method,properties,body): "回调函数" print("[X] {0}".format(body)) #消费者消费 channel.basic_consume(callback, queue=queue_name, no_ack=True) #启动消费模式 channel.start_consuming()
1、服务端没有声明queue,为什么客户端要声明一个queue?
生产者发消息到exchange上,exchange就会遍历一遍,所有绑定它的queue,然后把消息发到queue里面,它发了queue就不管了,消费者从queue里面去收,所以就收到广播了,而不是说exchange直接就把消息发给消费者,消费者只会从queue里去读消息,且拿着queue去绑定exchange。
2、为什么queue要自动生成,而不是自己手动去写?
这个queue只是为了收广播的,队列是当消费者连接时自动生成的,每次生成的队列不一样,当消费者停止消费时,队列自动销毁
3、广播实时性
广播是实时的,你不在的时候,就是你消费者没有开启的时候,发消息的时候,就没有收到,这个时候就没有了。如果消费者开启了,生产者发消息时,消费者是收的到的,这个又叫订阅发布,收音机模式
9.2 Direct广播模式
队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
direct广播模式逻辑图:
生产者代码:
import pika,sys connection = pika.BlockingConnection(pika.ConnectionParameters ("localhost")) channel = connection.channel() #定义direct类型的exchange channel.exchange_declare(exchange="direct_logs", exchange_type="direct") #定义重要程度,定义什么级别的日志 severity = sys.argv[1] if len(sys.argv) > 1 else "info" message = ‘ ‘.join(sys.argv[2:]) or "hello world" #发送消息 channel.basic_publish(exchange="direct_logs", routing_key=severity, body=message ) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
消费者代码:
import pika,sys connection = pika.BlockingConnection(pika.ConnectionParameters ("localhost")) channel = connection.channel() #定义direct类型的exchange channel.exchange_declare(exchange="direct_logs",exchange_type="direct") result = channel.queue_declare(exclusive=True) queue_name = result.method.queue #手动输入安全级别 severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) #循环遍历绑定消息队列 for severity in severities: channel.queue_bind(exchange="direct_logs", queue=queue_name, routing_key=severity) print(‘ [*] Waiting for logs. To exit press CTRL+C‘) def callback(ch,method,properites,body): "回调函数" print(" [x] %r:%r" % (method.routing_key, body)) #消费消息 channel.basic_consume(callback,queue=queue_name,no_ack=True) channel.start_consuming()
这种模式下,根据服务器执行程序时的参数的不一样而消费不同广播的数据。
消费者在启动时需添加参数 info warning error 其中之一,即指定了消费的severity
则生产端启动时默认不输入时为info级别,否则按输入的来,按照指定的routing_key发送数据。
9.3 topic细致的消息过滤模式
在direct模式中我们做了一个区分,把error、warning绑定级别把消息区分了。我们回到日志上,如果想做的更细致的区分,比如说,你现在搜索的有error,有warning等,在Linux上有一个系统日志,这个系统日志搜索所有应用的系统日志。所有程序都在这个日志里面打日志。那如果我想划分出来。什么是mysql的发出来的日志,什么是apache发出来的日志。然后mysql日志里面同时是info,又包含warning,error。Apache也是一样,所以我们要做更细的区分,更细致的消息过滤。
topic广播模式逻辑图:
代码实现:
生产者:
import pika,sys connection = pika.BlockingConnection(pika.ConnectionParameters ("localhost")) channel = connection.channel() #声明一个topic的exchange channel.exchange_declare(exchange="topic_logs", exchange_type="topic") routing_key = sys.argv[1] if len(sys.argv) > 1 else "anonymous.info" message = " ".join(sys.argv[2:]) or "hello world" channel.basic_publish(exchange="topic_logs", routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
消费者:
import pika,sys connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel = connection.channel() #声明topic类型的exchange channel.exchange_declare(exchange="topic_logs", exchange_type="topic") result = channel.queue_declare(exclusive=True) queue_name = result.method.queue banding_keys = sys.argv[1:] if not banding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) #循环绑定queue for banding_key in banding_keys: channel.queue_bind(exchange="topic_logs", queue=queue_name, routing_key=banding_key) print(‘ [*] Waiting for logs. To exit press CTRL+C‘) #回调函数 def callback(ch,method,properites,body): "回调函数" print(" [x] %r:%r" % (method.routing_key, body)) #消费者消费 channel.basic_consume(callback,queue=queue_name,no_ack=True) channel.start_consuming()
生产端执行:
python topic_sender.py mysql.info system srated sucess!
python topic_sender.py app.error nullpiont error!
即在生产者的第一个参数里写明字段,
如应用 app.error app.info app.warning
mysql mysql.error mysql.info mysql.warning
等等
第二个参数写各种情况的具体信息,如异常、信息和警告。
在消费时,按照下面规则进行匹配
To receive all the logs run: =>
# 是匹配所有的
python receive_logs_topic.py "#"
#只匹配app开头的
python receive_logs_topic.py "app.*"
#只匹配error结尾的
python receive_logs_topic.py "*.error"
You can create multiple bindings:
#创建多个接收队列
python receive_logs_topic.py "app.*" "*.error"
#只匹配一类消息
python receive_logs_topic.py "app.info"
9.4 rabbitMQ RPC实现
之前我们都是单向发送消息,客户端发送消息给服务端,那么问题来了,我现在发一个命令给远程客户端,让它去执行,执行之后的结果,我想把这个结果再返回。这个模型叫什么呐,这种模型叫RPC=>remote procedure call。
怎么返回这个消息呢?
答:就server 端和客户端既是消费者,又是生产者。
RPC模式逻辑图:
代码实现:
RPC ----CLIENT
import pika,uuid,time class FibonacciRpcClient(object): "斐波那契数列rpc客户端" def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters (host="localhost")) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response,no_ack=True, queue=self.callback_queue) def on_response(self,ch,method,props,body): print("---->",method,props) if self.corr_id == props.correlation_id: #我发过去的结果就是我想要的结果,保持数据的一致性 self.response = body def call(self,n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.publish(exchange="", routing_key="rpc_queue", properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id), body=str(n)) while self.response is None: self.connection.process_data_events() #非阻塞版的start_consumer() print("no msg....") time.sleep(0.5) return int(self.response) if __name__ == "__main__": fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response)
注:
1、想不阻塞,而是想每过一段时间,就过来检查一下,就不能用start_consumer,而是用connection.process_data_evevts(),它是不阻塞的,如果收到消息就收到,收不到消息也返回,就继续往下执行。
2、reply_to就是想让服务器执行完命令之后,把结果返回到这个queue里面。
3、在while self.respose is None中的代码我可以不做time.sleep,我这边可以发消息给服务器端,这个消息不一定按顺序发给服务器端,如果不做self.corr_id == props.correlation_id的验证,那数据就可能对不上了。
RPC ---- SERVER
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) channel = connection.channel() channel.queue_declare(queue="rpc_queue") def fib(n): "斐波那契数列" if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange="", routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id= props.correlation_id), # props的是客户端的发过来的信息,这边把correlation_id返回给客户端做验证 body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue="rpc_queue") print(" [x] Awaiting RPC requests") channel.start_consuming()
注:props.reply_to,这个就是客户端返回过来的queue。
如果客户端和服务用的是同一个queue,客户端也发到rpc_queue中,那么客户端就会收到自己的消息,就会形成一个死循坏,无法处理数据