4. Schema Registry와 함께 안전하게 카프카 메시지 Consume 하기

2023. 10. 24. 22:26·Kafka

앞선 포스팅에서 토픽에 Schema를 등록했다. 이제 토픽에 생성되는 메시지는 해당 스키마를 따른다.

이제 반대로 메시지를 Consume 한 뒤 역직렬화를 수행해 보자.

대상 독자

  • 안전한 메시징 시스템을 구축하고자 하는 개발자.

Steps

  1. 메시지 소비하기
  2. 마무리

1. 메시지 소비하기

2. 파이썬으로 Confluent Kafka의 Consumer 구현하기 에서 Consumer 객체를 정의한 뒤 메시지를 읽고 난 뒤 json.loads를 통해 메시지를 역직렬화를 했었다.

이번 포스팅에서 할 내용은 json.loads를 통해 역직렬화가 아닌 avro deserializer를 통해 역직렬화를 수행하는 것이다.

from confluent_kafka import Consumer

consumer = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group_id'})

topics = ["users"]
consumer.subscribe(topics)

timeout = -1 # -1 은 무한 대기, 메시지가 생성될 때까지 대기
print("consuming...")
message = consumer.poll(timeout=-1) # message 객체는 key, value, offset 등 다양한 정보를 가지고 있슴

이제 메시지의 value를 출력해 보자.

message.value()
>>> b'\x00\x00\x00\x00\x01\x08\x0cFEMALE\x0c\xea\xb4\x91\xec\x88\x98'

흠,, 직렬화가 되어서 어떤 문자열인지 확인할 수 없다. avro에 의해 serialize 된 메시지이므로 deserialize를 수행하자.

앞선 포스팅에서 했던 것처럼 SchemaRegistryClient를 정의한 뒤 AvroDeserializer를 정의하면 된다.

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import MessageField, SerializationContext

schema_registry_client = SchemaRegistryClient({"url": "http://localhost:8081"})
schema_str = schema_registry_client.get_schema(schema_id=1).schema_str
avro_deserializer = AvroDeserializer(schema_registry_client, schema_str)

user = avro_deserializer(message.value(), SerializationContext(message.topic(), MessageField.VALUE))
user
>>> {'userid': 4, 'gender': 'FEMALE', 'username': '광수'}

이제 메시지가 잘 출력된 것을 확인할 수 있다.

아래는 전체 코드이다.

토픽의 메시지를 읽는 경우는 일반적으로 스트림 처리를 위해 사용된다. 대개의 경우 토픽의 메시지를 실시간으로 처리해야 한다. 따라서 while 문으로 메시지를 계속해서 읽도록 작성하였다.
(confluent_kafka의 examples를 참고하였다.)

from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import MessageField, SerializationContext

# Schema Registry
schema_registry_client = SchemaRegistryClient({"url": "http://localhost:8081"})
schema_str = schema_registry_client.get_schema(schema_id=1).schema_str
avro_deserializer = AvroDeserializer(schema_registry_client, schema_str)

# Consumer
topics = ["users"]
consumer = Consumer(
    {"bootstrap.servers": "localhost:9092", "group.id": "test_group_id"}
)
consumer.subscribe(topics)

# Consume Loop
while True:
    try:
        msg = consumer.poll(1.0)  # 1초동안 메시지가 없으면 None을 반환
        if msg is None:
            continue

        user = avro_deserializer(
            msg.value(), SerializationContext(msg.topic(), MessageField.VALUE)
        )
        if user is not None:
            print(f"offset: {msg.offset()}, user: {user}")
    except KeyboardInterrupt:
        break

consumer.close()

2. 마무리

이렇게 하여 Schema Registry와 함께 카프카 메시지를 안전하게 Consume 하는 방법을
살펴보았다. 토픽에 스키마를 등록하고 Avro Deserializer를 사용해 메시지를
역직렬화하였다.

지금까지 정말 간단한 내용만 설명하였는데 실제 운영 중엔 여러 가지 시나리오가
발생할 수 있다. 당연히 스키마가 변경될 수 있고 하나의 토픽에 여러 스키마를
사용하고 싶을 수도 있다. 이러한 경우에 대해서도 앞으로의 포스팅에서 다뤄보자.

 

 

'Kafka' 카테고리의 다른 글
  • 카프카 메시지를 Consume한 뒤 처리하기 위한 방법들
  • Confluent Kafka 설치하기 with Kubernetes
  • 3. Schema Registry와 함께 안전하게 카프카 메시지 Produce 하기
  • 2. 파이썬으로 Confluent Kafka의 Consumer 구현하기
ssuwani
ssuwani
  • ssuwani
    완두콩
    ssuwani
  • 전체
    오늘
    어제
    • 분류 전체보기 (69)
      • MLOps (19)
      • 데이터 엔지니어링 (4)
      • Kubernetes (5)
      • Kafka (10)
      • 📚책 (3)
      • 라즈베리파이 (1)
      • ETC (8)
      • Python (6)
      • 언어모델 (5)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    FastAPI
    RDD
    auto tagging
    asyncronous
    Github Actions
    Docker
    BentoML
    LLM
    gcp
    Prometheus
    evidently ai
    topic
    mlflow
    producer
    Airflow
    태그2
    Kubernetes
    Kubeflow
    Kafka
    redis
    Python
    fluentbit
    consumer
    LangChain
    Spark
    태그1
    Confluent Cloud
    datadrift
    MLOps
    Schema Registry
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.1
ssuwani
4. Schema Registry와 함께 안전하게 카프카 메시지 Consume 하기
상단으로

티스토리툴바