Spring

Kafka 및 Websocket을 사용한 Spring WebFlux

기록만이살길 2021. 3. 17. 15:03
반응형

Kafka 및 Websocket을 사용한 Spring WebFlux

1. 질문(문제점):

지금은 My SpringBoot 애플리케이션에 구현 된 간단한 Kafka Consumer 및 Producer가 있습니다. 다음 작업은 내 소비자가 소비 된 메시지를 받아 모든 구독 클라이언트에 직접 브로드 캐스트하는 것입니다. WebFlux와 함께 STOMP Messaging을 사용할 수 없다는 것을 알았으므로 어떻게이 작업을 수행 할 수 있습니까? 반응 형 WebSocket 구현을 보았지만 소비 된 데이터를 내 웹 소켓으로 보낼 수있는 방법을 알지 못했습니다.

그것은 내 간단한 KafkaProducer입니다.

fun addMessage(message: Message){
        val headers : MutableMap<String, Any> = HashMap()
        headers[KafkaHeaders.TOPIC] = topicName
        kafkaTemplate.send(GenericMessage<Message>(message, headers))
    }

그리고 내 간단한 소비자는 다음과 같습니다.

@KafkaListener(topics = ["mytopic"], groupId = "test-consumer-group")
    fun receiveData(message:Message) :Message{
        //Take consumed data and send to websocket
    }

2. 해결방안:

나는 Sinks.many().multicast().onBackpressureBuffer()글로벌 중간 컨테이너로 고려할 것 입니다. 그런 다음에 receiveData()당신은 싱크 가 원자로 추상화에 데이터를.

WebSocket 연결 세션의 경우 API 를 구현 org.springframework.web.reactive.socket.WebSocketHandler하고 사용 하는 것이 좋습니다 . 이렇게하면 모든 세션이이 WebSocket 서버에 연결되어있는 한 동일한 Kafka 데이터를 소비하게됩니다.Sinks.Many.asFlux()WebSocketSession.send(Publisher<WebSocketMessage> messages)

문서에서 더 많은 정보보기 : https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#webflux-websockethandler

최신 정보

여기에서 몇 가지 샘플을 찾을 수 있습니다 : https://github.com/artembilan/sandbox/tree/master/so-65667450

65667450
반응형