首页 > 其他 > 详细

Spark 累加器

时间:2016-11-14 07:35:33      阅读:238      评论:0      收藏:0      [点我收藏+]

由于spark是分布式的计算,所以使得每个task间不存在共享的变量,而为了实现共享变量spark实现了两种类型 - 累加器与广播变量,

对于其概念与理解可以参考:共享变量(广播变量和累加器)  。可能需要注意:Spark累加器(Accumulator)陷阱及解决办法

因此,我们便可以利用累加器与广播变量来构造一些比较常用的关系,以Map的形式广播出去,提高效率。

如下通过累加器构造了一个DF数据间
的映射关系,

defgetMap(spark:SparkSession,data:DataFrame){
//通过collectionAccumulator构造Map关系
valmyAccumulator=spark.sparkContext.collectionAccumulator[(String,Long)]
data.foreach(
row=>{
valname=row.getAs[String]("name")
valage=row.getAs[Long]("age")
myAccumulator.add(name,age)
}
)
valaiterator:util.Iterator[(String,Long)]=myAccumulator.value.iterator()
varnewMap:Map[String,Long]=Map()
while(aiterator.hasNext){
vala=aiterator.next()
valkey=a._1
valvalue=a._2
if(!newMap.contains(key)){
newMap+=(key->value)
}
else{
valoldvalue=newMap(key)
newMap+=(key->(oldvalue+value))
}
}
}

 

Spark 累加器

原文:http://www.cnblogs.com/namhwik/p/6060509.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!