环境:在单机上(机器名:giraphx)启动了2个workers。
输入:SSSP文件夹,里面有1.txt和2.txt两个文件
1. 在Worker向Master汇报健康状况后,就开始等待Master创建InputSplit。方法:每个Worker通过检某个Znode节点是否存在,同时在此Znode上设置Watcher。若不存在,就通过BSPEvent的waitForever()方法释放当前线程的锁,陷入等待状态。一直等到master创建该znode。此步骤位于BSPServiceWorker类中的startSuperStep方法中,等待代码如下:
//Znode的路径 String addressesAndPartitionsPath = getAddressesAndPartitionsPath(getApplicationAttempt(), getSuperstep()); //把该znode的data读入到addressesAndPartitions中 AddressesAndPartitionsWritable addressesAndPartitions = new AddressesAndPartitionsWritable( workerGraphPartitioner.createPartitionOwner().getClass()); //当master创建该znode后,退出while循环 while (getZkExt().exists(addressesAndPartitionsPath, true) == null) { //陷入等待状态 getAddressesAndPartitionsReadyChangedEvent().waitForever(); //当master创建该znode后,触发Watcher。调用process进而唤醒线程 getAddressesAndPartitionsReadyChangedEvent().reset(); } //读入数据 WritableUtils.readFieldsFromZnode( getZkExt(), addressesAndPartitionsPath, false, null, addressesAndPartitions);
2. Master调用createInputSplits()方法创建InputSplit。
在generateInputSplits()方法中,根据用户设定的VertexInputFormat获得InputSplits。代码如下:
List<InputSplit> splits=inputFormat.getSplits(getContext(), minSplitCountHint);其中minSplitCountHint为创建split的最小数目,其值如下:
minSplitCountHint = Workers数目 * NUM_INPUT_THREADS
NUM_INPUT_THREADS表示 每个Input split loading的线程数目,默认值为1 。 经查证,在TextVertexValueInputFormat抽象类中的getSplits()方法中的minSplitCountHint参数被忽略。用户输入的VertexInputFormat继承TextVertexValueInputFormat抽象类。
如果得到的splits.size小于minSplitCountHint,那么有些worker就没被用上。
得到split信息后,要把这些信息写到Zookeeper上,以便其他workers访问。上面得到的split信息如下:
[hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66, hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46]遍历splits List,为每个split创建一个Znode,值为split的信息。如为split-0创建Znode,值为:hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66
/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0
为split-1创建znode(如下),值为:hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46
/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1最后创建znode: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllReady 表示所有splits都创建好了。
3. Master根据splits创建Partitions。首先确定partition的数目。
BSPServiceMaster中的MasterGraphPartitioner<I.V,E,M>对象默认为HashMasterPartitioner。它的createInitialPartitionOwners()方法如下:
@Override public Collection<PartitionOwner> createInitialPartitionOwners( Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) { //maxWorkers为Workers的最大数目,用户通过 -w 指定。实验时指定为 2 //availableWorkerInfos为健康的Workers列表,此处为:[Worker(hostname=giraphx, MRtaskID=1, port=30001), Worker(hostname=giraphx, MRtaskID=2, port=30002)] int partitionCount = PartitionUtils.computePartitionCount( availableWorkerInfos, maxWorkers, conf); List<PartitionOwner> ownerList = new ArrayList<PartitionOwner>(); Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator(); //为每个Partition指定一个PartitionOwner,表示该Partition的元数据信息 for (int i = 0; i < partitionCount; ++i) { PartitionOwner owner = new BasicPartitionOwner(i, workerIt.next()); //若遍历完availableWorkerInfos,则开始下一轮遍历。 if (!workerIt.hasNext()) { workerIt = availableWorkerInfos.iterator(); } ownerList.add(owner); } this.partitionOwnerList = ownerList; return ownerList; }
上面代码中是在工具类PartitionUtils计算Partition的数目,计算公式如下:
partitionCount=PARTITION_COUNT_MULTIPLIER * availableWorkerInfos.size() * availableWorkerInfos.size() ,其中PARTITION_COUNT_MULTIPLIER表示Multiplier for the current workers squared,默认值为1 。
可见,partitionCount值为4(1*2*2)。创建的partitionOwnerList信息如下:
[(id=0,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),
(id=1,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null),
(id=2,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),
(id=3,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null)]
4. Master创建Znode:/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_partitionExchangeDir ,用于后面的exchange partition。
5. Master最后在assignPartitionOwners()方法中,把masterinfo,chosenWorkerInfoList,partitionOwners等信息写入Znode中(作为Znode的data),该Znode的路径为: /_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_addressesAndPartitions 。
6.
Giraph 源码分析(五)—— 加载数据和重分布,布布扣,bubuko.com
原文:http://blog.csdn.net/xin_jmail/article/details/23687073