Kafka

Kafka Consumer를 생성하여 Message를 받아보자! (Java)

케키키케 2020. 7. 19. 15:38

이전 글에서 Kfaka Producer를 생성하여 Message를 발행해보았다.

이번 글에서는 Consumer를 생성하여 Message를 받아보도록 하겠다.

 

스타또~

 

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class MyKafka {
    public static void main(String[] args){

		//Producer 생성
        Properties props_pd = new Properties();
        props_pd.put("bootstrap.servers", "localhost:9092");
        props_pd.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props_pd.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props_pd);
        //읽어오는 순서를 확인하기 위해 번호를 붙여 여러개의 Message를 생성해보았다.
        producer.send(new ProducerRecord<String, String>("song-topic", "hello kafka1"));
        producer.send(new ProducerRecord<String, String>("song-topic", "hello kafka2"));
        producer.send(new ProducerRecord<String, String>("song-topic", "hello kafka3"));
        producer.send(new ProducerRecord<String, String>("song-topic", "hello kafka4"));
        producer.close();


		//Consumer 생성
        Properties props_cs = new Properties();
        props_cs.put("bootstrap.servers", "localhost:9092");
        props_cs.put("group.id","consumer-song");
        props_cs.put("enable.auto.commit","true");
		//latest : Topic에 가장 먼저 들어온, Topic의 가장 마지막 Message부터 가져옵니다.(earliest는 반대)
        props_cs.put("auto.offset.reset","latest");
        props_cs.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props_cs.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props_cs);
        consumer.subscribe(Arrays.asList("song-topic"));
        //try{
            //while(true){
                ConsumerRecords<String,String> records = consumer.poll(100);
                //records에 쌓여있는 Message들을 읽어옵니다.
                for(ConsumerRecord<String,String> record: records){
                    System.out.println("-------------------------------------------------------------------------------");
                    System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
            //}
        //}finally {
            consumer.close();
        //}

    }
}

 

위 코드를 실행하면, Topic에는 "hello kafka" Message가 1번부터 차례대로 쌓이게 되고, Consumer에서는 1번부터 읽어가게 된다.

 

실행 결과 :

 

다음 글에서는 실시간으로 Kafka Mesaage를 보내는 듯이 연속해서 Topic에 Message를 보내고, 이를 실시간으로 수신하는 Consumer 코드를 짜보겠다.

 

그럼 20000