Created September 6, 2016 18:46
Multi-threaded RBD copy
#!/usr/bin/env python
"""
Python script to copy an RBD image from one cluster to another using
multiple threads.

This script requires a configuration file and an RBD image to copy.

It will copy the RBD image from the source pool to the destination pool as
specified in the configuration file.

It assumes the destination image already exists and is at least the size of
the source image.

Example config (rbd-export-import.conf):

[source]
pool = rbd
mon_host = 2001:db8::100
id = admin
key = AQAI1s5XK0q0ChAA9FDEBluyt4E0OI26oGVsGQ==

[destination]
pool = rbd
mon_host = 2001:db8::200
id = admin
key = AQCuASJXa27KDhAAg3Ww25Jn/ZLmlDcwhVLNXg==

Author: Wido den Hollander <[email protected]>
"""
import rbd
import rados
import argparse
import logging
import sys
import threading
import time
import Queue
from ConfigParser import ConfigParser
LOGGER = logging.getLogger()

# Queue work in 1GiB (1073741824 byte) segments
SEGMENT_SIZE = 1073741824
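
# Each queue item describes a SEGMENT_SIZE byte region of the image; a worker
# copies that region in chunk_size byte reads and writes, so at most one
# worker touches any given segment.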
def rbd_copy_worker(i, q, source, dest, chunk_size):
    while True:
        chunk = q.get()
        offset = chunk['offset']
        length = chunk['length']
        LOGGER.debug('Worker %d offset: %d length: %d', i, offset, length)
        done = 0
        while done < length:
            # Never read past the end of this segment; the last chunk of a
            # segment may be smaller than chunk_size
            data = source.read(offset, min(chunk_size, length - done))
            read = len(data)
            dest.write(data, offset)
            offset += read
            done += read
        q.task_done()
def create_rados_connection(mon_host, rados_id, key):
    conn = rados.Rados(rados_id=rados_id)
    conn.conf_set('mon_host', mon_host)
    conn.conf_set('key', key)
    conn.connect()
    return conn
def main(config, img_source, img_dest, workers, chunk_size):
    source = create_rados_connection(config.get('source', 'mon_host'),
                                     config.get('source', 'id'),
                                     config.get('source', 'key'))
    dest = create_rados_connection(config.get('destination', 'mon_host'),
                                   config.get('destination', 'id'),
                                   config.get('destination', 'key'))

    LOGGER.info('Spawning %d workers to copy %s to %s', workers, img_source,
                img_dest)

    LOGGER.debug('Creating RADOS IoCTX')
    source_io = source.open_ioctx(config.get('source', 'pool'))
    dest_io = dest.open_ioctx(config.get('destination', 'pool'))

    LOGGER.debug('Opening RBD images')
    source_rbd = rbd.Image(source_io, img_source, read_only=True)
    dest_rbd = rbd.Image(dest_io, img_dest)

    size = source_rbd.size()
    LOGGER.info('Size of source image is %d', size)
    if dest_rbd.size() < source_rbd.size():
        raise Exception('Destination image is smaller than source')

    LOGGER.info('Will use %d byte chunks to read and write', chunk_size)
    LOGGER.info('Splitting work into %d byte segments', SEGMENT_SIZE)
    threads = []
    chunk_queue = Queue.Queue()
    for i in range(workers):
        worker = threading.Thread(target=rbd_copy_worker,
                                  args=(i, chunk_queue, source_rbd, dest_rbd,
                                        chunk_size,))
        threads.append(worker)
        worker.daemon = True
        worker.start()
    offset = 0
    while offset < size:
        # The last segment may be shorter than SEGMENT_SIZE
        if offset + SEGMENT_SIZE > size:
            length = size - offset
        else:
            length = SEGMENT_SIZE
        chunk = {'offset': offset, 'length': length}
        chunk_queue.put(chunk)
        offset += length
    LOGGER.info('Waiting for Queue to be empty')
    chunk_queue.join()

    LOGGER.debug('Closing RBD images')
    source_rbd.close()
    dest_rbd.close()

    LOGGER.debug('Closing RADOS IoCTX')
    source_io.close()
    dest_io.close()

    LOGGER.debug('Closing RADOS connections')
    source.shutdown()
    dest.shutdown()
if __name__ == "__main__":
    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

    parser = argparse.ArgumentParser(description='Ceph RBD import/export')
    parser.add_argument('--config', action='store', dest='conffile',
                        default='rbd-export-import.conf',
                        help='Configuration file')
    parser.add_argument('--source-image', action='store', dest='img_source',
                        help='The source image')
    parser.add_argument('--dest-image', action='store', dest='img_dest',
                        help='The destination image')
    parser.add_argument('--workers', action='store', dest='workers', type=int,
                        help='Number of worker threads to run', default=10)
    parser.add_argument('--chunk-size', action='store', dest='chunk_size',
                        type=int, default=262144,
                        help='Read/write chunk size in bytes')
    args = parser.parse_args()

    conf = ConfigParser()
    conf.readfp(open(args.conffile))

    main(conf, args.img_source, args.img_dest, args.workers, args.chunk_size)
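
Example invocation (the script and image names here are hypothetical):

python rbd-export-import.py --config rbd-export-import.conf --source-image vm-disk-1 --dest-image vm-disk-1 --workers 10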
Your code is well worth learning, but it runs very slowly: where rbd export/import needs 200s, this method needs 700s. How should I adjust it? (from a Ceph enthusiast)
I have never tested the performance; this was a PoC for me. Maybe you can tune SEGMENT_SIZE.
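
A minimal sketch of that tuning, assuming hypothetical --segment-size and --chunk-size flags (names and defaults are assumptions, not from the gist):

import argparse

parser = argparse.ArgumentParser(description='Ceph RBD import/export')
# Hypothetical tuning flags: larger chunks mean fewer, bigger librbd
# read/write calls; larger segments mean fewer queue items per worker
parser.add_argument('--segment-size', dest='segment_size', type=int,
                    default=1073741824,
                    help='Bytes handed to a worker per queue item')
parser.add_argument('--chunk-size', dest='chunk_size', type=int,
                    default=4194304,
                    help='Bytes per librbd read/write call')
args = parser.parse_args()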
Thank you very much for this reply. I will continue to study export and import.