InitiateMultipartUpload -> UploadPart (xN) -> CompleteMultipartUpload
+-> AbortMultipartUpload
- Limits https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
- Create Multipart Upload (Bucket, Key) -> UploadId
- Upload Part (B, K, UploadId, PartNumber, Content)
- Upload Part (Copy) (B, K, UploadId, PartNumber, Src(B, K))
- Complete Multipart Upload (B, K, UploadId, Parts)
- Abort Multipart Upload (B, K, UploadId)
- List Parts (B, K, UploadId) -> Parts
- List Multipart Uploads (B, K) -> Multipart Uploads
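To tie the sequence and signatures above together, a minimal client-side sketch using the AWS SDK for Java v1 (bucket name, key, and part size are placeholder values; Ozone's S3 gateway serves these same operations):

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.*;

public class MpuFlow {
  public static void main(String[] args) throws Exception {
    AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
    String bucket = "bucket1", key = "key1"; // placeholders
    File file = new File(args[0]);

    // Create Multipart Upload (Bucket, Key) -> UploadId
    String uploadId = s3.initiateMultipartUpload(
        new InitiateMultipartUploadRequest(bucket, key)).getUploadId();
    try {
      long partSize = 5L * 1024 * 1024; // 5 MiB, the S3 minimum except for the last part
      List<PartETag> etags = new ArrayList<>();
      long offset = 0;
      for (int partNumber = 1; offset < file.length(); partNumber++) {
        long size = Math.min(partSize, file.length() - offset);
        // Upload Part (B, K, UploadId, PartNumber, Content)
        etags.add(s3.uploadPart(new UploadPartRequest()
            .withBucketName(bucket).withKey(key)
            .withUploadId(uploadId).withPartNumber(partNumber)
            .withFile(file).withFileOffset(offset).withPartSize(size))
            .getPartETag());
        offset += size;
      }
      // Complete Multipart Upload (B, K, UploadId, Parts)
      s3.completeMultipartUpload(new CompleteMultipartUploadRequest(
          bucket, key, uploadId, etags));
    } catch (Exception e) {
      // Abort Multipart Upload (B, K, UploadId) -- reclaims the uploaded parts
      s3.abortMultipartUpload(
          new AbortMultipartUploadRequest(bucket, key, uploadId));
      throw e;
    }
  }
}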
Data Structure Overview
- Bucket
  - Key
    - UploadId A
      - PartNumber 1
      - PartNumber 2
      - ...
    - UploadId B
      - PartNumber 1
      - ...
  - Key
    - UploadId A
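In the OM DB this hierarchy is flattened into one composite key string per upload, which is why the same UploadId can appear under different Keys. A hypothetical sketch of that layout (the real helper is OMMetadataManager#getMultipartKey; the '/'-separated format is an assumption here):

// Hypothetical sketch of the flattened DB key; the real logic lives in
// OMMetadataManager#getMultipartKey().
static String multipartKey(String volume, String bucket, String key,
    String uploadId) {
  return "/" + volume + "/" + bucket + "/" + key + "/" + uploadId;
}
// e.g. "/vol1/bucket1/key1/<uploadId>" -> MultipartKeyInfo (partNumber -> PartKeyInfo)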
- ObjectEndpoint#initializeMultipartUpload <- HTTP POST
- OzoneBucket
- OzoneManager#initiateMultipartUpload()
- KeyManagerImpl#initiateMultipartUpload()
// Outside the critical section
String uploadID = UUID.randomUUID().toString() + "-" + UniqueId.next();
- KeyManagerImpl#createMultipartInfo()
- The DB key is built from (volume, bucket, key, uploadId)
metadataManager.getMultipartInfoTable().putWithBatch(batch,
    multipartKey, multipartKeyInfo);
metadataManager.getOpenKeyTable().putWithBatch(batch,
    multipartKey, omKeyInfo);
store.commitBatchOperation(batch);
- The above is the old code; in the current setup, what actually gets called is
  S3InitiateMultipartUploadRequest (which contains almost the same code)
- What differs from a normal entry in the OpenKeyTable? ... It does not look
  like the Multipart flag is even set here
- Where does isMultipartKey get set?
message KeyLocationList {
  optional uint64 version = 1;
  repeated KeyLocation keyLocations = 2;
  optional FileEncryptionInfoProto fileEncryptionInfo = 3;
  optional bool isMultipartKey = 4 [default = false];
}
message KeyInfo {
  required string volumeName = 1;
  required string bucketName = 2;
  required string keyName = 3;
  required uint64 dataSize = 4;
  required hadoop.hdds.ReplicationType type = 5;
  required hadoop.hdds.ReplicationFactor factor = 6;
  repeated KeyLocationList keyLocationList = 7;
  // ...
}
- ObjectEndpoint#put <- HTTP PUT
- #createMultipartKey
- OzoneBucket#createMultipartKey
- ozoneManagerClient#openKey() -> sets isMultipartKey and creates the entry in the OpenKeyTable (see the sketch after this list)
- prepareMultipartKeyInfo() ->
String multipartKey = metadataManager
    .getMultipartKey(args.getVolumeName(), args.getBucketName(),
        args.getKeyName(), uploadID);
- This is the key under which it is registered in the OpenKeyTable
- From here on it is the same as a normal Put Object
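A condensed sketch of what that openKey() call carries on the OM client side (builder methods from OmKeyArgs; the variable values are illustrative):

// Condensed, illustrative sketch: a part upload is an ordinary open key whose
// OmKeyArgs carries the multipart fields.
OmKeyArgs keyArgs = new OmKeyArgs.Builder()
    .setVolumeName(volume)
    .setBucketName(bucket)
    .setKeyName(key)
    .setIsMultipartKey(true)           // marks this open key as a part
    .setMultipartUploadID(uploadID)    // ties it to the MultipartInfoTable entry
    .setMultipartUploadPartNumber(partNumber)
    .build();
OpenKeySession session = ozoneManagerClient.openKey(keyArgs);
// ...the data is then written to the allocated blocks like a normal Put Object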
- KeyOutputStream#close()
- blockOutputStreamEntryPool#commitKey
commitUploadPartInfo =
    omClient.commitMultipartUploadPart(keyArgs, openID);
- (OM) KeyManager#commitMultipartUploadPart()
- OmUtils.prepareKeyForDelete()
- If this part does not exist yet:
metadataManager.getOpenKeyTable().deleteWithBatch(batch, openKey);
metadataManager.getMultipartInfoTable().putWithBatch(batch,
    multipartKey, multipartKeyInfo);
- If the part already exists, the replaced part is moved to the deleted table (see the bookkeeping sketch after this block):
metadataManager.getDeletedTable().put(partName, repeatedOmKeyInfo);
metadataManager.getDeletedTable().putWithBatch(batch,
    oldPartKeyInfo.getPartName(),
    repeatedOmKeyInfo);
metadataManager.getOpenKeyTable().deleteWithBatch(batch, openKey);
metadataManager.getMultipartInfoTable().putWithBatch(batch,
    multipartKey, multipartKeyInfo);
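In both branches the in-memory MultipartKeyInfo is updated with the committed part before being written back. A simplified sketch of that shared step (method names from OmMultipartKeyInfo; locking and error handling omitted):

// Simplified sketch of the shared bookkeeping. An existing entry under the
// same partNumber is what triggers the "part already exists" branch above.
PartKeyInfo oldPartKeyInfo =
    multipartKeyInfo.getPartKeyInfo(partNumber);  // null if the part is new
PartKeyInfo partKeyInfo = PartKeyInfo.newBuilder()
    .setPartName(partName)
    .setPartNumber(partNumber)
    .setPartKeyInfo(omKeyInfo.getProtobuf())
    .build();
multipartKeyInfo.addPartKeyInfo(partNumber, partKeyInfo);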
- ObjectEndpoint#completeMultipartUpload <- HTTP POST
- OzoneBucket#completeMultipartUpload
- KeyManager#completeMultipartUpload ... huh?
//TODO: Actual logic has been removed from this, and the old code has a
// bug. New code for this is in S3MultipartUploadCompleteRequest.
// This code will be cleaned up as part of HDDS-2353.
I knew that already, though.
- S3MultipartUploadCompleteRequest#validateAndUpdate()
- Does key validation
- Here it only updates the cache (rough sketch of the aggregation below)
- I get the urge to drop in comments like this every so often, but still...
} else {
// Already a version exists, so we should add it as a new version.
// But now as versioning is not supported, just following the commit
// key approach. When versioning support comes, then we can uncomment
// below code keyInfo.addNewVersion(locations);
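For context, the cache-update step essentially stitches the committed parts together in part-number order into the final OmKeyInfo. A rough sketch of that aggregation (simplified; the real code also validates part names and ordering against the request):

// Rough sketch of the aggregation in validateAndUpdateCache (simplified).
List<OmKeyLocationInfo> locations = new ArrayList<>();
long dataSize = 0;
for (PartKeyInfo part : multipartKeyInfo.getPartKeyInfoMap().values()) {
  OmKeyInfo partInfo = OmKeyInfo.getFromProtobuf(part.getPartKeyInfo());
  locations.addAll(partInfo.getLatestVersionLocations().getLocationList());
  dataSize += partInfo.getDataSize();
}
omKeyInfo.updateLocationInfoList(locations);  // becomes the final key's block list
omKeyInfo.setDataSize(dataSize);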
- The DB update happens in S3MultipartUploadCompleteResponse#addToDBBatch()
- It is short, so quoted in full:
omMetadataManager.getOpenKeyTable().deleteWithBatch(batchOperation,
    multipartKey);
omMetadataManager.getMultipartInfoTable().deleteWithBatch(batchOperation,
    multipartKey);

String ozoneKey = omMetadataManager.getOzoneKey(omKeyInfo.getVolumeName(),
    omKeyInfo.getBucketName(), omKeyInfo.getKeyName());
omMetadataManager.getKeyTable().putWithBatch(batchOperation, ozoneKey,
    omKeyInfo);

if (!partsUnusedList.isEmpty()) {
  // Add unused parts to deleted key table.
  RepeatedOmKeyInfo repeatedOmKeyInfo = omMetadataManager.getDeletedTable()
      .get(ozoneKey);
  if (repeatedOmKeyInfo == null) {
    repeatedOmKeyInfo = new RepeatedOmKeyInfo(partsUnusedList);
  } else {
    repeatedOmKeyInfo.addOmKeyInfo(omKeyInfo);
  }
  omMetadataManager.getDeletedTable().putWithBatch(batchOperation,
      ozoneKey, repeatedOmKeyInfo);
}
- ObjectEndpoint#delete <- HTTP DELETE
- ...
- S3MultipartUploadAbortRequest#validateAndUpdateCache()
- S3MultipartUploadAbortResponse#addToDBBatch() is important, so quoted in full:
public void addToDBBatch(OMMetadataManager omMetadataManager,
    BatchOperation batchOperation) throws IOException {

  // Delete from openKey table and multipart info table.
  omMetadataManager.getOpenKeyTable().deleteWithBatch(batchOperation,
      multipartKey);
  omMetadataManager.getMultipartInfoTable().deleteWithBatch(batchOperation,
      multipartKey);

  // Move all the parts to delete table
  TreeMap<Integer, PartKeyInfo> partKeyInfoMap =
      omMultipartKeyInfo.getPartKeyInfoMap();
  for (Map.Entry<Integer, PartKeyInfo> partKeyInfoEntry :
      partKeyInfoMap.entrySet()) {
    PartKeyInfo partKeyInfo = partKeyInfoEntry.getValue();
    OmKeyInfo currentKeyPartInfo =
        OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo());

    RepeatedOmKeyInfo repeatedOmKeyInfo =
        omMetadataManager.getDeletedTable().get(partKeyInfo.getPartName());
    repeatedOmKeyInfo = OmUtils.prepareKeyForDelete(currentKeyPartInfo,
        repeatedOmKeyInfo, omMultipartKeyInfo.getUpdateID(), isRatisEnabled);
    omMetadataManager.getDeletedTable().putWithBatch(batchOperation,
        partKeyInfo.getPartName(), repeatedOmKeyInfo);

    // update bucket usedBytes.
    omMetadataManager.getBucketTable().putWithBatch(batchOperation,
        omMetadataManager.getBucketKey(omBucketInfo.getVolumeName(),
            omBucketInfo.getBucketName()), omBucketInfo);
  }
}