1. 소개

Ratpack은 Netty 엔진 위에 구축된 프레임워크로 HTTP 애플리케이션을 빠르게 구축할 수 있습니다. 우리는 이미 이전 기사 에서 기본 사용법을 다루었습니다 . 이번에는 스트리밍 API를 사용하여 반응형 애플리케이션을 구현하는 방법을 보여드리겠습니다 .

2. Reactive Streams에 대한 요약

실제 구현에 들어가기 전에 먼저 Reactive Application을 구성하는 요소에 대해 간단히 요약해 보겠습니다. 원래 작성자 에 따르면 이러한 응용 프로그램에는 다음과 같은 속성이 있어야 합니다.

  • 반응형
  • 탄력적
  • 탄력있는
  • 메시지 기반

그렇다면 Reactive Streams는 이러한 속성을 달성하는 데 어떻게 도움이 될까요? 이 맥락에서 메시지 기반 이 반드시 메시징 미들웨어의 사용을 의미하지는 않습니다. 대신, 이 점을 해결하기 위해 실제로 필요한 것은 비동기식 요청 처리 및 비차단 배압 지원입니다 .

Ratpack 반응형 지원은 JVM용 Reactive Streams API 표준을 구현 기반으로 사용합니다. 따라서 Project Reactor 및 RxJava와 같은 다른 호환 프레임워크와의 상호 운용성을 허용 합니다.

3. Ratpacks의 스트림 클래스 사용

Ratpack의 Streams 클래스는 게시자 인스턴스 를 생성하는 몇 가지 유틸리티 메서드를 제공하며 , 이를 사용하여 데이터 처리 파이프라인을 생성할 수 있습니다.

좋은 출발점 은 모든 Iterable 에서 게시자 를 생성하는 데 사용할 수 있는 publish() 메서드입니다 .

Publisher<String> pub = Streams.publish(Arrays.asList("hello", "hello again"));
LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
pub.subscribe(sub);
sub.block();

여기에서 LoggingSubscriber 는 게시자가 내보낸 모든 개체를 기록 하는 구독자 인터페이스 의 테스트 구현입니다 . 또한 이름에서 알 수 있듯이 게시자가 모든 개체를 내보내거나 오류를 생성할 때까지 호출자를 차단하는 도우미 메서드 block() 도 포함되어 있습니다.

테스트 사례를 실행하면 예상되는 이벤트 시퀀스를 볼 수 있습니다.

onSubscribe: sub=7311908
onNext: sub=7311908, value=hello
onNext: sub=7311908, value=hello again
onComplete: sub=7311908

또 다른 유용한 방법은 yield() 입니다. 여기에는 YieldRequest 객체를 수신하고 내보낼 다음 객체를 반환 하는 단일 Function 매개변수가 있습니다.

@Test
public void whenYield_thenSuccess() {
    
    Publisher<String> pub = Streams.yield((t) -> {
        return t.getRequestNum() < 5 ? "hello" : null;
    });
    
    LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
    pub.subscribe(sub);
    sub.block();
    assertEquals(5, sub.getReceived());
}

YieldRequest 매개 변수를 사용하면 getRequestNum() 메서드 를 사용하여 지금까지 방출된 객체 수를 기반으로 논리를 구현할 수 있습니다 . 이 예제에서는 이 정보를 사용하여 null을 반환하여 신호를 보내는 종료 조건을 정의합니다 .

이제 주기적 이벤트에 대한 게시자 를 만드는 방법을 살펴보겠습니다 .

@Test
public void whenPeriodic_thenSuccess() {
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    Publisher<String> pub = Streams.periodically(executor, Duration.ofSeconds(1), (t) -> {
        return t < 5 ? String.format("hello %d",t): null; 
    });

    LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
    pub.subscribe(sub);
    sub.block();
    assertEquals(5, sub.getReceived());
}

반환된 게시자는 ScheduledExecutorService 를 사용하여 null을 반환할 때까지 주기적으로 생산자 함수를 호출합니다 . 생산자 함수는 스트림을 종료하는 데 사용하는 이미 방출된 개체 수에 해당하는 정수 값을 받습니다.

4. TransformablePublisher 사용

Streams의 메서드 를 자세히 살펴보면 일반적으로 TransformablePublisher 를 반환한다는 것을 알 수 있습니다 . 이 인터페이스 는 Project Reactor의 FluxMono 에서 볼 수 있는 것과 매우 유사한 여러 유틸리티 메서드로 Publisher 를 확장 하여 개별 단계에서 복잡한 처리 파이프라인을 보다 쉽게 ​​생성할 수 있도록 합니다 .

