1. 소개
Reactor Core 는 반응형 프로그래밍 모델을 구현하는 Java 8 라이브러리입니다. 반응형 애플리케이션 구축을 위한 표준인 Reactive Streams 사양 위에 구축되었습니다 .
비반응형 Java 개발의 배경에서 반응형으로 가는 것은 상당히 가파른 학습 곡선일 수 있습니다. 이는 Java 8 Stream API와 비교할 때 동일한 상위 수준 추상화로 오인될 수 있으므로 더욱 어려워집니다 .
이 기사에서 우리는 이 패러다임을 이해하려고 시도할 것입니다. 리액티브 코드를 작성하는 방법에 대한 그림을 구축할 때까지 Reactor를 통해 작은 단계를 수행하여 이후 시리즈에서 제공할 고급 기사의 토대를 마련할 것입니다.
2. 리액티브 스트림 사양
Reactor를 살펴보기 전에 Reactive Streams 사양을 살펴봐야 합니다. 이것이 Reactor가 구현하는 것이며 라이브러리의 토대를 마련합니다.
기본적으로 Reactive Streams는 비동기 스트림 처리를 위한 사양입니다.
즉, 많은 이벤트가 비동기적으로 생성되고 소비되는 시스템입니다. 초당 수천 개의 주식 업데이트 스트림이 금융 애플리케이션에 들어오고 이러한 업데이트에 적시에 응답해야 하는 경우를 생각해 보십시오.
이것의 주요 목표 중 하나는 배압 문제를 해결하는 것입니다. 이벤트를 처리할 수 있는 속도보다 빠르게 소비자에게 이벤트를 내보내는 생산자가 있는 경우 결국 소비자는 이벤트에 압도되어 시스템 리소스가 부족해집니다.
배압은 소비자가 이를 방지하기 위해 얼마나 많은 데이터를 보낼지 생산자에게 알려줄 수 있어야 한다는 것을 의미하며 이것이 사양에 나와 있습니다.
3. 메이븐 의존성
시작하기 전에 Maven 의존성을 추가해 보겠습니다.
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.16</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.6</version>
</dependency>
또한 Logback 을 의존성으로 추가하고 있습니다. 이는 데이터 흐름을 더 잘 이해하기 위해 Reactor의 출력을 기록하기 때문입니다.
4. 데이터 스트림 생성
애플리케이션이 반응적이 되려면 먼저 데이터 스트림을 생성할 수 있어야 합니다.
이전에 제공한 주식 업데이트 예제와 같은 것일 수 있습니다. 이 데이터가 없으면 반응할 것이 없기 때문에 이것이 논리적인 첫 번째 단계입니다.
Reactive Core는 이를 가능하게 하는 두 가지 데이터 유형을 제공합니다.
4.1. 유량
이를 수행하는 첫 번째 방법은 Flux 를 사용하는 것 입니다. 0..n 요소 를 내보낼 수 있는 스트림입니다 . 간단한 것을 만들어 봅시다:
Flux<Integer> just = Flux.just(1, 2, 3, 4);
이 경우 네 가지 요소의 정적 스트림이 있습니다.
4.2. 단핵증
이를 수행하는 두 번째 방법은 0..1 요소 의 스트림인 Mono 를 사용하는 것입니다. 하나를 인스턴스화해 보겠습니다.
Mono<Integer> just = Mono.just(1);
이것은 Flux 와 거의 동일하게 보이고 동작하지만 이번에는 하나 이상의 요소로 제한됩니다.
4.3. Flux만이 아닌 이유는 무엇입니까?
추가 실험을 하기 전에 이 두 가지 데이터 유형이 있는 이유를 강조할 가치가 있습니다.
첫째, Flux 와 Mono 는 모두 Reactive Streams Publisher 인터페이스 의 구현 이라는 점에 유의해야 합니다 . 두 클래스 모두 사양을 준수하며 대신 이 인터페이스를 사용할 수 있습니다.
Publisher<String> just = Mono.just("foo");
하지만 실제로 이 카디널리티를 아는 것은 유용합니다. 이는 몇 가지 작업이 두 유형 중 하나에 대해서만 의미가 있고 표현력이 더 많기 때문입니다 (저장소의 findOne() 을 상상해 보십시오).
5. 스트림 구독
이제 우리는 데이터 스트림을 생성하는 방법에 대한 높은 수준의 개요를 가지고 있으며 요소를 내보내려면 구독해야 합니다.
5.1. 요소 수집
스트림의 모든 요소를 수집하기 위해 subscribe() 메서드를 사용합시다 .
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.log()
.subscribe(elements::add);
assertThat(elements).containsExactly(1, 2, 3, 4);
구독할 때까지 데이터 흐름이 시작되지 않습니다. 몇 가지 로깅도 추가했음을 주목하십시오. 이것은 뒤에서 무슨 일이 일어나고 있는지 볼 때 도움이 될 것입니다.
5.2. 요소의 흐름
로그인을 사용하면 스트림을 통해 데이터가 흐르는 방식을 시각화하는 데 사용할 수 있습니다.
20:25:19.550 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(1)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(2)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(3)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(4)
20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onComplete()
우선 모든 것이 메인 스레드에서 실행됩니다. 이 기사의 뒷부분에서 동시성에 대해 더 자세히 살펴볼 것이므로 이에 대한 자세한 내용은 다루지 않겠습니다. 하지만 모든 것을 순서대로 처리할 수 있으므로 일이 간단해집니다.
이제 우리가 기록한 시퀀스를 하나씩 살펴보겠습니다.
- onSubscribe() – 스트림을 구독할 때 호출됩니다.
- request(unbounded) – subscribe 를 호출하면 뒤에서 Subscription 을 생성합니다 . 이 구독은 스트림에서 요소를 요청합니다. 이 경우 기본적으로 제한되지 않음으로 사용 가능한 모든 단일 요소를 요청합니다 .
- onNext() – 모든 단일 요소에서 호출됩니다.
- onComplete() – 마지막 요소를 수신한 후 마지막으로 호출됩니다. 실제로 예외가 있는 경우 호출되는 onError() 도 있지만 이 경우에는 예외가 없습니다.
이것은 Reactive Streams 사양의 일부로 구독자 인터페이스에 배치된 흐름이며 실제로는 onSubscribe() 에 대한 호출의 배후에서 인스턴스화된 것 입니다. 이것은 유용한 방법이지만 무슨 일이 일어나고 있는지 더 잘 이해하기 위해 구독자 인터페이스를 직접 제공해 보겠습니다.
Flux.just(1, 2, 3, 4)
.log()
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
elements.add(integer);
}
@Override
public void onError(Throwable t) {}
@Override
public void onComplete() {}
});
위 흐름에서 가능한 각 단계는 구독자 구현의 메서드에 매핑되는 것을 볼 수 있습니다. Flux 는 이러한 장황함 을 줄이기 위한 도우미 메서드를 제공했습니다.
5.3. Java 8 스트림 과의 비교
여전히 수집을 수행 하는 Java 8 스트림 과 동의어가 있는 것처럼 보일 수 있습니다 .
List<Integer> collected = Stream.of(1, 2, 3, 4)
.collect(toList());
우리는 그렇지 않습니다.
핵심 차이점은 Reactive는 푸시 모델인 반면 Java 8 Streams 는 풀 모델이라는 것입니다. 반응적 접근 방식에서는 이벤트가 수신되면 구독자에게 푸시 됩니다.
다음으로 주목해야 할 것은 Streams 터미널 연산자가 모든 데이터를 가져오고 결과를 반환하는 터미널이라는 것입니다. Reactive를 사용하면 여러 구독자가 임시로 연결 및 제거되면서 외부 리소스에서 들어오는 무한 스트림을 가질 수 있습니다. 또한 스트림 결합, 스트림 조절 및 배압 적용과 같은 작업을 수행할 수 있습니다. 이에 대해서는 다음에 다룰 것입니다.
6. 배압
다음으로 고려해야 할 사항은 배압입니다. 이 예에서 구독자는 생산자에게 모든 단일 요소를 한 번에 푸시하라고 지시합니다. 이것은 구독자에게 부담이 되어 모든 리소스를 소비하게 될 수 있습니다.
배압은 다운스트림이 업스트림에 압도당하는 것을 방지하기 위해 더 적은 데이터를 보내도록 지시할 수 있는 경우 입니다.
배압을 적용하도록 구독자 구현을 수정할 수 있습니다 . request() 를 사용하여 업스트림에 한 번에 두 개의 요소만 보내도록 지시해 보겠습니다 .
Flux.just(1, 2, 3, 4)
.log()
.subscribe(new Subscriber<Integer>() {
private Subscription s;
int onNextAmount;
@Override
public void onSubscribe(Subscription s) {
this.s = s;
s.request(2);
}
@Override
public void onNext(Integer integer) {
elements.add(integer);
onNextAmount++;
if (onNextAmount % 2 == 0) {
s.request(2);
}
}
@Override
public void onError(Throwable t) {}
@Override
public void onComplete() {}
});
이제 코드를 다시 실행하면 request(2) 가 호출되고 두 번의 onNext() 호출이 이어지고 request(2) 가 다시 호출되는 것을 볼 수 있습니다.
23:31:15.395 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
23:31:15.397 [main] INFO reactor.Flux.Array.1 - | request(2)
23:31:15.397 [main] INFO reactor.Flux.Array.1 - | onNext(1)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(2)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | request(2)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(3)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(4)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | request(2)
23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onComplete()
본질적으로 이것은 반응성 당김 배압입니다. 우리는 준비가 되었을 때만 일정량의 요소만 푸시하도록 업스트림에 요청하고 있습니다.
우리가 Twitter에서 스트리밍 트윗을 받고 있다고 상상한다면 무엇을 해야할지 결정하는 것은 업스트림에 달려 있습니다. 트윗이 들어오지만 다운스트림에서 요청이 없으면 업스트림에서 항목을 삭제하거나 버퍼에 저장하거나 다른 전략을 사용할 수 있습니다.
7. 스트림에서 작동
또한 스트림의 데이터에 대한 작업을 수행하여 적절하다고 판단되는 이벤트에 응답할 수 있습니다.
7.1. 스트림의 데이터 매핑
수행할 수 있는 간단한 작업은 변환을 적용하는 것입니다. 이 경우 스트림의 모든 숫자를 두 배로 늘리겠습니다.
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribe(elements::add);
map() 은 onNext() 가 호출 될 때 적용됩니다 .
7.2. 두 스트림 결합
그런 다음 다른 스트림을 이 스트림과 결합하여 더 흥미롭게 만들 수 있습니다. zip() 함수 를 사용하여 시도해 봅시다 .
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.zipWith(Flux.range(0, Integer.MAX_VALUE),
(one, two) -> String.format("First Flux: %d, Second Flux: %d", one, two))
.subscribe(elements::add);
assertThat(elements).containsExactly(
"First Flux: 2, Second Flux: 0",
"First Flux: 4, Second Flux: 1",
"First Flux: 6, Second Flux: 2",
"First Flux: 8, Second Flux: 3");
여기에서 우리는 하나 더 증가하고 원본과 함께 스트리밍하는 또 다른 Flux 를 생성하고 있습니다. 로그를 검사하여 이들이 어떻게 함께 작동하는지 확인할 수 있습니다.
20:04:38.064 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:04:38.065 [main] INFO reactor.Flux.Array.1 - | onNext(1)
20:04:38.066 [main] INFO reactor.Flux.Range.2 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
20:04:38.066 [main] INFO reactor.Flux.Range.2 - | onNext(0)
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(2)
20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(1)
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(3)
20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(2)
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(4)
20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(3)
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onComplete()
20:04:38.067 [main] INFO reactor.Flux.Array.1 - | cancel()
20:04:38.067 [main] INFO reactor.Flux.Range.2 - | cancel()
이제 Flux 당 하나의 구독이 있음에 유의하십시오 . onNext () 호출도 번갈아 진행되므로 zip() 함수 를 적용할 때 스트림의 각 요소 인덱스가 일치 합니다.
8. 핫 스트림
현재 우리는 주로 콜드 스트림에 집중했습니다. 이들은 다루기 쉬운 정적 고정 길이 스트림입니다. 리액티브에 대한 보다 현실적인 사용 사례는 무한히 발생하는 것일 수 있습니다.
예를 들어 지속적으로 반응해야 하는 마우스 움직임의 흐름이나 Twitter 피드가 있을 수 있습니다. 이러한 유형의 스트림은 항상 실행 중이고 데이터의 시작 부분이 누락되어 언제든지 구독할 수 있으므로 핫 스트림이라고 합니다.
8.1. ConnectableFlux 생성
핫 스트림을 생성하는 한 가지 방법은 콜드 스트림을 하나로 변환하는 것입니다. 영원히 지속되는 Flux 를 만들어 결과를 콘솔에 출력하면 외부 리소스에서 들어오는 무한한 데이터 스트림을 시뮬레이션합니다.
ConnectableFlux<Object> publish = Flux.create(fluxSink -> {
while(true) {
fluxSink.next(System.currentTimeMillis());
}
})
.publish();
publish() 를 호출 하면 ConnectableFlux 가 제공 됩니다. 즉, subscribe() 를 호출 해도 방출이 시작되지 않으므로 여러 구독을 추가할 수 있습니다.
publish.subscribe(System.out::println);
publish.subscribe(System.out::println);
이 코드를 실행하면 아무 일도 일어나지 않습니다. 우리가 connect() 를 호출하기 전까지 는 Flux 가 발광을 시작할 것입니다:
publish.connect();
8.2. 쓰로틀링
코드를 실행하면 콘솔이 로깅에 압도될 것입니다. 이것은 너무 많은 데이터가 소비자에게 전달되는 상황을 시뮬레이션하고 있습니다. 스로틀링으로 이 문제를 해결해 보겠습니다.
ConnectableFlux<Object> publish = Flux.create(fluxSink -> {
while(true) {
fluxSink.next(System.currentTimeMillis());
}
})
.sample(ofSeconds(2))
.publish();
여기에서는 2초 간격으로 sample() 메서드를 도입했습니다. 이제 값은 2초마다 구독자에게 푸시되므로 콘솔이 훨씬 덜 바쁠 것입니다.
물론 윈도우잉 및 버퍼링과 같이 다운스트림으로 전송되는 데이터의 양을 줄이기 위한 여러 가지 전략이 있지만 이 기사에서는 다루지 않습니다.
9. 동시성
위의 모든 예제는 현재 기본 스레드에서 실행됩니다. 그러나 원하는 경우 코드가 실행되는 스레드를 제어할 수 있습니다. Scheduler 인터페이스 는 많은 구현이 제공되는 비동기 코드에 대한 추상화를 제공합니다. 메인에 다른 스레드를 구독해 봅시다:
Flux.just(1, 2, 3, 4)
.log()
.map(i -> i * 2)
.subscribeOn(Schedulers.parallel())
.subscribe(elements::add);
병렬 스케줄러는 구독 이 다른 스레드에서 실행되도록 하며 로그를 확인하여 이를 증명할 수 있습니다. 첫 번째 항목이 메인 스레드에서 나오고 Flux가 parallel-1 이라는 다른 스레드에서 실행되는 것을 볼 수 있습니다.
20:03:27.505 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
20:03:27.529 [parallel-1] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | request(unbounded)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(1)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(2)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(3)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(4)
20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onComplete()
동시성은 이것보다 더 흥미롭고 다른 기사에서 살펴볼 가치가 있습니다.
10. 결론
이 기사에서는 Reactive Core에 대한 높은 수준의 엔드 투 엔드 개요를 제공했습니다. 스트림을 게시 및 구독하고, 배압을 적용하고, 스트림에서 작업하고, 데이터를 비동기적으로 처리하는 방법을 설명했습니다. 이것은 우리가 반응형 애플리케이션을 작성할 수 있는 기반이 되기를 바랍니다.
이 시리즈의 이후 기사에서는 고급 동시성 및 기타 반응 개념을 다룰 것입니다. Reactor with Spring 을 다루는 또 다른 기사도 있습니다 .
우리 애플리케이션의 소스 코드는 GitHub에서 사용할 수 있습니다 .