首页 > 其他 > 详细

spark

时间:2020-08-19 22:03:10      阅读:71      评论:0      收藏:0      [点我收藏+]

spark

技术分享图片

启动

 启动Spark
 cd /usr/local/spark-2.4.0-bin-hadoop2.6/sbin
 ./start-all.sh
 ./start-history-server.sh hdfs://master:8020/spark-logs
 关闭Spark
 cd /usr/local/spark-2.4.0-bin-hadoop2.6/sbin
 ./stop-all.sh
 ./stop-history-server.sh hdfs://master:8020/spark-logs

Spark监控http://master:8080

技术分享图片

执行一个单词计数的程序

 sc.textFile("/user/root/a.txt").flatMap(x=>x.split(“ “)).map(x=>(x,1)).reduceByKey(_+_)
 res0.collect()//打印出结果
 ?

spark架构

ØCluster Manager

在standalone模式中即为Master主节点,控制整个集群,监控worker。在YARN模式中为资源管理器。

ØWorker

从节点,负责控制计算节点,启动Executor或者Driver。在YARN模式中为NodeManager,负责计算节点的控制。

ØDriver

运行Application的main()函数并创建SparkContext。

ØExecutor

执行器,在worker node上执行任务的组件、用于启动线程池运行任务。每个Application拥有独立的一组Executor。

ØSparkContext

整个应用的上下文,控制应用的生命周期,主要包括:

RDD Objects(RDD DAG):构建DAG图

DAG Scheduler:根据作业(task)构建基于Stage的DAG,并提交Stage给TaskScheduler。

TaskScheduler:将任务(task)分发给Executor执行。

技术分享图片

运行流程

技术分享图片

yarn-cluster运行流程

技术分享图片

RDD

可以理解为一个提供了许多操作接口的数据集合;弹性存储(弹性分布式数据集)

Ø它是集群节点上的不可改变的、已分区的集合对象;

Ø通过并行转换的方式来创建如(map、filter、join等);

Ø失败自动重建;

Ø可以控制存储级别(内存、磁盘等)来进行重用;

Ø必须是可序列化的;在内存不足时可自动降级为磁盘存储,把RDD存储于磁盘上,这时性能有大的下降但不会差于现在的MapReduce;

Ø对于丢失部分数据分区只需要根据它的lineage就可重新计算出来,而不需要做特定的checkpoint;

创建一个RDD

Ø从集合中创建RDD

 parallelize():通过parallelize函数把一般数据结构加载为RDD
 parallelize[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism): RDD[T]

技术分享图片

Ø从外部存储创建RDD

 通过textFile直接加载数据文件为RDD
 textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]

rdd的分区数 = max(hdfs文件的block数目, sc.defaultMinPartitions)

sc.defaultMinPartitions=min(sc.defaultParallelism,2)

技术分享图片

技术分享图片

展示内容student.collect()

RDD算子

map(func)

ØTransformation类型算子

Ømap: 将原来RDD的每个数据项通过map中的用户自定义函数f转换成一个新的RDD,map操作不会改变RDD的分区数目

技术分享图片

使用map函数对RDD中每个元素进行倍数操作

技术分享图片

技术分享图片

flatMap(func)

ØTransformation类型算子

ØflatMap:对集合中的每个元素进行map操作再扁平化

技术分享图片

案例:

技术分享图片

技术分享图片

mapPartitions(func)

ØTransformation类型算子

Ø和map功能类似,但是输入的元素是整个分区,即传入函数的操作对象是每个分区的Iterator集合,该操作不会导致Partitions数量的变化

取出每个分区中大于3的值

技术分享图片

sortBy(f:(T) => K, ascending, numPartitions)

ØsortBy()可接受三个参数:

Øf:(T) => K:左边是要被排序对象中的每一个元素,右边返回的值是元素中要进行排序的值。

Øascending:决定排序后RDD中的元素是升序还是降序,默认是true,也就是升序,false为降序排序。

ØnumPartitions:该参数决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的个数相等。

按照每个元素的第二个值进行降序排序

技术分享图片

filter(func)

ØTransformation类型算子

Ø保留通过函数func,返回值为true的元素,组成新的RDD

Ø过滤掉data RDD中元素小于或等于2的元素

技术分享图片

小案例:考试情况排名

