Created
August 20, 2020 16:09
-
-
Save jazzl0ver/4d558b3739c1b11439c085ed7b638a7d to your computer and use it in GitHub Desktop.
Replicate objects within or between regions in Oracle Cloud Object Storage
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
#
# Replicates objects within or between regions in OCI Object Storage.
# The script requires that the OCI CLI be installed
# (credentials are read from the CLI's config file via oci.config.from_file).
#
#
# Edit these values to configure the script
#
# Profile name inside the OCI CLI config file to authenticate with.
oci_cli_profile_name = 'DEFAULT'
# Namespace / region / bucket the objects are copied FROM.
source_namespace = 'ocitsammut'
source_region = 'us-ashburn-1'
source_bucket = 'repl_source_small'
# Set source_prefix to None to copy all objects in source bucket
#source_prefix = 'foo/'
source_prefix = None
# Set source_prefix_exclude to exclude source objects by prefix
#source_prefix_exclude = 'bar/zaz/'
source_prefix_exclude = None
# Namespace / region / bucket the objects are copied TO.
destination_namespace = 'ocitsammut'
destination_region = 'us-ashburn-1'
destination_bucket = 'repl_destination'
# Set this to lambda to mutate the destination object name, or None to disable
#destination_object_name_mutation = lambda x: "%s%s" % ('2/', x[2:])
destination_object_name_mutation = None
#
# Should not need to edit below here
#
# Runtime setup: imports, tunables, and shared state.
# FIX: cPickle and Queue exist only on Python 2; fall back to the Python 3
# module names so the script runs under either interpreter.
try:
    import cPickle as pickle  # Python 2: C-accelerated pickle
except ImportError:
    import pickle  # Python 3: pickle is C-accelerated already
import threading
import time
try:
    import Queue  # Python 2 module name
except ImportError:
    import queue as Queue  # Python 3 name, aliased so the rest of the code is unchanged
import oci
# Size of the two daemon thread pools.
request_worker_count = 50
status_worker_count = 50
# Seconds main() sleeps between status sweeps.
status_interval = 60
# File the object->state map is pickled to (resumable progress record).
state_file = 'copy_status'
# Retry back-off bounds in seconds; the interval is squared on each retry,
# so attempts wait 2, 4, 16, 256 seconds before giving up.
base_retry_timeout = 2
max_retry_timeout = 16**2
#
# Should REALLY not need to edit below here
#
# Shared state: object name -> {'status': ..., 'work-request-id': ...},
# guarded by data_lock.
data = {}
data_lock = threading.Lock()
known_q = Queue.Queue()   # objects awaiting a copy request
update_q = Queue.Queue()  # objects awaiting a work-request status poll
config = oci.config.from_file(profile_name=oci_cli_profile_name)
object_storage_client = oci.object_storage.ObjectStorageClient(config)
def copy_request_worker():
    """Daemon worker: pull object names off known_q, submit a copy request
    for each, and record the returned work-request id in the shared state.

    Failed API calls are retried with an escalating delay (the interval is
    squared each attempt) and re-raised once the delay exceeds
    max_retry_timeout.
    """
    while True:
        object_ = known_q.get()
        state = get_state_for_object(object_)
        interval_exp = base_retry_timeout
        while True:
            try:
                response = copy_object(source_namespace, source_region, source_bucket, object_,
                                       destination_namespace, destination_region, destination_bucket, object_)
                break
            except Exception as e:
                if interval_exp > max_retry_timeout:
                    raise
                # BUG FIX: the original printed response.status here, but
                # `response` is unbound when the very first attempt raises
                # (NameError would kill the retry loop); report the
                # exception itself instead.
                print(" Received %s from API for object %s, will wait %s seconds before retrying." % (
                    e, object_, interval_exp))
                time.sleep(interval_exp)
                interval_exp **= 2
                continue
        # The work-request id is what the status workers poll later.
        state['work-request-id'] = response.headers.get('opc-work-request-id')
        state['status'] = 'REQUESTED'
        set_state_for_object(object_, state, persist=False)
        known_q.task_done()
def work_request_status_worker():
    """Daemon worker: pull object names off update_q, poll the copy
    work-request for each, and store the latest status in the shared state.

    Uses the same escalating retry scheme as copy_request_worker.
    """
    while True:
        object_ = update_q.get()
        state = get_state_for_object(object_)
        interval_exp = base_retry_timeout
        while True:
            try:
                response = object_storage_client.get_work_request(state['work-request-id'])
                state['status'] = response.data.status
                break
            except Exception as e:
                if interval_exp > max_retry_timeout:
                    raise
                # BUG FIX: `response` is unbound when get_work_request itself
                # raises, so the original's response.status would NameError;
                # print the exception instead.
                print(" Received %s from API for work request %s, will wait %s seconds before retrying." % (
                    e, state['work-request-id'], interval_exp))
                time.sleep(interval_exp)
                interval_exp **= 2
                continue
        set_state_for_object(object_, state, persist=False)
        update_q.task_done()
