1. 개요

Apache Kafka 는 강력하고 분산된 내결함성 스트림 처리 시스템입니다. 이전 예제에서 우리는 Spring과 Kafka로 작업하는 방법을 배웠습니다 .

이 사용방법(예제)에서는 이전 버전을 기반으로 하고 실행 중인 외부 Kafka 서버에 의존하지 않는 안정적이고 독립적인 통합 테스트를 작성하는 방법을 배웁니다 .

먼저 시작하지만 Kafka의 포함된 인스턴스를 사용하고 구성하는 방법을 살펴보겠습니다. 그런 다음 테스트에서 인기 있는 프레임워크인 Testcontainers사용하는 방법을 살펴보겠습니다 .

2. 의존성

물론 pom.xml에 표준 spring-kafka 의존성추가해야 합니다 .

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.2</version>
</dependency>

그런 다음 테스트를 위해 두 가지 의존성이 더 필요합니다 . 먼저 spring-kafka-test 아티팩트를 추가합니다 .

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>2.6.3.RELEASE</version>
    <scope>test</scope>
</dependency>

마지막으로 Maven Central 에서도 사용할 수 있는 Testcontainers Kafka 의존성을 추가합니다 .

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.15.3</version>
    <scope>test</scope>
</dependency>

필요한 모든 의존성을 구성했으므로 이제 Kafka를 사용하여 간단한 Spring Boot 애플리케이션을 작성할 수 있습니다.

3. 간단한 Kafka 생산자-소비자 애플리케이션

이 예제 전체에서 테스트의 초점은 단순한 생산자-소비자 Spring Boot Kafka 애플리케이션이 될 것입니다.

애플리케이션 진입점을 정의하여 시작하겠습니다.

@SpringBootApplication
@EnableAutoConfiguration
public class KafkaProducerConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerConsumerApplication.class, args);
    }
}

보시다시피 이것은 표준 Spring Boot 애플리케이션입니다. 가능한 경우 기본 구성 값을 사용하려고 합니다 . 이를 염두에 두고 @EnableAutoConfiguration 어노테이션을 사용하여 애플리케이션을 자동 구성합니다.

3.1. 프로듀서 설정

다음으로 주어진 Kafka 주제에 메시지를 보내는 데 사용할 생산자 빈을 살펴보겠습니다.

@Component
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String payload) {
        LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}

우리 KafkaProducer의 위에서 정의 된 빈은 단지 래퍼입니다 KafkaTemplate의 클래스입니다. 이 클래스는 제공된 주제로 데이터를 보내는 것과 같은 높은 수준의 스레드로부터 안전한 작업을 제공합니다. 이는 우리가 send 메서드 에서 수행하는 것과 정확히 같습니다 .

3.2. 소비자 설정

마찬가지로 이제 Kafka 주제를 수신하고 메시지를 수신하는 간단한 소비자 빈을 정의합니다.

@Component
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    private CountDownLatch latch = new CountDownLatch(1);
    private String payload = null;

    @KafkaListener(topics = "${test.topic}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("received payload='{}'", consumerRecord.toString());
        setPayload(consumerRecord.toString());
        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }

    public String getPayload() {
        return payload;
    }
}

우리의 단순 소비자는 주어진 주제에 대한 메시지를 수신하기 위해 수신 메소드 @KafkaListener 어노테이션을 사용합니다 . 나중에 테스트 에서 test.topic구성하는 방법을 살펴보겠습니다 .

게다가, 수신 메소드는 빈에 메시지 내용을 저장하고 래치 변수 의 수를 감소시킵니다 . 이 변수는 메시지를 성공적으로 수신했는지 확인하기 위해 나중에 테스트에서 사용할 간단한 스레드 안전 카운터 필드입니다 .

이제 Spring Boot를 사용하는 간단한 Kafka 애플리케이션이 구현되었으므로 통합 테스트를 작성하는 방법을 살펴보겠습니다.

4. 테스트에 대한 한마디

일반적으로 깨끗한 통합 테스트를 작성할 때 제어할 수 없거나 갑자기 작동을 멈출 수 있는 외부 서비스에 의존해서는 안 됩니다. 이는 테스트 결과에 부정적인 영향을 미칠 수 있습니다.

마찬가지로 외부 서비스(이 경우 실행 중인 Kafka 브로커)에 의존하는 경우 테스트에서 원하는 방식으로 설정, 제어 및 해제할 수 없습니다.

