1. 소개
Spring WebFlux 는 웹 애플리케이션에 반응형 프로그래밍을 제공합니다. Reactive 디자인 의 비동기 및 비차단 특성 은 성능과 메모리 사용을 향상시킵니다. Project Reactor 는 데이터 스트림을 효율적으로 관리할 수 있는 기능을 제공합니다.
그러나 배압은 이러한 종류의 응용 프로그램에서 일반적인 문제입니다. 이 예제에서 우리는 그것이 무엇인지 그리고 이를 완화하기 위해 Spring WebFlux에서 배압 메커니즘을 적용하는 방법을 설명할 것입니다.
2. 반응성 스트림의 배압
반응형 프로그래밍의 비차단 특성으로 인해 서버는 전체 스트림을 한 번에 보내지 않습니다. 사용 가능한 즉시 데이터를 동시에 푸시할 수 있습니다. 따라서 클라이언트는 이벤트를 수신하고 처리하기 위해 대기하는 시간이 줄어듭니다. 그러나 극복해야 할 문제가 있습니다.
소프트웨어 시스템의 배압은 트래픽 통신에 과부하를 주는 기능 입니다. 즉, 정보 방출자는 처리할 수 없는 데이터로 소비자를 압도합니다.
결국 사람들은 이 용어를 제어하고 처리하는 메커니즘으로 적용하기도 합니다. 다운스트림 힘을 제어하기 위해 시스템이 취하는 보호 조치입니다.
2.1. 배압이란 무엇입니까?
Reactive Streams에서 배압은 스트림 요소의 전송을 조절하는 방법도 정의합니다 . 즉, 수신자가 소비할 수 있는 요소 수를 제어합니다.
그것이 무엇인지 명확하게 설명하기 위해 예를 들어 보겠습니다.
- 시스템에는 게시자, 소비자 및 그래픽 사용자 인터페이스(GUI)의 세 가지 서비스가 포함되어 있습니다.
- 게시자는 초당 10000개의 이벤트를 소비자에게 보냅니다.
- 소비자는 이를 처리하고 결과를 GUI로 보냅니다.
- GUI는 사용자에게 결과를 표시합니다.
- 소비자는 초당 7500개의 이벤트만 처리할 수 있습니다.
이 속도에서는 소비자가 이벤트( 배압) 를 관리할 수 없습니다 . 결과적으로 시스템이 무너지고 사용자는 결과를 볼 수 없습니다.
2.2. 배압을 사용하여 시스템 장애 방지
여기서 권장 사항은 시스템 오류를 방지하기 위해 일종의 배압 전략을 적용하는 것입니다. 목표는 수신된 추가 이벤트를 효율적으로 관리하는 것입니다.
- 전송된 데이터 스트림을 제어 하는 것이 첫 번째 옵션 입니다. 기본적으로 게시자는 이벤트 속도를 늦춰야 합니다. 따라서 소비자는 과부하되지 않습니다. 안타깝게도 이것이 항상 가능한 것은 아니며 사용 가능한 다른 옵션을 찾아야 합니다.
- 여분의 데이터를 버퍼링하는 것이 두 번째 선택 입니다. 이 접근 방식을 사용하면 소비자는 나머지 이벤트를 처리할 수 있을 때까지 임시로 저장합니다. 여기서 주요 단점은 메모리 충돌을 일으키는 버퍼 바인딩을 해제하는 것입니다.
- 추적하지 못하는 추가 이벤트를 삭제합니다 . 이 솔루션도 이상적이지 않습니다. 이 기술을 사용하면 시스템이 붕괴되지 않습니다.
2.3. 배압 제어
게시자가 내보낸 이벤트를 제어하는 데 중점을 둘 것입니다. 기본적으로 따라야 할 세 가지 전략이 있습니다.
- 가입자가 요청할 때만 새 이벤트를 보냅니다 . 이미터 요청 시 요소를 수집하는 풀 전략입니다.
- 클라이언트 측에서 수신할 이벤트 수를 제한합니다 . 제한된 푸시 전략으로 작동하여 게시자는 한 번에 클라이언트에 최대 항목 수를 보낼 수 있습니다.
- 소비자가 더 이상 이벤트를 처리할 수 없을 때 데이터 스트리밍을 취소합니다 . 이 경우 수신자는 언제든지 전송을 중단하고 나중에 다시 스트림을 구독할 수 있습니다.
3. Spring WebFlux에서 배압 처리하기
Spring WebFlux 는 반응형 스트림의 비동기 비차단 흐름을 제공합니다 . Spring WebFlux 내에서 배압을 담당하는 것은 Project Reactor 입니다. 내부적으로 Flux 기능 을 사용하여 이미터에서 생성된 이벤트를 제어하는 메커니즘을 적용합니다.
WebFlux는 TCP 흐름 제어를 사용하여 배압을 바이트 단위로 조절합니다. 그러나 소비자가 받을 수 있는 논리적 요소를 처리하지 않습니다. 후드 아래에서 발생하는 상호 작용 흐름을 살펴보겠습니다.
- WebFlux 프레임워크는 TCP를 통해 이벤트를 전송/수신하기 위해 이벤트를 바이트로 변환하는 역할을 합니다.
- 다음 논리적 요소를 요청하기 전에 소비자가 시작하고 장기 실행 작업이 발생할 수 있습니다.
- 수신자가 이벤트를 처리하는 동안 WebFlux는 새로운 이벤트에 대한 요구가 없기 때문에 확인 없이 바이트를 큐에 넣습니다.
- TCP 프로토콜의 특성으로 인해 새 이벤트가 있으면 게시자가 계속해서 네트워크로 보냅니다.
결론적으로 위의 다이어그램은 논리적 요소의 수요가 소비자와 게시자에 대해 다를 수 있음을 보여줍니다. Spring WebFlux는 전체 시스템으로 상호 작용하는 서비스 간의 배압을 이상적으로 관리하지 않습니다. 소비자와 독립적으로 처리한 다음 게시자와 동일한 방식으로 처리합니다. 그러나 두 서비스 간의 논리적 요구를 고려하지 않습니다.
따라서 Spring WebFlux는 우리가 예상할 수 있는 배압을 처리하지 않습니다 . 다음 섹션에서 Spring WebFlux에서 배압 메커니즘을 구현하는 방법을 살펴보겠습니다!
4. Spring WebFlux로 배압 메커니즘 구현하기
Flux 구현 을 사용하여 수신된 이벤트의 제어를 처리합니다. 따라서 읽기 및 쓰기 측에서 배압 지원을 통해 요청 및 Response body을 노출합니다. 그런 다음 생산자는 소비자의 용량이 확보될 때까지 속도를 늦추거나 중단합니다. 어떻게 하는지 보자!
4.1. 의존성
예제를 구현하기 위해 Spring WebFlux 스타터 및 Reactor 테스트 의존성을 pom.xml 에 추가하기만 하면 됩니다 .
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
4.2. 요구
첫 번째 옵션은 소비자가 처리할 수 있는 이벤트를 제어하는 것 입니다. 따라서 게시자는 수신자가 새 이벤트를 요청할 때까지 기다립니다. 요약하면 클라이언트는 Flux 를 구독 한 다음 수요에 따라 이벤트를 처리합니다.
@Test
public void whenRequestingChunks10_thenMessagesAreReceived() {
Flux request = Flux.range(1, 50);
request.subscribe(
System.out::println,
err -> err.printStackTrace(),
() -> System.out.println("All 50 items have been successfully processed!!!"),
subscription -> {
for (int i = 0; i < 5; i++) {
System.out.println("Requesting the next 10 elements!!!");
subscription.request(10);
}
}
);
StepVerifier.create(request)
.expectSubscription()
.thenRequest(10)
.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.thenRequest(10)
.expectNext(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
.thenRequest(10)
.expectNext(21, 22, 23, 24, 25, 26, 27 , 28, 29 ,30)
.thenRequest(10)
.expectNext(31, 32, 33, 34, 35, 36, 37 , 38, 39 ,40)
.thenRequest(10)
.expectNext(41, 42, 43, 44, 45, 46, 47 , 48, 49 ,50)
.verifyComplete();
이 접근 방식을 사용하면 송신기가 Listener를 압도하지 않습니다. 즉, 클라이언트는 필요한 이벤트를 처리하도록 제어됩니다.
StepVerifier 를 사용하여 배압과 관련하여 생산자 동작을 테스트합니다 . thenRequest(n) 가 호출 될 때만 다음 n 항목을 예상합니다 .
4.3. 한계
두 번째 옵션은 Project Reactor 의 limitRange() 연산자를 사용하는 것입니다. 한 번에 프리페치할 항목 수를 설정할 수 있습니다 . 한 가지 흥미로운 기능은 구독자가 처리할 이벤트를 더 요청하는 경우에도 제한이 적용된다는 것 입니다. 이미터는 이벤트를 각 요청에 대한 한도 이상 소비하지 않도록 청크로 분할합니다.
@Test
public void whenLimitRateSet_thenSplitIntoChunks() throws InterruptedException {
Flux<Integer> limit = Flux.range(1, 25);
limit.limitRate(10);
limit.subscribe(
value -> System.out.println(value),
err -> err.printStackTrace(),
() -> System.out.println("Finished!!"),
subscription -> subscription.request(15)
);
StepVerifier.create(limit)
.expectSubscription()
.thenRequest(15)
.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.expectNext(11, 12, 13, 14, 15)
.thenRequest(10)
.expectNext(16, 17, 18, 19, 20, 21, 22, 23, 24, 25)
.verifyComplete();
}
4.4. 취소
마지막으로 소비자는 수신할 이벤트를 언제든지 취소할 수 있습니다 . 이 예에서는 다른 접근 방식을 사용합니다. Project Reactor를 사용하면 자체 Subscriber 를 구현 하거나 BaseSubscriber 를 확장할 수 있습니다. 따라서 Listener가 언급된 클래스를 재정의하여 언제라도 새 이벤트 수신을 중단할 수 있는 방법을 살펴보겠습니다.
@Test
public void whenCancel_thenSubscriptionFinished() {
Flux<Integer> cancel = Flux.range(1, 10).log();
cancel.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnNext(Integer value) {
request(3);
System.out.println(value);
cancel();
}
});
StepVerifier.create(cancel)
.expectNext(1, 2, 3)
.thenCancel()
.verify();
}
5. 결론
이 예제에서 우리는 Reactive Programming에서 backpressure가 무엇인지 그리고 그것을 피하는 방법을 보여주었습니다. Spring WebFlux는 Project Reactor를 통해 backpressure를 지원합니다. 따라서 게시자가 너무 많은 이벤트로 소비자를 압도할 때 가용성, 견고성 및 안정성을 제공할 수 있습니다. 요약하면 높은 수요로 인한 시스템 장애를 예방할 수 있습니다.
항상 그렇듯이 코드는 GitHub에서 사용할 수 있습니다 .