Kafka

Kafka! 실시간으로 Message 처리하기! (Java)

케키키케 2020. 7. 19. 16:26

이전 글에서 Producer를 생성하여 Message를 발행하고, Consumer를 생성하여 Message를 수신하였다.

 

2020/07/19 - [Kafka] - Kafka Producer를 생성하여 Message를 보내보자! (Java)

 

Kafka Producer를 생성하여 Message를 보내보자! (Java)

먼저, 지난 글에서 IntelliJ에 Kafka 개발 환경을 구축했다. 오늘은 Kafka Producer를 생성하여 새로운 Message를 보내보겠다. 이제 새로운 Class를 만들어보자. 나는 MyKafkaProducer Class를 생성했다. 다음으..

bachong.tistory.com

 

 

2020/07/19 - [Kafka] - Kafka Consumer를 생성하여 Message를 받아보자! (Java)

 

 

 

 

이번 글에서는 실시간 Message를 처리하기 위한 간단한 프로그램을 짜보겠다.

실시간 느낌을 주기 위해서 반복문을 통해 계속해서 Message를 보내도록 했고, Consumer는 계속해서 Message를 읽어온다.

 

 

 

 

먼저, Producer 코드를 보자.

 

 

 

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class MyProducer {
    public static void main(String[] args) {

        Properties props_pd = new Properties();
        props_pd.put("bootstrap.servers", "localhost:9092");
        props_pd.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props_pd.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props_pd);

        int n = 1;

		//반복문을 통해 100개의 Message를 보낸다. 
        while (n <= 100) {
            String msg = "hello Kfaka!"+Integer.toString(n);
            producer.send(new ProducerRecord<String, String>("song-topic", msg));
            n++;
            try {
            	//너무 빨라서 딜레이를 주었다. 안줘도 상관없다.
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        producer.close();
    }
}

 

 

 

 

 

다음은 Consumer를 구현해보자.

 

 

 

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class MyConsumer {
    public static void main(String[] args){

        Properties props_cs = new Properties();
        props_cs.put("bootstrap.servers", "localhost:9092");
        props_cs.put("group.id","consumer-song");
        props_cs.put("enable.auto.commit","true");
        props_cs.put("auto.offset.reset","latest");
        props_cs.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props_cs.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props_cs);
        consumer.subscribe(Arrays.asList("song-topic"));
        try{
        	//반복문을 통해 계속해서 Message를 읽어오려고 한다.
            while(true){
                ConsumerRecords<String,String> records = consumer.poll(1000);
                for(ConsumerRecord<String,String> record: records){
                    System.out.println("-------------------------------------------------------------------------------");
                    System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }finally {
            consumer.close();
        }
    }
}

 

 

 

실행결과는 동영상으로...

핸드폰 카메라가 깨져서 화질구지지만 자세히 보면 보인다.

왼쪽이 Consumer, 오른쪽이 Producer다.

Producer가 Message를 보내기 시작하면, Consumer에서 즉시 읽어오는 것을 볼 수 있다.

100개의 Message를 모두 읽어오면, 그 이후로는 읽은 Message가 없으므로 읽어오지 않는다.