1. 개요
생산자가 Apache Kafka에 메시지를 보내면 로그 파일에 추가하고 구성된 기간 동안 보관합니다.
이 자습서에서는 Kafka 주제에 대한 시간 기반 메시지 보존 속성 을 구성 하는 방법을 알아 봅니다 .
2. 시간 기반 보존
보존 기간 속성을 사용하면 메시지에 TTL (수명)이 있습니다. 만료되면 메시지가 삭제 표시되어 디스크 공간을 확보합니다.
동일한 보관 기간 속성이 지정된 Kafka 주제 내의 모든 메시지에 적용됩니다. 또한 토픽 생성 전에 이러한 속성을 설정하거나 기존 토픽에 대해 런타임시 변경할 수 있습니다 .
다음 섹션에서는 새 주제에 대한 보존 기간을 설정하기위한 브로커 구성 및 런타임에이를 제어하기 위한 주제 수준 구성을 통해이를 조정하는 방법을 알아 봅니다 .
3. 서버 수준 구성
Apache Kafka는 세 가지 시간 기반 구성 속성 중 정확히 하나를 구성하여 조정할 수 있는 서버 수준 보존 정책을 지원 합니다 .
- log.retention.hours
- log.retention.minutes
- log.retention.ms
Kafka가 낮은 정밀도 값을 높은 값으로 재정의한다는 것을 이해하는 것이 중요합니다. 따라서 log.retention.ms 가 가장 높은 우선 순위를 갖습니다 .
3.1. 기초
먼저 Apache Kafka 디렉토리 에서 grep 명령을 실행하여 보존 기본값을 검사 해 보겠습니다 .
$ grep -i 'log.retention.[hms].*\=' config/server.properties
log.retention.hours=168
여기에서 기본 보존 기간이 7 일임을 알 수 있습니다 .
10 분 동안 만 메시지를 보관하기 위해 config / server.properties 에서 log.retention.minutes 속성 값을 설정할 수 있습니다 .
log.retention.minutes=10
3.2. 새 주제의 보존 기간
Apache Kafka 패키지에는 관리 작업을 수행하는 데 사용할 수있는 여러 셸 스크립트가 포함되어 있습니다. 이를 사용하여이 자습서 과정에서 사용할 도우미 스크립트 functions.sh 를 만듭니다 .
에 두 가지 기능을 추가하여하자의 시작 functions.sh 하는 구성을 주제를 작성하고 설명 각각 :
function create_topic {
topic_name="$1"
bin/kafka-topics.sh --create --topic ${topic_name} --if-not-exists \
--partitions 1 --replication-factor 1 \
--zookeeper localhost:2181
}
function describe_topic_config {
topic_name="$1"
./bin/kafka-configs.sh --describe --all \
--bootstrap-server=0.0.0.0:9092 \
--topic ${topic_name}
}
다음으로 두 개의 독립 실행 형 스크립트 create-topic.sh 및 get-topic-retention-time.sh를 만들어 보겠습니다 .
bash-5.1# cat create-topic.sh
#!/bin/bash
. ./functions.sh
topic_name="$1"
create_topic "${topic_name}"
exit $?
bash-5.1# cat get-topic-retention-time.sh
#!/bin/bash
. ./functions.sh
topic_name="$1"
describe_topic_config "${topic_name}" | awk 'BEGIN{IFS="=";IRS=" "} /^[ ]*retention.ms/{print $1}'
exit $?
describe_topic_config 는 주제에 대해 구성된 모든 속성을 제공 한다는 점에 유의해야합니다 . 따라서 awk one-liner를 사용 하여 retention.ms 속성에 대한 필터를 추가했습니다 .
마지막으로 Kafka 환경을 시작하고 새 샘플 주제에 대한 보존 기간 구성을 확인 하겠습니다 .
bash-5.1# ./create-topic.sh test-topic
Created topic test-topic.
bash-5.1# ./get-topic-retention-time.sh test-topic
retention.ms=600000
항목을 만들고 설명하면 retention.ms 가 600000 (10 분)으로 설정되어 있음을 알 수 있습니다. 이는 실제로 이전에 server.properties 파일에 정의한 log.retention.minutes 속성 에서 파생되었습니다 .
4. 주제 수준 구성
브로커 서버가 시작되면 log.retention. {hours | minutes | ms} 서버 수준 속성이 읽기 전용이됩니다 . 반면에 우리 는 항목 수준에서 조정할 수 있는 retention.ms 속성에 액세스 할 수 있습니다.
functions.sh 스크립트에 메소드를 추가 하여 주제의 속성을 구성 해 보겠습니다 .
function alter_topic_config {
topic_name="$1"
config_name="$2"
config_value="$3"
./bin/kafka-configs.sh --alter \
--add-config ${config_name}=${config_value} \
--bootstrap-server=0.0.0.0:9092 \
--topic ${topic_name}
}
그런 다음 alter-topic-config.sh 스크립트 내에서 이를 사용할 수 있습니다 .
#!/bin/sh
. ./functions.sh
alter_topic_retention_config $1 $2 $3
exit $?
마지막으로 테스트 주제에 대해 머무름 시간을 5 분으로 설정 하고 동일하게 확인합니다.
bash-5.1# ./alter-topic-config.sh test-topic retention.ms 300000
Completed updating config for topic test-topic.
bash-5.1# ./get-topic-retention-time.sh test-topic
retention.ms=300000
5. 검증
지금까지 Kafka 주제 내에서 메시지의 보존 기간을 구성하는 방법을 살펴 보았습니다. 보존 시간 초과 후 메시지가 실제로 만료되는지 확인할 때입니다.
5.1. 생산자-소비자
하자 추가 produce_message 및 consume_message 함수 functions.sh. 내부적으로 이들은 메시지를 생성 / 소비하기 위해 각각 kafka-console-producer.sh 및 kafka-console-consumer.sh를 사용합니다 .
function produce_message {
topic_name="$1"
message="$2"
echo "${message}" | ./bin/kafka-console-producer.sh \
--bootstrap-server=0.0.0.0:9092 \
--topic ${topic_name}
}
function consume_message {
topic_name="$1"
timeout="$2"
./bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning \
--topic ${topic_name} \
--max-messages 1 \
--timeout-ms $timeout
}
우리는 Kafka에서 사용 가능한 모든 메시지 를 읽는 소비자가 필요하기 때문에 소비자는 항상 처음부터 메시지 를 읽습니다 .
다음으로 독립형 메시지 생성자를 생성 해 보겠습니다.
bash-5.1# cat producer.sh
#!/bin/sh
. ./functions.sh
topic_name="$1"
message="$2"
produce_message ${topic_name} ${message}
exit $?
마지막으로 독립형 메시지 소비자를 만들어 보겠습니다.
bash-5.1# cat consumer.sh
#!/bin/sh
. ./functions.sh
topic_name="$1"
timeout="$2"
consume_message ${topic_name} $timeout
exit $?
5.2. 메시지 만료
이제 기본 설정이 준비되었으므로 단일 메시지를 생성하고 즉시 두 번 사용하겠습니다.
bash-5.1# ./producer.sh "test-topic-2" "message1"
bash-5.1# ./consumer.sh test-topic-2 10000
message1
Processed a total of 1 messages
bash-5.1# ./consumer.sh test-topic-2 10000
message1
Processed a total of 1 messages
따라서 소비자가 사용 가능한 메시지를 반복적으로 소비하고 있음을 알 수 있습니다.
이제 5 분의 절전 지연을 도입 한 다음 메시지 사용을 시도해 보겠습니다.
bash-5.1# sleep 300 && ./consumer.sh test-topic 10000
[2021-02-06 21:55:00,896] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages
예상대로 메시지가 보존 기간을 초과했기 때문에 소비자가 사용할 메시지를 찾지 못했습니다 .
6. 제한
내부적으로 Kafka Broker는 log.retention.check.interval.ms 라는 또 다른 속성을 유지 합니다. 이 속성은 메시지의 만료 여부를 확인하는 빈도를 결정합니다.
따라서 보존 정책을 효과적으로 유지하려면 log.retention.check.interval.ms 의 값이 주어진 주제에 대한 retention.ms 의 속성 값보다 낮아야합니다 .
7. 결론
이 자습서에서는 메시지에 대한 시간 기반 보존 정책을 이해하기 위해 Apache Kafka를 탐색했습니다 . 이 과정에서 관리 작업을 단순화하기 위해 간단한 셸 스크립트를 만들었습니다. 나중에 우리는 보존 기간 후 메시지 만료를 확인하기 위해 독립형 소비자와 생산자를 만들었습니다.