5. 프로그램 내에서 코드로 카프카 관리하기
15 Nov 2023 | Kafka
5. 프로그램 내에서 코드로 카프카 관리하기
- 카프카를 관리하는 데 있어서는 많은 CLI, GUI 툴이 있다 (9장에서 살펴볼 것)
- 하지만 클라이언트 애플리케이션에서 직접 관리 명령을 내려야 할 때도 있음 (eg. IoT)
- 0.11 버전부터 CLI뿐만 아니라 프로그래밍 언어에서 사용가능한
AdminClient
가 추가 됨
- 토픽 목록 조회, 생성, 삭제
- 클러스터 상세 정보 확인
- ACL 관리
- 설정 변경
5.1 AdminClient 개요
5.1.1 비동기적이고 최종적 일관성을 가지는 API
- 가장 중요한 것:
AdminClient
는 비동기적으로 작동함
- 각 메서드는 요청을 클러스터 컨트롤러로 전송한 뒤 바로 1개 이상의
Future
객체를 리턴
Future
객체는 비동기 작업의 결과를 가리킴
- 비동기 작업의 결과를 확인
- 취소
- 완료까지 대기
- 작업이 완료되었을 때 실행할 함수를 지정하는 메서드 보유
Future
객체를 Result
객체 안에 감싸는데, Result
객체는 작업이 끝날 때까지 대기하거나 작업 결과에 대해 일반적으로 뒤이어 쓰이는 작업을 수행하는 헬퍼 메서드를 가지고 있음
- ex)
AdminClient.createTopics
메서드는 CreateTopicsResult
객체를 리턴
- 이 객체는 모든 토픽이 생성될 떄까지 기다리거나, 각각의 토픽 상태를 하나씩 확인하거나, 아니면 특정한 토픽이 생성된 뒤 해당 토픽의 설정을 가져올 수 있도록 해줌
- 카프카 컨트롤러로부터 브로커로의 메타데이터 전파가 비동기적으로 이루어지기 때문에,
AdminClient
API가 리턴하는 Future
객체들은 컨트롤러의 상태가 완전히 업데이트된 시점에서 완료된 것으로 간주된다.
- 이 시점에서 모든 브로커가 전부 다 새로운 상태에 대해 알고 있지는 못할 수 있기 때문에,
listTopics
요청은 최신 상태를 전달받지 않은 (이제 막 만들어진 토픽에 대해 알고 있지 않은) 브로커에 의해 처리될 수 있음
- 이러한 속성을 최종적 일관성 (eventual consistency)이라고 함
- 최종적으로 모든 브로커는 모든 토픽에 대해 알게될 것이지만, 그게 언제가 될 지에 대해서는 아무런 보장도 할 수 없다
5.1.2 옵션
AdminClient
의 각 메서드는 메서드별로 특정한 Options
객체를 인수로 받음
listTopics
메서드는 ListTopicsOptions
객체를 인수로 받음
describeCluster
메서드는 DescribeClusterOptions
를 인수로 받음
- 모든
AdminClient
메서드가 가지고 있는 매개변수는 timeoutMs
임
TimeoutException
을 발생시키기 전, 클러스터로부터의 응답을 기다리는 시간을 조정함
5.1.3 수평 구조
- 모든 어드민 작업은
KafkaAdminClient
에 구현되어 있는 아파치 카프카 프로토콜을 사용해서 이루어짐
- 여기에는 객체 간의 의존 관계나 네임스페이스 따위가 없다
- 인터페이스가 굉장히 크기 때문에 다소 논란이 있기는 하지만, 이러한 구조에는 장점이 있음
- JavaDoc문서에서 메서드를 찾기 쉽고, IDE에서 자동완성 해줌
- 엉뚱한 곳을 찾고 있는지 고민할 필요 없음, 없으면 구현 안된거임
5.1.4 추가 참고 사항
- 클러스터의 상태를 변경하는 모든 작업(create, delete, alter)은 컨트롤러에 의해 수행
- 클러스터 상태를 읽기만 하는 작업(list, describe)는 아무 브로커에서나 수행될 수 있음, 클라이언트 입장에서 보이는 가장 부하가 적은 브로커로 전달
- 대부분의 어드민 작업은
AdminClient
를 통해서 수행되거나, 주키퍼에 저장되어 있는 메타데이터를 직접 수정하는 방식으로 이루어짐.
- 주키퍼를 직접 수정하는 것을 절대 쓰지 말 것을 강력히 권장, 카프카는 주키퍼 의존성을 제거할 계획
5.2 AdminClient 사용법: 생성, 설정, 닫기
AdminClient
를 사용하기 위해서는 객체를 생성해야 함. 이를 위해 설정값 객체인 Properties
를 인수로 주어야 함
close
메서드로 닫을 수 있음, 이 때 타임 아웃 매개변수를 받음
5.2.1 client.dns.lookup
- 카프카는 부트스트랩 서버 설정에 포함된 호스트명을 기준으로 연결을 검증, 해석, 생성함
- 이 단순한 모델은 대부분의 경우 잘 작동하지만 두 맹점이 있음
- DNS alias를 사용한 경우와, 2개 이상의 IP 주소로 연결되는 하나의 DNS 항목을 사용할 경우
DNS 별칭을 사용하는 경우
broker1.hostname.com
, broker2.hostname.com
…와 같은 naming convention을 따르는 브로커들을 가지고 있다고 가정
- 이 모든 부로커들을 부트스트랩 서버 설정에 일일이 지정하는것보다 이 모든 브로커 전체를 가리킬 하나의 DNS alias을 만들 수 있음 (ex.
all-brokers.hostname.com
)
- 이는 매우 편리하지만, SASL을 사용해서 인증을 하려고 할 때는 문제가 생김
- SASL을 사용할 경우 클라이언트는
all-brokers.hostname.com
에 대해서 인증을 하려하는데, 서버의 보안 주체는 broker2.hostname.com
인 탓 -> man-in-the-middle attack일 가능성이 있음 -> 인증 거부, 연결 실패
- 이 경우,
client.dns.lookup-=resolve_canonical_bootstrap_server_only
- 위 성정은 DNS별칭에 포함된 모든 브로커 이름을 일일이 부트스트랩 서버 목록에 넣어 준 것과 동일하게 작동함
다수의 IP 주소로 연결되는 DNS 이름을 사용하는 경우
- 최근 네트워크 아키텍처에서 모든 브로커를 프록시나 로드 밸런서 뒤로 숨기는 것은 매우 흔함
- 이 경우, 로드 밸러서가 SPOF가 되는 걸 보고싶지는 않을 것임
- 위 이유 때문에,
broker.hostname.com
를 여러 개의 IP 주소로 연결하는 것은 매우 흔함 (이 IP주소들은 모두 로드밸런서로 연결되고 따라서 모든 트래픽이 동일한 브로커로 전달됨)
- 이 IP 주소들은 시간이 지남에 따라 변경될 수 있음
- 기본적으로, 카프카 클라이언트는 해석된 첫 번째 호스트명으로 연결을 시도할 뿐.
- 해석된 IP 주소가 사용 불능일 경우 브로커가 멀쩡하게 작동하고 있는데도 클라이언트는 연결에 실패할 수 있음
- 이를 위해
client.dns.lookup=use_all_dns_ips
사용 권장
5.2.2 request.timeout.ms
- 이 설정은 애플리케이션이
AdminClient
의 응답을 기다릴 수 있는 시간의 최대값을 정의
- 만약 애플리케이션에서
AdminClient
작업이 주요 경로 상에 있을 경우, 타임아웃 값을 낮게 잡아준 뒤 제 시간에 리턴되지 않는 응답은 조금 다른 방식으로 처리해야 할 수도 있음
- 서비스가 시작될 때 특정한 토픽이 존재하는지를 확인
- 브로커가 응답하는 데 30초 이상 걸릴 경우, 확인 작업을 건너뛰거나 일단 서버 기동을 계속한 뒤 나중에 토픽의 존재를 확인
5.3 필수적인 토픽 관리 기능
admin.listTopics()
가 Future
객체들을 감싸고 있는 ListTopicsResult
객체를 리턴 함
- 이
Future
객체에서 get
메서드를 호출하면, 실행 스레드는 서버가 토픽 이름 집합을 리턴할 때까지 기다리거나 아니면 타임아웃 예외를 발생함
- 좀 더 복잡한 작업이 필요할 수도 있음 (해당 토픽이 필요한 만큼의 파티션과 레플리카를 가지고 있는지 확인)
- 카프카 커넥트와 컨플루언트의 스키마 레지스트리는 설정을 저장하기 위해 카프카 토픽을 사용함, 이들은 처음 시작할 때 아래의 조건을 만족하는 설정 토픽이 있는지를 확인
- 하나의 파티션을 가짐. 이것은 설정 변경에 온전한 순서를 부여하기 위해 필요
- 가용성을 보장하기 위해 3개의 레플리카를 가짐
- 오래 된 설정값도 계속해서 저장되도록 토픽에 압착 설정이 되어 있음
- 131p 예제 참고
- 지금까지 우리가 살펴본 예제들은 서로 다른
AdminClient
메서드가 리턴하는 Future
객체에 블로킹방식으로 작동하는 get()
메서드를 호출 함
- 만약 많은 어드민 요청을 처리할 것으로 예상되는 서버를 개발하고 있을 경우, 블로킹을 사용하는 것은 불리함
- 사용자로부터 계속해서 요청을 받고, 카프카로 요청을 보내고, 카프카가 응답하면 그제서야 클라이언트로 응답을 보내는 게 더 합리적
- 133p. 예제 참고
- 여기서 중요한 것은 우리가 카프카로부터의 응답을 기다리지 않는다는 점
- 카프카로부터 응답이 도착하면
DescribeTopicResult
가 HTTP 클라이언트에게 응답을 보낼 것
- 그 사이, HTTP 서버는 다른 요청을 처리 가능
5.4 설정 관리
- 설정 관리는
ConfigResource
객체를 사용해서 할 수 있음
- 설정 가능한 자원에는 브로커, 브로커 로그, 토픽이 있음
- 위 설정은
kafka-configs.sh
혹은 다른 툴로 하는게 보통이지만, 애플리케이션에서 토픽의 설정을 확인하거나 수정하는 일도 흔하다
- 많은 어플리케이션들은 정확한 작동을 위해 압착 설정이 된 토픽을 사용
- 이 경우 애플리케이션이 주기적으로 해당 토픽에 실제로 압착 설정이 되어 있는지를 확인해서, 설정이 안 되어 있을 경우 설정을 교정해 주는 것이 합리적.
- 135p 참고
5.5 컨슈머 그룹 관리
- 다른 메시지 큐와는 달리, 카프카는 이전에 데이터를 읽어서 처리한 것과 완전히 동일한 순서로 데이터를 재처리할 수 있게 해준다는 점은 언급
- 메시지를 재처리해야 하는 시나리오에는 여러 가지가 있을 수 있음
- 사고가 발생한 와중에 오작동하는 애플리케이션을 트러블슈팅하는 경우
- 재해 복구 상황에서 애플리케이션을 새로운 클러스터에서 작동시키려 하는 경우
- 여기서는
AdminClient
를 사용해서 프로그램적으로 컨슈머 그룹과 이 그룹들이 커밋한 오프셋을 조회하고 수정하는 방법에 대해서 살펴볼 것
5.5.1 컨슈머 그룹 살펴보기
- `admin.listConsumerGroups().valid().get().forEach(System.out::println);
valid()
메서드, get()
메서트를 호출함으로써 리턴되는 모음은 클러스터가 에러 없이 리턴한 컨슈머 그룹만을 포함
errors()
메서드를 사용해서 모든 예외를 가져올 수 있음
- 특정 그룹에 대해 더 상세한 정보를 보고싶다면
ConsumerGroupDescription groupDescription = admin.describeConsumerGroups(CONSUMER_GRP_LIST)
.describedGroups().get(CONSUMER_GROUP).get();
System.out.println("Description of group " + CONSUMER_GROUP + ":" + groupDescription);
description
은 해당 그룹에 대한 상세한 정보를 담는다.
- 그룹 멤버, 멤버별 식별자, 호스트명, 멤버별로 할당된 파티션, 할당 알고리즘, 그룹 코디네이터의 호스트명
- 트러블 슈팅 시 매우 유용
- 하지만 가장 중요한 정보중 하나인 컨슈머 그룹이 읽고 있는 각 파티션에 대해 마지막으로 커밋된 오프셋 값, 최신 메시지에서 얼마나 뒤떨어졌는지 (lag)
- 예전에는 커밋 메시지 가져와서 파싱하는 방법 뿐 -> 호환성 보장이 안됨
AdminClient
를 사용해서 커밋 정보를 얻어오는 방법
- 137p 참고
5.5.2 컨슈머 그룹 수정하기
AdminClient
는 컨슈머 그룹을 수정하기 위한 메서드들 역시 가지고 있음
- 그룹 삭제, 멤버 제외, 커밋된 오프셋 삭제 혹은 변경
- SRE가 비상 상황에서 임기 응변으로 복구를 위한 툴을 제작할 때 자주 사용
- 오프셋 변경 기능이 가장 유용함
- 오프셋 삭제는 컨슈머를 맨 처음부터 실행시키는 가장 간단한 방법처럼 보이지만, 이는 컨슈머 설정에 의존
- 만약 컨슈머가 시작되는데 커밋된 오프셋을 못찾을 경우, 맨 처음부터 시작해야할까? 아니면 가장 최신 메시지로 건너뛰어야 할까?
auto.offset.reset
설정값을 가지고 있지 않을 한 알 길이 없음
- 명시적으로 커밋된 오프셋을 맨 앞으로 변경하면 컨슈머는 토픽의 맨 앞에서부터 처리를 시작하게 됨 -> 컨슈머가 reset 됨
- 오프셋 토픽의 오프셋 값을 변경한다 해도 컨슈머 그룹에 변경 여부가 전달되지는 않는다
- 컨슈머 그룹은 컨슈머가 새로운 파티션을 할당받거나 새로 시작할 때만 오프셋 토픽에 저장된 값을 읽어올 뿐
- 컨슈머가 모르는 오프셋 변경을 방지하기 위해, 카프카에서는 현재 작업이 돌아가고 있는 컨슈머 그룹에 대한 오프셋을 수정하는 것을 허용하지 않음
- 오프셋만 변경하면 집계값 등을 중복계산 할 가능성이 있음 -> 상태 저장소를 적절히 변경해주어야 함
- 139p 예제
5.6 클러스터 메타데이터
- 클러스터 메타데이터를 들여다 볼 일은 거의 없음
- 관심 있으면 140p 한번 실행시켜 보던지
5.7 고급 어드민 작업
- 잘 쓰이지 않고 위험하기까지 한, 하지만 필요할 때 사용하면 믿을수 없을 정도로 유용한 메서드 몇 개에 대해서 설명
- 사고에 대응중인 SRE에게 매우 중요
5.7.1 토픽에 파티션 추가하기
- 대체로 토픽의 파티션 수는 토픽이 생성될 때 결정됨
- 여러 이유 때문에 토픽에 파티션을 추가해야 하는 경우는 매우 드물며 위험할 수 있음
- 하지만 파티션이 처리할 수 있는 최대 처리량까지 부하가 차올라서 파티션 추가 외에는 선택지가 없는 경우도 있음
createPartitions
메서드 사용
Map<String, NewPartitions> newPartitions = new HashMap<>();
newPartitions.put(TOPIC_NAME, NewPartitions.increaseTo(NUM_PARTITIONS+2));
admin.createPartitions(newPartitions).all().get;
5.7.2 토픽에서 레코드 삭제하기
- 개인정보 보호법 등의 이유로 인해 데이터를 삭제해야 하는 경우가 있음
deleteRecords
메서드는 호출 시점을 기준으로 지정된 오프셋보다 더 오래된 모든 레코드에 삭제 표시를 함으로써 컨슈머가 접근할 수 없도록 한다.
- 이 메서드는 삭제된 레코드의 오프셋 중 가장 큰 값을 리턴하기 때문에 의도했던 대로 삭제가 이루어졌는지 확인 가능
- 삭제 표시된 레코드를 디스크에서 실제로 지우는 작업은 비동기적으로 일어남
- 142p. 예제 참고
5.7.3
선호 리더 선출 (preferred leader election)
- 각 파티션은 선호 리더라 불리는 레플리카를 하나낀 가짐
- 기본적으로, 카프카는 5분마다 선호 리더 레플리카가 실제로 리더를 맡고있는지를 확인해서, 리더를 맡을 수 있는데도 맡고 있지 않은 경우 해당 레플리카를 리더로 삼는다.
electLeader()
메서드 호출
언클린 리더 선출 (unclean leader election)
- 만약 어느 파티션의 리더 레플리카가 사용 불능 상태가 되었는데 다른 레플리카들은 리더를 맡을 수 없는 상황 이라면 (대개 데이터가 없어서 그렇다). 해당 파티션은 리더가 없게 되고 따라서 사용 불능 상태가 됨
- 해결 방법 중 하나가 리더가 될 수 없는 레플리카를 그냥 리더로 삼아버리는 것 (언클린 리더 선출)
- 이는 데이터 유실을 초래
- 이 메서드는 비동기적으로 작동, 시간이 좀 걸릴 수 있음
- 143p 참고
5.7.4 레플리카 재할당
- 레플리카 위치를 바꿔야 하는 경우들이 있음
- 브로커에 너무 많은 레플리카가 올라가 있어 이동을 원할 때
- 레플리카를 추가하고 싶을 때
- 장비를 내리기 위해 모든 레플리카를 다른 장비로 내보내야 할 때
- …
alterPartitionReassignments
사용하면 파티션에 속한 각각의 레플리카의 위치를 정밀하게 제어 가능
- 레플리카 이동은 대량의 데이터 복제를 초래 함
- 144p 참고
5.8 테스트하기
- 아파치 카프카는 원하는 수만큼의 브로커를 설정해서 초기화할 수 있는
MockAdminClient
테스트 클래스 제공
- 자주 사용되는 메서드가 매우 포괄적인 목업 기능을 제공함
- 145p~147p 예제 참고
5.9 요약
AdminClient
는 매우 유용하다!
5. 프로그램 내에서 코드로 카프카 관리하기
- 카프카를 관리하는 데 있어서는 많은 CLI, GUI 툴이 있다 (9장에서 살펴볼 것)
- 하지만 클라이언트 애플리케이션에서 직접 관리 명령을 내려야 할 때도 있음 (eg. IoT)
- 0.11 버전부터 CLI뿐만 아니라 프로그래밍 언어에서 사용가능한
AdminClient
가 추가 됨- 토픽 목록 조회, 생성, 삭제
- 클러스터 상세 정보 확인
- ACL 관리
- 설정 변경
5.1 AdminClient 개요
5.1.1 비동기적이고 최종적 일관성을 가지는 API
- 가장 중요한 것:
AdminClient
는 비동기적으로 작동함 - 각 메서드는 요청을 클러스터 컨트롤러로 전송한 뒤 바로 1개 이상의
Future
객체를 리턴 Future
객체는 비동기 작업의 결과를 가리킴- 비동기 작업의 결과를 확인
- 취소
- 완료까지 대기
- 작업이 완료되었을 때 실행할 함수를 지정하는 메서드 보유
Future
객체를Result
객체 안에 감싸는데,Result
객체는 작업이 끝날 때까지 대기하거나 작업 결과에 대해 일반적으로 뒤이어 쓰이는 작업을 수행하는 헬퍼 메서드를 가지고 있음- ex)
AdminClient.createTopics
메서드는CreateTopicsResult
객체를 리턴 - 이 객체는 모든 토픽이 생성될 떄까지 기다리거나, 각각의 토픽 상태를 하나씩 확인하거나, 아니면 특정한 토픽이 생성된 뒤 해당 토픽의 설정을 가져올 수 있도록 해줌
- ex)
- 카프카 컨트롤러로부터 브로커로의 메타데이터 전파가 비동기적으로 이루어지기 때문에,
AdminClient
API가 리턴하는Future
객체들은 컨트롤러의 상태가 완전히 업데이트된 시점에서 완료된 것으로 간주된다. - 이 시점에서 모든 브로커가 전부 다 새로운 상태에 대해 알고 있지는 못할 수 있기 때문에,
listTopics
요청은 최신 상태를 전달받지 않은 (이제 막 만들어진 토픽에 대해 알고 있지 않은) 브로커에 의해 처리될 수 있음 - 이러한 속성을 최종적 일관성 (eventual consistency)이라고 함
- 최종적으로 모든 브로커는 모든 토픽에 대해 알게될 것이지만, 그게 언제가 될 지에 대해서는 아무런 보장도 할 수 없다
5.1.2 옵션
AdminClient
의 각 메서드는 메서드별로 특정한Options
객체를 인수로 받음listTopics
메서드는ListTopicsOptions
객체를 인수로 받음describeCluster
메서드는DescribeClusterOptions
를 인수로 받음
- 모든
AdminClient
메서드가 가지고 있는 매개변수는timeoutMs
임TimeoutException
을 발생시키기 전, 클러스터로부터의 응답을 기다리는 시간을 조정함
5.1.3 수평 구조
- 모든 어드민 작업은
KafkaAdminClient
에 구현되어 있는 아파치 카프카 프로토콜을 사용해서 이루어짐- 여기에는 객체 간의 의존 관계나 네임스페이스 따위가 없다
- 인터페이스가 굉장히 크기 때문에 다소 논란이 있기는 하지만, 이러한 구조에는 장점이 있음
- JavaDoc문서에서 메서드를 찾기 쉽고, IDE에서 자동완성 해줌
- 엉뚱한 곳을 찾고 있는지 고민할 필요 없음, 없으면 구현 안된거임
5.1.4 추가 참고 사항
- 클러스터의 상태를 변경하는 모든 작업(create, delete, alter)은 컨트롤러에 의해 수행
- 클러스터 상태를 읽기만 하는 작업(list, describe)는 아무 브로커에서나 수행될 수 있음, 클라이언트 입장에서 보이는 가장 부하가 적은 브로커로 전달
- 대부분의 어드민 작업은
AdminClient
를 통해서 수행되거나, 주키퍼에 저장되어 있는 메타데이터를 직접 수정하는 방식으로 이루어짐. - 주키퍼를 직접 수정하는 것을 절대 쓰지 말 것을 강력히 권장, 카프카는 주키퍼 의존성을 제거할 계획
5.2 AdminClient 사용법: 생성, 설정, 닫기
AdminClient
를 사용하기 위해서는 객체를 생성해야 함. 이를 위해 설정값 객체인Properties
를 인수로 주어야 함close
메서드로 닫을 수 있음, 이 때 타임 아웃 매개변수를 받음
5.2.1 client.dns.lookup
- 카프카는 부트스트랩 서버 설정에 포함된 호스트명을 기준으로 연결을 검증, 해석, 생성함
- 이 단순한 모델은 대부분의 경우 잘 작동하지만 두 맹점이 있음
- DNS alias를 사용한 경우와, 2개 이상의 IP 주소로 연결되는 하나의 DNS 항목을 사용할 경우
DNS 별칭을 사용하는 경우
broker1.hostname.com
,broker2.hostname.com
…와 같은 naming convention을 따르는 브로커들을 가지고 있다고 가정- 이 모든 부로커들을 부트스트랩 서버 설정에 일일이 지정하는것보다 이 모든 브로커 전체를 가리킬 하나의 DNS alias을 만들 수 있음 (ex.
all-brokers.hostname.com
) - 이는 매우 편리하지만, SASL을 사용해서 인증을 하려고 할 때는 문제가 생김
- SASL을 사용할 경우 클라이언트는
all-brokers.hostname.com
에 대해서 인증을 하려하는데, 서버의 보안 주체는broker2.hostname.com
인 탓 -> man-in-the-middle attack일 가능성이 있음 -> 인증 거부, 연결 실패- 이 경우,
client.dns.lookup-=resolve_canonical_bootstrap_server_only
- 위 성정은 DNS별칭에 포함된 모든 브로커 이름을 일일이 부트스트랩 서버 목록에 넣어 준 것과 동일하게 작동함
- 이 경우,
- SASL을 사용할 경우 클라이언트는
다수의 IP 주소로 연결되는 DNS 이름을 사용하는 경우
- 최근 네트워크 아키텍처에서 모든 브로커를 프록시나 로드 밸런서 뒤로 숨기는 것은 매우 흔함
- 이 경우, 로드 밸러서가 SPOF가 되는 걸 보고싶지는 않을 것임
- 위 이유 때문에,
broker.hostname.com
를 여러 개의 IP 주소로 연결하는 것은 매우 흔함 (이 IP주소들은 모두 로드밸런서로 연결되고 따라서 모든 트래픽이 동일한 브로커로 전달됨) - 이 IP 주소들은 시간이 지남에 따라 변경될 수 있음
- 기본적으로, 카프카 클라이언트는 해석된 첫 번째 호스트명으로 연결을 시도할 뿐.
- 해석된 IP 주소가 사용 불능일 경우 브로커가 멀쩡하게 작동하고 있는데도 클라이언트는 연결에 실패할 수 있음
- 이를 위해
client.dns.lookup=use_all_dns_ips
사용 권장
5.2.2 request.timeout.ms
- 이 설정은 애플리케이션이
AdminClient
의 응답을 기다릴 수 있는 시간의 최대값을 정의 - 만약 애플리케이션에서
AdminClient
작업이 주요 경로 상에 있을 경우, 타임아웃 값을 낮게 잡아준 뒤 제 시간에 리턴되지 않는 응답은 조금 다른 방식으로 처리해야 할 수도 있음- 서비스가 시작될 때 특정한 토픽이 존재하는지를 확인
- 브로커가 응답하는 데 30초 이상 걸릴 경우, 확인 작업을 건너뛰거나 일단 서버 기동을 계속한 뒤 나중에 토픽의 존재를 확인
5.3 필수적인 토픽 관리 기능
admin.listTopics()
가Future
객체들을 감싸고 있는ListTopicsResult
객체를 리턴 함- 이
Future
객체에서get
메서드를 호출하면, 실행 스레드는 서버가 토픽 이름 집합을 리턴할 때까지 기다리거나 아니면 타임아웃 예외를 발생함 - 좀 더 복잡한 작업이 필요할 수도 있음 (해당 토픽이 필요한 만큼의 파티션과 레플리카를 가지고 있는지 확인)
- 카프카 커넥트와 컨플루언트의 스키마 레지스트리는 설정을 저장하기 위해 카프카 토픽을 사용함, 이들은 처음 시작할 때 아래의 조건을 만족하는 설정 토픽이 있는지를 확인
- 하나의 파티션을 가짐. 이것은 설정 변경에 온전한 순서를 부여하기 위해 필요
- 가용성을 보장하기 위해 3개의 레플리카를 가짐
- 오래 된 설정값도 계속해서 저장되도록 토픽에 압착 설정이 되어 있음
- 131p 예제 참고
- 지금까지 우리가 살펴본 예제들은 서로 다른
AdminClient
메서드가 리턴하는Future
객체에 블로킹방식으로 작동하는get()
메서드를 호출 함 - 만약 많은 어드민 요청을 처리할 것으로 예상되는 서버를 개발하고 있을 경우, 블로킹을 사용하는 것은 불리함
- 사용자로부터 계속해서 요청을 받고, 카프카로 요청을 보내고, 카프카가 응답하면 그제서야 클라이언트로 응답을 보내는 게 더 합리적
- 133p. 예제 참고
- 여기서 중요한 것은 우리가 카프카로부터의 응답을 기다리지 않는다는 점
- 카프카로부터 응답이 도착하면
DescribeTopicResult
가 HTTP 클라이언트에게 응답을 보낼 것 - 그 사이, HTTP 서버는 다른 요청을 처리 가능
5.4 설정 관리
- 설정 관리는
ConfigResource
객체를 사용해서 할 수 있음 - 설정 가능한 자원에는 브로커, 브로커 로그, 토픽이 있음
- 위 설정은
kafka-configs.sh
혹은 다른 툴로 하는게 보통이지만, 애플리케이션에서 토픽의 설정을 확인하거나 수정하는 일도 흔하다 - 많은 어플리케이션들은 정확한 작동을 위해 압착 설정이 된 토픽을 사용
- 이 경우 애플리케이션이 주기적으로 해당 토픽에 실제로 압착 설정이 되어 있는지를 확인해서, 설정이 안 되어 있을 경우 설정을 교정해 주는 것이 합리적.
- 135p 참고
5.5 컨슈머 그룹 관리
- 다른 메시지 큐와는 달리, 카프카는 이전에 데이터를 읽어서 처리한 것과 완전히 동일한 순서로 데이터를 재처리할 수 있게 해준다는 점은 언급
- 메시지를 재처리해야 하는 시나리오에는 여러 가지가 있을 수 있음
- 사고가 발생한 와중에 오작동하는 애플리케이션을 트러블슈팅하는 경우
- 재해 복구 상황에서 애플리케이션을 새로운 클러스터에서 작동시키려 하는 경우
- 여기서는
AdminClient
를 사용해서 프로그램적으로 컨슈머 그룹과 이 그룹들이 커밋한 오프셋을 조회하고 수정하는 방법에 대해서 살펴볼 것
5.5.1 컨슈머 그룹 살펴보기
- `admin.listConsumerGroups().valid().get().forEach(System.out::println);
valid()
메서드,get()
메서트를 호출함으로써 리턴되는 모음은 클러스터가 에러 없이 리턴한 컨슈머 그룹만을 포함errors()
메서드를 사용해서 모든 예외를 가져올 수 있음- 특정 그룹에 대해 더 상세한 정보를 보고싶다면
ConsumerGroupDescription groupDescription = admin.describeConsumerGroups(CONSUMER_GRP_LIST)
.describedGroups().get(CONSUMER_GROUP).get();
System.out.println("Description of group " + CONSUMER_GROUP + ":" + groupDescription);
description
은 해당 그룹에 대한 상세한 정보를 담는다.- 그룹 멤버, 멤버별 식별자, 호스트명, 멤버별로 할당된 파티션, 할당 알고리즘, 그룹 코디네이터의 호스트명
- 트러블 슈팅 시 매우 유용
- 하지만 가장 중요한 정보중 하나인 컨슈머 그룹이 읽고 있는 각 파티션에 대해 마지막으로 커밋된 오프셋 값, 최신 메시지에서 얼마나 뒤떨어졌는지 (lag)
- 예전에는 커밋 메시지 가져와서 파싱하는 방법 뿐 -> 호환성 보장이 안됨
AdminClient
를 사용해서 커밋 정보를 얻어오는 방법- 137p 참고
5.5.2 컨슈머 그룹 수정하기
AdminClient
는 컨슈머 그룹을 수정하기 위한 메서드들 역시 가지고 있음- 그룹 삭제, 멤버 제외, 커밋된 오프셋 삭제 혹은 변경
- SRE가 비상 상황에서 임기 응변으로 복구를 위한 툴을 제작할 때 자주 사용
- 오프셋 변경 기능이 가장 유용함
- 오프셋 삭제는 컨슈머를 맨 처음부터 실행시키는 가장 간단한 방법처럼 보이지만, 이는 컨슈머 설정에 의존
- 만약 컨슈머가 시작되는데 커밋된 오프셋을 못찾을 경우, 맨 처음부터 시작해야할까? 아니면 가장 최신 메시지로 건너뛰어야 할까?
auto.offset.reset
설정값을 가지고 있지 않을 한 알 길이 없음- 명시적으로 커밋된 오프셋을 맨 앞으로 변경하면 컨슈머는 토픽의 맨 앞에서부터 처리를 시작하게 됨 -> 컨슈머가 reset 됨
- 오프셋 토픽의 오프셋 값을 변경한다 해도 컨슈머 그룹에 변경 여부가 전달되지는 않는다
- 컨슈머 그룹은 컨슈머가 새로운 파티션을 할당받거나 새로 시작할 때만 오프셋 토픽에 저장된 값을 읽어올 뿐
- 컨슈머가 모르는 오프셋 변경을 방지하기 위해, 카프카에서는 현재 작업이 돌아가고 있는 컨슈머 그룹에 대한 오프셋을 수정하는 것을 허용하지 않음
- 오프셋만 변경하면 집계값 등을 중복계산 할 가능성이 있음 -> 상태 저장소를 적절히 변경해주어야 함
- 139p 예제
5.6 클러스터 메타데이터
- 클러스터 메타데이터를 들여다 볼 일은 거의 없음
- 관심 있으면 140p 한번 실행시켜 보던지
5.7 고급 어드민 작업
- 잘 쓰이지 않고 위험하기까지 한, 하지만 필요할 때 사용하면 믿을수 없을 정도로 유용한 메서드 몇 개에 대해서 설명
- 사고에 대응중인 SRE에게 매우 중요
5.7.1 토픽에 파티션 추가하기
- 대체로 토픽의 파티션 수는 토픽이 생성될 때 결정됨
- 여러 이유 때문에 토픽에 파티션을 추가해야 하는 경우는 매우 드물며 위험할 수 있음
- 하지만 파티션이 처리할 수 있는 최대 처리량까지 부하가 차올라서 파티션 추가 외에는 선택지가 없는 경우도 있음
createPartitions
메서드 사용
Map<String, NewPartitions> newPartitions = new HashMap<>();
newPartitions.put(TOPIC_NAME, NewPartitions.increaseTo(NUM_PARTITIONS+2));
admin.createPartitions(newPartitions).all().get;
5.7.2 토픽에서 레코드 삭제하기
- 개인정보 보호법 등의 이유로 인해 데이터를 삭제해야 하는 경우가 있음
deleteRecords
메서드는 호출 시점을 기준으로 지정된 오프셋보다 더 오래된 모든 레코드에 삭제 표시를 함으로써 컨슈머가 접근할 수 없도록 한다.- 이 메서드는 삭제된 레코드의 오프셋 중 가장 큰 값을 리턴하기 때문에 의도했던 대로 삭제가 이루어졌는지 확인 가능
- 삭제 표시된 레코드를 디스크에서 실제로 지우는 작업은 비동기적으로 일어남
- 142p. 예제 참고
5.7.3
선호 리더 선출 (preferred leader election)
- 각 파티션은 선호 리더라 불리는 레플리카를 하나낀 가짐
- 기본적으로, 카프카는 5분마다 선호 리더 레플리카가 실제로 리더를 맡고있는지를 확인해서, 리더를 맡을 수 있는데도 맡고 있지 않은 경우 해당 레플리카를 리더로 삼는다.
electLeader()
메서드 호출
언클린 리더 선출 (unclean leader election)
- 만약 어느 파티션의 리더 레플리카가 사용 불능 상태가 되었는데 다른 레플리카들은 리더를 맡을 수 없는 상황 이라면 (대개 데이터가 없어서 그렇다). 해당 파티션은 리더가 없게 되고 따라서 사용 불능 상태가 됨
- 해결 방법 중 하나가 리더가 될 수 없는 레플리카를 그냥 리더로 삼아버리는 것 (언클린 리더 선출)
- 이는 데이터 유실을 초래
- 이 메서드는 비동기적으로 작동, 시간이 좀 걸릴 수 있음
- 143p 참고
5.7.4 레플리카 재할당
- 레플리카 위치를 바꿔야 하는 경우들이 있음
- 브로커에 너무 많은 레플리카가 올라가 있어 이동을 원할 때
- 레플리카를 추가하고 싶을 때
- 장비를 내리기 위해 모든 레플리카를 다른 장비로 내보내야 할 때
- …
alterPartitionReassignments
사용하면 파티션에 속한 각각의 레플리카의 위치를 정밀하게 제어 가능- 레플리카 이동은 대량의 데이터 복제를 초래 함
- 144p 참고
5.8 테스트하기
- 아파치 카프카는 원하는 수만큼의 브로커를 설정해서 초기화할 수 있는
MockAdminClient
테스트 클래스 제공 - 자주 사용되는 메서드가 매우 포괄적인 목업 기능을 제공함
- 145p~147p 예제 참고
5.9 요약
AdminClient
는 매우 유용하다!
Comments