程序功能:收集顶点指向的邻居中所在地
/* * 找出每个顶点所指向的邻居中所在的地区 */ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import scala.collection.mutable.Map import org.apache.spark._ import org.apache.spark.graphx._ import org.apache.spark.rdd.RDD object testApp{ def main(args:Array[String]){ val conf = new SparkConf().setAppName("testApp") val sc = new SparkContext(conf) val graph = GraphLoader.edgeListFile(sc, "/home/spark/spark/graphx/data/followers.txt")//加载边时顶点是边上出现的点 val users = sc.textFile("/home/spark/spark/graphx/data/users.txt").map { line => val fields = line.split(",") (fields(0).toLong,(fields(1),fields(2)))//解析顶点数据:ID(一定转成Long型),姓名,地区 } val myGraph=Graph.apply(users,graph.edges)//重构图,顶点数据以users为准 val vertices=myGraph.mapReduceTriplets[Map[String,Int]](//收集每个定点指向的邻居所在的地区 triplet=>Iterator((triplet.srcId,Map[String,Int](triplet.dstAttr._2->1))),//Map function单向发送消息给有向边的源顶点 (a,b)=>{//Reduce function汇集消息 var myMap=Map[String,Int]() for((k,v)<-a){ if(b.contains(k)) { var t=a(k)+b(k) myMap+=(k->t) } else myMap+=(k->a(k)) } myMap //返回汇集的结果 } ) vertices.collect.foreach(a=>print(a+"\n"))//打印收集的邻居所在地 } }
1,BarackObama,American 2,ladygaga,American 3,John,American 4,xiaoming,Beijing 6,Hanmeimei,Beijing 7,Polly,American 8,Tom,American
2 1 4 1 1 2 6 3 7 3 7 6 6 7 3 7
(4,Map(American -> 1)) (6,Map(American -> 2)) (2,Map(American -> 1)) (1,Map(American -> 1)) (3,Map(American -> 1)) (7,Map(American -> 1))
工程目录结构:
./test.sbt ./src ./src/main ./src/main/scala ./src/main/scala/testApp.scala
test.sbt内容:
name := "test Project" version := "1.0" scalaVersion := "2.10.4" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.1" libraryDependencies += "org.apache.spark" %% "spark-graphx" %"1.0.1" resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
sbt package
../../bin/spark-submit --class "testApp" --master local[4] target/scala-2.10/test-project_2.10-1.0.jar
原文:http://blog.csdn.net/liuxuejiang158blog/article/details/37876217