首页 > 其他 > 详细

Giraph 源码分析(五)—— 加载数据和重分布

时间:2014-04-15 03:18:42      阅读:591      评论:0      收藏:0      [点我收藏+]

环境:在单机上(机器名:giraphx)启动了2个workers。

输入:SSSP文件夹,里面有1.txt和2.txt两个文件

1. 在Worker向Master汇报健康状况后,就开始等待Master创建InputSplit。方法:每个Worker通过检某个Znode节点是否存在,同时在此Znode上设置Watcher。若不存在,就通过BSPEvent的waitForever()方法释放当前线程的锁,陷入等待状态。一直等到master创建该znode。此步骤位于BSPServiceWorker类中的startSuperStep方法中,等待代码如下:

	//Znode的路径
    String addressesAndPartitionsPath =
        getAddressesAndPartitionsPath(getApplicationAttempt(),
            getSuperstep());
	//把该znode的data读入到addressesAndPartitions中
    AddressesAndPartitionsWritable addressesAndPartitions =
        new AddressesAndPartitionsWritable(
            workerGraphPartitioner.createPartitionOwner().getClass());
	 //当master创建该znode后,退出while循环
     while (getZkExt().exists(addressesAndPartitionsPath, true) ==
          null) {
		 //陷入等待状态
        getAddressesAndPartitionsReadyChangedEvent().waitForever();
		//当master创建该znode后,触发Watcher。调用process进而唤醒线程
        getAddressesAndPartitionsReadyChangedEvent().reset();
      }
	  //读入数据
      WritableUtils.readFieldsFromZnode(
          getZkExt(),
          addressesAndPartitionsPath,
          false,
          null,
          addressesAndPartitions);

2. Master调用createInputSplits()方法创建InputSplit。

    bubuko.com,布布扣

在generateInputSplits()方法中,根据用户设定的VertexInputFormat获得InputSplits。代码如下:

List<InputSplit> splits=inputFormat.getSplits(getContext(), minSplitCountHint);
其中minSplitCountHint为创建split的最小数目,其值如下:

minSplitCountHint = Workers数目 * NUM_INPUT_THREADS
NUM_INPUT_THREADS表示 每个Input split loading的线程数目,默认值为1 。 经查证,在TextVertexValueInputFormat抽象类中的getSplits()方法中的minSplitCountHint参数被忽略。用户输入的VertexInputFormat继承TextVertexValueInputFormat抽象类。

如果得到的splits.size小于minSplitCountHint,那么有些worker就没被用上。

     得到split信息后,要把这些信息写到Zookeeper上,以便其他workers访问。上面得到的split信息如下:

[hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66, hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46]
遍历splits List,为每个split创建一个Znode,值为split的信息。如为split-0创建Znode,值为:hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66
 /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0

为split-1创建znode(如下),值为:hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1
最后创建znode: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllReady 表示所有splits都创建好了。

3. Master根据splits创建Partitions。首先确定partition的数目。

bubuko.com,布布扣

BSPServiceMaster中的MasterGraphPartitioner<I.V,E,M>对象默认为HashMasterPartitioner。它的createInitialPartitionOwners()方法如下:

@Override
  public Collection<PartitionOwner> createInitialPartitionOwners(
      Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
	  //maxWorkers为Workers的最大数目,用户通过 -w 指定。实验时指定为 2
	  //availableWorkerInfos为健康的Workers列表,此处为:[Worker(hostname=giraphx, MRtaskID=1, port=30001), Worker(hostname=giraphx, MRtaskID=2, port=30002)]
    int partitionCount = PartitionUtils.computePartitionCount(
        availableWorkerInfos, maxWorkers, conf);
    List<PartitionOwner> ownerList = new ArrayList<PartitionOwner>();
    Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator();
	//为每个Partition指定一个PartitionOwner,表示该Partition的元数据信息
    for (int i = 0; i < partitionCount; ++i) {
      PartitionOwner owner = new BasicPartitionOwner(i, workerIt.next());
	  //若遍历完availableWorkerInfos,则开始下一轮遍历。
      if (!workerIt.hasNext()) {
        workerIt = availableWorkerInfos.iterator();
      }
      ownerList.add(owner);
    }
    this.partitionOwnerList = ownerList;
    return ownerList;
  }

上面代码中是在工具类PartitionUtils计算Partition的数目,计算公式如下:

partitionCount=PARTITION_COUNT_MULTIPLIER * availableWorkerInfos.size() * availableWorkerInfos.size() ,其中PARTITION_COUNT_MULTIPLIER表示Multiplier for the current workers squared,默认值为1 。

可见,partitionCount值为4(1*2*2)。创建的partitionOwnerList信息如下:

[(id=0,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null), 

(id=1,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null),

(id=2,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null), 

(id=3,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null)]

4. Master创建Znode:/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_partitionExchangeDir ,用于后面的exchange partition。

5. Master最后在assignPartitionOwners()方法中,把masterinfo,chosenWorkerInfoList,partitionOwners等信息写入Znode中(作为Znode的data),该Znode的路径为:      /_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_addressesAndPartitions 。

6. 






Giraph 源码分析(五)—— 加载数据和重分布,布布扣,bubuko.com

Giraph 源码分析(五)—— 加载数据和重分布

原文:http://blog.csdn.net/xin_jmail/article/details/23687073

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!