1. 소개

이 예제에서는 Spring WebFlux 로 작성된 반응형 프로그램의 동시성을 탐색합니다 .

리액티브 프로그래밍과 관련된 동시성에 대해 논의하는 것으로 시작하겠습니다. 그런 다음 Spring WebFlux가 다양한 반응형 서버 라이브러리에 대해 동시성 추상화를 제공하는 방법을 알아봅니다.

2. 리액티브 프로그래밍의 동기

일반적인 웹 애플리케이션은 복잡하고 상호 작용하는 여러 부분으로 구성 됩니다. 데이터를 가져오거나 업데이트하기 위한 데이터베이스 호출과 관련된 상호 작용과 같이 이러한 상호 작용의 대부분은 본질적으로 차단됩니다 . 그러나 다른 몇 가지는 독립적이며 동시에 수행할 수 있습니다 . 가능하면 병렬로 수행할 수 있습니다.

예를 들어 웹 서버에 대한 두 개의 사용자 요청을 서로 다른 스레드에서 처리할 수 있습니다. 멀티 코어 플랫폼에서 이는 전체 응답 시간 측면에서 분명한 이점이 있습니다. 따라서 이 동시성 모델은 요청당 스레드 모델로 알려져 있습니다 .

요청 모델당 스레드

위의 다이어그램에서 각 스레드는 한 번에 하나의 요청을 처리합니다.

스레드 기반 동시성은 우리를 위해 문제의 일부를 해결하지만 단일 스레드 내에서 대부분의 상호 작용이 여전히 차단되고 있다는 사실을 해결하는 데는 아무런 도움이 되지 않습니다 . 또한 Java에서 동시성을 달성하기 위해 사용하는 기본 스레드는 컨텍스트 전환 측면에서 상당한 비용이 듭니다.

한편, 웹 애플리케이션이 점점 더 많은 요청에 직면함에 따라 요청 당 스레드 모델 은 기대에 미치지 못하기 시작합니다 .

결과적으로 상대적으로 적은 수의 스레드로 점점 더 많은 요청을 처리하는 데 도움이 되는 동시성 모델이 필요합니다 . 이것은 리액티브 프로그래밍 을 채택하는 주요 동기 중 하나입니다 .

3. 리액티브 프로그래밍의 동시성

리액티브 프로그래밍은 데이터 흐름과 이를 통한 변경 전파 측면에서 프로그램을 구성하는 데 도움이 됩니다 . 완전히 차단되지 않는 환경에서 이를 통해 더 나은 리소스 활용으로 더 높은 동시성을 달성할 수 있습니다.

그러나 리액티브 프로그래밍은 스레드 기반 동시성에서 완전히 벗어났습니까? 이것은 강력한 진술이지만, 리액티브 프로그래밍은 확실히 동시성을 달성하기 위해 스레드를 사용하는 접근 방식이 매우 다릅니다 . 따라서 리액티브 프로그래밍이 가져오는 근본적인 차이점은 비동기성입니다.

즉, 프로그램 흐름은 일련의 동기 작업에서 비동기 이벤트 스트림으로 변환됩니다.

예를 들어 반응형 모델에서 데이터베이스에 대한 읽기 호출은 데이터를 가져오는 동안 호출 스레드를 차단하지 않습니다. 호출은 다른 사람이 구독할 수 있는 게시자를 즉시 ​​반환합니다 . 구독자는 이벤트가 발생한 후 이벤트를 처리할 수 있으며 이벤트 자체를 추가로 생성할 수도 있습니다.

반응형 모델

무엇보다 리액티브 프로그래밍은 어떤 스레드 이벤트를 생성하고 소비해야 하는지 강조하지 않습니다. 오히려 프로그램을 비동기 이벤트 스트림으로 구성하는 데 중점을 둡니다 .

여기서 게시자와 구독자는 동일한 스레드의 일부일 필요가 없습니다. 이를 통해 사용 가능한 스레드의 활용도를 높이고 전체 동시성을 높일 수 있습니다.

4. 이벤트 루프

동시성에 대한 반응적 접근 방식을 설명하는 여러 가지 프로그래밍 모델이 있습니다 .

이 섹션에서는 리액티브 프로그래밍이 더 적은 수의 스레드로 더 높은 동시성을 달성하는 방법을 이해하기 위해 몇 가지를 살펴보겠습니다.

서버에 대한 반응형 비동기 프로그래밍 모델 중 하나는 이벤트 루프 모델입니다 .

이벤트 루프

