NLP Blog

5. 프로그램 내에서 코드로 카프카 관리하기

|

5. 프로그램 내에서 코드로 카프카 관리하기

  • 카프카를 관리하는 데 있어서는 많은 CLI, GUI 툴이 있다 (9장에서 살펴볼 것)
  • 하지만 클라이언트 애플리케이션에서 직접 관리 명령을 내려야 할 때도 있음 (eg. IoT)
  • 0.11 버전부터 CLI뿐만 아니라 프로그래밍 언어에서 사용가능한 AdminClient가 추가 됨
    • 토픽 목록 조회, 생성, 삭제
    • 클러스터 상세 정보 확인
    • ACL 관리
    • 설정 변경

5.1 AdminClient 개요

5.1.1 비동기적이고 최종적 일관성을 가지는 API

  • 가장 중요한 것: AdminClient는 비동기적으로 작동함
  • 각 메서드는 요청을 클러스터 컨트롤러로 전송한 뒤 바로 1개 이상의 Future 객체를 리턴
  • Future 객체는 비동기 작업의 결과를 가리킴
    • 비동기 작업의 결과를 확인
    • 취소
    • 완료까지 대기
    • 작업이 완료되었을 때 실행할 함수를 지정하는 메서드 보유
  • Future 객체를 Result 객체 안에 감싸는데, Result객체는 작업이 끝날 때까지 대기하거나 작업 결과에 대해 일반적으로 뒤이어 쓰이는 작업을 수행하는 헬퍼 메서드를 가지고 있음
    • ex) AdminClient.createTopics 메서드는 CreateTopicsResult 객체를 리턴
    • 이 객체는 모든 토픽이 생성될 떄까지 기다리거나, 각각의 토픽 상태를 하나씩 확인하거나, 아니면 특정한 토픽이 생성된 뒤 해당 토픽의 설정을 가져올 수 있도록 해줌
  • 카프카 컨트롤러로부터 브로커로의 메타데이터 전파가 비동기적으로 이루어지기 때문에, AdminClient API가 리턴하는 Future 객체들은 컨트롤러의 상태가 완전히 업데이트된 시점에서 완료된 것으로 간주된다.
  • 이 시점에서 모든 브로커가 전부 다 새로운 상태에 대해 알고 있지는 못할 수 있기 때문에, listTopics 요청은 최신 상태를 전달받지 않은 (이제 막 만들어진 토픽에 대해 알고 있지 않은) 브로커에 의해 처리될 수 있음
  • 이러한 속성을 최종적 일관성 (eventual consistency)이라고 함
  • 최종적으로 모든 브로커는 모든 토픽에 대해 알게될 것이지만, 그게 언제가 될 지에 대해서는 아무런 보장도 할 수 없다

5.1.2 옵션

  • AdminClient의 각 메서드는 메서드별로 특정한 Options 객체를 인수로 받음
    • listTopics 메서드는 ListTopicsOptions 객체를 인수로 받음
    • describeCluster 메서드는 DescribeClusterOptions를 인수로 받음
  • 모든 AdminClient 메서드가 가지고 있는 매개변수는 timeoutMs
    • TimeoutException을 발생시키기 전, 클러스터로부터의 응답을 기다리는 시간을 조정함

5.1.3 수평 구조

  • 모든 어드민 작업은 KafkaAdminClient에 구현되어 있는 아파치 카프카 프로토콜을 사용해서 이루어짐
    • 여기에는 객체 간의 의존 관계나 네임스페이스 따위가 없다
    • 인터페이스가 굉장히 크기 때문에 다소 논란이 있기는 하지만, 이러한 구조에는 장점이 있음
    • JavaDoc문서에서 메서드를 찾기 쉽고, IDE에서 자동완성 해줌
    • 엉뚱한 곳을 찾고 있는지 고민할 필요 없음, 없으면 구현 안된거임

5.1.4 추가 참고 사항

  • 클러스터의 상태를 변경하는 모든 작업(create, delete, alter)은 컨트롤러에 의해 수행
  • 클러스터 상태를 읽기만 하는 작업(list, describe)는 아무 브로커에서나 수행될 수 있음, 클라이언트 입장에서 보이는 가장 부하가 적은 브로커로 전달
  • 대부분의 어드민 작업은 AdminClient를 통해서 수행되거나, 주키퍼에 저장되어 있는 메타데이터를 직접 수정하는 방식으로 이루어짐.
  • 주키퍼를 직접 수정하는 것을 절대 쓰지 말 것을 강력히 권장, 카프카는 주키퍼 의존성을 제거할 계획

5.2 AdminClient 사용법: 생성, 설정, 닫기

  • AdminClient를 사용하기 위해서는 객체를 생성해야 함. 이를 위해 설정값 객체인 Properties를 인수로 주어야 함
  • close 메서드로 닫을 수 있음, 이 때 타임 아웃 매개변수를 받음

