4. Kafka의 Producer, Consumer 서버 만들기 (feat. Spring) (+실습)

2025. 11. 23. 18:16·개발 공부/kafka

사전 준비

  • 이번에는 Kafka의 메시지를 주는 Producer와 받는 Consumer 서버를 SpringBoot로 만들어 볼 것이다.
  • 이에 앞서 AWS EC2에 Kafka가 설치되어 있어야 한다.
    • 만약 필요하다면 밑의 링크를 참고하면 됩니다.

[실습 전 필요한 인프라]

 

2. Kafka 간단히 설치해보기 (feat. EC2) (+ 실습)

환경 세팅 전 갖추어야할 것AWS 계정적당한 성능의 EC2이번 실습은 AWS 프리티어에서 진행하기 위해 EC2 t2.micro (메모리 1GB)에서 진행함EC2에 Kafka 설치 / 실행하기1. JDK 17 설치하기Kafka를 실행시키려

student-developer-story.tistory.com

[필요한 이론]

 

1. Kafka와 메시지큐 (정의, 형태)

Kafka와 메시지 큐Kafka란?대규모 데이터를 처리할 수 있는 메시지 큐메시지 큐란?큐 형태에 데이터를 일시적으로 저장하는 임시 저장소메시지 큐를 사용하면 데이터를 비동기로 처리할 수 있어서

student-developer-story.tistory.com

 

3-1. Kafka의 핵심 요소와 작동 (Topic, Consumer, Producer, Consumer Group, OffSet, CURRENT OFFSET) (+ 실습)

Kafka의 기본 구성Producer: Kafka에 메시지(데이터)를 전달하는 주체Consumer: Kafka에 메시지(데이터)를 처리하는 주체Topic: Kafka에 넣을 메시지의 종류를 구분하는 개념 (카테고리와 비슷)작동 과정Produce

student-developer-story.tistory.com

 

Producer 만들기

1. Spring 프로젝트 생성

  • 기준 java 버전은 21 이다.
  • Dependencies는 Spring Boot DevTools, Spring Web, Spring for Apache Kafka, lombok이 필요하다.

2. application.yml에 Kafka 연결을 위한 정보 작성하기

spring:
  kafka:
    # Kafka 서버 주소 (EC2에 카프카를 설치했기 때문에 EC2 주소를 입력해야 한다.)
    bootstrap-servers: {EC2_URL}:9092
    producer:
      # 메시지의 key 직렬화 방식 : 자바 객체를 문자열(String)로 변환해서 Kafka에 전송
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 메시지의 value 직렬화 방식 : 자바 객체를 문자열(String)로 변환해서 Kafka에 전송
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

3. Controller 만들기

EmailController

@RestController
@RequiredArgsConstructor
@RequestMapping("/api/emails")
public class EmailController {

    private final EmailService emailService; // 조금 이따가 만들것

    @PostMapping
    public ResponseEntity<String> sendEmail(
            @RequestBody SendEmailRequestDto sendEmailRequestDto
    ) {
        emailService.sendEmail(sendEmailRequestDto);
        return ResponseEntity.ok("이메일 발송 요청 완료");
    }
}

 

SendEmailRequestDto (Request Body 정보를 받을 객체)

@Getter
public class SendEmailRequestDto {
    private String from; // 발신자 이메일
    private String to; // 수신자 이메일
    private String subject; // 이메일 제목
    private String body; // 이메일 본문
}

 

4. Service 만들기

EmailSendMessage (Kafka에 전달할 메시지의 정보를 담는 객체)

@AllArgsConstructor
@Getter
public class EmailSendMessage {
    private String from;
    private String to;
    private String subject;
    private String body;
}

EmailService

@Service
@RequiredArgsConstructor
public class EmailService {

