Created
March 15, 2017 08:14
-
-
Save s1rc0/5ee709ef4bff653fcb52f2bd8c3bced6 to your computer and use it in GitHub Desktop.
async upload python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
cat async.py
import time, logging, os, asyncio, aiohttp | |
from azure.storage.blob import ContentSettings | |
from azure.storage.blob import BlockBlobService | |
# Shared Azure Blob Storage client used by Uploader.upload().
# Credentials come from the environment so they never need to be committed
# to source control; when a variable is unset it falls back to the empty
# placeholder the script originally shipped with.
block_blob_service = BlockBlobService(
    account_name=os.environ.get('AZURE_STORAGE_ACCOUNT', ''),
    account_key=os.environ.get('AZURE_STORAGE_KEY', ''),
)
class Uploader:
    """Upload a collection of local files to Azure Blob Storage concurrently.

    Paths are pushed onto an ``asyncio.Queue`` and consumed by
    ``concurrency`` worker tasks. Each file is uploaded to the ``'stat'``
    container under a blob name equal to its local path.

    Args:
        links: Iterable of local file paths; iterated once by ``run()``.
        concurrency: Number of worker tasks, i.e. the maximum number of
            uploads in flight at the same time.
        verbose: When False, the root logger is disabled.
    """

    def __init__(self, links, concurrency=2, verbose=True):
        self.links = links
        self.queue = asyncio.Queue()
        self.concurrency = concurrency
        logging.basicConfig(level='INFO')
        # getLogger() with no name returns the *root* logger, so disabling it
        # silences every logging call routed through the root logger.
        self.log = logging.getLogger()
        if not verbose:
            self.log.disabled = True

    async def upload(self, file):
        """Upload one local file to the 'stat' container; blob name == path.

        ``create_blob_from_path`` is a blocking call. Running it on the event
        loop thread would stall the loop and serialize all workers regardless
        of ``concurrency``, so it is dispatched to the loop's default
        thread-pool executor instead.
        """
        loop = asyncio.get_event_loop()
        # NOTE(review): assumes the shared BlockBlobService client may be used
        # from several threads at once -- confirm against the SDK docs.
        await loop.run_in_executor(
            None,
            block_blob_service.create_blob_from_path,
            'stat',
            file,
            file,
        )

    async def worker(self):
        """Consume paths from the queue forever; exits only when cancelled."""
        while True:
            link = await self.queue.get()
            try:
                await self.upload(link)
                self.log.info('REMAINED {}'.format(self.queue.qsize()))
            except Exception:
                # A single failed file must not kill the worker: log and go on.
                self.log.error('An error has occurred during uploading {}'.
                               format(link), exc_info=True)
            finally:
                # Always mark the item done so queue.join() in run() returns.
                self.queue.task_done()

    async def run(self):
        """Enqueue every link, upload them with the worker pool, then stop.

        Returns once every queued path has been processed (successfully or
        not). Works with an empty ``links`` iterable.
        """
        start = time.time()
        print('Starting upload')
        # The queue is unbounded, so put_nowait never blocks or raises
        # QueueFull; this also avoids passing bare coroutines to
        # asyncio.wait(), which is rejected on Python 3.11+ and raises
        # ValueError for an empty set.
        for link in self.links:
            self.queue.put_nowait(link)
        tasks = [asyncio.ensure_future(self.worker())
                 for _ in range(self.concurrency)]
        try:
            await self.queue.join()
            self.log.info('Finishing...')
        finally:
            for task in tasks:
                task.cancel()
            # Let the cancellations complete so no pending task outlives run()
            # and triggers "Task was destroyed but it is pending" on close.
            await asyncio.gather(*tasks, return_exceptions=True)
        end = time.time()
        print('FINISHED AT {} secs'.format(end - start))
==============================================
cat runner.py
import os | |
import asyncio | |
from async import Uploader | |
# Build files.txt: one path per line for every file under data-static/.
# Opening in 'w' mode already truncates the file, and the with-block
# guarantees the handle is closed even if an exception is raised part-way.
with open('files.txt', 'w') as fi:
    for folder, subfolders, files in os.walk('data-static'):
        for file in files:
            filePath = os.path.join(folder, file)
            fi.write(filePath + "\n")
def get_files(path='files.txt'):
    """Return a generator over the file paths listed in *path*, one per line.

    Args:
        path: Listing file to read; defaults to the ``files.txt`` this
            script writes.

    Only the trailing newline is removed from each line, because
    ``str.strip()`` would also remove leading/trailing spaces, which are
    legal in file names. Blank lines are skipped so an empty path is never
    queued for upload.
    """
    # Replace with any custom logic that produces the paths to upload.
    with open(path) as listing:
        lines = [line.rstrip('\n') for line in listing]
    return (line for line in lines if line)
if __name__ == '__main__':
    # Create and install the event loop *before* building the Uploader.
    # On Python < 3.10, asyncio.Queue() binds to the current event loop
    # when it is constructed. Also, calling asyncio.get_event_loop() with no
    # current loop emits a DeprecationWarning on recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    files = get_files()
    uploader = Uploader(files, concurrency=8)
    try:
        loop.run_until_complete(uploader.run())
    finally:
        loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment