public static DataNode instantiateDataNode(String args[], Configuration conf, SecureResources resources) throws IOException { if (conf == null) conf = new Configuration();//创建Configuration对象 if (!parseArguments(args, conf)) { printUsage(); System.exit(-2); } if (conf.get("") != null) { LOG.error("This configuration for rack identification is not supported" + " anymore. RackID resolution is handled by the NameNode."); System.exit(-1); } String[] dataDirs = conf.getStrings(DATA_DIR_KEY);//获取数据块的存储目录 dnThreadName = "DataNode: [" + StringUtils.arrayToString(dataDirs) + "]"; DefaultMetricsSystem.initialize("DataNode"); return makeInstance(dataDirs, conf, resources);//创建DataNode对象 }
public static DataNode makeInstance(String[] dataDirs, Configuration conf, SecureResources resources) throws IOException { UserGroupInformation.setConfiguration(conf); LocalFileSystem localFS = FileSystem.getLocal(conf);//获取本地文件系统 ArrayList<File> dirs = new ArrayList<File>(); FsPermission dataDirPermission = new FsPermission(conf.get(DATA_DIR_PERMISSION_KEY, DEFAULT_DATA_DIR_PERMISSION)); for (String dir : dataDirs) { try { DiskChecker.checkDir(localFS, new Path(dir), dataDirPermission);//检查文件夹在本地文件系统中是否存在,如果存在就检查文件所属用户的权限 dirs.add(new File(dir));//将这个目录加入到列表中 } catch(IOException e) { LOG.warn("Invalid directory in " + DATA_DIR_KEY + ": " + e.getMessage()); } } if (dirs.size() > 0) //至少有一个目录存在,且权限满足 return new DataNode(conf, dirs, resources); LOG.error("All directories in " + DATA_DIR_KEY + " are invalid."); return null; }
DataNode(final Configuration conf, final AbstractList<File> dataDirs, SecureResources resources) throws IOException { super(conf); SecurityUtil.login(conf, DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY); datanodeObject = this; durableSync = conf.getBoolean("dfs.durable.sync", true); this.userWithLocalPathAccess = conf .get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY); try { startDataNode(conf, dataDirs, resources); } catch (IOException ie) { shutdown(); throw ie; } }
// Excerpt of DataNode start-up statements. The enclosing method header is not part of this
// listing (presumably startDataNode -- TODO confirm).

// Obtain the NameNode's address from the configuration.
InetSocketAddress nameNodeAddr = NameNode.getServiceAddress(conf, true);

// By default a DataNode refuses to connect to a NameNode whose build version differs from its
// own. When this option is true, only the version is checked and nothing else.
this.relaxedVersionCheck = conf.getBoolean(
    CommonConfigurationKeys.HADOOP_RELAXED_VERSION_CHECK_KEY,
    CommonConfigurationKeys.HADOOP_RELAXED_VERSION_CHECK_DEFAULT);
// By default a DataNode refuses to connect to a NameNode whose build version differs from its
// own. When this option is true, the version is not checked at all, and this option overrides
// relaxedVersionCheck.
noVersionCheck = conf.getBoolean(
    CommonConfigurationKeys.HADOOP_SKIP_VERSION_CHECK_KEY,
    CommonConfigurationKeys.HADOOP_SKIP_VERSION_CHECK_DEFAULT);

// Address on which this data node offers its streaming data service.
InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);
int tmpPort = socAddr.getPort();
storage = new DataStorage();
// construct registration: the object this data node registers with the NameNode
this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);

