首页 > 其他 > 详细

Spark-->PageRank [Scala]

时间:2016-02-28 16:33:58      阅读:259      评论:0      收藏:0      [点我收藏+]

算法的数学分析部分:可参考网络,或者Google PageRank 论文。此处不做讨论,或以后我彻底搞明白再论述。

代码实现:

val sc = new SparkContext(...)
val links = sc.parallelize(Array((A,Array(D)),(B,Array(A)),(C,Array(A,B)),(D,Array(A,C))),2).map(x => (x._1,x._2)).cache()

var ranks = sc.parallelize(Array((A,1.0),(B,1.0),(C,1.0),(D,1.0)),2)

val iterations_num = 10

for(i <- 1 to iterations_num){
    val contribs = links.join(ranks,2).flatMap{
        case(url,(links,rank)) => links.map(dest => (dest,rank/links,size))
    }

    ranks = contribs.reduceByKey(_ + _,2).mapValues(0.15 + 0.85 * _)
}

ranks.saveAsTextFile(...)

Spark调优:

(1) 因为linksRDD在每次迭代中都会和ranks发生连接操作。由于links是一个静态数据集,所以可以在程序一开始的时候就对它进行分区操作,这样就不需要把它通过网络混洗了。

(2)当我们第一次创建ranks时,可以使用mapValues()而不是map来保留父RDD(links)的分区方式,这样对它的第一次链接操作就会开销很小。

改进后的代码:

val sc = new SparkContext(...)
val links = sc.parallelize(Array((A,Array(D)),(B,Array(A)),(C,Array(A,B)),(D,Array(A,C))),2).map(x => (x._1,x._2)).partitionBy(new HashPartitioner(100)).cache()

var ranks = links.mapValues(v=>1.0)

val iterations_num = 10

for(i <- 1 to iterations_num){
    val contribs = links.join(ranks,2).flatMap{
        case(url,(links,rank)) => links.map(dest => (dest,rank/links,size))
    }

    ranks = contribs.reduceByKey(_ + _,2).mapValues(0.15 + 0.85 * _)
}

ranks.saveAsTextFile(...)   

Spark-->PageRank [Scala]

原文:http://www.cnblogs.com/ducong/p/5224855.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!