This article shows how to implement a simple Join example using Spark's Java API.
Create a Maven project with the following pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.github.ralgond</groupId>
    <artifactId>spark-java-api</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.1.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
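Note that spark-core is declared with the provided scope: when the job is submitted with spark-submit, the Spark installation already supplies those classes at runtime, so they do not need to be bundled into the jar. The compiler plugin pins the source and target level to Java 8.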
Create a package com.github.ralgond.sparkjavaapi and, inside it, a class named JoinByItemId with the following content:
package com.github.ralgond.sparkjavaapi;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;

import scala.Tuple2;

public class JoinByItemId {
    public static void main(String[] args) {
        String userFilePath = args[0];
        String itemFilePath = args[1];

        SparkConf conf = new SparkConf().setAppName("JoinByItemId Application");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Read user.txt and parse each line into (userId, itemId).
        JavaRDD<String> udata1 = sc.textFile(userFilePath);
        JavaPairRDD<String, String> udata2 = udata1.mapToPair(line -> {
            String[] a = line.split("\\s+", 2);
            return new Tuple2<String, String>(a[0], a[1]);
        });
        // Swap key and value so the item id becomes the join key: (itemId, userId).
        JavaPairRDD<String, String> udata3 = udata2.mapToPair(t -> new Tuple2<String, String>(t._2, t._1));

        // Read item.txt and parse each line into (itemId, itemName).
        JavaRDD<String> idata1 = sc.textFile(itemFilePath);
        JavaPairRDD<String, String> idata2 = idata1.mapToPair(line -> {
            String[] a = line.split("\\s+", 2);
            return new Tuple2<String, String>(a[0], a[1]);
        });

        // Left outer join on itemId: (itemId, (userId, Optional<itemName>)).
        JavaPairRDD<String, Tuple2<String, Optional<String>>> rddWithJoin = udata3.leftOuterJoin(idata2);

        // Re-key by userId and render the value as "itemId<TAB>itemName",
        // falling back to "NULL" when the item id has no match in item.txt.
        JavaPairRDD<String, String> res = rddWithJoin.mapToPair(t -> {
            if (t._2()._2().isPresent()) {
                return new Tuple2<String, String>(t._2()._1(), t._1() + "\t" + t._2()._2().get());
            } else {
                return new Tuple2<String, String>(t._2()._1(), t._1() + "\t" + "NULL");
            }
        });

        List<Tuple2<String, String>> res2 = res.collect();
        System.out.println(res2);

        sc.stop();
    }
}
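If you want to sanity-check the join logic without packaging the jar or preparing input files, a minimal local-mode sketch such as the one below can be run directly from an IDE (assuming spark-core is on the classpath, since it is marked provided above). The class name JoinByItemIdLocalCheck and the hard-coded pairs are illustrative and not part of the original post:

package com.github.ralgond.sparkjavaapi;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;

import scala.Tuple2;

public class JoinByItemIdLocalCheck {
    public static void main(String[] args) {
        // Run inside the current JVM instead of submitting to a cluster.
        SparkConf conf = new SparkConf()
                .setAppName("JoinByItemId Local Check")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // (itemId, userId) pairs, i.e. what udata3 holds after the key/value swap.
        JavaPairRDD<String, String> users = sc.parallelizePairs(Arrays.asList(
                new Tuple2<>("1", "A"),
                new Tuple2<>("2", "C"),
                new Tuple2<>("9", "Z")));   // "9" has no matching item

        // (itemId, itemName) pairs, i.e. what idata2 holds.
        JavaPairRDD<String, String> items = sc.parallelizePairs(Arrays.asList(
                new Tuple2<>("1", "item1"),
                new Tuple2<>("2", "item2")));

        // Same join as in JoinByItemId; unmatched keys come back with an absent Optional.
        JavaPairRDD<String, Tuple2<String, Optional<String>>> joined = users.leftOuterJoin(items);

        joined.collect().forEach(t -> {
            String itemName = t._2()._2().isPresent() ? t._2()._2().get() : "NULL";
            System.out.println(t._2()._1() + "\t" + t._1() + "\t" + itemName);
        });

        sc.stop();
    }
}

The user keyed by item id "9" illustrates the left-outer behaviour: it survives the join with an absent Optional, which the code renders as NULL, exactly as in JoinByItemId.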
Go to the Spark installation directory and create the folder spark-java-api\JoinByItemId inside the data folder. Then, under {SPARK_HOME}\data\spark-java-api\JoinByItemId, create two files: user.txt and item.txt.
The contents of user.txt:
A 1
B 1
C 2
D 2
E 3
The contents of item.txt:
1 item1
2 item2
3 item3
Build the jar spark-java-api-0.0.1-SNAPSHOT.jar with mvn clean package.
Then, from the Spark installation directory, run the following command:
bin\spark-submit --class com.github.ralgond.sparkjavaapi.JoinByItemId D:\ralgond\spark-java-api\target\spark-java-api-0.0.1-SNAPSHOT.jar data\spark-java-api\JoinByItemId\user.txt data\spark-java-api\JoinByItemId\item.txt
You will then see the result:
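With the sample data above, the collected list contains one pair per user, each mapping the user id to "itemId<TAB>itemName", roughly [(A,1 item1), (B,1 item1), (C,2 item2), (D,2 item2), (E,3 item3)]; the element order may differ between runs because it depends on partitioning.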
Original article: https://www.cnblogs.com/ralgo/p/14872641.html