카프카 메시지를 Consume한 뒤 처리하기 위한 방법들
·
Kafka
지금까지 카프카 토픽에 메시지를 Produce 하고 Consume 했다. 여기서 끝이 아니고 일반적인 서비스에서 생성된 메시지를 가져온 뒤 어떠한 처리 해야 한다. 그러면 어떻게 처리하는 게 좋을까? 대상 독자 카프카 메시지를 스트림 처리하고자 하는 자 Steps 동기로 무한루프 Connector 사용하기 비동기로 무한루프 (FastAPI) 1. 동기로 무한루프 내가 생각하는 가장 쉬운 방식이다. 앞선 포스팅에서 아래와 같은 Consume Loop를 살펴봤었다. while True: message = consumer.poll(1.0) # 메시지를 받을 때까지 최대 1초간 대기 # message를 가지고 지지고 볶고 당연히 위 반복문 안에서 message를 가지고 어떠한 처리건 할 수 있다. 하지만 위 로..
Confluent Kafka 설치하기 with Kubernetes
·
Kafka
Confluent Kafka를 docker-compose를 이용해 설치하는 방법을 소개했었지만 다른 리소스들과 함께 관리 및 사용하고자 Kubernetes에 올리는 방법도 소개하고자 한다. arm 칩을 사용하는 mac 기준으로 작성되었다. 대상 독자 쿠버네티스에 Confluent를 구축하고자 하는 개발자 Steps 로컬 환경에 Kubernetes 구축 Confluent Kafka 설치 1. 로컬 환경에 Kubernetes 구축 여러 가지 방법이 있을 수 있다. docker-desktop을 이용해 손쉽게 Enable Kubernetes 버튼을 통해서도 구축할 수 있고 k3s와 같은 경량화된 쿠버네티스를 구축해도 된다. 다만 여기선 minikube를 통해 쿠버네티스(이하 k8s)를 구축하려 한다. minik..
4. Schema Registry와 함께 안전하게 카프카 메시지 Consume 하기
·
Kafka
앞선 포스팅에서 토픽에 Schema를 등록했다. 이제 토픽에 생성되는 메시지는 해당 스키마를 따른다. 이제 반대로 메시지를 Consume 한 뒤 역직렬화를 수행해 보자. 대상 독자 안전한 메시징 시스템을 구축하고자 하는 개발자. Steps 메시지 소비하기 마무리 1. 메시지 소비하기 2. 파이썬으로 Confluent Kafka의 Consumer 구현하기 에서 Consumer 객체를 정의한 뒤 메시지를 읽고 난 뒤 json.loads를 통해 메시지를 역직렬화를 했었다. 이번 포스팅에서 할 내용은 json.loads를 통해 역직렬화가 아닌 avro deserializer를 통해 역직렬화를 수행하는 것이다. from confluent_kafka import Consumer consumer = Consumer(..
3. Schema Registry와 함께 안전하게 카프카 메시지 Produce 하기
·
Kafka
지금까지 토픽에 메시지를 생성하고 소비했다. users라는 토픽에 아래와 같은 메시지를 생성했고 해당 메시지들을 소비했었다. { "userid": , "gender": , "username": } 위와 같은 형태로 입력해 주기로 프로듀서와 컨슈머가 약속했지만 갑자기 알 수 없는 이유로 토픽에 메시지로 “Hello”가 입력되면 시스템 장애가 발생할 것이다. Schema Registry를 통해 검증된 메시지만 생성되도록 하자. 대상 독자 안전한 메시징 시스템을 구축하고자 하는 개발자. Steps Schema 정의하기 토픽에 스키마 등록하기 메시지 생성하기 마무리 1. Schema 정의하기 Schema는 3가지 타입을 통해 정의할 수 있다. JSON Avro Protobuf 이 중 어떠한 것을 선택하든 상관은..
2. 파이썬으로 Confluent Kafka의 Consumer 구현하기
·
Kafka
앞선 포스팅에서 토픽에 메시지를 생성했다. 이번엔 소비해 보자. 대상 독자 파이썬으로 카프카 토픽에 쌓인 메시지를 소비하고자 하는 개발자 Steps 컨슈머 정의하기 메시지 소비하기 마무리 1. 컨슈머 정의하기 컨슈머(Consumer)는 토픽에 쌓인 메시지를 소비할 수 있는 객체다. 프로듀서와 동일하게 bootstrap.servers 가 필요한데 컨슈머는 추가로 group.id도 필요하다. group.id는 컨슈머 그룹의 고유 ID 값이다. 위 사진으로 알 수 있는 것이 하나의 토픽을 여러 컨슈머가 읽을 수 있다는 것이다. 이를 구분하기 위한 구분자가 group.id 인 것이다. from confluent_kafka import Consumer consumer = Consumer({'bootstrap.se..
1. 파이썬으로 Confluent Kafka의 Producer 구현하기
·
Kafka
앞선 포스팅에서 Confluent Kafka를 설치하고 토픽을 생성했다. 또한 Control Center를 통해 토픽에 메시지를 생성했다. 메시지 생성을 매번 UI를 통해서 수행할 수 없기에 confluent-kafka 라이브러리를 이용해 파이썬으로 메시지를 생성해 보자. 대상 독자 파이썬으로 카프카 토픽에 메시지를 생성하고자 하는 개발자 1. confluent-kafka 설치하기 confluent-kafka 라이브러리는 Kafka에 메시지를 생성하고 소비하는 등의 행위를 파이썬으로 할 수 있게 도와준다. pip install confluent-kafka 나는 2.2.0 버전을 설치했다. 2. 프로듀서 정의하기 프로듀서(Producer)는 토픽에 메시지를 생성할 수 있는 객체다. 그러면 토픽은 어디에 있..