首页 > 编程语言 > 详细

python借助zookeeper实现分布式服务(二)

时间:2020-07-22 21:18:35      阅读:85      评论:0      收藏:0      [点我收藏+]

重新思考了分布式服务的分工与合作,我梳理了分布式系统的三个角色,重写了上一篇的代码.

众所周知,分布式系统中一般有三个角色,master,worker和client

1.master
主服务器监视新的worker和task,将任务分配给可用的工作人员。当然自己也要高可用
2.worker
worker在系统中进行注册,以确保主服务器可以分配任务给自己,然后监视新任务,有任务分配给自己就开始执行。
3.client
客户端创建新任务并提交给系统。

详细的逻辑来讲是这样的:

1.多个worker竞选master,竞选成功的成为master,创建workers/tasks/assigin,竞选失败的worker监控/master随时准备竞选master
2.worker在zk注册自己,zk会将worker存在/worker一个子节点,创建属于自己的分配空间,并监控master可能分配给自己的任务
3.client在tasks下创建有序task-子节点,并监控task-的状态
4.master查看新task,获取可用worker节点,将分配任务给master之外的worker
5.worker监控分配给自己的task,发现有分配给自己的task时执行task,执行完修改task状态
6.client收到通知,获取到task的状态
下面看代码

zk_master.py

# -*- coding:utf-8 -*-
# @Time    : 2020-07-22 14:12
# @Author  : wangbin
import json
import random
import socket
import sys
import time

from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError


class ZKMaster(object):
    def __init__(self, host, port):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.host = host
        self.port = port
        self.sock.bind((host, port))
        self.zk = KazooClient(hosts=127.0.0.1:2181)
        self.zk.start()
        self._workers = []

    # 将自己注册到zk,临时节点连接不能中断,同时监听/rpc/master是否存在,不存在重新竞选master
    def register_zk(self, event=None):
        """
        注册到zookeeper
        """
        self.zk.ensure_path(/rpc)  # 创建根节点
        value = json.dumps({host: self.host, port: self.port})
        # 创建服务子节点
        try:
            self.zk.create(/rpc/master, value.encode(), ephemeral=True)
        except NodeExistsError as e:
            print(e)
        self.zk.exists(/rpc/master, watch=self.register_zk)

    def do(self):
        self.get_works()
        self.watch_tasks()
        while True:
            print(self._workers)
            time.sleep(3)

    # 监控/rpc/workers子节点变化,都会实时更新self._servers列表
    def get_works(self, event=None):
        """
        从zookeeper获取服务器地址信息列表
        """
        workers = self.zk.get_children(/rpc/workers, watch=self.get_works)
        self._workers = []
        for worker in workers:
            data = self.zk.get(/rpc/workers/ + worker)[0]
            if data:
                addr = json.loads(data.decode())
                self._workers.append(addr)

    # 监控最新任务,没有状态的进行分配,成功的进行删除
    def watch_tasks(self, event=None):
        tasks = self.zk.get_children(/rpc/tasks, watch=self.watch_tasks)
        for task in tasks:
            # 如果没有status,那就没有在执行
            is_exist = self.zk.exists(/rpc/tasks/ + task + /status)
            if not is_exist:
                worker = random.choice(self._workers)
                self.assign(worker[host], task)
            else:
                # 如果task任务状态为成功,则删除task节点
                status, data = self.zk.get(/rpc/tasks/ + task + /status)
                if status == bdone:
                    self.zk.delete(/rpc/tasks/ + task + /status)
                    self.zk.delete(/rpc/tasks/ + task)
                    print(delete done task= + task)

    # 将任务分配给worker,分配前打上状态标记
    def assign(self, worker, task):
        print(task=%s schedule to %s % (task, worker))
        self.zk.create(/rpc/tasks/ + task + /status, bschedule)
        self.zk.create(/rpc/assign/{worker}/{task}.format(worker=worker, task=task), value=b‘‘)


if __name__ == __main__:
    host = sys.argv[1]
    port = sys.argv[2]
    m = ZKMaster(host, int(port))
    m.register_zk()
    m.do()

zk_worker.py

# -*- coding:utf-8 -*-
# @Time    : 2020-07-20 17:50
# @Author  : wangbin
import json
import socket
import sys
import time

from kazoo.client import KazooClient

# worker将自己注册到zk
from kazoo.exceptions import NodeExistsError


class ZKWorker(object):
    def __init__(self, host, port):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.host = host
        self.port = port
        self.sock.bind((host, port))
        self.zk = None

    # 将自己注册到zk,临时节点,所以连接不能中断
    def register_zk(self):
        """
        注册到zookeeper
        """
        self.zk = KazooClient(hosts=127.0.0.1:2181)
        self.zk.start()
        self.zk.ensure_path(/rpc/workers)  # 创建根节点
        value = json.dumps({host: self.host, port: self.port})
        # 创建服务子节点
        self.zk.create(/rpc/workers/{name}.format(name=self.host), value.encode(), ephemeral=True)

    def wait_assign(self, event=None):
        """create /assign/worker1.example.com ""
            ls -w /assign/worker1.example.com """
        self.zk.ensure_path(/rpc/assign)  # 创建根节点
        try:
            self.zk.create(/rpc/assign/{name}.format(name=self.host), b‘‘)
        except NodeExistsError as e:
            print(e)
        tasks = self.zk.get_children(/rpc/assign/{name}.format(name=self.host), watch=self.wait_assign)
        print(tasks)
        for task in tasks:
            self.do_task(task)

    # 执行
    def do_task(self, task):
        print(ready to do  + task)
        time.sleep(2)
        self.update_task_status(task)
        self.delete_assign(task)

    # 获取新分配的任务,执行后更新任务状态
    def update_task_status(self, task):
        self.zk.set(/rpc/tasks/{task}/status.format(task=task), bdone)
        print(task=%s is done!  % task)

    def delete_assign(self, task):
        self.zk.delete(/rpc/assign/{host}/{task}.format(host=self.host, task=task))
        print(delete done assign task= + task)


if __name__ == __main__:
    host = sys.argv[1]
    port = sys.argv[2]
    worker = ZKWorker(host, int(port))
    worker.register_zk()
    worker.wait_assign()
    while True:
        print(i am worker!wait for task)
        time.sleep(3)

zk_client.py

# -*- coding:utf-8 -*-
# @Time    : 2020-07-20 17:53
# @Author  : wangbin

from kazoo.client import KazooClient


# 客户端连接zk,并从zk获取可用的服务器列表
class ZKClient(object):
    def __init__(self):
        self.zk = KazooClient(hosts=127.0.0.1:2181)
        self.zk.start()

    def add_task(self):
        self.zk.ensure_path(/rpc/tasks)
        task = self.zk.create(/rpc/tasks/task-, value=bcmd, sequence=True)
        print(task)


if __name__ == __main__:
    client = ZKClient()
    client.add_task()

 

python借助zookeeper实现分布式服务(二)

原文:https://www.cnblogs.com/wangbin2188/p/13363015.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!