启动Spark
cd /usr/local/spark-2.4.0-bin-hadoop2.6/sbin
./start-all.sh
./start-history-server.sh hdfs://master:8020/spark-logs
关闭Spark
cd /usr/local/spark-2.4.0-bin-hadoop2.6/sbin
./stop-all.sh
./stop-history-server.sh hdfs://master:8020/spark-logs
Spark监控http://master:8080
执行一个单词计数的程序
sc.textFile("/user/root/a.txt").flatMap(x=>x.split(“ “)).map(x=>(x,1)).reduceByKey(_+_)
res0.collect()//打印出结果
?
ØCluster Manager
在standalone模式中即为Master主节点,控制整个集群,监控worker。在YARN模式中为资源管理器。
ØWorker
从节点,负责控制计算节点,启动Executor或者Driver。在YARN模式中为NodeManager,负责计算节点的控制。
ØDriver
运行Application的main()函数并创建SparkContext。
ØExecutor
执行器,在worker node上执行任务的组件、用于启动线程池运行任务。每个Application拥有独立的一组Executor。
ØSparkContext
整个应用的上下文,控制应用的生命周期,主要包括:
RDD Objects(RDD DAG):构建DAG图
DAG Scheduler:根据作业(task)构建基于Stage的DAG,并提交Stage给TaskScheduler。
TaskScheduler:将任务(task)分发给Executor执行。
yarn-cluster运行流程
可以理解为一个提供了许多操作接口的数据集合;弹性存储(弹性分布式数据集)
Ø它是集群节点上的不可改变的、已分区的集合对象;
Ø通过并行转换的方式来创建如(map、filter、join等);
Ø失败自动重建;
Ø可以控制存储级别(内存、磁盘等)来进行重用;
Ø必须是可序列化的;在内存不足时可自动降级为磁盘存储,把RDD存储于磁盘上,这时性能有大的下降但不会差于现在的MapReduce;
Ø对于丢失部分数据分区只需要根据它的lineage就可重新计算出来,而不需要做特定的checkpoint;
Ø从集合中创建RDD
parallelize():通过parallelize函数把一般数据结构加载为RDD
parallelize[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism): RDD[T]
Ø从外部存储创建RDD
通过textFile直接加载数据文件为RDD
textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
rdd的分区数 = max(hdfs文件的block数目, sc.defaultMinPartitions)
sc.defaultMinPartitions=min(sc.defaultParallelism,2)
展示内容student.collect()
map(func)
ØTransformation类型算子
Ømap: 将原来RDD的每个数据项通过map中的用户自定义函数f转换成一个新的RDD,map操作不会改变RDD的分区数目
使用map函数对RDD中每个元素进行倍数操作
flatMap(func)
ØTransformation类型算子
ØflatMap:对集合中的每个元素进行map操作再扁平化
案例:
mapPartitions(func)
ØTransformation类型算子
Ø和map功能类似,但是输入的元素是整个分区,即传入函数的操作对象是每个分区的Iterator集合,该操作不会导致Partitions数量的变化
取出每个分区中大于3的值
sortBy(f:(T) => K, ascending, numPartitions)
ØsortBy()可接受三个参数:
Øf:(T) => K:左边是要被排序对象中的每一个元素,右边返回的值是元素中要进行排序的值。
Øascending:决定排序后RDD中的元素是升序还是降序,默认是true,也就是升序,false为降序排序。
ØnumPartitions:该参数决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的个数相等。
按照每个元素的第二个值进行降序排序
filter(func)
ØTransformation类型算子
Ø保留通过函数func,返回值为true的元素,组成新的RDD
Ø过滤掉data RDD中元素小于或等于2的元素
小案例:考试情况排名
union
合并List的一个方法
rdd1.union(rdd2)
distinct([numPartitions]))
ØTransformation类型算子
Ø针对RDD中重复的元素,只保留一个元素
intersection(otherDataset)
Ø找出两个RDD的共同元素,也就是找出两个RDD的交集
找出c_rdd1和c_rdd2中相同的元素
subtract (otherDataset)
Ø获取两个RDD之间的差集
找出rdd1与rdd2之间的差集
cartesian(otherDataset)
Ø笛卡尔积就是将两个集合的元素两两组合成一组
虽然大部分Spark的RDD操作都支持所有种类的单值RDD,但是有少部分特殊的操作只能作用于键值对类型的RDD。
顾名思义,键值对RDD由一组组的键值对组成,这些RDD被称为PairRDD。PairRDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口。例如,PairRDD提供了reduceByKey()方法,可以分别规约每个键对应的数据,还有join()方法,可以把两个RDD中键相同的元素组合在一起,合并为一个RDD。
mapValues(func)=》只对value进行操作
Ø类似map,针对键值对(Key,Value)类型的数据中的Value进行map操作,而不对Key进行处理
groupByKey([numPartitions])
Ø按键分组,在(K,V)对组成的RDD上调用时,返回(K,Iterable<V>)对组成的新的RDD。
将rdd按键进行分组
reduceByKey(func, [numPartitions])
Ø将键值对RDD按键分组后进行聚合
Ø当在(K,V)类型的键值对组成的RDD上调用时,返回一个(K,V)类型键值对组成的新RDD
Ø其中新RDD每个键的值使用给定的reduce函数func进行聚合,该函数必须是(V,V)=>V类型
统计每个键出现的次数
join(otherDataset, [numPartitions])
Ø把键值对数据相同键的值整合起来
Ø其他连接有:leftOuterJoin, rightOuterJoin, and fullOuterJoin
join: 把键值对数据相同键的值整合起来
lookup(key: K)
ØAction类型算子
Ø作用于(K,V)类型的RDD上,返回指定K的所有V值
collect()
Ø返回RDD中所有的元素
ØcollectAsMap(): Map[K, V]
take(num)
saveAsTextFile(path: String)
repartition(numPartitions: Int)
Ø可以增加或减少此RDD中的并行级别。在内部,它使用shuffle重新分发数据。
Ø如果要减少此RDD中的分区数,请考虑使用coalesce,这样可以避免执行shuffle。
Øcoalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
saveAsSequenceFile(path)
键值对
Ø将数据集的元素作为Hadoop SequenceFile编写,只支持键值对RDD
sequenceFile[K, V](ath: String, keyClass: Class[K], valueClass: Class[V], minPartitions: Int)
Ø读取序列化文件
SparkContext介绍
Ø任何Spark程序都是以SparkContext对象开始的,因为SparkContext是Spark应用程序的上下文和入口,无论是Scala、Python、R程序,都是通过SparkContext对象的实例来创建RDD。
Ø因此在实际Spark应用程序的开发中,在main方法中需要创建SparkContext对象,作为Spark应用程序的入口,并在Spark程序结束时关闭SparkContext对象。
ØSparkSession 是 spark2.x 引入的新概念,SparkSession 为用户提供统一的切入点
ØSparkConf、SparkContext、SQLContext、HiveContext都已经被封装在SparkSession当中
SparkSession.builder
.master("local") \设置运行模式
.appName("Word Count") \设置名称
.config("spark.some.config.option", "some-value") \[设置集群配置}(file://设置集群配置)
.enableHiveSupport() \ 支持读取Hive
.getOrCreate()
单词计数
传参
Ø点击“Run”→“Edit Configurations...”,弹出对话框如图所示
Ø如果程序有自定义的输入参数,继续点击“Program arguments”参数值设置
Ø将JAR包上传到Linux的/opt目录下
Ø将Windows本地的words.txt文件也上传到/opt目录下
Ø将/opt/words.txt上传到HDFS的/user/root下
spark-submit提交任务
spark-submit --masterurl**> *
*--deploy-mode *
--**conf *= *
... # other options
*--class *
* *
application-arguments
Ø--class:应用程序的入口点,指主程序。
Ø--master:指定要连接的集群URL。
Ø--deploy-mode:是否将驱动程序部署在工作节点(cluster)或本地作为外部客户端(client)。
Ø--conf:设置任意Spark配置属性,即允许使用key=value格式设置任意的SparkConf配置选项。
Øapplication-jar:包含应用程序和所有依赖关系的捆绑JAR的路径。
Øapplication-arguments:传递给主类的main方法的参数。
运行
spark-submit --master yarn --deploy-mode cluster --class demo.spark.WordCount /opt/word.jar /user/root/words.txt " " /user/root/word_count
•--master设置运行模式为yarn-cluster集群模式
•--class设置程序入口,然后设置JAR包路径,输入文件路径,输出文件路径,设置运行结果存储在HDFS
spark-submit常用的配置项
设置spark-submit提交单词计数程序时的环境配置,设置运行时所启用的资源
spark-submit --master spark://master:7077 --executor-memory 512m --executor-cores 2 --class demo.spark.WordCount /opt/word.jar /user/root/words.txt " " /user/root/word_ count2
ØSpark SQL是Spark Core之上的一个组件,它引入了一个称为SchemaRDD的新数据抽象,它为结构化和半结构化数据提供支持
Ø提供了DataFrame、DataSet的编程抽象
Ø可以充当分布式SQL查询引擎
DataFrame特点
Ø支持KB级到PB级得到数据处理
Ø支持多种数据格式和存储系统
Ø通过Spark SQL Catalyst优化器可以进行高效的代码生成与优化
Ø能够无缝集成Spark上的大数据处理工具
Ø提供了Python、Java、Scala、R等多种语言API
创建DataFrame
Ø从结构化数据文件创建DataFrame
Ø从外部数据库创建DataFrame
Ø从RDD创建DataFrame
Ø从Hive中的表创建DataFrame
Ø初始化一个SparkSession,名称为spark
?
Øspark.read.parquet(path: String):读取一个Parquet文件,返回一个DataFrame
?
Øspark.read.json(path: String):读取一个JSON文件,返回一个DataFrame
?
Øspark.read.csv(path: String):读取CSV文件,返回一个DataFrame
用option设置参数
?
Øspark.read.text(path: String):读取文本文件的数据,返回一个DataFrame,只有一个value字段
查看DataFrame
ØprintSchema:查看数据模式,打印出列的名称和类型
?
Øshow:查看数据
?
show():显示前20条记录
?
show(numRows:Int):显示numRows条
?
show(truncate:Boolean):是否最多只显示20个字符,默认为true
?
show(numRows:Int,truncate:Boolean):显示numRows条记录并设置过长字符串的显示格式
?
Øfirst/head/take/takeAsList:获取若干行数据
?
Øcollect/collectAsList:获取所有数据
ØSpark SQL可以从外部数据库(比如MySQL、Oracle等数据库)中创建DataFrame
Ø使用这种方式创建DataFrame需要通过JDBC连接或ODBC连接的方式访问数据库
Øspark.read.jdbc(url: String, table: String, properties: Properties)
Ø利用反射机制推断RDD模式,使用这种方式首先需要定义一个case class,因为只有case class才能被Spark隐式地转换为DataFrame。
Ø从原始RDD创建一个元素为Row类型的RDD
Ø用StructType创建一个和RDD中Row的结构相匹配的Schema
Ø通过SparkSession提供的createDataFrame方法将Schema应用到RDD上
Ø通过SQL查询语句
Øspark.read.table(tablename)
方法 | 描述 | 举例 |
---|---|---|
select(cols:string*) | 查询指定字段的数据信息 | df.select($"colA", $"colB" + 1) |
selectExpr(exprs: String*) | 查询指定字段的数据信息 | df.selectExpr("name","name as names","upper(name)","age+1") |
filter(conditionExpr: String) | 条件查询 | df.filter(“age>10”),df.where(df("age")>10) |
limit(n:Int) | 查询前n行记录 | limit(n: Int) |
distinct | 去重 | df.distinct |
orderBy(sortExprs: Column*) | 排序查询 | df.orderBy(desc("age")) |
sort(sortExprs: Column*) | 排序查询 | df.sort(df(“age”).desc),默认是asc |
方法 | 描述 | 举例 |
---|---|---|
groupBy(col1: String, cols: String*) | 分组查询 | df.groupBy(“deptno”).agg(max("age"), avg("salary")) |
join(right: DataFrame, joinExprs: Column, joinType: String) | 连接查询,关联的类型:inner, outer, left_outer, right_outer, leftsemi | df.join(ds,df("name")=ds("name") and df("age")=ds("age"),"outer") |
unionAll(other:Dataframe) | 合并两个DataFrame | df.unionAll(ds) |
withColumn(colName: String, col: Column) | 增加一列 | df.withColumn("aa",df("name")) |
drop(col: Column) | 删除某列 | df.drop("name") |
自定义udf-用于SQL查询
•sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1)
有一个hive的视图
spark.sql()
自定义udf-用于DataFrame API
•import org.apache.spark.sql.functions._
•import org.apache.spark.sql.types._
•udf(f: AnyRef, dataType: DataType)
val df_hvie=saprk.sql("select * form tarn.people")
Spark DataFrame基础操作
(1)保存为文本文件
overwrite表示覆盖
Ødf.write.mode("overwrite").parquet(path: String)
df.repartition(1).write.mode("overwrite").parquet("/user/root/sparkdata/data_repartition")
Ødf.write.mode("overwrite"). json(path: String)
Ødf.write.mode("overwrite").option("header", "true").option("sep", ";"). csv(path: String)
Ødf.write.mode("overwrite").text(path: String) //只能有一个字段(用的非常少)
传入数据到mysql
Øjdbc(url: String, table: String, connectionProperties: Properties)
保存到hive
Ødf.write.mode("overwrite"). saveAsTable(tableName: String)
ØDataFrame的缺点:
1.编译时不能类型转化安全检查,运行时才能确定是否有问题
2.对于对象支持不友好,rdd内部数据直接以java对象存储,dataframe内存存储的是row对象而不能是自定义对象
ØDataSet
ØDataSet是分布式的数据集合,DataSet提供了强类型支持,也是在RDD的每行数据加了类型约束。
ØDateSet整合了RDD和DataFrame的优点,支持结构化和非结构化数据。
ØDataFrame表示为DataSet[Row],即DataSet的子集。
ØDataSet是面向对象的编程接口,可以通过JVM的对象进行构建DataSet。
•createDataset【T](data: List[T])//两种方式
•createDataset 【 T](data: RDD[T])
•dataSet=dataFrame.as[强类型]
val ds4=spark.read.table("tarin.goodsorder").as[Person]
DataSet API
1. 机器学习概念
Ø机器学习就是让机器能像人一样有学习、理解、认识的能力。
Ø机器学习是一门人工智能的科学,该领域的主要研究对象是人工智能,特别是如何在经验学习中改善具体算法的性能。
Ø机器学习的过程就是通过计算机使算法模型利用输入数据的规律或以往经验进行学习,并对模型进行评估,评估的性能如果达到要求就拿这个模型来测试其他的数据,如果达不到要求就要调整算法来重新建立模型,再次进行评估,如此循环往复,最终获得满意的经验来处理其他的数据。
原文:https://www.cnblogs.com/Liguangyang/p/13530929.html