1. 개요

Apache Kafka를 사용하는 클라이언트 애플리케이션은 일반적으로 생산자와 소비자라는 두 범주 중 하나에 속합니다. 생산자와 소비자 모두 각각 생산 및 소비 작업을 시작하기 전에 기본 Kafka 서버가 가동 및 실행 중이어야 합니다 .

이 기사에서는 Kafka 서버가 실행 중인지 확인하는 몇 가지 전략을 배웁니다.

2. 사육사 명령 사용

활성 브로커가 있는지 확인하는 가장 빠른 방법 중 하나는 Zookeeper의 덤프 명령을 사용하는 것입니다. dump 명령 Zookeeper 서버를 관리하는 데 사용할 수 있는 4LW 명령 중 하나입니다 .

계속해서 nc 명령을 사용하여 2181 포트에서 수신 대기 중인 Zookeeper 서버를 통해 덤프 명령을 보내겠습니다.

$ echo dump | nc localhost 2181 | grep -i broker | xargs
/brokers/ids/0

명령을 실행하면 Zookeeper 서버에 등록된 임시 브로커 ID List이 표시됩니다. 임시 ID가 없으면 실행 중인 브로커 노드가 없습니다.

또한 일반적으로 zookeeper.properties 또는 zoo.cfg 구성 파일 내에서 사용 가능한 구성에서 dump 명령을 명시적으로 허용해야 한다는 점에 유의해야 합니다 .

lw.commands.whitelist=dump

또는 Zookeeper API를 사용하여 활성 브로커 List을 찾을 수도 있습니다 .

3. Apache Kafka의 AdminClient 사용

생산자 또는 소비자가 Java 애플리케이션인 경우 Apache Kafka의 AdminClient 클래스를 사용하여 Kafka 서버가 작동하는지 여부를 확인할 수 있습니다 .

코드를 빠르게 테스트할 수 있도록 KafkaAdminClient 클래스를 정의하여 AdminClient 클래스  의 인스턴스를 래핑합니다 .

public class KafkaAdminClient {
    private final AdminClient client;

    public KafkaAdminClient(String bootstrap) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);
        props.put("request.timeout.ms", 3000);
        props.put("connections.max.idle.ms", 5000);

        this.client = AdminClient.create(props);
    }
}

다음으로 클라이언트가 실행 중인 브로커 서버에 연결할 수 있는지 확인하기 위해 KafkaAdminClient 클래스 에서 verifyConnection() 메서드를 정의해 보겠습니다 .

public boolean verifyConnection() throws ExecutionException, InterruptedException {
    Collection<Node> nodes = this.client.describeCluster()
      .nodes()
      .get();
    return nodes != null && nodes.size() > 0;
}

마지막으로 실행 중인 Kafka 클러스터에 연결하여 코드를 테스트해 보겠습니다.

@Test
void givenKafkaIsRunning_whenCheckedForConnection_thenConnectionIsVerified() throws Exception {
    boolean alive = kafkaAdminClient.verifyConnection();
    assertThat(alive).isTrue();
}

4. kcat 유틸리티 사용하기

kcat (이전 kafkacat ) 명령을 사용하여 실행 중인 Kafka 브로커 노드가 있는지 확인할 수 있습니다. 이를 위해 -L 옵션을 사용하여 기존 주제의 메타데이터를 표시해 보겠습니다 .

$ kcat -b localhost:9092 -t demo-topic -L
Metadata for demo-topic (from broker -1: localhost:9092/bootstrap):
 1 brokers:
  broker 0 at 192.168.1.53:9092 (controller)
 1 topics:
  topic "demo-topic" with 1 partitions:
    partition 0, leader 0, replicas: 0, isrs: 0

다음으로 브로커 노드가 다운되었을 때 동일한 명령을 실행해 보겠습니다.

$ kcat -b localhost:9092 -t demo-topic -L -m 1
%3|1660579562.937|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 1ms in state CONNECT)
% ERROR: Failed to acquire metadata: Local: Broker transport failure (Are the brokers reachable? Also try increasing the metadata timeout with -m <timeout>?)

이 경우 실행 중인 브로커 노드가 없으므로 "연결 거부" 오류가 발생합니다. 또한 -m 옵션 을 사용하여 요청 시간 초과를 1초로 제한하여 빠르게 실패할 수 있었다는 점에 유의해야 합니다 .

5. UI 도구 사용

자동 검사가 필요하지 않은 실험적인 POC 프로젝트를 위해 Offset Explorer 와 같은 UI 도구를 사용할 수 있습니다 . 그러나 엔터프라이즈급 Kafka 클라이언트에 대한 브로커 노드의 상태를 확인하려는 경우에는 이 접근 방식을 권장하지 않습니다.

Offset Explorer를 사용하여 Zookeeper 호스트 및 포트 세부 정보를 사용하여 Kafka 클러스터에 연결해 보겠습니다.오프셋 탐색기를 사용하여 연결 확인

왼쪽 창에서 실행 중인 브로커 List을 볼 수 있습니다. 그게 다야. 우리는 버튼 클릭으로 그것을 얻었습니다.

6. 결론

이 사용방법(예제)에서는 Zookeeper 명령, Apache의 AdminClientkcat 유틸리티 를 사용한 몇 가지 명령줄 접근 방식과 Kafka 서버가 작동하는지 여부를 확인하기 위한 UI 기반 접근 방식을 살펴보았습니다.

항상 그렇듯이 사용방법(예제)의 전체 소스 코드는 GitHub에서 사용할 수 있습니다 .

res – REST with Spring (eBook) (everywhere)