  // <메시지의 Key 타입, 메시지의 Value 타입>
  // Kafka에 넣는 메시지는 Key-Value 형태로 넣을 수도 있고, 
  // Key는 생략한 채로 Value만 넣을 수도 있다고 얘기했다.
  // 실습에서는 메시지를 만들 때 key는 생략한 채로 value만 넣을 예정이다.
  private final KafkaTemplate<String, String> kafkaTemplate;

  public void sendEmail(SendEmailRequestDto request) {
    EmailSendMessage emailSendMessage = new EmailSendMessage(
      request.getFrom(),
      request.getTo(),
      request.getSubject(),
      request.getBody()
    );

    // 위에서 메시지의 valueEmailSendMessage 타입을 String으로 설정을 했다.
    // 그래서  객체를 String으로 변환해서 넣어주어야 한다. 
    this.kafkaTemplate.send("email.send", toJsonString(emailSendMessage));
  }

  // 객체를 Json 형태의 String으로 만들어주는 메서드
  // (클래스로 분리하면 더 좋지만 편의를 위해 메서드로만 분리)
  private String toJsonString(Object object) {
    ObjectMapper objectMapper = new ObjectMapper();
    try {
      String message = objectMapper.writeValueAsString(object);
      return message;
    } catch (JsonProcessingException e) {
      throw new RuntimeException("Json 직렬화 실패");
    }
  }
}

 

Spring Boot가 Kafka에 메시지 잘 넣는 지 테스트해보기

깔끔한 테스트를 위해 기존에 생성되어 있는 Kafka 리소스 삭제하기

1. 전체 토픽 조회하기

# 전체 토픽 조회하기
bin/kafka-topics.sh \
	--bootstrap-server localhost:9092 \
	--list

 

2. 토픽 삭제하기

# 토픽 삭제
bin/kafka-topics.sh \
	--bootstrap-server localhost:9092 \
	--delete --topic email.send
	
# 잘 삭제됐는 지 확인하기
bin/kafka-topics.sh \
	--bootstrap-server localhost:9092 \
	--list

 

3. 전체 컨슈머 그룹 조회하기

# 전체 컨슈머 그룹 조회하기
bin/kafka-consumer-groups.sh \
	--bootstrap-server localhost:9092 \
	--list

 

4. 컨슈머 그룹 삭제하기

# 컨슈머 그룹 삭제
bin/kafka-consumer-groups.sh \
	--bootstrap-server localhost:9092 \
	--delete \
	--group email-send-group
	
# 잘 삭제됐는 지 확인하기
bin/kafka-consumer-groups.sh \
	--bootstrap-server localhost:9092 \
	--list

 

5. 토픽 다시 생성하기

# 토픽 생성
bin/kafka-topics.sh \
	--bootstrap-server localhost:9092 \
	--create --topic email.send
	
# 토픽 잘 생성됐는 지 확인하기
bin/kafka-topics.sh \
	--bootstrap-server localhost:9092 \
	--list

 

Spring Boot가 Kafka에 메시지 잘 넣는 지 테스트해보기

1. Spring Boot 서버 실행시키기

2. API 요청 보내기

  • 혹시나 직렬화 실패가 뜬다면 메시지 전송 객체에 getter가 있는 지 보자

3. Kafka의 "email.send" 토픽에 메시지가 잘 들어갔는 지 확인하기

bin/kafka-console-consumer.sh \
	--bootstrap-server localhost:9092 \
	--topic email.send \
	--from-beginning

 

Consumer 만들기

1. Spring 프로젝트 생성하기

  • producer와 같은 스펙으로 consumer를 맞춰준다.

 

2. application.yml에 Kafka 연결을 위한 정보 작성하기

server:
  port: 0 # 사용 가능한 랜덤 포트를 찾아서 서버를 실행 (Producer 서버와의 포트 충돌을 방지)

