
A First GraphX Program

Posted: 2014-07-16 16:34:13

   What the program does: for each vertex, collect the regions of the neighbors it points to.

/*
 * For each vertex, find the regions of the neighbors it points to
 */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import scala.collection.mutable.Map
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object testApp{
    def main(args:Array[String]){
        val conf = new SparkConf().setAppName("testApp")
        val sc = new SparkContext(conf)

        val graph = GraphLoader.edgeListFile(sc, "/home/spark/spark/graphx/data/followers.txt") // when loading an edge list, the vertex set is exactly the vertices that appear on the edges

        val users = sc.textFile("/home/spark/spark/graphx/data/users.txt").map { line =>
            val fields = line.split(",")
            (fields(0).toLong, (fields(1), fields(2))) // parse vertex data: ID (must be converted to Long), name, region
        }

        val myGraph = Graph.apply(users, graph.edges) // rebuild the graph, taking the vertex attributes from users

        val vertices = myGraph.mapReduceTriplets[Map[String,Int]]( // collect, for each vertex, the regions of the neighbors it points to
            triplet => Iterator((triplet.srcId, Map[String,Int](triplet.dstAttr._2 -> 1))), // Map function: send each destination's region as a message to the source vertex of the edge
            (a, b) => { // Reduce function: merge the two maps, summing the counts of keys present in both
                val merged = Map[String,Int]()
                for ((k, v) <- a) merged += (k -> v)
                for ((k, v) <- b) merged += (k -> (merged.getOrElse(k, 0) + v)) // keys that appear only in b are kept as well, not dropped
                merged // return the merged result
            }
        )

        vertices.collect.foreach(a => print(a + "\n")) // print the collected neighbor regions
    }
}
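The reduce function is the only non-boilerplate logic in the program: it is a plain map merge, so it can be exercised outside Spark. A minimal standalone sketch (mergeCounts is a name chosen here for illustration; the Map type is scala.collection.mutable.Map, as in the program):

```scala
import scala.collection.mutable.Map

// Merge two region-count maps, summing the counts for keys that appear in both.
// Keys that appear in only one of the maps are kept as-is.
def mergeCounts(a: Map[String, Int], b: Map[String, Int]): Map[String, Int] = {
  val merged = Map[String, Int]()
  for ((k, v) <- a) merged += (k -> v)
  for ((k, v) <- b) merged += (k -> (merged.getOrElse(k, 0) + v))
  merged
}

val a = Map("American" -> 1)
val b = Map("American" -> 1, "Beijing" -> 2)
println(mergeCounts(a, b)) // counts for "American" are summed, "Beijing" is kept
```

Note that a merge which only iterates over `a` would silently drop keys that occur only in `b`; the second loop above is what guarantees commutativity, which Spark requires of the reduce function.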


users.txt vertex data: ID,name,region

1,BarackObama,American
2,ladygaga,American
3,John,American
4,xiaoming,Beijing
6,Hanmeimei,Beijing
7,Polly,American
8,Tom,American


followers.txt edge data: only the source and destination vertex IDs matter, separated by a space; any extra columns are ignored. For example, "2 1 other" has three columns, but GraphX reads only the first two:

2 1
4 1
1 2
6 3
7 3
7 6
6 7
3 7
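The column handling described above can be sketched in plain Scala. This is an illustration of the behavior, not GraphLoader's actual implementation, and parseEdge is a hypothetical helper, not part of the GraphX API:

```scala
// Parse one line of an edge-list file: take the first two whitespace-separated
// fields as source and destination vertex IDs, and ignore any extra columns.
def parseEdge(line: String): (Long, Long) = {
  val fields = line.split("\\s+")
  (fields(0).toLong, fields(1).toLong)
}

println(parseEdge("2 1"))       // (2,1)
println(parseEdge("2 1 other")) // the third column is dropped
```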

Output:

(4,Map(American -> 1))
(6,Map(American -> 2))
(2,Map(American -> 1))
(1,Map(American -> 1))
(3,Map(American -> 1))
(7,Map(American -> 1, Beijing -> 1))
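The output can be cross-checked without Spark by joining the two small files by hand with plain Scala collections (the data below is inlined from users.txt and followers.txt above). Note that vertex 7 points to one American neighbor and one Beijing neighbor, so a correct merge produces two entries for it:

```scala
// Region of each user, inlined from users.txt
val region = Map(
  1L -> "American", 2L -> "American", 3L -> "American", 4L -> "Beijing",
  6L -> "Beijing", 7L -> "American", 8L -> "American")

// (src, dst) pairs, inlined from followers.txt
val edges = Seq((2L, 1L), (4L, 1L), (1L, 2L), (6L, 3L),
                (7L, 3L), (7L, 6L), (6L, 7L), (3L, 7L))

// For each source vertex, count the regions of the vertices it points to
val counts = edges
  .groupBy(_._1)
  .map { case (src, es) =>
    src -> es.map(e => region(e._2)).groupBy(identity).map { case (r, rs) => r -> rs.size }
  }

counts.toSeq.sortBy(_._1).foreach(println)
```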


Project directory layout:

./test.sbt
./src
./src/main
./src/main/scala
./src/main/scala/testApp.scala


Contents of test.sbt:

name := "test Project"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.1"

libraryDependencies += "org.apache.spark" %% "spark-graphx" % "1.0.1"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"



Compile from the command line:

sbt package



Submit the job from the command line (with the cluster already running). My project lives under apps/testApp in the Spark installation directory, hence the leading ../../:

 ../../bin/spark-submit --class "testApp" --master local[4] target/scala-2.10/test-project_2.10-1.0.jar



Original post: http://blog.csdn.net/liuxuejiang158blog/article/details/37876217