def add_objects_to_queue(ns, bucket):
    """Page through every object in the bucket, filter by the configured
    include/exclude prefixes, mark each survivor KNOWN, and enqueue it
    for copying.

    Returns the number of objects enqueued.
    """
    global known_q
    enqueued = 0
    page_start = None
    while True:
        listing = object_storage_client.list_objects(ns, bucket,
                                                     start=page_start)
        page_start = listing.data.next_start_with
        for obj in listing.data.objects:
            name = obj.name
            # Honour the optional include / exclude prefix filters.
            if source_prefix and not name.startswith(source_prefix):
                continue
            if source_prefix_exclude and name.startswith(source_prefix_exclude):
                continue
            set_state_for_object(name, {'status': 'KNOWN'}, persist=False)
            known_q.put(name)
            enqueued += 1
        if not page_start:
            break
    # Persist the full map once, after the (unpersisted) per-object updates.
    save_all_state()
    return enqueued
def set_state_for_object(object_, state, persist=True):
    """Record `state` for `object_` in the shared map and return it.

    When persist is True the entire map is pickled to state_file while the
    lock is held, so the on-disk copy is always a consistent snapshot.
    """
    global data
    # BUG FIX: bare acquire()/release() replaced with a `with` block so the
    # lock is released even if pickling raises (e.g. disk full); the
    # original would leave the lock held and deadlock every worker.
    with data_lock:
        data[object_] = state
        if persist:
            with open(state_file, 'wb') as sf:
                pickle.dump(data, sf, protocol=pickle.HIGHEST_PROTOCOL)
        return data[object_]
def save_all_state():
    """Pickle the whole object-state map to state_file under the lock."""
    # BUG FIX: use `with data_lock:` so the lock is released even when
    # open()/pickle.dump raises; the original acquire/release pair would
    # leave the lock held forever on any I/O error.
    with data_lock:
        with open(state_file, 'wb') as sf:
            pickle.dump(data, sf, protocol=pickle.HIGHEST_PROTOCOL)
def get_state_for_object(object_):
    """Return the state dict for `object_` (raises KeyError if unknown).

    NOTE(review): reads the shared map without taking data_lock — callers
    rely on dict reads being atomic under the GIL.
    """
    return data[object_]
def get_work_request_count_by_status(status):
    """Return how many tracked objects currently carry the given status."""
    return sum(1 for state in data.values() if state.get('status') == status)
def copy_object(src_ns, src_r, src_b, src_o, dst_ns, dst_r, dst_b, dst_o):
    """Submit an Object Storage copy request for one object and return the
    API response (its headers carry the opc-work-request-id).

    src_r is accepted for interface symmetry but not used here: the request
    is issued against the client's configured region.
    """
    details = oci.object_storage.models.copy_object_details.CopyObjectDetails()
    details.source_object_name = src_o
    details.destination_namespace = dst_ns
    details.destination_region = dst_r
    details.destination_bucket = dst_b
    # Optionally rewrite the destination name via the configured lambda.
    target_name = dst_o
    if destination_object_name_mutation:
        target_name = destination_object_name_mutation(dst_o)
    details.destination_object_name = target_name
    return object_storage_client.copy_object(src_ns, src_b, details)
def update_all_work_requests_status(ns, bucket):
    """Enqueue a status poll for every object whose copy is still pending,
    block until the status workers drain the queue, then persist the map.

    Objects still KNOWN (no request issued yet) or already in a terminal
    state are skipped. ns/bucket are unused but kept for compatibility.
    """
    skip_states = ('KNOWN', 'COMPLETED', 'FAILED', 'CANCELED')
    for name in data.keys():
        if get_state_for_object(name)['status'] not in skip_states:
            update_q.put(name)
    update_q.join()
    save_all_state()
def main():
    """Start the worker pools, enqueue every matching source object, then
    poll the shared state until no copy is left KNOWN or REQUESTED."""
    print("Creating %s copy request workers." % (request_worker_count))
    for i in range(request_worker_count):
        worker = threading.Thread(target=copy_request_worker)
        # Daemon threads: the process may exit without joining them.
        worker.daemon = True
        worker.start()
    print("Creating %s status workers." % (status_worker_count))
    for i in range(status_worker_count):
        worker = threading.Thread(target=work_request_status_worker)
        worker.daemon = True
        worker.start()
    print("Getting list of objects from source bucket (%s). Copies will start immediately." % (source_bucket))
    count = add_objects_to_queue(source_namespace, source_bucket)
    print("Enqueued %s objects to be copied" % (count))
    while True:
        print("Waiting %s seconds before checking status." % (status_interval))
        time.sleep(status_interval)
        if get_work_request_count_by_status('KNOWN') > 0 or \
                get_work_request_count_by_status('REQUESTED') > 0:
            print("Determining copy status")
            # Refresh every outstanding work-request before tallying.
            update_all_work_requests_status(source_namespace, source_bucket)
        # Hold the lock so the counts below reflect one consistent
        # snapshot of the shared state map.
        data_lock.acquire()
        print(" KNOWN: %s, REQUESTED: %s, COMPLETED: %s, FAILED: %s, CANCELED: %s" % (
            get_work_request_count_by_status('KNOWN'),
            get_work_request_count_by_status('REQUESTED'),
            get_work_request_count_by_status('COMPLETED'),
            get_work_request_count_by_status('FAILED'),
            get_work_request_count_by_status('CANCELED')))
        # Done when nothing is waiting to be requested or in flight.
        if get_work_request_count_by_status('KNOWN') == 0 and \
                get_work_request_count_by_status('REQUESTED') == 0:
            data_lock.release()
            break
        else:
            data_lock.release()
    # Ensure every copy request has been issued before exiting.
    known_q.join()
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The script can be started from within OCI Cloud Shell.