실행
{SPARK_HOME}bin/spark-shell --conf "spark.jars=/myhome/jars/delta-spark_2.12-3.1.0.jar" --conf "spark.jars=/myhome/jars/delta-hive-assembly_2.12-3.1.0.jar" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
new SparkSession.Builder()
.appName(commandLineOptions.className())
.config("spark.streaming.stopGracefullyOnShutdown", "true")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.databricks.delta.autoCompact.enabled", "true")
.config("spark.databricks.delta.autoCompact.minNumFiles", 100) //자동으로 작은 파일을 하나로 통합. 최소 개수 100개 이상일 때.
.config("spark.databricks.delta.optimize.maxFileSize", 134217728) //최적화를 통해 생성되는 큰 파일의 최대 사이즈. (128MB)
//.config("spark.databricks.delta.retentionDurationCheck.enabled", "false")// 이건 마지막에 컴팩션 한번 해주려고 임의로.. 평상시는 ㄴㄴ
//.config("spark.databricks.delta.autoCompact.minNumFiles", 1) // 이건 마지막에 컴팩션 한번 해주려고 임의로.. 평상시는 ㄴㄴ
Optimize
Compaction
작은 파일들을 큰 파일로 만들어 읽는 범위를 줄일 수 있음
Compaction을 하면, 데이터 경로 아래 새로운 parquet파일이 생성되고, _deltalog_/ 아래 n.json 파일이 생성된다.
해당 파일에 전체 데이터에 대한 통계 정보(path, numRecords, min/maxValues 등)가 담긴다.
*optimize 한번에 하지말고 나눠 하자. 예를 들어 일별로..
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, pathToTable) // For path-based tables
// For Hive metastore-based tables: val deltaTable = DeltaTable.forName(spark, tableName)
deltaTable.optimize().executeCompaction()
// If you have a large amount of data and only want to optimize a subset of it, you can specify an optional partition predicate using `where`
deltaTable.optimize().where("date='2021-11-18'").executeCompaction()
Vacuum
DeltaTable에서 더이상 참조되지 않거나 retention(default : 7days)이 지난 파일을 삭제한다.
retention은 7일 이상으로 하는 것을 권장한다.
자동적으로 삭제하지는 않는다.
_로 시작하는 파일은 무시한다.
로그파일은 지우지 않다. 로그 파일은 자동적으로 삭제된다. retention은 기본 30일이다.
compaction 후 json 파일에 삭제로 기록되었다고 vacuum을 통해 삭제되지는 않는다.
vacuum(0)으로 실행하면, 지금까지 삭제대상으로 체크된 모든 파일 제거 ("spark.databricks.delta.retentionDurationCheck.enabled", "false" 설정을 해야 vacuum(0)실행 가능. 그렇지 않으면 안전 기준 default 7일임)
DataSkipping
요약된 정보를 기반으로 필요한 파일만 읽음. 요약된 정보는 write 할 때 생성. 따로 설정할 필요는 없으나 Z-Ordering을 적용하는 것이 가장 좋은 성능을 보인다. long, string 타입은 min, max 등의 통계 정보 생성 비용이 크므로, 설정을 통해 수집 대상에서 제외할 수 있다.
Z-Ordering
카디널리티가 높은 열에 적용하면 관련성이 있는 정보를 같은 파일 내에 위치시킨다. 특정 열에 적용할 수 있고, 열을 추가할수록 효과는 떨어진다.
Multi-part checkpointing
DeltaLake는 주기적으로 자동으로 compaction을 진행한다. checkpoint를 통해 쿼리가 현재의 상태를 빠르게 가져올 수 있도록 한다. 기본적으로 하나의 파일로 쓰나 설정을 통해 나눠서 쓸 수 있고, 병렬로 읽어 성능을 향상시킬 수 있다.
Log Compactions
DeltaLake 3.0.0 부터 지원되며, log compaction 파일을 생성하고, 특정 범위의 커밋 액션이 포함된다.
Time Travel
이전 스냅샷을 조회하고자 할 때, time travel 기능을 사용할 수 있다.
참고 :
https://docs.delta.io/latest/optimizations-oss.html#language-scala
Optimizations — Delta Lake Documentation
Delta Lake provides optimizations that accelerate data lake operations. Optimize performance with file management To improve query speed, Delta Lake supports the ability to optimize the layout of data in storage. There are various ways to optimize the layo
docs.delta.io
https://docs.delta.io/latest/quick-start.html#set-up-apache-spark-with-delta-lake&language-scala
Quickstart — Delta Lake Documentation
Follow these instructions to set up Delta Lake with Spark. You can run the steps in this guide on your local machine in the following two ways: As mentioned in the official Apache Spark installation instructions here, make sure you have a valid Java versio
docs.delta.io