사전 준비

- 이번에는 Kafka의 메시지를 주는 Producer와 받는 Consumer 서버를 SpringBoot로 만들어 볼 것이다.
- 이에 앞서 AWS EC2에 Kafka가 설치되어 있어야 한다.
- 만약 필요하다면 밑의 링크를 참고하면 됩니다.
[실습 전 필요한 인프라]
2. Kafka 간단히 설치해보기 (feat. EC2) (+ 실습)
환경 세팅 전 갖추어야할 것AWS 계정적당한 성능의 EC2이번 실습은 AWS 프리티어에서 진행하기 위해 EC2 t2.micro (메모리 1GB)에서 진행함EC2에 Kafka 설치 / 실행하기1. JDK 17 설치하기Kafka를 실행시키려
student-developer-story.tistory.com
[필요한 이론]
1. Kafka와 메시지큐 (정의, 형태)
Kafka와 메시지 큐Kafka란?대규모 데이터를 처리할 수 있는 메시지 큐메시지 큐란?큐 형태에 데이터를 일시적으로 저장하는 임시 저장소메시지 큐를 사용하면 데이터를 비동기로 처리할 수 있어서
student-developer-story.tistory.com
3-1. Kafka의 핵심 요소와 작동 (Topic, Consumer, Producer, Consumer Group, OffSet, CURRENT OFFSET) (+ 실습)
Kafka의 기본 구성Producer: Kafka에 메시지(데이터)를 전달하는 주체Consumer: Kafka에 메시지(데이터)를 처리하는 주체Topic: Kafka에 넣을 메시지의 종류를 구분하는 개념 (카테고리와 비슷)작동 과정Produce
student-developer-story.tistory.com
Producer 만들기
1. Spring 프로젝트 생성

- 기준 java 버전은 21 이다.
- Dependencies는 Spring Boot DevTools, Spring Web, Spring for Apache Kafka, lombok이 필요하다.
2. application.yml에 Kafka 연결을 위한 정보 작성하기
spring:
kafka:
# Kafka 서버 주소 (EC2에 카프카를 설치했기 때문에 EC2 주소를 입력해야 한다.)
bootstrap-servers: {EC2_URL}:9092
producer:
# 메시지의 key 직렬화 방식 : 자바 객체를 문자열(String)로 변환해서 Kafka에 전송
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 메시지의 value 직렬화 방식 : 자바 객체를 문자열(String)로 변환해서 Kafka에 전송
value-serializer: org.apache.kafka.common.serialization.StringSerializer
3. Controller 만들기
EmailController
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/emails")
public class EmailController {
private final EmailService emailService; // 조금 이따가 만들것
@PostMapping
public ResponseEntity<String> sendEmail(
@RequestBody SendEmailRequestDto sendEmailRequestDto
) {
emailService.sendEmail(sendEmailRequestDto);
return ResponseEntity.ok("이메일 발송 요청 완료");
}
}
SendEmailRequestDto (Request Body 정보를 받을 객체)
@Getter
public class SendEmailRequestDto {
private String from; // 발신자 이메일
private String to; // 수신자 이메일
private String subject; // 이메일 제목
private String body; // 이메일 본문
}
4. Service 만들기
EmailSendMessage (Kafka에 전달할 메시지의 정보를 담는 객체)
@AllArgsConstructor
@Getter
public class EmailSendMessage {
private String from;
private String to;
private String subject;
private String body;
}
EmailService
@Service
@RequiredArgsConstructor
public class EmailService {
// <메시지의 Key 타입, 메시지의 Value 타입>
// Kafka에 넣는 메시지는 Key-Value 형태로 넣을 수도 있고,
// Key는 생략한 채로 Value만 넣을 수도 있다고 얘기했다.
// 실습에서는 메시지를 만들 때 key는 생략한 채로 value만 넣을 예정이다.
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendEmail(SendEmailRequestDto request) {
EmailSendMessage emailSendMessage = new EmailSendMessage(
request.getFrom(),
request.getTo(),
request.getSubject(),
request.getBody()
);
// 위에서 메시지의 valueEmailSendMessage 타입을 String으로 설정을 했다.
// 그래서 객체를 String으로 변환해서 넣어주어야 한다.
this.kafkaTemplate.send("email.send", toJsonString(emailSendMessage));
}
// 객체를 Json 형태의 String으로 만들어주는 메서드
// (클래스로 분리하면 더 좋지만 편의를 위해 메서드로만 분리)
private String toJsonString(Object object) {
ObjectMapper objectMapper = new ObjectMapper();
try {
String message = objectMapper.writeValueAsString(object);
return message;
} catch (JsonProcessingException e) {
throw new RuntimeException("Json 직렬화 실패");
}
}
}
Spring Boot가 Kafka에 메시지 잘 넣는 지 테스트해보기
깔끔한 테스트를 위해 기존에 생성되어 있는 Kafka 리소스 삭제하기
1. 전체 토픽 조회하기
# 전체 토픽 조회하기
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list

