
Spark and Java API (3): Join


This article shows how to build a sample application of the join operator using Spark's Java API.

Creating the project

Create a Maven project with the following pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.github.ralgond</groupId>
	<artifactId>spark-java-api</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<dependencies>
		<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.12</artifactId>
			<version>3.1.1</version>
			<scope>provided</scope>
		</dependency>

	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.0</version>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>
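Note that spark-core is declared with the provided scope: spark-submit puts Spark's own jars on the classpath at run time, so they do not need to be bundled into the application jar. (If you want to launch the class directly from an IDE instead, Spark's jars must also be on the run-time classpath, for example via an IDE option that includes provided-scope dependencies.)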

Writing the Java class JoinByItemId

Create a package com.github.ralgond.sparkjavaapi and, in that package, create a class named JoinByItemId with the following content:

package com.github.ralgond.sparkjavaapi;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;

import scala.Tuple2;

public class JoinByItemId {
	public static void main(String[] args) {
		// args[0]: path to user.txt, args[1]: path to item.txt
		String userFilePath = args[0];
		String itemFilePath = args[1];
		
		SparkConf conf = new SparkConf().setAppName("JoinByItemId Application");
		
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		// Read user.txt and turn each line "user<TAB>itemId" into a (user, itemId) pair.
		JavaRDD<String> udata1 = sc.textFile(userFilePath);
		JavaPairRDD<String, String> udata2 = udata1.mapToPair(line -> {
			String[] a = line.split("\\s+", 2);
			return new Tuple2<String, String>(a[0], a[1]);
		});
		// Swap to (itemId, user) so that the item id becomes the join key.
		JavaPairRDD<String, String> udata3 = udata2.mapToPair(t -> new Tuple2<String, String>(t._2, t._1));
		
		
		// Read item.txt and turn each line "itemId<TAB>itemName" into an (itemId, itemName) pair.
		JavaRDD<String> idata1 = sc.textFile(itemFilePath);
		JavaPairRDD<String, String> idata2 = idata1.mapToPair(line -> {
			String[] a = line.split("\\s+", 2);
			return new Tuple2<String, String>(a[0], a[1]);
		});
		
		// Left outer join on itemId: every user row is kept, and the item name is wrapped in an Optional.
		JavaPairRDD<String, Tuple2<String, Optional<String>>> rddWithJoin = udata3.leftOuterJoin(idata2);
		
		// Rebuild (user, itemId<TAB>itemName) pairs; users whose item id has no match get "NULL".
		JavaPairRDD<String, String> res = rddWithJoin.mapToPair(t -> {
			if (t._2()._2().isPresent()) {
				return new Tuple2<String, String>(t._2()._1(), t._1() + "\t" + t._2()._2().get());
			} else {
				return new Tuple2<String, String>(t._2()._1(), t._1() + "\t" + "NULL");
			}
		});
		
		// Bring the result back to the driver and print it.
		List<Tuple2<String, String>> res2 = res.collect();
		
		System.out.println(res2);
		
		// Shut down the SparkContext cleanly.
		sc.stop();
	}
}
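As a quick way to experiment with leftOuterJoin without preparing input files, here is a minimal local sketch. The class name JoinByItemIdLocalDemo and the in-memory sample pairs are made up for illustration, but the RDD shapes match udata3 and idata2 above:

package com.github.ralgond.sparkjavaapi;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;

import scala.Tuple2;

public class JoinByItemIdLocalDemo {
	public static void main(String[] args) {
		// Hard-code a local master so the sketch can run without spark-submit.
		SparkConf conf = new SparkConf().setAppName("JoinByItemId Local Demo").setMaster("local[*]");
		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
			// (itemId, user) pairs, the same shape as udata3 above.
			JavaPairRDD<String, String> users = sc.parallelizePairs(Arrays.asList(
					new Tuple2<>("1", "A"),
					new Tuple2<>("2", "C"),
					new Tuple2<>("4", "E"))); // item id "4" has no matching item

			// (itemId, itemName) pairs, the same shape as idata2 above.
			JavaPairRDD<String, String> items = sc.parallelizePairs(Arrays.asList(
					new Tuple2<>("1", "item1"),
					new Tuple2<>("2", "item2")));

			// leftOuterJoin keeps every user row; a missing item shows up as Optional.empty().
			List<Tuple2<String, Tuple2<String, Optional<String>>>> joined =
					users.leftOuterJoin(items).collect();
			System.out.println(joined);
		}
	}
}

Because the master is hard-coded to local[*], this sketch can be launched as an ordinary main method (provided Spark's jars are on the run-time classpath); the original JoinByItemId leaves the master to be supplied by spark-submit.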

Preparing the data

Go into the Spark installation directory and, inside the data folder, create the folder spark-java-api\JoinByItemId. Then create two files under {SPARK_HOME}\data\spark-java-api\JoinByItemId: user.txt and item.txt.

The content of user.txt is:

A	1
B	1
C	2
D	2
E	3

The content of item.txt is:

1	item1
2	item2
3	item3
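Both files separate the two fields with a tab; since the split("\\s+", 2) call in JoinByItemId splits on any whitespace, the exact separator is not critical as long as the first field contains no spaces.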

Compiling and running

Build the project with mvn clean package; this produces the jar spark-java-api-0.0.1-SNAPSHOT.jar in the project's target directory.

Go to the Spark installation directory and run the following command:

bin\spark-submit --class com.github.ralgond.sparkjavaapi.JoinByItemId D:\ralgond\spark-java-api\target\spark-java-api-0.0.1-SNAPSHOT.jar data\spark-java-api\JoinByItemId\user.txt data\spark-java-api\JoinByItemId\item.txt
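If the job aborts with an error such as "A master URL must be set in your configuration", your installation has no default master configured; in that case add --master local[*] (or another master URL) to the spark-submit command above.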

You should then see the result printed to the console.
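With the sample data above, the collected list should contain five pairs of the form (user, itemId<TAB>itemName), i.e. (A,1	item1), (B,1	item1), (C,2	item2), (D,2	item2) and (E,3	item3), although the order in which they appear may vary from run to run.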


Original: https://www.cnblogs.com/ralgo/p/14872641.html
