环境:在单机上(机器名:giraphx)启动了2个workers。
输入:SSSP文件夹,里面有1.txt和2.txt两个文件
1. 在Worker向Master汇报健康状况后,就开始等待Master创建InputSplit。方法:每个Worker通过检某个Znode节点是否存在,同时在此Znode上设置Watcher。若不存在,就通过BSPEvent的waitForever()方法释放当前线程的锁,陷入等待状态。一直等到master创建该znode。此步骤位于BSPServiceWorker类中的startSuperStep方法中,等待代码如下:
//Znode的路径
String addressesAndPartitionsPath =
getAddressesAndPartitionsPath(getApplicationAttempt(),
getSuperstep());
//把该znode的data读入到addressesAndPartitions中
AddressesAndPartitionsWritable addressesAndPartitions =
new AddressesAndPartitionsWritable(
workerGraphPartitioner.createPartitionOwner().getClass());
//当master创建该znode后,退出while循环
while (getZkExt().exists(addressesAndPartitionsPath, true) ==
null) {
//陷入等待状态
getAddressesAndPartitionsReadyChangedEvent().waitForever();
//当master创建该znode后,触发Watcher。调用process进而唤醒线程
getAddressesAndPartitionsReadyChangedEvent().reset();
}
//读入数据
WritableUtils.readFieldsFromZnode(
getZkExt(),
addressesAndPartitionsPath,
false,
null,
addressesAndPartitions);
2. Master调用createInputSplits()方法创建InputSplit。
在generateInputSplits()方法中,根据用户设定的VertexInputFormat获得InputSplits。代码如下:
List<InputSplit> splits=inputFormat.getSplits(getContext(), minSplitCountHint);其中minSplitCountHint为创建split的最小数目,其值如下:
minSplitCountHint = Workers数目 * NUM_INPUT_THREADS
NUM_INPUT_THREADS表示 每个Input split loading的线程数目,默认值为1 。 经查证,在TextVertexValueInputFormat抽象类中的getSplits()方法中的minSplitCountHint参数被忽略。用户输入的VertexInputFormat继承TextVertexValueInputFormat抽象类。
如果得到的splits.size小于minSplitCountHint,那么有些worker就没被用上。
得到split信息后,要把这些信息写到Zookeeper上,以便其他workers访问。上面得到的split信息如下:
[hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66, hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46]遍历splits List,为每个split创建一个Znode,值为split的信息。如为split-0创建Znode,值为:hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66
/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0
为split-1创建znode(如下),值为:hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46
/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1最后创建znode: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllReady 表示所有splits都创建好了。
3. Master根据splits创建Partitions。首先确定partition的数目。
BSPServiceMaster中的MasterGraphPartitioner<I.V,E,M>对象默认为HashMasterPartitioner。它的createInitialPartitionOwners()方法如下:
@Override
public Collection<PartitionOwner> createInitialPartitionOwners(
Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
//maxWorkers为Workers的最大数目,用户通过 -w 指定。实验时指定为 2
//availableWorkerInfos为健康的Workers列表,此处为:[Worker(hostname=giraphx, MRtaskID=1, port=30001), Worker(hostname=giraphx, MRtaskID=2, port=30002)]
int partitionCount = PartitionUtils.computePartitionCount(
availableWorkerInfos, maxWorkers, conf);
List<PartitionOwner> ownerList = new ArrayList<PartitionOwner>();
Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator();
//为每个Partition指定一个PartitionOwner,表示该Partition的元数据信息
for (int i = 0; i < partitionCount; ++i) {
PartitionOwner owner = new BasicPartitionOwner(i, workerIt.next());
//若遍历完availableWorkerInfos,则开始下一轮遍历。
if (!workerIt.hasNext()) {
workerIt = availableWorkerInfos.iterator();
}
ownerList.add(owner);
}
this.partitionOwnerList = ownerList;
return ownerList;
}
上面代码中是在工具类PartitionUtils计算Partition的数目,计算公式如下:
partitionCount=PARTITION_COUNT_MULTIPLIER * availableWorkerInfos.size() * availableWorkerInfos.size() ,其中PARTITION_COUNT_MULTIPLIER表示Multiplier for the current workers squared,默认值为1 。
可见,partitionCount值为4(1*2*2)。创建的partitionOwnerList信息如下:
[(id=0,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),
(id=1,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null),
(id=2,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),
(id=3,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null)]
4. Master创建Znode:/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_partitionExchangeDir ,用于后面的exchange partition。
5. Master最后在assignPartitionOwners()方法中,把masterinfo,chosenWorkerInfoList,partitionOwners等信息写入Znode中(作为Znode的data),该Znode的路径为: /_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_addressesAndPartitions 。
6.
Giraph 源码分析(五)—— 加载数据和重分布,布布扣,bubuko.com
原文:http://blog.csdn.net/xin_jmail/article/details/23687073