技术分享图片

union

合并List的一个方法

rdd1.union(rdd2)

distinct([numPartitions]))

ØTransformation类型算子

Ø针对RDD中重复的元素,只保留一个元素

intersection(otherDataset)

Ø找出两个RDD的共同元素,也就是找出两个RDD的交集

找出c_rdd1和c_rdd2中相同的元素

技术分享图片

subtract (otherDataset)

Ø获取两个RDD之间的差集

找出rdd1与rdd2之间的差集

技术分享图片

cartesian(otherDataset)

Ø笛卡尔积就是将两个集合的元素两两组合成一组

键值对RDD

虽然大部分Spark的RDD操作都支持所有种类的单值RDD,但是有少部分特殊的操作只能作用于键值对类型的RDD。

顾名思义,键值对RDD由一组组的键值对组成,这些RDD被称为PairRDD。PairRDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口。例如,PairRDD提供了reduceByKey()方法,可以分别规约每个键对应的数据,还有join()方法,可以把两个RDD中键相同的元素组合在一起,合并为一个RDD。

mapValues(func)=》只对value进行操作

 

Ø类似map,针对键值对(Key,Value)类型的数据中的Value进行map操作,而不对Key进行处理

技术分享图片

技术分享图片

groupByKey([numPartitions])

Ø按键分组,在(K,V)对组成的RDD上调用时,返回(K,Iterable<V>)对组成的新的RDD。

将rdd按键进行分组

技术分享图片

reduceByKey(func, [numPartitions])

Ø将键值对RDD按键分组后进行聚合

Ø当在(K,V)类型的键值对组成的RDD上调用时,返回一个(K,V)类型键值对组成的新RDD

Ø其中新RDD每个键的值使用给定的reduce函数func进行聚合,该函数必须是(V,V)=>V类型

统计每个键出现的次数

技术分享图片

join(otherDataset, [numPartitions])

Ø把键值对数据相同键的值整合起来

Ø其他连接有:leftOuterJoin, rightOuterJoin, and fullOuterJoin

技术分享图片

join: 把键值对数据相同键的值整合起来

技术分享图片

lookup(key: K)

ØAction类型算子

Ø作用于(K,V)类型的RDD上,返回指定K的所有V值

技术分享图片

collect()

Ø返回RDD中所有的元素

ØcollectAsMap(): Map[K, V]

技术分享图片

技术分享图片

take(num)

文件读取与存储

saveAsTextFile(path: String)

技术分享图片

技术分享图片

repartition(numPartitions: Int)

Ø可以增加或减少此RDD中的并行级别。在内部,它使用shuffle重新分发数据。

Ø如果要减少此RDD中的分区数,请考虑使用coalesce,这样可以避免执行shuffle。

Øcoalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)

技术分享图片

技术分享图片

saveAsSequenceFile(path)

键值对

Ø将数据集的元素作为Hadoop SequenceFile编写,只支持键值对RDD

技术分享图片

技术分享图片

sequenceFile[K, V](ath: String, keyClass: Class[K], valueClass: Class[V], minPartitions: Int)

Ø读取序列化文件

技术分享图片

spark环境

SparkContext介绍

Ø任何Spark程序都是以SparkContext对象开始的,因为SparkContext是Spark应用程序的上下文和入口,无论是Scala、Python、R程序,都是通过SparkContext对象的实例来创建RDD。

Ø因此在实际Spark应用程序的开发中,在main方法中需要创建SparkContext对象,作为Spark应用程序的入口,并在Spark程序结束时关闭SparkContext对象。


ØSparkSession 是 spark2.x 引入的新概念,SparkSession 为用户提供统一的切入点

ØSparkConf、SparkContext、SQLContext、HiveContext都已经被封装在SparkSession当中

SparkSession.builder

.master("local") \设置运行模式

.appName("Word Count") \设置名称

