Continuous Challenge

[항해플러스 7기 백엔드] 9주차 회고 - Kafka를 활용한 이벤트 비동기 처리 (feat. Transactional Outbox Pattern) 본문

Study/항해플러스 7기

[항해플러스 7기 백엔드] 9주차 회고 - Kafka를 활용한 이벤트 비동기 처리 (feat. Transactional Outbox Pattern)

응굥 2025. 5. 10. 00:43

어느덧 9주차...!

피로가 누적되어 점차 지쳐갔고, 정신력으로 버텼던 때인 것 같다..

2주 남았다는 사실 하나로 버티고 버텼던 한 주였다.


Kafka 기반 Transactional Outbox Pattern 이해하기

1. 개요

이 문서에서는 Kafka를 사용하는 이벤트 기반 아키텍처에서 데이터 정합성을 보장하기 위한 전략으로 Transactional Outbox Pattern을 설명합니다. 이 패턴은 데이터베이스와 메시지 브로커 간의 트랜잭션 불일치 문제를 해결하여 이벤트 유실 없이 안정적인 데이터 처리를 가능하게 합니다. 특히 Spring Boot, Kafka, MySQL을 사용하는 실무 환경을 바탕으로 설명합니다.

2. 문제 정의: 데이터베이스와 Kafka 간 트랜잭션 불일치

이벤트 기반 시스템에서는 일반적으로 비즈니스 데이터를 데이터베이스에 저장한 뒤 Kafka로 이벤트를 발행합니다. 하지만 이 두 작업은 서로 다른 트랜잭션으로 처리되기 때문에 다음과 같은 문제가 발생할 수 있습니다.

  • DB 저장 성공, Kafka 발행 실패 → 이벤트 유실
  • Kafka 발행 성공, DB 롤백 → 중복 처리 위험

이러한 불일치는 시스템의 신뢰성과 정합성을 떨어뜨리는 주요 원인입니다.

3. 해결책: Transactional Outbox Pattern

Transactional Outbox Pattern은 이벤트 데이터를 별도의 Outbox 테이블에 저장한 후, 이 테이블의 데이터를 Kafka로 비동기적으로 전송하는 방식입니다. 핵심은 DB 저장과 Outbox 기록을 단일 트랜잭션으로 처리하여 두 작업 간의 정합성을 확보하는 데 있습니다.

4. 아키텍처 구성

4.1 주요 흐름

  1. 애플리케이션은 비즈니스 데이터를 DB에 저장하면서 동시에 Outbox 테이블에 이벤트 정보를 기록합니다 (하나의 트랜잭션).
  2. 별도의 Polling 프로세스 또는 CDC(Change Data Capture) 도구가 Outbox 테이블을 감시합니다.
  3. 새로운 이벤트가 감지되면 Kafka에 메시지를 발행합니다.
  4. 발행 성공 시 해당 Outbox 레코드를 삭제하거나 상태를 업데이트합니다.

이 구조는 Kafka와 DB 사이의 일관성을 유지하면서 이벤트 유실 없이 안정적인 메시지 처리를 보장합니다.

5. 장점과 단점

5.1 장점

  • 메시지 유실 방지
  • 트랜잭션 일관성 확보
  • 장애 발생 시 재처리 가능

5.2 단점

  • Polling 기반 구현 시 지연 가능성
  • 별도의 컴포넌트(CDC, 스케줄러 등) 도입 필요
  • Outbox 테이블 관리 필요

Kafka 트랜잭션과의 비교

Kafka는 자체 트랜잭션 기능을 제공하지만, DB와 Kafka를 함께 포함하는 단일 트랜잭션은 어렵습니다.

이 때문에 Transactional Outbox Pattern이 더 현실적이고 널리 사용됩니다.

6. 실사용 예제: Spring Boot + Kafka + MySQL

6.1 시스템 구성

  • 도메인 이벤트 발행: 결제 성공 시 PaymentSuccessEvent 객체를 생성하고 이벤트 발행
  • Outbox 기록: 트랜잭션 커밋 전에 Outbox 테이블에 이벤트 데이터를 저장
  • Kafka 발행: 트랜잭션 커밋 이후 Kafka로 이벤트 발행
  • 장애 복구: Outbox 테이블을 기반으로 재시도 가능

