1. 동기 통신의 함정
마이크로서비스를 처음 도입할 때 대부분 이렇게 시작해.
주문 서비스 → HTTP → 재고 서비스
주문 서비스 → HTTP → 결제 서비스
주문 서비스 → HTTP → 알림 서비스
직관적이고 단순해. 주문이 들어오면 재고 체크하고, 결제하고, 알림 보내고. 순서도 명확하고.
근데 이게 규모가 커지면 문제가 터져.
시나리오: 주문 서비스가 재고 서비스를 호출하는데, 재고 서비스가 200ms 걸려. 그다음 결제 서비스가 500ms, 알림 서비스가 100ms. 총 응답 시간은 800ms야. 거기다 알림 서비스가 갑자기 죽으면?
주문 서비스: 알림 서비스에 HTTP 요청
알림 서비스: 503 Service Unavailable
주문 서비스: 😱 주문 실패 처리?
알림은 비즈니스 critical 기능이 아닌데, 알림 서비스가 죽었다고 주문 전체가 실패하는 건 말이 안 되잖아.
이것이 동기 통신의 근본적인 문제야:
- 시간적 결합(Temporal Coupling): 두 서비스가 동시에 살아있어야 해
- 연쇄 장애(Cascading Failure): 하나가 죽으면 연쇄적으로 죽어
- 응답 시간 누적: 각 서비스 응답 시간이 더해져
2. 이벤트 기반 아키텍처란
EDA(Event-Driven Architecture)는 서비스 간 통신을 **이벤트 발행(Publish)과 구독(Subscribe)**으로 바꾸는 패턴이야.
핵심 개념:
- Producer: 이벤트를 발행하는 서비스
- Event: "어떤 일이 일어났다"는 사실을 담은 메시지
- Message Broker: 이벤트를 받아서 전달하는 중간 시스템 (Kafka, RabbitMQ 등)
- Consumer: 이벤트를 구독하고 처리하는 서비스
주문 서비스 → [OrderPlaced 이벤트 발행] → Message Broker
↓
재고 서비스 (구독)
결제 서비스 (구독)
알림 서비스 (구독)
이제 주문 서비스는 재고, 결제, 알림 서비스의 존재를 몰라도 돼. 그냥 "주문이 접수됐어"라는 이벤트를 던지면 끝이야.
알림 서비스가 죽어도? 주문 서비스는 모르고, 메시지 브로커가 나중에 다시 전달해줘.
3. 이벤트 타입: 도메인 이벤트 vs 통합 이벤트
도메인 이벤트 (Domain Event)
같은 서비스(바운디드 컨텍스트) 내부에서 발생하는 비즈니스 사실.
// 도메인 이벤트: 주문 도메인 내부에서 발생
interface OrderPlacedDomainEvent {
type: 'ORDER_PLACED';
orderId: string;
userId: string;
items: OrderItem[];
totalAmount: number;
placedAt: Date;
}
// 주문 애그리게이트 내부에서 발행
class Order {
private domainEvents: DomainEvent[] = [];
place(): void {
if (this.status !== 'PENDING') {
throw new Error('이미 처리된 주문입니다');
}
this.status = 'PLACED';
// 도메인 이벤트 기록 (실제 발행은 나중에)
this.domainEvents.push({
type: 'ORDER_PLACED',
orderId: this.id,
userId: this.userId,
items: this.items,
totalAmount: this.totalAmount,
placedAt: new Date(),
});
}
pullDomainEvents(): DomainEvent[] {
const events = [...this.domainEvents];
this.domainEvents = [];
return events;
}
}
통합 이벤트 (Integration Event)
서비스 경계를 넘어서 다른 바운디드 컨텍스트에게 전파되는 이벤트.
// 통합 이벤트: 다른 서비스가 구독하는 외부 이벤트
interface OrderPlacedIntegrationEvent {
eventId: string; // 멱등성을 위한 고유 ID
eventType: 'order.placed';
occurredAt: string; // ISO 8601
version: '1.0'; // 스키마 버전
payload: {
orderId: string;
customerId: string;
items: Array<{
productId: string;
quantity: number;
unitPrice: number;
}>;
totalAmount: number;
};
}
도메인 이벤트와 통합 이벤트를 분리하는 이유:
- 도메인 이벤트는 내부 세부 정보를 담아도 되지만
- 통합 이벤트는 다른 팀과의 계약이라 신중하게 설계해야 해
- 내부 도메인 모델 변경이 외부 이벤트 스키마에 자동으로 노출되면 안 돼
4. 메시지 브로커 비교: Kafka vs RabbitMQ vs SQS
세 가지 대표 브로커를 비교해보자.
Apache Kafka
특징:
- 분산 로그 기반 스트리밍 플랫폼
- 메시지를 소비해도 삭제하지 않음 (설정한 보관 기간까지 유지)
- Consumer가 Offset을 관리하므로 언제든 과거 메시지 재처리 가능
- 초당 수백만 메시지 처리 가능한 높은 처리량
적합한 경우:
- 이벤트 소싱, 감사 로그
- 실시간 스트리밍 분석
- 대용량 데이터 파이프라인
- 여러 Consumer 그룹이 같은 이벤트를 독립적으로 처리해야 할 때
RabbitMQ
특징:
- AMQP 프로토콜 기반 메시지 브로커
- Exchange, Queue, Binding의 유연한 라우팅
- 메시지 소비 후 큐에서 삭제 (일반적으로)
- 복잡한 라우팅 패턴 지원 (Direct, Topic, Fanout, Headers)
적합한 경우:
- 작업 큐 (Task Queue), 워커 패턴
- 복잡한 라우팅이 필요한 경우
- 메시지 우선순위가 필요한 경우
- RPC 패턴
AWS SQS
특징:
- 완전 관리형 서비스 (인프라 관리 불필요)
- Standard Queue vs FIFO Queue
- 최소 1회 전달 보장 (Standard는 중복 가능)
- SNS와 결합해서 Fan-out 패턴 구현
적합한 경우:
- AWS 기반 인프라
- 간단한 비동기 작업 처리
- 서버리스(Lambda) 연동
| 항목 | Kafka | RabbitMQ | SQS |
|---|---|---|---|
| 처리량 | 최고 | 높음 | 높음 |
| 메시지 보관 | 장기 (기본 7일) | 소비 후 삭제 | 최대 14일 |
| 순서 보장 | 파티션 내에서 | 큐 내에서 | FIFO 큐만 |
| 재처리 | 쉬움 (Offset 조정) | 어려움 | Dead Letter Queue |
| 운영 복잡도 | 높음 | 중간 | 낮음 (관리형) |
| 적합한 규모 | 대규모 | 중간 | 소~중규모 |
5. Node.js 실전 예제: Kafka로 주문 처리
// shared/events.ts — 공유 이벤트 타입 정의
interface BaseEvent {
eventId: string;
occurredAt: string;
version: string;
}
interface OrderPlacedEvent extends BaseEvent {
type: 'order.placed';
payload: {
orderId: string;
customerId: string;
items: Array<{ productId: string; quantity: number; unitPrice: number }>;
totalAmount: number;
};
}
type AppEvent = OrderPlacedEvent; // union type으로 확장
// order-service/OrderEventPublisher.ts — 이벤트 발행
import { Kafka, Producer } from 'kafkajs';
import { v4 as uuidv4 } from 'uuid';
class OrderEventPublisher {
private producer: Producer;
constructor(kafka: Kafka) {
this.producer = kafka.producer();
}
async connect(): Promise<void> {
await this.producer.connect();
}
async publishOrderPlaced(order: Order): Promise<void> {
const event: OrderPlacedEvent = {
eventId: uuidv4(),
occurredAt: new Date().toISOString(),
version: '1.0',
type: 'order.placed',
payload: {
orderId: order.id,
customerId: order.customerId,
items: order.items,
totalAmount: order.totalAmount,
},
};
await this.producer.send({
topic: 'order-events',
messages: [
{
key: order.id, // 같은 주문 ID는 같은 파티션으로
value: JSON.stringify(event),
headers: {
'event-type': 'order.placed',
'content-type': 'application/json',
},
},
],
});
console.log(`이벤트 발행: ${event.eventId}`);
}
}
// inventory-service/InventoryEventConsumer.ts — 재고 서비스 컨슈머
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
class InventoryEventConsumer {
private consumer: Consumer;
constructor(
kafka: Kafka,
private inventoryService: InventoryService,
) {
this.consumer = kafka.consumer({ groupId: 'inventory-service' });
}
async start(): Promise<void> {
await this.consumer.connect();
await this.consumer.subscribe({
topic: 'order-events',
fromBeginning: false,
});
await this.consumer.run({
eachMessage: async (payload: EachMessagePayload) => {
await this.handleMessage(payload);
},
});
}
private async handleMessage(payload: EachMessagePayload): Promise<void> {
const { message } = payload;
const eventType = message.headers?.['event-type']?.toString();
if (eventType !== 'order.placed') return;
const event: OrderPlacedEvent = JSON.parse(message.value!.toString());
try {
await this.inventoryService.decreaseStock(event.payload.items);
console.log(`재고 차감 완료: 주문 ${event.payload.orderId}`);
} catch (error) {
// 재고 부족 등의 오류 처리
// Dead Letter Topic으로 보내거나, 보상 이벤트 발행
console.error(`재고 차감 실패: ${error}`);
throw error; // Kafka가 재시도하도록
}
}
}
6. 멱등성(Idempotency): 중복 처리 방어
메시지 브로커는 최소 1회(at-least-once) 전달을 보장하는 경우가 많아. 즉 같은 메시지가 두 번 올 수 있어. Consumer는 반드시 멱등성을 구현해야 해.
// 멱등성 처리 예시
class IdempotentInventoryConsumer {
constructor(
private inventoryService: InventoryService,
private processedEvents: ProcessedEventRepository, // Redis 또는 DB
) {}
async handleOrderPlaced(event: OrderPlacedEvent): Promise<void> {
// 1. 이미 처리한 이벤트인지 확인
const alreadyProcessed = await this.processedEvents.exists(event.eventId);
if (alreadyProcessed) {
console.log(`이미 처리된 이벤트 스킵: ${event.eventId}`);
return;
}
// 2. 처리 (트랜잭션 내에서 멱등성 키와 함께)
await this.processedEvents.markAsProcessing(event.eventId);
try {
await this.inventoryService.decreaseStock(event.payload.items);
await this.processedEvents.markAsCompleted(event.eventId);
} catch (error) {
await this.processedEvents.markAsFailed(event.eventId);
throw error;
}
}
}
7. 이벤트 소싱 (Event Sourcing) 기초
전통적인 방식은 현재 상태를 저장해. 이벤트 소싱은 이벤트의 로그 자체가 진실의 원천이야.
전통적 방식: DB에 현재 상태만 저장
orders 테이블: { id: '123', status: 'SHIPPED', totalAmount: 50000 }
이벤트 소싱: DB에 이벤트 로그 저장
order_events 테이블:
{ eventId: '1', orderId: '123', type: 'ORDER_PLACED', data: {...} }
{ eventId: '2', orderId: '123', type: 'PAYMENT_CONFIRMED', data: {...} }
{ eventId: '3', orderId: '123', type: 'ORDER_SHIPPED', data: {...} }
현재 상태가 필요하면 이벤트를 처음부터 replay해서 재구성해.
// 이벤트 소싱 기본 구현
interface OrderEvent {
eventId: string;
orderId: string;
type: string;
occurredAt: Date;
data: Record<string, unknown>;
}
class OrderAggregate {
id: string = '';
status: string = 'PENDING';
items: OrderItem[] = [];
totalAmount: number = 0;
// 이벤트 로그로부터 상태 재구성
static rehydrate(events: OrderEvent[]): OrderAggregate {
const order = new OrderAggregate();
for (const event of events) {
order.apply(event);
}
return order;
}
private apply(event: OrderEvent): void {
switch (event.type) {
case 'ORDER_PLACED':
this.id = event.orderId;
this.status = 'PLACED';
this.items = event.data.items as OrderItem[];
this.totalAmount = event.data.totalAmount as number;
break;
case 'PAYMENT_CONFIRMED':
this.status = 'PAID';
break;
case 'ORDER_SHIPPED':
this.status = 'SHIPPED';
break;
}
}
}
// 사용
const events = await eventStore.getEvents(orderId);
const order = OrderAggregate.rehydrate(events);
console.log(order.status); // 최신 상태
이벤트 소싱의 장점:
- 완전한 감사 로그(Audit Log)
- 과거 특정 시점의 상태 조회 가능 (Time Travel)
- 이벤트 replay로 새로운 프로젝션(View) 생성 가능
- 버그 재현이 쉬움
주의할 점:
- 스키마 진화(schema evolution)가 어려워 — 과거 이벤트 포맷도 영원히 지원해야 해
- 현재 상태 조회가 느릴 수 있어 — CQRS와 함께 스냅샷으로 해결
8. 최종 일관성(Eventual Consistency)
EDA를 쓰면 즉각적인 일관성(Immediate Consistency)을 포기하고 **최종 일관성(Eventual Consistency)**을 받아들여야 해.
사용자: 주문 완료!
주문 서비스: OrderPlaced 이벤트 발행 → 즉시 응답
재고 서비스: (100ms 후) 이벤트 수신 → 재고 차감
알림 서비스: (200ms 후) 이벤트 수신 → 이메일 발송
이 100~200ms 사이에 재고가 아직 차감되지 않은 상태가 존재해. 이게 최종 일관성이야.
언제 문제가 되는가:
- 재고가 1개인데 두 사람이 동시에 주문하면? → 재고 서비스에서 충돌 감지 후 보상 이벤트 발행
// 보상 트랜잭션(Compensating Transaction) 패턴
class InventoryConsumer {
async handleOrderPlaced(event: OrderPlacedEvent): Promise<void> {
try {
await this.inventoryService.reserveStock(event.payload.items);
} catch (error) {
if (error instanceof InsufficientStockError) {
// 보상 이벤트 발행: 재고 부족으로 주문 취소 요청
await this.eventPublisher.publish({
type: 'inventory.reservation.failed',
payload: {
orderId: event.payload.orderId,
reason: 'INSUFFICIENT_STOCK',
},
});
}
}
}
}
// 주문 서비스가 보상 이벤트 수신 후 주문 취소
class OrderCompensationConsumer {
async handleInventoryFailed(event: InventoryReservationFailedEvent): Promise<void> {
await this.orderService.cancelOrder(event.payload.orderId, event.payload.reason);
}
}
이 패턴을 Saga 패턴이라고 해. 분산 트랜잭션을 이벤트와 보상으로 처리하는 방법이야.
9. RabbitMQ 실전 예제
Kafka가 무거울 때 RabbitMQ가 좋은 선택이야. 특히 요청-응답 패턴이나 우선순위 큐가 필요할 때.
import amqp, { Channel, Connection } from 'amqplib';
// RabbitMQ 이벤트 발행
class RabbitMQPublisher {
private channel!: Channel;
async connect(url: string): Promise<void> {
const connection: Connection = await amqp.connect(url);
this.channel = await connection.createChannel();
// Exchange 설정 (topic 타입: 라우팅 키로 필터링)
await this.channel.assertExchange('order-events', 'topic', {
durable: true,
});
}
async publishOrderPlaced(order: Order): Promise<void> {
const event: OrderPlacedEvent = {
eventId: uuidv4(),
occurredAt: new Date().toISOString(),
version: '1.0',
type: 'order.placed',
payload: {
orderId: order.id,
customerId: order.customerId,
items: order.items,
totalAmount: order.totalAmount,
},
};
this.channel.publish(
'order-events', // exchange
'order.placed', // routing key
Buffer.from(JSON.stringify(event)),
{
persistent: true, // 메시지 디스크에 저장 (브로커 재시작 시에도 유지)
contentType: 'application/json',
messageId: event.eventId,
}
);
}
}
// RabbitMQ 이벤트 구독
class RabbitMQConsumer {
async start(url: string): Promise<void> {
const connection = await amqp.connect(url);
const channel = await connection.createChannel();
await channel.assertExchange('order-events', 'topic', { durable: true });
// 재고 서비스 전용 큐 생성
const queue = await channel.assertQueue('inventory-order-queue', {
durable: true,
});
// 'order.placed'와 'order.cancelled' 이벤트 구독
await channel.bindQueue(queue.queue, 'order-events', 'order.placed');
await channel.bindQueue(queue.queue, 'order-events', 'order.cancelled');
// 한 번에 하나씩 처리 (처리 완료 전까지 다음 메시지 받지 않음)
channel.prefetch(1);
await channel.consume(queue.queue, async (msg) => {
if (!msg) return;
try {
const event = JSON.parse(msg.content.toString());
await this.processEvent(event);
channel.ack(msg); // 성공: 큐에서 제거
} catch (error) {
console.error('처리 실패:', error);
channel.nack(msg, false, true); // 실패: 큐에 다시 넣기
}
});
}
private async processEvent(event: AppEvent): Promise<void> {
switch (event.type) {
case 'order.placed':
await this.inventoryService.reserveStock(event.payload.items);
break;
}
}
}
10. 언제 EDA를, 언제 동기 호출을?
모든 서비스 통신을 이벤트 기반으로 바꿀 필요는 없어.
이벤트 기반이 적합한 경우:
- 결과를 즉시 기다릴 필요가 없는 작업 (알림, 로그, 통계 업데이트)
- 여러 서비스가 같은 이벤트에 반응해야 하는 경우
- 생산자와 소비자를 독립적으로 배포해야 하는 경우
- 처리량이 불규칙해서 버퍼링이 필요한 경우
동기 호출이 적합한 경우:
- 응답이 필요한 경우 (쿼리: "이 상품 재고 있어?")
- 트랜잭션 경계가 단일 서비스 내에 있는 경우
- 실시간성이 중요한 경우 (결제 승인 결과 즉시 표시)
- 팀 규모가 작아서 EDA의 복잡도를 감당하기 어려운 경우
마무리
EDA를 한 문장으로 요약하면:
"Producer는 '무슨 일이 일어났다'고 선언하고, Consumer는 그걸 듣고 각자 알아서 반응한다."
이게 동기 호출 대비 가장 큰 차이야. Producer는 Consumer를 알 필요가 없고, 기다릴 필요도 없어.
대신 얻어야 하는 게 있어: 복잡성. 메시지 브로커 운영, 멱등성 처리, 최종 일관성 디버깅, Saga 패턴 — 이걸 감당할 준비가 됐을 때 도입해야 해.
시작은 작게. 알림이나 이메일 같은 비핵심 기능부터 이벤트 기반으로 분리해보고, 익숙해지면 더 중요한 흐름으로 확장하는 게 현실적인 접근이야.