首页 > 其他 > 详细

pyspark 日常整理

时间:2019-12-16 20:31:14      阅读:77      评论:0      收藏:0      [点我收藏+]

1  联表

  df1.join(df2,连接条件,连接方式)

  如:df1.join(df2,[df1.a==df2.a], "inner").show()

  连接方式:字符串类型, 如 "left"  , 常用的有:inner, cross, outer, full, full_outer, left, left_outer, right, right_outer

  连接条件: df1["a"] == df2["a"] 或 "a" 或 df1.a == df2.a , 如有多个条件的情况 如,[df1["a"] == df2["a"] ,df1["b"] == df2["b"] ] 或  (df.a > 1) & (df.b > 1)

  需要注意的:

  如果使用 "a" 进行连接,则会自动合并相同字段,只输入一个。如  df1.join(df2,"a","left") 只输出df1的 a字段,df2 的 a 字段是去掉了。

2 udf使用

  需添加引用

  from pyspark.sql.functions import udf
  from pyspark.sql import functions as F

  有两种方式:

  第一种

  def get_tablename(a):

    return "name"

  get_tablename_udf = F.udf(get_tablename)

  第二种

  @udf

  def get_tablename_udf (a):

    return "name"

  

  两种方式的调用是一样的

  df.withColumn("tablename", get_tablename_udf (df[a"]))

3  分组

  使用groupBy方法

  单个字段:df.groupBy("a") 或 df.groupBy(df.a)

  多个字段:df.groupBy([df.a, df.b]) 或 df.groupBy(["a", "b"])

  需要注意的:

  groupBy方法后面 一定要跟字段输出方法,如:agg()、select()等

4  查询条件

  使用 filter() 或 where() ,两者一样的。

  单条件: df.filter(df.a > 1) 或 df.filter("a > 1")

  多条件:df.filter("a > 1 and b > 0 ")  或 df.filter((df.a > 1) & (df.b ==0))

5  替换null值

  使用 fillna() 或 fill()方法

  df.fillna({"a":0, "b":""})

  df.na.fill({"a":0, "b":""})

6  排序

  使用 orderBy() 或 sort()方法

  df.orderBy(df.a.desc())

  df.orderBy(desc("age"), asc("name"))

       df.orderBy(["age", "name"], ascending=[0, 1])

  df.orderBy(["age", "name"], ascending=False)

  需要注意的:

  ascending 默认为True 升序, False 降序

7  新增列

  使用 withColumn() 或 alias()方法

  df.withColumn("b",F.lit(999))

  df.withColumn("b",df.a)

  df.withColumn("b",df.a).withColumn("m","m1")

  df.agg(F.lit(ggg).alias("b"))

  df.select(F.lit(ggg).alias("b"))

  需要注意的:

  withColumn方法会覆盖df里面原有的同名的列

8  重命名列名

  使用 withColumnRenamed() 方法 

  df.withColumnRenamed("a","a1").withColumnRenamed("m","m1") 

  需要注意的点:

  确定要重命名的列在df里面存在

9  创建新的DataFrame

  使用createDataFrame()方法

  spark.createDataFrame(数据集, 列集合)  例如:spark.createDataFrame([(5, "hello")], [‘a‘, ‘b‘])

  需要注意的:

   数据集和列集合 个数要一致

   spark为 SparkSession 对象, 例如:spark = SparkSession.builder.master("local").appName("Word Count").config("spark.some.config.option", "some-value").getOrCreate()

10  合并结果集

  使用union() 或 unionAll() 方法

  df.union(df1)

  需要注意的:

  这两个方法都不会主动消除重复项的,如需要,在后面跟distinct() 如:df.union(df1).distinct()

  这两个方法都是按照数据列的摆放顺序进行合并,而不是根据列名

  两个结果集的列 数量要保证一样大小

11  判断是否NULL值 

  使用isNull()方法 或 sql语句

  df.where(df["a"].isNull())

  df.where("a is null")

12  在计算条件中加入判断

  使用when() 方法

  df.select(when(df.age == 2, 1).alias("age")) 

  age列的值:当满足when条件,则输出1 ,否则,输出NULL 

  多个条件 :when((df.age == 2) & (df.name == ‘"name") , 1)

  

pyspark 日常整理

原文:https://www.cnblogs.com/xiaonanmu/p/12049868.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!