위는 반응성 비동기 프로그래밍의 아이디어를 나타내는 이벤트 루프 의 추상적인 디자인입니다 .

  • 이벤트 루프단일 스레드에서 계속 실행 되지만 사용 가능한 코어 수 만큼 많은 이벤트 루프 를 가질 수 있습니다.
  • 이벤트 루프이벤트 큐 의 이벤트를 순차적으로 처리하고 콜백플랫폼 에 등록한 후 즉시 반환합니다.
  • 플랫폼데이터베이스 호출 또는 외부 서비스 호출과 같은 작업 완료를 트리거할 수 있습니다.
  • 이벤트 루프작업 완료 알림에서 콜백트리거 하고 결과를 원래 호출자에게 다시 보낼 수 있습니다.

이벤트 루프 모델Node.js , NettyNgnix 를 비롯한 여러 플랫폼에서 구현됩니다 . Apache HTTP Server , Tomcat 또는 JBoss 와 같은 기존 플랫폼보다 훨씬 뛰어난 확장성을 제공합니다 .

5. Spring WebFlux를 사용한 반응형 프로그래밍

이제 우리는 Spring WebFlux에서 주제를 탐색할 수 있는 반응형 프로그래밍 및 동시성 모델에 대한 충분한 통찰력을 얻었습니다.

웹플럭스는 버전 5.0에 추가된 Spring 반응형 스택 웹 프레임워크 .

Spring WebFlux의 서버 측 스택을 탐색하여 Spring에서 기존 웹 스택을 보완하는 방법을 이해해 보겠습니다.

스프링 웹 스택

보시다시피 Spring WebFlux는 Spring의 기존 웹 프레임워크와 병렬로 위치하며 반드시 이를 대체하지는 않습니다 .

여기서 주목해야 할 몇 가지 중요한 사항이 있습니다.

  • Spring WebFlux는 기존의 어노테이션 기반 프로그래밍 모델을 기능적 라우팅으로 확장합니다.
  • 또한 기본 HTTP 런타임을 Reactive Streams API에 맞게 조정하여 런타임을 상호 운용할 수 있도록 합니다.
  • Tomcat, Reactor, Netty 또는 Undertow 와 같은 Servlet 3.1+ 컨테이너를 포함하여 다양한 반응형 런타임을 지원할 수 있습니다.
  • 마지막으로 기능적이고 유창한 API를 제공하는 HTTP 요청을 위한 반응형 비차단 클라이언트인 WebClient 가 포함되어 있습니다.

6. 지원되는 런타임의 스레딩 모델

앞서 논의한 것처럼 반응형 프로그램은 몇 개의 스레드로 작업하고 이를 최대한 활용하는 경향이 있습니다. 그러나 스레드의 수와 특성은 우리가 선택한 실제 Reactive Stream API 런타임에 따라 다릅니다.

명확히 하기 위해 Spring WebFlux는 HttpHandler 에서 제공하는 공통 API를 통해 다양한 런타임에 적응할 수 있습니다 . 이 API는 Reactor Netty, Servlet 3.1 API 또는 Undertow API와 같은 다양한 서버 API에 대한 추상화를 제공하는 단 하나의 메서드가 포함된 간단한 계약입니다.

그 중 몇 가지에 구현된 스레딩 모델을 살펴보겠습니다.

Netty는 WebFlux 애플리케이션의 기본 서버이지만 지원되는 다른 서버로 전환하기 위해 올바른 의존성을 선언하는 문제일 뿐입니다 .

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-reactor-netty</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>

여러 가지 방법으로 JVM(Java Virtual Machine)에서 생성된 스레드를 관찰할 수 있지만 Thread 클래스 자체 에서 가져오는 것은 매우 쉽습니다 .

Thread.getAllStackTraces()
  .keySet()
  .stream()
  .collect(Collectors.toList());

6.1. 리액터 네티

우리가 말했듯이 Reactor Netty 는 Spring Boot WebFlux 스타터의 기본 임베디드 서버입니다. Netty가 기본적으로 생성하는 스레드를 살펴보겠습니다. 시작하려면 다른 의존성을 추가하거나 WebClient를 사용하지 않습니다. 따라서 SpringBoot 스타터를 사용하여 생성된 Spring WebFlux 애플리케이션을 시작하면 생성되는 몇 가지 기본 스레드를 볼 수 있습니다.

1

