首页 > 编程语言 > 详细

RabitMQ使用_python

时间:2020-04-11 18:38:59      阅读:62      评论:0      收藏:0      [点我收藏+]

一、RabbitMQ的介绍

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

主要作用:每个进程(跨语言,系统)之间的中间代理。

 

二、常用命令与授权角色说明

启动监控管理器:rabbitmq-plugins enable rabbitmq_management
关闭监控管理器:rabbitmq-plugins disable rabbitmq_management
启动rabbitmq:rabbitmq-service start
关闭rabbitmq:rabbitmq-service stop
查看所有的队列:rabbitmqctl list_queues
清除所有的队列:rabbitmqctl reset
关闭应用:rabbitmqctl stop_app
启动应用:rabbitmqctl start_app
2、使用前需要添加用户,授权等
用户和权限设置(后面用处)
添加用户:rabbitmqctl add_user username password
分配角色:rabbitmqctl set_user_tags username administrator
新增虚拟主机:rabbitmqctl add_vhost  vhost_name
将新虚拟主机授权给新用户:rabbitmqctl set_permissions -p vhost_name username ‘.*‘ ‘.*‘ ‘.*‘
角色说明
none  最小权限角色
management 管理员角色
policymaker   决策者
monitoring  监控
administrator  超级管理员 

三、exchange交换器的四种类型

1、fanout:分发给exchange绑定的所有queu中

技术分享图片

 

 

 2、direct:把消息路由到那些binding key与routing key完全匹配的Queue中

技术分享图片

 

 

 3、topic:与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:(过滤字符串)

技术分享图片

 

 

 4、header

技术分享图片

 

 

 

四、简单使用例子

1、RPC进行批量主机执行命令

生产者给消费者消息之后,执行之后结果返回给生产者,这种模式叫做RPC(remote procedure call  远程过程调用)

技术分享图片

 

 

 client:

start:

技术分享图片
 1 #!/usr/bin/env python
 2 # _*_ coding:utf-8 _*_
 3 #Author:chenxz
 4 import os,sys
 5 Base_dir=os.path.dirname(os.path.dirname(__file__))
 6 sys.path.append(Base_dir)
 7 from core import main
 8 
 9 if __name__ == __main__:
10     obj=main.Main()
11     obj.interactive()
View Code

main:

技术分享图片
 1 #!/usr/bin/env python
 2 # _*_ coding:utf-8 _*_
 3 #Author:chenxz
 4 import threading
 5 import os,sys
 6 Base_dir=os.path.dirname(os.path.dirname(__file__))
 7 sys.path.append(Base_dir)
 8 from core import producer
 9 import uuid
