-
-
Save hnuzhoulin/b77f182ea0b2df6c408c97f181e51bba to your computer and use it in GitHub Desktop.
Processing archive objects in parallel using Python threads
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
#coding: utf-8 | |
import os | |
import sys | |
import logging | |
from threading import Thread | |
import Queue | |
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO) | |
class Worker(Thread):
    """Thread that drains a shared queue, handing each item to a callback.

    Args:
        do_work: Callable invoked as ``do_work(task, workid)`` per task.
        task_queue: Queue the tasks are read from; a ``None`` item tells
            the worker to stop.
        workid: Identifier passed through to ``do_work``.
    """

    def __init__(self, do_work, task_queue, workid):
        super(Worker, self).__init__()
        self.workid = workid
        self.task_queue = task_queue
        self.do_work = do_work

    def run(self):
        """Process tasks until a ``None`` sentinel is received."""
        while True:
            try:
                item = self.task_queue.get()
                if item is None:
                    # Sentinel: leave the loop; the finally clause below
                    # still acknowledges it on the queue.
                    return
                self.do_work(item, self.workid)
            except Exception as err:
                # A failing task is reported and the worker keeps going.
                logging.warning('Failed to execute task: %s', err)
            finally:
                # Every item fetched (sentinel included) is marked done.
                self.task_queue.task_done()
class ThreadPool(object):
    """Fixed-size set of Worker threads fed from one shared queue.

    Args:
        do_work: Callable each worker invokes as ``do_work(task, workid)``.
        nworker: Number of worker threads started by ``start``.
    """

    def __init__(self, do_work, nworker=20):
        self.task_queue = Queue.Queue()
        self.workid = 0
        self.nworker = nworker
        self.do_work = do_work

    def start(self):
        """Create and start ``nworker`` workers, numbered from 0."""
        for index in range(self.nworker):
            # The id of the most recently started worker is kept on the pool.
            self.workid = index
            worker = Worker(self.do_work, self.task_queue, self.workid)
            worker.start()

    def put_task(self, task):
        """Append ``task`` to the shared queue."""
        self.task_queue.put(task)

    def join(self):
        """Wait for all queued tasks, then tell every worker to stop."""
        self.task_queue.join()
        # One None sentinel per worker so each of them leaves its loop.
        remaining = self.nworker
        while remaining > 0:
            self.task_queue.put(None)
            remaining -= 1
class Task(object):
    """Unit of work: one object, identified by repo id, version and object id."""

    def __init__(self, repo_id, repo_version, obj_id):
        self.repo_id, self.repo_version, self.obj_id = (
            repo_id,
            repo_version,
            obj_id,
        )
class ObjMigrateWorker(Thread):
    """Thread that walks one object store and queues a Task per object.

    ``migrate`` reads the layout ``top_path/<repo_id>/<prefix>/<suffix>``
    and builds each object id as ``prefix + suffix``.

    Args:
        top_path: Directory holding one sub-directory per repository.
        stype: Object-type label; used in the log lines and in the path
            printed by ``do_work``.
        dst_pool: Stored on the instance as given; not used by the code
            in this class.
    """

    def __init__(self, top_path, stype, dst_pool):
        Thread.__init__(self)
        self.top_path = top_path
        self.stype = stype
        self.dst_pool = dst_pool
        self.thread_pool = ThreadPool(self.do_work)

    def run(self):
        """Start the pool, queue every object, then shut the pool down.

        Raises:
            OSError: If ``top_path`` cannot be listed. The pool is still
                shut down before the error propagates.
        """
        logging.info('Start to migrate [%s] object' % self.stype)
        self.thread_pool.start()
        try:
            self.migrate()
        finally:
            # Always shut the pool down: if the directory walk fails and
            # join() is skipped, the started workers never receive their
            # stop signal and keep the process alive.
            self.thread_pool.join()
        logging.info('Complete migrate [%s] object' % self.stype)

    def do_work(self, task, workid):
        """Print which worker received which object path."""
        obj_path = os.path.join(self.stype, task.repo_id, task.obj_id)
        # One pre-formatted write keeps the fields of a message together
        # when several workers report at the same time.
        sys.stdout.write("workerID: %s --task is: %s\n" % (workid, obj_path))

    def migrate(self):
        """Queue one Task for every object found under ``top_path``.

        Entries that are not directories at the repository or prefix
        level are skipped instead of aborting the whole walk.
        """
        top_path = self.top_path
        for repo_id in os.listdir(top_path):
            repo_path = os.path.join(top_path, repo_id)
            if not os.path.isdir(repo_path):
                continue
            for spath in os.listdir(repo_path):
                obj_path = os.path.join(repo_path, spath)
                if not os.path.isdir(obj_path):
                    continue
                for lpath in os.listdir(obj_path):
                    # Object id = prefix directory name + file name.
                    task = Task(repo_id, 1, spath + lpath)
                    self.thread_pool.put_task(task)
def main():
    """Start one ObjMigrateWorker thread per object type.

    The storage root is read from ``sys.argv[1]``; each type is looked
    for in the sub-directory of the same name.
    """
    root = sys.argv[1]
    dtypes = {'commits': 'commits', 'fs': 'fs', 'blocks': 'blocks'}
    for stype, dst_pool in dtypes.items():
        source_dir = os.path.join(root, stype)
        migrator = ObjMigrateWorker(source_dir, stype, dst_pool)
        migrator.start()
if __name__ == '__main__':
    # sys.argv[0] is the script name; the storage path is sys.argv[1],
    # so at least two entries are required.
    if len(sys.argv) < 2:
        print("Top storage absolute path is needed.")
        # Stop here: main() would fail on the missing sys.argv[1].
        sys.exit(1)
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment