Fault-Tolerance Analysis of DolphinScheduler

Date: 2021-09-16 12:14:34

The figure below illustrates fault tolerance in DolphinScheduler:

[Figure: DolphinScheduler fault tolerance]

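Fault tolerance is triggered in two cases. The first is when the first master starts up: that master scans the processInstance table for process instances that are still in an executing state, regenerates a command for each of them, and sets the host of each such processInstance record to null.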

The executing states are:

    // states in which a process instance counts as "still executing" for failover
    private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
            ExecutionStatus.RUNNING_EXECUTION.ordinal(),
            ExecutionStatus.READY_PAUSE.ordinal(),
            ExecutionStatus.READY_STOP.ordinal()};
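
To make the role of this array concrete, here is a minimal sketch of a membership check against it. The enum below is a simplified stand-in (the real ExecutionStatus has more values, so its ordinals differ), and the class name and the isExecuting helper are hypothetical names introduced only for illustration.

```java
import java.util.Arrays;

public class ExecutingStateCheck {
    // Simplified stand-in for ExecutionStatus; ordinals here do NOT match the real enum.
    enum ExecutionStatus {
        SUBMITTED_SUCCESS, RUNNING_EXECUTION, READY_PAUSE, READY_STOP, SUCCESS, FAILURE
    }

    // Same four states as in the article, stored as ordinals.
    private static final int[] STATE_ARRAY = {
        ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
        ExecutionStatus.RUNNING_EXECUTION.ordinal(),
        ExecutionStatus.READY_PAUSE.ordinal(),
        ExecutionStatus.READY_STOP.ordinal()
    };

    // Hypothetical helper: true if the status is one of the executing states.
    static boolean isExecuting(ExecutionStatus status) {
        return Arrays.stream(STATE_ARRAY).anyMatch(code -> code == status.ordinal());
    }

    public static void main(String[] args) {
        System.out.println(isExecuting(ExecutionStatus.RUNNING_EXECUTION)); // true
        System.out.println(isExecuting(ExecutionStatus.SUCCESS));           // false
    }
}
```

Only instances for which such a check succeeds are candidates for failover; instances that have already finished are left alone.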

The startup fault-tolerance code, which runs when only one master node is active:

    // startup fault tolerance: this is the only active master,
    // so no specific node path is passed (path == null)
    if (getActiveMasterNum() == 1) {
        removeZKNodePath(null, ZKNodeType.MASTER, true);
        removeZKNodePath(null, ZKNodeType.WORKER, true);
    }
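
The startup case can be sketched with an in-memory stand-in for the processInstance table. Everything below is hypothetical and only illustrates the behavior described above: the ProcessInstance class, the failover method, and the assumption that a null host means "do not filter by host" are not taken from the DolphinScheduler source.

```java
import java.util.ArrayList;
import java.util.List;

public class StartupFailoverSketch {
    // Hypothetical in-memory stand-in for one row of the processInstance table.
    static class ProcessInstance {
        final int id;
        String host;             // master that owns the instance, or null
        final boolean executing; // true if the state is one of the four executing states

        ProcessInstance(int id, String host, boolean executing) {
            this.id = id;
            this.host = host;
            this.executing = executing;
        }
    }

    // Hypothetical failover routine. Assumption: a null deadHost matches every host.
    // Returns the ids of the instances for which a recovery command was generated.
    static List<Integer> failover(List<ProcessInstance> table, String deadHost) {
        List<Integer> commands = new ArrayList<>();
        for (ProcessInstance pi : table) {
            boolean hostMatches = deadHost == null || deadHost.equals(pi.host);
            if (pi.executing && hostMatches) {
                commands.add(pi.id); // regenerate a command for this instance
                pi.host = null;      // clear the owner so another master can pick it up
            }
        }
        return commands;
    }

    public static void main(String[] args) {
        List<ProcessInstance> table = new ArrayList<>();
        table.add(new ProcessInstance(1, "192.168.0.1:5678", true));
        table.add(new ProcessInstance(2, "192.168.0.2:5678", true));
        table.add(new ProcessInstance(3, "192.168.0.2:5678", false)); // already finished

        int activeMasters = 1; // stand-in for getActiveMasterNum()
        if (activeMasters == 1) {
            System.out.println(failover(table, null)); // [1, 2]
        }
    }
}
```

In this sketch the finished instance keeps its host and gets no command, which matches the rule that only instances in an executing state are recovered.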

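The second case is when several nodes are registered in ZooKeeper. Take three nodes A, B, and C as an example: if C suddenly goes offline, A and B are both notified. For every processInstance under C's IP that is in one of the executing states listed above, a command is regenerated and the host of that processInstance record is set to null. Which of A and B does this work? Whichever acquires the ZooKeeper lock first.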

    @Override
    protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
        //monitor master
        if (path.startsWith(getZNodeParentPath(ZKNodeType.MASTER) + Constants.SINGLE_SLASH)) {
            handleMasterEvent(event, path);
        } else if (path.startsWith(getZNodeParentPath(ZKNodeType.WORKER) + Constants.SINGLE_SLASH)) {
            //monitor worker
            handleWorkerEvent(event, path);
        }
    }

    public void handleMasterEvent(TreeCacheEvent event, String path) {
        switch (event.getType()) {
            case NODE_ADDED:
                logger.info("master node added : {}", path);
                break;
            case NODE_REMOVED:
                removeZKNodePath(path, ZKNodeType.MASTER, true);
                break;
            default:
                break;
        }
    }

    /**
     * remove zookeeper node path
     *
     * @param path zookeeper node path
     * @param zkNodeType zookeeper node type
     * @param failover is failover
     */
    private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) {
        logger.info("{} node deleted : {}", zkNodeType, path);
        InterProcessMutex mutex = null;
        try {
            String failoverPath = getFailoverLockPath(zkNodeType);
            // create a distributed lock
            mutex = new InterProcessMutex(getZkClient(), failoverPath);
            mutex.acquire();

            String serverHost = null;
            if (StringUtils.isNotEmpty(path)) {
                serverHost = getHostByEventDataPath(path);
                if (StringUtils.isEmpty(serverHost)) {
                    logger.error("server down error: unknown path: {}", path);
                    return;
                }
                // handle dead server
                handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP);
            }
            //failover server
            if (failover) {
                failoverServerWhenDown(serverHost, zkNodeType);
            }
        } catch (Exception e) {
            logger.error("{} server failover failed.", zkNodeType);
            logger.error("failover exception ", e);
        } finally {
            releaseMutex(mutex);
        }
    }
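
The "whoever gets the lock first does the work" behavior can be sketched inside a single JVM. This is only an illustration under stated assumptions: a ReentrantLock stands in for the ZooKeeper InterProcessMutex, two threads stand in for masters A and B, and the class, field, and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

public class FailoverLockSketch {
    // Stand-in for the distributed failover lock (InterProcessMutex in the real code).
    private final ReentrantLock failoverLock = new ReentrantLock();
    // Hypothetical table of executing instances: instance id -> owning host.
    private final Map<Integer, String> hostByInstance = new HashMap<>();
    // Recovery commands generated so far; only touched while holding the lock.
    private final List<String> commands = new ArrayList<>();

    FailoverLockSketch() {
        hostByInstance.put(1, "C");
        hostByInstance.put(2, "C");
        hostByInstance.put(3, "A");
    }

    // Called on every surviving master when deadHost disappears.
    void onMasterRemoved(String self, String deadHost) {
        failoverLock.lock();
        try {
            for (Map.Entry<Integer, String> e : hostByInstance.entrySet()) {
                if (deadHost.equals(e.getValue())) {
                    commands.add(self + " recovers instance " + e.getKey());
                    e.setValue(null); // the later lock holder will no longer match this row
                }
            }
        } finally {
            failoverLock.unlock(); // always release, as the finally block above does
        }
    }

    // Lets A and B race for the lock after C goes down and returns the commands.
    static List<String> runRace() {
        FailoverLockSketch cluster = new FailoverLockSketch();
        Thread a = new Thread(() -> cluster.onMasterRemoved("A", "C"));
        Thread b = new Thread(() -> cluster.onMasterRemoved("B", "C"));
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return cluster.commands;
    }

    public static void main(String[] args) {
        for (String command : runRace()) {
            System.out.println(command);
        }
    }
}
```

Which master wins varies from run to run, but each of C's instances is recovered exactly once: the second master to obtain the lock finds no row whose host is still C, because the first one already set it to null.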

Original article: https://www.cnblogs.com/d9e84208/p/15265544.html
