首页 > 其他 > 详细

XXL-REGISTRY源码阅读笔记

时间:2019-12-24 22:47:53      阅读:95      评论:0      收藏:0      [点我收藏+]

因为业务需要,使用xxl-job任务调度平台,对它的实现非常好奇,于是从github上下载了它的源码来“观瞻观瞻”。

开源社区网站为:https://www.xuxueli.com/page/projects.html

xxl系列包含了很多开源项目:任务调度平台xxl-job、分布式服务框架xxl-rpc、分布式注册中心xxl-registry等。xxl-job里面会使用到xxl-registry\xxl-rpc的功能,因此想从注册中心开始看起。

注册中心从功能上来说可以划分为四大功能:

  1. 服务注册;
  2. 服务发现
  3. 服务移除
  4. 服务监控

本文先分析前三个功能(第四个功能使用DeferredResult进行异步返回结果,后续添加)

1.服务注册的实现

@Override
    public ReturnT<String> registry(String accessToken, String biz, String env, List<XxlRegistryData> registryDataList) {

        // valid
        if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "AccessToken Invalid");
        }
        if (biz==null || biz.trim().length()<4 || biz.trim().length()>255) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "Biz Invalid[4~255]");
        }
        if (env==null || env.trim().length()<2 || env.trim().length()>255) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "Env Invalid[2~255]");
        }
        if (registryDataList==null || registryDataList.size()==0) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry DataList Invalid");
        }
        for (XxlRegistryData registryData: registryDataList) {
            if (registryData.getKey()==null || registryData.getKey().trim().length()<4 || registryData.getKey().trim().length()>255) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry Key Invalid[4~255]");
            }
            if (registryData.getValue()==null || registryData.getValue().trim().length()<4 || registryData.getValue().trim().length()>255) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry Value Invalid[4~255]");
            }
        }

        // fill + add queue
        for (XxlRegistryData registryData: registryDataList) {
            registryData.setBiz(biz);
            registryData.setEnv(env);
        }
        registryQueue.addAll(registryDataList);

        return ReturnT.SUCCESS;
    }

其实非常简单,就是声明一个LinkedBlockingQueue对象registryQueue,每次客户端需要注册时,向队列里添加对应注册信息。同时,启动一newCachedThreadPool线程池(会提交许多任务,注册线程只是其中一个)。提交十个线程处理注册任务,代码如下:

for (int i = 0; i < 10; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    while (!executorStoped) {
                        try {
                            XxlRegistryData xxlRegistryData = registryQueue.take();
                            if (xxlRegistryData !=null) {

                                // refresh or add
                                int ret = xxlRegistryDataDao.refresh(xxlRegistryData);
                                if (ret == 0) {
                                    xxlRegistryDataDao.add(xxlRegistryData);
                                }

                                // valid file status
                                XxlRegistry fileXxlRegistry = getFileRegistryData(xxlRegistryData);
                                if (fileXxlRegistry == null) {
                                    // go on
                                } else if (fileXxlRegistry.getStatus() != 0) {
                                    continue;     // "Status limited."
                                } else {
                                    if (fileXxlRegistry.getDataList().contains(xxlRegistryData.getValue())) {
                                        continue;     // "Repeated limited."
                                    }
                                }

                                // checkRegistryDataAndSendMessage
                                checkRegistryDataAndSendMessage(xxlRegistryData);
                            }
                        } catch (Exception e) {
                            if (!executorStoped) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }
                }
            });
        }

关于getFileRegistryData方法,这里先不做解释,主要是用作各注册中心节点同步。checkRegistryDataAndSendMessage功能是将注册信息同步到数据库(添加、更新等)。

private void checkRegistryDataAndSendMessage(XxlRegistryData xxlRegistryData){
        // data json
        List<XxlRegistryData> xxlRegistryDataList = xxlRegistryDataDao.findData(xxlRegistryData.getBiz(), xxlRegistryData.getEnv(), xxlRegistryData.getKey());
        List<String> valueList = new ArrayList<>();
        if (xxlRegistryDataList!=null && xxlRegistryDataList.size()>0) {
            for (XxlRegistryData dataItem: xxlRegistryDataList) {
                valueList.add(dataItem.getValue());
            }
        }
        String dataJson = JacksonUtil.writeValueAsString(valueList);

        // update registry and message
        XxlRegistry xxlRegistry = xxlRegistryDao.load(xxlRegistryData.getBiz(), xxlRegistryData.getEnv(), xxlRegistryData.getKey());
        boolean needMessage = false;
        if (xxlRegistry == null) {
            xxlRegistry = new XxlRegistry();
            xxlRegistry.setBiz(xxlRegistryData.getBiz());
            xxlRegistry.setEnv(xxlRegistryData.getEnv());
            xxlRegistry.setKey(xxlRegistryData.getKey());
            xxlRegistry.setData(dataJson);
            xxlRegistryDao.add(xxlRegistry);
            needMessage = true;
        } else {

            // check status, locked and disabled not use
            if (xxlRegistry.getStatus() != 0) {
                return;
            }

            if (!xxlRegistry.getData().equals(dataJson)) {
                xxlRegistry.setData(dataJson);
                xxlRegistryDao.update(xxlRegistry);
                needMessage = true;
            }
        }

        if (needMessage) {
            // sendRegistryDataUpdateMessage (registry update)
            sendRegistryDataUpdateMessage(xxlRegistry);
        }

    }

