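Because of business needs I have been using the xxl-job task scheduling platform. I got very curious about how it is implemented, so I downloaded its source code from GitHub to have a look.
The open-source community site is: https://www.xuxueli.com/page/projects.html
The xxl family includes many open-source projects: the task scheduling platform xxl-job, the distributed service framework xxl-rpc, the distributed registry xxl-registry, and others. xxl-job uses features of xxl-registry and xxl-rpc, so I decided to start reading from the registry.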
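Functionally, the registry can be divided into four major features.
This post analyzes the first three: service registration, service discovery, and service removal. (The fourth returns its result asynchronously through DeferredResult and will be added later.)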
1. Service registration
@Override
public ReturnT<String> registry(String accessToken, String biz, String env, List<XxlRegistryData> registryDataList) {

    // valid
    if (this.accessToken != null && this.accessToken.trim().length() > 0 && !this.accessToken.equals(accessToken)) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "AccessToken Invalid");
    }
    if (biz == null || biz.trim().length() < 4 || biz.trim().length() > 255) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "Biz Invalid[4~255]");
    }
    if (env == null || env.trim().length() < 2 || env.trim().length() > 255) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "Env Invalid[2~255]");
    }
    if (registryDataList == null || registryDataList.size() == 0) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry DataList Invalid");
    }
    for (XxlRegistryData registryData: registryDataList) {
        if (registryData.getKey() == null || registryData.getKey().trim().length() < 4 || registryData.getKey().trim().length() > 255) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry Key Invalid[4~255]");
        }
        if (registryData.getValue() == null || registryData.getValue().trim().length() < 4 || registryData.getValue().trim().length() > 255) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry Value Invalid[4~255]");
        }
    }

    // fill + add queue
    for (XxlRegistryData registryData: registryDataList) {
        registryData.setBiz(biz);
        registryData.setEnv(env);
    }
    registryQueue.addAll(registryDataList);

    return ReturnT.SUCCESS;
}
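It is actually very simple: a LinkedBlockingQueue named registryQueue is declared, and each time a client needs to register, the corresponding registration data is added to the queue. At the same time, a newCachedThreadPool thread pool is started (many tasks are submitted to it; the registration threads are only some of them). Ten threads are submitted to handle the registration work, as follows: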
for (int i = 0; i < 10; i++) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            while (!executorStoped) {
                try {
                    XxlRegistryData xxlRegistryData = registryQueue.take();
                    if (xxlRegistryData != null) {

                        // refresh or add
                        int ret = xxlRegistryDataDao.refresh(xxlRegistryData);
                        if (ret == 0) {
                            xxlRegistryDataDao.add(xxlRegistryData);
                        }

                        // valid file status
                        XxlRegistry fileXxlRegistry = getFileRegistryData(xxlRegistryData);
                        if (fileXxlRegistry == null) {
                            // go on
                        } else if (fileXxlRegistry.getStatus() != 0) {
                            continue;     // "Status limited."
                        } else {
                            if (fileXxlRegistry.getDataList().contains(xxlRegistryData.getValue())) {
                                continue;     // "Repeated limited."
                            }
                        }

                        // checkRegistryDataAndSendMessage
                        checkRegistryDataAndSendMessage(xxlRegistryData);
                    }
                } catch (Exception e) {
                    if (!executorStoped) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        }
    });
}
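To make the queue-plus-workers pattern easier to see in isolation, here is a minimal sketch of it. It is not the xxl-registry code: QueueWorkerSketch, stored and runOnce are names I made up, strings stand in for XxlRegistryData, and an in-memory list stands in for the DAO calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the registry service; not the xxl-registry class.
class QueueWorkerSketch {
    private final BlockingQueue<String> registryQueue = new LinkedBlockingQueue<>();
    private final ExecutorService executorService = Executors.newCachedThreadPool();
    private final List<String> stored = new CopyOnWriteArrayList<>();
    private volatile boolean stopped = false;

    // Start worker threads that block on the queue, like the ten registration threads.
    void start(int workers, CountDownLatch done) {
        for (int i = 0; i < workers; i++) {
            executorService.execute(() -> {
                while (!stopped) {
                    try {
                        String item = registryQueue.take(); // blocks until an item arrives
                        stored.add(item);                   // stands in for the DAO refresh/add
                        done.countDown();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return; // shutdownNow() interrupts take(); leave the loop
                    }
                }
            });
        }
    }

    // The "registry" call only enqueues and returns immediately.
    void registry(List<String> items) {
        registryQueue.addAll(items);
    }

    void stop() {
        stopped = true;
        executorService.shutdownNow();
    }

    // Convenience: enqueue the items, wait until the workers have handled them, then stop.
    static List<String> runOnce(List<String> items) {
        QueueWorkerSketch sketch = new QueueWorkerSketch();
        CountDownLatch done = new CountDownLatch(items.size());
        sketch.start(3, done);
        sketch.registry(items);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        sketch.stop();
        return new ArrayList<>(sketch.stored);
    }
}
```

The point of the design is that the caller never waits for the database: registry only adds to the queue, and the slow work happens on the worker threads. With several workers the processing order is not guaranteed.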
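I will not explain getFileRegistryData here for now; it is mainly used for synchronization between registry nodes. checkRegistryDataAndSendMessage syncs the registration data into the database (add, update, and so on).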
private void checkRegistryDataAndSendMessage(XxlRegistryData xxlRegistryData) {
    // data json
    List<XxlRegistryData> xxlRegistryDataList = xxlRegistryDataDao.findData(xxlRegistryData.getBiz(), xxlRegistryData.getEnv(), xxlRegistryData.getKey());
    List<String> valueList = new ArrayList<>();
    if (xxlRegistryDataList != null && xxlRegistryDataList.size() > 0) {
        for (XxlRegistryData dataItem: xxlRegistryDataList) {
            valueList.add(dataItem.getValue());
        }
    }
    String dataJson = JacksonUtil.writeValueAsString(valueList);

    // update registry and message
    XxlRegistry xxlRegistry = xxlRegistryDao.load(xxlRegistryData.getBiz(), xxlRegistryData.getEnv(), xxlRegistryData.getKey());
    boolean needMessage = false;
    if (xxlRegistry == null) {
        xxlRegistry = new XxlRegistry();
        xxlRegistry.setBiz(xxlRegistryData.getBiz());
        xxlRegistry.setEnv(xxlRegistryData.getEnv());
        xxlRegistry.setKey(xxlRegistryData.getKey());
        xxlRegistry.setData(dataJson);
        xxlRegistryDao.add(xxlRegistry);
        needMessage = true;
    } else {

        // check status, locked and disabled not use
        if (xxlRegistry.getStatus() != 0) {
            return;
        }

        if (!xxlRegistry.getData().equals(dataJson)) {
            xxlRegistry.setData(dataJson);
            xxlRegistryDao.update(xxlRegistry);
            needMessage = true;
        }
    }

    if (needMessage) {
        // sendRegistryDataUpdateMessage (registry update)
        sendRegistryDataUpdateMessage(xxlRegistry);
    }
}
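The core idea in checkRegistryDataAndSendMessage is "only send a message when the serialized value list actually changed". A stripped-down sketch of that idea, under my own assumptions: ChangeDetectionSketch and checkAndSend are hypothetical names, the maps replace the DAOs, String.join replaces the Jackson serialization, and the status check is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory version of the "compare, then notify" step.
class ChangeDetectionSketch {
    private final Map<String, String> registry = new HashMap<>(); // key -> serialized values
    private final List<String> messages = new ArrayList<>();      // stands in for the message table

    // Returns true when a message was queued, i.e. the record is new or its data changed.
    boolean checkAndSend(String key, List<String> currentValues) {
        String data = String.join(",", currentValues); // assumption: simple join instead of JSON
        String old = registry.get(key);
        if (old != null && old.equals(data)) {
            return false; // nothing changed, so no update and no message
        }
        registry.put(key, data);        // add or update the registry record
        messages.add(key + "=" + data); // the message carries the complete data
        return true;
    }

    List<String> messages() {
        return messages;
    }
}
```

Calling checkAndSend twice with the same values queues only one message, which is why a client that keeps re-registering the same address does not flood the message table.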
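sendRegistryDataUpdateMessage(xxlRegistry), in turn, stages the registration data in the database (it may be newly registered data or an update to already-registered data); each message holds the complete registration data. A later step uses these messages to persist the registration data into properties files, which are used for synchronization between nodes. The processing looks like this: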
executorService.execute(new Runnable() {
    @Override
    public void run() {
        while (!executorStoped) {
            try {
                // new message, filter readed
                List<XxlRegistryMessage> messageList = xxlRegistryMessageDao.findMessage(readedMessageIds);
                if (messageList != null && messageList.size() > 0) {
                    for (XxlRegistryMessage message: messageList) {
                        readedMessageIds.add(message.getId());

                        if (message.getType() == 0) {   // from registry: add, update, delete; no need to sync from db, only write

                            XxlRegistry xxlRegistry = JacksonUtil.readValue(message.getData(), XxlRegistry.class);

                            // process data by status
                            if (xxlRegistry.getStatus() == 1) {
                                // locked, not updated
                            } else if (xxlRegistry.getStatus() == 2) {
                                // disabled, write empty
                                xxlRegistry.setData(JacksonUtil.writeValueAsString(new ArrayList<String>()));
                            } else {
                                // default, sync from db (already synced before the message, only write)
                            }

                            // sync file
                            setFileRegistryData(xxlRegistry);
                        }
                    }
                }

                // clean old message
                if ((System.currentTimeMillis() / 1000) % registryBeatTime == 0) {
                    xxlRegistryMessageDao.cleanMessage(registryBeatTime);
                    readedMessageIds.clear();
                }
            } catch (Exception e) {
                if (!executorStoped) {
                    logger.error(e.getMessage(), e);
                }
            }
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (Exception e) {
                if (!executorStoped) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    }
});
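Here setFileRegistryData is the method that persists the data into a properties file. Personally I wonder whether using the database here would be better (just my opinion, please be kind). As it turns out, synchronization between the author's registry nodes is still done through the database.
It feels like this is no longer just registration; it also includes part of the synchronization logic.
So for a service that has been registered, the key and value can be queried in the database, and the same registration data can also be found in the properties file for that service.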
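I did not paste setFileRegistryData and getFileRegistryData, so here is only a rough sketch of what "persist a registry record to a properties file and read it back" can look like with java.util.Properties. FileRegistrySketch, the property names data and status, and the temp-file location are all my assumptions, not the layout xxl-registry actually uses.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical file-backed store for one registry record.
class FileRegistrySketch {

    // Write the serialized value list and the status into a properties file.
    static void write(Path file, String data, int status) {
        Properties props = new Properties();
        props.setProperty("data", data);                     // assumed property name
        props.setProperty("status", String.valueOf(status)); // assumed property name
        try (OutputStream out = Files.newOutputStream(file)) {
            props.store(out, null);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Read the record back; returns null when the file does not exist.
    static Properties read(Path file) {
        if (!Files.exists(file)) {
            return null;
        }
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(file)) {
            props.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Write to a temp file, read it back, and return "data|status".
    static String roundTrip(String data, int status) {
        try {
            Path file = Files.createTempFile("registry-", ".properties");
            try {
                write(file, data, status);
                Properties props = read(file);
                return props.getProperty("data") + "|" + props.getProperty("status");
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Reads served from a local file like this do not touch the database at all, which is presumably the reason for keeping the file copy.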
2. Service discovery
Service discovery looks up the values for the keys passed in (the values are probably not limited to service addresses).
public ReturnT<Map<String, List<String>>> discovery(String accessToken, String biz, String env, List<String> keys) {

    // valid
    if (this.accessToken != null && this.accessToken.trim().length() > 0 && !this.accessToken.equals(accessToken)) {
        return new ReturnT<>(ReturnT.FAIL_CODE, "AccessToken Invalid");
    }
    if (biz == null || biz.trim().length() < 2 || biz.trim().length() > 255) {
        return new ReturnT<>(ReturnT.FAIL_CODE, "Biz Invalid[2~255]");
    }
    if (env == null || env.trim().length() < 2 || env.trim().length() > 255) {
        return new ReturnT<>(ReturnT.FAIL_CODE, "Env Invalid[2~255]");
    }
    if (keys == null || keys.size() == 0) {
        return new ReturnT<>(ReturnT.FAIL_CODE, "keys Invalid.");
    }
    for (String key: keys) {
        if (key == null || key.trim().length() < 4 || key.trim().length() > 255) {
            return new ReturnT<>(ReturnT.FAIL_CODE, "Key Invalid[4~255]");
        }
    }

    Map<String, List<String>> result = new HashMap<String, List<String>>();
    for (String key: keys) {
        XxlRegistryData xxlRegistryData = new XxlRegistryData();
        xxlRegistryData.setBiz(biz);
        xxlRegistryData.setEnv(env);
        xxlRegistryData.setKey(key);

        List<String> dataList = new ArrayList<String>();
        XxlRegistry fileXxlRegistry = getFileRegistryData(xxlRegistryData);
        if (fileXxlRegistry != null) {
            dataList = fileXxlRegistry.getDataList();
        }

        result.put(key, dataList);
    }

    return new ReturnT<Map<String, List<String>>>(result);
}
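Clearly, the values returned by discovery come from the properties files produced by the registration flow above.
3. Service removal
Service removal uses another blocking queue, removeQueue. The client calls the remove method to put the registration data to be removed into removeQueue, and threads started in the thread pool process the removals: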
for (int i = 0; i < 10; i++) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            while (!executorStoped) {
                try {
                    XxlRegistryData xxlRegistryData = removeQueue.take();
                    if (xxlRegistryData != null) {

                        // delete
                        xxlRegistryDataDao.deleteDataValue(xxlRegistryData.getBiz(), xxlRegistryData.getEnv(), xxlRegistryData.getKey(), xxlRegistryData.getValue());

                        // valid file status
                        XxlRegistry fileXxlRegistry = getFileRegistryData(xxlRegistryData);
                        if (fileXxlRegistry == null) {
                            // go on
                        } else if (fileXxlRegistry.getStatus() != 0) {
                            continue;   // "Status limited."
                        } else {
                            if (!fileXxlRegistry.getDataList().contains(xxlRegistryData.getValue())) {
                                continue;   // "Repeated limited."
                            }
                        }

                        // checkRegistryDataAndSendMessage
                        checkRegistryDataAndSendMessage(xxlRegistryData);
                    }
                } catch (Exception e) {
                    if (!executorStoped) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        }
    });
}
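The removal approach here is to clear the value: deleteDataValue drops that value, and checkRegistryDataAndSendMessage then recomputes the data list of the registry record, which ends up empty if that was the last value. The registry record itself is not deleted from the database.
One more thing to understand is synchronization between nodes: a sync thread runs one synchronization pass at a fixed interval: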
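A small in-memory sketch of how I read the removal flow: the individual value is dropped, the value list of the key is recomputed, and the record for the key stays around with whatever is left. RemovalSketch and its two maps are hypothetical stand-ins for the two DAOs, and the status checks are omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of registration data plus the per-key registry record.
class RemovalSketch {
    private final Map<String, Set<String>> dataRows = new HashMap<>();  // one entry per registered value
    private final Map<String, List<String>> registry = new HashMap<>(); // per-key record with the value list

    void register(String key, String value) {
        dataRows.computeIfAbsent(key, k -> new LinkedHashSet<>()).add(value);
        registry.put(key, new ArrayList<>(dataRows.get(key)));
    }

    // Drop one value, then recompute the record; the record itself is kept.
    void remove(String key, String value) {
        Set<String> rows = dataRows.get(key);
        if (rows != null) {
            rows.remove(value);
        }
        registry.put(key, rows == null ? new ArrayList<>() : new ArrayList<>(rows));
    }

    // Returns the current value list, or null when the key has no record at all.
    List<String> discover(String key) {
        return registry.get(key);
    }
}
```

After the last value of a key is removed, discover returns an empty list rather than null, which matches the "value emptied, record kept" behaviour described above.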
executorService.execute(new Runnable() {
    @Override
    public void run() {
        while (!executorStoped) {

            // align to beattime
            try {
                long sleepSecond = registryBeatTime - (System.currentTimeMillis() / 1000) % registryBeatTime;
                if (sleepSecond > 0 && sleepSecond < registryBeatTime) {
                    TimeUnit.SECONDS.sleep(sleepSecond);
                }
            } catch (Exception e) {
                if (!executorStoped) {
                    logger.error(e.getMessage(), e);
                }
            }

            try {
                // clean old registry-data in db
                xxlRegistryDataDao.cleanData(registryBeatTime * 3);

                // sync registry-data, db + file
                int offset = 0;
                int pagesize = 1000;
                List<String> registryDataFileList = new ArrayList<>();

                List<XxlRegistry> registryList = xxlRegistryDao.pageList(offset, pagesize, null, null, null);
                while (registryList != null && registryList.size() > 0) {

                    for (XxlRegistry registryItem: registryList) {

                        // process data by status
                        if (registryItem.getStatus() == 1) {
                            // locked, not updated
                        } else if (registryItem.getStatus() == 2) {
                            // disabled, write empty
                            String dataJson = JacksonUtil.writeValueAsString(new ArrayList<String>());
                            registryItem.setData(dataJson);
                        } else {
                            // default, sync from db
                            List<XxlRegistryData> xxlRegistryDataList = xxlRegistryDataDao.findData(registryItem.getBiz(), registryItem.getEnv(), registryItem.getKey());
                            List<String> valueList = new ArrayList<String>();
                            if (xxlRegistryDataList != null && xxlRegistryDataList.size() > 0) {
                                for (XxlRegistryData dataItem: xxlRegistryDataList) {
                                    valueList.add(dataItem.getValue());
                                }
                            }
                            String dataJson = JacksonUtil.writeValueAsString(valueList);

                            // check update, sync db
                            if (!registryItem.getData().equals(dataJson)) {
                                registryItem.setData(dataJson);
                                xxlRegistryDao.update(registryItem);
                            }
                        }

                        // sync file
                        String registryDataFile = setFileRegistryData(registryItem);

                        // collect registryDataFile
                        registryDataFileList.add(registryDataFile);
                    }

                    offset += 1000;
                    registryList = xxlRegistryDao.pageList(offset, pagesize, null, null, null);
                }

                // clean old registry-data file
                cleanFileRegistryData(registryDataFileList);

            } catch (Exception e) {
                if (!executorStoped) {
                    logger.error(e.getMessage(), e);
                }
            }
            try {
                TimeUnit.SECONDS.sleep(registryBeatTime);
            } catch (Exception e) {
                if (!executorStoped) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    }
});
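At this point the database and the registry properties files are consistent.
When I first used the client test code provided by the author, I found with baseTest that the registered data was always cleared automatically. It turns out that, by design, data that has not been renewed is cleaned up at a fixed interval (in both the database and the files).
So the full client (for lack of a better name) uses a thread that renews the registration in a loop, which is how the client's online status is tracked.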
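The "align to beattime" step in the sync thread is just modular arithmetic, and a worked example helps. The sketch below lifts that computation into a pure function; BeatAlignSketch and alignSleepSeconds are my own names, and the beat of 10 seconds in the comments is only an example value.

```java
// Hypothetical helper that isolates the alignment arithmetic from the sync thread.
class BeatAlignSketch {

    // Seconds to sleep so the next pass starts on a multiple of registryBeatTime.
    // Returns 0 when the result falls outside (0, registryBeatTime), mirroring the
    // guard in the original loop, which then skips the alignment sleep.
    static long alignSleepSeconds(long nowSeconds, long registryBeatTime) {
        long sleepSecond = registryBeatTime - nowSeconds % registryBeatTime;
        return (sleepSecond > 0 && sleepSecond < registryBeatTime) ? sleepSecond : 0;
    }
}
```

With a beat of 10 seconds, a thread that wakes at second 1003 gets 10 - 3 = 7 and sleeps until 1010. At second 1000 the expression gives 10, which fails the strict `< registryBeatTime` check, so no alignment sleep happens because the thread is already on the boundary. Since every node computes this from the wall clock, all nodes run their sync pass at roughly the same moments, assuming their clocks agree.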
registryThread = new Thread(new Runnable() {
    @Override
    public void run() {
        while (!registryThreadStop) {
            try {
                if (registryData.size() > 0) {

                    boolean ret = registryBaseClient.registry(new ArrayList<XxlRegistryDataParamVO>(registryData));
                    logger.debug(">>>>>>>>>>> xxl-registry, refresh registry data {}, registryData = {}", ret ? "success" : "fail", registryData);
                }
            } catch (Exception e) {
                if (!registryThreadStop) {
                    logger.error(">>>>>>>>>>> xxl-registry, registryThread error.", e);
                }
            }
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (Exception e) {
                if (!registryThreadStop) {
                    logger.error(">>>>>>>>>>> xxl-registry, registryThread error.", e);
                }
            }
        }
        logger.info(">>>>>>>>>>> xxl-registry, registryThread stoped.");
    }
});
registryThread.setName("xxl-registry, XxlRegistryClient registryThread.");
registryThread.setDaemon(true);
registryThread.start();
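Renewal and cleanup together behave like a lease: a value stays registered only while the client keeps refreshing it. Here is a minimal sketch of that interplay with explicit timestamps so it can be followed by hand. LeaseSketch is a made-up class, and I am assuming that cleanData(registryBeatTime * 3) removes entries that have not been refreshed within that many seconds (the SQL is not shown in this post).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical lease table: value -> second of its last refresh.
class LeaseSketch {
    private final Map<String, Long> lastRefresh = new HashMap<>();
    private final long beatSeconds;

    LeaseSketch(long beatSeconds) {
        this.beatSeconds = beatSeconds;
    }

    // Register or renew: both simply record the current time.
    void registry(String value, long nowSeconds) {
        lastRefresh.put(value, nowSeconds);
    }

    // Assumed behaviour of the cleanup: drop entries older than three beats.
    void cleanData(long nowSeconds) {
        long ttl = beatSeconds * 3;
        lastRefresh.values().removeIf(t -> nowSeconds - t > ttl);
    }

    boolean isOnline(String value) {
        return lastRefresh.containsKey(value);
    }
}
```

With a beat of 10 seconds, a client that registers once at second 0 and never renews is gone after a cleanup at second 31, while a client that renews every 10 seconds survives. That matches what I saw with baseTest, which registers once and does not renew.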
Original post: https://www.cnblogs.com/hblearning/p/xxl_registry.html