1. 목표

S3 업로드 에 대한 이전 기사 에서 jclouds의 일반 Blob API를 사용하여 S3에 콘텐츠를 업로드하는 방법을 살펴보았습니다. 이 기사에서는 jclouds의 S3 특정 비동기 API를 사용하여 콘텐츠를 업로드 하고 S3에서 제공 하는 멀티파트 업로드 기능을 활용합니다 .

2. 준비

2.1. 사용자 지정 API 설정

업로드 프로세스의 첫 번째 부분은 jclouds API를 생성하는 것입니다. 이것은 Amazon S3용 사용자 지정 API입니다.

public AWSS3AsyncClient s3AsyncClient() {
   String identity = ...
   String credentials = ...

   BlobStoreContext context = ContextBuilder.newBuilder("aws-s3").
      credentials(identity, credentials).buildView(BlobStoreContext.class);

   RestContext<AWSS3Client, AWSS3AsyncClient> providerContext = context.unwrap();
   return providerContext.getAsyncApi();
}

2.2. 콘텐츠의 부품 수 결정

Amazon S3에는 업로드할 각 부분에 대해 5MB 제한이 있습니다. 따라서 가장 먼저 해야 할 일은 이 5MB 제한 미만의 부분이 없도록 콘텐츠를 분할할 수 있는 올바른 부분 수를 결정하는 것입니다.

public static int getMaximumNumberOfParts(byte[] byteArray) {
   int numberOfParts= byteArray.length / fiveMB; // 5*1024*1024
   if (numberOfParts== 0) {
      return 1;
   }
   return numberOfParts;
}

2.3. 콘텐츠를 부분으로 나누기

바이트 배열을 정해진 수의 부분으로 나누려고 했습니다.

public static List<byte[]> breakByteArrayIntoParts(byte[] byteArray, int maxNumberOfParts) {
   List<byte[]> parts = Lists.<byte[]> newArrayListWithCapacity(maxNumberOfParts);
   int fullSize = byteArray.length;
   long dimensionOfPart = fullSize / maxNumberOfParts;
   for (int i = 0; i < maxNumberOfParts; i++) {
      int previousSplitPoint = (int) (dimensionOfPart * i);
      int splitPoint = (int) (dimensionOfPart * (i + 1));
      if (i == (maxNumberOfParts - 1)) {
         splitPoint = fullSize;
      }
      byte[] partBytes = Arrays.copyOfRange(byteArray, previousSplitPoint, splitPoint);
      parts.add(partBytes);
   }

   return parts;
}

우리는 바이트 배열을 여러 부분으로 나누는 논리를 테스트 할 것입니다. 일부 바이트를 생성하고, 바이트 배열을 분할하고, Guava를 사용하여 다시 구성 하고, 원본을 다시 얻는지 확인 합니다.

@Test
public void given16MByteArray_whenFileBytesAreSplitInto3_thenTheSplitIsCorrect() {
   byte[] byteArray = randomByteData(16);

   int maximumNumberOfParts = S3Util.getMaximumNumberOfParts(byteArray);
   List<byte[]> fileParts = S3Util.breakByteArrayIntoParts(byteArray, maximumNumberOfParts);

   assertThat(fileParts.get(0).length + fileParts.get(1).length + fileParts.get(2).length, 
      equalTo(byteArray.length));
   byte[] unmultiplexed = Bytes.concat(fileParts.get(0), fileParts.get(1), fileParts.get(2));
   assertThat(byteArray, equalTo(unmultiplexed));
}

데이터를 생성하려면 Random 의 지원을 사용하기만 하면 됩니다 .

byte[] randomByteData(int mb) {
   byte[] randomBytes = new byte[mb * 1024 * 1024];
   new Random().nextBytes(randomBytes);
   return randomBytes;
}

2.4. 페이로드 생성

이제 콘텐츠에 대한 올바른 부분 수를 결정하고 콘텐츠를 부분으로 나누었으므로 jclouds API에 대한 Payload 객체를 생성 해야 합니다.

public static List<Payload> createPayloadsOutOfParts(Iterable<byte[]> fileParts) {
   List<Payload> payloads = Lists.newArrayList();
   for (byte[] filePart : fileParts) {
      byte[] partMd5Bytes = Hashing.md5().hashBytes(filePart).asBytes();
      Payload partPayload = Payloads.newByteArrayPayload(filePart);
      partPayload.getContentMetadata().setContentLength((long) filePart.length);
      partPayload.getContentMetadata().setContentMD5(partMd5Bytes);
      payloads.add(partPayload);
   }
   return payloads;
}

3. 업로드

업로드 프로세스는 유연한 다단계 프로세스입니다. 이는 다음을 의미합니다.

  • 모든 데이터를 갖기 전에 업로드를 시작할 수 있습니다 . 데이터가 들어오는 대로 업로드할 수 있습니다.
  • 데이터는 청크 로 업로드됩니다 . 이러한 작업 중 하나가 실패하면 간단히 검색할 수 있습니다.
  • 청크 를 병렬 로 업로드할 수 있습니다 . 이는 특히 대용량 파일의 경우 업로드 속도를 크게 높일 수 있습니다.

