1,生产者
new_task.py
import pika
if __name__ == ‘__main__‘:
connection=pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel=connection.channel()
channel.queue_declare("Kadima")
message="You are awsome!"
for i in range(0,100):#循环100次发送消息
channel.basic_publish(exchange="",routing_key=‘Kadima‘,body=message+" "+str(i))
print "sending ",message
2,多个消费者
消费者1,work.py
#-*- coding: UTF-8 -*-
import time
import pika
import sys
__author__ = ‘Yue‘
var=0
def callback(ch, method, properties, body):
# temp=var+1 #这里有趣的是不能写成var+=1或者var=var+1,要知道为什么,就需要清楚“Python全局变量和局部变量”
#global var
#var+=1
#if var==20:
#sys.exit()
print "1 received %r" % (body,)
time.sleep(body.count("."))
print "Done"
if __name__ == ‘__main__‘:
connection=pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel=connection.channel()
channel.queue_declare("Kadima")
channel.basic_consume(callback,queue="Kadima",no_ack=True)
print ‘ [1] Waiting for messages‘
channel.start_consuming()
work2.py
import time
import pika
__author__ = ‘Yue‘
def callback(ch, method, properties, body):
print "2 received %r" % (body,)
time.sleep(body.count("."))
print "Done"
if __name__ == ‘__main__‘:
connection=pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel=connection.channel()
channel.queue_declare("Kadima")
channel.basic_consume(callback,queue="Kadima",no_ack=True)
print ‘ [2] Waiting for messages‘
channel.start_consuming()
3,执行work,work2,new_task


我的启动顺序是work,work2,从执行结果可以看出,RabbitMQ是将task分别依次分发给按照时间顺序注册的work上的,
也就是,task1,task2,task3,task4,它会将task1,task3分发给work,另外两个分发给task3,task4
接下来,有趣的事情就要发生了:
当把work.py中的callback函数的注释内容打开后(起作用是让work处理19个task,便退出程序),MQ并没有将本该分发给work的task分发给work2,那到底去哪里了呢?我暂时假设为work退出时并没有告诉MQ他不干了(他出现异常啦),MQ还是会将task分发给work


4,那没有执行完的任务怎么办呢?
Message acknowledgment :ack默认是打开的
修改work代码如下
#-*- coding: UTF-8 -*-
import time
import pika
import sys
__author__ = ‘Yue‘
var=0
def callback(ch, method, properties, body):
# temp=var+1 #这里有趣的是不能写成var+=1或者var=var+1,要知道为什么,就需要清楚“Python全局变量和局部变量”
global var
var+=1
if var==20:
sys.exit()
print "1 received %r" % (body,)
time.sleep(body.count("."))
print "Done"
ch.basic_ack(delivery_tag = method.delivery_tag)
if __name__ == ‘__main__‘:
connection=pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel=connection.channel()
channel.queue_declare("Kadima")
channel.basic_consume(callback,queue="Kadima",no_ack=True)
print ‘ [1] Waiting for messages‘
channel.start_consuming()

work只执行到20,但是work2并未从22开始全部执行,而是从37开始MQ,这老实说,我也有点搞不懂了,以后想清楚了再补充

原文:http://my.oschina.net/u/2494265/blog/521684