Netty 는 서버의 일반 스레드와 별개로 요청 처리를 위한 여러 작업자 스레드를 생성합니다 . 이들은 일반적으로 사용 가능한 CPU 코어입니다. 이것은 쿼드 코어 머신의 출력입니다. 또한 JVM 환경에 일반적으로 사용되는 하우스키핑 스레드도 많이 볼 수 있지만 여기서는 중요하지 않습니다.

Netty는 이벤트 루프 모델을 사용하여 반응형 비동기 방식으로 확장성이 뛰어난 동시성을 제공합니다. Netty가 Java NIO를 활용하여 이 확장성을 제공 하는 이벤트 루프를 구현하는 방법을 살펴보겠습니다 .

Netty 스레딩 모델

여기에서 EventLoopGroup 은 지속적으로 실행되어야 하는 하나 이상의 EventLoop를 관리합니다 . 따라서 사용 가능한 코어 수보다 많은 EventLoop 를 생성하지 않는 것이 좋습니다 .

EventLoopGroup새로 생성된 각각 의 Channel 에 EventLoop 를 추가로 할당합니다 . 따라서 Channel 의 수명 동안 모든 작업은 동일한 스레드에 의해 실행됩니다.

6.2. 아파치 톰캣

Spring WebFlux는 Apache Tomcat 과 같은 전통적인 서블릿 컨테이너에서도 지원됩니다 .

WebFlux는 비차단 I/O가 있는 Servlet 3.1 API에 의존합니다 . 하위 수준 어댑터 뒤에서 Servlet API를 사용하는 동안 Servlet API는 직접 사용할 수 없습니다.

Tomcat에서 실행되는 WebFlux 애플리케이션에서 예상되는 스레드 종류를 살펴보겠습니다.

2

여기서 볼 수 있는 스레드의 수와 유형은 이전에 관찰한 것과는 상당히 다릅니다.

우선 Tomcat은 더 많은 작업자 스레드로 시작하며 기본값은 10 입니다. 물론 JVM과 Catalina 컨테이너에 일반적인 몇 가지 하우스키핑 스레드도 표시되지만 이 논의에서는 무시할 수 있습니다.

Java NIO를 사용하는 Tomcat의 아키텍처를 위에서 본 스레드와 연관시키기 위해 이해해야 합니다.

Tomcat 5 이상에서는 주로 요청 수신을 담당하는 커넥터 구성 요소에서 NIO를 지원합니다 .

다른 Tomcat 구성 요소는 컨테이너 관리 기능을 담당하는 컨테이너 구성 요소입니다.

여기서 우리가 관심을 두는 부분은 커넥터 구성 요소가 NIO를 지원하기 위해 구현하는 스레딩 모델입니다. NioEndpoint 모듈 의 일부로 Acceptor , PollerWorker 로 구성 됩니다 .

톰캣 NIO 커넥터

Tomcat은 일반적으로 Worker 전용 스레드 풀과 함께 Acceptor , PollerWorker 에 대한 하나 이상의 스레드를 생성합니다 .

Tomcat 아키텍처에 대한 자세한 논의는 이 기사의 범위를 벗어나지만 이제 이전에 본 스레드를 이해할 수 있는 충분한 통찰력을 얻었을 것입니다.

7. WebClient 의 스레딩 모델

WebClient Spring WebFlux의 일부인 반응형 HTTP 클라이언트입니다. REST 기반 통신이 필요할 때마다 사용할 수 있으므로 종단 간 반응애플리케이션을 만들 수 있습니다.

이전에 본 것처럼 반응형 애플리케이션은 몇 개의 스레드로 작동하므로 애플리케이션의 어떤 부분도 스레드를 차단할 여유가 없습니다. 따라서 WebClient 는 WebFlux의 잠재력을 실현하는 데 중요한 역할을 합니다.

7.1. 웹 클라이언트 사용

WebClient 사용 도 매우 간단합니다. Spring WebFlux의 일부이므로 특정 의존성을 포함할 필요가 없습니다 .

Mono 를 반환하는 간단한 REST Endpoints을 만들어 보겠습니다 .

@GetMapping("/index")
public Mono<String> getIndex() {
    return Mono.just("Hello World!");
}

그런 다음 WebClient 를 사용하여 이 REST Endpoints을 호출하고 반응적으로 데이터를 사용합니다.

WebClient.create("http://localhost:8080/index").get()
  .retrieve()
  .bodyToMono(String.class)
  .doOnNext(s -> printThreads());

여기서 우리는 앞서 논의한 방법을 사용하여 생성된 스레드도 인쇄합니다.

