애플리케이션에서는 외부 HTTP 엔드 포인트에 긴 폴링을 사용합니다. 나는 Spring의 반응을 사용하여 이것을한다 WebClient
. 애플리케이션이 중지 될 때 깔끔하게 종료하기 위해 (그리고 추악한 Netty 스택 추적을 방지하기 위해) Spring이 내 빈을 중지 할 때 호출 takeUntil()
하는의 인스턴스와 함께 사용합니다 (I 구현 ).EmitterProcessor
onNext()
SmartLifecycle
모든 것이 다음과 같이 작동합니다.
@Component
@RequiredArgsConstructor
@Slf4j
public class LongPollingMessageReceiver implements SmartLifecycle {
private boolean running = true;
private final EmitterProcessor<Boolean> shutdown = EmitterProcessor.create();
private final BackendMessageReceiver backendMessageReceiver;
public void waitForMessages() {
Mono.defer(() -> backendMessageReceiver.receiveMessages()) // Calls WebClient
.repeat()
.takeUntilOther(shutdown)
.subscribe(event -> {
// do something when the http endpoint answers
});
}
@Override
public int getPhase() {
// We need to cancel the subscriptions before Reactor/Netty shuts down.
// Using @PreDestroy does not work because it is called *after* the Reactor/Netty shutdown.
return 0;
}
@Override
public void start() {
// Not needed
}
@Override
public void stop() {
log.info("Stopping message subscriptions");
shutdown.onNext(true);
shutdown.onComplete();
running = false;
}
@Override
public boolean isRunning() {
return running;
}
}
지금은 전체 메커니즘이 잘 작동합니다. 그러나 EmitterProcessor
로 표시되고 @Deprecated
javadoc은 Sink
대신 a를 사용하도록 말합니다 . 인터페이스를 Sink
구현하지 않으므로에 Publisher
전달할 수 없습니다 takeUntilOther()
.
Project Reactor <3.5에 갇히지 않고이 문제를 해결하려면 어떻게해야합니까?