Kafka 및 Strimzi 소개
kafka는 대량의 데이터를 안정적이고 실시간으로 처리하는 데 사용되는 분산형 스트리밍 플랫폼입니다.
이벤트 스트림 등을 효과적으로 처리하고 , 데이터를 여러 시스템 간에 안전하게 전송할 수 있도록 지원합니다.
카프카는 주요 요소는 다음과 같다.
- 주키퍼 ZooKeeper : 카프카의 메타데이터 관리 및 브로커의 정상 상태 점검 health check 을 담당
- 카프카 Kafka 또는 카프카 클러스터 Kafka cluster : 여러 대의 브로커를 구성한 클러스터를 의미
- 브로커 broker : 카프카 애플리케이션이 설치된 서버 또는 노드를 말함
- 프로듀서 producer : 카프카로 메시지를 보내는 역할을 하는 클라이언트를 총칭
- 컨슈머 consumer : 카프카에서 메시지를 꺼내가는 역할을 하는 클라이언트를 총칭
- 토픽 topic : 카프카는 메시지 피드들을 토픽으로 구분하고, 각 토픽의 이름은 카프카 내에서 고유함
- 파티션 partition : 병렬 처리 및 고성능을 얻기 위해 하나의 토픽을 여러 개로 나눈 것을 말함
- 세그먼트 segment : 프로듀서가 전송한 실제 메시지가 브로커의 로컬 디스크에 저장되는 파일
- 메시지 message 또는 레코드 record : 프로듀서가 브로커로 전송하거나 컨슈머가 읽어가는 데이터 조각을 말함
Kafka의 기본 데이터 스트림 처리 기능과 구성은 다음을 제공합니다.
- 매우 높은 처리량과 짧은 대기 시간으로 데이터를 공유하는 마이크로서비스 및 기타 애플리케이션
- 메시지 순서 보장
- 애플리케이션 상태를 재구성하기 위해 데이터 저장소에서 메시지 되감기/재생
- 키-값 로그를 사용할 때 오래된 레코드를 제거하기 위한 메시지 압축
- 클러스터 구성의 수평적 확장성
- 내결함성을 제어하기 위한 데이터 복제
- 즉각적인 액세스를 위해 대용량 데이터 보존
kafka는 주로, 이벤트 중심 아키텍처, 로그 수집, 실시간 데이터를 사용하는 애플리케이션 등에서 사용됩니다.
kafka의 내용만으로도 너무 방대하기 때문에 자세하게 알고 싶다면 Apache Kafka를 참고합니다.
Strimzi 기능
Strimzi는 쿠버네티스 환경에서 Apache Kafka의 운영 및 프로세스를 자동화한 Operator입니다.
아래와 같이 다양한 카프카 운영 프로세스를 지원합니다.
- Kafka 클러스터 배포 및 실행
- Kafka 구성요소 배포 및 실행
- Kafka에 대한 액세스 구성
- Kafka에 대한 액세스 보안
- 카프카 업그레이드
- 중개인 관리
- 주제 생성 및 관리
- 사용자 생성 및 관리
Strimzi 구성
strimzi는 다음과 같은 역할로 구성됩니다.
- ZooKeeper cluster of replicated ZooKeeper instances
- IMPORTANT : KRaft mode is not ready for production in Apache Kafka or in Strimzi - Link
- Kafka Connect cluster for external data connections
- Kafka MirrorMaker cluster to mirror the Kafka cluster in a secondary cluster
- Kafka Exporter to extract additional Kafka metrics data for monitoring
- Kafka Bridge to make HTTP-based requests to the Kafka cluster
Strimzi & 카프카 클러스터 설치
Strimzi Operator 설치(Helm)
- Strimzi v0.38 버전을 설치합니다.
- Apache Kafka 지원 버전은 v3.5.0, v3.5.1, v3.6.0입니다.
# 네임스페이스 생성
kubectl create namespace kafka
# Repo 추가
helm repo add strimzi https://strimzi.io/charts/
helm show values strimzi/strimzi-kafka-operator
# 차트 설치 : 오퍼레이터 파드 설치
helm install kafka-operator strimzi/strimzi-kafka-operator --version 0.38.0 --namespace kafka
# 배포한 리소스 확인 : Operator 디플로이먼트(파드)
kubectl get deploy,pod -n kafka
kubectl get-all -n kafka
# 오퍼레이터가 지원하는 카프카 버전 확인
kubectl describe deploy -n kafka | grep KAFKA_IMAGES: -A3
# 배포한 리소스 확인 : CRDs - 각각이 제공 기능으로 봐도됨!
kubectl get crd | grep strimzi
kafkabridges.kafka.strimzi.io 2023-11-11T06:01:21Z
kafkaconnectors.kafka.strimzi.io 2023-11-11T06:01:21Z
kafkaconnects.kafka.strimzi.io 2023-11-11T06:01:20Z
kafkamirrormaker2s.kafka.strimzi.io 2023-11-11T06:01:21Z
kafkamirrormakers.kafka.strimzi.io 2023-11-11T06:01:21Z
kafkanodepools.kafka.strimzi.io 2023-11-11T06:01:21Z
kafkarebalances.kafka.strimzi.io 2023-11-11T06:01:21Z
kafkas.kafka.strimzi.io 2023-11-11T06:01:20Z
kafkatopics.kafka.strimzi.io 2023-11-11T06:01:21Z
kafkausers.kafka.strimzi.io 2023-11-11T06:01:21Z
strimzipodsets.core.strimzi.io 2023-11-11T06:01:20Z
# (참고) CRD 상세 정보 확인
kubectl describe crd kafkas.kafka.strimzi.io
kubectl describe crd kafkatopics.kafka.strimzi.io
카프카 클러스터 생성(Zookeeper)
- Statefulsets이 아닌 StrimziPodSets(sps)라는 커스텀 리소스를 사용합니다.
- Statefulsets에 종속되지 않고 카프카의 특성에 맞게 유연하게 사용함을 위함입니다.
# (옵션) 신규 터미널 : 모니터링
watch kubectl get kafka,strimzipodsets,pod,svc,endpointslice,pvc -n kafka
kubectl logs deployment/strimzi-cluster-operator -n kafka -f
# 카프카 클러스터 YAML 파일 확인 : listeners(3개), podAntiAffinity
curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/strimzi/kafka-1.yaml
cat kafka-1.yaml | yh
# 카프카 클러스터 배포 : 카프카(브로커 3개), 주키퍼(3개), entityOperator 디플로이먼트
## 배포 시 requiredDuringSchedulingIgnoredDuringExecution 지원 >> preferredDuringSchedulingIgnoredDuringExecution 미지원...
kubectl apply -f kafka-1.yaml -n kafka
# 배포된 리소스들 확인
kubectl get-all -n kafka
# 배포된 리소스 확인 : 주키퍼 설치 완료 후 >> 카프카 브로커 설치됨
kubectl get kafka -n kafka
kubectl get cm,secret -n kafka
# 배포된 리소스 확인 : 카프카/주키퍼 strimzipodsets 생성 확인 >> sts 스테이트풀렛 사용 X
kubectl get strimzipodsets -n kafka
# 노드 정보 확인
kubectl describe node | more
kubectl get node --label-columns=topology.ebs.csi.aws.com/zone
kubectl describe pv | grep 'Node Affinity:' -A2
# 배포된 리소스 확인 : 배포된 파드 생성 확인
kubectl get pod -n kafka -l app.kubernetes.io/name=kafka
kubectl get pod -n kafka -l app.kubernetes.io/name=zookeeper
kubectl get pod -n kafka -l app.kubernetes.io/instance=my-cluster
# 배포된 리소스 확인 : 서비스 Service(Headless) 등 생성 확인 - listeners(3개)
kubectl get svc,endpointslice -n kafka
# 배포된 리소스 확인 : 카프카/주키퍼 파드 저장소 확인
kubectl get pvc,pv -n kafka
kubectl df-pv
# 배포된 리소스 확인 : 컨피그맵 확인
kubectl get cm -n kafka
# 컨피그맵 상세 확인
kubectl describe cm -n kafka strimzi-cluster-operator
kubectl describe cm -n kafka my-cluster-zookeeper-config
kubectl describe cm -n kafka my-cluster-entity-topic-operator-config
kubectl describe cm -n kafka my-cluster-entity-user-operator-config
kubectl describe cm -n kafka my-cluster-kafka-0
kubectl describe cm -n kafka my-cluster-kafka-1
kubectl describe cm -n kafka my-cluster-kafka-2
...(생략)...
##########
# Node / Broker ID
##########
broker.id=${STRIMZI_BROKER_ID}
node.id=${STRIMZI_BROKER_ID}
##########
# Kafka message logs configuration >> 로그 디렉터리
##########
log.dirs=/var/lib/kafka/data-0/kafka-log${STRIMZI_BROKER_ID}
...
##########
# User provided configuration
##########
default.replication.factor=3
inter.broker.protocol.version=3.6
min.insync.replicas=2
offsets.topic.replication.factor=3
transaction.state.log.min.isr=2
transaction.state.log.replication.factor=3
log.message.format.version=3.6
...
# kafka 클러스터 Listeners 정보 확인 : 각각 9092 평문, 9093 TLS, 세번째 정보는 External 접속 시 NodePort 정보
kubectl get kafka -n kafka my-cluster -o jsonpath={.status.listeners} | jq
Kafka Client 파드를 생성하여 클러스터 정보 확인
- 카프카 클라이언트 파드를 생성하여 중개인, 토픽 등 카프카 클러스터 정보를 조회합니다.
# 파일 다운로드
curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/strimzi/myclient.yaml
cat myclient.yaml | yh
# 데몬셋으로 myclient 파드 배포 : 어떤 네임스페이스에 배포되는가?
VERSION=3.6 envsubst < myclient.yaml | kubectl apply -f -
kubectl get pod -l name=kafkaclient -owide
# Kafka client 에서 제공되는 kafka 관련 도구들 확인
kubectl exec -it ds/myclient -- ls /opt/bitnami/kafka/bin
# 카프카 파드의 SVC 도메인이름을 변수에 지정
SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092
echo "export SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092" >> /etc/profile
# 브로커 정보
kubectl exec -it ds/myclient -- kafka-broker-api-versions.sh --bootstrap-server $SVCDNS
# 브로커에 설정된 각종 기본값 확인 : --broker --all --describe 로 조회
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --broker 1 --all --describe
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --broker 2 --all --describe
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --broker 0 --all --describe
# 토픽 리스트 확인
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --list
# 토픽 리스트 확인 (kubectl native) : PARTITIONS, REPLICATION FACTOR
kubectl get kafkatopics -n kafka
Kafka UI
- 오픈소스로 제공하는 Apache Kafka UI 도구를 설치합니다.
# 배포
helm repo add kafka-ui https://provectus.github.io/kafka-ui-charts
cat <<EOF > kafkaui-values.yml
yamlApplicationConfig:
kafka:
clusters:
- name: yaml
bootstrapServers: my-cluster-kafka-bootstrap.kafka.svc:9092
auth:
type: disabled
management:
health:
ldap:
enabled: false
EOF
# 설치
helm install kafka-ui kafka-ui/kafka-ui -f kafkaui-values.yml
# 접속 확인
kubectl patch svc kafka-ui -p '{"spec":{"type":"LoadBalancer"}}'
kubectl annotate service kafka-ui "external-dns.alpha.kubernetes.io/hostname=kafka-ui.$MyDomain"
echo -e "kafka-ui Web URL = http://kafka-ui.$MyDomain"
# 노드포트(도메인이 없을 경우)
kubectl patch svc kafka-ui -p '{"spec":{"type":"NodePort"}}'
아래와 같이 UI를 통해 카프카의 기본적인 기능(Broker, Topic, Consumer) 등을 조회할 수 있습니다.
카프카 토픽 생성 및 메시지 주고받기
토픽 생성 및 관리
- 토픽을 생성하고 정보를 확인합니다.
# 토픽 모니터링 : Kafka-UI 같이 확인 >> 설정 반응이 조금 느릴 수 있음
watch -d kubectl get kafkatopics -n kafka
# 토픽 Topic 생성 (kubectl native) : 파티션 1개 리플리케이션 3개, envsubst 활용
curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/3/mytopic.yaml
cat mytopic.yaml | yh
TOPICNAME=mytopic1 envsubst < mytopic.yaml | kubectl apply -f - -n kafka
# 토픽 생성 확인 (kubectl native)
kubectl get kafkatopics -n kafka
NAME CLUSTER PARTITIONS REPLICATION FACTOR READY
mytopic1 my-cluster 1 3
...
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --list | grep mytopic
mytopic1
# 토픽 상세 정보 확인 : 설정값 미 지정 시 기본값이 적용
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic1 --describe
Topic: mytopic1
TopicId: hz--p3HXTRq-54EVuuY68w
PartitionCount: 1
ReplicationFactor: 3
Configs: min.insync.replicas=2
segment.bytes=1073741824
retention.ms=7200000
message.format.version=3.0-IV1
Topic: mytopic1 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
# 토픽 Topic 생성 : 파티션 1개 리플리케이션 3개
kubectl exec -it ds/myclient -- kafka-topics.sh --create --bootstrap-server $SVCDNS --topic mytopic2 --partitions 1 --replication-factor 3 --config retention.ms=172800000
# 토픽 생성 확인
kubectl get kafkatopics -n kafka
NAME CLUSTER PARTITIONS REPLICATION FACTOR READY
mytopic1 my-cluster 1 3 True
mytopic2 my-cluster 1 3 True
...
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --list | grep mytopic
mytopic1
mytopic2
# 토픽 상세 정보 확인
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --describe
Topic: mytopic2 TopicId: 965ASQDmQfiuIxPiiC9RPQ PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2,retention.ms=172800000,message.format.version=3.0-IV1
Topic: mytopic2 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
# 토픽의 파티션 갯수 늘리기
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --alter --partitions 2
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --describe
Topic: mytopic2 TopicId: 965ASQDmQfiuIxPiiC9RPQ PartitionCount: 2 ReplicationFactor: 3 Configs: min.insync.replicas=2,retention.ms=172800000,message.format.version=3.0-IV1
Topic: mytopic2 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: mytopic2 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
# Kafka-UI 같이 확인
# 실습 구성도 그림 확인
# 토픽의 파티션 갯수 줄이기(안됨)
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --alter --partitions 1
Error while executing topic command : Topic currently has 2 partitions, which is higher than the requested 1.
[2022-06-03 14:59:31,427] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 2 partitions, which is higher than the requested 1.
(kafka.admin.TopicCommand$)
# 토픽 일부 옵션 설정 : min.insync.replicas=2 를 min.insync.replicas=3 으로 수정
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --topic mytopic2 --alter -add-config min.insync.replicas=3
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --describe
Topic: mytopic2 TopicId: 965ASQDmQfiuIxPiiC9RPQ PartitionCount: 2 ReplicationFactor: 3 Configs: min.insync.replicas=3,retention.ms=172800000,message.format.version=3.0-IV1
Topic: mytopic2 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: mytopic2 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
# 토픽 일부 옵션 설정 : 다음 실습을 위해 min.insync.replicas=2 로 다시 수정
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --topic mytopic2 --alter -add-config min.insync.replicas=2
토픽에 메시지 주고받기
- 토픽의 메시지를 실시간으로 주고받는지 확인합니다.
# 토픽 모니터링
watch -d kubectl get kafkatopics -n kafka
# 사용 스크립트
kafka-console-producer.sh
kafka-console-consumer.sh
# 토픽에 데이터 넣어보기
kubectl exec -it ds/myclient -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic1
> hello
> world
> 0
> 1
> 2
CTRL+D 로 빠져나오기
# 토픽 데이터 확인
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1 --from-beginning
hello
world
0
1
2
CTRL+C 로 빠져나오기
# 토픽에 데이터(메시지키+메시지값) 넣어보기
kubectl exec -it ds/myclient -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic1 --property "parse.key=true" --property "key.separator=:"
>key1:doik1
>key2:doik2
>key3:doik3
CTRL+D 로 빠져나오기
# 토픽에 데이터(메시지키+메시지값) 확인
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1 --property print.key=true --property key.separator="-" --from-beginning
null-hello
null-world
null-0
null-1
null-2
key1-doik1
key2-doik2
key3-doik3
CTRL+C 로 빠져나오기
# 토픽에 데이터 최대 컨슘 메시지 갯수 확인
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1 --max-messages 2 --from-beginning
doik2
hello
Processed a total of 2 messages
# 토픽에서 특정 파티션만 컨슘 확인
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1 --partition 0 --from-beginning
...
CTRL+C 로 빠져나오기
# 특정 오프셋 기준 컨슘 확인 등 다양한 조건의 확인 가능 >> 직접 찾아서 해보세요!
# 토픽 삭제 (kubectl native)
kubectl delete kafkatopics -n kafka mytopic1
장애 복구 테스트
Case 1 : 강제로 Kafka or Zookeeper 파드 1개 삭제
- 파드를 직접 삭제하지 않고 strimzi.io/delete-pod-and-pvc=true 어노테이션을 설지 시, 오퍼레이터가 감지하여 자동으로 파드를 삭제해 줍니다.
- 강제로 Zookeeper 파드 1개 삭제를 하여도 메시지를 주고받을 수 있는지 확인합니다.
# 모니터링
watch -d kubectl get pod -owide -n kafka
kubectl logs -n kafka -l name=strimzi-cluster-operator -f # Reconciliation 로그 확인
# 토픽 Topic 생성 (kubectl native) : 파티션 1개 리플리케이션 3개, ISR=2, envsubst 활용
TOPICNAME=mytopic3 envsubst < mytopic.yaml | kubectl apply -f - -n kafka
# 토픽 정보 확인 : 컨트롤러 브로커 위치 확인
kubectl get pod -n kafka -l app.kubernetes.io/name=kafka -owide
NODE1IP=$(kubectl get node -owide | grep 192.168.1 | awk '{print $6}')
NODEPORT=$(kubectl get svc -n kafka my-cluster-kafka-external-bootstrap -o jsonpath={.spec.ports[0].nodePort})
docker run -it --rm --network=host edenhill/kcat:1.7.1 -b $NODE1IP:$NODEPORT -L -t mytopic3 | grep controller
broker 0 at ec2-3-36-53-67.ap-northeast-2.compute.amazonaws.com:31498 (controller) >> 해당 유동 공인IP를 가진 EC2 찾기
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic3 --describe
Topic: mytopic3 TopicId: 077wfV5dSnORaZrLh3WLAw PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=1073741824,retention.ms=7200000,message.format.version=3.0-IV1
Topic: mytopic3 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
# 메시지 받기 : script 혹은 kafka-ui
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic3 --from-beginning
# (터미널1) for문 반복 메시지 보내기
kubectl exec -it ds/myclient -- sh -c "echo mytest | kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic3"
for ((i=1; i<=100; i++)); do echo "failover-test1-$i" ; kubectl exec -it ds/myclient -- sh -c "echo test1-$i | kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic3" ; date ; done
# 강제로 컨트롤러 브로커 파드 삭제(위치 확인) : 오퍼레이터가 annotate 설정을 모니터링 주기(2분)가 있어서 시간이 지나면 삭제가 실행됨
kubectl annotate pod -n kafka my-cluster-kafka-0 strimzi.io/delete-pod-and-pvc=true && kubectl get pv -w
혹은
kubectl annotate pod -n kafka my-cluster-kafka-1 strimzi.io/delete-pod-and-pvc=true && kubectl get pv -w
혹은
kubectl annotate pod -n kafka my-cluster-kafka-2 strimzi.io/delete-pod-and-pvc=true && kubectl get pv -w
# 메시지 보내고 받기 가능!
...
failover-test1-415
% Reached end of topic mytopic3 [0] at offset 491
failover-test1-416
...
# 강제로 주키퍼 파드 삭제
kubectl annotate pod -n kafka my-cluster-zookeeper-0 strimzi.io/delete-pod-and-pvc=true && kubectl get pv -w
# 메시지 보내고 받기 가능!
Case 2 : 강제로 토픽 리더 파드가 있는 노드를 drain
- 강제로 토픽 리더 파드가 있는 노드를 drain 하고 메시지를 주고받을 수 있는지 확인합니다.
- ISR(In-Sync Replica)
- 카프카에서 특정 시점에서 리더 파티션과 동기화된 상태에 있는 복제본들을 나타냅니다
- 데이터의 안정성과 내결함성을 유지하는 데 중요하며, 모든 복제본이 동기화된 상태에 있으면 데이터 손실 없이 안정적으로 처리할 수 있습니다
# 모니터링
watch kubectl get pod -owide -n kafka
kubetail -n kafka -l name=strimzi-cluster-operator -f # Reconciliation 로그 확인
# 카프카 토픽 정보 확인 : 리더파드가 있는 워커노드 위치 확인
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic3 --describe
Topic: mytopic3 TopicId: 077wfV5dSnORaZrLh3WLAw PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=1073741824,retention.ms=7200000,message.format.version=3.0-IV1
Topic: mytopic3 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
# test 토픽 리더 kafka 파드의 워커노드 확인
kubectl get pod -owide -n kafka | grep kafka
# (터미널2) 메시지 받기
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic3 --from-beginning
# (터미널1) for문 반복 메시지 보내기
for ((i=1; i<=100; i++)); do echo "failover-test2-$i" ; kubectl exec -it ds/myclient -- sh -c "echo test2-$i | kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic3" ; date ; done
# test 토픽 리더 kafka 파드의 워커노드에서 drain : test topic leader pod evict
# kubectl drain <<노드>> --ignore-daemonsets --delete-emptydir-data
kubectl get node
NODE=<각자 자신의 EC2 노드 이름 지정>
NODE=ip-192-168-3-96.ap-northeast-2.compute.internal
kubectl drain $NODE --delete-emptydir-data --force --ignore-daemonsets && kubectl get node -w
# 해당 워커노드 drain 확인
kubectl get kafka,strimzipodsets -n kafka
kubectl get node
# kafka 파드 상태
kubectl get pod -l app.kubernetes.io/name=kafka -n kafka
# 카프카 토픽 정보 확인
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic3 --describe
Topic: mytopic3 TopicId: 077wfV5dSnORaZrLh3WLAw PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=1073741824,retention.ms=7200000,message.format.version=3.0-IV1
Topic: mytopic3 Partition: 0 Leader: 0 Replicas: 1,0,2 Isr: 2,0 # 브로커1는 not in-sync 상태
# ISR min.insync.replicas=3 으로 증가 후 메시지 보내고 받기 확인 >> ISR 기능에 대한 이해를 하자!
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --topic mytopic3 --alter -add-config min.insync.replicas=3
# 메시지 보내고 받기 확인
kubectl exec -it ds/myclient -- sh -c "echo mytest | kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic3"
# ISR min.insync.replicas=2 으로 설정 수정
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --topic mytopic3 --alter -add-config min.insync.replicas=2
# 메시지 보내고 받기 확인
kubectl exec -it ds/myclient -- sh -c "echo mytest | kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic3"
# 동작 확인 후 uncordon 설정
kubectl get kafka,strimzipodsets -n kafka
kubectl uncordon $NODE
KEDA를 이용한 오토스케일링
- KEDA는 CNCF의 Graduated 된 오픈소스이다.
- 쿠버네티스에서 제공하는 오토스케일 기능의 HPA(Horizontal Pod Autoscaler)는 메트릭(CPU, MOMERY)을 기반으로 스케일의 여부가 결정된다.
- 하지만, KEDA는 특정한 이벤트를 기반으로 스케일 여부를 결정합니다.
- 쿠버네티스, 카프카를 제외하고도 수많은 오브젝트의 이벤트들을 지원.
KEDA 실습
- Helm을 이용하여 KEDA를 설치합니다.
- 카프카의 특정 토픽의 LAG Threshold 가 '1' 이상 일 때(이벤트) 파드가 오토스케일링 되는지 테스트합니다.
# KEDA 설치
kubectl create namespace keda
helm repo add kedacore https://kedacore.github.io/charts
helm install keda kedacore/keda --version 2.12.0 --namespace keda
# KEDA 설치 확인
kubectl get all -n keda
kubectl get-all -n keda
kubectl get crd | grep keda
# Deploy Consumer application : 컨슈머로 인해 토픽도 생성됨
curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/3/keda-deploy-svc.yaml
cat keda-deploy-svc.yaml | yh
kubectl apply -f keda-deploy-svc.yaml
kubectl get pod -n keda -l app=consumer-service
# 확인
kubectl get kafkatopics -n kafka
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic my-topic --describe
kubectl exec -it ds/myclient -- kafka-consumer-groups.sh --bootstrap-server $SVCDNS --group keda-consumer --describe
kubectl logs -n keda -l app=consumer-service -f
# kube-ops-view 로 증가/감소 확인
# KEDA 스케일 관련 정책 생성 : LAG 1 기준 달성 시 파드 증가, producer traffic rate 가 기준 이상이 되면 consumer instances 증가
# 컨슈머 LAG (지연) = ‘프로듀서가 보낸 메시지 갯수(카프카에 남아 있는 메시지 갯수)’ - 컨슈머가 가져간 메시지 갯수’
curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/3/keda-scale.yaml
cat keda-scale.yaml | yh
kubectl apply -f keda-scale.yaml
# 모니터링
watch 'kubectl get ScaledObject,hpa,pod -n keda'
kubectl get ScaledObject,hpa -n keda
kubectl logs -n keda -l app=consumer-service
# (터미널1) for문 반복 메시지 보내기
for ((i=1; i<=100; i++)); do echo "keda-scale-test-$i" ; kubectl exec -it ds/myclient -- sh -c "echo test1-$i | kafka-console-producer.sh --bootstrap-server $SVCDNS --topic my-topic" ; date ; done
# 모니터링 : 증가한 consumer 파드 확인
kubectl get pod -n keda -l app=consumer-service
# 메시지 보내기 취소 후 일정 시간이 지나면 자동으로 consumer 파드가 최소 갯수 1개로 줄어든다
'Kubernetes > Database Operator' 카테고리의 다른 글
쿠버네티스 데이터베이스 오퍼레이터 스터디 2기(DOIK2) 마치며 (1) | 2023.11.28 |
---|---|
Stackable Operator(Data Platform) - DOIK2_6주차 (1) | 2023.11.26 |
MongoDB Operator - Percona Operator for MongoDB(PSMDB) - DOIK 4주차 (3) | 2023.11.12 |
PostgreSQL Operator - CloudNative PostgreSQL - DOIK2_3주차 (0) | 2023.11.05 |
쿠버네티스 MySQL InnoDB Cluster - DOIK2_2주차 2 (2) | 2023.10.28 |