이전 글에서 Producer를 생성하여 Message를 발행하고, Consumer를 생성하여 Message를 수신하였다.
2020/07/19 - [Kafka] - Kafka Producer를 생성하여 Message를 보내보자! (Java)
2020/07/19 - [Kafka] - Kafka Consumer를 생성하여 Message를 받아보자! (Java)
이번 글에서는 실시간 Message를 처리하기 위한 간단한 프로그램을 짜보겠다.
실시간 느낌을 주기 위해서 반복문을 통해 계속해서 Message를 보내도록 했고, Consumer는 계속해서 Message를 읽어온다.
먼저, Producer 코드를 보자.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class MyProducer {
public static void main(String[] args) {
Properties props_pd = new Properties();
props_pd.put("bootstrap.servers", "localhost:9092");
props_pd.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props_pd.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props_pd);
int n = 1;
//반복문을 통해 100개의 Message를 보낸다.
while (n <= 100) {
String msg = "hello Kfaka!"+Integer.toString(n);
producer.send(new ProducerRecord<String, String>("song-topic", msg));
n++;
try {
//너무 빨라서 딜레이를 주었다. 안줘도 상관없다.
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
producer.close();
}
}
다음은 Consumer를 구현해보자.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class MyConsumer {
public static void main(String[] args){
Properties props_cs = new Properties();
props_cs.put("bootstrap.servers", "localhost:9092");
props_cs.put("group.id","consumer-song");
props_cs.put("enable.auto.commit","true");
props_cs.put("auto.offset.reset","latest");
props_cs.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props_cs.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props_cs);
consumer.subscribe(Arrays.asList("song-topic"));
try{
//반복문을 통해 계속해서 Message를 읽어오려고 한다.
while(true){
ConsumerRecords<String,String> records = consumer.poll(1000);
for(ConsumerRecord<String,String> record: records){
System.out.println("-------------------------------------------------------------------------------");
System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}
}finally {
consumer.close();
}
}
}
실행결과는 동영상으로...
핸드폰 카메라가 깨져서 화질구지지만 자세히 보면 보인다.
왼쪽이 Consumer, 오른쪽이 Producer다.
Producer가 Message를 보내기 시작하면, Consumer에서 즉시 읽어오는 것을 볼 수 있다.
100개의 Message를 모두 읽어오면, 그 이후로는 읽은 Message가 없으므로 읽어오지 않는다.
'Kafka' 카테고리의 다른 글
Kafka JMX를 이용하여 모니터링 하기!(테스트중) (0) | 2020.08.02 |
---|---|
CMAK(Kafka Manager)를 설치해보자! (Window) _ JMX Metric (0) | 2020.07.19 |
Kafka Consumer를 생성하여 Message를 받아보자! (Java) (0) | 2020.07.19 |
Kafka Producer를 생성하여 Message를 보내보자! (Java) (0) | 2020.07.19 |
IntelliJ Kafka 개발환경 구축하기! (0) | 2020.07.18 |