Created August 23, 2017 23:29
Save sagar8192/bb98d0b479b3cf1afcf09785529471fd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/task_processing/plugins/mesos/retrying_executor.py b/task_processing/plugins/mesos/retrying_executor.py | |
index 689a47b..5fe443f 100644 | |
--- a/task_processing/plugins/mesos/retrying_executor.py | |
+++ b/task_processing/plugins/mesos/retrying_executor.py | |
@@ -1,5 +1,6 @@ | |
import logging | |
import time | |
+import uuid | |
from operator import sub | |
from threading import Lock | |
from threading import Thread | |
@@ -13,15 +14,19 @@ log = logging.getLogger(__name__) | |
class RetryingExecutor(TaskExecutor): | |
- def __init__(self, | |
- executor, | |
- retry_pred=lambda e: not e.success, | |
- retries=3): | |
+ def __init__( | |
+ self, | |
+ executor, | |
+ retry_pred=lambda e: not e.success, | |
+ retries=3, | |
+ ): | |
self.executor = executor | |
self.retries = retries | |
self.retry_pred = retry_pred | |
self.task_retries = m() | |
+ # Maps each generated per-attempt task_id to the caller's original task_id. | |
+ self.task_id_mappings = m() | |
self.task_retries_lock = Lock() | |
self.src_queue = executor.get_event_queue() | |
@@ -46,14 +51,20 @@ class RetryingExecutor(TaskExecutor): | |
if current_retries <= 0: | |
return False | |
+ attempt_no = self.retries - current_retries + 1 | |
+ | |
log.info( | |
'Retrying task {}, {} of {}, fail event: {}'.format( | |
- event.task_config.name, 1 + self.retries - current_retries, | |
+ event.task_config.name, attempt_no, | |
self.retries, event.raw | |
) | |
) | |
- self.run(event.task_config) | |
+ self.run( | |
+ task_config=event.task_config, | |
+ attempt=attempt_no, | |
+ ) | |
+ | |
with self.task_retries_lock: | |
self.task_retries = self.task_retries.update_with( | |
sub, {event.task_id: 1} | |
@@ -64,6 +75,9 @@ class RetryingExecutor(TaskExecutor): | |
while True: | |
while not self.src_queue.empty(): | |
e = self.src_queue.get() | |
+ # Map a generated per-attempt id back to the caller's id; ids with no mapping pass through unchanged instead of raising KeyError. | |
+ # NOTE(review): assumes event objects accept attribute assignment -- confirm (build an updated copy if they are immutable records). | |
+ e.task_id = self.task_id_mappings.get(e.task_id, e.task_id) | |
if e.kind != 'task': | |
self.dest_queue.put(e) | |
@@ -80,18 +94,24 @@ class RetryingExecutor(TaskExecutor): | |
self.task_retries = \ | |
self.task_retries.remove(e.task_id) | |
- self.dest_queue.put(e) | |
+ # TODO(review): confirm whether events should be propagated only after all retries are exhausted; | |
+ # doing so would hide the failures of attempts that were retried from consumers. | |
+ self.dest_queue.put(e) | |
if self.stopping: | |
return | |
time.sleep(1) | |
- def run(self, task_config): | |
- if task_config.task_id not in self.task_retries: | |
- with self.task_retries_lock: | |
- self.task_retries = self.task_retries.set( | |
- task_config.task_id, self.retries) | |
+ def run(self, task_config, attempt=0): | |
+ # On a retry task_config carries the previous attempt's generated id; resolve it to the caller's original id. | |
+ original_task_id = self.task_id_mappings.get(task_config.task_id, task_config.task_id) | |
+ # Called outside task_retries_lock: generate_new_task_id acquires that (non-reentrant) Lock itself. | |
+ new_task_id = self.generate_new_task_id(original_task_id) | |
+ with self.task_retries_lock: | |
+ # Seed the budget with self.retries on the first attempt only; `attempt` must not overwrite the remaining count. | |
+ self.task_retries = self.task_retries.set(original_task_id, self.task_retries.get(original_task_id, self.retries)) | |
+ task_config.task_id = new_task_id # NOTE(review): mutates the caller's task_config in place -- confirm this is intended. | |
self.executor.run(task_config) | |
def kill(self, task_id): | |
@@ -108,3 +128,21 @@ class RetryingExecutor(TaskExecutor): | |
def get_event_queue(self): | |
return self.dest_queue | |
+ | |
+ def generate_new_task_id(self, task_id): | |
+ """Return a fresh id for one attempt of task_id and record its mapping. | |
+ | |
+ The event loop looks ids up in task_id_mappings to restore the | |
+ caller's id. If task_id is itself a generated id (a retry), the | |
+ new id is mapped to the original id rather than to the previous attempt. | |
+ """ | |
+ # str() keeps generated ids the same type as caller ids (assumed to be str -- confirm). | |
+ new_task_id = str(uuid.uuid4()) | |
+ with self.task_retries_lock: | |
+ original_task_id = self.task_id_mappings.get(task_id, task_id) | |
+ # Write to task_id_mappings; task_retries holds retry budgets, not ids. | |
+ # NOTE(review): nothing in this change prunes this map, so it grows by one entry per attempt. | |
+ self.task_id_mappings = self.task_id_mappings.set( | |
+ new_task_id, original_task_id | |
+ ) | |
+ return new_task_id | |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.