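exchange: Every message sent to RabbitMQ is actually published to an exchange. The exchange acts as an intermediary that receives messages and routes them to queues.
Exchange types:
(1) fanout: every queue bound to the exchange receives the message (a full broadcast).
(2) direct: a queue receives the message only if it is bound to the exchange with a binding key that exactly equals the message's routing_key.
(3) topic: a queue receives the message if the binding key it was bound with (which may be a pattern) matches the message's routing_key.
Pattern symbols: a routing key is a list of words separated by dots. # matches zero or more words, and * matches exactly one word.
Example: #.a matches a.a, aa.a, aaa.a, b.c.a, and so on.
*.a matches a.a, b.a, c.a, and so on, but not b.c.a.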
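The wildcard rules above can be checked without a broker. The following is a minimal sketch of word-based topic matching in plain Python. The function name topic_matches is hypothetical (it is not part of pika or RabbitMQ), and the code only illustrates the rule; it is not how the broker implements it.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic binding pattern matches a routing key.

    Words are separated by dots. '*' stands for exactly one word,
    '#' stands for zero or more words.
    """
    p = pattern.split(".")
    k = routing_key.split(".") if routing_key else []

    def match(i: int, j: int) -> bool:
        if i == len(p):
            # Pattern exhausted: succeed only if the key is exhausted too.
            return j == len(k)
        if p[i] == "#":
            # '#' may absorb zero, one, or many words of the key.
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j < len(k) and (p[i] == "*" or p[i] == k[j]):
            # '*' or a literal word consumes exactly one word.
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


if __name__ == "__main__":
    for pattern, key in [("*.a", "b.a"), ("*.a", "b.c.a"), ("#.a", "b.c.a")]:
        print(pattern, key, topic_matches(pattern, key))
```

The recursion tries every possible length for each #, which is fine for short keys like the ones in this article.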
Simple fanout implementation:
With fanout, each consumer binds its own queue to the exchange that the producer declares.
producer:
(1) Open a socket connection: connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
(2) Open a channel: channel = connection.channel()
(3) Declare the exchange: channel.exchange_declare(exchange='logs', exchange_type='fanout')
(4) Publish the message: channel.basic_publish(exchange='logs', routing_key='', body=message)
consumer:
(1) Open a socket connection: connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
(2) Open a channel: channel = connection.channel()
(3) Declare the exchange: channel.exchange_declare(exchange='logs', exchange_type='fanout')
(4) Declare a queue; exclusive=True makes the broker delete it once the consumer disconnects: result = channel.queue_declare(queue='', exclusive=True)
(5) Get the queue name: queue_name = result.method.queue
(6) Bind the queue to the exchange: channel.queue_bind(exchange='logs', queue=queue_name)
(7) Receive the broadcast: channel.basic_consume(queue=queue_name, on_message_callback=recv, auto_ack=True)
(8) Start the consuming loop: channel.start_consuming()
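To make the broadcast behaviour concrete without a running broker, here is a toy in-memory model of a fanout exchange. The class FanoutExchange and its bind and publish methods are hypothetical names for illustration only; they are not the pika API. The sketch shows two things: every bound queue gets its own copy, and a queue bound after a message was published does not receive that message.

```python
from collections import deque


class FanoutExchange:
    """Toy stand-in for a fanout exchange (illustration only, not pika)."""

    def __init__(self):
        self.queues = []

    def bind(self, queue):
        self.queues.append(queue)

    def publish(self, body):
        # The routing key plays no role: every bound queue gets a copy.
        for queue in self.queues:
            queue.append(body)


logs = FanoutExchange()

early = deque()          # consumer that is bound before anything is sent
logs.bind(early)
logs.publish("first")

late = deque()           # consumer that binds after "first" was published
logs.bind(late)
logs.publish("second")

print(list(early))       # both messages
print(list(late))        # only the message published after binding
```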
Code:
'''
Exchange usage (producer side).
fanout broadcast: the broadcast is real-time.
'''
import sys

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
# The producer declares no queue; each consumer binds its own.

# fanout broadcast
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

# Use the command-line arguments as the message, or fall back to a default
message = ' '.join(sys.argv[1:]) or "info: haha"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)

print(" [x] Sent %r" % message)
connection.close()

'''
Broadcast receiver (consumer side).
'''
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

# With queue='' RabbitMQ assigns a random queue name.
# exclusive=True deletes the queue once the consumer using it disconnects.
result = channel.queue_declare(queue='',
                               exclusive=True)

# Get the generated queue name
queue_name = result.method.queue
print("result", result, "queue name:", queue_name)

# Bind the queue to the exchange
channel.queue_bind(exchange='logs',
                   queue=queue_name)


def recv(ch, method, properties, body):
    print("recv msg : [%s]" % body.decode())
    # With auto_ack=False, acknowledge manually instead:
    # ch.basic_ack(delivery_tag=method.delivery_tag)


# channel.basic_qos(prefetch_count=1)  # handle at most 1 unacknowledged message at a time
channel.basic_consume(queue=queue_name,
                      on_message_callback=recv,
                      auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
Simple direct implementation:
direct works like fanout with one addition: a routing_key that filters which queues a message is delivered to.
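The filtering step can be sketched in memory before looking at the pika code. DirectExchange below is a hypothetical toy class, not part of pika; it keeps a mapping from binding key to queues and delivers a message only to queues bound with exactly that key. The severity names used are example values, not something the broker requires.

```python
from collections import defaultdict


class DirectExchange:
    """Toy stand-in for a direct exchange (illustration only, not pika)."""

    def __init__(self):
        self.bindings = defaultdict(list)   # binding key -> list of queues

    def bind(self, queue, routing_key):
        self.bindings[routing_key].append(queue)

    def publish(self, routing_key, body):
        # Deliver only to queues bound with exactly this key.
        for queue in self.bindings.get(routing_key, []):
            queue.append(body)


direct_logs = DirectExchange()

errors_only = []
everything = []
direct_logs.bind(errors_only, "error")
for severity in ("info", "warning", "error"):
    direct_logs.bind(everything, severity)

direct_logs.publish("info", "started")
direct_logs.publish("error", "disk full")
direct_logs.publish("debug", "dropped: no queue is bound with this key")

print(errors_only)
print(everything)
```

Several queues may be bound with the same key, in which case each of them receives a copy, as the "error" binding shows.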
Code:
# direct producer: publishes one message with the chosen severity as routing_key
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type="direct")

# Press Enter at either prompt to accept the default
severity = input(">>:") or "info"
message = input(">>:") or "haha"

channel.basic_publish(exchange="direct_logs",
                      routing_key=severity,
                      body=message)

print("[x] Sent to %r : %r" % (severity, message))
connection.close()

# direct consumer: pass the severities to receive as command-line arguments
import sys

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type="direct")

result = channel.queue_declare(queue="",
                               exclusive=True)

queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.exit("Usage: %s severity [severity ...]" % sys.argv[0])
print(severities)

# One binding per severity: the queue receives messages for each of them
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)


def callback(ch, method, properties, body):
    print("ch:", ch, "\nmethod:", method, "\nproperties:", properties)
    print(" [x] Received %r" % body.decode())
    ch.basic_ack(delivery_tag=method.delivery_tag)  # manual acknowledgement


channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,
                      auto_ack=False)

channel.start_consuming()
Simple topic implementation:
# topic producer: the routing key comes from the first argument or from the prompt
import sys

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))

channel = connection.channel()
channel.exchange_declare(exchange="topic_logs",
                         exchange_type="topic")

# routing_key must be a single string, so take one argument, not the whole list
key = sys.argv[1] if len(sys.argv) > 1 else input(">>:")

message = input(">>:")
channel.basic_publish(exchange='topic_logs',
                      routing_key=key,
                      body=message)

print("[x] sent [%s] : a msg is [%s]" % (key, message))
# A producer has nothing to consume; close the connection instead
connection.close()

'''
Topic consumer. Binding with the key "#" receives every message.
'''
import sys

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()

channel.exchange_declare(exchange="topic_logs",
                         exchange_type="topic")

result = channel.queue_declare(queue='',
                               exclusive=True)

queue_name = result.method.queue

# Binding keys (patterns) come from the arguments or from the prompt
binding_keys = sys.argv[1:] or input(">>:").split()
for binding_key in binding_keys:
    channel.queue_bind(queue=queue_name,
                       exchange="topic_logs",
                       routing_key=binding_key)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode())
    ch.basic_ack(delivery_tag=method.delivery_tag)  # manual acknowledgement


channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,
                      auto_ack=False)

print("[x] wait the msg...")
channel.start_consuming()
Source: https://www.cnblogs.com/gtq7512/p/11433900.html