카테고리 없음

서비스 활성화 기가 입력 채널로 kafka 성공 채널에 매핑되었지만 kafka에서 실행되지 않음 성공

기록만이살길 2021. 2. 23. 17:05
반응형

서비스 활성화 기가 입력 채널로 kafka 성공 채널에 매핑되었지만 kafka에서 실행되지 않음 성공

1. 질문(문제점):

그래서 성공과 실패를위한 채널이있는 Kafka 아웃 바운드 메시지 어댑터를 구성 했으므로 kafka 게시 결과에 따라 사후 처리를 수행 할 수 있습니다.

@Bean
public KafkaProducerMessageHandler<String, String> kafkaProducerMessageHandler() {
    KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setHeaderMapper(mapper());
    handler.setLoggingEnabled(TRUE);
    handler.setTopicExpression(
            new SpelExpressionParser()
                    .parseExpression(
                            "headers['" + upstreamType + "'] + '_' + headers['" + upstreamTypeInstance + "']"));
    handler.setMessageKeyExpression(new SpelExpressionParser().parseExpression("payload['key']"));
    handler.setSendSuccessChannel(kafkaPublishSuccessChannel());
    handler.setSendFailureChannel(kafkaFailuresChannel());
    return handler;
} 

그런 다음 성공적으로 보낸 메시지를 메시지 저장소에도 저장하는이 성공 채널에 서비스 활성화기를 연결합니다.

@Bean
public SubscribableChannel kafkaPublishSuccessChannel() {
    return MessageChannels.direct("kafkaSuccessChannel").get();
}

@Bean
@ServiceActivator(inputChannel = "kafkaSuccessChannel")
public MongoDbStoringMessageHandler mongoDbOutboundGateway() {
    MongoDbStoringMessageHandler mongoHandler = new MongoDbStoringMessageHandler(mongoDbFactory);
    mongoHandler.setMongoConverter(mongoConverter);
    mongoHandler.setLoggingEnabled(TRUE);
    SpelExpressionParser parser = new SpelExpressionParser();
    mongoHandler.setCollectionNameExpression(
            parser.parseExpression(
                    "headers['" + upstreamType + "'] + '_'+ headers['" + upstreamTypeInstance + "'] + '_' + headers['" + upstreamWebhookSource + "']"));
    return mongoHandler;
}

성공하지 못한 게시의 경우 서비스 활성화 기가 호출되기를 기대합니다.

@Test
public void testPushNotificationIsSavedToMongo(
        @Value("classpath:webhooks/jira/test-payload.json") Resource jiraWebhookPayload) throws IOException, InterruptedException {

    //publish messsge to KAfka TOpic
      ...
    //assert message saved in MongoDB
    assertThat(mongoTemplate.findAll(DBObject.class, "alm_jira_some-project")).extracting("key")
            .containsOnly("JRASERVER-2000");
}

마지막 어설 션이 실패하고 로그에서 제작자가 주제에 게시 한 후 성공 채널에 대한 호출이 표시되지 않습니다.

2. 해결방안:

Gary가 그의 의견에서 말했듯이, sendSuccessChannel메인 JUnit 러너와 다른 스레드에서 비동기 적으로 호출됩니다. 실제로 FutureKafka 클라이언트에서 완료 콜백입니다 .

따라서 모든 것이 Kafka로 전송 된 후 MongoDB에 도착하도록하려면 일반 findAll(). 다른 스레드가 해당 채널에 메시지를 보내고 MongoDb 컬렉션에 문서를 저장하는 작업을 수행했는지 확인하려면 일정 기간 동안 이러한 호출을 여러 번 반복해야합니다.

이를 위해 우리가 실제로 테스트에서 사용하는 Awaitility 도구를 제안 할 수 있습니다 : https://github.com/awaitility/awaitility

65816777
반응형