spring:
  kafka:
    # Kafka 서버 주소 (EC2에 카프카를 설치했기 때문에 EC2 주소를 입력해야 한다.)
    bootstrap-servers: {EC2_URL}:9092
    consumer:
      # 메시지의 key 역직렬화 방식 : Kafka에서 받아온 메시지를 String으로 변환
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 메시지의 value 역직렬화 방식 : Kafka에서 받아온 메시지를 String으로 변환
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      
      # 컨슈머 그룹이 미리 안 만들어져있는 경우에, 컨슈머 그룹을 직접 생성해서 메시지를 처음부터 읽음.
      # 만약 컨슈머 그룹이 이미 만들어져있다면, 해당 컨슈머 그룹이 읽었던 메시지부터 읽음.
      
      # 이 옵션을 주지 않으면 컨슈머 그룹을 직접 생성해서 메시지를 읽을 때, 
      # 기존에 쌓여있던 메시지를 읽지 않고 컨슈머 그룹이 생성된 이후에 들어온 메시지부터 읽어버린다.
      # 그럼 컨슈머 그룹이 생성되기 전에 쌓여있던 메시지들이 처리되지 않고 누락돼버린다.
      auto-offset-reset: earliest

 

3. Kafka의 메시지를 가져와 담을 객체 만들기

EmailSendMessage (Kafka로부터 전달받는 메시지의 정보를 담을 객체)

@Getter
@NoArgsConstructor // 역직렬화시에 필요
@AllArgsConstructor
public class EmailSendMessage {

    private String from;         // 발신자 이메일
    private String to;           // 수신자 이메일
    private String subject;      // 이메일 제목
    private String body;         // 이메일 본문

    public static EmailSendMessage fromJson(String json) {
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            return objectMapper.readValue(json, EmailSendMessage.class);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("JSON 파싱 실패");
        }
    }
}

 

EmailSendConsumer

@Service
public class EmailSendConsumer {

    @KafkaListener(
            topics = "email.send",
            groupId = "email-send-group" // 컨슈머 그룹 이름
    )
    public void consume(String message) {
        System.out.println("Kafka로부터 받아온 메시지: " + message);

        EmailSendMessage emailSendMessage = EmailSendMessage.fromJson(message);

        // ... 실제 이메일 발송 로직은 생략 ...

        System.out.println("이메일 발송 완료");
    }
}

 

잘 작동하는 지 테스트해보기

1. Spring Boot 서버 실행시키기

  • 위에서 producer에서 넣었던 메시지가 나오는 모습을 볼 수 있다.

 

2. API 요청을 통해 Kafka에 메시지를 추가로 넣어보기

  • 당연하게도 Producer 쪽으로 API를 보내야한다.

 

3. CLI로 Consumer Group 확인해보기

# 컨슈머 그룹 세부 정보 조회하기
bin/kafka-consumer-groups.sh \
	--bootstrap-server localhost:9092 \
	--group email-send-group \
	--describe

  • consumer service 만들 때 넣었던 이름이 있는 것을 알 수 있다.

 

Kafka의 비동기 처리로 인한 성능 이점 느껴보기

1. Consumer 서버 코드 수정하기

@Service
public class EmailSendConsumer
{

    @KafkaListener(
            topics = "email.send",
            groupId = "email-send-group" // 컨슈머 그룹 이름
    )
    public void consume(String message) {
        System.out.println("Kafka로부터 받아온 메시지: " + message);
    
        EmailSendMessage emailSendMessage = EmailSendMessage.fromJson(message);
    	
        // 이메일 발송 시간이 3000ms가 포함 된다고 가정
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    
        System.out.println("이메일 발송 완료");
    }
}

 

2. 이메일 발송 API 요청 보내보기

  • 이메일 발송해보니 37ms만 걸리는 것을 볼 수 있음
  • 사용자 입장에서는 이메일 발송 시간을 체감하지 않을 수 있다는 이점이 있음

 

