-
-
Save orklann/03120f52ec603feec13ed369737bf17a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
1. Store the image and create an instance of ImagefileProcessHistory with status NOT_STARTED | |
ImageFileProcessHistory | |
- local_path | |
- file_name | |
- status # enum: NOT_STARTED, STARTED, SUCCEEDED, FAILED | |
- bytes_processed | |
- bytes_total | |
""" | |
#2. Call a celery task with the id of just created instance which will be responsible for actual image processing. Reply http response with the instance id `image_process_history_id` | |
@shared_task | |
process_image(image_process_history_id): | |
mngr = ImageFileProcessingManager.load(image_process_history_id) | |
# | |
current_status = mngr.process() | |
return current_status, image_process_history_id | |
# 3. Then you can periodically poll your server via json rest service to get the current progress of the history object. ProgressTracker will upgrade it depending on PROGRESS_GRANULARITY (say each 10000 bytes). See below: | |
# ... Django rest service | |
# 4. You need a specified manager ImageFileProcessingManager and processor ImageProcessor itself which will orchestrate the actual processing: | |
class ImageFileProcessingManager(object): | |
history_obj = None | |
trackers = None | |
@classmethod | |
def load(cls, history_id): | |
history_obj = ImageFileProcessHistory.objects().get_obj(history_id) | |
if history_obj is None: | |
raise DoesNotExist("...") | |
return cls(history_id) | |
def __init__(self, history_obj): | |
self.history_obj = history_obj | |
def register_progress_tracker(): | |
self.trackers.append(ProgressTracker(self.history_obj)) | |
def process(self): | |
# orchestrate your processing here | |
# on your use case | |
processor = ImageProcessor() | |
processor.register_progress_tracker(ProgressTracker(self.history_obj)) | |
try: | |
self.set_status(statuses.STARTED) | |
return processor.process(self.history_obj.get_full_path()) | |
except YouRCustomException: | |
self.set_status(statuses.FAILED) | |
finally: | |
self.set_status(statuses.COMPLETED) | |
return self.history_obj.status | |
class ImageProcessor(object): | |
def __init__(self): | |
self.trackers = [] | |
def register_progress_tracker(self, tracker): | |
self.trackers.append(tracker) | |
def process(self, file_path): | |
# load file | |
# do something computationally intensive | |
# update your progress tracker regularly - could be upgraded each 1000 bytes or depending on your scenario | |
# return prog | |
# 5. In case you need some clue about ProgressTracker | |
class ProgressTracker(object): | |
def __init__(self, history_obj): | |
""" | |
Creates the progress tracker | |
:param history_obj: Import history with the progress that should be updated | |
""" | |
self.history_obj = history_obj | |
self.counter = 0 | |
def tell(self, progress): | |
""" | |
Updates the import history progress | |
:param progress: Progress in bytes | |
:type progress: int | |
""" | |
self.counter += 1 | |
if self.counter % settings.PROGRESS_GRANULARITY == 0: | |
self.history_obj.bytes_imported = progress | |
self.history_obj.save() | |
def finished(self): | |
self.history_obj.bytes_imported = self.history_obj.bytes_total | |
self.history_obj.save() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment