Created
February 2, 2021 23:00
-
-
Save paruljain/3324e46b6735a0e404fe3cd9e213ea79 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from aws_request_signer import AwsRequestSigner, UNSIGNED_PAYLOAD | |
import http.client | |
import queue | |
from os import scandir | |
import os | |
from urllib.parse import quote | |
from os.path import join | |
import time | |
from threading import Thread | |
import threading | |
# Tunables for the copy job.
MAX_THREADS = 100  # number of concurrent uploader threads
AWS_REGION = 'us-east-1'
AWS_ACCESS_KEY_ID = '*****************'  # placeholder — supply real credentials
AWS_SECRET_ACCESS_KEY = '*****************************'  # placeholder
AWS_S3_HOST = '1.1.1.1'  # S3-compatible endpoint host (plain HTTP, no TLS)
AWS_S3_PORT = 20000
BUCKET = 'test'  # destination bucket
FOLDER = 'c:\\python'  # local root folder to mirror into the bucket

# Shared state between the scanner, the uploader pool, and the monitor thread.
fileQ = queue.Queue(1000)  # bounded queue of file paths awaiting upload
fileCount = 0    # files successfully uploaded; guarded by threadLock
totalSize = 0    # bytes successfully uploaded; guarded by threadLock
filesScanned = 0  # files discovered by scanDir (written by the main thread only)
threadLock = threading.Lock()  # protects fileCount / totalSize updates
# Put files to copy to s3 on the queue
# path is the root path from where to recursively list files to copy
def scanDir(path):
    """Recursively scan *path* and queue every regular file for upload.

    Full file paths are pushed onto the module-level ``fileQ`` (blocking
    while the queue is full, so scanning cannot outrun the uploaders) and
    ``filesScanned`` is incremented for each queued file.

    Directories that cannot be read are skipped rather than aborting the
    whole scan.
    """
    global filesScanned
    try:
        for entry in scandir(path):
            fullPath = join(path, entry.name)
            if entry.is_file():
                fileQ.put(fullPath, True)
                filesScanned += 1
            elif entry.is_dir():
                scanDir(fullPath)
    except OSError:
        # Ignore filesystem errors (e.g. folder access permissions).
        # The original bare `except:` also hid programming errors such as
        # NameError; OSError covers exactly the intended failure mode.
        pass
def upload():
    """Worker thread: pull file paths off ``fileQ`` and PUT them to S3.

    Each worker reuses a single HTTP connection.  A worker exits when the
    queue stays empty for 5 seconds (the scan has finished) or when the
    server returns a non-2xx response.
    """
    global fileCount
    global totalSize
    requestSigner = AwsRequestSigner(AWS_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, 's3')
    # 1 MiB blocksize so large bodies stream in big chunks.
    conn = http.client.HTTPConnection(host=AWS_S3_HOST, port=AWS_S3_PORT, blocksize=1048576)
    while True:
        try:
            f = fileQ.get(True, 5)
        except queue.Empty:
            # Queue has been empty for 5s — assume the scan is complete.
            # (Was a bare `except:`, which also hid real bugs.)
            conn.close()
            break
        # Map the Windows path to an object key: c:\python\x -> /python/x.
        # NOTE(review): the 'c:\\' prefix is hard-coded and should agree
        # with FOLDER — confirm before reusing with a different root.
        k = f.replace('c:\\', '/').replace('\\', '/')
        URL = 'http://' + AWS_S3_HOST + ':' + str(AWS_S3_PORT) + '/' + BUCKET + quote(k)
        try:
            fileSize = os.stat(f).st_size
        except OSError:
            # File vanished or is unreadable since it was queued; account
            # for the queue item (the original skipped task_done here).
            fileQ.task_done()
            continue
        # The headers we'll provide and want to sign.
        headers = {"Content-Type": "application/octet-stream", "Content-Length": str(fileSize)}
        # Add the authentication headers; the payload itself is unsigned.
        headers.update(requestSigner.sign_with_headers("PUT", URL, headers, content_hash=UNSIGNED_PAYLOAD))
        with open(f, 'rb') as fh:
            conn.request(method='PUT', url='/' + BUCKET + quote(k), headers=headers, body=fh)
            res = conn.getresponse()
            data = res.read()
        if res.status < 200 or res.status > 299:
            print('Error connecting to s3:', res.status, data)
            fileQ.task_done()  # keep queue accounting balanced on failure too
            conn.close()
            break
        fileQ.task_done()
        with threadLock:
            fileCount += 1
            totalSize += fileSize
startTime = time.time()  # wall-clock start of the whole job, used for the final report
# Reports status of the copy job
def monitor():
    """Print scan/upload progress every 5 seconds until runMonitor is cleared."""
    while True:
        if not runMonitor:
            break
        print(filesScanned, 'files scanned;', fileCount, 'files uploaded;', round(totalSize/1024/1024, 2), 'MB uploaded')
        time.sleep(5)
# Spin up the uploader worker pool.
copyOps = []
for i in range(MAX_THREADS):
    t = Thread(target=upload)
    copyOps.append(t)
    t.start()
print('Starting ...')
# Start the monitoring thread
# Because this thread is started as daemon the main thread will not wait for it
# to complete
runMonitor = True
# BUG FIX: the comment above promised a daemon thread, but daemon=True was
# never passed — a wedged monitor could keep the process alive after the
# copy finished.  Pass daemon=True to match the stated intent.
Thread(target=monitor, daemon=True).start()
# Scanning runs on the main thread; it blocks whenever fileQ is full.
scanDir(FOLDER)
# Wait for all copy jobs to finish
for copyOp in copyOps:
    copyOp.join()
runMonitor = False
timeTakenSeconds = round(time.time() - startTime, 2)
print(filesScanned, 'files scanned;', fileCount, 'files uploaded;', round(totalSize/1024/1024, 2), 'MB uploaded;', timeTakenSeconds, 'seconds')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment