1. 개요
Apache Kafka 는 강력하고 분산된 내결함성 스트림 처리 시스템입니다. 이전 예제에서 우리는 Spring과 Kafka로 작업하는 방법을 배웠습니다 .
이 사용방법(예제)에서는 이전 버전을 기반으로 하고 실행 중인 외부 Kafka 서버에 의존하지 않는 안정적이고 독립적인 통합 테스트를 작성하는 방법을 배웁니다 .
먼저 시작하지만 Kafka의 포함된 인스턴스를 사용하고 구성하는 방법을 살펴보겠습니다. 그런 다음 테스트에서 인기 있는 프레임워크인 Testcontainers 를 사용하는 방법을 살펴보겠습니다 .
2. 의존성
물론 pom.xml에 표준 spring-kafka 의존성 을 추가해야 합니다 .
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.2</version>
</dependency>
그런 다음 테스트를 위해 두 가지 의존성이 더 필요합니다 . 먼저 spring-kafka-test 아티팩트를 추가합니다 .
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.6.3.RELEASE</version>
<scope>test</scope>
</dependency>
마지막으로 Maven Central 에서도 사용할 수 있는 Testcontainers Kafka 의존성을 추가합니다 .
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.15.3</version>
<scope>test</scope>
</dependency>
필요한 모든 의존성을 구성했으므로 이제 Kafka를 사용하여 간단한 Spring Boot 애플리케이션을 작성할 수 있습니다.
3. 간단한 Kafka 생산자-소비자 애플리케이션
이 예제 전체에서 테스트의 초점은 단순한 생산자-소비자 Spring Boot Kafka 애플리케이션이 될 것입니다.
애플리케이션 진입점을 정의하여 시작하겠습니다.
@SpringBootApplication
@EnableAutoConfiguration
public class KafkaProducerConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaProducerConsumerApplication.class, args);
}
}
보시다시피 이것은 표준 Spring Boot 애플리케이션입니다. 가능한 경우 기본 구성 값을 사용하려고 합니다 . 이를 염두에 두고 @EnableAutoConfiguration 어노테이션을 사용하여 애플리케이션을 자동 구성합니다.
3.1. 프로듀서 설정
다음으로 주어진 Kafka 주제에 메시지를 보내는 데 사용할 생산자 빈을 살펴보겠습니다.
@Component
public class KafkaProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, String payload) {
LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
우리 KafkaProducer의 위에서 정의 된 빈은 단지 래퍼입니다 KafkaTemplate의 클래스입니다. 이 클래스는 제공된 주제로 데이터를 보내는 것과 같은 높은 수준의 스레드로부터 안전한 작업을 제공합니다. 이는 우리가 send 메서드 에서 수행하는 것과 정확히 같습니다 .
3.2. 소비자 설정
마찬가지로 이제 Kafka 주제를 수신하고 메시지를 수신하는 간단한 소비자 빈을 정의합니다.
@Component
public class KafkaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
private CountDownLatch latch = new CountDownLatch(1);
private String payload = null;
@KafkaListener(topics = "${test.topic}")
public void receive(ConsumerRecord<?, ?> consumerRecord) {
LOGGER.info("received payload='{}'", consumerRecord.toString());
setPayload(consumerRecord.toString());
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
public String getPayload() {
return payload;
}
}
우리의 단순 소비자는 주어진 주제에 대한 메시지를 수신하기 위해 수신 메소드 에 @KafkaListener 어노테이션을 사용합니다 . 나중에 테스트 에서 test.topic 을 구성하는 방법을 살펴보겠습니다 .
게다가, 수신 메소드는 빈에 메시지 내용을 저장하고 래치 변수 의 수를 감소시킵니다 . 이 변수는 메시지를 성공적으로 수신했는지 확인하기 위해 나중에 테스트에서 사용할 간단한 스레드 안전 카운터 필드입니다 .
이제 Spring Boot를 사용하는 간단한 Kafka 애플리케이션이 구현되었으므로 통합 테스트를 작성하는 방법을 살펴보겠습니다.
4. 테스트에 대한 한마디
일반적으로 깨끗한 통합 테스트를 작성할 때 제어할 수 없거나 갑자기 작동을 멈출 수 있는 외부 서비스에 의존해서는 안 됩니다. 이는 테스트 결과에 부정적인 영향을 미칠 수 있습니다.
마찬가지로 외부 서비스(이 경우 실행 중인 Kafka 브로커)에 의존하는 경우 테스트에서 원하는 방식으로 설정, 제어 및 해제할 수 없습니다.
4.1. 애플리케이션 속성
테스트에서 매우 간단한 애플리케이션 구성 속성 집합을 사용할 것입니다. src/test/resources/application.yml 파일 에서 이러한 속성을 정의 합니다.
spring:
kafka:
consumer:
auto-offset-reset: earliest
group-id: baeldung
test:
topic: embedded-test-topic
이것은 포함된 Kafka 인스턴스 또는 로컬 브로커로 작업할 때 필요한 최소 속성 집합입니다.
이들 중 대부분은 자명 하지만 특히 중요하게 강조해야 하는 것은 소비자 속성 auto-offset-reset: early 입니다. 이 속성은 전송이 완료된 후 컨테이너가 시작될 수 있으므로 소비자 그룹이 우리가 보내는 메시지를 가져오도록 합니다.
또한 테스트에서 사용할 주제 인 Embedded-test-topic 값을 사용하여 주제 속성을 구성합니다 .
5. 임베디드 Kafka를 사용한 테스트
이 섹션에서는 메모리 내 Kafka 인스턴스를 사용하여 테스트를 실행하는 방법을 살펴보겠습니다. 임베디드 카프카라고도 합니다.
이전에 추가한 의존성 spring-kafka-test 에는 애플리케이션 테스트를 지원하는 몇 가지 유용한 유틸리티가 포함되어 있습니다. 특히 EmbeddedKafkaBroker 클래스 가 포함되어 있습니다 .
이를 염두에 두고 첫 번째 통합 테스트를 작성해 보겠습니다.
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class EmbeddedKafkaIntegrationTest {
@Autowired
private KafkaConsumer consumer;
@Autowired
private KafkaProducer producer;
@Value("${test.topic}")
private String topic;
@Test
public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived()
throws Exception {
producer.send(topic, "Sending with own simple KafkaProducer");
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
assertThat(consumer.getLatch().getCount(), equalTo(0L));
assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
}
}
테스트의 주요 부분을 살펴보겠습니다. 먼저 두 개의 꽤 표준적인 Spring 어노테이션으로 테스트 클래스를 장식하는 것으로 시작합니다.
- @SpringBootTest 어노테이션은 우리의 테스트는 Spring 애플리케이션 컨텍스트를 부트 스트랩 보장합니다
- 또한 @DirtiesContext 어노테이션을 사용하여 이 컨텍스트가 다른 테스트 간에 정리되고 재설정되는지 확인합니다.
여기에 중요한 부분이 있습니다. @EmbeddedKafka 어노테이션을 사용하여 EmbeddedKafkaBroker 의 인스턴스를 테스트 에 삽입합니다 . 또한 포함된 Kafka 노드를 구성하는 데 사용할 수 있는 몇 가지 속성이 있습니다.
- 파티션 – 주제당 사용되는 파티션 수입니다. 일을 멋지고 단순하게 유지하기 위해 우리는 테스트에서 하나만 사용하기를 원합니다.
- BrokerProperties – Kafka 브로커에 대한 추가 속성입니다. 다시 우리는 일을 단순하게 유지하고 일반 텍스트 리스너와 포트 번호를 지정합니다.
다음으로 소비자 및 생산자 클래스를 자동 연결 하고 application.properties 의 값을 사용하도록 주제를 구성합니다 .
퍼즐의 마지막 부분의 경우 테스트 주제에 메시지를 보내고 메시지가 수신되었으며 테스트 주제의 이름이 포함되어 있는지 확인합니다 .
테스트를 실행할 때 장황한 Spring 출력을 볼 수 있습니다.
...
12:45:35.099 [main] INFO c.b.kafka.embedded.KafkaProducer -
sending payload='Sending with our own simple KafkaProducer' to topic='embedded-test-topic'
...
12:45:35.103 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]
INFO c.b.kafka.embedded.KafkaConsumer - received payload=
'ConsumerRecord(topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1,
CreateTime = 1605267935099, serialized key size = -1,
serialized value size = 41, headers = RecordHeaders(headers = [], isReadOnly = false),
key = null, value = 우리만의 간단한 KafkaProducer로 보내기)'
이것은 우리의 테스트가 제대로 작동하고 있음을 확인시켜줍니다. 엄청난! 이제 메모리 내 Kafka 브로커를 사용하여 독립적이고 독립적인 통합 테스트를 작성할 수 있습니다.
6. TestContainers로 카프카 테스트하기
때때로 우리는 실제 외부 서비스와 테스트 목적으로 특별히 제공된 서비스의 임베디드 인메모리 인스턴스 사이에 작은 차이를 볼 수 있습니다. 가능성은 낮지만 테스트에서 사용된 포트가 점유되어 실패할 수도 있습니다 .
이를 염두에 두고 이 섹션에서는 Testcontainers 프레임워크를 사용하여 테스트하는 이전 접근 방식의 변형을 볼 수 있습니다 . 통합 테스트에서 Docker 컨테이너 내부에서 호스팅되는 외부 Apache Kafka 브로커를 인스턴스화하고 관리하는 방법을 살펴보겠습니다 .
이전 섹션에서 본 것과 매우 유사한 또 다른 통합 테스트를 정의해 보겠습니다.
@RunWith(SpringRunner.class)
@Import(com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class)
@SpringBootTest(classes = KafkaProducerConsumerApplication.class)
@DirtiesContext
public class KafkaTestContainersLiveTest {
@ClassRule
public static KafkaContainer kafka =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
@Autowired
private KafkaConsumer consumer;
@Autowired
private KafkaProducer producer;
@Value("${test.topic}")
private String topic;
@Test
public void givenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived()
throws Exception {
producer.send(topic, "Sending with own controller");
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
assertThat(consumer.getLatch().getCount(), equalTo(0L));
assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
}
}
이번에는 차이점을 살펴보겠습니다. 표준 JUnit @ClassRule 인 kafka 필드를 선언하고 있습니다. 이 필드는 Kafka를 실행하는 컨테이너의 수명 주기를 준비하고 관리할 KafkaContainer 클래스 의 인스턴스입니다 .
포트 충돌을 피하기 위해 Testcontainers는 도커 컨테이너가 시작될 때 동적으로 포트 번호를 할당합니다. 이러한 이유로 KafkaTestContainersConfiguration 클래스를 사용하여 사용자 지정 소비자 및 생산자 팩토리 구성을 제공합니다 .
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung");
// more standard configuration
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
// more standard configuration
return new DefaultKafkaProducerFactory<>(configProps);
}
그런 다음 테스트 시작 시 @Import 어노테이션을 통해 이 구성을 참조합니다 .
그 이유는 앞에서 언급했듯이 동적으로 생성되는 서버 주소를 응용 프로그램에 주입하는 방법이 필요하기 때문입니다. 부트스트랩 서버 위치를 반환하는 getBootstrapServers() 메서드 를 호출하여 이를 달성합니다 .
bootstrap.servers = [PLAINTEXT://localhost:32789]
이제 테스트를 실행할 때 Testcontainers가 몇 가지 작업을 수행하는 것을 볼 수 있습니다.
- 로컬 Docker 설정을 확인합니다.
- 필요한 경우 confluentinc/cp-kafka:5.4.3 도커 이미지를 가져 옵니다 .
- 새 컨테이너를 시작하고 준비될 때까지 기다립니다.
- 마지막으로 테스트가 완료된 후 컨테이너를 종료하고 삭제합니다.
다시 말하지만, 이것은 테스트 출력을 검사하여 확인됩니다.
13:33:10.396 [main] INFO ? [confluentinc/cp-kafka:5.4.3]
- Creating container for image: confluentinc/cp-kafka:5.4.3
13:33:10.454 [main] INFO ? [confluentinc/cp-kafka:5.4.3]
- Starting container with ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3
13:33:10.785 [main] INFO ? [confluentinc/cp-kafka:5.4.3]
- Container confluentinc/cp-kafka:5.4.3 is starting: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3
프레스토 악장! Kafka 도커 컨테이너를 사용하여 작동하는 통합 테스트.
7. 결론
이 기사에서 우리는 Spring Boot로 Kafka 애플리케이션을 테스트하기 위한 몇 가지 접근 방식에 대해 배웠습니다. 첫 번째 접근 방식에서는 로컬 메모리 내 Kafka 브로커를 구성하고 사용하는 방법을 살펴보았습니다.
그런 다음 테스트에서 도커 컨테이너 내부에서 실행되는 외부 Kafka 브로커를 설정하기 위해 Testcontainers를 사용하는 방법을 보았습니다.