5.2.1 client.dns.lookup

  • 카프카는 부트스트랩 서버 설정에 포함된 호스트명을 기준으로 연결을 검증, 해석, 생성함
  • 이 단순한 모델은 대부분의 경우 잘 작동하지만 두 맹점이 있음
    • DNS alias를 사용한 경우와, 2개 이상의 IP 주소로 연결되는 하나의 DNS 항목을 사용할 경우

DNS 별칭을 사용하는 경우

  • broker1.hostname.com, broker2.hostname.com …와 같은 naming convention을 따르는 브로커들을 가지고 있다고 가정
  • 이 모든 부로커들을 부트스트랩 서버 설정에 일일이 지정하는것보다 이 모든 브로커 전체를 가리킬 하나의 DNS alias을 만들 수 있음 (ex. all-brokers.hostname.com)
  • 이는 매우 편리하지만, SASL을 사용해서 인증을 하려고 할 때는 문제가 생김
    • SASL을 사용할 경우 클라이언트는 all-brokers.hostname.com에 대해서 인증을 하려하는데, 서버의 보안 주체는 broker2.hostname.com인 탓 -> man-in-the-middle attack일 가능성이 있음 -> 인증 거부, 연결 실패
      • 이 경우, client.dns.lookup-=resolve_canonical_bootstrap_server_only
      • 위 성정은 DNS별칭에 포함된 모든 브로커 이름을 일일이 부트스트랩 서버 목록에 넣어 준 것과 동일하게 작동함

다수의 IP 주소로 연결되는 DNS 이름을 사용하는 경우

  • 최근 네트워크 아키텍처에서 모든 브로커를 프록시나 로드 밸런서 뒤로 숨기는 것은 매우 흔함
  • 이 경우, 로드 밸러서가 SPOF가 되는 걸 보고싶지는 않을 것임
  • 위 이유 때문에, broker.hostname.com를 여러 개의 IP 주소로 연결하는 것은 매우 흔함 (이 IP주소들은 모두 로드밸런서로 연결되고 따라서 모든 트래픽이 동일한 브로커로 전달됨)
  • 이 IP 주소들은 시간이 지남에 따라 변경될 수 있음
  • 기본적으로, 카프카 클라이언트는 해석된 첫 번째 호스트명으로 연결을 시도할 뿐.
    • 해석된 IP 주소가 사용 불능일 경우 브로커가 멀쩡하게 작동하고 있는데도 클라이언트는 연결에 실패할 수 있음
  • 이를 위해 client.dns.lookup=use_all_dns_ips 사용 권장

5.2.2 request.timeout.ms

  • 이 설정은 애플리케이션이 AdminClient의 응답을 기다릴 수 있는 시간의 최대값을 정의
  • 만약 애플리케이션에서 AdminClient 작업이 주요 경로 상에 있을 경우, 타임아웃 값을 낮게 잡아준 뒤 제 시간에 리턴되지 않는 응답은 조금 다른 방식으로 처리해야 할 수도 있음
    • 서비스가 시작될 때 특정한 토픽이 존재하는지를 확인
    • 브로커가 응답하는 데 30초 이상 걸릴 경우, 확인 작업을 건너뛰거나 일단 서버 기동을 계속한 뒤 나중에 토픽의 존재를 확인

5.3 필수적인 토픽 관리 기능

  • admin.listTopics()Future 객체들을 감싸고 있는 ListTopicsResult 객체를 리턴 함
  • Future 객체에서 get 메서드를 호출하면, 실행 스레드는 서버가 토픽 이름 집합을 리턴할 때까지 기다리거나 아니면 타임아웃 예외를 발생함
  • 좀 더 복잡한 작업이 필요할 수도 있음 (해당 토픽이 필요한 만큼의 파티션과 레플리카를 가지고 있는지 확인)
  • 카프카 커넥트와 컨플루언트의 스키마 레지스트리는 설정을 저장하기 위해 카프카 토픽을 사용함, 이들은 처음 시작할 때 아래의 조건을 만족하는 설정 토픽이 있는지를 확인
    • 하나의 파티션을 가짐. 이것은 설정 변경에 온전한 순서를 부여하기 위해 필요
    • 가용성을 보장하기 위해 3개의 레플리카를 가짐
    • 오래 된 설정값도 계속해서 저장되도록 토픽에 압착 설정이 되어 있음
    • 131p 예제 참고
  • 지금까지 우리가 살펴본 예제들은 서로 다른 AdminClient 메서드가 리턴하는 Future 객체에 블로킹방식으로 작동하는 get() 메서드를 호출 함
  • 만약 많은 어드민 요청을 처리할 것으로 예상되는 서버를 개발하고 있을 경우, 블로킹을 사용하는 것은 불리함
  • 사용자로부터 계속해서 요청을 받고, 카프카로 요청을 보내고, 카프카가 응답하면 그제서야 클라이언트로 응답을 보내는 게 더 합리적
  • 133p. 예제 참고
    • 여기서 중요한 것은 우리가 카프카로부터의 응답을 기다리지 않는다는 점
    • 카프카로부터 응답이 도착하면 DescribeTopicResult가 HTTP 클라이언트에게 응답을 보낼 것
    • 그 사이, HTTP 서버는 다른 요청을 처리 가능

