# Created February 3, 2021 02:45
# Saved from GitHub Gist paruljain/af74109886f35be0c67ebb0edf893b4d.
# GitHub notice: this file may contain hidden or bidirectional Unicode text that
# could be interpreted or compiled differently than it appears. To review it,
# open the file in an editor that reveals hidden Unicode characters.
import http.client
import ntpath
import os
import queue
import time
from os import scandir
from os.path import join
from threading import Lock, Thread
from urllib.parse import quote

from aws_request_signer import UNSIGNED_PAYLOAD, AwsRequestSigner
# --- Tunables -----------------------------------------------------------------
MAX_THREADS = 100            # number of upload worker threads started below
BLOCKSIZE = 1024 * 1024      # chunk size for file reads and HTTP body transfer

# --- S3 endpoint and credentials (credential values here are placeholders) ---
AWS_REGION = 'us-east-1'
AWS_ACCESS_KEY_ID = '**************'
AWS_SECRET_ACCESS_KEY = '*****************************'
AWS_S3_HOST = '1.1.1.1'
AWS_S3_PORT = 20000
BUCKET = 'test'

# Local folder whose files are copied, recursively.
FOLDER = r'c:\python'

# --- State shared by the scanner, the upload threads and the monitor ---------
fileQ = queue.Queue(maxsize=10000)   # bounded: the scanner blocks while it is full
fileCount = totalSize = filesScanned = 0
shutdown = False                     # cooperative stop flag polled by scanDir/upload
class FileReader:
    """Read-ahead, file-like wrapper used as an HTTP request body.

    ``run()`` starts a background thread that reads the file in ``blocksize``
    chunks into a queue holding at most two chunks, so disk reads overlap with
    network sends. ``read()`` hands those chunks out in order and then returns
    ``b''`` (repeatedly) once the file is exhausted. ``close()`` stops the
    thread and closes the file, and is safe to call at any point, including
    while the body has only been partly consumed.
    """

    def __init__(self, filePath, blocksize=1048576):
        self.fd = open(filePath, 'rb')  # raises OSError if the file can't be opened
        self.blocksize = blocksize
        self.chunkQ = queue.Queue(2)    # bounded read-ahead buffer
        self.runWorker = False
        self._worker = None
        self._eof = False

    def run(self):
        """Start the background read-ahead thread."""
        self.runWorker = True
        # Daemon: an abandoned reader must never keep the interpreter alive.
        self._worker = Thread(target=self.getFileChunk, daemon=True)
        self._worker.start()

    def getFileChunk(self):
        """Producer loop (worker thread): enqueue chunks, then b'' at EOF.

        A read error is enqueued in place of a chunk so ``read()`` re-raises it
        in the consumer instead of blocking forever. Once started, the worker
        owns ``self.fd`` and closes it on exit, so the file is never closed
        underneath an in-progress read.
        """
        try:
            while self.runWorker:
                chunk = self.fd.read(self.blocksize)
                # Stop after queueing the EOF marker, or if close() was called.
                if not self._put(chunk) or not chunk:
                    return
        except Exception as err:
            self._put(err)
        finally:
            self.fd.close()

    def _put(self, item):
        """Enqueue ``item`` unless close() is called first; return True if enqueued.

        A short-timeout loop is used instead of a blocking put so the worker
        notices ``runWorker`` going false even while the queue is full.
        """
        while self.runWorker:
            try:
                self.chunkQ.put(item, timeout=0.1)
                return True
            except queue.Full:
                pass
        return False

    def read(self, size=-1):
        """Return the next chunk, or b'' once the file is exhausted.

        ``size`` is accepted for file-object compatibility but ignored: every
        chunk is ``blocksize`` bytes (the last one may be shorter). Re-raises
        any exception the worker hit while reading.
        """
        if self._eof:
            return b''
        chunk = self.chunkQ.get()
        if isinstance(chunk, Exception):
            self._eof = True
            raise chunk
        if not chunk:
            self._eof = True
        return chunk

    def close(self):
        """Stop the read-ahead thread (if started) and close the file."""
        self.runWorker = False
        if self._worker is None:
            self.fd.close()
        else:
            # The worker closes fd itself and exits shortly after the flag flips.
            self._worker.join()
# Put files to copy to s3 on the queue
# path is the root path from where to recursively list files to copy
def scanDir(path):
    """Recursively put the path of every regular file under ``path`` on fileQ.

    Directories that cannot be listed (permission denied, removed during the
    scan, ...) are skipped, as are individual entries whose type cannot be
    determined, so one unreadable item does not abort its siblings. Returns
    early once the global ``shutdown`` flag is set. Ctrl-C is no longer
    swallowed (only OSError is ignored).
    """
    global filesScanned
    if shutdown:
        return
    try:
        # The context manager releases the OS directory handle even on early return.
        with scandir(path) as entries:
            for entry in entries:
                if shutdown:
                    return
                try:
                    isFile = entry.is_file()
                    isDir = not isFile and entry.is_dir()
                except OSError:
                    continue  # unreadable entry: skip it, keep scanning siblings
                if isFile:
                    fileQ.put(entry.path, True)  # blocks while the queue is full
                    filesScanned += 1
                elif isDir:
                    scanDir(entry.path)
    except OSError:
        pass  # Ignore folder access permission errors
