1. 개요
Apache Kafka는 오픈 소스 분산 이벤트 스트리밍 플랫폼입니다.
이 빠른 사용방법(예제)에서는 Kafka 항목에서 메시지 수를 가져오는 기술을 배웁니다. 프로그래밍 방식과 기본 명령 기술을 시연합니다.
2. 프로그래밍 기법
Kafka 주제에는 여러 파티션이 있을 수 있습니다. 우리의 기술은 모든 파티션의 메시지 수를 세었는지 확인해야 합니다.
각 파티션을 살펴보고 최신 오프셋을 확인해야 합니다. 이를 위해 소비자를 소개합니다.
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
두 번째 단계는 이 소비자로부터 모든 파티션을 가져오는 것입니다 .
List<TopicPartition> partitions = consumer.partitionsFor(topic).stream().map(p -> new TopicPartition(topic, p.partition()))
.collect(Collectors.toList());
세 번째 단계는 각 파티션의 끝에서 소비자를 오프셋하고 파티션 맵에 결과를 기록하는 것 입니다 .
consumer.assign(partitions);
consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream().collect(Collectors.toMap(Function.identity(), consumer::position));
마지막 단계는 각 파티션의 마지막 위치를 취하고 결과를 합산하여 주제의 메시지 수를 얻는 것입니다.
numberOfMessages = partitions.stream().mapToLong(p -> endPartitions.get(p)).sum();
3. Kafka 기본 명령
프로그래밍 기술은 Kafka 주제의 메시지 수에 대해 자동화된 작업을 수행하려는 경우에 유용합니다. 그러나 분석 목적으로만 사용하는 경우 이러한 서비스를 만들고 시스템에서 실행하는 데 오버헤드가 발생합니다. 간단한 옵션은 기본 Kafka 명령을 사용하는 것입니다. 그것은 빠른 결과를 줄 것입니다.
3.1. GetoffsetShell 명령 사용
기본 명령을 실행하기 전에 머신에서 Kafka의 루트 폴더로 이동해야 합니다. 다음 명령은 baeldung 주제에 게시되는 메시지 수를 반환합니다 .
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092
--topic baeldung | awk -F ":" '{sum += $3} END {print "Result: "sum}'
Result: 3
3.2. 소비자 콘솔 사용
앞에서 설명한 것처럼 명령을 실행하기 전에 Kafka의 루트 폴더로 이동합니다. 다음 명령은 baeldung 주제에 대해 게시 중인 메시지 수를 반환합니다 .
$ bin/kafka-console-consumer.sh --from-beginning --bootstrap-server localhost:9092
--property print.key=true --property print.value=false --property print.partition
--topic baeldung --timeout-ms 5000 | tail -n 10|grep "Processed a total of"
Processed a total of 3 messages
4. 결론
이 기사에서는 Kafka 주제에서 메시지 수를 가져오는 기술을 살펴보았습니다. 모든 파티션을 소비자에게 할당하고 최신 오프셋을 확인하는 프로그래밍 방식의 기술을 배웠습니다.
우리는 또한 두 가지 기본 Kafka 명령 기술을 보았습니다. 하나는 Kafka 도구의 GetoffsetShell 명령 이었습니다 . 다른 하나는 콘솔에서 소비자를 실행하고 처음부터 메시지 수를 인쇄하는 것이었습니다.
항상 그렇듯이 이 기사의 소스 코드는 GitHub 에서 찾을 수 있습니다 .