7.2. 스레딩 모델 이해

그렇다면 WebClient 의 경우 스레딩 모델은 어떻게 작동 합니까?

당연히 WebClient 는 이벤트 루프 모델 을 사용하여 동시성을 구현합니다 . 물론 기본 런타임에 의존하여 필요한 인프라를 제공합니다.

Reactor Netty에서 WebClient 를 실행하는 경우 Netty가 서버에 사용하는 이벤트 루프를 공유합니다 . 따라서 이 경우 생성되는 스레드에서 큰 차이를 느끼지 못할 수 있습니다.

그러나 WebClient 는 Jetty와 같은 Servlet 3.1+ 컨테이너에서도 지원되지만 작동 방식이 다릅니다 .

WebClient 를 사용하거나 사용하지 않고 Jetty 를 실행하는 WebFlux 애플리케이션에서 생성된 스레드를 비교하면 몇 가지 추가 스레드가 있음을 알 수 있습니다.

여기서 WebClient이벤트 루프 를 생성해야 합니다 . 따라서 이 이벤트 루프가 생성하는 고정된 수의 처리 스레드를 볼 수 있습니다.

삼

경우에 따라 클라이언트와 서버에 대해 별도의 스레드 풀이 있으면 더 나은 성능을 제공할 수 있습니다 . Netty의 기본 동작은 아니지만 필요한 경우 항상 WebClient 전용 스레드 풀을 선언할 수 있습니다.

이후 섹션에서 이것이 어떻게 가능한지 살펴보겠습니다.

8. 데이터 액세스 라이브러리의 스레딩 모델

앞서 살펴본 것처럼 단순한 애플리케이션도 일반적으로 연결해야 하는 여러 부분으로 구성됩니다.

이러한 부분의 일반적인 예로는 데이터베이스와 메시지 브로커가 있습니다. 그들 중 다수와 연결하기 위한 기존 라이브러리는 여전히 차단되고 있지만 빠르게 변화하고 있습니다.

현재 연결을 위한 반응성 라이브러리를 제공하는 여러 데이터베이스가 있습니다 . 이러한 라이브러리 중 다수는 Spring Data 내에서 사용할 수 있으며 다른 라이브러리도 직접 사용할 수 있습니다.

이러한 라이브러리에서 사용하는 스레딩 모델은 우리에게 특히 중요합니다.

8.1. 스프링 데이터 몽고DB

Spring Data MongoDBMongoDB Reactive Streams 드라이버 위에 구축된 MongoDB를 위한 반응형 리포지토리 지원을 제공합니다 . 특히 이 드라이버 는 Reactive Streams API를 완벽하게 구현 하여 비차단 배압으로 비동기 스트림 처리를 제공합니다 .

Spring Boot 애플리케이션에서 MongoDB에 대한 반응형 리포지토리에 대한 지원을 설정하는 것은 의존성을 추가하는 것만큼 간단합니다.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>

이를 통해 리포지토리를 생성하고 이를 사용하여 비차단 방식으로 MongoDB에서 몇 가지 기본 작업을 수행할 수 있습니다.

public interface PersonRepository extends ReactiveMongoRepository<Person, ObjectId> {
}
.....
personRepository.findAll().doOnComplete(this::printThreads);

그렇다면 Netty 서버에서 이 애플리케이션을 실행할 때 어떤 종류의 스레드를 볼 수 있을까요?

Spring Data 리포지토리가 서버에서 사용할 수 있는 것과 동일한 이벤트 루프를 사용하기 때문에 당연히 큰 차이는 없을 것 입니다.

8.2. 반응기 카프카

Spring은 여전히 ​​반응형 Kafka에 대한 완전한 지원을 구축하는 과정에 있습니다. 그러나 Spring 외부에서 사용할 수 있는 옵션이 있습니다.

Reactor Kafka 는 Reactor를 기반으로 하는 Kafka용 반응형 API입니다 . Reactor Kafka는 기능적 API를 사용하여 메시지를 게시하고 사용할 수 있으며 비차단 배압 도 있습니다.

먼저 Reactor Kafka 사용을 시작하려면 애플리케이션에 필요한 의존성을 추가해야 합니다.

<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
    <version>1.3.10</version>
</dependency>

이렇게 하면 비차단 방식으로 Kafka에 메시지를 생성할 수 있습니다.

