目录
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_2.11</artifactId>
<version>0.5.0</version>
</dependency>
spark-shell --packages io.delta:delta-core_2.11:0.5.0,org.apache.hadoop:hadoop-aws:2.7.7 --conf spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore --conf spark.sql.hive.metastore.version=1.2.1 --conf spark.hadoop.fs.s3a.access.key=<access-key> --conf spark.hadoop.fs.s3a.secret.key=<secret-key>
//创建 delta 表和分区表
val data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
data.write.format("delta").partitionBy("date").save("/tmp/delta-table")
//读delta表
//第一种方式
val df = spark.read.format("delta").load("/tmp/delta-table")
//第二种方式
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")
//覆盖delta表数据(mode换成append就是插入数据)
//注意:delta lake会记忆表的schema,默认情况下,overwrite只会更改表数据,不会更改表结构
//注意:可通过.option("mergeSchema", "true"),将df中有而schema没有的字段添加到schema中,也就是add column
//注意:在overwrite时,可通过df.write.option("overwriteSchema", "true")来替换原有的schema
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")
//更新delta表数据
deltaTable.update(
condition = expr("id % 2 == 0"),
set = Map("id" -> expr("id + 100"))
)
// Upsert (merge) delta表数据
val newData = spark.range(0, 20).toDF
deltaTable.as("oldData")
.merge(
newData.as("newData"),
"oldData.id = newData.id")
.whenMatched(col("date") > "2019-01-01")
.update(Map("id" -> col("newData.id")))
.whenMatched
.delete()
.whenNotMatched
.insert(Map("id" -> col("newData.id")))
.execute()
//读写流式数据到delta表(可以边写边读)
//有append和complete两种输出模式,complete和overwrite意思一样
val streamingDf = spark.readStream.format("delta").load("/delta/events")
val stream = streamingDf.select("value").as("id").writeStream.format("delta").outputMode("append").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")
val stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.outputMode("complete").format("console").start()
//删除delta表数据(只是添加墓碑标记,不是物理删除)
deltaTable.delete("id % 2 == 0")
deltaTable.delete(condition = expr("id % 2 == 0"))
import org.apache.spark.sql.functions._
import spark.implicits._
deltaTable.delete(col("date") < "2017-01-01")
//根据时间戳或者版本号查看历史数据(time travel)
val timestamp_string = "2019-01-01"
val version = "0"
val df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/delta/events")
val df2 = spark.read.format("delta").option("versionAsOf", version).load("/delta/events")
//查看数据的历史版本
deltaTable.history().show()
spark.sql("DESCRIBE HISTORY '" + pathToEventsTable + "'").show()
//只保留最新的数据(vacuum是用来清理磁盘上的历史数据)
deltaTable.vacuum(0)
spark.sql(“VACUUM ‘” + pathToEventsTable + “‘ RETAIN 24 HOURS”)
merge是delta lake的重要操作,它实现了upsert和delete功能。(示例详见基础表操作)
delta lake以增量写文件的方式支持数据的更新和删除。
delta lake对schema的验证很严格,但同时也支持schema更改。
事务日志是delta lake的核心,它记录了delta表相关的所有commit操作。
delta_table_path/
|-- _delta_log/
|--|-- 00000000000000000000.json
|--|-- 00000000000000000001.json
|--|-- 00000000000000000002.json
|--|-- 00000000000000000003.json
|--|-- 00000000000000000004.json
|--|-- 00000000000000000005.json
|--|-- 00000000000000000006.json
|--|-- 00000000000000000007.json
|--|-- 00000000000000000008.json
|--|-- 00000000000000000009.json
|--|-- 00000000000000000010.json
|--|-- 00000000000000000010.checkpoint.parquet
|--|-- 00000000000000000011.json
|--|-- 00000000000000000012.json
|--|-- _last_checkpoint
|-- part-00000-1339ec93-7d47-4ef7-b167-1e5aaa8cd75d-c000.snappy.parquet
|-- part-00000-10a95e81-d64c-40ff-9143-25e998aadcc5-c000.snappy.parquet
|-- part-00000-22f8124e-d2dd-4804-9037-b7a780f70a08-c000.snappy.parquet
|-- part-00000-7866ec4b-b955-4d86-b08c-58cfc71bc1ea-c000.snappy.parquet
|-- part-00000-d6431884-390d-4837-865c-f6e52f0e2cf5-c000.snappy.parquet
Q: 谁来合并json日志?
A: OptimisticTransaction里commit时会调用postCommit()函数,这里会检查日志的版本是否能整除checkpointInterval,如果能则调用deltaLog.checkpoint()函数生成新的checkpoint文件。(详见源码分析)
Q: 生成历史版本的快照是从事务日志里一点点计算得来的?
A: 是的,但无需计算所有日志,只需要计算checkpoint和json文件。可以通过_last_checkpoint直接定位到最新的checkpoint,checkpoint只是单纯的合并日志信息,减少读取文件的数量,并不改变内容。
Q: 最新版本的数据不会删除,删除历史版本后再想用时间穿梭,就得根据commit日志重新计算?
A: 旧版本的数据删除后,就不能用时间穿梭了,时间穿梭只能用于已存在的版本数据。
原文:https://www.cnblogs.com/kehanc/p/12082988.html