// connect to name node: RPC.waitForProxy() returns a reference that is used here as a
// DatanodeProtocol. According to the original note, waitForProxy obtains it by calling getProxy.
this.namenode = (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,DatanodeProtocol.versionID,nameNodeAddr, conf);
static VersionedProtocol waitForProxy( Class<? extends VersionedProtocol> protocol, long clientVersion, InetSocketAddress addr, Configuration conf, int rpcTimeout, long connTimeout) throws IOException { long startTime = System.currentTimeMillis(); IOException ioe; while (true) { try { return getProxy(protocol, clientVersion, addr, conf, rpcTimeout); } catch(ConnectException se) { // namenode has not been started"Server at " + addr + " not available yet, Zzzzz..."); ioe = se; } catch(SocketTimeoutException te) { // namenode is busy"Problem connecting to server: " + addr); ioe = te; } // check if timed out if (System.currentTimeMillis()-connTimeout >= startTime) { throw ioe; } // wait for retry try { Thread.sleep(1000); } catch (InterruptedException ie) { // IGNORE } } }
// Handshake with the NameNode; handshake() returns the NamespaceInfo it obtained.
NamespaceInfo nsInfo = handshake();
/** * 与NameNode握手,数据节点会保证它的构建信息和存储系统结构版本号和名字节点的一致 * @return * @throws IOException */ private NamespaceInfo handshake() throws IOException { NamespaceInfo nsInfo = new NamespaceInfo(); while (shouldRun) { try { nsInfo = namenode.versionRequest();//调用远程方法,获取名字节点的信息 break; } catch(SocketTimeoutException e) { // namenode is busy"Problem connecting to server: " + getNameNodeAddr()); try { Thread.sleep(1000); } catch (InterruptedException ie) {} } } if (!isPermittedVersion(nsInfo)) { String errorMsg = "Shutting down. Incompatible version or revision." + "DataNode version ‘" + VersionInfo.getVersion() + "‘ and revision ‘" + VersionInfo.getRevision() + "‘ and NameNode version ‘" + nsInfo.getVersion() + "‘ and revision ‘" + nsInfo.getRevision() + " and " + CommonConfigurationKeys.HADOOP_RELAXED_VERSION_CHECK_KEY + " is " + (relaxedVersionCheck ? "enabled" : "not enabled") + " and " + CommonConfigurationKeys.HADOOP_SKIP_VERSION_CHECK_KEY + " is " + (noVersionCheck ? "enabled" : "not enabled"); LOG.fatal(errorMsg); notifyNamenode(DatanodeProtocol.NOTIFY, errorMsg); throw new IOException( errorMsg ); } assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() : "Data-node and name-node layout versions must be the same." + "Expected: "+ FSConstants.LAYOUT_VERSION + " actual "+ nsInfo.getLayoutVersion(); return nsInfo; }
// Excerpt of DataNode start-up statements. The enclosing method header is not part of this
// listing (presumably startDataNode -- TODO confirm).

// Whether the simulated data storage option is set (false when the property is absent).
boolean simulatedFSDataset = conf.getBoolean("dfs.datanode.simulateddatastorage", false);

// First recover the storage directories from whatever transitional state they are in, then read
// the information stored in them.
storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
// adjust
this.dnRegistration.setStorageInfo(storage);
// initialize data node internal structure
// NOTE(review): the assignment target is missing from this listing; other code reads a `data`
// field, which is presumably what is assigned here -- confirm against the full source.
= new FSDataset(storage, conf);

// find free port or use privileged port provide
ServerSocket ss;
// NOTE(review): the condition tests `secureResources` while the else branch reads `resources` --
// confirm both refer to the same object.
if(secureResources == null) {
  // NOTE(review): the operand for the (socketWriteTimeout > 0) case is missing from this listing.
  ss = (socketWriteTimeout > 0) ? : new ServerSocket();
  Server.bind(ss, socAddr, 0);
} else {
  ss = resources.getStreamingSocket();
}
// Per the original note: the default receive buffer is 8KB and is raised to 128KB here because
// the data node must serve data at high throughput; the setting applies to every socket returned
// by accept() on this server socket.
ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);
this.dnRegistration.setName(machineName + ":" + tmpPort);

// Run the transfer server inside a dedicated thread group.
this.threadGroup = new ThreadGroup("dataXceiverServer");
// DataXceiverServer is given the ServerSocket on which client connections are accepted.
this.dataXceiverServer = new Daemon(threadGroup, new DataXceiverServer(ss, conf, this));
this.threadGroup.setDaemon(true); // auto destroy when empty (marks the group, not its threads, as daemon)

// Create the IPC server on the configured address and record the port it actually listens on.
InetSocketAddress ipcAddr = NetUtils.createSocketAddr( conf.get("dfs.datanode.ipc.address"));
ipcServer = RPC.getServer(this, ipcAddr.getHostName(), ipcAddr.getPort(), conf.getInt("dfs.datanode.handler.count", 3), false, conf, blockTokenSecretManager);
dnRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());
/** Start a single datanode daemon and wait for it to finish. * If this thread is specifically interrupted, it will stop waiting. */ public static void runDatanodeDaemon(DataNode dn) throws IOException { if (dn != null) { //register datanode dn.register(); dn.dataNodeThread = new Thread(dn, dnThreadName); dn.dataNodeThread.setDaemon(true); // needed for JUnit testing dn.dataNodeThread.start(); } }
private void register() throws IOException { if (dnRegistration.getStorageID().equals("")) { setNewStorageID(dnRegistration);//构造一个新的数据节点标识 } while(shouldRun) { try { // reset name to machineName. Mainly for web interface. = machineName + ":" + dnRegistration.getPort();//注册信息 dnRegistration = namenode.register(dnRegistration); break; } catch(SocketTimeoutException e) { // namenode is busy try { Thread.sleep(1000); } catch (InterruptedException ie) {} } } //数据节点注册后还需要根据目前数据节点的配置情况进执行一些后续处理 if (storage.getStorageID().equals("")) { storage.setStorageID(dnRegistration.getStorageID()); storage.writeAll();//完成数据节点的存储初始化 } if(! storage.getStorageID().equals(dnRegistration.getStorageID())) { throw new IOException("Inconsistent storage IDs. Name-node returned " + dnRegistration.getStorageID() + ". Expecting " + storage.getStorageID()); } if (durableSync) { Block[] bbwReport = data.getBlocksBeingWrittenReport();//获取blocksBeingWritten目录下面的所有Block封装成Block[]数组 long[] blocksBeingWritten = BlockListAsLongs.convertToArrayLongs(bbwReport); namenode.blocksBeingWrittenReport(dnRegistration, blocksBeingWritten); } // random short delay - helps scatter the BR from all DNs // - but we can start generating the block report immediately data.requestAsyncBlockReport(); //在随机等待一小段时间后,DataNode的主循环offerService()方法就会使用blockReport()上报数据节点上现有的数据块信息 scheduleBlockReport(initialBlockReportDelay); }
public void run() { // start dataXceiveServer dataXceiverServer.start(); ipcServer.start(); while (shouldRun) { try { startDistributedUpgradeIfNeeded(); offerService(); } catch (Exception ex) { LOG.error("Exception: " + StringUtils.stringifyException(ex)); if (shouldRun) { try { Thread.sleep(5000); } catch (InterruptedException ie) { } } } }
