
@kuenishi
Last active July 16, 2021 07:04

Apache Ozone Multipart Upload

Protocol

InitiateMultipartUpload -> UploadPart (xN) -> CompleteMultipartUpload
                                 +-> AbortMultipartUpload
  • Limits https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
  • Create Multipart Upload (Bucket, Key) -> UploadId
  • Upload Part (B, K, UploadId, PartNumber, Content)
  • Upload Part (Copy) (B, K, UploadId, PartNumber, Src(B, K))
  • Complete Multipart Upload (B, K, UploadId, Parts)
  • Abort Multipart Upload (B, K, UploadId)
  • List Parts (B, K, UploadId) -> Parts
  • List Multipart Uploads (B, K) -> Multipart Uploads
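
The call sequence above can be read as a small state machine: an upload starts INITIATED, accepts any number of parts, and ends either COMPLETED or ABORTED. The sketch below is a hypothetical in-memory model. `MultipartUploadLifecycle` and its methods are illustrative names, not Ozone or S3 API. The part-number range 1..10,000 comes from the S3 limits page linked above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.UUID;

// Hypothetical model of the lifecycle:
// Initiate -> UploadPart (xN) -> Complete, or Abort while still INITIATED.
final class MultipartUploadLifecycle {
  enum State { INITIATED, COMPLETED, ABORTED }

  // S3 documents part numbers 1..10,000 inclusive.
  static final int MIN_PART_NUMBER = 1;
  static final int MAX_PART_NUMBER = 10_000;

  // Initiate: the upload ID is returned to the client.
  final String uploadId = UUID.randomUUID().toString();
  private State state = State.INITIATED;
  // PartNumber -> part size; re-uploading a part number overwrites the entry.
  private final TreeMap<Integer, Long> parts = new TreeMap<>();

  State state() { return state; }

  void uploadPart(int partNumber, long size) {
    requireInitiated();
    if (partNumber < MIN_PART_NUMBER || partNumber > MAX_PART_NUMBER) {
      throw new IllegalArgumentException("invalid part number: " + partNumber);
    }
    parts.put(partNumber, size);
  }

  // ListParts: part numbers in ascending order.
  List<Integer> listParts() { return new ArrayList<>(parts.keySet()); }

  void complete() {
    requireInitiated();
    if (parts.isEmpty()) throw new IllegalStateException("no parts uploaded");
    state = State.COMPLETED;
  }

  void abort() {
    requireInitiated();
    parts.clear();
    state = State.ABORTED;
  }

  private void requireInitiated() {
    if (state != State.INITIATED) {
      throw new IllegalStateException("upload " + uploadId + " is " + state);
    }
  }
}
```

After Complete or Abort, every further call fails. This matches the idea that an upload ID is single-use.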

Data Structure Overview

  • Bucket
    • Key
      • UploadId A
        • PartNumber 1
        • PartNumber 2
        • ...
      • UploadId B
        • PartNumber 1
        • ...
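
The hierarchy above maps onto nested sorted maps. The sketch below shows that structure in memory. Ozone itself keeps this information in its OM DB tables, so `MultipartIndex` and its methods are hypothetical. The sketch only illustrates that one key can have several concurrent uploads, each with its own independent set of parts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory index: Bucket -> Key -> UploadId -> PartNumber -> part name.
final class MultipartIndex {
  private final Map<String, Map<String, Map<String, TreeMap<Integer, String>>>> buckets =
      new TreeMap<>();

  void putPart(String bucket, String key, String uploadId, int partNumber, String partName) {
    buckets.computeIfAbsent(bucket, b -> new TreeMap<>())
        .computeIfAbsent(key, k -> new TreeMap<>())
        .computeIfAbsent(uploadId, u -> new TreeMap<>())
        .put(partNumber, partName);
  }

  // ListMultipartUploads for one key: upload IDs in sorted order.
  List<String> listUploads(String bucket, String key) {
    Map<String, Map<String, TreeMap<Integer, String>>> keys = buckets.get(bucket);
    if (keys == null || !keys.containsKey(key)) return List.of();
    return new ArrayList<>(keys.get(key).keySet());
  }

  // ListParts for one upload: part numbers in ascending order.
  List<Integer> listParts(String bucket, String key, String uploadId) {
    Map<String, Map<String, TreeMap<Integer, String>>> keys = buckets.get(bucket);
    if (keys == null) return List.of();
    Map<String, TreeMap<Integer, String>> uploads = keys.get(key);
    if (uploads == null || !uploads.containsKey(uploadId)) return List.of();
    return new ArrayList<>(uploads.get(uploadId).keySet());
  }
}
```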

Create Multipart Upload

  • ObjectEndpoint#initializeMultipartUpload <- POST
  • OzoneBucket
  • OzoneManager#initiateMultipartUpload()
  • KeyManagerImpl#initiateMultipartUpload()
    // Outside the critical section
    String uploadID = UUID.randomUUID().toString() + "-" + UniqueId.next();
  • KeyManagerImpl#createMultipartInfo()
  • The DB key is built from (volume, bucket, key, uploadId)
        metadataManager.getMultipartInfoTable().putWithBatch(batch,
            multipartKey, multipartKeyInfo);
        metadataManager.getOpenKeyTable().putWithBatch(batch,
            multipartKey, omKeyInfo);
        store.commitBatchOperation(batch);
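  • ↑ This is the old code. In the current setup, S3InitiateMultipartUploadRequest is what actually gets called (it contains almost the same code)
  • What's different from a normal OpenKeyTable entry... there doesn't seem to be any particular Multipart flag set
  • Where does isMultipartKey get set?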
message KeyLocationList {
    optional uint64 version = 1;
    repeated KeyLocation keyLocations = 2;
    optional FileEncryptionInfoProto fileEncryptionInfo = 3;
    optional bool isMultipartKey = 4 [default = false];
}

message KeyInfo {
    required string volumeName = 1;
    required string bucketName = 2;
    required string keyName = 3;
    required uint64 dataSize = 4;
    required hadoop.hdds.ReplicationType type = 5;
    required hadoop.hdds.ReplicationFactor factor = 6;
    repeated KeyLocationList keyLocationList = 7;
    // ...
}
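
The initiate step boils down to three actions:

1. Generate an upload ID.
2. Derive one DB key from (volume, bucket, key, uploadId).
3. Write both the multipart-info entry and the open-key entry in a single batch.

The sketch below makes several assumptions. The `/`-separated key layout is assumed. An `AtomicLong` counter stands in for Ozone's `UniqueId.next()`. Plain maps stand in for the RocksDB-backed tables. The names `InitiateSketch` and `Batch` are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of InitiateMultipartUpload on the OM side.
final class InitiateSketch {
  // Stand-in for UniqueId.next(): a monotonically increasing counter.
  private static final AtomicLong COUNTER = new AtomicLong();

  final Map<String, String> multipartInfoTable = new HashMap<>();
  final Map<String, String> openKeyTable = new HashMap<>();

  // Assumed layout: "/volume/bucket/key/uploadId".
  static String multipartKey(String volume, String bucket, String key, String uploadId) {
    return "/" + volume + "/" + bucket + "/" + key + "/" + uploadId;
  }

  // Defers writes until commit(). A real DB batch is also applied atomically;
  // this stand-in only models the "nothing is visible before commit" part.
  static final class Batch {
    private final List<Runnable> ops = new ArrayList<>();

    void put(Map<String, String> table, String k, String v) {
      ops.add(() -> table.put(k, v));
    }

    void commit() {
      ops.forEach(Runnable::run);
      ops.clear();
    }
  }

  String initiate(String volume, String bucket, String key) {
    // The ID is generated outside any lock, like the excerpt above.
    String uploadId = UUID.randomUUID().toString() + "-" + COUNTER.incrementAndGet();
    String dbKey = multipartKey(volume, bucket, key, uploadId);
    Batch batch = new Batch();
    batch.put(multipartInfoTable, dbKey, "multipartKeyInfo (no parts yet)");
    batch.put(openKeyTable, dbKey, "omKeyInfo");
    batch.commit();
    return uploadId;
  }
}
```

The uploadId suffix is what keeps two concurrent uploads of the same key from colliding in either table.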

UploadPart

  • ObjectEndpoint#put <- HTTP PUT
  • #createMultipartKey
  • OzoneBucket#createMultipartKey
  • ozoneManagerClient#openKey() -> creates an entry in the OpenKeyTable with isMultipartKey set
  • prepareMultipartKeyInfo() ->
String multipartKey = metadataManager
        .getMultipartKey(args.getVolumeName(), args.getBucketName(),
            args.getKeyName(), uploadID);
  • The entry is registered in the OpenKeyTable under this key
  • The rest is the same as a normal Put Object
  • KeyOutputStream#close()
  • blockOutputStreamEntryPool#commitKey
        commitUploadPartInfo =
            omClient.commitMultipartUploadPart(keyArgs, openID);
  • (OM) KeyManager#commitMultipartUploadPart()
  • OmUtils.prepareKeyForDelete()
  • If this part does not exist yet
            metadataManager.getOpenKeyTable().deleteWithBatch(batch, openKey);
            metadataManager.getMultipartInfoTable().putWithBatch(batch,
                multipartKey, multipartKeyInfo);
  • If the part already exists
            metadataManager.getDeletedTable().put(partName, repeatedOmKeyInfo);
            metadataManager.getDeletedTable().putWithBatch(batch,
                oldPartKeyInfo.getPartName(),
                repeatedOmKeyInfo);
            metadataManager.getOpenKeyTable().deleteWithBatch(batch, openKey);
            metadataManager.getMultipartInfoTable().putWithBatch(batch,
                multipartKey, multipartKeyInfo);
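
The two branches of commitMultipartUploadPart can be modelled with in-memory maps. The sketch uses plain maps for the open-key, multipart-info, and deleted tables. `PartInfo` and `CommitPartSketch` are hypothetical names. The deleted table holds a list per name, loosely mirroring `RepeatedOmKeyInfo`. The point the sketch shows is that overwriting a part number never drops the old part's data silently: the old part is queued for deletion instead.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of KeyManager#commitMultipartUploadPart.
final class CommitPartSketch {
  static final class PartInfo {
    final String partName; // assumed unique per uploaded part (hypothetical naming)
    final long size;
    PartInfo(String partName, long size) { this.partName = partName; this.size = size; }
  }

  // openKey -> part being written (stand-in for the OpenKeyTable).
  final Map<String, PartInfo> openKeyTable = new HashMap<>();
  // multipartKey -> (partNumber -> committed part) (stand-in for the MultipartInfoTable).
  final Map<String, TreeMap<Integer, PartInfo>> multipartInfoTable = new HashMap<>();
  // part name -> parts awaiting garbage collection (stand-in for the DeletedTable).
  final Map<String, List<PartInfo>> deletedTable = new HashMap<>();

  void commitPart(String multipartKey, String openKey, int partNumber) {
    PartInfo newPart = openKeyTable.get(openKey);
    TreeMap<Integer, PartInfo> parts = multipartInfoTable.get(multipartKey);
    if (newPart == null || parts == null) {
      throw new IllegalStateException("no such open part or upload");
    }
    PartInfo oldPart = parts.put(partNumber, newPart);
    if (oldPart != null) {
      // Part number was already committed: queue the old part's data for deletion.
      deletedTable.computeIfAbsent(oldPart.partName, n -> new ArrayList<>()).add(oldPart);
    }
    // In both branches the part leaves the OpenKeyTable.
    openKeyTable.remove(openKey);
  }
}
```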

CompleteMultipartUpload

  • ObjectEndpoint#completeMultipartUpload <- HTTP POST
  • OzoneBucket#completeMultipartUpload
  • KeyManager#completeMultipartUpload ... wait, what?
      //TODO: Actual logic has been removed from this, and the old code has a
      // bug. New code for this is in S3MultipartUploadCompleteRequest.
      // This code will be cleaned up as part of HDDS-2353.

I knew that already, though.

  • S3MultipartUploadCompleteRequest#validateAndUpdate()
  • Performs key validation
  • Here it only updates the cache
  • I get the urge to sprinkle comments like this around, but...
        } else {
          // Already a version exists, so we should add it as a new version.
          // But now as versioning is not supported, just following the commit
          // key approach. When versioning support comes, then we can uncomment
          // below code keyInfo.addNewVersion(locations);
  • The DB update happens in S3MultipartUploadCompleteResponse#addToDBBatch()
  • It's short, so here it is in full:
    omMetadataManager.getOpenKeyTable().deleteWithBatch(batchOperation,
        multipartKey);
    omMetadataManager.getMultipartInfoTable().deleteWithBatch(batchOperation,
        multipartKey);

    String ozoneKey = omMetadataManager.getOzoneKey(omKeyInfo.getVolumeName(),
        omKeyInfo.getBucketName(), omKeyInfo.getKeyName());
    omMetadataManager.getKeyTable().putWithBatch(batchOperation, ozoneKey,
        omKeyInfo);

    if (!partsUnusedList.isEmpty()) {
      // Add unused parts to deleted key table.
      RepeatedOmKeyInfo repeatedOmKeyInfo = omMetadataManager.getDeletedTable()
          .get(ozoneKey);
      if (repeatedOmKeyInfo == null) {
        repeatedOmKeyInfo = new RepeatedOmKeyInfo(partsUnusedList);
      } else {
        repeatedOmKeyInfo.addOmKeyInfo(omKeyInfo);
      }

      omMetadataManager.getDeletedTable().putWithBatch(batchOperation,
          ozoneKey, repeatedOmKeyInfo);
    }
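
CompleteMultipartUpload splits its work between two places. The request validates, and the response updates the tables. In summary, it does three things:

1. Check the client's part list against what was committed.
2. Stitch the chosen parts into one key.
3. Send every committed-but-unlisted part to the deleted table.

The sketch below uses hypothetical names. Its ascending-order and "part must exist" checks follow S3's documented CompleteMultipartUpload errors (InvalidPartOrder, InvalidPart). The exact checks in S3MultipartUploadCompleteRequest may differ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of CompleteMultipartUpload (request validation + response DB updates).
final class CompleteSketch {
  // partNumber -> committed part name, for one upload (stand-in for the MultipartInfoTable entry).
  final TreeMap<Integer, String> committedParts = new TreeMap<>();
  // ozoneKey -> ordered part names forming the final object (stand-in for the KeyTable).
  final Map<String, List<String>> keyTable = new HashMap<>();
  // ozoneKey -> part names awaiting deletion (stand-in for the DeletedTable).
  final Map<String, List<String>> deletedTable = new HashMap<>();

  void complete(String ozoneKey, List<Integer> requestedParts) {
    if (requestedParts.isEmpty()) throw new IllegalArgumentException("empty part list");
    List<String> chosen = new ArrayList<>();
    int previous = 0;
    for (int partNumber : requestedParts) {
      // Part numbers must be strictly ascending.
      if (partNumber <= previous) throw new IllegalArgumentException("InvalidPartOrder");
      String partName = committedParts.get(partNumber);
      if (partName == null) throw new IllegalArgumentException("InvalidPart " + partNumber);
      chosen.add(partName);
      previous = partNumber;
    }
    // Committed parts the client did not list become garbage.
    List<String> unused = new ArrayList<>();
    for (Map.Entry<Integer, String> e : committedParts.entrySet()) {
      if (!requestedParts.contains(e.getKey())) unused.add(e.getValue());
    }
    // Response side: the upload's entries go away and the final key appears.
    committedParts.clear();
    keyTable.put(ozoneKey, chosen);
    if (!unused.isEmpty()) {
      deletedTable.computeIfAbsent(ozoneKey, k -> new ArrayList<>()).addAll(unused);
    }
  }
}
```

The excerpt above handles an existing deleted-table entry differently. In its `else` branch, it adds `omKeyInfo` rather than the entries of `partsUnusedList`. This sketch appends the unused parts in both cases.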

AbortMultipartUpload

  • ObjectEndpoint#delete <- HTTP DELETE
  • ...
  • S3MultipartUploadAbortRequest#validateAndUpdateCache()
  • S3MultipartUploadAbortResponse#addToDBBatch(). It's important, so here it is in full:
  public void addToDBBatch(OMMetadataManager omMetadataManager,
      BatchOperation batchOperation) throws IOException {

    // Delete from openKey table and multipart info table.
    omMetadataManager.getOpenKeyTable().deleteWithBatch(batchOperation,
        multipartKey);
    omMetadataManager.getMultipartInfoTable().deleteWithBatch(batchOperation,
        multipartKey);

    // Move all the parts to delete table
    TreeMap<Integer, PartKeyInfo > partKeyInfoMap =
        omMultipartKeyInfo.getPartKeyInfoMap();
    for (Map.Entry<Integer, PartKeyInfo > partKeyInfoEntry :
        partKeyInfoMap.entrySet()) {
      PartKeyInfo partKeyInfo = partKeyInfoEntry.getValue();
      OmKeyInfo currentKeyPartInfo =
          OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo());

      RepeatedOmKeyInfo repeatedOmKeyInfo =
          omMetadataManager.getDeletedTable().get(partKeyInfo.getPartName());

      repeatedOmKeyInfo = OmUtils.prepareKeyForDelete(currentKeyPartInfo,
          repeatedOmKeyInfo, omMultipartKeyInfo.getUpdateID(), isRatisEnabled);

      omMetadataManager.getDeletedTable().putWithBatch(batchOperation,
          partKeyInfo.getPartName(), repeatedOmKeyInfo);

      // update bucket usedBytes.
      omMetadataManager.getBucketTable().putWithBatch(batchOperation,
          omMetadataManager.getBucketKey(omBucketInfo.getVolumeName(),
              omBucketInfo.getBucketName()), omBucketInfo);
    }
  }
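
Stripped of the protobuf conversions, the abort response does three things:

1. Drop the upload's open-key and multipart-info entries.
2. Move every part to the deleted table under its part name, appending if an entry with that name already exists.
3. Persist the bucket's updated usage.

The sketch below uses hypothetical types. The `usedBytes` decrement is an assumption about what `validateAndUpdateCache` does before `addToDBBatch` writes `omBucketInfo`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of AbortMultipartUpload (cache update + addToDBBatch).
final class AbortSketch {
  static final class Part {
    final String partName;
    final long size;
    Part(String partName, long size) { this.partName = partName; this.size = size; }
  }

  final Map<String, String> openKeyTable = new HashMap<>();
  final Map<String, TreeMap<Integer, Part>> multipartInfoTable = new HashMap<>();
  // part name -> parts awaiting deletion (a list, like RepeatedOmKeyInfo).
  final Map<String, List<Part>> deletedTable = new HashMap<>();
  long bucketUsedBytes;

  void abort(String multipartKey) {
    TreeMap<Integer, Part> parts = multipartInfoTable.get(multipartKey);
    if (parts == null) throw new IllegalArgumentException("NoSuchUpload " + multipartKey);
    // Delete from the open key table and the multipart info table.
    openKeyTable.remove(multipartKey);
    multipartInfoTable.remove(multipartKey);
    long released = 0;
    for (Part part : parts.values()) {
      // Move every part to the deleted table, keyed by part name.
      deletedTable.computeIfAbsent(part.partName, n -> new ArrayList<>()).add(part);
      released += part.size;
    }
    // Assumed: bucket usage shrinks by the aborted parts' total size.
    bucketUsedBytes -= released;
  }
}
```

The excerpt writes the bucket entry inside the loop, once per part. Since the key and value are the same each time, writing it once after the loop, as the sketch does, leaves the same end state.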