Spark Structured Streaming 서비스의 Kafka LAG 모니터링을 위해 kafka-offset-committer 적용
Kafka Offsets Committer
Kafka를 DataSource로 사용하는 Structured streaming query 의 한 배치 작업에서 쿼리가 처리되면 offsets을 commit할 수 있도록 돕는다.
특정 cousumer 그룹의 파티션별 end offsets 추출하여 KafkaConsumer의 commitSync API를 호출하여 강제 commit한다.
Kafka Offsets Committer 적용
1. 의존성 추가 : spark-sql-kafka-offset-committer_2.12
- https://mvnrepository.com/artifact/net.heartsavior.spark/spark-sql-kafka-offset-committer_2.12/0.4.0-spark-3.0
2. kafka.consumer.commit.groupid 지정
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic-test")
.option("startingOffsets", "latest")
.option("kafka.consumer.commit.groupid", "groupId1")
.load()
3. StreamingQueryListener 추가
val listener = new KafkaOffsetCommitterListener()
spark.streams.addListener(listener)
KafkaManager 에서 확인해보면 Lag 잘 보임
'IT 잡동사니' 카테고리의 다른 글
MongoDB logRotate (crontab or logRotate 적용) (0) | 2024.11.15 |
---|---|
[Docker] Ubuntu 도커 설치 / 도커 삭제 / 버전 업그레이드 / 도커 명령어까지 쉽다쉬워 (2) | 2024.11.05 |
Zeppelin multi instance (H/A) 구성 (0) | 2024.11.05 |
MongoDB - 명령어 모음집 (0) | 2024.07.02 |
Ubuntu OpenLDAP 설치 (0) | 2024.07.01 |