1. 소개

Project Reactor는 JVM을 위한 완전한 비차단 프로그래밍 기반을 제공합니다. Reactive Streams 사양 구현을 제공하고 Flux와 같은 구성 가능한 비동기 API를 제공합니다. Flux는 여러 반응 연산자가 있는 반응 스트림 게시자입니다. 0에서 N까지의 요소를 내보낸 다음 성공적으로 또는 오류와 함께 완료됩니다. 필요에 따라 여러 가지 방법으로 만들 수 있습니다 .

2. 플럭스의 이해

Flux는 0에서 N개의 요소를 내보낼 수 있는 Reactive Stream 게시자입니다 . Flux 시퀀스를 생성, 오케스트레이션 및 변환하는 데 사용되는 여러 연산자가 있습니다. Flux는 성공적으로 완료되거나 오류와 함께 완료될 수 있습니다.

Flux API는 소스를 만들거나 여러 콜백 유형에서 생성하기 위해 Flux에서 여러 가지 정적 팩터리 메서드를 제공합니다. 또한 비동기 처리 파이프라인을 구축하기 위한 인스턴스 메서드 및 연산자를 제공합니다. 이 파이프라인은 비동기 시퀀스를 생성합니다.

다음 섹션에서는 Flux generate()create() 메서드의 몇 가지 사용법을 살펴보겠습니다.

3. 메이븐 의존성

reactor-core  및  react-test Maven 의존성 이 필요합니다 .

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.4.17</version>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <version>3.4.17</version>
    <scope>test</scope>
</dependency>

4. 플럭스 생성

Flux API 의 generate() 메서드는 Flux를 생성하는 간단하고 직관적인 프로그래밍 방식을 제공합니다. generate() 메서드는 생성기 함수를 사용하여 일련의 항목을 생성합니다 .

생성 방법에는 세 가지 변형이 있습니다.

  • generate(소비자<SynchronousSink<T>> 생성기)
  • 생성(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> 생성기)
  • generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)

generate 메소드는 필요에 따라 값을 계산하고 내보냅니다 . 다운스트림에서 사용할 수 없는 요소를 계산하는 데 비용이 너무 많이 드는 경우에 사용하는 것이 좋습니다. 방출된 이벤트가 애플리케이션 상태의 영향을 받는 경우에도 사용할 수 있습니다.

4.1.

이 예제에서, Flux 를 생성하기 위해 generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator) 를 사용하자 :

public class CharacterGenerator {
    
    public Flux<Character> generateCharacters() {
        
        return Flux.generate(() -> 97, (state, sink) -> {
            char value = (char) state.intValue();
            sink.next(value);
            if (value == 'z') {
                sink.complete();
            }
            return state + 1;
        });
    }
}

generate() 메소드 에서 두 개의 함수를 인수로 제공합니다.

  • 첫 번째는 Callable 함수입니다. 이 함수는 값이 97인 생성기의 초기 상태를 정의합니다.
  • 두 번째는 BiFunction입니다. SynchronousSink 를 사용하는 생성기 함수입니다 . 이 SynchronousSink는 싱크의 next 메서드가 호출 될 때마다 항목을 반환합니다.

이름에 따라 SynchronousSink 인스턴스는 동기식으로 작동합니다. 그러나 생성기 호출당 SynchronousSink 개체의 next 메서드를 두 번 이상 호출할 수 없습니다 .

StepVerifier 로 생성된 시퀀스를 검증해  보겠습니다 .

@Test
public void whenGeneratingCharacters_thenCharactersAreProduced() {
    CharacterGenerator characterGenerator = new CharacterGenerator();
    Flux<Character> characterFlux = characterGenerator.generateCharacters().take(3);

    StepVerifier.create(characterFlux)
      .expectNext('a', 'b', 'c')
      .expectComplete()
      .verify();
}

이 예에서 구독자는 세 개의 항목만 요청합니다. 따라서 생성된 시퀀스는 a,b 및 c의 세 문자를 방출하여 끝납니다. expectNext () 는 Flux에서 기대하는 요소를 예상합니다. expectComplete ()는 Flux에서 요소 방출이 완료되었음을 나타냅니다 .

5. 플럭스 생성

Flux 의 create() 메서드 는 응용 프로그램의 상태에 영향을 받지 않는 여러 값(0에서 무한대까지)을 계산하려는 경우에 사용됩니다 . 이는 Flux create() 메서드의 기본 메서드가 요소를 계속 계산하기 때문입니다.

게다가 다운스트림 시스템은 필요한 요소 수를 결정합니다. 따라서 다운스트림 시스템이 따라갈 수 없는 경우 이미 방출된 요소가 버퍼링되거나 제거됩니다.

