
Spark Structured Streaming Service Lag Monitoring

케키키케 2024. 11. 15. 13:46




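Applied kafka-offset-committer to monitor the Kafka lag of a Spark Structured Streaming service. The Structured Streaming Kafka source tracks offsets in the query's checkpoint and does not commit them to Kafka. Without that, lag monitoring tools have no consumer-group offsets to compare against.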

Kafka Offsets Committer
It helps commit offsets once a batch of a Structured Streaming query that uses Kafka as a data source has been processed.
For the specified consumer group, it extracts the end offset of each partition and force-commits those offsets by calling KafkaConsumer's commitSync API.
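The following minimal pure-Scala sketch shows why the commit matters for monitoring. It assumes lag is computed the way consumer-group tools commonly report it: per partition, the broker's log-end offset minus the group's committed offset. The names `TopicPartitionKey`, `commitEndOffsets`, and `lag` are hypothetical. In-memory maps stand in for the broker and the group's committed offsets, and no real Kafka or Spark API is called.

```scala
// Hypothetical key type standing in for Kafka's TopicPartition.
final case class TopicPartitionKey(topic: String, partition: Int)

object LagSketch {
  // Models the committer step: after a batch is processed, the batch's end
  // offset for each partition overwrites the group's committed offset
  // (entries on the right-hand side of ++ win).
  def commitEndOffsets(
      committed: Map[TopicPartitionKey, Long],
      batchEndOffsets: Map[TopicPartitionKey, Long]
  ): Map[TopicPartitionKey, Long] =
    committed ++ batchEndOffsets

  // Lag per partition = log-end offset - committed offset.
  // None means the group has never committed for that partition, so lag is unknown.
  def lag(
      logEndOffsets: Map[TopicPartitionKey, Long],
      committed: Map[TopicPartitionKey, Long]
  ): Map[TopicPartitionKey, Option[Long]] =
    logEndOffsets.map { case (tp, end) =>
      tp -> committed.get(tp).map(c => end - c)
    }

  def main(args: Array[String]): Unit = {
    val p0 = TopicPartitionKey("topic-test", 0)
    val p1 = TopicPartitionKey("topic-test", 1)
    // Hypothetical log-end offsets on the broker.
    val logEnd = Map(p0 -> 120L, p1 -> 80L)

    // No commits yet: Spark keeps offsets only in its checkpoint,
    // so the group has no committed offsets and lag is unknown.
    println(lag(logEnd, Map.empty))

    // After a batch that processed up to offsets 100 and 80.
    val committed = commitEndOffsets(Map.empty, Map(p0 -> 100L, p1 -> 80L))
    println(lag(logEnd, committed))
  }
}
```

Without any commit, every partition's lag is `None`, which corresponds to a group that has never committed. Once a batch's end offsets are committed, the lag becomes a concrete number per partition.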


Applying Kafka Offsets Committer
1. Add the dependency: spark-sql-kafka-offset-committer_2.12
https://mvnrepository.com/artifact/net.heartsavior.spark/spark-sql-kafka-offset-committer_2.12/0.4.0-spark-3.0
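For step 1, an sbt build could declare the dependency as shown below. This is a sketch that assumes an sbt project. The coordinates are read off the mvnrepository URL: group `net.heartsavior.spark`, artifact `spark-sql-kafka-offset-committer_2.12`, version `0.4.0-spark-3.0`. The `_2.12` and `-spark-3.0` suffixes suggest the project should be on matching Scala 2.12 and Spark 3.0 versions.

```scala
// build.sbt (sketch): coordinates taken from the mvnrepository URL above.
// "%" (not "%%") is used because the artifact name already carries the _2.12 suffix.
libraryDependencies += "net.heartsavior.spark" % "spark-sql-kafka-offset-committer_2.12" % "0.4.0-spark-3.0"
```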
2. Set the kafka.consumer.commit.groupid option on the Kafka source

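// spark: an existing SparkSession
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic-test")
  .option("startingOffsets", "latest")
  // consumer group that the committer commits the processed offsets to
  .option("kafka.consumer.commit.groupid", "groupId1")
  .load()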

3. Register the StreamingQueryListener
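import net.heartsavior.spark.KafkaOffsetCommitterListener

// commits each processed batch's end offsets to the group set via kafka.consumer.commit.groupid
val listener = new KafkaOffsetCommitterListener()
spark.streams.addListener(listener)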


Checking in KafkaManager, the lag now shows up properly.