예를 들어 map 메서드를 사용하여 일련의 정수를 문자열로 변환해 보겠습니다.

@Test
public void whenMap_thenSuccess() throws Exception {
    TransformablePublisher<String> pub = Streams.yield( t -> {
        return t.getRequestNum() < 5 ? t.getRequestNum() : null;
      })
      .map(v -> String.format("item %d", v));
    
    ExecResult<List<String>> result = ExecHarness.yieldSingle((c) -> pub.toList());
    assertTrue("should succeed", result.isSuccess());
    assertEquals("should have 5 items",5,result.getValue().size());
}

여기에서 실제 실행은 테스트 유틸리티 클래스 ExecHarness 에서 관리하는 스레드 풀 내에서 발생합니다 . yieldSingle() 은 Promise 를 예상 하므로 toList( ) 를 사용 하여 게시자를 조정합니다. 이 메서드는 구독자가 생성한 모든 결과를 수집하여 List 에 저장합니다 .

설명서에 명시된 대로 이 방법을 사용할 때는 주의해야 합니다. 제한되지 않은 게시자에 적용하면 JVM의 메모리가 빠르게 부족해질 수 있습니다! 이러한 상황을 피하려면 대부분 단위 테스트로 사용을 제한해야 합니다 .

map() 외에도 TransformablePublisher 에는 몇 가지 유용한 연산자가 있습니다.

  • filter() : 술어 를 기반으로 업스트림 객체를 필터링합니다.
  • take() : 업스트림 게시자 에서 처음 n개의 개체만 내보냅니다.
  • wiretap() : 데이터와 이벤트가 파이프라인을 통해 흐를 때 검사할 수 있는 관찰 지점을 추가합니다.
  • reduce() : 업스트림 객체를 단일 값으로 줄입니다.
  • transform() : 일반 게시자 를 스트림에 주입합니다.

5. 비준수 게시자 와 함께 buffer() 사용

일부 시나리오에서는 요청한 것보다 더 많은 항목을 구독자에게 보내는 게시자 를 처리해야 합니다 . 이러한 시나리오를 해결하기 위해 Ratpack의 Streams는 버퍼() 메서드를 제공하여 가입자가 사용할 때까지 추가 항목을 메모리에 유지합니다.

이것이 어떻게 작동하는지 설명하기 위해 요청된 항목 수를 무시 하는 단순한 비준수 게시자 를 만들어 보겠습니다. 대신 항상 요청된 것보다 5개 이상의 항목을 생성합니다.

private class NonCompliantPublisher implements Publisher<Integer> {

    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {
        log.info("subscribe");
        subscriber.onSubscribe(new NonCompliantSubscription(subscriber));
    }
    
    private class NonCompliantSubscription implements Subscription {
        private Subscriber<? super Integer> subscriber;
        private int recurseLevel = 0;

        public NonCompliantSubscription(Subscriber<? super Integer> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            log.info("request: n={}", n);
            if ( recurseLevel > 0 ) {
               return;
            }
            recurseLevel++;
            for (int i = 0 ; i < (n + 5) ; i ++ ) {
                subscriber.onNext(i);
            }
            subscriber.onComplete();
        }

        @Override
        public void cancel() {
        }
    }
}

먼저 LoggingSubscriber  를 사용하여 이 게시자를 테스트해 보겠습니다. take() 연산자를 사용  하여 첫 번째 항목만 받을 것입니다.

@Test
public void whenNonCompliantPublisherWithoutBuffer_thenSuccess() throws Exception {
    TransformablePublisher<Integer> pub = Streams.transformable(new NonCompliantPublisher())
      .wiretap(new LoggingAction(""))
      .take(1);
      
    LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
    pub.subscribe(sub);
    sub.block();
}

이 테스트를 실행하면 cancel() 요청 을 받았음에도 불구하고 규정을 준수하지 않는 게시자가 계속해서 새 항목을 생성하는 것을 볼 수 있습니다.

