1. 订阅/发布:
前面worker示例中的每个任务都是只发送给一个worker,如果我们多个worker都需要接收处理同一个任务,此时就要使用
订阅/发布功能,比如,日志模块产生日志并发送到队列中,队列连接两个worker,一个负责打印到控制台,一个负责打印到日志文件,
则队列需要将内部消息同时发送到两个worker中做不同的处理。
2. 交换:
前面示例当中,我们是直接使用队列来收发消息的,那并不是RabbitMQ的完整模型,完整模型当中还包含有"交换",消息不应该直接发送给
队列,而是发送给"交换"。交换的模型很简单,其一端连接生产者,一端连接消息队列,交换需要一定的规则来对收到消息做处理,比如发给
某个队列,亦或者丢弃该消息,这个规则我们称之为"交换类型": direct, topic, headers ,fanout,本文以及后面的文章会对几种类型做详细
介绍,可以使用如下方式创建交换,如下其名称为logs,类型是=fanout,fanout类型不会关系消息,只是简单对消息广播到连接队列。
channel.exchange_declare(exchange=‘logs‘, type=‘fanout‘)
含有交换的完整模型如下图所示:
3. 临时队列:
在不需要多个生产者或者消费者共享队列的时候,队列名称我们是不关心的,RabbitMQ提供了一种随机生成队列的方式:
result = channel.queue_declare()
result.method.queue中含有队列的名称
当我们需要设置消费者断开,队列自动销毁,可以使用如下方式,标记exlusive=True:
result = channel.queue_declare(exclusive=True)
4. 绑定:
队列和交换均建立完成,此时我们需要绑定队列和交换,这样交换才知道向哪些队列发送消息,方式如下:
channel.queue_bind(exchange=‘logs‘, queue=result.method.queue)
绑定之后的模型如下:
完成模型,包含worker:
5. 测试代码:
emit_log.py -- 产生日志消息,发送到交换:
1 #!/usr/bin/env python 2 import pika 3 import sys 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host=‘localhost‘)) 7 channel = connection.channel() 8 9 channel.exchange_declare(exchange=‘logs‘, 10 type=‘fanout‘) 11 12 message = ‘ ‘.join(sys.argv[1:]) or "info: Hello World!" 13 channel.basic_publish(exchange=‘logs‘, 14 routing_key=‘‘, 15 body=message) 16 print(" [x] Sent %r" % message) 17 connection.close()
reveive_logs.py--临时队列绑定交换,接收日志消息并处理;
1 #!/usr/bin/env python 2 import pika 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host=‘localhost‘)) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange=‘logs‘, 9 type=‘fanout‘) 10 11 result = channel.queue_declare(exclusive=True) 12 queue_name = result.method.queue 13 14 channel.queue_bind(exchange=‘logs‘, 15 queue=queue_name) 16 17 print(‘ [*] Waiting for logs. To exit press CTRL+C‘) 18 19 def callback(ch, method, properties, body): 20 print(" [x] %r" % body) 21 22 channel.basic_consume(callback, 23 queue=queue_name, 24 no_ack=True) 25 26 channel.start_consuming()
原文:http://www.cnblogs.com/wanpengcoder/p/5291627.html