关于 GraphX 的 join 操作，官网给出了两个方法：
joinVertices：将图的顶点与输入 RDD 进行连接，对匹配到的顶点应用用户定义的 map 函数得到新的顶点属性，并返回一个新的 Graph；RDD 中没有匹配值的顶点保留其原始属性值。如果 RDD 中同一个顶点对应多个值，则只会使用其中一个。
outerJoinVertices：行为与 joinVertices 类似，区别在于用户定义的 map 函数会应用于所有顶点，并且可以改变顶点属性的类型。由于并非所有顶点都能在输入 RDD 中找到匹配值，map 函数接收的是 Option 类型参数：没有匹配值的顶点收到 None，由函数决定该顶点的新属性值（例如下面的示例中将其年龄设为 0）。
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

/** Demonstrates `Graph.outerJoinVertices`.
  *
  * Every vertex is passed through a user function that receives `Some(value)` when the
  * joined RDD has an entry for that vertex id, and `None` when it does not.
  */
object JoinDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SimpleGraphX").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    try {
      // Vertex data: (vertexId, (name, age)).
      val users: RDD[(VertexId, (String, Int))] = sc.parallelize(Array(
        (1L, ("Alice", 28)),
        (2L, ("Bob", 27)),
        (3L, ("Charlie", 65)),
        (4L, ("David", 42)),
        (5L, ("Ed", 55)),
        (6L, ("Fran", 50))
      ))
      // Edges: (srcId, dstId, Int attribute).
      val edges: RDD[Edge[Int]] = sc.parallelize(Array(
        Edge(2L, 1L, 7),
        Edge(2L, 4L, 2),
        Edge(3L, 2L, 4),
        Edge(3L, 6L, 3),
        Edge(4L, 1L, 1),
        Edge(5L, 2L, 2),
        Edge(5L, 3L, 8),
        Edge(5L, 6L, 3)
      ))
      val graph = Graph(users, edges)

      // An arbitrary external RDD to join against. Vertex 3 appears twice and vertex 6
      // not at all. Per the GraphX docs only one of the duplicate values for a vertex is
      // used, so the result for vertex 3 should not be relied on.
      val rdd: RDD[(VertexId, Boolean)] = sc.parallelize(Array(
        (3L, true),
        (3L, false),
        (2L, true),
        (1L, true),
        (5L, false),
        (4L, false)
      ))
      rdd.collect().foreach(println)
      println("******************************************")
      graph.vertices.collect().foreach(println)

      // Joined value true -> age + 1; false -> age - 1; no matching entry -> age set to 0.
      val graph2: Graph[(String, Int), Int] = graph.outerJoinVertices(rdd) { (_, attr, flag) =>
        flag match {
          case Some(true)  => (attr._1, attr._2 + 1)
          case Some(false) => (attr._1, attr._2 - 1)
          case None        => (attr._1, 0)
        }
      }
      println("***********************************************")
      graph2.vertices.collect().foreach(println)
    } finally {
      // Always release the SparkContext, even if a job above throws.
      sc.stop()
    }
  }
}
运行结果:
原文:https://www.cnblogs.com/WhiteDeer-w/p/12707275.html