5.4 설정 관리

  • 설정 관리는 ConfigResource 객체를 사용해서 할 수 있음
  • 설정 가능한 자원에는 브로커, 브로커 로그, 토픽이 있음
  • 위 설정은 kafka-configs.sh 혹은 다른 툴로 하는게 보통이지만, 애플리케이션에서 토픽의 설정을 확인하거나 수정하는 일도 흔하다
  • 많은 어플리케이션들은 정확한 작동을 위해 압착 설정이 된 토픽을 사용
  • 이 경우 애플리케이션이 주기적으로 해당 토픽에 실제로 압착 설정이 되어 있는지를 확인해서, 설정이 안 되어 있을 경우 설정을 교정해 주는 것이 합리적.
  • 135p 참고

5.5 컨슈머 그룹 관리

  • 다른 메시지 큐와는 달리, 카프카는 이전에 데이터를 읽어서 처리한 것과 완전히 동일한 순서로 데이터를 재처리할 수 있게 해준다는 점은 언급
  • 메시지를 재처리해야 하는 시나리오에는 여러 가지가 있을 수 있음
    • 사고가 발생한 와중에 오작동하는 애플리케이션을 트러블슈팅하는 경우
    • 재해 복구 상황에서 애플리케이션을 새로운 클러스터에서 작동시키려 하는 경우
  • 여기서는 AdminClient를 사용해서 프로그램적으로 컨슈머 그룹과 이 그룹들이 커밋한 오프셋을 조회하고 수정하는 방법에 대해서 살펴볼 것

