算法的数学分析部分:可参考网络,或者Google PageRank 论文。此处不做讨论,或以后我彻底搞明白再论述。
代码实现:
val sc = new SparkContext(...) val links = sc.parallelize(Array((‘A‘,Array(‘D‘)),(‘B‘,Array(‘A‘)),(‘C‘,Array(‘A‘,‘B‘)),(‘D‘,Array(‘A‘,‘C‘))),2).map(x => (x._1,x._2)).cache() var ranks = sc.parallelize(Array((‘A‘,1.0),(‘B‘,1.0),(‘C‘,1.0),(‘D‘,1.0)),2) val iterations_num = 10 for(i <- 1 to iterations_num){ val contribs = links.join(ranks,2).flatMap{ case(url,(links,rank)) => links.map(dest => (dest,rank/links,size)) } ranks = contribs.reduceByKey(_ + _,2).mapValues(0.15 + 0.85 * _) } ranks.saveAsTextFile(...)
Spark调优:
(1) 因为linksRDD在每次迭代中都会和ranks发生连接操作。由于links是一个静态数据集,所以可以在程序一开始的时候就对它进行分区操作,这样就不需要把它通过网络混洗了。
(2)当我们第一次创建ranks时,可以使用mapValues()而不是map来保留父RDD(links)的分区方式,这样对它的第一次链接操作就会开销很小。
改进后的代码:
val sc = new SparkContext(...) val links = sc.parallelize(Array((‘A‘,Array(‘D‘)),(‘B‘,Array(‘A‘)),(‘C‘,Array(‘A‘,‘B‘)),(‘D‘,Array(‘A‘,‘C‘))),2).map(x => (x._1,x._2)).partitionBy(new HashPartitioner(100)).cache() var ranks = links.mapValues(v=>1.0) val iterations_num = 10 for(i <- 1 to iterations_num){ val contribs = links.join(ranks,2).flatMap{ case(url,(links,rank)) => links.map(dest => (dest,rank/links,size)) } ranks = contribs.reduceByKey(_ + _,2).mapValues(0.15 + 0.85 * _) } ranks.saveAsTextFile(...)
原文:http://www.cnblogs.com/ducong/p/5224855.html