RatpackStreamsUnitTest - : event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=583189145, value=0
RatpackStreamsUnitTest - : event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
NonCompliantPublisher - request: n=1
RatpackStreamsUnitTest - : event=StreamEvent[CancelEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=583189145
RatpackStreamsUnitTest - : event=StreamEvent[DataEvent{subscriptionId=0, data=1}]
... more expurious data event
RatpackStreamsUnitTest - : event=StreamEvent[CompletionEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=583189145

이제 이 스트림에 buffer() 단계를 추가해 보겠습니다. 그 전에 이벤트를 기록하기 위해 두 가지 도청 단계를 추가 하여 그 효과가 더욱 분명해집니다.

@Test
public void whenNonCompliantPublisherWithBuffer_thenSuccess() throws Exception {
    TransformablePublisher<Integer> pub = Streams.transformable(new NonCompliantPublisher())
      .wiretap(new LoggingAction("before buffer"))
      .buffer()
      .wiretap(new LoggingAction("after buffer"))
      .take(1);
      
    LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
    pub.subscribe(sub);
    sub.block();
}

이번에는 이 코드를 실행하면 다른 로그 시퀀스가 ​​생성됩니다.

LoggingSubscriber - onSubscribe: sub=675852144
RatpackStreamsUnitTest - after buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
NonCompliantPublisher - subscribe
RatpackStreamsUnitTest - before buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
NonCompliantPublisher - request: n=1
RatpackStreamsUnitTest - before buffer: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
... more data events
RatpackStreamsUnitTest - before buffer: event=StreamEvent[CompletionEvent{subscriptionId=0}]
RatpackStreamsUnitTest - after buffer: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=675852144, value=0
RatpackStreamsUnitTest - after buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
RatpackStreamsUnitTest - after buffer: event=StreamEvent[CancelEvent{subscriptionId=0}]
RatpackStreamsUnitTest - before buffer: event=StreamEvent[CancelEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=67585214

"버퍼 이전" 메시지는 비준수 게시자가 요청 에 대한 첫 번째 호출 이후 모든 값을 보낼 수 있음을 보여줍니다 . 그럼에도 불구하고 다운스트림 값은 여전히 ​​LoggingSubscriber 에서 요청한 양에 따라 하나씩 전송되었습니다 .

6. 느린 구독자와 함께 batch() 사용

애플리케이션의 처리량을 감소시킬 수 있는 또 다른 시나리오는 다운스트림 구독자가 소량의 데이터를 요청하는 경우입니다. LoggingSubscriber 가 좋은 예입니다. 한 에 하나의 항목만 요청합니다.

실제 응용 프로그램에서는 이로 인해 많은 컨텍스트 전환이 발생하여 전체 성능이 저하될 수 있습니다. 더 나은 방법은 한 번에 더 많은 수의 항목을 요청하는 것입니다. batch() 메서드를 사용하면 업스트림 게시자 는 보다 효율적인 요청 크기를 사용하고 다운스트림 구독자는 더 작은 요청 크기를 사용할 수 있습니다.

이것이 실제로 어떻게 작동하는지 봅시다. 이전과 마찬가지로 batch 없이 스트림으로 시작합니다 .

@Test
public void whenCompliantPublisherWithoutBatch_thenSuccess() throws Exception {
    TransformablePublisher<Integer> pub = Streams.transformable(new CompliantPublisher(10))
      .wiretap(new LoggingAction(""));
      
    LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
    pub.subscribe(sub);
    sub.block();
}

여기에서 CompliantPublisher 는 생성자에 전달된 값을 제외하고 최대 정수를 생성 하는 테스트 게시자 입니다. 일괄 처리되지 않은 동작을 확인하기 위해 실행해 보겠습니다.

CompliantPublisher - subscribe
LoggingSubscriber - onSubscribe: sub=-779393331
RatpackStreamsUnitTest - : event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
CompliantPublisher - request: requested=1, available=10
RatpackStreamsUnitTest - : event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=-779393331, value=0
... more data events omitted
CompliantPublisher - request: requested=1, available=1
RatpackStreamsUnitTest - : event=StreamEvent[CompletionEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=-779393331

출력은 생산자가 값을 하나씩 내보내는 것을 보여줍니다 . 이제 파이프라인에 step batch() 를 추가하여 업스트림 게시자가 한 번에 최대 5개의 항목을 생성하도록 하겠습니다.

@Test
public void whenCompliantPublisherWithBatch_thenSuccess() throws Exception {
    
    TransformablePublisher<Integer> pub = Streams.transformable(new CompliantPublisher(10))
      .wiretap(new LoggingAction("before batch"))
      .batch(5, Action.noop())
      .wiretap(new LoggingAction("after batch"));
      
    LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
    pub.subscribe(sub);
    sub.block();
}

batch() 메서드는 각 request() 호출에서 요청된 항목 수와 폐기된 항목(즉, 요청되었지만 소비되지 않은 항목)을 처리하기 위한 Action 의 두 가지 인수를 사용합니다. 이 상황은 오류가 있거나 다운스트림 구독자가 cancel() 을 호출하는 경우에 발생할 수 있습니다 . 결과 실행 로그를 보겠습니다.

LoggingSubscriber - onSubscribe: sub=-1936924690
RatpackStreamsUnitTest - after batch: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
CompliantPublisher - subscribe
RatpackStreamsUnitTest - before batch: event=StreamEvent[RequestEvent{requestAmount=5, subscriptionId=0}]
CompliantPublisher - request: requested=5, available=10
RatpackStreamsUnitTest - before batch: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
... first batch data events omitted
RatpackStreamsUnitTest - before batch: event=StreamEvent[RequestEvent{requestAmount=5, subscriptionId=0}]
CompliantPublisher - request: requested=5, available=6
RatpackStreamsUnitTest - before batch: event=StreamEvent[DataEvent{subscriptionId=0, data=5}]
... second batch data events omitted
RatpackStreamsUnitTest - before batch: event=StreamEvent[RequestEvent{requestAmount=5, subscriptionId=0}]
CompliantPublisher - request: requested=5, available=1
RatpackStreamsUnitTest - before batch: event=StreamEvent[CompletionEvent{subscriptionId=0}]
RatpackStreamsUnitTest - after batch: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=-1936924690, value=0
RatpackStreamsUnitTest - after batch: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
RatpackStreamsUnitTest - after batch: event=StreamEvent[DataEvent{subscriptionId=0, data=1}]
... downstream data events omitted
LoggingSubscriber - onComplete: sub=-1936924690

이제 게시자가 매번 5개 항목에 대한 요청을 받는 것을 볼 수 있습니다 . 이 테스트 시나리오에서는 로깅 구독자가 첫 번째 항목을 가져오기 전에도 생산자에 대한 두 개의 요청을 볼 수 있습니다. 그 이유는 이 테스트 시나리오에서 단일 스레드 실행이 있으므로 batch ()가 onComplete() 신호 를 받을 때까지 항목을 계속 버퍼링하기 때문 입니다.

7. 웹 애플리케이션에서 스트림 사용

Ratpack은 비동기식 웹 프레임워크와 함께 반응형 스트림 사용을 지원합니다.

7.1. 데이터 스트림 수신

들어오는 데이터의 경우 핸들러의 Context 를 통해 사용할 수 있는 Request 개체 는 ByteBuf 개체의 TransformablePublisher 를 반환하는 getBodyStream() 메서드를 제공 합니다.

이 게시자로부터 처리 파이프라인을 구축할 수 있습니다.

@Bean
public Action<Chain> uploadFile() {
    
    return chain -> chain.post("upload", ctx -> {
        TransformablePublisher<? extends ByteBuf> pub = ctx.getRequest().getBodyStream();
        pub.subscribe(new Subscriber<ByteBuf>() {
            private Subscription sub;
            @Override
            public void onSubscribe(Subscription sub) {
                this.sub = sub;
                sub.request(1);
            }

            @Override
            public void onNext(ByteBuf t) {
                try {
                    ... do something useful with received data
                    sub.request(1);
                }
                finally {
                    // DO NOT FORGET to RELEASE !
                    t.release();
                }
            }

            @Override
            public void onError(Throwable t) {
                ctx.getResponse().status(500);
            }

            @Override
            public void onComplete() {
                ctx.getResponse().status(202);
            }
        }); 
    });
}

가입자를 구현할 때 고려해야 할 몇 가지 세부 사항이 있습니다. 먼저 우리는 어느 시점에서 ByteBufrelease() 메서드를 호출하는지 확인해야 합니다. 그렇게 하지 않으면 메모리 누수가 발생 합니다. 둘째, 모든 비동기 처리는 Ratpack의 프리미티브만 사용해야 합니다. 여기에는 Promise , Blocking 및 유사한 구조가 포함됩니다.

7.2. 데이터 스트림 보내기

데이터 스트림을 보내는 가장 직접적인 방법은 Response.sendStream() 을 사용하는 것 입니다. 이 메서드는 ByteBuf 게시자 인수를 사용하여 클라이언트에 데이터를 전송하고 오버플로를 방지하기 위해 필요에 따라 배압을 적용합니다.

@Bean
public Action<Chain> download() {
    return chain -> chain.get("download", ctx -> {
        ctx.getResponse().sendStream(new RandomBytesPublisher(1024,512));
    });
}

간단하지만 이 방법을 사용할 때 단점이 있습니다. 클라이언트에 문제가 될 수 있는 Content-Length 를 포함하여 헤더를 자체적으로 설정하지 않습니다 .

$ curl -v --output data.bin http://localhost:5050/download
... request messages omitted
< HTTP/1.1 200 OK
< transfer-encoding: chunked
... download progress messages omitted

또는 더 나은 방법은 핸들의 Context render() 메서드를 사용하여 ResponseChunks 객체 를 전달하는 것 입니다. 이 경우 응답은 " chunked ' 전송 인코딩 방법을 사용합니다. ResponseChunks 인스턴스 를 만드는 가장 간단한 방법 은 이 클래스에서 사용할 수 있는 정적 메서드 중 하나를 사용하는 것입니다.

@Bean
public Action<Chain> downloadChunks() {
    return chain -> chain.get("downloadChunks", ctx -> {
        ctx.render(ResponseChunks.bufferChunks("application/octetstream",
          new RandomBytesPublisher(1024,512)));
    });
}

이 변경으로 이제 응답에 콘텐츠 유형 헤더가 포함됩니다.

$ curl -v --output data.bin http://localhost:5050/downloadChunks
... request messages omitted
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< content-type: application/octetstream
<
... progress messages omitted

7.3. 서버 측 이벤트 사용

SSE(Server-Side Events) 지원도 render() 메서드를 사용합니다. 그러나 이 경우에는 ServerSentEvents 를 사용하여 Producer 에서 오는 항목을 이벤트 페이로드와 함께 일부 메타데이터를 포함하는 Event 개체 로 조정합니다.

@Bean
public Action<Chain> quotes() {
    ServerSentEvents sse = ServerSentEvents.serverSentEvents(quotesService.newTicker(), (evt) -> {
        evt
          .id(Long.toString(idSeq.incrementAndGet()))
          .event("quote")
          .data( q -> q.toString());
    });
    
    return chain -> chain.get("quotes", ctx -> ctx.render(sse));
}

여기서 QuotesService 는 일정한 간격으로 무작위 인용을 생성하는 Publisher 를 생성하는 샘플 서비스일 뿐입니다 . 두 번째 인수는 보낼 이벤트를 준비하는 함수입니다. 여기에는 id , 이벤트 유형 및 페이로드 자체 추가가 포함됩니다.

curl 을 사용 하여 이 메서드를 테스트하여 이벤트 메타데이터와 함께 랜덤의 따옴표 시퀀스를 표시하는 출력을 생성할 수 있습니다.

$ curl -v http://localhost:5050/quotes
... request messages omitted
< HTTP/1.1 200 OK
< content-type: text/event-stream;charset=UTF-8
< transfer-encoding: chunked
... other response headers omitted
id: 10
event: quote
data: Quote [ts=2021-10-11T01:20:52.081Z, symbol=ORCL, value=53.0]

... more quotes

7.4. Websocket 데이터 브로드캐스팅

Websockets.websocketBroadcast () 를 사용하여 모든 Publisher 에서 WebSocket 연결로 데이터를 파이프할 수 있습니다 .

@Bean
public Action<Chain> quotesWS() {
    Publisher<String> pub = Streams.transformable(quotesService.newTicker())
      .map(Quote::toString);
    return chain -> chain.get("quotes-ws", ctx -> WebSockets.websocketBroadcast(ctx, pub));
}

여기에서 우리 는 이전에 보았던 것과 동일한 QuotesService 를 클라이언트에게 방송하는 이벤트 소스로 사용합니다. curl 을 다시 사용하여 WebSocket 클라이언트를 시뮬레이션해 보겠습니다 .

$ curl --include -v \
     --no-buffer \
     --header "Connection: Upgrade" \
     --header "Upgrade: websocket" \
     --header "Sec-WebSocket-Key: SGVsbG8sIHdvcmxkIQ==" \
     --header "Sec-WebSocket-Version: 13" \
     http://localhost:5050/quotes-ws
... request messages omitted
< HTTP/1.1 101 Switching Protocols
HTTP/1.1 101 Switching Protocols
< upgrade: websocket
upgrade: websocket
< connection: upgrade
connection: upgrade
< sec-websocket-accept: qGEgH3En71di5rrssAZTmtRTyFk=
sec-websocket-accept: qGEgH3En71di5rrssAZTmtRTyFk=

<
<Quote [ts=2021-10-11T01:39:42.915Z, symbol=ORCL, value=63.0]
... more quotes omitted

8. 결론

이 기사에서는 반응형 스트림에 대한 Ratpack의 지원과 이를 다양한 시나리오에 적용하는 방법을 살펴보았습니다.

늘 그렇듯이 예제의 전체 소스 코드는 GitHub 에서 찾을 수 있습니다 .

res – REST with Spring (eBook) (everywhere)