1. 개요
이 예제에서는 Flux <DataBuffer> 를 단일 InputStream 으로 읽는 방법에 대한 흥미로운 문제 를 해결하기 위해 Java 반응형 프로그래밍 에 대해 자세히 알아볼 것 입니다.
2. 요청 설정
Flux<DataBuffer> 를 단일 InputStream 으로 읽는 문제를 해결하기 위한 첫 번째 단계로 GET 요청 을 만들기 위해 Spring 반응형 WebClient 를 사용할 것 입니다. 또한 이러한 테스트 시나리오를 위해 gorest.co.in 에서 호스팅하는 공개 API 엔드포인트 중 하나를 사용할 수 있습니다.
String REQUEST_ENDPOINT = "https://gorest.co.in/public/v2/users";
다음으로 WebClient 클래스 의 새 인스턴스를 가져오기 위한 getWebClient() 메서드를 정의해 보겠습니다.
static WebClient getWebClient() {
WebClient.Builder webClientBuilder = WebClient.builder();
return webClientBuilder.build();
}
이제 /public/v2/users Endpoints 에 GET 요청을 할 준비가 되었습니다 . 그러나 Response body을 Flux<DataBuffer> 개체로 가져와야 합니다. 따라서 이를 정확하게 수행하기 위해 BodyExtractors 에 대한 다음 섹션으로 이동하겠습니다 .
3. BodyExtractor 및 DataBufferUtils
Response body을 Flux<DataBuffer> 로 추출하기 위해 spring-webflux 에서 사용할 수 있는 BodyExtractors 클래스 의 toDataBuffers() 메서드를 사용할 수 있습니다 .
계속해서 Flux<DataBuffer> 유형 의 인스턴스로 본문 을 생성해 보겠습니다 .
Flux<DataBuffer> body = client
.get(
.uri(REQUEST_ENDPOINT)
.exchangeToFlux( clientResponse -> {
return clientResponse.body(BodyExtractors.toDataBuffers());
});
다음으로 이러한 DataBuffer 스트림을 단일 InputStream 으로 수집해야 하므로 이를 달성하기 위한 좋은 전략은 PipedInputStream 및 PipedOutputStream 을 사용하는 것 입니다.
또한 PipedOutputStream 에 쓰고 결국 PipedInputStream 에서 읽습니다 . 이제 이 두 개의 연결된 스트림을 만드는 방법을 살펴보겠습니다.
PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(1024*10);
inputStream.connect(outputStream);
기본 크기는 1024 바이트입니다. 그러나 Flux<DataBuffer> 에서 수집된 결과 가 기본값을 초과할 수 있다고 예상합니다. 따라서 크기에 더 큰 값을 명시적으로 지정해야 합니다. 이 경우에는 1024*10 입니다.
마지막으로, outputStream 에 게시자로서 본문 을 작성하기 위해 DataBufferUtils 클래스 에서 사용할 수 있는 write() 유틸리티 메서드 를 사용합니다 .
DataBufferUtils.write(body, outputStream).subscribe();
선언할 때 inputStream 을 outputStream 에 연결 했다는 점에 유의해야 합니다 . 따라서 inputStream 에서 읽는 것이 좋습니다 . 다음 섹션으로 이동하여 이것이 어떻게 작동하는지 살펴보겠습니다.
4. PipedInputStream 에서 읽기
먼저, InputStream 을 String 객체 로 읽기 위한 도우미 메서드 readContent() 를 정의해 보겠습니다.
String readContent(InputStream stream) throws IOException {
StringBuffer contentStringBuffer = new StringBuffer();
byte[] tmp = new byte[stream.available()];
int byteCount = stream.read(tmp, 0, tmp.length);
contentStringBuffer.append(new String(tmp));
return String.valueOf(contentStringBuffer);
}
다음 으로 다른 스레드에서 PipedInputStream 을 읽는 것이 일반적인 방법이므로 readContent() 메서드를 호출하여 PipedInputStream 에서 String 개체 로 콘텐츠를 읽기 위해 새 스레드를 내부적으로 생성하는 readContentFromPipedInputStream() 메서드 를 만들어 보겠습니다 .
String readContentFromPipedInputStream(PipedInputStream stream) throws IOException {
StringBuffer contentStringBuffer = new StringBuffer();
try {
Thread pipeReader = new Thread(() -> {
try {
contentStringBuffer.append(readContent(stream));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
pipeReader.start();
pipeReader.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
stream.close();
}
return String.valueOf(contentStringBuffer);
}
이 단계에서 코드는 시뮬레이션에 사용할 준비가 되었습니다. 실제로 살펴보겠습니다.
WebClient webClient = getWebClient();
InputStream inputStream = getResponseAsInputStream(webClient, REQUEST_ENDPOINT);
Thread.sleep(3000);
String content = readContentFromPipedInputStream((PipedInputStream) inputStream);
logger.info("response content: \n{}", content.replace("}","}\n"));
비동기식 시스템을 다룰 때 전체 응답을 볼 수 있도록 스트림에서 읽기 전에 랜덤의 3초 동안 읽기를 지연합니다. 또한 로깅 시 긴 출력을 여러 줄로 나누기 위해 개행 문자를 삽입하고 있습니다.
마지막으로 코드 실행으로 생성된 출력을 확인하겠습니다.
20:45:04.120 [main] INFO com.baeldung.databuffer.DataBufferToInputStream - response content:
[{"id":2642,"name":"Bhupen Trivedi","email":"bhupen_trivedi@renner-pagac.name","gender":"male","status":"active"}
,{"id":2637,"name":"Preity Patel","email":"patel_preity@abshire.info","gender":"female","status":"inactive"}
,{"id":2633,"name":"Brijesh Shah","email":"brijesh_shah@morar.co","gender":"male","status":"inactive"}
...
,{"id":2623,"name":"Mohini Mishra","email":"mishra_mohini@hamill-ledner.info","gender":"female","status":"inactive"}
]
그게 다야! 문제가 해결된 것 같습니다.
5. 결론
이 기사에서는 파이프 스트림의 개념과 BodyExtractors 및 DataBufferUtils 클래스 에서 사용할 수 있는 유틸리티 메서드를 사용하여 Flux<DataBuffer> 를 단일 InputStream 으로 읽었습니다 .
항상 그렇듯이 사용방법(예제)의 전체 소스 코드는 GitHub에서 사용할 수 있습니다 .