1. 소개
이 사용방법(예제)에서는 SSL 인증을 사용하여 Spring Boot 클라이언트를 Apache Kafka 브로커에 연결하기 위한 기본 설정을 다룹니다.
SSL(Secure Sockets Layer)은 실제로 사용되지 않으며 2015년부터 TLS(Transport Layer Security)로 대체되었습니다. 그러나 역사적인 이유로 Kafka(및 Java)는 여전히 "SSL"을 참조하며 이 문서에서는 이 규칙을 따를 것입니다. 또한.
2. SSL 개요
기본적으로 Apache Kafka는 인증 없이 모든 데이터를 일반 텍스트로 보냅니다.
먼저 브로커와 클라이언트 간의 암호화를 위해 SSL을 구성할 수 있습니다. 기본적으로 클라이언트가 서버 인증서를 인증하는 공개 키 암호화를 사용하는 단방향 인증 이 필요합니다 .
또한 서버는 별도의 메커니즘(예: SSL 또는 SASL)을 사용하여 클라이언트를 인증할 수 있으므로 양방향 인증 또는 mTLS(상호 TLS)를 사용할 수 있습니다. 기본적으로 양방향 SSL 인증은 클라이언트와 서버 모두 SSL 인증서를 사용하여 서로의 신원을 확인하고 양방향으로 서로를 신뢰하도록 합니다 .
이 기사에서 브로커는 SSL을 사용하여 클라이언트를 인증 하고 keystore 및 truststore 는 인증서와 키를 보유하는 데 사용됩니다.
각 브로커에는 개인 키와 공개 인증서가 포함된 자체 키 저장소가 필요합니다. 클라이언트는 신뢰 저장소를 사용하여 이 인증서를 인증하고 서버를 신뢰합니다. 마찬가지로 각 클라이언트에는 개인 키와 공개 인증서가 포함된 자체 키 저장소도 필요합니다. 서버는 신뢰 저장소를 사용하여 클라이언트의 인증서를 인증 및 신뢰하고 Security 연결을 설정합니다.
신뢰 저장소는 인증서에 서명 할 수 있는 인증 기관(CA)을 포함할 수 있습니다 . 이 경우 브로커 또는 클라이언트는 truststore에 있는 CA에서 서명한 모든 인증서를 신뢰 합니다. 이렇게 하면 새 클라이언트나 브로커를 추가할 때 신뢰 저장소를 변경할 필요가 없으므로 인증서 인증이 간소화됩니다.
3. 의존성 및 설정
예제 애플리케이션은 간단한 Spring Boot 애플리케이션입니다.
Kafka에 연결하기 위해 POM 파일에 spring-kafka 의존성을 추가해 보겠습니다.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.2</version>
</dependency>
또한 Docker Compose 파일을 사용하여 Kafka 서버 설정을 구성하고 테스트할 것입니다. 처음에는 SSL 구성 없이 이 작업을 수행해 보겠습니다.
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.2.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:6.2.0
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
이제 컨테이너를 시작하겠습니다.
docker-compose up
이것은 기본 구성으로 브로커를 가져와야 합니다.
4. 브로커 구성
Security 연결을 설정하기 위해 브로커에 필요한 최소 구성부터 살펴보겠습니다.
4.1. 독립형 브로커
이 예제에서는 브로커의 독립 실행형 인스턴스를 사용하지 않지만 SSL 인증을 활성화하기 위해 필요한 구성 변경을 아는 것이 유용합니다.
먼저 server.properties 의 포트 9093에서 SSL 연결을 수신하도록 브로커를 구성 해야 합니다 .
listeners=PLAINTEXT://kafka1:9092,SSL://kafka1:9093
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
다음으로 인증서 위치 및 자격 증명을 사용 하여 키 저장소 및 신뢰 저장소 관련 속성을 구성해야 합니다 .
ssl.keystore.location=/certs/kafka.server.keystore.jks
ssl.keystore.password=password
ssl.truststore.location=/certs/kafka.server.truststore.jks
ssl.truststore.password=password
ssl.key.password=password
마지막으로 양방향 인증을 달성하려면 클라이언트를 인증하도록 브로커를 구성해야 합니다 .
ssl.client.auth=required
4.2. 도커 작성
Compose를 사용하여 브로커 환경을 관리하므로 위의 모든 속성을 docker-compose.yml 파일에 추가해 보겠습니다.
kafka:
image: confluentinc/cp-kafka:6.2.0
depends_on:
- zookeeper
ports:
- 9092:9092
- 9093:9093
environment:
...
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,SSL://localhost:9093
KAFKA_SSL_CLIENT_AUTH: 'required'
KAFKA_SSL_KEYSTORE_FILENAME: '/certs/kafka.server.keystore.jks'
KAFKA_SSL_KEYSTORE_CREDENTIALS: '/certs/kafka_keystore_credentials'
KAFKA_SSL_KEY_CREDENTIALS: '/certs/kafka_sslkey_credentials'
KAFKA_SSL_TRUSTSTORE_FILENAME: '/certs/kafka.server.truststore.jks'
KAFKA_SSL_TRUSTSTORE_CREDENTIALS: '/certs/kafka_truststore_credentials'
volumes:
- ./certs/:/etc/kafka/secrets/certs
여기에서는 구성의 포트 섹션에 SSL 포트(9093)를 노출했습니다. 또한 구성의 볼륨 섹션에 certs 프로젝트 폴더를 마운트했습니다 . 여기에는 필수 인증서 및 관련 자격 증명이 포함됩니다.
이제 Compose를 사용하여 스택을 다시 시작하면 브로커 로그에 관련 SSL 세부 정보가 표시됩니다.
...
kafka_1 | uid=1000(appuser) gid=1000(appuser) groups=1000(appuser)
kafka_1 | ===> Configuring ...
<strong>kafka_1 | SSL is enabled.</strong>
....
kafka_1 | [2021-08-20 22:45:10,772] INFO KafkaConfig values:
<strong>kafka_1 | advertised.listeners = PLAINTEXT://localhost:9092,SSL://localhost:9093
kafka_1 | ssl.client.auth = required</strong>
<strong>kafka_1 | ssl.enabled.protocols = [TLSv1.2, TLSv1.3]</strong>
kafka_1 | ssl.endpoint.identification.algorithm = https
kafka_1 | ssl.key.password = [hidden]
kafka_1 | ssl.keymanager.algorithm = SunX509
<strong>kafka_1 | ssl.keystore.location = /etc/kafka/secrets/certs/kafka.server.keystore.jks</strong>
kafka_1 | ssl.keystore.password = [hidden]
kafka_1 | ssl.keystore.type = JKS
kafka_1 | ssl.principal.mapping.rules = DEFAULT
<strong>kafka_1 | ssl.protocol = TLSv1.3</strong>
kafka_1 | ssl.trustmanager.algorithm = PKIX
kafka_1 | ssl.truststore.certificates = null
<strong>kafka_1 | ssl.truststore.location = /etc/kafka/secrets/certs/kafka.server.truststore.jks</strong>
kafka_1 | ssl.truststore.password = [hidden]
kafka_1 | ssl.truststore.type = JKS
....
5. 스프링 부트 클라이언트
이제 서버 설정이 완료되었으므로 필요한 Spring Boot 구성 요소를 생성합니다. 이제 양방향 인증을 위해 SSL이 필요한 브로커와 상호 작용합니다.
5.1. 생산자
먼저 KafkaTemplate 을 사용하여 지정된 주제에 메시지를 보내 겠습니다 .
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message, String topic) {
log.info("Producing message: {}", message);
kafkaTemplate.send(topic, "key", message)
.addCallback(
result -> log.info("Message sent to topic: {}", message),
ex -> log.error("Failed to send message", ex)
);
}
}
send 메서드 는 비동기 작업입니다. 따라서 브로커가 메시지를 수신하면 일부 정보만 기록하는 간단한 콜백을 첨부했습니다.
5.2. 소비자
다음으로 @KafkaListener 를 사용하여 간단한 소비자를 만들어 보겠습니다 . 이는 브로커에 연결하고 생산자가 사용하는 것과 동일한 주제의 메시지를 소비합니다.
public class KafkaConsumer {
public static final String TOPIC = "test-topic";
public final List<String> messages = new ArrayList<>();
@KafkaListener(topics = TOPIC)
public void receive(ConsumerRecord<String, String> consumerRecord) {
log.info("Received payload: '{}'", consumerRecord.toString());
messages.add(consumerRecord.value());
}
}
데모 애플리케이션에서는 일을 단순하게 유지했으며 소비자는 메시지를 List 에 저장하기만 하면 됩니다. 실제 실제 시스템에서 소비자는 메시지를 수신하고 애플리케이션의 비즈니스 논리에 따라 처리합니다.
5.3. 구성
마지막으로 application.yml 에 필요한 구성을 추가해 보겠습니다 .
spring:
kafka:
security:
protocol: "SSL"
bootstrap-servers: localhost:9093
ssl:
trust-store-location: classpath:/client-certs/kafka.client.truststore.jks
trust-store-password: <password>
key-store-location: classpath:/client-certs/kafka.client.keystore.jks
key-store-password: <password>
# additional config for producer/consumer
여기에서는 생산자와 소비자를 구성하기 위해 Spring Boot에서 제공하는 필수 속성을 설정했습니다. 이 두 구성 요소가 모두 동일한 브로커에 연결되므로 spring.kafka 섹션에서 모든 필수 속성을 선언할 수 있습니다. 그러나 생산자와 소비자가 서로 다른 브로커에 연결하는 경우 각각 spring.kafka.producer 및 spring.kafka.consumer 섹션에 지정합니다.
구성 의 ssl 섹션 에서 Kafka 브로커를 인증하기 위해 JKS 신뢰 저장소를 가리킵니다 . 여기에는 브로커 인증서에도 서명한 CA의 인증서가 포함됩니다. 또한 브로커 측의 신뢰 저장소에 있어야 하는 CA에서 서명한 인증서를 포함하는 Spring 클라이언트 키 저장소의 경로도 제공했습니다 .
5.4. 테스트
Compose 파일을 사용하고 있으므로 Testcontainers 프레임워크를 사용하여 Producer 및 Consumer 로 종단 간 테스트를 생성해 보겠습니다 .
@ActiveProfiles("ssl")
@Testcontainers
@SpringBootTest(classes = KafkaSslApplication.class)
class KafkaSslApplicationLiveTest {
private static final String KAFKA_SERVICE = "kafka";
private static final int SSL_PORT = 9093;
@Container
public DockerComposeContainer<?> container =
new DockerComposeContainer<>(KAFKA_COMPOSE_FILE)
.withExposedService(KAFKA_SERVICE, SSL_PORT, Wait.forListeningPort());
@Autowired
private KafkaProducer kafkaProducer;
@Autowired
private KafkaConsumer kafkaConsumer;
@Test
void givenSslIsConfigured_whenProducerSendsMessageOverSsl_thenConsumerReceivesOverSsl() {
String message = generateSampleMessage();
kafkaProducer.sendMessage(message, TOPIC);
await().atMost(Duration.ofMinutes(2))
.untilAsserted(() -> assertThat(kafkaConsumer.messages).containsExactly(message));
}
private static String generateSampleMessage() {
return UUID.randomUUID().toString();
}
}
테스트를 실행할 때 Testcontainers는 SSL 구성을 포함하여 Compose 파일을 사용하여 Kafka 브로커를 시작합니다. 또한 응용 프로그램은 SSL 구성으로 시작하여 암호화되고 인증된 연결을 통해 브로커에 연결됩니다. 이것은 이벤트의 비동기 시퀀스이므로 Awaitlity 를 사용 하여 소비자 메시지 저장소에서 예상되는 메시지를 폴링했습니다. 이는 브로커와 클라이언트 간의 모든 구성 및 성공적인 양방향 인증을 확인합니다.
6. 결론
이 기사에서는 Kafka 브로커와 Spring Boot 클라이언트 간에 필요한 SSL 인증 설정의 기본 사항을 다루었습니다.
처음에는 양방향 인증을 활성화하는 데 필요한 브로커 설정을 살펴보았습니다. 그런 다음 암호화되고 인증된 연결을 통해 브로커에 연결하기 위해 클라이언트 측에 필요한 구성을 살펴보았습니다. 마지막으로 통합 테스트를 사용하여 브로커와 클라이언트 간의 Security 연결을 확인했습니다.
항상 그렇듯이 전체 소스 코드는 GitHub에서 사용할 수 있습니다 .