Spark

Iceberg 시작하기 - 설치/SparkStreaming/Hive연동

케키키케 2024. 7. 1. 23:57

환경

Spark-3.5.1

Hive-3.12

Iceberg-1.5.0 (https://iceberg.apache.org/multi-engine-support/#apache-spark)

호환 버전 확인 : https://iceberg.apache.org/multi-engine-support/#current-engine-version-lifecycle-status

 

Multi-Engine Support - Apache Iceberg

Multi-Engine Support Apache Iceberg is an open standard for huge analytic tables that can be used by any processing engine. The community continuously improves Iceberg core library components to enable integrations with different compute engines that power

iceberg.apache.org

 

 

Hive - Iceberg 활성화

  • {HIVE_HOME}/ext_lib/에 iceberg-hive-runtime-1.5.0.jar 추가 (다른 디렉토리도 상관없음. 하나 만들어서 넣어두면 됨)
  • {HIVE_HOME}/conf/hive-site.xml 에 iceberg.engine.hive.enabled=true 추가
  • Hiveserver2 재실행
    • nohup ${HIVE_HOME}/bin/hiveserver2 1 > ${HIVE_HOME}/logs/hiveserver2.log 2 > ${HIVE_HOME}/logs/ hiveserver2.err &
  • {HIVE_HOME}/bin/hive  //혹은 beeline으로 접속 (!connect  jdbc:hive2://localhost:10000)
  • add jar {HIVE_HOME}/ext_lib/iceberg-hive-runtime-1.5.0.jar; // hive-site.xml 에 aux jar path를 등록하면 add jar는 하지 않아도 됨. (aux jar 등록 시에 여러개를 등록하는 경우, 띄어쓰기 없이 등록해야 함. ex) 123.jar,456.jar

hive-site.xml 에 아래 설정 추가

<property>
	<name>iceberg.engine.hive.enabled</name>
	<value>true</value>
</property>
<property>
 	<name>hive.aux.jars.path</name>
 	<value>{HIVE_HOME}/ext_lib/iceberg-hive-runtime-1.5.0.jar</value>
 </property>

 

 

Spark  실행

참고 : https://iceberg.apache.org/spark-quickstart/#adding-a-catalog

 

Spark and Iceberg Quickstart - Apache Iceberg

Spark Spark and Iceberg Quickstart This guide will get you up and running with an Iceberg and Spark environment, including sample code to highlight some powerful features. You can learn more about Iceberg's Spark runtime by checking out the Spark section.

iceberg.apache.org


packages로 설치하면 좋겠지만 불가능한 경우, {SPARK_HOME}/jars에 jar를 옮겨두고, jars 경로를 추가하여 실행

 

1. Spark-shell 실행

{SPARK_HOME}/bin/spark-shell --conf spark.jars={SPARK_HOME}/jars/iceberg-spark-runtime-3.5_2.12-1.5.0.jar --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf spark.sql.catalog.spark_catalog.type=hive --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.local.type=hadoop --conf spark.sql.catalog.local.warehouse=/hive/warehouse

 

 

 

테스트를 위한 테이블 생성 및 데이터 적재

테이블 생성

Spark Structured Streaming 적재 시에는 Iceberg 테이블이 기 생성되어 있어야 하기 때문에 Iceberg 테이블을 먼저 생성

//Spark-shell
spark.sql("CREATE TABLE test.ice_table(uid bigint COMMENT 'unique id', no bigint,  name string) USING iceberg PARTITIONED BY (no);")

https://iceberg.apache.org/docs/latest/spark-ddl/#create-table

 

*테이블 생성 후 Hive desc formatted 확인. Storage Information을 확인할 것!

SerDe Library : org.apache.iceberg.mr.hive.HiveIcebergSerDe

InputFormat : org.apache.iceberg.mr.hive.HiveIcebergInputFormat

OutputFormat : org.apache.iceber.mr.hive.HiveIcebergOutputFormat

 

아래와 같이 나오면 Iceberg 테이블 데이터가 제대로 조회되지 않는다. 

SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat:org.apache.hadoop.mapred.FileInputFormat
OutputFormat:org.apache.hadoop.mapred.FileOutputFormat

 

iceberg.engine.hive.enabled=true를 hive-site.xml에 추가했는지 확인하고,

추가되어 있는데도 Iceberg serde와 format이 적용되지 않는 경우, 아래와 같이 테이블을 생성하여 이슈를 해결했다.

//Spark-shell
spark.sql("CREATE TABLE test.ice_table(uid bigint COMMENT 'unique id', no bigint,  name string) USING iceberg PARTITIONED BY (no) TBLPROPERTIES('engine.hive.enabled' = 'true');")

 

 

Row 적재

//Spark-shell
spark.sql("INSERT INTO test.ice_table VALUES (1,1, 'a'), (2,2, 'b'), (3,3, 'c');");

//Spark-sql
INSERT INTO test.ice_table VALUES (1,1, 'a'), (2,2, 'b'), (3,3, 'c');

 

 

스트리밍 적재

//Spark-shell

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.{DataStreamWriter, Trigger}
import org.apache.spark.sql.types._
   
val SERVERS = "Kafka서버:9092"
val checkpointLocation = "location지정.작업별로 다르게 지정될 수 있도록"
val sparkSession = SparkSession.builder.appName("iceberg-write-test").getOrCreate()
   
val loadData = sparkSession.readStream.format("kafka").option("maxOffsetsPerTrigger", "10000").option("kafka.bootstrap.servers", SERVERS).option("subscribe", "sample.topic").option("startingOffsets", "earliest").load()
   
val msg = loadData.selectExpr("CAST(value AS STRING)").select(from_json(col("value"), new StructType().add("msg", StringType)).as("data")).withColumn("tmp", split(col("data.msg"), ","))
   
val data = msg.select(col("tmp").getItem(1).as("data1").cast(StringType), col("tmp").getItem(2).as("data2").cast(LongType), col("tmp").getItem(3).as("data3").cast(LongType)).createTempView("data")
   
val result = spark.sql("SELECT data1, data2, data3 FROM data")
   
result.writeStream.format("iceberg").outputMode("append").partitionBy("data1").trigger(Trigger.ProcessingTime("10 seconds")).option("fanout-enabled", "true").option("path", "test.ice_table1").option("checkpointLocation", checkpointLocation).start()

maven에서 jar를 못찾아서, libs 에 올려서 테스트 진행

dependencies { compile files('libs/iceberg-spark-runtime-3.5_2.12-1.5.0.jar') }

 

 

*fanout-enabled

insert 시에 파티션 컬럼이 정렬되어 있지 않으면 오류 발생

java.lang.IllegalStateException: Incoming records violate the writer assumption that records are clustered by spec and by partition within each spec. Either cluster the incoming records or switch to fanout writers.
Encountered records that belong to already closed files:
 

batch의 경우 정렬이 가능하고 큰 작업이 아닐 수 있지만,  streaming의 경우 정렬이 어렵고 무거운 작업이 된다.

때문에 fanout-enabled 설정을 true로 바꿔주어야 한다. 

그러면 파일을 닫지 않아 위의 오류를 피할 수 있다 

Fanout : Fanout writer opens the files per partition value and doesn't close these files till the write task finishes. Avoid using the fanout writer for batch writing, as explicit sort against output rows is cheap for batch workloads.

https://iceberg.apache.org/docs/latest/spark-structured-streaming/#partitioned-table

 

 

Hive 조회

SELECT * FROM default.iceberg_table LIMIT 10;


DESC FORMATTED default.iceberg_table;
// External 테이블을 따로 지정하여 생성하지 않았지만 External 테이블로 생성되었음을 확인할 수 있다.
// 일반적인 Hive Direct 테이블의 경우, col_name 밑에 partition 정보가 있지만 icetable에는 없으며, default-partition-spec에 partition 정보가 포함되어 있다.

 

 

Snapshot Rollback

 

Spark-shell을 통해 snapshot id 조회 (Spark Version 3.2 이하)

//snapshot_id 조회
spark.read.format("iceberg").load("test.ice_table1.history").show()

//rollback
spark.sql("call system.rollback_to_snapshot('test.ice_table1', 123456789);

참고 : https://iceberg.apache.org/docs/latest/spark-procedures/#rollback_to_snapshot

 

Procedures - Apache Iceberg

Spark Procedures To use Iceberg in Spark, first configure Spark catalogs. Stored procedures are only available when using Iceberg SQL extensions in Spark 3. Usage Procedures can be used from any configured Iceberg catalog with CALL. All procedures are in t

iceberg.apache.org