2. 파이썬으로 Confluent Kafka의 Consumer 구현하기

2023. 10. 24. 22:15·Kafka
목차
  1. 대상 독자
  2. Steps
  3. 1. 컨슈머 정의하기
  4. 2. 메시지 소비하기
  5. 3. 마무리

앞선 포스팅에서 토픽에 메시지를 생성했다. 이번엔 소비해 보자.

대상 독자

  • 파이썬으로 카프카 토픽에 쌓인 메시지를 소비하고자 하는 개발자

Steps

  1. 컨슈머 정의하기
  2. 메시지 소비하기
  3. 마무리

1. 컨슈머 정의하기

컨슈머(Consumer)는 토픽에 쌓인 메시지를 소비할 수 있는 객체다.

프로듀서와 동일하게 bootstrap.servers 가 필요한데 컨슈머는 추가로 group.id도
필요하다.

group.id는 컨슈머 그룹의 고유 ID 값이다.

위 사진으로 알 수 있는 것이 하나의 토픽을 여러 컨슈머가 읽을 수 있다는 것이다.
이를 구분하기 위한 구분자가 group.id 인 것이다.

from confluent_kafka import Consumer
consumer = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_id'})

2. 메시지 소비하기

당연히 컨슈머도 어떤 토픽에서 메시지를 가져올지를 명시해야 한다.

topics = ["users"]
consumer subscribe(topics)

위를 통해 짐작할 수 있는 점은 여러 토픽의 메시지를 동시에 가져올 수 있다는
점이다. 나중에 다뤄보자.

이제 대상 토픽도 정했으니 메시지를 가져오자.

timeout = -1 # -1 은 무한 대기, 메시지가 생성될 때까지 대기
message = consumer.poll(timeout=-1) # message 객체는 key, value, offset 등 다양한 정보를 가지고 있슴

poll은 토픽의 메시지를 가져오는 행위를 말한다.

이제 아래 코드를 실행해 보자.

from confluent_kafka import Consumer
import json
consumer = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_id'})
topics = ["users"]
consumer.subscribe(topics)
timeout = -1 # -1 은 무한 대기, 메시지가 생성될 때까지 대기
print("consuming...")
message = consumer.poll(timeout=-1) # message 객체는 key, value, offset 등 다양한 정보를 가지고 있슴
print("key: ", message.key())
print("value: ", json.loads(message.value()))
print("offset: ", message.offset())
print("partition: ", message.partition())
consumer.close()

메시지를 읽어오지 못하고 무한대기하게 된다.

우리는 분명 앞서 토픽에 메시지를 넣었는데 왜 읽어오지 못하는 것일까?

바로 auto.offset.reset 이라는 config의 기본값이 latest 이기 때문이다.

해당 config는 새로운 컨슈머 그룹의 offset을 결정해 준다.

앞서 정의한 test_group_id라는 컨슈머 그룹은 처음 생성되었으므로 offset이
latest로 초기화되었다.

offset은 컨슈머 그룹이 토픽의 메시지를 어디까지를 소비했는지를 추적하는 개념인데
latest라고 초기화가 되었기 때문에 앞서 생성되었던 메시지는 읽을 수 없는 것이다.

UI에서 새로운 유저 정보를 등록해 보자.

{
"userid": 3,
"gender": "MALE",
"username": "길동"
}

 

그리고 다시 터미널을 확인해 보자.

메시지를 정상적으로 읽어왔다.

3. 마무리

위 본문에서 메시지의 value를 json.loads로 역직렬화를 수행한 것을 볼 수 있다.
하지만 토픽의 메시지가 항상 이 방식으로 역직렬화가 될 수 있을까?

토픽의 메시지는 byte 형태라면 어떠한 것이든 입력할 수 있다. 문자열도 가능하고
이미지와 오디오도 직렬화를 한 뒤 입력할 수 있다.

우리의 users라는 토픽엔 메시지 형태에 대한 아무런 제한이 없다. 그렇기 때문에
위험하다.

하지만 프로듀서와 약속을 한다면 괜찮다. 그 약속을 좀 더 정교하고 확실하게 지킬
수 있도록 도와주는 툴이 스키마 레지스트리이다.

다음엔 스키마 레지스트리를 통해 메시지를 안전하게 생성하고 소비해 보자.

  1. 대상 독자
  2. Steps
  3. 1. 컨슈머 정의하기
  4. 2. 메시지 소비하기
  5. 3. 마무리
'Kafka' 카테고리의 다른 글
  • 4. Schema Registry와 함께 안전하게 카프카 메시지 Consume 하기
  • 3. Schema Registry와 함께 안전하게 카프카 메시지 Produce 하기
  • 1. 파이썬으로 Confluent Kafka의 Producer 구현하기
  • Confluent Kafka 설치하기 with Docker Compose
ssuwani
ssuwani
Oops!!ssuwani 님의 블로그입니다.
  • ssuwani
    Oops!!
    ssuwani
  • 전체
    오늘
    어제
    • 분류 전체보기 (69)
      • MLOps (19)
      • 데이터 엔지니어링 (4)
      • Kubernetes (5)
      • Kafka (10)
      • 📚책 (3)
      • 라즈베리파이 (1)
      • ETC (8)
      • Python (6)
      • 언어모델 (5)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    태그1
    Spark
    datadrift
    MLOps
    태그2
    topic
    Kafka
    LLM
    Airflow
    Confluent Cloud
    redis
    auto tagging
    Kubernetes
    FastAPI
    asyncronous
    Schema Registry
    consumer
    Prometheus
    mlflow
    BentoML
    LangChain
    producer
    Docker
    fluentbit
    Kubeflow
    Github Actions
    evidently ai
    RDD
    Python
    gcp
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.1
ssuwani
2. 파이썬으로 Confluent Kafka의 Consumer 구현하기

개인정보

  • 티스토리 홈
  • 포럼
  • 로그인
상단으로

티스토리툴바

단축키

내 블로그

내 블로그 - 관리자 홈 전환
Q
Q
새 글 쓰기
W
W

블로그 게시글

글 수정 (권한 있는 경우)
E
E
댓글 영역으로 이동
C
C

모든 영역

이 페이지의 URL 복사
S
S
맨 위로 이동
T
T
티스토리 홈 이동
H
H
단축키 안내
Shift + /
⇧ + /

* 단축키는 한글/영문 대소문자로 이용 가능하며, 티스토리 기본 도메인에서만 동작합니다.