카테고리 없음

외부 이벤트에 대한 구독 종료

기록만이살길 2021. 2. 20. 13:31
반응형

외부 이벤트에 대한 구독 종료

1. 질문(문제점):

애플리케이션에서는 외부 HTTP 엔드 포인트에 긴 폴링을 사용합니다. 나는 Spring의 반응을 사용하여 이것을한다 WebClient. 애플리케이션이 중지 될 때 깔끔하게 종료하기 위해 (그리고 추악한 Netty 스택 추적을 방지하기 위해) Spring이 내 빈을 중지 할 때 호출 takeUntil()하는의 인스턴스와 함께 사용합니다 (I 구현 ).EmitterProcessoronNext()SmartLifecycle

모든 것이 다음과 같이 작동합니다.

@Component
@RequiredArgsConstructor
@Slf4j
public class LongPollingMessageReceiver implements SmartLifecycle {
    private boolean running = true;

    private final EmitterProcessor<Boolean> shutdown = EmitterProcessor.create();

    private final BackendMessageReceiver backendMessageReceiver;

    public void waitForMessages() {
        Mono.defer(() -> backendMessageReceiver.receiveMessages()) // Calls WebClient
            .repeat()
            .takeUntilOther(shutdown)
            .subscribe(event -> {
                // do something when the http endpoint answers
            });
    }

    @Override
    public int getPhase() {
        // We need to cancel the subscriptions before Reactor/Netty shuts down.
        // Using @PreDestroy does not work because it is called *after* the Reactor/Netty shutdown.
        return 0;
    }

    @Override
    public void start() {
        // Not needed
    }

    @Override
    public void stop() {
        log.info("Stopping message subscriptions");
        shutdown.onNext(true);
        shutdown.onComplete();
        running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }
}

지금은 전체 메커니즘이 잘 작동합니다. 그러나 EmitterProcessor로 표시되고 @Deprecatedjavadoc은 Sink대신 a를 사용하도록 말합니다 . 인터페이스를 Sink구현하지 않으므로에 Publisher전달할 수 없습니다 takeUntilOther().

Project Reactor <3.5에 갇히지 않고이 문제를 해결하려면 어떻게해야합니까?

2. 해결방안:

Sinks프로그래밍 방식으로 반응 이벤트를 트리거하기위한 개발자 용 API로 사용됩니다. 이것은 전형적인 것으로 Flux또는 Mono나머지 응용 프로그램에 이를 제시 할 방법이 없다면 그다지 유용하지 않을 것 입니다.

Sinks.ManyasFlux()그 효과에 대한 견해가 있습니다. 유사하게, Sinks.One그리고 Sinks.EmptyasMono()보기.

그것이 당신이 takeUntilOther.

65920864
반응형