而sendRegistryDataUpdateMessage(xxlRegistry)其实是将注册信息暂存到数据库中(可能是新注册数据,也可能是已注册数据的更新),每个消息保存了完整的注册数据。后续程序将会依据该消息将注册消息固定到properties文件内,用作各节点同步。具体操作如下:

executorService.execute(new Runnable() {
            @Override
            public void run() {
                while (!executorStoped) {
                    try {
                        // new message, filter readed
                        List<XxlRegistryMessage> messageList = xxlRegistryMessageDao.findMessage(readedMessageIds);
                        if (messageList!=null && messageList.size()>0) {
                            for (XxlRegistryMessage message: messageList) {
                                readedMessageIds.add(message.getId());

                                if (message.getType() == 0) {   // from registry、add、update、deelete,ne need sync from db, only write

                                    XxlRegistry xxlRegistry = JacksonUtil.readValue(message.getData(), XxlRegistry.class);

                                    // process data by status
                                    if (xxlRegistry.getStatus() == 1) {
                                        // locked, not updated
                                    } else if (xxlRegistry.getStatus() == 2) {
                                        // disabled, write empty
                                        xxlRegistry.setData(JacksonUtil.writeValueAsString(new ArrayList<String>()));
                                    } else {
                                        // default, sync from db (aready sync before message, only write)
                                    }

                                    // sync file
                                    setFileRegistryData(xxlRegistry);
                                }
                            }
                        }

                        // clean old message;
                        if ( (System.currentTimeMillis()/1000) % registryBeatTime ==0) {
                            xxlRegistryMessageDao.cleanMessage(registryBeatTime);
                            readedMessageIds.clear();
                        }
                    } catch (Exception e) {
                        if (!executorStoped) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (Exception e) {
                        if (!executorStoped) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
            }
        });

这里setFileRegistryData方法就是将数据固定到properties文件内。其实我个人觉得这个地方用数据库会不会更好?(个人见解,不喜勿喷)后续其实作者的注册服务器各节点的同步还是通过数据库来实现的。

感觉这里其实已经不单单是注册了,还包括了一部分同步的内容。

因此,已经注册好的服务在数据库中可以查询到key、value信息,也可以在服务对应的properties文件内找到对应的注册信息。

2、服务发现

服务发现就是依据传入的值去寻找对应的value值(应该不局限是服务地址)。

 public ReturnT<Map<String, List<String>>> discovery(String accessToken, String biz, String env, List<String> keys) {

        // valid
        if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) {
            return new ReturnT<>(ReturnT.FAIL_CODE, "AccessToken Invalid");
        }
        if (biz==null || biz.trim().length()<2 || biz.trim().length()>255) {
            return new ReturnT<>(ReturnT.FAIL_CODE, "Biz Invalid[2~255]");
        }
        if (env==null || env.trim().length()<2 || env.trim().length()>255) {
            return new ReturnT<>(ReturnT.FAIL_CODE, "Env Invalid[2~255]");
        }
        if (keys==null || keys.size()==0) {
            return new ReturnT<>(ReturnT.FAIL_CODE, "keys Invalid.");
        }
        for (String key: keys) {
            if (key==null || key.trim().length()<4 || key.trim().length()>255) {
                return new ReturnT<>(ReturnT.FAIL_CODE, "Key Invalid[4~255]");
            }
        }

        Map<String, List<String>> result = new HashMap<String, List<String>>();
        for (String key: keys) {
            XxlRegistryData xxlRegistryData = new XxlRegistryData();
            xxlRegistryData.setBiz(biz);
            xxlRegistryData.setEnv(env);
            xxlRegistryData.setKey(key);

            List<String> dataList = new ArrayList<String>();
            XxlRegistry fileXxlRegistry = getFileRegistryData(xxlRegistryData);
            if (fileXxlRegistry!=null) {
                dataList = fileXxlRegistry.getDataList();
            }

            result.put(key, dataList);
        }

        return new ReturnT<Map<String, List<String>>>(result);
    }

显然这里的服务发现回传值是取自之前注册流程里产生的properties文件。

3、服务移除

服务移除也是使用了另一个阻塞队列removeQueue来进行操作,客户端调用remove方法将需要移除的注册内容写入removeQueue,在线程池中启动线程处理服务移除:

for (int i = 0; i < 10; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    while (!executorStoped) {
                        try {
                            XxlRegistryData xxlRegistryData = removeQueue.take();
                            if (xxlRegistryData != null) {

                                // delete
                                xxlRegistryDataDao.deleteDataValue(xxlRegistryData.getBiz(), xxlRegistryData.getEnv(), xxlRegistryData.getKey(), xxlRegistryData.getValue());

                                // valid file status
                                XxlRegistry fileXxlRegistry = getFileRegistryData(xxlRegistryData);
                                if (fileXxlRegistry == null) {
                                    // go on
                                } else if (fileXxlRegistry.getStatus() != 0) {
                                    continue;   // "Status limited."
                                } else {
                                    if (!fileXxlRegistry.getDataList().contains(xxlRegistryData.getValue())) {
                                        continue;   // "Repeated limited."
                                    }
                                }

                                // checkRegistryDataAndSendMessage
                                checkRegistryDataAndSendMessage(xxlRegistryData);
                            }
                        } catch (Exception e) {
                            if (!executorStoped) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }
                }
            });
        }

