1. 소개
AWS는 공식 SDK를 사용하여 Java에서 액세스할 수 있는 많은 API를 통해 많은 서비스를 제공합니다. 그러나 최근까지 이 SDK는 반응적 작업에 대한 지원을 제공하지 않았으며 비동기식 액세스에 대한 지원이 제한적이었습니다.
AWS SDK for Java 2.0이 출시되면서 이제 Reactive Streams 표준을 채택한 덕분 에 완전한 비차단 I/O 모드에서 이러한 API를 사용할 수 있습니다 .
이 사용방법(예제)에서는 잘 알려진 S3 서비스를 저장소 백엔드로 사용하는 Spring Boot에서 간단한 Blob 저장소 REST API를 구현하여 이러한 새로운 기능을 살펴보겠습니다.
2. AWS S3 운영 개요
구현에 들어가기 전에 여기에서 달성하고자 하는 것에 대한 간략한 개요를 살펴보겠습니다. 일반적인 Blob 저장소 서비스는 최종 사용자가 오디오, 사진 및 문서와 같은 여러 유형의 콘텐츠를 업로드, 나열, 다운로드 및 삭제할 수 있도록 프런트 엔드 애플리케이션에서 사용하는 CRUD 작업을 노출합니다.
전통적인 구현이 처리해야 하는 일반적인 문제는 대용량 파일이나 느린 연결을 효율적으로 처리하는 방법 입니다. 초기 버전(servlet 3.0 이전)에서는 JavaEE 사양이 제공해야 하는 모든 것이 차단 API였으므로 각 동시 Blob 저장소 클라이언트에 대한 스레드가 필요했습니다. 이 모델에는 더 많은 서버 리소스(ergo, 더 큰 시스템)가 필요하고 DoS 유형 공격에 더 취약하다는 단점이 있습니다.
반응형 스택을 사용하면 같은 수의 클라이언트에 대해 리소스를 덜 사용하는 서비스를 만들 수 있습니다 . 리액터 구현은 읽을 새 데이터의 가용성 또는 이전 쓰기 완료 와 같은 I/O 완료 이벤트에 대한 응답으로 발송되는 적은 수의 스레드를 사용합니다 .
이는 동일한 스레드가 더 이상 사용할 수 있는 작업이 없을 때까지 활성 클라이언트 연결에서 발생할 수 있는 해당 이벤트를 계속 처리함을 의미합니다. 이 접근 방식은 상당히 비용이 많이 드는 작업인 컨텍스트 스위치의 수를 크게 줄이고 사용 가능한 리소스를 매우 효율적으로 사용할 수 있도록 합니다.
3. 프로젝트 설정
데모 프로젝트는 Lombok 및 JUnit과 같은 일반적인 지원 의존성을 포함 하는 표준 Spring Boot WebFlux 애플리케이션입니다.
이러한 라이브러리 외에도 AWS SDK for Java V2 의존성을 가져와야 합니다.
<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.10.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<artifactId>netty-nio-client</artifactId>
<groupId>software.amazon.awssdk</groupId>
<scope>compile</scope>
</dependency>
</dependencies>
AWS SDK 는 모든 의존성에 필요한 버전을 정의하는 BOM 을 제공 하므로 POM 파일의 의존성 섹션에 지정할 필요가 없습니다.
SDK에서 다른 핵심 의존성을 가져올 S3 클라이언트 라이브러리를 추가했습니다. AWS와 상호작용하기 위해 비동기 API를 사용할 것이기 때문에 필요한 Netty 클라이언트 라이브러리도 필요합니다.
공식 AWS 설명서 에는 사용 가능한 전송에 대한 자세한 내용이 포함되어 있습니다.
4. AWS S3 클라이언트 생성
S3 작업의 진입점은 새 API 호출을 시작하는 데 사용할 S3AsyncClient 클래스 입니다.
이 클래스의 단일 인스턴스만 필요하므로 @Bean 메서드를 사용하여 @Configuration 클래스를 만들어 필요한 곳에 주입할 수 있도록 합시다.
@Configuration
@EnableConfigurationProperties(S3ClientConfigurarionProperties.class)
public class S3ClientConfiguration {
@Bean
public S3AsyncClient s3client(S3ClientConfigurarionProperties s3props,
AwsCredentialsProvider credentialsProvider) {
SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
.writeTimeout(Duration.ZERO)
.maxConcurrency(64)
.build();
S3Configuration serviceConfiguration = S3Configuration.builder()
.checksumValidationEnabled(false)
.chunkedEncodingEnabled(true)
.build();
S3AsyncClientBuilder b = S3AsyncClient.builder().httpClient(httpClient)
.region(s3props.getRegion())
.credentialsProvider(credentialsProvider)
.serviceConfiguration(serviceConfiguration);
if (s3props.getEndpoint() != null) {
b = b.endpointOverride(s3props.getEndpoint());
}
return b.build();
}
}
이 데모에서는 S3 서비스에 액세스하는 데 필요한 다음 정보를 포함하는 최소한의 @ConfigurationProperties 클래스(리포지토리에서 사용 가능)를 사용하고 있습니다.
- 지역: us-east-1 과 같은 유효한 AWS 지역 식별자
- accessKeyId/secretAccessKey : AWS API 키 및 식별자
- endpoint: S3의 기본 서비스 엔드포인트를 재정의하는 데 사용할 수 있는 선택적 URI입니다. 주요 사용 사례는 S3 호환 API를 제공하는 대체 스토리지 솔루션과 함께 데모 코드를 사용하는 것입니다(예: minio 및 localstack).
- 버킷 : 업로드된 파일을 저장할 버킷 이름
클라이언트의 초기화 코드에 대해 언급할 가치가 있는 몇 가지 사항이 있습니다. 첫째, 쓰기 제한 시간을 비활성화하고 최대 동시성을 높이고 있으므로 대역폭이 낮은 상황에서도 업로드가 완료될 수 있습니다.
둘째, 체크섬 유효성 검사를 비활성화하고 청크 분할 인코딩을 활성화합니다. 데이터가 스트리밍 방식으로 서비스에 도착하는 즉시 버킷에 데이터 업로드를 시작하려고 하기 때문에 이렇게 합니다.
마지막으로 관리자가 버킷을 이미 만들고 구성했다고 가정하므로 버킷 생성 자체를 다루지 않습니다.
자격 증명의 경우 Spring 속성에서 자격 증명을 복구할 수 있는 사용자 지정 AwsCredentialsProvider 를 제공합니다. 이렇게 하면 Spring의 환경 추상화 및 Vault 또는 구성 서버와 같은 지원되는 모든 PropertySource 구현 을 통해 해당 값을 주입할 수 있는 가능성이 열립니다 .
@Bean
public AwsCredentialsProvider awsCredentialsProvider(S3ClientConfigurarionProperties s3props) {
if (StringUtils.isBlank(s3props.getAccessKeyId())) {
return DefaultCredentialsProvider.create();
} else {
return () -> {
return AwsBasicCredentials.create(
s3props.getAccessKeyId(),
s3props.getSecretAccessKey());
};
}
}
5. 업로드 서비스 개요
이제 /inbox 경로 에서 연결할 수 있는 업로드 서비스를 구현할 것 입니다.
이 리소스 경로에 대한 POST 는 임의로 생성된 키 아래 S3 버킷에 파일을 저장합니다. 원본 파일 이름을 메타데이터 키로 저장하여 브라우저에 적합한 HTTP 다운로드 헤더를 생성하는 데 사용할 수 있습니다.
단순 업로드와 다중 파트 업로드라는 두 가지 시나리오를 처리해야 합니다. 계속해서 @RestController 를 만들고 이러한 시나리오를 처리하는 메서드를 추가해 보겠습니다.
@RestController
@RequestMapping("/inbox")
@Slf4j
public class UploadResource {
private final S3AsyncClient s3client;
private final S3ClientConfigurarionProperties s3config;
public UploadResource(S3AsyncClient s3client, S3ClientConfigurarionProperties s3config) {
this.s3client = s3client;
this.s3config = s3config;
}
@PostMapping
public Mono<ResponseEntity<UploadResult>> uploadHandler(
@RequestHeader HttpHeaders headers,
@RequestBody Flux<ByteBuffer> body) {
// ... see section 6
}
@RequestMapping(
consumes = MediaType.MULTIPART_FORM_DATA_VALUE,
method = {RequestMethod.POST, RequestMethod.PUT})
public Mono<ResponseEntity<UploadResult>> multipartUploadHandler(
@RequestHeader HttpHeaders headers,
@RequestBody Flux<Part> parts ) {
// ... see section 7
}
}
처리기 서명은 두 경우의 주요 차이점을 반영합니다. 간단한 경우 본문에는 파일 콘텐츠 자체가 포함되는 반면 다중 부분의 경우에는 각각 파일 또는 양식 데이터에 해당하는 여러 "부분"이 있을 수 있습니다.
편의상 POST 또는 PUT 방식을 사용하는 멀티파트 업로드를 지원합니다. 그 이유는 일부 도구( 특히 cURL )는 -F 옵션 으로 파일을 업로드할 때 기본적으로 후자를 사용하기 때문 입니다.
두 경우 모두 작업 결과와 클라이언트가 원본 파일을 복구하는 데 사용해야 하는 생성된 파일 키를 포함 하는 UploadResult 를 반환합니다 . 자세한 내용은 나중에 설명합니다!
6. 단일 파일 업로드
이 경우 클라이언트는 원시 데이터를 포함하는 요청 본문과 함께 간단한 POST 작업으로 콘텐츠를 보냅니다. 반응형 웹 애플리케이션에서 이 콘텐츠를 수신하려면 Flux<ByteBuffer> 인수 를 사용하는 @PostMapping 메서드 를 선언하기만 하면 됩니다.
이 경우 이 플럭스를 새 S3 파일로 스트리밍하는 것은 간단합니다.
생성된 키, 파일 길이, MIME 콘텐츠 유형으로 PutObjectRequest 를 빌드 하고 S3 클라이언트 의 putObject() 메서드에 전달하기만 하면 됩니다.
@PostMapping
public Mono<ResponseEntity<UploadResult>> uploadHandler(@RequestHeader HttpHeaders headers,
@RequestBody Flux<ByteBuffer> body) {
// ... some validation code omitted
String fileKey = UUID.randomUUID().toString();
MediaType mediaType = headers.getContentType();
if (mediaType == null) {
mediaType = MediaType.APPLICATION_OCTET_STREAM;
}
CompletableFuture future = s3client
.putObject(PutObjectRequest.builder()
.bucket(s3config.getBucket())
.contentLength(length)
.key(fileKey.toString())
.contentType(mediaType.toString())
.metadata(metadata)
.build(),
AsyncRequestBody.fromPublisher(body));
return Mono.fromFuture(future)
.map((response) -> {
checkResult(response);
return ResponseEntity
.status(HttpStatus.CREATED)
.body(new UploadResult(HttpStatus.CREATED, new String[] {fileKey}));
});
}
여기서 중요한 점은 들어오는 Flux 를 putObject() 메서드에 전달하는 방법입니다.
이 메서드는 요청 시 콘텐츠를 제공하는 AsyncRequestBody 객체를 기대합니다. 기본적으로 몇 가지 추가 편의 방법 이 있는 일반 게시자 입니다. 우리의 경우에는 Flux 를 필요한 유형으로 변환하기 위해 fromPublisher () 메서드를 활용할 것입니다.
또한 클라이언트가 올바른 값과 함께 Content-Length HTTP 헤더를 보낼 것이라고 가정합니다 . 필수 필드이므로 이 정보가 없으면 호출이 실패합니다.
SDK V2의 비동기 메서드는 항상 CompletableFuture 개체를 반환합니다. fromFuture() 팩토리 메서드 를 사용하여 Mono 에 적용합니다 . 이것은 최종 UploadResult 개체에 매핑됩니다.
7. 여러 파일 업로드
특히 우리를 위해 모든 세부 사항을 처리하는 라이브러리를 사용할 때 multipart/form-data 업로드 를 처리하는 것이 쉬워 보일 수 있습니다 . 그렇다면 업로드된 각 파일에 대해 이전 방법을 간단히 사용할 수 있습니까? 네, 하지만 여기에는 대가가 따릅니다. 바로 버퍼링입니다.
이전 방법을 사용하려면 부분의 길이가 필요하지만 청크 분할 파일 전송에는 이 정보가 항상 포함되지는 않습니다. 한 가지 접근 방식은 파트를 임시 파일에 저장한 다음 AWS로 보내는 것이지만 이렇게 하면 전체 업로드 시간이 느려집니다. 또한 서버를 위한 추가 스토리지를 의미합니다.
대안으로 여기서는 AWS 멀티파트 업로드를 사용합니다 . 이 기능을 사용하면 단일 파일의 업로드를 병렬 및 비순차적으로 보낼 수 있는 여러 청크로 분할할 수 있습니다.
전송해야 하는 단계는 다음과 같습니다.
- createMultipartUpload 요청 – AWS 는 다음 호출에서 사용할 uploadId 로 응답합니다.
- uploadId , 시퀀스 번호 및 콘텐츠 를 포함하는 파일 청크 – AWS는 각 부분에 대한 ETag 식별자로 응답합니다.
- 받은 uploadId 및 모든 ETag 를 포함 하는 completeUpload 요청
참고: 수신 된 각 FilePart 에 대해 이러한 단계를 반복합니다 !
7.1. 최상위 파이프라인
@Controller 클래스 의 multipartUploadHandler 는 멀티파트 파일 업로드 처리를 담당합니다. 이 컨텍스트에서 각 부분은 MIME 유형으로 식별되는 모든 종류의 데이터를 가질 수 있습니다. 반응형 웹 프레임워크는 이러한 부분을 우리 가 차례로 처리할 Part 인터페이스 를 구현하는 개체 의 Flux 로 핸들러에 전달합니다.
return parts
.ofType(FilePart.class)
.flatMap((part)-> saveFile(headers, part))
.collect(Collectors.toList())
.map((keys)-> new UploadResult(HttpStatus.CREATED, keys)));
이 파이프라인은 항상 FilePart 인터페이스를 구현하는 개체인 실제 업로드된 파일에 해당하는 부분을 필터링하는 것으로 시작합니다. 그런 다음 각 부분은 단일 파일의 실제 업로드를 처리하고 생성된 파일 키를 반환하는 saveFile 메서드로 전달됩니다.
List 의 모든 키를 수집하고 마지막으로 최종 UploadResult 를 빌드합니다 . 우리는 항상 새 리소스를 생성하므로 일반 OK 대신 더 설명적인 CREATED 상태(202)를 반환합니다.
7.2. 단일 파일 업로드 처리
AWS의 멀티파트 방법을 사용하여 파일을 업로드하는 데 필요한 단계를 이미 설명했습니다. 하지만 문제가 있습니다. S3 서비스에서는 마지막 부분을 제외한 각 부분이 지정된 최소 크기(현재 5MB)를 가져야 합니다.
즉, 수신된 청크를 바로 가져와서 보낼 수는 없습니다. 대신 최소 크기 또는 데이터 끝에 도달할 때까지 로컬에서 버퍼링해야 합니다. 전송한 부품 수와 결과 CompletedPart 결과 를 추적할 장소도 필요하므로 이 상태를 유지하기 위해 간단한 UploadState 내부 클래스를 만듭니다.
class UploadState {
String bucket;
String filekey;
String uploadId;
int partCounter;
Map<Integer, CompletedPart> completedParts = new HashMap<>();
int buffered = 0;
// ... getters/setters omitted
UploadState(String bucket, String filekey) {
this.bucket = bucket;
this.filekey = filekey;
}
}
필요한 단계와 버퍼링이 주어지면 언뜻보기에 약간 위협적으로 보일 수 있는 구현으로 끝납니다.
Mono<String> saveFile(HttpHeaders headers,String bucket, FilePart part) {
String filekey = UUID.randomUUID().toString();
Map<String, String> metadata = new HashMap<String, String>();
String filename = part.filename();
if ( filename == null ) {
filename = filekey;
}
metadata.put("filename", filename);
MediaType mt = part.headers().getContentType();
if ( mt == null ) {
mt = MediaType.APPLICATION_OCTET_STREAM;
}
UploadState uploadState = new UploadState(bucket,filekey);
CompletableFuture<CreateMultipartUploadResponse> uploadRequest = s3client
.createMultipartUpload(CreateMultipartUploadRequest.builder()
.contentType(mt.toString())
.key(filekey)
.metadata(metadata)
.bucket(bucket)
.build());
return Mono
.fromFuture(uploadRequest)
.flatMapMany((response) -> {
checkResult(response);
uploadState.uploadId = response.uploadId();
return part.content();
})
.bufferUntil((buffer) -> {
uploadState.buffered += buffer.readableByteCount();
if ( uploadState.buffered >= s3config.getMultipartMinPartSize() ) {
uploadState.buffered = 0;
return true;
} else {
return false;
}
})
.map((buffers) -> concatBuffers(buffers))
.flatMap((buffer) -> uploadPart(uploadState,buffer))
.reduce(uploadState,(state,completedPart) -> {
state.completedParts.put(completedPart.partNumber(), completedPart);
return state;
})
.flatMap((state) -> completeUpload(state))
.map((response) -> {
checkResult(response);
return uploadState.filekey;
});
}
먼저 일부 파일 메타데이터를 수집하고 이를 사용하여 createMultipartUpload() API 호출에 필요한 요청 객체를 생성합니다. 이 호출은 스트리밍 파이프라인의 시작점인 CompletableFuture 를 반환합니다.
이 파이프라인의 각 단계에서 수행하는 작업을 검토해 보겠습니다.
- S3에서 생성된 uploadId 가 포함된 초기 결과를 수신한 후 업로드 상태 개체에 저장하고 파일 본문 스트리밍을 시작합니다. Mono 를 Flux 로 바꾸는 flatMapMany 의 사용에 주목 하십시오.
- 필요한 바이트 수를 축적하기 위해 bufferUntil() 을 사용 합니다. 이 시점에서 파이프라인 은 DataBuffer 개체의 Flux 에서 List<DataBuffer> 개체 의 Flux 로 변경됩니다.
- 각 List<DataBuffer> 를 ByteBuffer 로 변환
- ByteBuffer 를 S3로 보내고 (다음 섹션 참조) 결과 CompletedPart 값 다운스트림 을 반환합니다.
- 결과 CompletedPart 값을 uploadState 로 줄입니다.
- 업로드가 완료되었음을 S3에 알립니다(자세한 내용은 나중에 설명).
- 생성된 파일 키 반환
7.3. 파일 부분 업로드
다시 한 번 여기서 "파일 부분"은 단일 파일의 일부 (예: 100MB 파일의 첫 5MB)를 의미하며 우연히 파일이 되는 메시지의 일부가 아님 을 분명히 합시다. 최상위 스트림!
파일 업로드 파이프라인 은 업로드 상태와 ByteBuffer 라는 두 가지 인수를 사용하여 uploadPart() 메서드를 호출합니다 . 여기에서 UploadPartRequest 인스턴스 를 빌드하고 S3AsyncClient 에서 사용할 수 있는 uploadPart() 메서드 를 사용 하여 데이터를 보냅니다.
private Mono<CompletedPart> uploadPart(UploadState uploadState, ByteBuffer buffer) {
final int partNumber = ++uploadState.partCounter;
CompletableFuture<UploadPartResponse> request = s3client.uploadPart(UploadPartRequest.builder()
.bucket(uploadState.bucket)
.key(uploadState.filekey)
.partNumber(partNumber)
.uploadId(uploadState.uploadId)
.contentLength((long) buffer.capacity())
.build(),
AsyncRequestBody.fromPublisher(Mono.just(buffer)));
return Mono
.fromFuture(request)
.map((uploadPartResult) -> {
checkResult(uploadPartResult);
return CompletedPart.builder()
.eTag(uploadPartResult.eTag())
.partNumber(partNumber)
.build();
});
}
여기서는 uploadPart() 요청의 반환 값을 사용하여 CompletedPart 인스턴스를 빌드합니다. 이는 나중에 업로드를 닫는 최종 요청을 작성할 때 필요한 AWS SDK 유형입니다.
7.4. 업로드 완료
마지막 으로 S3 에 completeMultipartUpload() 요청을 전송하여 멀티파트 파일 업로드를 완료해야 합니다 . 업로드 파이프라인이 인수로 필요한 모든 정보를 전달한다는 점을 고려하면 이는 매우 쉽습니다.
private Mono<CompleteMultipartUploadResponse> completeUpload(UploadState state) {
CompletedMultipartUpload multipartUpload = CompletedMultipartUpload.builder()
.parts(state.completedParts.values())
.build();
return Mono.fromFuture(s3client.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
.bucket(state.bucket)
.uploadId(state.uploadId)
.multipartUpload(multipartUpload)
.key(state.filekey)
.build()));
}
8. AWS에서 파일 다운로드
멀티파트 업로드와 비교할 때 S3 버킷에서 객체를 다운로드하는 작업은 훨씬 간단 합니다. 이 경우 청크나 그와 유사한 것에 대해 걱정할 필요가 없습니다. SDK API는 두 개의 인수를 사용하는 getObject() 메서드를 제공합니다.
- 요청된 버킷과 파일 키가 포함 된 GetObjectRequest 객체
- 들어오는 스트리밍 응답을 다른 것에 매핑할 수 있는 AsyncResponseTransformer
SDK는 Flux에 스트림을 적용할 수 있도록 하는 후자의 구현을 몇 가지 제공 하지만 비용도 발생 합니다. 내부적으로 배열 버퍼에 데이터를 버퍼링 합니다. 이 버퍼링으로 인해 데모 서비스 클라이언트에 대한 응답 시간이 느려지므로 자체 어댑터를 구현할 것입니다. 이는 큰 문제가 아닙니다.
8.1. 컨트롤러 다운로드
우리의 다운로드 컨트롤러는 다운로드 요청을 처리 하는 단일 @GetMapping 메서드 가 있는 표준 Spring Reactive @RestController 입니다. @PathVariable 인수 를 통해 파일 키를 예상하고 파일 콘텐츠와 함께 비동기 ResponseEntity 를 반환합니다.
@GetMapping(path="/{filekey}")
Mono<ResponseEntity<Flux<ByteBuffer>>> downloadFile(@PathVariable("filekey") String filekey) {
GetObjectRequest request = GetObjectRequest.builder()
.bucket(s3config.getBucket())
.key(filekey)
.build();
return Mono.fromFuture(s3client.getObject(request, AsyncResponseTransformer.toPublisher()))
.map(response -> {
checkResult(response.response());
String filename = getMetadataItem(response.response(),"filename",filekey);
return ResponseEntity.ok()
.header(HttpHeaders.CONTENT_TYPE, response.response().contentType())
.header(HttpHeaders.CONTENT_LENGTH, Long.toString(response.response().contentLength()))
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + filename + "\"")
.body(Flux.from(response));
});
}
여기서 getMetadataItem() 은 응답에서 지정된 메타데이터 키를 대소문자를 구분하지 않는 방식으로 조회하는 도우미 메서드일 뿐입니다.
이것은 중요한 세부 사항입니다. S3는 특수 HTTP 헤더를 사용하여 메타데이터 정보를 반환하지만 해당 헤더는 대소문자를 구분하지 않습니다( RFC 7230, 섹션 3.2 참조) . 이것은 구현이 주어진 항목에 대한 대소문자를 마음대로 변경할 수 있음을 의미하며 이는 실제로 MinIO 를 사용할 때 발생합니다 .
9. 결론
이 사용방법(예제)에서는 AWS SDK V2 라이브러리에서 사용할 수 있는 반응형 확장을 사용하는 기본 사항을 다뤘습니다. 여기에서 우리의 초점은 AWS S3 서비스였지만 동일한 기술을 DynamoDB와 같은 다른 반응 지원 서비스로 확장할 수 있습니다.
평소와 같이 모든 코드는 GitHub 에서 사용할 수 있습니다 .