3.1. 업로드 작업 시작

업로드 작업의 첫 번째 단계는 프로세스를 시작하는 것 입니다. S3에 대한 이 요청은 표준 HTTP 헤더를 포함해야 합니다 . 특히 Content - MD5 헤더를 계산해야 합니다. 여기에서 Guava 해시 함수 지원을 사용하려고 했습니다.

Hashing.md5().hashBytes(byteArray).asBytes();

이것은 아직 부분이 아닌 전체 바이트 배열의 md5 해시 입니다.

업로드 를 시작하고 S3와의 모든 추가 상호 작용을 위해 이전에 생성한 비동기 API인 AWSS3AsyncClient를 사용할 것입니다.

ObjectMetadata metadata = ObjectMetadataBuilder.create().key(key).contentMD5(md5Bytes).build();
String uploadId = s3AsyncApi.initiateMultipartUpload(container, metadata).get();

는 객체에 할당된 핸들입니다 . 이것은 클라이언트가 지정한 고유 식별자여야 합니다.

또한 API의 비동기 버전을 사용하고 있음에도 불구하고 이 작업의 결과를 차단 하고 있습니다. 이는 앞으로 나아갈 수 있으려면 초기화 결과가 필요하기 때문입니다.

작업의 결과는 S3에서 반환된 업로드 ID 입니다. 이는 전체 수명 주기 동안 업로드를 식별하고 모든 후속 업로드 작업에 표시됩니다.

3.2. 부품 업로드

다음 단계는 부품을 업로드하는 것 입니다. 업로드 부분 작업이 업로드 프로세스의 대부분을 나타내므로 여기에서 우리의 목표는 이러한 요청 을 병렬 로 보내는 것입니다.

List<ListenableFuture<String>> ongoingOperations = Lists.newArrayList();
for (int partNumber = 0; partNumber < filePartsAsByteArrays.size(); partNumber++) {
   ListenableFuture<String> future = s3AsyncApi.uploadPart(
      container, key, partNumber + 1, uploadId, payloads.get(partNumber));
   ongoingOperations.add(future);
}

부품 번호는 연속적이어야 하지만 요청이 전송되는 순서는 관련이 없습니다.

모든 업로드 부분 요청이 제출된 후 각 부분의 개별 ETag 값을 수집할 수 있도록 응답을 기다려야 합니다 .

Function<ListenableFuture<String>, String> getEtagFromOp = 
  new Function<ListenableFuture<String>, String>() {
   public String apply(ListenableFuture<String> ongoingOperation) {
      try {
         return ongoingOperation.get();
      } catch (InterruptedException | ExecutionException e) {
         throw new IllegalStateException(e);
      }
   }
};
List<String> etagsOfParts = Lists.transform(ongoingOperations, getEtagFromOp);

어떤 이유로든 업로드 부분 작업 중 하나가 실패 하면 성공할 때까지 작업을 재시도할 수 있습니다 . 위의 논리에는 재시도 메커니즘이 포함되어 있지 않지만 이를 구축하는 것은 충분히 간단해야 합니다.

3.3. 업로드 작업 완료

업로드 프로세스의 마지막 단계는 멀티파트 작업을 완료하는 것 입니다. S3 API에는 이전 부분 업로드의 응답이 Map 으로 필요합니다. 이제 위에서 얻은 ETag List에서 쉽게 만들 수 있습니다.

Map<Integer, String> parts = Maps.newHashMap();
for (int i = 0; i < etagsOfParts.size(); i++) {
   parts.put(i + 1, etagsOfParts.get(i));
}

마지막으로 전체 요청을 보냅니다.

s3AsyncApi.completeMultipartUpload(container, key, uploadId, parts).get();

이렇게 하면 완성된 개체의 최종 ETag가 반환되고 전체 업로드 프로세스가 완료됩니다.

4. 결론

이 기사에서는 사용자 지정 S3 jclouds API를 사용하여 S3에 대한 멀티파트 지원 완전 병렬 업로드 작업을 구축했습니다. 이 작업은 그대로 사용할 수 있지만 몇 가지 방법으로 개선 할 수 있습니다.

먼저, 업로드 작업 주변에 재시도 논리 를 추가하여 오류를 더 잘 처리해야 합니다.

다음으로, 매우 큰 파일의 경우 메커니즘이 모든 업로드 멀티파트 요청을 병렬로 전송하더라도 조절 메커니즘 은 전송되는 병렬 요청 수를 여전히 제한해야 합니다. 이는 대역폭이 병목 현상이 되는 것을 방지할 뿐만 아니라 Amazon 자체에서 업로드 프로세스에 허용되는 초당 요청 한도를 초과하는 것으로 표시하지 않도록 하기 위한 것입니다. Guava RateLimiter 는 잠재적으로 이에 매우 적합할 수 있습니다.

Cloud footer banner