티스토리 뷰

개발

[ Kafka ] kafka란? docker, Spring 연동

ThisisEmma 2025. 2. 18. 20:12
반응형
Kafka?

실시간으로 기록 스트림을 게시, 구독, 저장 및 처리할 수 있는 분산형 데이터 스트리밍 플랫폼

 

API로 바로 요청하는 것이 아닌 카프카에서

프로듀서가 메시지를 생산해서

토픽으로 메시지를 보내고

컨슈머가 토픽의 메시지를 가져와 사용

하는 애플리케이션 

 

Kafka Architecture

💁🏻 용어정리

프로듀서 : 토픽으로 데이터를 publishing

컨슈머 : 구독하는 토픽으로부터 데이터를 가져온다(polling)

토픽 : 데이터가 들어가는 장소, 여러개의 파티션으로 구성 

파티션 : Queue와 같은 FIFO형태

브로커 : 카프카 애플리케이션이 설치된 서버 

주키퍼 : 브로커 health check 등 카프카 클러스터의 상태를 관리

 

✨ 중요 ✨
✔︎ 컨슈머는 파티션 polling할 때마다 offset값을 커밋해 에러가 발생하면 어디까지 데이터를 가져왔는지 확인가능
✔︎ 컨슈머가 토픽 내부에서 데이터를 가져가도 데이터는 삭제 되지 않는다. 카프카엣 데이터가 삭제 되는 시점은 옵션으로 지정
✔︎ 토픽을 생성할 때 파티션 수를 설정하는데, 초기 생성 후 늘릴 수 있지만 절대로 줄일 수 없다.
✔︎ 1 컨슈머당 1 파티션이 원칙 
✔︎ 컨슈머 갯수 <= 파티션 갯수 

 

📗 kafka의 장점 

  • 고가용성 및 확장성 
    • 한개의 파티션에 대해 두 대 이상의 브로커에 데이터를 분산저장함으로써 한 브로커에 장애가 나더라도 데이터 유실에 대한 걱정을 덜 수 있다.
    • 대량의 데이터 처리가 가능하고, 시스템 부하에 따라 확장이 가능 
  • 실시간 데이터 처리를 위해 설계되어 데이터의 지연시간이 최소화되었고 실시간 데이터 스트리밍이 가능
  • 메시지를 영속적으로 저장하기 때문에 나중에 다시 데이터를 재처리할 수 있다.

🤔 RabbitMQ ?

 

💁🏻 RabbitMQ vs Kafka

  • RabbitMQ 전통적인 메시지브로커
  • Kafka 이벤트 스트리밍 플랫폼 

https://www.cloudamqp.com/blog/part1-rabbitmq-for-beginners-what-is-rabbitmq.html

 

여러 개의 Publisher가 Exchange에 메시지를 보내면, 정해진 규칙에 따라 큐에 Routing 되고, Consumer들이 메시지를 처리

👉 kafka와 차이점

kafka는 큐를 구현하지 않는다 

컨슈머가 메시지를 가져가면 큐에 더 이상 남지않고 사라진다 → 다시 이벤트를 받기가 어렵다

소비자와 메시지 브로커의 결합력이 높아져 후에 수평적으로 확장하는 데에 어려움이 있다


🧑🏻‍💻 Docker로 kafka 실행 해보기

docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: wurstmeister/zookeeper:latest
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:latest
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

 

Terminal 에서 docker 실행 

// docker 실행
docker-compose up -d 
// docker 실행 컨테이너 확인 
docker ps
// 카프카 컨테이너 접속 
docker exec -it kafka /bin/bash

 


🧑🏻‍💻 토픽 생성, 확인, 삭제 

// 토픽생성
kafka-topics.sh --create --topic topic1 --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3
// 토픽 리스트 확인 
kafka-topics.sh --list --bootstrap-server localhost:9092
// 토픽 상세 조회
kafka-topics.sh --describe --topic topic1 --bootstrap-server kafka:9092
// 토픽 삭제
kafka-topics.sh --delete --bootstrap-server kafka:9092 --topic topic1

 

 


🧑🏻‍💻 Spring

gradle dependencies
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.kafka:spring-kafka'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'

 

KafkaConfig.java

@Configuration
public class KafkaConfig {

    // Kafka producer를 생성하는 팩토리
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        // Producer가 처음으로 연결할 kafka 브로커 위치
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Producer가 key,value 데이터를 브로커로 전송하기 전에 직렬화과정 필요
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    // KafkaTemplate을 생성하여 Kafka 프로듀서를 사용하여 메시지를 보낼 수 있도록 한다.
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        /*
         default Round Robin 전체를 다시 재할당
                 Sticky 재할당되는 partition 은 중단
         */
        config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, Arrays.asList(StickyAssignor.class));
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        // 메시지를 동시에 처리할 수 있는 메시지 리스너 컨테이너를 생성
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        return factory;
    }
}

 


참고

 

[Kafka] 카프카 주요 개념 정리

Apache Kafka는 분산 메시징 시스템(A high-throuhput distributed messaging system)이다. 2011년 링크드인에서 처음 개발 됐다. 자사 웹사이트의 이벤트 체크 목적으로 만들어지기 시작했고 2014년 아파치 재단으

deep-jin.tistory.com

 

 

Comparison of RabbitMQ, Kafka, ActiveMQ

we comparison Message Ordering, Message Retention, Message Lifetime, Message Priorities, Performance of Kafka, RabbitMQ and ActiveMQ.

techblogs.42gears.com

 

 

[카프카] 용어 정리🔍

회사에서 IOT 데이터를 스트리밍하기 위하여 Kafka를 사용하고 있다.DevOps Engineer로서 운영 관점에서 Kafka를 잘 사용하는 방법을 공부하고자 카프카 시리즈를 시작한다!첫 게시물이니카프카의 아주

velog.io

 

반응형