@Override public ReturnT<String> registry(String accessToken, String biz, String env, List<XxlRegistryData> registryDataList) { // valid if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) { return new ReturnT<String>(ReturnT.FAIL_CODE, "AccessToken Invalid"); } if (biz==null || biz.trim().length()<4 || biz.trim().length()>255) { return new ReturnT<String>(ReturnT.FAIL_CODE, "Biz Invalid[4~255]"); } if (env==null || env.trim().length()<2 || env.trim().length()>255) { return new ReturnT<String>(ReturnT.FAIL_CODE, "Env Invalid[2~255]"); } if (registryDataList==null || registryDataList.size()==0) { return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry DataList Invalid"); } for (XxlRegistryData registryData: registryDataList) { if (registryData.getKey()==null || registryData.getKey().trim().length()<4 || registryData.getKey().trim().length()>255) { return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry Key Invalid[4~255]"); } if (registryData.getValue()==null || registryData.getValue().trim().length()<4 || registryData.getValue().trim().length()>255) { return new ReturnT<String>(ReturnT.FAIL_CODE, "Registry Value Invalid[4~255]"); } } // fill + add queue for (XxlRegistryData registryData: registryDataList) { registryData.setBiz(biz); registryData.setEnv(env); } registryQueue.addAll(registryDataList); return ReturnT.SUCCESS; }
// Start 10 workers that drain registryQueue: each entry is refreshed (or inserted) in the
// db, then a registry update is triggered unless the file copy shows nothing would change.
for (int worker = 0; worker < 10; worker++) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            while (!executorStoped) {
                try {
                    XxlRegistryData pending = registryQueue.take();
                    if (pending == null) {
                        continue;
                    }

                    // refresh the existing row; insert when no row was refreshed
                    if (xxlRegistryDataDao.refresh(pending) == 0) {
                        xxlRegistryDataDao.add(pending);
                    }

                    // consult the file copy of the registry before doing more work
                    XxlRegistry fileCopy = getFileRegistryData(pending);
                    if (fileCopy != null) {
                        if (fileCopy.getStatus() != 0) {
                            continue;   // "Status limited."
                        }
                        if (fileCopy.getDataList().contains(pending.getValue())) {
                            continue;   // "Repeated limited."
                        }
                    }

                    checkRegistryDataAndSendMessage(pending);
                } catch (Exception e) {
                    // errors raised while shutting down are not reported
                    if (!executorStoped) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        }
    });
}
private void checkRegistryDataAndSendMessage(XxlRegistryData xxlRegistryData){ // data json List<XxlRegistryData> xxlRegistryDataList = xxlRegistryDataDao.findData(xxlRegistryData.getBiz(), xxlRegistryData.getEnv(), xxlRegistryData.getKey()); List<String> valueList = new ArrayList<>(); if (xxlRegistryDataList!=null && xxlRegistryDataList.size()>0) { for (XxlRegistryData dataItem: xxlRegistryDataList) { valueList.add(dataItem.getValue()); } } String dataJson = JacksonUtil.writeValueAsString(valueList); // update registry and message XxlRegistry xxlRegistry = xxlRegistryDao.load(xxlRegistryData.getBiz(), xxlRegistryData.getEnv(), xxlRegistryData.getKey()); boolean needMessage = false; if (xxlRegistry == null) { xxlRegistry = new XxlRegistry(); xxlRegistry.setBiz(xxlRegistryData.getBiz()); xxlRegistry.setEnv(xxlRegistryData.getEnv()); xxlRegistry.setKey(xxlRegistryData.getKey()); xxlRegistry.setData(dataJson); xxlRegistryDao.add(xxlRegistry); needMessage = true; } else { // check status, locked and disabled not use if (xxlRegistry.getStatus() != 0) { return; } if (!xxlRegistry.getData().equals(dataJson)) { xxlRegistry.setData(dataJson); xxlRegistryDao.update(xxlRegistry); needMessage = true; } } if (needMessage) { // sendRegistryDataUpdateMessage (registry update) sendRegistryDataUpdateMessage(xxlRegistry); } }
// Poll for unread registry messages about once per second and mirror each one to its file.
executorService.execute(new Runnable() {
    @Override
    public void run() {
        while (!executorStoped) {
            try {
                // new messages only: ids already handled are passed as the filter
                List<XxlRegistryMessage> unread = xxlRegistryMessageDao.findMessage(readedMessageIds);
                if (unread != null) {
                    for (XxlRegistryMessage message : unread) {
                        readedMessageIds.add(message.getId());

                        // only type 0 (registry add/update/delete) is handled here;
                        // the db was already synced before the message was sent, so only the file is written
                        if (message.getType() != 0) {
                            continue;
                        }

                        XxlRegistry registry = JacksonUtil.readValue(message.getData(), XxlRegistry.class);

                        // status 2 (disabled) writes an empty list; status 1 (locked) and the
                        // default status write the data exactly as it arrived
                        if (registry.getStatus() == 2) {
                            registry.setData(JacksonUtil.writeValueAsString(new ArrayList<String>()));
                        }

                        // sync file
                        setFileRegistryData(registry);
                    }
                }

                // clean old messages when the current second is a multiple of the beat time
                long nowSeconds = System.currentTimeMillis() / 1000;
                if (nowSeconds % registryBeatTime == 0) {
                    xxlRegistryMessageDao.cleanMessage(registryBeatTime);
                    readedMessageIds.clear();
                }
            } catch (Exception e) {
                if (!executorStoped) {
                    logger.error(e.getMessage(), e);
                }
            }

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (Exception e) {
                if (!executorStoped) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    }
});
public ReturnT<Map<String, List<String>>> discovery(String accessToken, String biz, String env, List<String> keys) { // valid if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) { return new ReturnT<>(ReturnT.FAIL_CODE, "AccessToken Invalid"); } if (biz==null || biz.trim().length()<2 || biz.trim().length()>255) { return new ReturnT<>(ReturnT.FAIL_CODE, "Biz Invalid[2~255]"); } if (env==null || env.trim().length()<2 || env.trim().length()>255) { return new ReturnT<>(ReturnT.FAIL_CODE, "Env Invalid[2~255]"); } if (keys==null || keys.size()==0) { return new ReturnT<>(ReturnT.FAIL_CODE, "keys Invalid."); } for (String key: keys) { if (key==null || key.trim().length()<4 || key.trim().length()>255) { return new ReturnT<>(ReturnT.FAIL_CODE, "Key Invalid[4~255]"); } } Map<String, List<String>> result = new HashMap<String, List<String>>(); for (String key: keys) { XxlRegistryData xxlRegistryData = new XxlRegistryData(); xxlRegistryData.setBiz(biz); xxlRegistryData.setEnv(env); xxlRegistryData.setKey(key); List<String> dataList = new ArrayList<String>(); XxlRegistry fileXxlRegistry = getFileRegistryData(xxlRegistryData); if (fileXxlRegistry!=null) { dataList = fileXxlRegistry.getDataList(); } result.put(key, dataList); } return new ReturnT<Map<String, List<String>>>(result); }
// Start 10 workers that drain removeQueue: each entry's value is deleted from the db,
// then a registry update is triggered unless the file copy shows nothing would change.
for (int worker = 0; worker < 10; worker++) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            while (!executorStoped) {
                try {
                    XxlRegistryData removal = removeQueue.take();
                    if (removal == null) {
                        continue;
                    }

                    // delete
                    xxlRegistryDataDao.deleteDataValue(removal.getBiz(), removal.getEnv(), removal.getKey(), removal.getValue());

                    // consult the file copy of the registry before doing more work
                    XxlRegistry fileCopy = getFileRegistryData(removal);
                    if (fileCopy != null) {
                        if (fileCopy.getStatus() != 0) {
                            continue;   // "Status limited."
                        }
                        // value is not in the file copy, so there is nothing to publish
                        if (!fileCopy.getDataList().contains(removal.getValue())) {
                            continue;   // "Repeated limited."
                        }
                    }

                    checkRegistryDataAndSendMessage(removal);
                } catch (Exception e) {
                    // errors raised while shutting down are not reported
                    if (!executorStoped) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        }
    });
}
// Periodic full sync: purge stale registry-data rows, rebuild every registry's data from
// the db, rewrite its file, then remove files that were not rewritten in this pass.
executorService.execute(new Runnable() {
    @Override
    public void run() {
        while (!executorStoped) {

            // align to beat time: sleep up to the next multiple of registryBeatTime seconds
            try {
                long secondsIntoBeat = (System.currentTimeMillis() / 1000) % registryBeatTime;
                long sleepSecond = registryBeatTime - secondsIntoBeat;
                if (sleepSecond > 0 && sleepSecond < registryBeatTime) {
                    TimeUnit.SECONDS.sleep(sleepSecond);
                }
            } catch (Exception e) {
                if (!executorStoped) {
                    logger.error(e.getMessage(), e);
                }
            }

            try {
                // clean old registry-data in db
                xxlRegistryDataDao.cleanData(registryBeatTime * 3);

                // sync registry-data, db + file, one page at a time
                final int pagesize = 1000;
                List<String> writtenFiles = new ArrayList<>();

                for (int offset = 0; ; offset += pagesize) {
                    List<XxlRegistry> page = xxlRegistryDao.pageList(offset, pagesize, null, null, null);
                    if (page == null || page.isEmpty()) {
                        break;
                    }

                    for (XxlRegistry registryItem : page) {

                        // process data by status
                        if (registryItem.getStatus() == 2) {
                            // disabled, write empty
                            registryItem.setData(JacksonUtil.writeValueAsString(new ArrayList<String>()));
                        } else if (registryItem.getStatus() != 1) {
                            // default, sync from db (status 1 = locked, not updated)
                            List<XxlRegistryData> storedRows = xxlRegistryDataDao.findData(registryItem.getBiz(), registryItem.getEnv(), registryItem.getKey());
                            List<String> currentValues = new ArrayList<String>();
                            if (storedRows != null) {
                                for (XxlRegistryData row : storedRows) {
                                    currentValues.add(row.getValue());
                                }
                            }
                            String serializedValues = JacksonUtil.writeValueAsString(currentValues);

                            // check update, sync db
                            if (!registryItem.getData().equals(serializedValues)) {
                                registryItem.setData(serializedValues);
                                xxlRegistryDao.update(registryItem);
                            }
                        }

                        // sync file and remember which file was written
                        writtenFiles.add(setFileRegistryData(registryItem));
                    }
                }

                // clean old registry-data file
                cleanFileRegistryData(writtenFiles);

            } catch (Exception e) {
                if (!executorStoped) {
                    logger.error(e.getMessage(), e);
                }
            }

            try {
                TimeUnit.SECONDS.sleep(registryBeatTime);
            } catch (Exception e) {
                if (!executorStoped) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    }
});
// Daemon thread that re-submits the locally held registry data every 10 seconds
// until registryThreadStop is set.
Runnable registryRefreshLoop = new Runnable() {
    @Override
    public void run() {
        while (!registryThreadStop) {
            try {
                if (registryData.size() > 0) {
                    // a copy of the current entries is what gets submitted
                    boolean ret = registryBaseClient.registry(new ArrayList<XxlRegistryDataParamVO>(registryData));
                    String outcome = ret ? "success" : "fail";
                    logger.debug(">>>>>>>>>>> xxl-registry, refresh registry data {}, registryData = {}", outcome, registryData);
                }
            } catch (Exception e) {
                if (!registryThreadStop) {
                    logger.error(">>>>>>>>>>> xxl-registry, registryThread error.", e);
                }
            }

            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (Exception e) {
                // errors raised while stopping are not reported
                if (!registryThreadStop) {
                    logger.error(">>>>>>>>>>> xxl-registry, registryThread error.", e);
                }
            }
        }
        logger.info(">>>>>>>>>>> xxl-registry, registryThread stoped.");
    }
};
registryThread = new Thread(registryRefreshLoop);
registryThread.setName("xxl-registry, XxlRegistryClient registryThread.");
registryThread.setDaemon(true);
registryThread.start();