10 
11 class Main(object):
12     def __init__(self):
13         self.information={}
14     def interactive(self):
15         ‘‘‘
16         每进来一个任务新建一个多线程
17         :return:
18         ‘‘‘
19         while True:
20             cmd_inp=input(>>).strip()
21             if not cmd_inp:
22                 continue
23             t=threading.Thread(target=self.dealwith,args=(cmd_inp,))
24             t.start()
25             # cmd_list = cmd_inp.split(‘"‘)
26             # cmd=cmd_list()
27             # print(cmd_list)
28     def dealwith(self,cmd_inp):
29         ‘‘‘
30         处理输入的命令,并映射到对应的方法(run check check_all)
31         :param cmd_inp:
32         :return:
33         ‘‘‘
34         cmd_list = cmd_inp.split(")
35 
36         operate=cmd_list[0].strip().split( )[0]
37         print(operate)
38         if hasattr(self,operate):
39             getattr(self,operate)(cmd_list)
40         else:
41             print(命令输入错误:%s%cmd_inp)#
42 
43     def run(self,cmd_list):
44         cmd=cmd_list[1].strip()
45         hosts_list=cmd_list[2].strip().split( )[1:]
46         print(cmd,hosts_list)
47         for ip in hosts_list:
48             #producer_obj=producer.Producer()
49             #response=producer.send(cmd,ip)
50             task_id=uuid.uuid4()
51             response =[queue_123,12345678]
52             self.information[task_id]={host:ip,cmd:cmd,corr_id:response[1],callback_queue:response[0]}
53             print( self.information)
54 
55     def check_all(self,cmd_list):
56         print(存在以下任务:)
57         for i in self.information:
58             print("task_id(%s): %s 主机执行 %s 命令"%(i,self.information[i][host],self.information[i][cmd]))
59 
60     def check_task(self,cmd_list):
61         task_id=cmd_list[0].strip().split( )[1]
62         print(task_id)
63         callback_queue=self.information[task_id][callback_queue]
64         corr_id=self.information[task_id][corr_id]
65         consume_obj=producer()
66         response=consume_obj.receive(callback_queue,corr_id)
67         print(response.decode())
68         del self.information[task_id]
69 if __name__ == __main__:
70     obj=Main()
71     obj.interactive()
View Code

producer:

技术分享图片
 1 #!/usr/bin/env python
 2 # _*_ coding:utf-8 _*_
 3 #Author:chenxz
 4 import pika
 5 import uuid
 6 
 7 class Producer(object):
 8     def __init__(self):
 9         self.connection=pika.BlockingConnection(pika.ConnectionParameters(host=192.168.106.25,credentials=pika.PlainCredentials(root,123)))
10         self.channel=self.connection.channel()
11     def send(self,body,queue_name):
12         #设置返回时的队列名self.callback_queue和随机生成任务id号码self.corr_id
13         result=self.channel.queue_declare(queue=‘‘,exclusive=False)
14         self.callback_queue=result.method.queue
15         self.corr_id=str(uuid.uuid4())
16         #发送消息
17         self.channel.publish(exchange=‘‘,
18                              routing_key=queue_name,
19                              properties=pika.BasicProperties(reply_to=self.callback_queue,correlation_id=self.corr_id),
20                              body=body)
21         return self.callback_queue,self.corr_id
22 
23     def receive(self,callback_queue,corr_id):
24         self.callback_id=corr_id
25         self.response=None
26         self.channel.basic_consume(
27             on_message_callback=self.callback,
28             queue=callback_queue,
29         )
30         while not self.response:
31             self.connection.process_data_events()
32         return self.response
33 
34     def callback(self,ch,method,props,body):
35         if self.callback_id==props.correlation_id
36             self.response=body
View Code

 

server:

start:

技术分享图片
 1 #!/usr/bin/env python
 2 # _*_ coding:utf-8 _*_
 3 #Author:chenxz
 4 
 5 import os,sys
 6 Base_dir=os.path.dirname(os.path.dirname(__file__))
 7 sys.path.append(Base_dir)
 8 from core import main
 9 
10 if __name__ == __main__:
11     obj=main.consumer()
12     obj.start()
View Code

 

main:

技术分享图片
 1 #!/usr/bin/env python
 2 # _*_ coding:utf-8 _*_
 3 #Author:chenxz
 4 import pika
 5 import os
 6 
 7 class consumer(object):
 8     def __init__(self):
 9         self.connection=pika.BlockingConnection(pika.ConnectionParameters(host=192.168.106.25,credentials=pika.PlainCredentials(root,123)))
10         self.channel=self.connection.channel()
11     def start(self):
12         #获取ip
13         ip=123
14         self.channel.queue_declare(queue=ip)
15         self.channel.basic_consume(on_message_callback=self.callback,
16                                    queue=ip,
17                                    )
18         self.channel.start_consuming()
19     def callback(self,ch,method,props,body):
20         corr_id=props.correlation_id
21         callback_queue=props.reply_to
22         #执行命令,返回结果
23         response=self.handle(body)
24         self.channel.basic_publish(exchange=‘‘,
25                                    routing_key=callback_queue,
26                                    properties=pika.BasicProperties(correlation_id=corr_id,
27                                                                    ),
28                                    body=response)
29         ch.basic_ack(delivery_tag=method.delivery_tag)
30 
31     def handle(self,cmd):
32         cmd=cmd.decode()
33         message=os.popen(cmd).read()
34         if not message:
35             message=wrong cmd
36         return message
View Code

 

 

 

 

 

#!/usr/bin/env python
# _*_ coding:utf-8 _*_
#Author:chenxz

import os,sys
Base_dir=os.path.dirname(os.path.dirname(__file__))
sys.path.append(Base_dir)
from core import main

if __name__ == ‘__main__‘:
obj=main.consumer()
obj.start()

RabitMQ使用_python

原文:https://www.cnblogs.com/chenxiaozan/p/12680481.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!