.config("spark.some.config.option", "some-value") \[设置集群配置}(file://设置集群配置)

.enableHiveSupport() \ 支持读取Hive

.getOrCreate()

单词计数

技术分享图片

传参

Ø点击“Run”→“Edit Configurations...”,弹出对话框如图所示

Ø如果程序有自定义的输入参数,继续点击“Program arguments”参数值设置

执行spark程序

Ø将JAR包上传到Linux的/opt目录下

Ø将Windows本地的words.txt文件也上传到/opt目录下

Ø将/opt/words.txt上传到HDFS的/user/root下

spark-submit提交任务

spark-submit --masterurl**> *

*--deploy-mode *

--**conf *= *

... # other options

*--class *

* *

application-arguments

spark-submit参数解释

Ø--class:应用程序的入口点,指主程序。

Ø--master:指定要连接的集群URL。

Ø--deploy-mode:是否将驱动程序部署在工作节点(cluster)或本地作为外部客户端(client)。

Ø--conf:设置任意Spark配置属性,即允许使用key=value格式设置任意的SparkConf配置选项。

Øapplication-jar:包含应用程序和所有依赖关系的捆绑JAR的路径。

Øapplication-arguments:传递给主类的main方法的参数。

运行

 spark-submit --master yarn --deploy-mode cluster --class demo.spark.WordCount /opt/word.jar /user/root/words.txt " " /user/root/word_count

•--master设置运行模式为yarn-cluster集群模式

•--class设置程序入口,然后设置JAR包路径,输入文件路径,输出文件路径,设置运行结果存储在HDFS

spark-submit常用的配置项

技术分享图片

设置spark-submit提交单词计数程序时的环境配置,设置运行时所启用的资源

 spark-submit --master spark://master:7077 --executor-memory 512m --executor-cores 2 --class demo.spark.WordCount /opt/word.jar /user/root/words.txt " " /user/root/word_ count2

sparkSQL

ØSpark SQL是Spark Core之上的一个组件,它引入了一个称为SchemaRDD的新数据抽象,它为结构化和半结构化数据提供支持

Ø提供了DataFrame、DataSet的编程抽象

Ø可以充当分布式SQL查询引擎

DataFrame特点

Ø支持KB级到PB级得到数据处理

Ø支持多种数据格式和存储系统

Ø通过Spark SQL Catalyst优化器可以进行高效的代码生成与优化

Ø能够无缝集成Spark上的大数据处理工具

Ø提供了Python、Java、Scala、R等多种语言API

创建DataFrame

Ø从结构化数据文件创建DataFrame

Ø从外部数据库创建DataFrame

Ø从RDD创建DataFrame

Ø从Hive中的表创建DataFrame

(1)从结构化数据文件创建DataFrame
 Ø初始化一个SparkSession,名称为spark
 ?
 Øspark.read.parquet(path: String):读取一个Parquet文件,返回一个DataFrame
 ?
 Øspark.read.json(path: String):读取一个JSON文件,返回一个DataFrame
 ?
 Øspark.read.csv(path: String):读取CSV文件,返回一个DataFrame
 用option设置参数
 ?
 Øspark.read.text(path: String):读取文本文件的数据,返回一个DataFrame,只有一个value字段

技术分享图片

查看DataFrame

 ØprintSchema:查看数据模式,打印出列的名称和类型
 ?
 Øshow:查看数据
 ?
 show():显示前20条记录
 ?
 show(numRows:Int):显示numRows条
 ?
 show(truncate:Boolean):是否最多只显示20个字符,默认为true
 ?
 show(numRows:Int,truncate:Boolean):显示numRows条记录并设置过长字符串的显示格式
 ?
 Øfirst/head/take/takeAsList:获取若干行数据
 ?
 Øcollect/collectAsList:获取所有数据
(2)从外部数据库创建DataFrame

ØSpark SQL可以从外部数据库(比如MySQL、Oracle等数据库)中创建DataFrame

Ø使用这种方式创建DataFrame需要通过JDBC连接或ODBC连接的方式访问数据库

Øspark.read.jdbc(url: String, table: String, properties: Properties)

技术分享图片

(3)从RDD创建DataFrame-方法1

Ø利用反射机制推断RDD模式,使用这种方式首先需要定义一个case class,因为只有case class才能被Spark隐式地转换为DataFrame。

技术分享图片

(3)从RDD创建DataFrame-方法2

Ø从原始RDD创建一个元素为Row类型的RDD

Ø用StructType创建一个和RDD中Row的结构相匹配的Schema

Ø通过SparkSession提供的createDataFrame方法将Schema应用到RDD上

技术分享图片

(4)从Hive中的表创建DataFrame

Ø通过SQL查询语句

Øspark.read.table(tablename)

技术分享图片

技术分享图片

技术分享图片

 

DataFrame常用API

方法描述举例
select(cols:string*) 查询指定字段的数据信息 df.select($"colA", $"colB" + 1)
selectExpr(exprs: String*) 查询指定字段的数据信息 df.selectExpr("name","name as names","upper(name)","age+1")
filter(conditionExpr: String) 条件查询 df.filter(“age>10”),df.where(df("age")>10)
limit(n:Int) 查询前n行记录 limit(n: Int)
distinct 去重 df.distinct
orderBy(sortExprs: Column*) 排序查询 df.orderBy(desc("age"))
sort(sortExprs: Column*) 排序查询 df.sort(df(“age”).desc),默认是asc
方法描述举例
groupBy(col1: String, cols: String*) 分组查询 df.groupBy(“deptno”).agg(max("age"), avg("salary"))
join(right: DataFrame, joinExprs: Column, joinType: String) 连接查询,关联的类型:inner, outer, left_outer, right_outer, leftsemi df.join(ds,df("name")=ds("name") and df("age")=ds("age"),"outer")
unionAll(other:Dataframe) 合并两个DataFrame df.unionAll(ds)
withColumn(colName: String, col: Column) 增加一列 df.withColumn("aa",df("name"))
drop(col: Column) 删除某列 df.drop("name")

自定义udf-用于SQL查询

•sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1)