// producerProps: Map of Standard Kafka Producer Configurations
SenderOptions<Integer, String> senderOptions = SenderOptions.create(producerProps);
KafkaSender<Integer, String> sender =  KafkaSender.create(senderOptions);
Flux<SenderRecord<Integer, String, Integer>> outboundFlux = Flux
  .range(1, 10)
  .map(i -> SenderRecord.create(new ProducerRecord<>("reactive-test", i, "Message_" + i), i));
sender.send(outboundFlux).subscribe();

마찬가지로 비차단 방식으로도 Kafka의 메시지를 사용할 수 있어야 합니다.

// consumerProps: Map of Standard Kafka Consumer Configurations
ReceiverOptions<Integer, String> receiverOptions = ReceiverOptions.create(consumerProps);
receiverOptions.subscription(Collections.singleton("reactive-test"));
KafkaReceiver<Integer, String> receiver = KafkaReceiver.create(receiverOptions);
Flux<ReceiverRecord<Integer, String>> inboundFlux = receiver.receive();
inboundFlux.doOnComplete(this::printThreads)

이것은 매우 간단하고 자명합니다.

우리는 Kafka에서 주제 반응형 테스트 를 구독하고 메시지 흐름 을 받고 있습니다.

흥미로운 점은 생성되는 스레드 입니다 .

4

Netty 서버에 일반적이지 않은 몇 가지 스레드를 볼 수 있습니다 .

이것은 Reactor Kafka가 Kafka 메시지 처리에 독점적으로 참여하는 몇 개의 작업자 스레드와 함께 자체 스레드 풀을 관리함을 나타냅니다. 물론 무시할 수 있는 Netty 및 JVM과 관련된 다른 많은 스레드를 보게 될 것입니다.

Kafka 생산자는 브로커에 요청을 보내기 위해 별도의 네트워크 스레드를 사용합니다. 또한 단일 스레드 풀 스케줄러 에서 애플리케이션에 대한 응답을 제공합니다 .

반면 Kafka 소비자는 수신 메시지 수신을 차단하는 소비자 그룹당 하나의 스레드를 가집니다. 수신 메시지는 다른 스레드 풀에서 처리하도록 예약됩니다.

9. WebFlux의 스케줄링 옵션

지금까지 우리는 리액티브 프로그래밍이 단 몇 개의 스레드로 완전히 차단되지 않는 환경에서 실제로 빛을 발한다 는 것을 보았습니다 . 그러나 이것은 실제로 차단되는 부분이 있으면 훨씬 더 나쁜 성능을 초래한다는 의미이기도 합니다. 이는 차단 작업이 이벤트 루프를 완전히 정지시킬 수 있기 때문입니다.

그렇다면 리액티브 프로그래밍에서 장기 실행 프로세스 또는 차단 작업을 어떻게 처리합니까?

솔직히, 최선의 선택은 그들을 피하는 것입니다. 그러나 이것이 항상 가능한 것은 아니며 애플리케이션의 해당 부분에 대한 전용 스케줄링 전략이 필요할 수 있습니다 .

Spring WebFlux 는 데이터 흐름 체인 사이에서 처리를 다른 스레드 풀로 전환하는 메커니즘을 제공합니다 . 이를 통해 특정 작업에 대해 원하는 일정 전략을 정확하게 제어할 수 있습니다. 물론 WebFlux 는 기본 반응 라이브러리에서 사용 가능한 스케줄러로 알려진 스레드 풀 추상화를 기반으로 이를 제공할 수 있습니다.

9.1. 반응기

Reactor 에서 Scheduler 클래스는 실행 모델과 실행 위치를 정의 합니다 .

Schedulers 클래스 는 Immediate , single , elasticparallel 과 같은 여러 실행 컨텍스트를 제공 합니다 . 이들은 다양한 작업에 유용할 수 있는 다양한 유형의 스레드 풀을 제공합니다. 또한 기존 ExecutorService 를 사용하여 항상 자체 스케줄러 를 만들 수 있습니다 .

스케줄러 는 여러 실행 컨텍스트를 제공 하지만 Reactor 는 실행 컨텍스트를 전환하는 다양한 방법도 제공합니다 . 이것이 publishOnsubscribeOn 메소드 입니다.

우리는 체인의 어느 곳에서나 스케줄러 와 함께 publishOn 을 사용할 수 있으며 해당 스케줄러 는 모든 후속 연산자에 영향을 미칩니다.

체인의 어느 곳에서나 스케줄러 와 함께 subscribeOn 을 사용할 수도 있지만 방출 소스의 컨텍스트에만 영향을 미칩니다.

