1.简单的分布式流程图
分布式:
import multiprocessing import multiprocessing.managers # 进程管理器 import random,time # 分布式进程 task_queue = multiprocessing.Queue() # 任务 resule_queue = multiprocessing.Queue() # 结果 def return_task(): """ :return: 任务队列 """ return task_queue def return_result(): """ :return: 结果队列 """ return resule_queue # 创建管理器类 class QueueManager(multiprocessing.managers.BaseManager): """ 进程管理器,实现队列和客户端共享 """ pass if __name__ == ‘__main__‘: multiprocessing.freeze_support() # 开启分布式支持 QueueManager.register("get_task",callable=return_task) # 注册函数给客户端调用 QueueManager.register("get_result",callable=return_result) # 注册函数给客户端调用 manager = QueueManager(address=("192.168.33.39",8888),authkey=123456) # 创建管理器,绑定ip、port和链接密码 # 开启管理器实例 manager.start() task = manager.get_task() result = manager.get_result() # 初始化数据 for i in range(10000): print("给任务队列里面放入数据:{}".format(i)) task.put(i) print("---" * 100) for i in range(10000): ret = result.get() print("从结果队列中取出数据:{}".format(ret)) # 关闭服务器 manager.shutdown
import multiprocessing import multiprocessing.managers # 进程管理器 import random,time # 分布式进程 task_queue = multiprocessing.Queue() # 任务 result_queue = multiprocessing.Queue() # 结果 # 创建管理器类 class QueueManager(multiprocessing.managers.BaseManager): """ 进程管理器,实现队列和客户端共享 """ pass if __name__ == ‘__main__‘: QueueManager.register("get_task") # 注册函数调用服务器 QueueManager.register("get_result") # 注册函数调用服务器 manager = QueueManager(address=("192.168.33.39",8888),authkey=123456) # 创建管理器,绑定ip、port和链接密码 # 链接服务器 manager.connect() task = manager.get_task() result = manager.get_result() for i in range(10000): data = task.get() print("客户端获取任务队列中的数据") data += 10 print("客户端将加工过的数据放入结果队列中") result.put(data)
原文:https://www.cnblogs.com/meloncodezhang/p/11443749.html