2. 토픽 삭제하기
# 토픽 삭제
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--delete --topic email.send
# 잘 삭제됐는 지 확인하기
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list

3. 전체 컨슈머 그룹 조회하기
# 전체 컨슈머 그룹 조회하기
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list

4. 컨슈머 그룹 삭제하기
# 컨슈머 그룹 삭제
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--delete \
--group email-send-group
# 잘 삭제됐는 지 확인하기
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list
5. 토픽 다시 생성하기
# 토픽 생성
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create --topic email.send
# 토픽 잘 생성됐는 지 확인하기
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list

Spring Boot가 Kafka에 메시지 잘 넣는 지 테스트해보기
1. Spring Boot 서버 실행시키기

2. API 요청 보내기

- 혹시나 직렬화 실패가 뜬다면 메시지 전송 객체에 getter가 있는 지 보자
3. Kafka의 "email.send" 토픽에 메시지가 잘 들어갔는 지 확인하기
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic email.send \
--from-beginning

Consumer 만들기
1. Spring 프로젝트 생성하기

- producer와 같은 스펙으로 consumer를 맞춰준다.
2. application.yml에 Kafka 연결을 위한 정보 작성하기
server:
port: 0 # 사용 가능한 랜덤 포트를 찾아서 서버를 실행 (Producer 서버와의 포트 충돌을 방지)
spring:
kafka:
# Kafka 서버 주소 (EC2에 카프카를 설치했기 때문에 EC2 주소를 입력해야 한다.)
bootstrap-servers: {EC2_URL}:9092
consumer:
# 메시지의 key 역직렬화 방식 : Kafka에서 받아온 메시지를 String으로 변환
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 메시지의 value 역직렬화 방식 : Kafka에서 받아온 메시지를 String으로 변환
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 컨슈머 그룹이 미리 안 만들어져있는 경우에, 컨슈머 그룹을 직접 생성해서 메시지를 처음부터 읽음.
# 만약 컨슈머 그룹이 이미 만들어져있다면, 해당 컨슈머 그룹이 읽었던 메시지부터 읽음.
# 이 옵션을 주지 않으면 컨슈머 그룹을 직접 생성해서 메시지를 읽을 때,
# 기존에 쌓여있던 메시지를 읽지 않고 컨슈머 그룹이 생성된 이후에 들어온 메시지부터 읽어버린다.
# 그럼 컨슈머 그룹이 생성되기 전에 쌓여있던 메시지들이 처리되지 않고 누락돼버린다.
auto-offset-reset: earliest
3. Kafka의 메시지를 가져와 담을 객체 만들기
EmailSendMessage (Kafka로부터 전달받는 메시지의 정보를 담을 객체)
@Getter
@NoArgsConstructor // 역직렬화시에 필요
@AllArgsConstructor
public class EmailSendMessage {
private String from; // 발신자 이메일
private String to; // 수신자 이메일
private String subject; // 이메일 제목
private String body; // 이메일 본문
public static EmailSendMessage fromJson(String json) {
try {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(json, EmailSendMessage.class);
} catch (JsonProcessingException e) {
throw new RuntimeException("JSON 파싱 실패");
}
}
}
EmailSendConsumer
@Service
public class EmailSendConsumer {
@KafkaListener(
topics = "email.send",
groupId = "email-send-group" // 컨슈머 그룹 이름
)
public void consume(String message) {
System.out.println("Kafka로부터 받아온 메시지: " + message);
EmailSendMessage emailSendMessage = EmailSendMessage.fromJson(message);
// ... 실제 이메일 발송 로직은 생략 ...
System.out.println("이메일 발송 완료");
}
}
잘 작동하는 지 테스트해보기
1. Spring Boot 서버 실행시키기