Netty 의 WebClient 는 기본 동작으로 서버용으로 생성된 동일한 이벤트 루프 를 공유합니다. 그러나 WebClient 전용 스레드 풀을 만들어야 하는 타당한 이유가 있을 수 있습니다.

WebFlux의 기본 반응 라이브러리인 Reactor에서 어떻게 이를 달성할 수 있는지 살펴보겠습니다.

Scheduler scheduler = Schedulers.newBoundedElastic(5, 10, "MyThreadGroup");

WebClient.create("http://localhost:8080/index").get()
  .retrieve()
  .bodyToMono(String.class)
  .publishOn(scheduler)
  .doOnNext(s -> printThreads());

이전에는 WebClient 를 사용하거나 사용하지 않고 Netty에서 생성된 스레드의 차이를 관찰하지 못했습니다 . 그러나 이제 위의 코드를 실행하면 몇 가지 새 스레드가 생성되는 것을 볼 수 있습니다 .

5

여기 에서 제한된 탄성 스레드 풀 의 일부로 생성된 스레드를 볼 수 있습니다 . 여기에서 WebClient 의 응답이 구독되면 게시됩니다.

이렇게 하면 서버 요청을 처리하기 위한 기본 스레드 풀이 남습니다.

9.2. RxJava

RxJava 의 기본 동작은 Reactor의 기본 동작과 크게 다르지 않습니다 .

Observable 과 우리가 적용한 일련의 연산자는 작업을 수행하고 구독이 호출된 동일한 스레드에서 관찰자에게 알립니다 . 또한 Reactor와 마찬가지로 RxJava 는 접두사 또는 사용자 정의 스케줄링 전략을 체인에 도입하는 방법을 제공합니다.

RxJava는 또한 Observable 체인 에 대한 여러 실행 모델을 제공하는 스케줄러 클래스 제공합니다 . 여기에는 new thread , Immediate , trampoline , io , computationtest 가 포함 됩니다. 물론 Java Executor 에서 스케줄러 를 정의할 수도 있습니다 .

또한 RxJava는 이를 달성하기 위해 두 가지 확장 메서드subscribeOnobserveOn 도 제공합니다 .

subscribeOn 메서드는 Observable이 작동해야 하는 다른 스케줄러를 지정하여 기본 동작 변경 합니다 . 반면 에 observeOn 메서드는 Observable 이 관찰자에게 알림을 보내는 데 사용할 수 있는 다른 스케줄러를 지정합니다 .

이전에 논의한 것처럼 Spring WebFlux는 기본적으로 반응 라이브러리로 Reactor를 사용합니다. 그러나 Reactive Streams API와 완벽하게 호환되므로 RxJava (Reactive Streams 어댑터가 있는 RxJava 1.x용 )와 같은 다른 Reactive Streams 구현으로 전환할 수 있습니다 .

의존성을 명시적으로 추가해야 합니다.

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.21</version>
</dependency>

그런 다음 애플리케이션에서 RxJava 특정 스케줄러 와 함께 Observable 과 같은 RxJava 유형을 사용할 수 있습니다 .

io.reactivex.Observable
  .fromIterable(Arrays.asList("Tom", "Sawyer"))
  .map(s -> s.toUpperCase())
  .observeOn(io.reactivex.schedulers.Schedulers.trampoline())
  .doOnComplete(this::printThreads);

결과적으로 이 애플리케이션을 실행하면 일반 Netty 및 JVM 관련 스레드와 별도로 RxJava 스케줄러 와 관련된 몇 가지 스레드가 표시됩니다 .

6

10. 결론

이 기사에서 우리는 동시성의 맥락에서 리액티브 프로그래밍의 전제를 탐구했습니다. 기존 프로그래밍과 반응형 프로그래밍에서 동시성 모델의 차이를 관찰했습니다. 이를 통해 Spring WebFlux의 동시성 모델과 이를 달성하기 위한 스레딩 모델을 검토할 수 있었습니다.

그런 다음 다양한 HTTP 런타임 및 반응성 라이브러리와 함께 WebFlux의 스레딩 모델을 탐색했습니다. 또한 WebClient 와 데이터 액세스 라이브러리 를 사용할 때 스레딩 모델이 어떻게 다른지 배웠습니다 .

마지막으로 WebFlux 내의 반응형 프로그램에서 스케줄링 전략을 제어하는 ​​옵션을 다루었습니다.

항상 그렇듯이 이 기사의 소스 코드는 GitHub 에서 찾을 수 있습니다 .

Generic footer banner