기록만이살길

Apache Kafka 항목의 메시지 수 가져오기

Spring

Apache Kafka 항목의 메시지 수 가져오기

기록만이살길 2023. 3. 22. 00:00
반응형

1. 개요

Apache Kafka는 오픈 소스 분산 이벤트 스트리밍 플랫폼입니다.

이 빠른 사용방법(예제)에서는 Kafka 항목에서 메시지 수를 가져오는 기술을 배웁니다. 프로그래밍 방식과 기본 명령 기술을 시연합니다.

2. 프로그래밍 기법

Kafka 주제에는 여러 파티션이 있을 수 있습니다. 우리의 기술은 모든 파티션의 메시지 수를 세었는지 확인해야 합니다.

각 파티션을 살펴보고 최신 오프셋을 확인해야 합니다. 이를 위해 소비자를 소개합니다.

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

두 번째 단계는 이 소비자로부터 모든 파티션을 가져오는 것입니다 .

List<TopicPartition> partitions = consumer.partitionsFor(topic).stream().map(p -> new TopicPartition(topic, p.partition()))
    .collect(Collectors.toList());

세 번째 단계는 각 파티션의 끝에서 소비자를 오프셋하고 파티션 맵에 결과를 기록하는 것 입니다 .

consumer.assign(partitions);
consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream().collect(Collectors.toMap(Function.identity(), consumer::position));

마지막 단계는 각 파티션의 마지막 위치를 취하고 결과를 합산하여 주제의 메시지 수를 얻는 것입니다.

numberOfMessages = partitions.stream().mapToLong(p -> endPartitions.get(p)).sum();

3. Kafka 기본 명령

프로그래밍 기술은 Kafka 주제의 메시지 수에 대해 자동화된 작업을 수행하려는 경우에 유용합니다. 그러나 분석 목적으로만 사용하는 경우 이러한 서비스를 만들고 시스템에서 실행하는 데 오버헤드가 발생합니다. 간단한 옵션은 기본 Kafka 명령을 사용하는 것입니다. 그것은 빠른 결과를 줄 것입니다.

3.1. GetoffsetShell 명령 사용

기본 명령을 실행하기 전에 머신에서 Kafka의 루트 폴더로 이동해야 합니다. 다음 명령은 baeldung 주제에 게시되는 메시지 수를 반환합니다 .

$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell   --broker-list localhost:9092   
--topic baeldung   | awk -F  ":" '{sum += $3} END {print "Result: "sum}'
Result: 3

3.2. 소비자 콘솔 사용

앞에서 설명한 것처럼 명령을 실행하기 전에 Kafka의 루트 폴더로 이동합니다. 다음 명령은 baeldung 주제에 대해 게시 중인 메시지 수를 반환합니다 .

$ bin/kafka-console-consumer.sh  --from-beginning  --bootstrap-server localhost:9092 
--property print.key=true --property print.value=false --property print.partition 
--topic baeldung --timeout-ms 5000 | tail -n 10|grep "Processed a total of"
Processed a total of 3 messages

4. 결론

이 기사에서는 Kafka 주제에서 메시지 수를 가져오는 기술을 살펴보았습니다. 모든 파티션을 소비자에게 할당하고 최신 오프셋을 확인하는 프로그래밍 방식의 기술을 배웠습니다.

우리는 또한 두 가지 기본 Kafka 명령 기술을 보았습니다. 하나는 Kafka 도구의 GetoffsetShell 명령 이었습니다 . 다른 하나는 콘솔에서 소비자를 실행하고 처음부터 메시지 수를 인쇄하는 것이었습니다.

항상 그렇듯이 이 기사의 소스 코드는 GitHub 에서 찾을 수 있습니다 .

res – REST with Spring (eBook) (everywhere)
참고
  • https://docs.spring.io/spring-framework/docs/current/reference/html
  • https://www.baeldung.com/java-kafka-count-topic-messages
반응형