5.5.1 컨슈머 그룹 살펴보기

  • `admin.listConsumerGroups().valid().get().forEach(System.out::println);
  • valid() 메서드, get() 메서트를 호출함으로써 리턴되는 모음은 클러스터가 에러 없이 리턴한 컨슈머 그룹만을 포함
  • errors() 메서드를 사용해서 모든 예외를 가져올 수 있음
  • 특정 그룹에 대해 더 상세한 정보를 보고싶다면
ConsumerGroupDescription groupDescription = admin.describeConsumerGroups(CONSUMER_GRP_LIST)
        .describedGroups().get(CONSUMER_GROUP).get();
System.out.println("Description of group " + CONSUMER_GROUP + ":" + groupDescription);
  • description은 해당 그룹에 대한 상세한 정보를 담는다.
    • 그룹 멤버, 멤버별 식별자, 호스트명, 멤버별로 할당된 파티션, 할당 알고리즘, 그룹 코디네이터의 호스트명
    • 트러블 슈팅 시 매우 유용
  • 하지만 가장 중요한 정보중 하나인 컨슈머 그룹이 읽고 있는 각 파티션에 대해 마지막으로 커밋된 오프셋 값, 최신 메시지에서 얼마나 뒤떨어졌는지 (lag)
  • 예전에는 커밋 메시지 가져와서 파싱하는 방법 뿐 -> 호환성 보장이 안됨
  • AdminClient를 사용해서 커밋 정보를 얻어오는 방법
  • 137p 참고

5.5.2 컨슈머 그룹 수정하기

  • AdminClient는 컨슈머 그룹을 수정하기 위한 메서드들 역시 가지고 있음
    • 그룹 삭제, 멤버 제외, 커밋된 오프셋 삭제 혹은 변경
    • SRE가 비상 상황에서 임기 응변으로 복구를 위한 툴을 제작할 때 자주 사용
  • 오프셋 변경 기능이 가장 유용함
    • 오프셋 삭제는 컨슈머를 맨 처음부터 실행시키는 가장 간단한 방법처럼 보이지만, 이는 컨슈머 설정에 의존
    • 만약 컨슈머가 시작되는데 커밋된 오프셋을 못찾을 경우, 맨 처음부터 시작해야할까? 아니면 가장 최신 메시지로 건너뛰어야 할까?
    • auto.offset.reset 설정값을 가지고 있지 않을 한 알 길이 없음
    • 명시적으로 커밋된 오프셋을 맨 앞으로 변경하면 컨슈머는 토픽의 맨 앞에서부터 처리를 시작하게 됨 -> 컨슈머가 reset 됨
  • 오프셋 토픽의 오프셋 값을 변경한다 해도 컨슈머 그룹에 변경 여부가 전달되지는 않는다
    • 컨슈머 그룹은 컨슈머가 새로운 파티션을 할당받거나 새로 시작할 때만 오프셋 토픽에 저장된 값을 읽어올 뿐
    • 컨슈머가 모르는 오프셋 변경을 방지하기 위해, 카프카에서는 현재 작업이 돌아가고 있는 컨슈머 그룹에 대한 오프셋을 수정하는 것을 허용하지 않음
  • 오프셋만 변경하면 집계값 등을 중복계산 할 가능성이 있음 -> 상태 저장소를 적절히 변경해주어야 함
  • 139p 예제

5.6 클러스터 메타데이터

  • 클러스터 메타데이터를 들여다 볼 일은 거의 없음
  • 관심 있으면 140p 한번 실행시켜 보던지

5.7 고급 어드민 작업

  • 잘 쓰이지 않고 위험하기까지 한, 하지만 필요할 때 사용하면 믿을수 없을 정도로 유용한 메서드 몇 개에 대해서 설명
  • 사고에 대응중인 SRE에게 매우 중요

5.7.1 토픽에 파티션 추가하기

  • 대체로 토픽의 파티션 수는 토픽이 생성될 때 결정됨
  • 여러 이유 때문에 토픽에 파티션을 추가해야 하는 경우는 매우 드물며 위험할 수 있음
  • 하지만 파티션이 처리할 수 있는 최대 처리량까지 부하가 차올라서 파티션 추가 외에는 선택지가 없는 경우도 있음
  • createPartitions 메서드 사용
Map<String, NewPartitions> newPartitions = new HashMap<>();
newPartitions.put(TOPIC_NAME, NewPartitions.increaseTo(NUM_PARTITIONS+2));
admin.createPartitions(newPartitions).all().get;

5.7.2 토픽에서 레코드 삭제하기

  • 개인정보 보호법 등의 이유로 인해 데이터를 삭제해야 하는 경우가 있음
  • deleteRecords 메서드는 호출 시점을 기준으로 지정된 오프셋보다 더 오래된 모든 레코드에 삭제 표시를 함으로써 컨슈머가 접근할 수 없도록 한다.
  • 이 메서드는 삭제된 레코드의 오프셋 중 가장 큰 값을 리턴하기 때문에 의도했던 대로 삭제가 이루어졌는지 확인 가능
  • 삭제 표시된 레코드를 디스크에서 실제로 지우는 작업은 비동기적으로 일어남
  • 142p. 예제 참고

5.7.3

선호 리더 선출 (preferred leader election)

  • 각 파티션은 선호 리더라 불리는 레플리카를 하나낀 가짐
  • 기본적으로, 카프카는 5분마다 선호 리더 레플리카가 실제로 리더를 맡고있는지를 확인해서, 리더를 맡을 수 있는데도 맡고 있지 않은 경우 해당 레플리카를 리더로 삼는다.
  • electLeader() 메서드 호출

언클린 리더 선출 (unclean leader election)

  • 만약 어느 파티션의 리더 레플리카가 사용 불능 상태가 되었는데 다른 레플리카들은 리더를 맡을 수 없는 상황 이라면 (대개 데이터가 없어서 그렇다). 해당 파티션은 리더가 없게 되고 따라서 사용 불능 상태가 됨
  • 해결 방법 중 하나가 리더가 될 수 없는 레플리카를 그냥 리더로 삼아버리는 것 (언클린 리더 선출)
  • 이는 데이터 유실을 초래

  • 이 메서드는 비동기적으로 작동, 시간이 좀 걸릴 수 있음
  • 143p 참고

5.7.4 레플리카 재할당

  • 레플리카 위치를 바꿔야 하는 경우들이 있음
    • 브로커에 너무 많은 레플리카가 올라가 있어 이동을 원할 때
    • 레플리카를 추가하고 싶을 때
    • 장비를 내리기 위해 모든 레플리카를 다른 장비로 내보내야 할 때
  • alterPartitionReassignments 사용하면 파티션에 속한 각각의 레플리카의 위치를 정밀하게 제어 가능
  • 레플리카 이동은 대량의 데이터 복제를 초래 함
  • 144p 참고

5.8 테스트하기

  • 아파치 카프카는 원하는 수만큼의 브로커를 설정해서 초기화할 수 있는 MockAdminClient 테스트 클래스 제공
  • 자주 사용되는 메서드가 매우 포괄적인 목업 기능을 제공함
  • 145p~147p 예제 참고

5.9 요약

  • AdminClient는 매우 유용하다!

Comments