비동기로 처리하면 Consumer가 제대로 작업을 처리했는 지 어떻게 확신할 수 있을까?

  • 기존에 REST API 방식을 활용해 동기적으로 처리할 때는 이메일 발송 처리 작업이 끝날 때까지 기다렸다가 응답을 한다. 그러다보니 비교적 응답 속도가 느릴 수 밖에 없다. 그리고 이메일 발송 처리 작업이 끝난 뒤에 응답을 하기 때문에, 이메일 발송의 성공 여부를 확인하고 그에 맞게 응답을 할 수 있다.

 

  • Kafka와 같은 메시지 큐를 활용한 비동기 처리는 위에서 보았듯이 사용자에게 빠르게 응답을 줄 수 있다는 큰 장점이 있다.
  • 하지만, 사용자에게 작업의 실제 성공 여부를 확인하지 않고 응답을 먼저 보내버리기에 실패해도 사용자에게 알려줄 수 없다.
    • 예를 들어, 이메일 발송 요청을 Kafka에 메시지로 넣고 25ms 만에 성공 응답을 보냈지만, 이후 Consumer 쪽에서 잘못된 이메일 주소로 인해 이메일 발송 도중 실패가 발생할 수 있다. 이 때는 이미 사용자에게 성공 응답을 보낸 상태이기 때문에 실패를 다시 사용자에게 알려줄 방법이 없다.
  • 이러한 비동기 구조의 단점을 보완하기 위해 시스템에서는 다양한 보완 전략을 사용한다. 대표적으로는 메시지 처리 중 실패가 발생했을 때 자동으로 재시도(retry)하는 방식, 여러 번의 재시도 끝에도 실패한 메시지를 별도로 보관하는 Dead Letter Topic(DLT)을 활용하는 방식을 주로 활용한다.
    • 다음에는 이것에 공부를 해보고 정리해봐야겠다.

 

출처: https://inf.run/qzJua

저작자표시 (새창열림)

'개발 공부 > kafka' 카테고리의 다른 글

6. Kafka 파티션 이용해서 성능 향상 시키기 (+실습)  (0) 2025.12.01
5. Kafka 메시지 처리 실패 시 대처 방법 (feat. Spring) (DLT, 재시도) (+실습)  (0) 2025.11.24
3-2. Kafka 토픽 네이밍 규칙  (0) 2025.11.21
3-1. Kafka의 핵심 요소와 작동 (Topic, Consumer, Producer, Consumer Group, OffSet, CURRENT OFFSET) (+ 실습)  (0) 2025.11.21
2. Kafka 간단히 설치해보기 (feat. EC2) (+실습)  (0) 2025.11.20
'개발 공부/kafka' 카테고리의 다른 글
  • 6. Kafka 파티션 이용해서 성능 향상 시키기 (+실습)
  • 5. Kafka 메시지 처리 실패 시 대처 방법 (feat. Spring) (DLT, 재시도) (+실습)
  • 3-2. Kafka 토픽 네이밍 규칙
  • 3-1. Kafka의 핵심 요소와 작동 (Topic, Consumer, Producer, Consumer Group, OffSet, CURRENT OFFSET) (+ 실습)
Jamey
Jamey
  • Jamey
    컴공 대학생의 이야기
    Jamey
  • 전체
    오늘
    어제
    • 분류 전체보기 (36)
      • 개발 공부 (33)
        • k8s (24)
        • kafka (8)
        • AI (1)
      • 개발기 (2)
      • 프로젝트 홍보 (1)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

    • 깃허브
  • 공지사항

  • 인기 글

  • 태그

    current offset
    토픽
    AI
    Linux
    K8S
    Graphana
    llm최적화
    Producer
    Kafka
    Jenkins
    consumer
    조인 쿼리
    Rag
    topic
    카프카
    Kubernetes
    langchain
    serialDB
    sql자동화
    cloudflare workers
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.6
Jamey
4. Kafka의 Producer, Consumer 서버 만들기 (feat. Spring) (+실습)
상단으로

티스토리툴바