1、生产者
import pika #RabbitMQ 采用轮询方式 connection=pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘))#建立连接 channel=connection.channel()#声明一个管道 channel.queue_declare(queue=‘hello‘)#声明一个队列并命名 channel.basic_publish(exchange=‘‘, routing_key=‘hello‘,#队列名 body=‘hello there~‘, ) )#发送的消息 print(‘send message...‘) connection.close()
首先使用pika库建立连接,声明一个管道,再声明一个队列,发送消息,最后关闭连接
2、消费者
import pika connection=pika.BlockingConnectison(pika.ConnectionParameters(‘localhost‘,5672))#RabbitMQ默认端口 channel=connection.channel() channel.queue_declare(queue=‘hello‘,durable=True)#队列名,和produser的一致 def callback(ch,method,propotities,body): print("receive >>",body) ch.basic_ack(delivery_tag=method.delivery_tag)#no_ack=False的话要手动确认 channel.basic_consume(callback,#如果收到消息就调用回调函数 queue=‘hello‘, # no_ack=True ) print(‘ [*] Waiting for messages. To exit press CTRL+C‘) channel.start_consuming()#是channel开始不是connection开始,不用关闭连接
RabbitMQ交换模式有fanout、direct和topic
注:三种交换模式中信息的发送都是即时的,即错过了消息就接收不到了
1、fanout广播
publisher实现
import pika #不需要声明queue,广播是实时的即订阅发布 connection=pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) channel=connection.channel() channel.exchange_declare(exchange=‘logs‘,exchange_type=‘fanout‘) message=‘message...‘ channel.basic_publish(exchange=‘logs‘, routing_key=‘‘, body=message) print(‘send...‘) connection.close()
consumer实现
import pika #发送端和接收端均不用声明queue connection=pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) channel=connection.channel() channel.exchange_declare(exchange=‘logs‘, exchange_type=‘fanout‘) result=channel.queue_declare(exclusive=True) #不指定queue名字,rabbitmq会随机指定queue,exclusive=True会在消息接收完后删除 queue_name=result.method.queue def callback(ch,method,properties,body): print(‘receive>>‘,body) channel.queue_bind(exchange=‘logs‘,queue=queue_name)#绑定queue channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
2、direct过滤特定消息
publisher实现
import pika,sys #不需要声明queue,广播是实时的即订阅发布 connection=pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) channel=connection.channel() channel.exchange_declare(exchange=‘direct_logs‘,exchange_type=‘direct‘) severity=sys.argv[1] if len(sys.argv)>1 else ‘info‘#定义消息级别,默认为info #sys.argv为获取命令行参数 message=‘‘.join(sys.argv[2:])or ‘HELLO!‘#获取消息 channel.basic_publish(exchange=‘direct_logs‘,#exchange名字可以随便起,但消费端exchange名字一定要和生产端一致,否则收不到消息!!! routing_key=severity,#direct模式中routing_key是必须的,生成和消费端都是 body=message) print(‘send...‘) connection.close()
consumer实现
#direct可以实现接收特定消息或指定消息 import pika,sys #发送端和接收端均不用声明queue connection=pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) channel=connection.channel() channel.exchange_declare(exchange=‘direct_logs‘, exchange_type=‘direct‘) result=channel.queue_declare(exclusive=True) queue_name=result.method.queue #定义级别,不指定级别会报错 severities=sys.argv[1:] if not severities: sys.stderr.write(‘Usage:%s [info] [warning] [error]‘%sys.argv[0]) sys.exit(1) for s in severities: channel.queue_bind(exchange=‘direct_logs‘, queue=queue_name,routing_key=s) # 绑定queue,要指定routing_key!! def callback(ch,method,properties,body): print(‘receive>>‘,body) channel.basic_consume(callback, queue=queue_name, #no_ack=True ) channel.start_consuming()
3、topic细致的过滤
publisher实现
import pika,sys #不需要声明queue,广播是实时的即订阅发布 connection=pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) channel=connection.channel() channel.exchange_declare(exchange=‘topic_logs‘,exchange_type=‘topic‘) severity=sys.argv[1] if len(sys.argv)>1 else ‘anonymous.info‘#定义消息级别,默认为info #sys.argv为获取命令行参数 message=‘‘.join(sys.argv[2:])or ‘HELLO!‘#获取消息 channel.basic_publish(exchange=‘topic_logs‘,#exchange名字可以随便起,但消费端exchange名字一定要和生产端一致,否则收不到消息!!! routing_key=severity,#direct模式中routing_key是必须的,生成和消费端都是 body=message) print(‘send...‘) connection.close()
consumer实现
#direct可以实现接收特定消息或指定消息 import pika,sys #发送端和接收端均不用声明queue connection=pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) channel=connection.channel() channel.exchange_declare(exchange=‘topic_logs‘, exchange_type=‘topic‘) result=channel.queue_declare(exclusive=True) queue_name=result.method.queue #定义级别,不指定级别会报错 severities=sys.argv[1:] if not severities: sys.stderr.write(‘Usage:%s [指定接收消息的类型]‘%sys.argv[0]) #消费端接收的消息类型可以以*.开头,如*.info,这样可以接收不同应用的消息 sys.exit(1) for s in severities: channel.queue_bind(exchange=‘topic_logs‘, queue=queue_name,routing_key=s) # 绑定queue,要指定routing_key!! def callback(ch,method,properties,body): print(‘receive>>‘,body) channel.basic_consume(callback, queue=queue_name, #no_ack=True ) channel.start_consuming()
import pika import uuid class FibonacciRpcClient(object): def __init__(self): self.connection=pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) self.channel=self.connection.channel() result=self.channel.queue_declare(exclusive=True) self.callback_queue=result.method.queue self.channel.basic_consume(self.callback,no_ack=True,queue=self.callback_queue) def callback(self,ch,method,props,body): if self.corr_id==props.correlation_id: #判断从服务器端接收的UUID和之前发送的是否相等,借此判断是否是同一个消息队列 self.response=body#从服务器端收到的消息 def call(self,n): self.response=None self.corr_id=str(uuid.uuid4())#客户端发送请求时生成一个唯一的UUID self.channel.basic_publish(exchange=‘‘, routing_key=‘rpc_queue‘, properties=pika.BasicProperties(#消息持久化 reply_to=self.callback_queue, correlation_id=self.corr_id), body=str(n)) while self.response is None: self.connection.process_data_events() #一般情况下channel.start_consuming表示阻塞模式下接收消息 #这里不阻塞收消息,且隔一段时间检查有没有消息 #即非阻塞版的start_consuming print(‘no message...‘)#进行到这一步代表没有消息 return int(self.response) fibonacci_rpc=FibonacciRpcClient() response=fibonacci_rpc.call(6) print(‘get‘,response)
import pika connection=pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) channel=connection.channel() channel.queue_declare(queue=‘rpc_queue‘) def callback(ch,method,props,body): ‘‘‘接收消息并返回结果‘‘‘ n=int(body) print(n) response=fib(n) ch.basic_publish(exchange=‘‘, routing_key=props.reply_to,#要返回的队列名 properties=pika.BasicProperties(correlation_id=props.correlation_id),#使correlation_id等于客户端的并发送回去 body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag)#确认收到 def fib(n): ‘‘‘斐波那契实现‘‘‘ if n==0 or n==1: return n else: return fib(n-1)+fib(n-2) channel.basic_consume(callback,queue=‘rpc_queue‘) channel.start_consuming()
原文:https://www.cnblogs.com/Aprilnn/p/9336377.html