4.1. 애플리케이션 속성

테스트에서 매우 간단한 애플리케이션 구성 속성 집합을 사용할 것입니다. src/test/resources/application.yml 파일 에서 이러한 속성을 정의 합니다.

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: baeldung
test:
  topic: embedded-test-topic

이것은 포함된 Kafka 인스턴스 또는 로컬 브로커로 작업할 때 필요한 최소 속성 집합입니다.

이들 중 대부분은 자명 하지만 특히 중요하게 강조해야 하는 것은 소비자 속성 auto-offset-reset: early 입니다. 이 속성은 전송이 완료된 후 컨테이너가 시작될 수 있으므로 소비자 그룹이 우리가 보내는 메시지를 가져오도록 합니다.

또한 테스트에서 사용할 주제Embedded-test-topic 값을 사용하여 주제 속성을 구성합니다 .

5. 임베디드 Kafka를 사용한 테스트

이 섹션에서는 메모리 내 Kafka 인스턴스를 사용하여 테스트를 실행하는 방법을 살펴보겠습니다. 임베디드 카프카라고도 합니다.

이전에 추가한 의존성 spring-kafka-test 에는 애플리케이션 테스트를 지원하는 몇 가지 유용한 유틸리티가 포함되어 있습니다. 특히 EmbeddedKafkaBroker  클래스 가 포함되어 있습니다 .

이를 염두에 두고 첫 번째 통합 테스트를 작성해 보겠습니다.

@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class EmbeddedKafkaIntegrationTest {

    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private KafkaProducer producer;

    @Value("${test.topic}")
    private String topic;

    @Test
    public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived() 
      throws Exception {
        producer.send(topic, "Sending with own simple KafkaProducer");
        consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
        
        assertThat(consumer.getLatch().getCount(), equalTo(0L));
        assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
    }
}

테스트의 주요 부분을 살펴보겠습니다. 먼저 두 개의 꽤 표준적인 Spring 어노테이션으로 테스트 클래스를 장식하는 것으로 시작합니다.

  • @SpringBootTest 어노테이션은 우리의 테스트는 Spring 애플리케이션 컨텍스트를 부트 스트랩 보장합니다
  • 또한 @DirtiesContext 어노테이션을 사용하여 이 컨텍스트가 다른 테스트 간에 정리되고 재설정되는지 확인합니다.

여기에 중요한 부분이 있습니다. @EmbeddedKafka 어노테이션을 사용하여 EmbeddedKafkaBroker 의 인스턴스를 테스트 에 삽입합니다 . 또한 포함된 Kafka 노드를 구성하는 데 사용할 수 있는 몇 가지 속성이 있습니다.

  • 파티션 – 주제당 사용되는 파티션 수입니다. 일을 멋지고 단순하게 유지하기 위해 우리는 테스트에서 하나만 사용하기를 원합니다.
  • BrokerProperties – Kafka 브로커에 대한 추가 속성입니다. 다시 우리는 일을 단순하게 유지하고 일반 텍스트 리스너와 포트 번호를 지정합니다.

다음으로 소비자생산자 클래스를 자동 연결 하고 application.properties 의 값을 사용하도록 주제를 구성합니다 .

퍼즐의 마지막 부분의 경우 테스트 주제에 메시지를 보내고 메시지가 수신되었으며 테스트 주제의 이름이 포함되어 있는지 확인합니다 .

테스트를 실행할 때 장황한 Spring 출력을 볼 수 있습니다.

...
12:45:35.099 [main] INFO  c.b.kafka.embedded.KafkaProducer -
  sending payload='Sending with our own simple KafkaProducer' to topic='embedded-test-topic'
...
12:45:35.103 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]
  INFO  c.b.kafka.embedded.KafkaConsumer - received payload=
  'ConsumerRecord(topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1,
  CreateTime = 1605267935099, serialized key size = -1, 
  serialized value size = 41, headers = RecordHeaders(headers = [], isReadOnly = false),
  key = null, value = 우리만의 간단한 KafkaProducer로 보내기)'

이것은 우리의 테스트가 제대로 작동하고 있음을 확인시켜줍니다. 엄청난! 이제 메모리 내 Kafka 브로커를 사용하여 독립적이고 독립적인 통합 테스트를 작성할 수 있습니다.

6. TestContainers로 카프카 테스트하기