技术分享图片

技术分享图片

 有一个hive的视图
 spark.sql()

 

自定义udf-用于DataFrame API

•import org.apache.spark.sql.functions._

•import org.apache.spark.sql.types._

•udf(f: AnyRef, dataType: DataType)

val df_hvie=saprk.sql("select * form tarn.people")

技术分享图片

技术分享图片

Spark DataFrame基础操作

保存

(1)保存为文本文件

overwrite表示覆盖

Ødf.write.mode("overwrite").parquet(path: String)

df.repartition(1).write.mode("overwrite").parquet("/user/root/sparkdata/data_repartition")

 

Ødf.write.mode("overwrite"). json(path: String)

Ødf.write.mode("overwrite").option("header", "true").option("sep", ";"). csv(path: String)

Ødf.write.mode("overwrite").text(path: String) //只能有一个字段(用的非常少)

技术分享图片

传入数据到mysql

Øjdbc(url: String, table: String, connectionProperties: Properties)

技术分享图片

保存到hive

Ødf.write.mode("overwrite"). saveAsTable(tableName: String)

技术分享图片

dataset

ØDataFrame的缺点:

1.编译时不能类型转化安全检查,运行时才能确定是否有问题

2.对于对象支持不友好,rdd内部数据直接以java对象存储,dataframe内存存储的是row对象而不能是自定义对象

ØDataSet

ØDataSet是分布式的数据集合,DataSet提供了强类型支持,也是在RDD的每行数据加了类型约束。

ØDateSet整合了RDD和DataFrame的优点,支持结构化和非结构化数据。

ØDataFrame表示为DataSet[Row],即DataSet的子集。

ØDataSet是面向对象的编程接口,可以通过JVM的对象进行构建DataSet。

(1)从集合创建DataSet

•createDataset【T](data: List[T])//两种方式

技术分享图片

(2)从rdd创建DataSet

•createDataset 【 T](data: RDD[T])

技术分享图片

(3)从DataFrame创建DataSet

•dataSet=dataFrame.as[强类型]

 val ds4=spark.read.table("tarin.goodsorder").as[Person]

技术分享图片

DataSet API

技术分享图片

 

技术分享图片

技术分享图片

机器学习

1. 机器学习概念

Ø机器学习就是让机器能像人一样有学习、理解、认识的能力。

Ø机器学习是一门人工智能的科学,该领域的主要研究对象是人工智能,特别是如何在经验学习中改善具体算法的性能。

Ø机器学习的过程就是通过计算机使算法模型利用输入数据的规律或以往经验进行学习,并对模型进行评估,评估的性能如果达到要求就拿这个模型来测试其他的数据,如果达不到要求就要调整算法来重新建立模型,再次进行评估,如此循环往复,最终获得满意的经验来处理其他的数据。

 

spark

原文:https://www.cnblogs.com/Liguangyang/p/13530929.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!