기본적으로 내보낸 요소는 다운스트림 시스템이 더 많은 요소를 요청할 때까지 버퍼링됩니다.

5.1.

이제 create() 메서드의 예를 보여드리겠습니다.

public class CharacterCreator {
    public Consumer<List<Character>> consumer;

    public Flux<Character> createCharacterSequence() {
        return Flux.create(sink -> CharacterCreator.this.consumer = items -> items.forEach(sink::next));
    }
}

create  연산자가  generate () 에서 사용되는 SynchronousSink  대신  FluxSink 를 요청 하는 것을 알 수 있습니다 .  이 경우 항목 List에 있는 모든 항목에 대해  next () 를 호출 하여 하나씩 방출합니다.

이제 CharacterCreator  를 두 개의 문자 시퀀스와 함께 사용하겠습니다.

@Test
public void whenCreatingCharactersWithMultipleThreads_thenSequenceIsProducedAsynchronously() throws InterruptedException {
    CharacterGenerator characterGenerator = new CharacterGenerator();
    List<Character> sequence1 = characterGenerator.generateCharacters().take(3).collectList().block();
    List<Character> sequence2 = characterGenerator.generateCharacters().take(2).collectList().block();
}

위의 코드 스니펫에서 sequence1sequence2 라는 두 개의 시퀀스를 만들었습니다 . 이러한 시퀀스는 캐릭터 항목의 소스 역할을 합니다. 문자 시퀀스를 가져오기 위해 CharacterGenerator 인스턴스를 사용하고 있음에 유의하십시오 .

이제 characterCreator 인스턴스 와 두 개의 스레드 인스턴스 를 정의하겠습니다 .

CharacterCreator characterCreator = new CharacterCreator();
Thread producerThread1 = new Thread(() -> characterCreator.consumer.accept(sequence1));
Thread producerThread2 = new Thread(() -> characterCreator.consumer.accept(sequence2));

이제 게시자에게 요소를 제공할 두 개의 스레드 인스턴스를 만들고 있습니다. 수락 연산자가 호출되면 문자 요소가 시퀀스 소스로 흐르기 시작합니다. 다음으로 새로운 통합 시퀀스를 구독 합니다.

List<Character> consolidated = new ArrayList<>();
characterCreator.createCharacterSequence().subscribe(consolidated::add);

createCharacterSequence 는 우리가 구독한 Flux를 반환하고 통합 List 의 요소를 소비 한다는 점에 유의 하십시오. 다음으로 두 개의 서로 다른 스레드에서 항목이 이동하는 것을 보는 전체 프로세스를 트리거하겠습니다.

producerThread1.start();
producerThread2.start();
producerThread1.join();
producerThread2.join();

마지막으로 작업 결과를 확인하겠습니다.

assertThat(consolidated).containsExactlyInAnyOrder('a', 'b', 'c', 'a', 'b');

수신된 시퀀스의 처음 세 문자는 sequence1에서 가져옵니다 . 마지막 두 문자는 sequence2 에서 가져온 것 입니다. 이는 비동기 작업이므로 해당 시퀀스의 요소 순서가 보장되지 않습니다.

6. 플럭스 생성 VS 플럭스 생성

다음은 만들기 작업과 생성 작업 간의 몇 가지 차이점입니다.

플럭스 생성 플럭스 생성
이 메서드는 Consumer<FluxSink> 인스턴스를 허용합니다. 이 메서드는 Consumer<SynchronousSink> 의 인스턴스를 허용합니다.
Create 메서드는 소비자를 한 번만 호출합니다. Generate 메서드는 다운스트림 애플리케이션의 필요에 따라 소비자 메서드를 여러 번 호출합니다.
소비자는 0..N 요소를 즉시 방출할 수 있습니다. 하나의 요소만 방출할 수 있습니다.
게시자는 다운스트림 상태를 인식하지 못합니다. 따라서 create는 흐름 제어를 위한 추가 매개변수로 오버플로 전략을 허용합니다. 게시자는 다운스트림 애플리케이션 요구 사항을 기반으로 요소를 생성합니다.
FluxSink 를 사용하면 필요한 경우 여러 스레드를 사용하여 요소를 방출할 수 있습니다 . 한 번에 하나의 요소만 방출하므로 여러 스레드에 유용하지 않습니다.

7. 결론

이 기사에서는 Flux API의 생성 및 생성 방법의 차이점에 대해 논의했습니다.

먼저 리액티브 프로그래밍의 개념을 소개하고 Flux API에 대해 이야기했습니다. 그런 다음 Flux API의 생성 및 생성 방법에 대해 논의했습니다. 마지막으로 Flux API의 생성 및 생성 메서드 간의 차이점 List을 제공했습니다.

이 사용방법(예제)의 소스 코드는  GitHub에서 사용할 수 있습니다 .

Generic footer banner