- 위에서 producer에서 넣었던 메시지가 나오는 모습을 볼 수 있다.
2. API 요청을 통해 Kafka에 메시지를 추가로 넣어보기

- 당연하게도 Producer 쪽으로 API를 보내야한다.

3. CLI로 Consumer Group 확인해보기
# 컨슈머 그룹 세부 정보 조회하기
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group email-send-group \
--describe

- consumer service 만들 때 넣었던 이름이 있는 것을 알 수 있다.
Kafka의 비동기 처리로 인한 성능 이점 느껴보기
1. Consumer 서버 코드 수정하기
@Service
public class EmailSendConsumer
{
@KafkaListener(
topics = "email.send",
groupId = "email-send-group" // 컨슈머 그룹 이름
)
public void consume(String message) {
System.out.println("Kafka로부터 받아온 메시지: " + message);
EmailSendMessage emailSendMessage = EmailSendMessage.fromJson(message);
// 이메일 발송 시간이 3000ms가 포함 된다고 가정
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("이메일 발송 완료");
}
}
2. 이메일 발송 API 요청 보내보기

- 이메일 발송해보니 37ms만 걸리는 것을 볼 수 있음
- 사용자 입장에서는 이메일 발송 시간을 체감하지 않을 수 있다는 이점이 있음
비동기로 처리하면 Consumer가 제대로 작업을 처리했는 지 어떻게 확신할 수 있을까?

- 기존에 REST API 방식을 활용해 동기적으로 처리할 때는 이메일 발송 처리 작업이 끝날 때까지 기다렸다가 응답을 한다. 그러다보니 비교적 응답 속도가 느릴 수 밖에 없다. 그리고 이메일 발송 처리 작업이 끝난 뒤에 응답을 하기 때문에, 이메일 발송의 성공 여부를 확인하고 그에 맞게 응답을 할 수 있다.

- Kafka와 같은 메시지 큐를 활용한 비동기 처리는 위에서 보았듯이 사용자에게 빠르게 응답을 줄 수 있다는 큰 장점이 있다.
- 하지만, 사용자에게 작업의 실제 성공 여부를 확인하지 않고 응답을 먼저 보내버리기에 실패해도 사용자에게 알려줄 수 없다.
- 예를 들어, 이메일 발송 요청을 Kafka에 메시지로 넣고 25ms 만에 성공 응답을 보냈지만, 이후 Consumer 쪽에서 잘못된 이메일 주소로 인해 이메일 발송 도중 실패가 발생할 수 있다. 이 때는 이미 사용자에게 성공 응답을 보낸 상태이기 때문에 실패를 다시 사용자에게 알려줄 방법이 없다.
- 이러한 비동기 구조의 단점을 보완하기 위해 시스템에서는 다양한 보완 전략을 사용한다. 대표적으로는 메시지 처리 중 실패가 발생했을 때 자동으로 재시도(retry)하는 방식, 여러 번의 재시도 끝에도 실패한 메시지를 별도로 보관하는 Dead Letter Topic(DLT)을 활용하는 방식을 주로 활용한다.
- 다음에는 이것에 공부를 해보고 정리해봐야겠다.
'개발 공부 > kafka' 카테고리의 다른 글
| 6. Kafka 파티션 이용해서 성능 향상 시키기 (+실습) (0) | 2025.12.01 |
|---|---|
| 5. Kafka 메시지 처리 실패 시 대처 방법 (feat. Spring) (DLT, 재시도) (+실습) (0) | 2025.11.24 |
| 3-2. Kafka 토픽 네이밍 규칙 (0) | 2025.11.21 |
| 3-1. Kafka의 핵심 요소와 작동 (Topic, Consumer, Producer, Consumer Group, OffSet, CURRENT OFFSET) (+ 실습) (0) | 2025.11.21 |
| 2. Kafka 간단히 설치해보기 (feat. EC2) (+실습) (0) | 2025.11.20 |