6.2 코드 예제

1. PaymentFacade.java – 결제 및 이벤트 발행

@Transactional
public Payment payment(PaymentCommand command) {
	userService.useBalance(command.getUserId(), command.getPayAmount());
    Payment payment = paymentService.pay(command.getOrderId(), command.getPayAmount());
	orderService.completeOrder(command.getOrderId());

    OrderInfo orderInfo = orderService.getOrderInfo(command.getOrderId());
    // 결제 성공 이벤트 발행
    paymentEventPublisher.send(new PaymentSuccessEvent(orderInfo));

    return payment;
}

2. PaymentEventListener.java – Outbox 저장 및 Kafka 발행

@Component
@RequiredArgsConstructor
public class PaymentEventListener {

    private final OutboxService outboxService;
    private final KafkaProducer kafkaProducer;

    @Value("${commerce-api.payment.topic-name}")
    String topic;

    @Async
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void paymentSuccessHandler(PaymentSuccessEvent event) {
        Long orderId = event.getOrderInfo().getOrderId();
        kafkaProducer.send(topic, orderId.toString(), event);
    }

    @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
    public void saveOutbox(PaymentSuccessEvent event) {
        outboxService.save(PaymentOutbox.of(event.getOrderInfo().getOrderId().toString(), OutboxStatus.INIT, JsonUtil.toJson(event)));
    }
}

3. OutboxService.java - Outbox 테이블 관리

@Service
@RequiredArgsConstructor
public class OutboxService {

    private final OutboxRepository outboxRepository;

    public PaymentOutbox save(PaymentOutbox outbox) {
        return outboxRepository.save(outbox);
    }

    public List<PaymentOutbox> getUnSuccessedEventList(int limit) {
        return outboxRepository.findUnSuccessedEventList(limit);
    }
}

4. KafkaProducer.java - Kafka 발행 처리

@Slf4j
@Service
public class KafkaProducer {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String topic, String messageId, Object message) {
        kafkaTemplate.send(topic, messageId, message).whenComplete((result, throwable) -> {
            if (throwable != null) {
                log.error("[Produce Fail] message: {}", throwable.getMessage(), throwable);
            } else {
                log.debug("[Produce Success] topic:{}, offset: {}",
                        result.getRecordMetadata().topic(),
                        result.getRecordMetadata().offset());
            }
        });
    }
}

5. PaymentOutbouxFacade.java - 실패한 이벤트 재처리

@Slf4j
@Component
@RequiredArgsConstructor
public class PaymentSuccessEventConsumer {

    private final OutboxService outboxService;
    private final PlatformClient platformClient;

    // 결제 성공 이벤트 소비 → 외부 플랫폼 호출
    @KafkaListener(topics = "${commerce-api.payment.topic-name}", groupId = "payment-group")
    public void sendToPlatform(ConsumerRecord<String, byte[]> record, Acknowledgment acknowledgment) {
        try {
            log.info("[payment-group] value: {}", record.value());
            String jsonString = new String(record.value());
            PaymentSuccessEvent event = JsonUtil.fromJson(jsonString, PaymentSuccessEvent.class);
            platformClient.send(event.getOrderInfo());
            log.info("데이터 플랫폼 전송 성공 : {}", event.getOrderInfo());
        } catch (Exception e) {
            log.error(e.getMessage());
            PaymentOutbox outbox = outboxService.getOutboxById(record.key());
            outboxService.save(outbox.fail());
        }
    }

    // Outbox 상태 성공 처리
    @KafkaListener(topics = "${commerce-api.payment.topic-name}", groupId = "payment-outbox")
    public void consumeOutbox(ConsumerRecord<String, byte[]> record) {
        log.info("[payment-outbox] key: {}", record.key());
        PaymentOutbox outbox = outboxService.getOutboxById(record.key());
        outboxService.save(outbox.success());
    }
}

 

7. 마무리

Transactional Outbox Pattern은 Kafka 기반 이벤트 시스템에서 데이터 정합성과 메시지 신뢰성을 보장하는 효과적인 아키텍처입니다. 특히 분산 시스템에서 메시지 유실과 중복 문제를 방지하는 데 유용하며, 구현이 비교적 단순하면서도 높은 안정성과 유연성을 제공합니다.

 

728x90
728x90
Comments