这里服务移除的方案是将value值置空。但并不删除在数据库中的内容。

 

还有一点需要理解的是各节点间的同步功能,通过同步线程固定周期进行一次同步:

executorService.execute(new Runnable() {
            @Override
            public void run() {
                while (!executorStoped) {

                    // align to beattime
                    try {
                        long sleepSecond = registryBeatTime - (System.currentTimeMillis()/1000)%registryBeatTime;
                        if (sleepSecond>0 && sleepSecond<registryBeatTime) {
                            TimeUnit.SECONDS.sleep(sleepSecond);
                        }
                    } catch (Exception e) {
                        if (!executorStoped) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                    try {
                        // clean old registry-data in db
                        xxlRegistryDataDao.cleanData(registryBeatTime * 3);

                        // sync registry-data, db + file
                        int offset = 0;
                        int pagesize = 1000;
                        List<String> registryDataFileList = new ArrayList<>();

                        List<XxlRegistry> registryList = xxlRegistryDao.pageList(offset, pagesize, null, null, null);
                        while (registryList!=null && registryList.size()>0) {

                            for (XxlRegistry registryItem: registryList) {

                                // process data by status
                                if (registryItem.getStatus() == 1) {
                                    // locked, not updated
                                } else if (registryItem.getStatus() == 2) {
                                    // disabled, write empty
                                    String dataJson = JacksonUtil.writeValueAsString(new ArrayList<String>());
                                    registryItem.setData(dataJson);
                                } else {
                                    // default, sync from db
                                    List<XxlRegistryData> xxlRegistryDataList = xxlRegistryDataDao.findData(registryItem.getBiz(), registryItem.getEnv(), registryItem.getKey());
                                    List<String> valueList = new ArrayList<String>();
                                    if (xxlRegistryDataList!=null && xxlRegistryDataList.size()>0) {
                                        for (XxlRegistryData dataItem: xxlRegistryDataList) {
                                            valueList.add(dataItem.getValue());
                                        }
                                    }
                                    String dataJson = JacksonUtil.writeValueAsString(valueList);

                                    // check update, sync db
                                    if (!registryItem.getData().equals(dataJson)) {
                                        registryItem.setData(dataJson);
                                        xxlRegistryDao.update(registryItem);
                                    }
                                }

                                // sync file
                                String registryDataFile = setFileRegistryData(registryItem);

                                // collect registryDataFile
                                registryDataFileList.add(registryDataFile);
                            }


                            offset += 1000;
                            registryList = xxlRegistryDao.pageList(offset, pagesize, null, null, null);
                        }

                        // clean old registry-data file

                        cleanFileRegistryData(registryDataFileList);

                    } catch (Exception e) {
                        if (!executorStoped) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                    try {
                        TimeUnit.SECONDS.sleep(registryBeatTime);
                    } catch (Exception e) {
                        if (!executorStoped) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
            }
        });

至此,数据库、注册properties文件保持一致。

 

在开始使用作者提供的客户端测试代码是,使用baseTest会发现注册内容总是会被自动清空。原来设定是每间隔固定时间对不续约数据进行清除(数据库与文件)。

因此在完整版客户端(姑且这个称呼)是会使用线程循环对服务进行续约的,通过这样的方式保持客户端的在线状态监测。

registryThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!registryThreadStop) {
                    try {
                        if (registryData.size() > 0) {

                            boolean ret = registryBaseClient.registry(new ArrayList<XxlRegistryDataParamVO>(registryData));
                            logger.debug(">>>>>>>>>>> xxl-registry, refresh registry data {}, registryData = {}", ret?"success":"fail",registryData);
                        }
                    } catch (Exception e) {
                        if (!registryThreadStop) {
                            logger.error(">>>>>>>>>>> xxl-registry, registryThread error.", e);
                        }
                    }
                    try {
                        TimeUnit.SECONDS.sleep(10);
                    } catch (Exception e) {
                        if (!registryThreadStop) {
                            logger.error(">>>>>>>>>>> xxl-registry, registryThread error.", e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-registry, registryThread stoped.");
            }
        });
        registryThread.setName("xxl-registry, XxlRegistryClient registryThread.");
        registryThread.setDaemon(true);
        registryThread.start();

 

XXL-REGISTRY源码阅读笔记

原文:https://www.cnblogs.com/hblearning/p/xxl_registry.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!