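// A broadcast variable ships a single value to every executor so that all executors share the same read-only copy.
// One open problem is updating a broadcast variable dynamically: in my experiments I only managed to change the broadcast value between different jobs, and I have not yet found a good way to refresh it every N minutes.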
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.CollectionAccumulator
/**
 * Demo of a "dynamic" broadcast variable.
 *
 * A background thread re-broadcasts the current wall-clock time once per second,
 * while the main thread runs two jobs that each write out the broadcast value
 * they observed. Every job takes its own snapshot of the current broadcast
 * handle, so a single job always sees one consistent value.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  val sparkSession = SparkSession.builder() //.master("local")
    .appName("WordCount").getOrCreate()
  val sc = sparkSession.sparkContext

  // Both variables are written by one thread and read by another, so they must
  // be volatile for the updates to be visible across threads.
  @volatile var runStatus = true
  @volatile var broadcast: Broadcast[Long] = sc.broadcast(System.currentTimeMillis())

  /** Replaces the shared broadcast with a fresh timestamp once per second until `runStatus` is cleared. */
  class ThreadExample(sc: SparkContext) extends Thread {
    override def run(): Unit = {
      while (runStatus) {
        val time = System.currentTimeMillis()
        val previous = broadcast
        // Publish the new broadcast first, then release the executor-side copies of the old one.
        broadcast = sc.broadcast(time)
        previous.unpersist()
        Thread.sleep(1000)
      }
    }
  }

  val refresher = new ThreadExample(sc)
  // Daemon thread: a failure on the main thread must not keep the JVM alive.
  refresher.setDaemon(true)
  refresher.start()

  try {
    // NOTE(review): the empty input/output paths are placeholders from the original snippet
    // and must be replaced with real locations before this can run.
    // Snapshot the handle so the task closure captures one specific broadcast, and read
    // `.value` so the broadcast payload (not the Broadcast handle) is written out.
    val firstBroadcast = broadcast
    val firstrdd = sc.textFile("").map(_ => firstBroadcast.value)
    firstrdd.saveAsTextFile("")

    val secondBroadcast = broadcast
    val secondrdd = sc.textFile("").map(_ => secondBroadcast.value)
    // The original saved `firstrdd` a second time here, so `secondrdd` was never written.
    secondrdd.saveAsTextFile("")
  } finally {
    // Always stop the refresher, even when a job fails.
    runStatus = false
    refresher.join()
  }
}
// An accumulator is a shared variable that tasks running on many executors can add to;
// the merged result is read back on the driver.
val sparkSession = SparkSession.builder() //.master("local")
  .appName("WordCount").getOrCreate()
val sc = sparkSession.sparkContext
// Three built-in accumulators are used here: LongAccumulator, DoubleAccumulator and CollectionAccumulator.
// LongAccumulator: accumulates integral values.
val longAccumulator = sc.longAccumulator("long-account")
// DoubleAccumulator: accumulates floating-point values.
val doubleAccumulator = sc.doubleAccumulator("double-account")
// CollectionAccumulator: collects the added elements into a list.
val collectionAccumulator: CollectionAccumulator[Int] = sc.collectionAccumulator("collection-account")
// NOTE: these accumulators are updated inside a transformation (`map`); per the Spark
// programming guide such updates may be applied more than once if a task or stage is re-executed.
var firstrdd = sc.textFile("s3://transsion-bigdata-warehouse/ods/athena_10410001/dt=20191210/hour=00/*")
  .map { line =>
    // The original only read `longAccumulator.value` here and discarded it, so the long
    // accumulator was never incremented and always reported 0.
    longAccumulator.add(1L)
    doubleAccumulator.add(1.0)
    collectionAccumulator.add(1)
    line
  }
// The save action triggers the job; the accumulators are only populated after it has run.
firstrdd.saveAsTextFile("s3://transsion-athena/test/push_msg/first")
// println (rather than print) so that the three results do not run together on one line.
println(s"longAccumulator:${longAccumulator.count}")
println(s"doubleAccumulator:${doubleAccumulator.count}")
println(s"collectionAccumulator:${collectionAccumulator.value.size()}")
// Output recorded by the original author BEFORE the long-accumulator fix:
//   longAccumulator:0 doubleAccumulator:2817336 collectionAccumulator:2817336
// With the fix, all three counts are expected to match (absent task re-execution).
// Original article: https://www.cnblogs.com/wuxiaolong4/p/12046722.html