지금까지 카프카 토픽에 메시지를 Produce 하고 Consume 했다.
여기서 끝이 아니고 일반적인 서비스에서 생성된 메시지를 가져온 뒤 어떠한 처리 해야 한다.
그러면 어떻게 처리하는 게 좋을까?
대상 독자
- 카프카 메시지를 스트림 처리하고자 하는 자
Steps
- 동기로 무한루프
- Connector 사용하기
- 비동기로 무한루프 (FastAPI)
1. 동기로 무한루프
내가 생각하는 가장 쉬운 방식이다.
앞선 포스팅에서 아래와 같은 Consume Loop를 살펴봤었다.
while True:
message = consumer.poll(1.0) # 메시지를 받을 때까지 최대 1초간 대기
# message를 가지고 지지고 볶고
당연히 위 반복문 안에서 message를 가지고 어떠한 처리건 할 수 있다.
하지만 위 로직은 메시지를 받아오기까지 동기적으로 대기하게 된다. 따라서 메시지를 처리하는 것 이외에 다른 요청을 받거나 처리할 수 없다. Stateless 한 애플리케이션이라면 이렇게 간단하게 구현해도 된다고 생각한다.
그리고 message를 가지고 다른 곳에 HTTP 요청을 보낼 수 있다.
2. Connector 사용하기
카프카를 사용하는 개발자가 해야 하는 일은 크게 두 가지다.
- 어딘가로부터 토픽의 메시지를 생성하기
- 토픽의 메시지를 가져와서 어떤 처리하기
이 두 가지 일을 손쉽게 할 수 있도록 만들어진 게 Connector이다.
Connector도 두 가지 종류가 있다.
- Source Connector
- Sink Connector
다른 시스템으로부터 토픽의 메시지를 생성하는 게 Source Connector
토픽의 메시지로부터 다른 시스템에 메시지를 전달하는 게 Sync Connector이다.
위에 말한 개발자가 해야할 일과 정확하게 1:1 매치된다.
위의 동기로 무한루프에서 “message를 가지고 다른 곳에 HTTP 요청을 보낼 수 있다”라고 했다.
토픽의 메시지로부터 다른 시스템에 메시지를 전달하는 것이다. HTTP Sink Connector를 사용하면 된다.
나중에 기회가 된다면 로컬의 Confluent 환경에 Connector Plugin을 설치하고 생성하는 실습을 하자.
3. 비동기로 무한루프 (FastAPI)
앞선 동기로 무한루프에서 메시지를 받아오기까지 동기로 대기한다고 했다. 이를 비동기로 처리할 수 있게 한다면 하나의 애플리케이션에서 다양한 일들을 할 수 있다. FastAPI로 웹 애플리케이션을 동작하는 것 자체가 다양한 일이라고 할 수 있을 것 같다.
메시지를 읽은 뒤 다른 애플리케이션에 HTTP 요청을 보내는 것을 하나의 애플리케이션에서 처리할 수 있는 것이다.
예제 코드는 다음과 같다.
from fastapi import FastAPI
from confluent_kafka import Consumer
from pydantic import BaseModel
import json
import asyncio
class User(BaseModel):
userid: int
gender: str
username: str
# Kafka Consumer 설정
consumer = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_id'})
consumer.subscribe(['users'])
app = FastAPI()
# 비동기 Kafka 메시지 소비
async def consume_messages():
current_loop = asyncio.get_running_loop()
print("start consuming...")
while True:
message = await current_loop.run_in_executor(None, consumer.poll, 1.0)
if message is None:
continue
if message.error():
print(f"Kafka Consumer error: {message.error()}")
continue
user_data = json.loads(message.value())
user = User(**user_data)
await save_user(user)
# 앱 시작 시 Kafka 메시지 Consume를 비동기로 시작
@app.on_event('startup')
async def app_startup():
asyncio.create_task(consume_messages())
# 앱 종료 시 Kafka Consumer 닫기
@app.on_event('shutdown')
async def app_shutdown():
consumer.close()
# 사용자 정보 저장 로직을 추가하여 데이터베이스에 사용자 정보를 저장
@app.post("/users")
async def save_user(user: User):
# 여기에 데이터베이스 저장 로직을 추가
print("Saving user to the database:")
print(user)
fastapi로 정의한 app은 startup 될 때 메시지를 consume 하는 로직이 비동기로 수행되기 시작하고 끝날 땐 consumer 객체를 닫아준다.
startup 시 while True에 의해 무한루프 처리 된다면 startup에서 끝나지 않겠지만 run_in_executor 를 사용하여 consume 작업을 블로킹하지 않고 백그라운드에서 실행하므로 이벤트 루프가 차단되지 않는다.