티스토리 뷰
Kafka?
실시간으로 기록 스트림을 게시, 구독, 저장 및 처리할 수 있는 분산형 데이터 스트리밍 플랫폼
API로 바로 요청하는 것이 아닌 카프카에서
프로듀서가 메시지를 생산해서
토픽으로 메시지를 보내고
컨슈머가 토픽의 메시지를 가져와 사용
하는 애플리케이션
💁🏻 용어정리
프로듀서 : 토픽으로 데이터를 publishing
컨슈머 : 구독하는 토픽으로부터 데이터를 가져온다(polling)
토픽 : 데이터가 들어가는 장소, 여러개의 파티션으로 구성
파티션 : Queue와 같은 FIFO형태
브로커 : 카프카 애플리케이션이 설치된 서버
주키퍼 : 브로커 health check 등 카프카 클러스터의 상태를 관리
✨ 중요 ✨
✔︎ 컨슈머는 파티션 polling할 때마다 offset값을 커밋해 에러가 발생하면 어디까지 데이터를 가져왔는지 확인가능
✔︎ 컨슈머가 토픽 내부에서 데이터를 가져가도 데이터는 삭제 되지 않는다. 카프카엣 데이터가 삭제 되는 시점은 옵션으로 지정
✔︎ 토픽을 생성할 때 파티션 수를 설정하는데, 초기 생성 후 늘릴 수 있지만 절대로 줄일 수 없다.
✔︎ 1 컨슈머당 1 파티션이 원칙
✔︎ 컨슈머 갯수 <= 파티션 갯수
📗 kafka의 장점
- 고가용성 및 확장성
- 한개의 파티션에 대해 두 대 이상의 브로커에 데이터를 분산저장함으로써 한 브로커에 장애가 나더라도 데이터 유실에 대한 걱정을 덜 수 있다.
- 대량의 데이터 처리가 가능하고, 시스템 부하에 따라 확장이 가능
- 실시간 데이터 처리를 위해 설계되어 데이터의 지연시간이 최소화되었고 실시간 데이터 스트리밍이 가능
- 메시지를 영속적으로 저장하기 때문에 나중에 다시 데이터를 재처리할 수 있다.
🤔 RabbitMQ ?
💁🏻 RabbitMQ vs Kafka
- RabbitMQ 전통적인 메시지브로커
- Kafka 이벤트 스트리밍 플랫폼
여러 개의 Publisher가 Exchange에 메시지를 보내면, 정해진 규칙에 따라 큐에 Routing 되고, Consumer들이 메시지를 처리
👉 kafka와 차이점
kafka는 큐를 구현하지 않는다
컨슈머가 메시지를 가져가면 큐에 더 이상 남지않고 사라진다 → 다시 이벤트를 받기가 어렵다
소비자와 메시지 브로커의 결합력이 높아져 후에 수평적으로 확장하는 데에 어려움이 있다
🧑🏻💻 Docker로 kafka 실행 해보기
docker-compose.yml
version: '3.8'
services:
zookeeper:
image: wurstmeister/zookeeper:latest
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:latest
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
Terminal 에서 docker 실행
// docker 실행
docker-compose up -d
// docker 실행 컨테이너 확인
docker ps
// 카프카 컨테이너 접속
docker exec -it kafka /bin/bash
🧑🏻💻 토픽 생성, 확인, 삭제
// 토픽생성
kafka-topics.sh --create --topic topic1 --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3
// 토픽 리스트 확인
kafka-topics.sh --list --bootstrap-server localhost:9092
// 토픽 상세 조회
kafka-topics.sh --describe --topic topic1 --bootstrap-server kafka:9092
// 토픽 삭제
kafka-topics.sh --delete --bootstrap-server kafka:9092 --topic topic1
🧑🏻💻 Spring
gradle dependencies
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.kafka:spring-kafka'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
KafkaConfig.java
@Configuration
public class KafkaConfig {
// Kafka producer를 생성하는 팩토리
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
// Producer가 처음으로 연결할 kafka 브로커 위치
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Producer가 key,value 데이터를 브로커로 전송하기 전에 직렬화과정 필요
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
// KafkaTemplate을 생성하여 Kafka 프로듀서를 사용하여 메시지를 보낼 수 있도록 한다.
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
/*
default Round Robin 전체를 다시 재할당
Sticky 재할당되는 partition 은 중단
*/
config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, Arrays.asList(StickyAssignor.class));
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
// 메시지를 동시에 처리할 수 있는 메시지 리스너 컨테이너를 생성
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
//factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}
참고
[Kafka] 카프카 주요 개념 정리
Apache Kafka는 분산 메시징 시스템(A high-throuhput distributed messaging system)이다. 2011년 링크드인에서 처음 개발 됐다. 자사 웹사이트의 이벤트 체크 목적으로 만들어지기 시작했고 2014년 아파치 재단으
deep-jin.tistory.com
Comparison of RabbitMQ, Kafka, ActiveMQ
we comparison Message Ordering, Message Retention, Message Lifetime, Message Priorities, Performance of Kafka, RabbitMQ and ActiveMQ.
techblogs.42gears.com
[카프카] 용어 정리🔍
회사에서 IOT 데이터를 스트리밍하기 위하여 Kafka를 사용하고 있다.DevOps Engineer로서 운영 관점에서 Kafka를 잘 사용하는 방법을 공부하고자 카프카 시리즈를 시작한다!첫 게시물이니카프카의 아주
velog.io
'개발' 카테고리의 다른 글
[Cache] 캐시란 ? (0) | 2025.01.31 |
---|---|
[ DB락 활용 ] Ecommerce 프로젝트 동시성 제어와 테스트 (0) | 2025.01.22 |
[Springboot 게시판] postgreSql brew 다운로드 (0) | 2024.12.03 |
[Springboot JPA 게시판] 프로젝트 생성 (0) | 2024.12.02 |
ERD란 ? (0) | 2024.11.27 |
- Total
- Today
- Yesterday
- postgresql brew
- JavaScript
- 풀스택
- Springboot jpa
- 항해후기
- google commit convention
- 개발자
- html
- mock해야하는대상과아닌것
- commit convetion
- spring
- 캐시란
- css
- 티스토리챌린지
- Java
- 오블완
- 웹개발
- postgresql 다운로드
- tdd개발
- Grammarly
- 프로그래밍
- 백엔드개발
- API
- 캐시스탬피드
- synchronized 단점
- mock사용법
- spring.io.start
- 더현대 크리스마스 현장대기
- ChatGPT
- java test 개발
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 |