때때로 우리는 실제 외부 서비스와 테스트 목적으로 특별히 제공된 서비스의 임베디드 인메모리 인스턴스 사이에 작은 차이를 볼 수 있습니다. 가능성은 낮지만 테스트에서 사용된 포트가 점유되어 실패할 수도 있습니다 .

이를 염두에 두고 이 섹션에서는 Testcontainers 프레임워크를 사용하여 테스트하는 이전 접근 방식의 변형을 볼 수 있습니다 . 통합 테스트에서 Docker 컨테이너 내부에서 호스팅되는 외부 Apache Kafka 브로커를 인스턴스화하고 관리하는 방법을 살펴보겠습니다 .

이전 섹션에서 본 것과 매우 유사한 또 다른 통합 테스트를 정의해 보겠습니다.

@RunWith(SpringRunner.class)
@Import(com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class)
@SpringBootTest(classes = KafkaProducerConsumerApplication.class)
@DirtiesContext
public class KafkaTestContainersLiveTest {

    @ClassRule
    public static KafkaContainer kafka = 
      new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private KafkaProducer producer;

    @Value("${test.topic}")
    private String topic;

    @Test
    public void givenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived() 
      throws Exception {
        producer.send(topic, "Sending with own controller");
        consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
        
        assertThat(consumer.getLatch().getCount(), equalTo(0L));
        assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
    }
}

이번에는 차이점을 살펴보겠습니다. 표준 JUnit @ClassRule 인 kafka 필드를 선언하고 있습니다. 이 필드는 Kafka를 실행하는 컨테이너의 수명 주기를 준비하고 관리할 KafkaContainer 클래스 의 인스턴스입니다 .

포트 충돌을 피하기 위해 Testcontainers는 도커 컨테이너가 시작될 때 동적으로 포트 번호를 할당합니다. 이러한 이유로 KafkaTestContainersConfiguration 클래스를 사용하여 사용자 지정 소비자 및 생산자 팩토리 구성을 제공합니다 .

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung");
    // more standard configuration
    return props;
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    // more standard configuration
    return new DefaultKafkaProducerFactory<>(configProps);
}

그런 다음 테스트 시작 시 @Import 어노테이션을 통해 이 구성을 참조합니다 .

그 이유는 앞에서 언급했듯이 동적으로 생성되는 서버 주소를 응용 프로그램에 주입하는 방법이 필요하기 때문입니다. 부트스트랩 서버 위치를 반환하는 getBootstrapServers() 메서드 를 호출하여 이를 달성합니다 .

bootstrap.servers = [PLAINTEXT://localhost:32789]

이제 테스트를 실행할 때 Testcontainers가 몇 가지 작업을 수행하는 것을 볼 수 있습니다.

  • 로컬 Docker 설정을 확인합니다.
  • 필요한 경우 confluentinc/cp-kafka:5.4.3 도커 이미지를 가져 옵니다 .
  • 새 컨테이너를 시작하고 준비될 때까지 기다립니다.
  • 마지막으로 테스트가 완료된 후 컨테이너를 종료하고 삭제합니다.

다시 말하지만, 이것은 테스트 출력을 검사하여 확인됩니다.

13:33:10.396 [main] INFO  ? [confluentinc/cp-kafka:5.4.3]
  - Creating container for image: confluentinc/cp-kafka:5.4.3
13:33:10.454 [main] INFO  ? [confluentinc/cp-kafka:5.4.3]
  - Starting container with ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3
13:33:10.785 [main] INFO  ? [confluentinc/cp-kafka:5.4.3]
  - Container confluentinc/cp-kafka:5.4.3 is starting: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3

프레스토 악장! Kafka 도커 컨테이너를 사용하여 작동하는 통합 테스트.

7. 결론

이 기사에서 우리는 Spring Boot로 Kafka 애플리케이션을 테스트하기 위한 몇 가지 접근 방식에 대해 배웠습니다. 첫 번째 접근 방식에서는 로컬 메모리 내 Kafka 브로커를 구성하고 사용하는 방법을 살펴보았습니다.

그런 다음 테스트에서 도커 컨테이너 내부에서 실행되는 외부 Kafka 브로커를 설정하기 위해 Testcontainers를 사용하는 방법을 보았습니다.

항상 그렇듯이 기사의 전체 소스 코드는 GitHub에서 사용할 수  있습니다 .

Junit footer banner