<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>sparkgi</groupId>
    <artifactId>sparkai</artifactId>
    <version>1.0</version>

    <properties>
        <scala.version>2.11</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <spark.version>2.2.0</spark.version>
    </properties>

    <repositories>
        <repository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!--<dependency>-->
        <!--    <groupId>org.apache.spark</groupId>-->
        <!--    <artifactId>spark-graphx_${scala.version}</artifactId>-->
        <!--    <version>${spark.version}</version>-->
        <!--</dependency>-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
    </dependencies>
</project>
package com.atguigu.bigdata

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.log4j.{Level, Logger}

object wordCount {
  // Main method
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)

    // Create the SparkConf object.
    // If the master is "local", the job runs in local mode and can be launched directly from the IDE.
    // When submitting to a cluster, do not set the master in code (pass it to spark-submit instead).

    // Cluster mode
    //val conf = new SparkConf().setAppName("My Scala Word Count")

    // Local mode
    val conf = new SparkConf().setAppName("My Scala Word Count").setMaster("local")

    // Create the SparkContext object
    val sc = new SparkContext(conf)

    val result = sc.textFile("E:/software/qiao/test.txt")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
    result.foreach(println)

    // Cluster mode
    // val result = sc.textFile(args(0))
    //   .flatMap(_.split(" "))
    //   .map((_, 1))
    //   .reduceByKey(_ + _)
    //   .saveAsTextFile(args(1))

    sc.stop()
  }
}
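For reference, the commented-out cluster-mode path above can be written as a separate entry point. The sketch below is a minimal version under the same assumptions as the original comments: the input and output paths arrive as args(0) and args(1), the master URL is supplied by spark-submit rather than hard-coded, and the object name wordCountCluster is made up for illustration.

import org.apache.spark.{SparkConf, SparkContext}

// Cluster-mode variant: nothing environment-specific is hard-coded
object wordCountCluster {
  def main(args: Array[String]): Unit = {
    // No setMaster here; pass --master to spark-submit instead
    val conf = new SparkConf().setAppName("My Scala Word Count")
    val sc = new SparkContext(conf)

    sc.textFile(args(0))           // input path, e.g. an HDFS directory
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile(args(1))     // output path; must not already exist

    sc.stop()
  }
}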
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object wordCount2 {
  def main(args: Array[String]): Unit = {
    // Local mode
    val conf = new SparkConf().setAppName("My Scala Word Count").setMaster("local")

    // Create the Spark context
    val sc = new SparkContext(conf)

    // By default, paths are resolved against the current deployment environment (e.g. HDFS on a cluster);
    // to read from the local file system, use a file:// prefix.
    val lines: RDD[String] = sc.textFile("E:/software/qiao/test.txt")

    // Split each line into individual words
    val words: RDD[String] = lines.flatMap(_.split(" "))

    // Convert each word into a (word, 1) pair so the counts can be aggregated
    val wordToOne: RDD[(String, Int)] = words.map((_, 1))

    // Group by word and sum the counts
    val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)

    // Collect the results to the driver and print them to the console
    val result: Array[(String, Int)] = wordToSum.collect()
    result.foreach(println)

    // Stop the context
    sc.stop()
  }
}
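To see what each transformation produces without depending on a local file, the following sketch runs the same pipeline on a small in-memory collection built with sc.parallelize. The object name and the sample lines are illustrative only, not from the original post.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object wordCountInMemory {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("WordCount Demo").setMaster("local"))

    // Build the RDD from an in-memory sequence instead of a file
    val lines: RDD[String] = sc.parallelize(Seq("hello spark", "hello scala"))

    val counts: Array[(String, Int)] = lines
      .flatMap(_.split(" "))   // "hello", "spark", "hello", "scala"
      .map((_, 1))             // ("hello",1), ("spark",1), ("hello",1), ("scala",1)
      .reduceByKey(_ + _)      // ("hello",2), ("spark",1), ("scala",1)
      .collect()

    counts.foreach(println)    // e.g. (spark,1) (hello,2) (scala,1); ordering is not guaranteed
    sc.stop()
  }
}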
Original post: https://www.cnblogs.com/hapyygril/p/13689670.html