이전 글에서 Kfaka Producer를 생성하여 Message를 발행해보았다.
이번 글에서는 Consumer를 생성하여 Message를 받아보도록 하겠다.
스타또~
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class MyKafka {
public static void main(String[] args){
//Producer 생성
Properties props_pd = new Properties();
props_pd.put("bootstrap.servers", "localhost:9092");
props_pd.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props_pd.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props_pd);
//읽어오는 순서를 확인하기 위해 번호를 붙여 여러개의 Message를 생성해보았다.
producer.send(new ProducerRecord<String, String>("song-topic", "hello kafka1"));
producer.send(new ProducerRecord<String, String>("song-topic", "hello kafka2"));
producer.send(new ProducerRecord<String, String>("song-topic", "hello kafka3"));
producer.send(new ProducerRecord<String, String>("song-topic", "hello kafka4"));
producer.close();
//Consumer 생성
Properties props_cs = new Properties();
props_cs.put("bootstrap.servers", "localhost:9092");
props_cs.put("group.id","consumer-song");
props_cs.put("enable.auto.commit","true");
//latest : Topic에 가장 먼저 들어온, Topic의 가장 마지막 Message부터 가져옵니다.(earliest는 반대)
props_cs.put("auto.offset.reset","latest");
props_cs.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props_cs.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props_cs);
consumer.subscribe(Arrays.asList("song-topic"));
//try{
//while(true){
ConsumerRecords<String,String> records = consumer.poll(100);
//records에 쌓여있는 Message들을 읽어옵니다.
for(ConsumerRecord<String,String> record: records){
System.out.println("-------------------------------------------------------------------------------");
System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
//}
//}finally {
consumer.close();
//}
}
}
위 코드를 실행하면, Topic에는 "hello kafka" Message가 1번부터 차례대로 쌓이게 되고, Consumer에서는 1번부터 읽어가게 된다.
실행 결과 :
다음 글에서는 실시간으로 Kafka Mesaage를 보내는 듯이 연속해서 Topic에 Message를 보내고, 이를 실시간으로 수신하는 Consumer 코드를 짜보겠다.
그럼 20000
'Kafka' 카테고리의 다른 글
CMAK(Kafka Manager)를 설치해보자! (Window) _ JMX Metric (0) | 2020.07.19 |
---|---|
Kafka! 실시간으로 Message 처리하기! (Java) (0) | 2020.07.19 |
Kafka Producer를 생성하여 Message를 보내보자! (Java) (0) | 2020.07.19 |
IntelliJ Kafka 개발환경 구축하기! (0) | 2020.07.18 |
Kafka Topic, Producer, Consumer 생성하기 (0) | 2020.07.18 |