1. 개요

이 예제에서는 Flux <DataBuffer> 를 단일 InputStream 으로 읽는 방법에 대한 흥미로운 문제 를 해결하기 위해 Java 반응형 프로그래밍 에 대해 자세히 알아볼 것 입니다.

2. 요청 설정

Flux<DataBuffer> 를 단일 InputStream 으로 읽는 문제를 해결하기 위한 첫 번째 단계로 GET 요청 을 만들기 위해 Spring 반응형 WebClient 를 사용할 것 입니다. 또한 이러한 테스트 시나리오를 위해 gorest.co.in 에서 호스팅하는 공개 API 엔드포인트 중 하나를 사용할 수 있습니다.

String REQUEST_ENDPOINT = "https://gorest.co.in/public/v2/users";

다음으로 WebClient 클래스 의 새 인스턴스를 가져오기 위한 getWebClient() 메서드를 정의해 보겠습니다.

static WebClient getWebClient() {
    WebClient.Builder webClientBuilder = WebClient.builder();
    return webClientBuilder.build();
}

이제 /public/v2/users Endpoints 에 GET 요청을 할 준비가 되었습니다 . 그러나 Response body을 Flux<DataBuffer> 개체로 가져와야 합니다. 따라서 이를 정확하게 수행하기 위해 BodyExtractors 에 대한 다음 섹션으로 이동하겠습니다 .

3. BodyExtractorDataBufferUtils

Response body을 Flux<DataBuffer> 로 추출하기 위해 spring-webflux 에서 사용할 수 있는 BodyExtractors 클래스 toDataBuffers() 메서드를 사용할 수 있습니다 .

AdChoices
광고

계속해서 Flux<DataBuffer> 유형 의 인스턴스로 본문 을 생성해 보겠습니다 .

Flux<DataBuffer> body = client
  .get(
  .uri(REQUEST_ENDPOINT)
    .exchangeToFlux( clientResponse -> {
        return clientResponse.body(BodyExtractors.toDataBuffers());
    });

다음으로 이러한 DataBuffer 스트림을 단일 InputStream 으로 수집해야 하므로 이를 달성하기 위한 좋은 전략은 PipedInputStreamPipedOutputStream 을 사용하는 것 입니다.

또한 PipedOutputStream 에 쓰고  결국 PipedInputStream 에서 읽습니다 . 이제 이 두 개의 연결된 스트림을 만드는 방법을 살펴보겠습니다.

PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(1024*10);
inputStream.connect(outputStream);

기본 크기는 1024 바이트입니다. 그러나 Flux<DataBuffer> 에서 수집된 결과 가 기본값을 초과할 수 있다고 예상합니다. 따라서 크기에 더 큰 값을 명시적으로 지정해야 합니다. 이 경우에는 1024*10 입니다.

마지막으로, outputStream 에 게시자로서 본문 을 작성하기 위해 DataBufferUtils 클래스 에서 사용할 수 있는 write() 유틸리티 메서드 를 사용합니다 .

DataBufferUtils.write(body, outputStream).subscribe();

선언할 때 inputStreamoutputStream 에 연결 했다는 점에 유의해야 합니다 . 따라서 inputStream 에서 읽는 것이 좋습니다 . 다음 섹션으로 이동하여 이것이 어떻게 작동하는지 살펴보겠습니다.

4. PipedInputStream 에서 읽기

먼저, InputStreamString 객체 로 읽기 위한 도우미 메서드 readContent() 를 정의해 보겠습니다.

String readContent(InputStream stream) throws IOException {
    StringBuffer contentStringBuffer = new StringBuffer();
    byte[] tmp = new byte[stream.available()];
    int byteCount = stream.read(tmp, 0, tmp.length);
    contentStringBuffer.append(new String(tmp));
    return String.valueOf(contentStringBuffer);
}

다음 으로 다른 스레드에서 PipedInputStream 을 읽는 것이 일반적인 방법이므로 readContent() 메서드를 호출하여 PipedInputStream 에서 String 개체 로 콘텐츠를 읽기 위해 새 스레드를 내부적으로 생성하는 readContentFromPipedInputStream() 메서드 를 만들어 보겠습니다 .

String readContentFromPipedInputStream(PipedInputStream stream) throws IOException {
    StringBuffer contentStringBuffer = new StringBuffer();
    try {
        Thread pipeReader = new Thread(() -> {
            try {
                contentStringBuffer.append(readContent(stream));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        pipeReader.start();
        pipeReader.join();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {
        stream.close();
    }

    return String.valueOf(contentStringBuffer);
}

이 단계에서 코드는 시뮬레이션에 사용할 준비가 되었습니다. 실제로 살펴보겠습니다.

WebClient webClient = getWebClient();
InputStream inputStream = getResponseAsInputStream(webClient, REQUEST_ENDPOINT);
Thread.sleep(3000);
String content = readContentFromPipedInputStream((PipedInputStream) inputStream);
logger.info("response content: \n{}", content.replace("}","}\n"));

비동기식 시스템을 다룰 때 전체 응답을 볼 수 있도록 스트림에서 읽기 전에 랜덤의 3초 동안 읽기를 지연합니다. 또한 로깅 시 긴 출력을 여러 줄로 나누기 위해 개행 문자를 삽입하고 있습니다.

마지막으로 코드 실행으로 생성된 출력을 확인하겠습니다.

20:45:04.120 [main] INFO com.baeldung.databuffer.DataBufferToInputStream - response content: 
[{"id":2642,"name":"Bhupen Trivedi","email":"bhupen_trivedi@renner-pagac.name","gender":"male","status":"active"}
,{"id":2637,"name":"Preity Patel","email":"patel_preity@abshire.info","gender":"female","status":"inactive"}
,{"id":2633,"name":"Brijesh Shah","email":"brijesh_shah@morar.co","gender":"male","status":"inactive"}
...
,{"id":2623,"name":"Mohini Mishra","email":"mishra_mohini@hamill-ledner.info","gender":"female","status":"inactive"}
]

그게 다야! 문제가 해결된 것 같습니다.

5. 결론

이 기사에서는 파이프 스트림의 개념과 BodyExtractorsDataBufferUtils 클래스 에서 사용할 수 있는 유틸리티 메서드를 사용하여 Flux<DataBuffer> 를 단일 InputStream 으로 읽었습니다 .

항상 그렇듯이 사용방법(예제)의 전체 소스 코드는 GitHub에서 사용할 수 있습니다 .

Generic footer banner