앞선 포스팅에서 토픽에 메시지를 생성했다. 이번엔 소비해 보자.
대상 독자
- 파이썬으로 카프카 토픽에 쌓인 메시지를 소비하고자 하는 개발자
Steps
- 컨슈머 정의하기
- 메시지 소비하기
- 마무리
1. 컨슈머 정의하기
컨슈머(Consumer)는 토픽에 쌓인 메시지를 소비할 수 있는 객체다.
프로듀서와 동일하게 bootstrap.servers
가 필요한데 컨슈머는 추가로 group.id
도
필요하다.
group.id
는 컨슈머 그룹의 고유 ID 값이다.
위 사진으로 알 수 있는 것이 하나의 토픽을 여러 컨슈머가 읽을 수 있다는 것이다.
이를 구분하기 위한 구분자가 group.id
인 것이다.
from confluent_kafka import Consumer
consumer = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_id'})
2. 메시지 소비하기
당연히 컨슈머도 어떤 토픽에서 메시지를 가져올지를 명시해야 한다.
topics = ["users"]
consumer subscribe(topics)
위를 통해 짐작할 수 있는 점은 여러 토픽의 메시지를 동시에 가져올 수 있다는
점이다. 나중에 다뤄보자.
이제 대상 토픽도 정했으니 메시지를 가져오자.
timeout = -1 # -1 은 무한 대기, 메시지가 생성될 때까지 대기
message = consumer.poll(timeout=-1) # message 객체는 key, value, offset 등 다양한 정보를 가지고 있슴
poll은 토픽의 메시지를 가져오는 행위를 말한다.
이제 아래 코드를 실행해 보자.
from confluent_kafka import Consumer
import json
consumer = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_id'})
topics = ["users"]
consumer.subscribe(topics)
timeout = -1 # -1 은 무한 대기, 메시지가 생성될 때까지 대기
print("consuming...")
message = consumer.poll(timeout=-1) # message 객체는 key, value, offset 등 다양한 정보를 가지고 있슴
print("key: ", message.key())
print("value: ", json.loads(message.value()))
print("offset: ", message.offset())
print("partition: ", message.partition())
consumer.close()
메시지를 읽어오지 못하고 무한대기하게 된다.
우리는 분명 앞서 토픽에 메시지를 넣었는데 왜 읽어오지 못하는 것일까?
바로 auto.offset.reset
이라는 config의 기본값이 latest 이기 때문이다.
해당 config는 새로운 컨슈머 그룹의 offset을 결정해 준다.
앞서 정의한 test_group_id라는 컨슈머 그룹은 처음 생성되었으므로 offset이
latest로 초기화되었다.
offset은 컨슈머 그룹이 토픽의 메시지를 어디까지를 소비했는지를 추적하는 개념인데
latest라고 초기화가 되었기 때문에 앞서 생성되었던 메시지는 읽을 수 없는 것이다.
UI에서 새로운 유저 정보를 등록해 보자.
{
"userid": 3,
"gender": "MALE",
"username": "길동"
}
그리고 다시 터미널을 확인해 보자.
메시지를 정상적으로 읽어왔다.
3. 마무리
위 본문에서 메시지의 value를 json.loads로 역직렬화를 수행한 것을 볼 수 있다.
하지만 토픽의 메시지가 항상 이 방식으로 역직렬화가 될 수 있을까?
토픽의 메시지는 byte 형태라면 어떠한 것이든 입력할 수 있다. 문자열도 가능하고
이미지와 오디오도 직렬화를 한 뒤 입력할 수 있다.
우리의 users라는 토픽엔 메시지 형태에 대한 아무런 제한이 없다. 그렇기 때문에
위험하다.
하지만 프로듀서와 약속을 한다면 괜찮다. 그 약속을 좀 더 정교하고 확실하게 지킬
수 있도록 도와주는 툴이 스키마 레지스트리이다.
다음엔 스키마 레지스트리를 통해 메시지를 안전하게 생성하고 소비해 보자.