# Serialises updates to the fileCount / totalSize counters, which every upload
# thread increments (a bare `+=` on a global is a read-modify-write).
statsLock = Lock()


def _objectKey(path):
    """Return the object key (always starting with '/') for local file ``path``.

    Any drive letter or UNC share prefix is removed and backslashes become '/',
    so 'c:\\python\\a.txt' and 'C:\\python\\a.txt' both map to '/python/a.txt'.
    """
    _, tail = ntpath.splitdrive(path)
    key = tail.replace('\\', '/')
    return key if key.startswith('/') else '/' + key


def upload():
    """Worker loop: PUT the files taken from ``fileQ`` into ``BUCKET``.

    Each thread signs requests with its own AwsRequestSigner and reuses one
    HTTP connection. The loop ends when ``shutdown`` is set, when ``fileQ``
    stays empty for 5 seconds, or after the first connection error or
    non-2xx response (which is printed). Files that cannot be stat'ed or
    opened are reported and skipped.
    """
    global fileCount
    global totalSize
    requestSigner = AwsRequestSigner(AWS_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, 's3')
    conn = http.client.HTTPConnection(host=AWS_S3_HOST, port=AWS_S3_PORT, blocksize=BLOCKSIZE)
    try:
        while not shutdown:
            try:
                f = fileQ.get(True, 5)
            except queue.Empty:
                break  # no work for 5 s: assume the scan has finished
            try:
                path = '/' + BUCKET + quote(_objectKey(f))
                url = 'http://' + AWS_S3_HOST + ':' + str(AWS_S3_PORT) + path
                try:
                    fileSize = os.stat(f).st_size
                except OSError as err:
                    print('Skipping', f, '-', err)
                    continue
                # The headers we'll provide and want to sign.
                headers = {"Content-Type": "application/octet-stream", "Content-Length": str(fileSize)}
                # Add the authentication headers.
                headers.update(requestSigner.sign_with_headers("PUT", url, headers, content_hash=UNSIGNED_PAYLOAD))
                try:
                    fh = FileReader(f, BLOCKSIZE)
                except OSError as err:
                    print('Skipping', f, '-', err)
                    continue
                fh.run()
                try:
                    conn.request(method='PUT', url=path, headers=headers, body=fh)
                    res = conn.getresponse()
                    data = res.read()
                except Exception as err:
                    print("s3 connection error:", err)
                    break
                finally:
                    fh.close()  # release the file and its read-ahead thread on every path
                if not 200 <= res.status <= 299:
                    print('Error connecting to s3:', res.status, data)
                    break
                with statsLock:
                    fileCount += 1
                    totalSize += fileSize
            finally:
                # Balance every get(), including skipped and failed files.
                fileQ.task_done()
    finally:
        conn.close()
# Wall-clock start of the run; used for the elapsed time in the final report.
startTime = time.time()


# Reports status of the copy job
def monitor(interval=5):
    """Print a progress line every ``interval`` seconds while ``runMonitor`` is true.

    The wait between reports is taken in 0.1 s slices so that clearing
    ``runMonitor`` stops the thread almost immediately instead of up to a full
    ``interval`` later; when it is started as a plain (non-daemon) Thread, a
    long trailing sleep would otherwise hold up interpreter exit.
    """
    while runMonitor:
        print(filesScanned, 'files scanned;', fileCount, 'files uploaded;', round(totalSize/1024/1024, 2), 'MB uploaded')
        deadline = time.monotonic() + interval
        while runMonitor and time.monotonic() < deadline:
            time.sleep(0.1)
if __name__ == '__main__':
    # Start the upload worker pool; the workers drain fileQ while scanDir fills it.
    copyOps = []
    for _ in range(MAX_THREADS):
        t = Thread(target=upload)
        copyOps.append(t)
        t.start()
    print('Starting ...')
    # Start the monitoring thread
    runMonitor = True
    Thread(target=monitor).start()
    try:
        scanDir(FOLDER)
        # Wait for all copy jobs to finish
        for copyOp in copyOps:
            copyOp.join()
    finally:
        # Also reached on Ctrl-C: tell the upload workers and the monitor to stop,
        # otherwise these non-daemon threads keep the process running.
        shutdown = True
        runMonitor = False
    timeTakenSeconds = round(time.time() - startTime, 2)
    print(filesScanned, 'files scanned;', fileCount, 'files uploaded;', round(totalSize/1024/1024, 2), 'MB uploaded;', timeTakenSeconds, 'seconds')
    # Workers can exit early (error, or an idle queue while the scan was still
    # running); surface any files that were queued but never picked up.
    leftover = fileQ.qsize()
    if leftover:
        print(leftover, 'queued files were not uploaded (upload threads stopped early)')
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.