<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.blb</groupId>
  <artifactId>idea-scala</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.4.4</spark.version>
    <java.version>1.8</java.version>
  </properties>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-compiler</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-reflect</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scalap</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib-local_2.11</artifactId>
      <version>${spark.version}</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib-local_2.11</artifactId>
      <version>${spark.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <classifier>dist</classifier>
          <appendAssemblyId>true</appendAssemblyId>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <id>scala-compile-first</id>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <recompileMode>incremental</recompileMode>
          <useZincServer>true</useZincServer>
          <args>
            <arg>-unchecked</arg>
            <arg>-deprecation</arg>
            <arg>-feature</arg>
          </args>
          <jvmArgs>
            <jvmArg>-Xms1024m</jvmArg>
            <jvmArg>-Xmx1024m</jvmArg>
          </jvmArgs>
          <javacArgs>
            <javacArg>-source</javacArg>
            <javacArg>${java.version}</javacArg>
            <javacArg>-target</javacArg>
            <javacArg>${java.version}</javacArg>
            <javacArg>-Xlint:all,-serial,-path</javacArg>
          </javacArgs>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.antlr</groupId>
        <artifactId>antlr4-maven-plugin</artifactId>
        <version>4.3</version>
        <executions>
          <execution>
            <id>antlr</id>
            <goals>
              <goal>antlr4</goal>
            </goals>
            <phase>none</phase>
          </execution>
        </executions>
        <configuration>
          <outputDirectory>src/test/java</outputDirectory>
          <listener>true</listener>
          <treatWarningsAsErrors>true</treatWarningsAsErrors>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
package com.blb.WordCount
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object WordCount {
  def main(args: Array[String]): Unit = {
    // 1. Create a SparkConf and set the application name
    val conf = new SparkConf().setAppName("WordCount")
    // 2. Create the SparkContext, the entry point for submitting a Spark application
    val sc = new SparkContext(conf)
    // Set the log output level
    sc.setLogLevel("WARN")
    // 3. Read the input data file
    val data: RDD[String] = sc.textFile(args(0))
    // 4. Split each line to get all the words
    val words: RDD[String] = data.flatMap(_.split(" "))
    // 5. Map each word to a count of 1
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
    // 6. Sum the 1s for occurrences of the same word
    val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    // Sort by word count in descending order; the second parameter defaults to true (ascending), so pass false for descending
    val sorted: RDD[(String, Int)] = result.sortBy(_._2, false)
    // 7. Save the result
    sorted.saveAsTextFile(args(1))
    // 8. Stop the SparkContext
    sc.stop()
  }
}
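Before packaging, the job can be smoke-tested inside the IDE in local mode. A minimal sketch (the class name WordCountLocal and the input path data/words.txt are hypothetical and not part of the project above):

package com.blb.WordCount

import org.apache.spark.{SparkConf, SparkContext}

object WordCountLocal {
  def main(args: Array[String]): Unit = {
    // local[*] runs Spark inside the IDE using all available cores
    val conf = new SparkConf().setAppName("WordCountLocal").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val result = sc.textFile("data/words.txt")   // hypothetical local input file
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)
    // Print instead of saving while testing
    result.collect().foreach(println)
    sc.stop()
  }
}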
1. Open Project Structure
2. Select Artifacts
  1. Choose the Spark program's main class
  2. For "JAR files from libraries" choose "copy to the output directory and link via manifest": this packages only the code you wrote and does not bundle all the dependencies
1. Upload the jar to the local Linux filesystem or to HDFS on the cluster
   HDFS is recommended for the input and output locations; it avoids many unnecessary bugs (see the sketch after this list).
2. Create a test text file
3. Run the job with the spark-submit command
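As a sketch of what explicit HDFS locations look like inside the job, the lines below would replace steps 3 and 7 of the WordCount program above. The namenode address hdp01:8020 is an assumption for illustration; with a properly configured Hadoop environment, plain paths such as /ky.txt also resolve to HDFS.

// Sketch: explicit hdfs:// URIs for input and output; host and port are assumptions
val data = sc.textFile("hdfs://hdp01:8020/ky.txt")
val counts = data.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://hdp01:8020/out03/")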
1. Submitting a job in normal (standalone) mode, where the address of the live Master is already known:

bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master spark://master:7077 \
--executor-memory 1G --total-executor-cores 2 \
./examples/jars/spark-examples_2.11-2.4.4.jar \
10   # argument to SparkPi: the number of sampling slices

2. Submitting a job in high-availability (HA) mode. In HA mode several Masters are involved, so submission changes slightly, because the application needs to know the IP address and port of the currently active Master. The HA scheme handles this simply: just give the SparkContext the list of Masters, for example spark://master:port1,slave1:port2,slave2:port2. The application polls the list and finds the live Master.

bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master spark://master:7077,slave1:7077,slave2:7077 \
--executor-memory 1G --total-executor-cores 2 \
examples/jars/spark-examples_2.11-2.4.4.jar \
10
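For reference, the same Master list can also be set inside the application through SparkConf instead of on the command line; a minimal sketch using the example hosts above:

// Sketch: pointing the application at all HA Masters; it will discover the active one
val conf = new SparkConf()
  .setAppName("SparkPi")
  .setMaster("spark://master:7077,slave1:7077,slave2:7077")
val sc = new SparkContext(conf)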
General form:

$ ./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

Parameter description:
--class: the entry class of your application (e.g. org.apache.spark.examples.SparkPi)
--master spark://master:7077: the address of the Master
--deploy-mode: whether to run your driver on a worker node (cluster) or locally as a client (client); default: client
--conf: an arbitrary Spark configuration property in key=value format; quote it ("key=value") if the value contains spaces
application-jar: the packaged application jar, including its dependencies. The URL must be visible from everywhere in the cluster, e.g. an hdfs:// path on shared storage; with a file:// path, the same jar must exist at that path on every node
application-arguments: arguments passed to the main() method
--executor-memory 1G: give each executor 1G of memory
--total-executor-cores 2: use a total of 2 CPU cores across all executors
Submitting the WordCount job:
spark-submit --class com.blb.WordCount.WordCount --master spark://hdp01:7077 --executor-memory 1G --total-executor-cores 2 ideascala.jar /ky.txt /out03/
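To check the result, the output directory can be read back, for example from spark-shell (which already provides sc); a minimal sketch, assuming the /out03/ path used above:

// Each task writes a part-* file under /out03/; show the first few (word,count) lines
sc.textFile("/out03/part-*").take(10).foreach(println)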
Original article: https://www.cnblogs.com/phy2020/p/12751594.html