지금은 My SpringBoot 애플리케이션에 구현 된 간단한 Kafka Consumer 및 Producer가 있습니다. 다음 작업은 내 소비자가 소비 된 메시지를 받아 모든 구독 클라이언트에 직접 브로드 캐스트하는 것입니다. WebFlux와 함께 STOMP Messaging을 사용할 수 없다는 것을 알았으므로 어떻게이 작업을 수행 할 수 있습니까? 반응 형 WebSocket 구현을 보았지만 소비 된 데이터를 내 웹 소켓으로 보낼 수있는 방법을 알지 못했습니다.
그것은 내 간단한 KafkaProducer입니다.
fun addMessage(message: Message){
val headers : MutableMap<String, Any> = HashMap()
headers[KafkaHeaders.TOPIC] = topicName
kafkaTemplate.send(GenericMessage<Message>(message, headers))
}
그리고 내 간단한 소비자는 다음과 같습니다.
@KafkaListener(topics = ["mytopic"], groupId = "test-consumer-group")
fun receiveData(message:Message) :Message{
//Take consumed data and send to websocket
}