그래서 성공과 실패를위한 채널이있는 Kafka 아웃 바운드 메시지 어댑터를 구성 했으므로 kafka 게시 결과에 따라 사후 처리를 수행 할 수 있습니다.
@Bean
public KafkaProducerMessageHandler<String, String> kafkaProducerMessageHandler() {
KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setHeaderMapper(mapper());
handler.setLoggingEnabled(TRUE);
handler.setTopicExpression(
new SpelExpressionParser()
.parseExpression(
"headers['" + upstreamType + "'] + '_' + headers['" + upstreamTypeInstance + "']"));
handler.setMessageKeyExpression(new SpelExpressionParser().parseExpression("payload['key']"));
handler.setSendSuccessChannel(kafkaPublishSuccessChannel());
handler.setSendFailureChannel(kafkaFailuresChannel());
return handler;
}
그런 다음 성공적으로 보낸 메시지를 메시지 저장소에도 저장하는이 성공 채널에 서비스 활성화기를 연결합니다.
@Bean
public SubscribableChannel kafkaPublishSuccessChannel() {
return MessageChannels.direct("kafkaSuccessChannel").get();
}
@Bean
@ServiceActivator(inputChannel = "kafkaSuccessChannel")
public MongoDbStoringMessageHandler mongoDbOutboundGateway() {
MongoDbStoringMessageHandler mongoHandler = new MongoDbStoringMessageHandler(mongoDbFactory);
mongoHandler.setMongoConverter(mongoConverter);
mongoHandler.setLoggingEnabled(TRUE);
SpelExpressionParser parser = new SpelExpressionParser();
mongoHandler.setCollectionNameExpression(
parser.parseExpression(
"headers['" + upstreamType + "'] + '_'+ headers['" + upstreamTypeInstance + "'] + '_' + headers['" + upstreamWebhookSource + "']"));
return mongoHandler;
}
성공하지 못한 게시의 경우 서비스 활성화 기가 호출되기를 기대합니다.
@Test
public void testPushNotificationIsSavedToMongo(
@Value("classpath:webhooks/jira/test-payload.json") Resource jiraWebhookPayload) throws IOException, InterruptedException {
//publish messsge to KAfka TOpic
...
//assert message saved in MongoDB
assertThat(mongoTemplate.findAll(DBObject.class, "alm_jira_some-project")).extracting("key")
.containsOnly("JRASERVER-2000");
}
마지막 어설 션이 실패하고 로그에서 제작자가 주제에 게시 한 후 성공 채널에 대한 호출이 표시되지 않습니다.