Created
August 1, 2022 13:33
-
-
Save mkrizek/68aba4e74f74ff5fa34dba41f042af11 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/lib/ansible/executor/play_iterator.py b/lib/ansible/executor/play_iterator.py | |
index 51799cc7a4e..c9712500249 100644 | |
--- a/lib/ansible/executor/play_iterator.py | |
+++ b/lib/ansible/executor/play_iterator.py | |
@@ -21,7 +21,6 @@ __metaclass__ = type | |
import fnmatch | |
-from collections import deque | |
from enum import IntEnum, IntFlag | |
from ansible import constants as C | |
@@ -57,14 +56,17 @@ class FailedStates(IntFlag): | |
class HostState: | |
- def __init__(self, blocks): | |
+ def __init__(self, blocks, handlers=None): | |
self._blocks = blocks[:] | |
- self._notified_handlers = deque() | |
+ if handlers is None: | |
+ handlers = [] | |
+ self._handlers = handlers[:] | |
self.cur_block = 0 | |
self.cur_regular_task = 0 | |
self.cur_rescue_task = 0 | |
self.cur_always_task = 0 | |
+ self.cur_handler_task = 0 | |
self.run_state = IteratingStates.SETUP | |
self.fail_state = FailedStates.NONE | |
self.pre_flushing_run_state = None | |
@@ -76,14 +78,13 @@ class HostState: | |
self.did_rescue = False | |
self.did_start_at_task = False | |
- self._handlers_sorted = False | |
+ self.update_handlers = True | |
def __repr__(self): | |
- return "HostState(%r, %r)" % (self._blocks, self._notified_handlers) | |
+    return "HostState(%r)" % self._blocks | |
def __str__(self): | |
return ("HOST STATE: block=%d, task=%d, rescue=%d, always=%d, run_state=%s, fail_state=%s, pre_flushing_run_state=%s, pending_setup=%s, " | |
- "handlers_sorted=%s, " | |
"tasks child state? (%s), rescue child state? (%s), always child state? (%s), handlers child state? (%s), " | |
"did rescue? %s, did start at task? %s" % ( | |
self.cur_block, | |
@@ -94,7 +95,6 @@ class HostState: | |
self.fail_state, | |
self.pre_flushing_run_state, | |
self.pending_setup, | |
- self._handlers_sorted, | |
self.tasks_child_state, | |
self.rescue_child_state, | |
self.always_child_state, | |
@@ -107,43 +107,25 @@ class HostState: | |
if not isinstance(other, HostState): | |
return False | |
- for attr in ('_blocks', '_notified_handlers', | |
+ for attr in ('_blocks', | |
'cur_block', 'cur_regular_task', 'cur_rescue_task', 'cur_always_task', | |
'run_state', 'fail_state', 'pre_flushing_run_state', 'pending_setup', | |
- 'tasks_child_state', 'rescue_child_state', 'always_child_state', 'handlers_child_state', | |
- '_handlers_sorted'): | |
+ 'tasks_child_state', 'rescue_child_state', 'always_child_state', 'handlers_child_state'): | |
if getattr(self, attr) != getattr(other, attr): | |
return False | |
return True | |
- @property | |
- def handlers(self): | |
- return self._notified_handlers | |
- | |
def get_current_block(self): | |
return self._blocks[self.cur_block] | |
- def pop_current_handler(self): | |
- return self._notified_handlers.popleft() | |
- | |
- def notify_handler(self, handler): | |
- if handler not in self._notified_handlers: | |
- self._notified_handlers.append(handler) | |
- return True | |
- return False | |
- | |
- def notify_include_handler(self, handler_blocks): | |
- # deque.extendleft reverses order | |
- self._notified_handlers.extendleft(reversed(handler_blocks)) | |
- | |
def copy(self): | |
- new_state = HostState(self._blocks) | |
- new_state._notified_handlers = self._notified_handlers.copy() | |
+ new_state = HostState(self._blocks, self._handlers) | |
new_state.cur_block = self.cur_block | |
new_state.cur_regular_task = self.cur_regular_task | |
new_state.cur_rescue_task = self.cur_rescue_task | |
new_state.cur_always_task = self.cur_always_task | |
+ new_state.cur_handler_task = self.cur_handler_task | |
new_state.run_state = self.run_state | |
new_state.fail_state = self.fail_state | |
new_state.pre_flushing_run_state = self.pre_flushing_run_state | |
@@ -158,7 +140,7 @@ class HostState: | |
new_state.always_child_state = self.always_child_state.copy() | |
if self.handlers_child_state is not None: | |
new_state.handlers_child_state = self.handlers_child_state.copy() | |
- new_state._handlers_sorted = self._handlers_sorted | |
+ new_state.update_handlers = self.update_handlers | |
return new_state | |
@@ -205,6 +187,7 @@ class PlayIterator: | |
if new_block.has_tasks(): | |
self._blocks.append(new_block) | |
+ | |
self._host_states = {} | |
start_at_matched = False | |
batch = inventory.get_hosts(self._play.hosts, order=self._play.order) | |
@@ -440,12 +423,9 @@ class PlayIterator: | |
state.cur_always_task += 1 | |
elif state.run_state == IteratingStates.HANDLERS: | |
- if not state._handlers_sorted: | |
- # handlers are executed in the order they are defined, not in the order notified | |
- if state.handlers: | |
- all_handlers = [h for b in self._play.handlers for h in b.block] | |
- state._notified_handlers = deque((h for h in all_handlers if h in state.handlers)) | |
- state._handlers_sorted = True | |
+ if state.update_handlers: | |
+ state._handlers = [h for b in self._play.handlers for h in b.block] | |
+ state.update_handlers = False | |
if state.handlers_child_state: | |
state.handlers_child_state, task = self._get_next_task_from_state(state.handlers_child_state, host=host) | |
@@ -460,13 +440,22 @@ class PlayIterator: | |
if (state.fail_state != FailedStates.NONE and | |
not self._play.force_handlers and | |
state.pre_flushing_run_state not in (IteratingStates.RESCUE, IteratingStates.ALWAYS)): | |
- state._handlers_sorted = False | |
state.run_state = IteratingStates.COMPLETE | |
- elif len(state.handlers) == 0: | |
- state._handlers_sorted = False | |
- state.run_state = state.pre_flushing_run_state | |
else: | |
- task = state.pop_current_handler() | |
+ while True: | |
+ try: | |
+ task = state._handlers[state.cur_handler_task] | |
+ except IndexError: | |
+ task = None | |
+ state.run_state = state.pre_flushing_run_state | |
+ state.cur_handler_task = 0 | |
+ state.update_handlers = True | |
+ break | |
+ else: | |
+ if task.is_host_notified(host): | |
+ break | |
+ state.cur_handler_task += 1 | |
+ | |
if isinstance(task, Block): | |
# TODO allow full blocks with rescue/always for handlers | |
restricted_block = task.copy(exclude_parent=True, exclude_tasks=True) | |
@@ -498,7 +487,7 @@ class PlayIterator: | |
state.run_state = IteratingStates.RESCUE | |
elif state._blocks[state.cur_block].always: | |
state.run_state = IteratingStates.ALWAYS | |
- elif state.handlers and self._play.force_handlers: | |
+ elif self._play.force_handlers: | |
state.run_state = IteratingStates.HANDLERS | |
else: | |
state.run_state = IteratingStates.COMPLETE | |
@@ -509,7 +498,7 @@ class PlayIterator: | |
state.fail_state |= FailedStates.RESCUE | |
if state._blocks[state.cur_block].always: | |
state.run_state = IteratingStates.ALWAYS | |
- elif state.handlers and self._play.force_handlers: | |
+ elif self._play.force_handlers: | |
state.run_state = IteratingStates.HANDLERS | |
else: | |
state.run_state = IteratingStates.COMPLETE | |
@@ -518,7 +507,7 @@ class PlayIterator: | |
state.always_child_state = self._set_failed_state(state.always_child_state) | |
else: | |
state.fail_state |= FailedStates.ALWAYS | |
- if state.handlers and self._play.force_handlers: | |
+ if self._play.force_handlers: | |
state.run_state = IteratingStates.HANDLERS | |
else: | |
state.run_state = IteratingStates.COMPLETE | |
@@ -632,7 +621,7 @@ class PlayIterator: | |
if state.handlers_child_state: | |
state.handlers_child_state = self._insert_tasks_into_state(state.handlers_child_state, task_list) | |
else: | |
- state.notify_include_handler(task_list) | |
+ state._handlers[state.cur_handler_task+1:state.cur_handler_task+1] = [h for b in task_list for h in b.block] | |
return state | |
@@ -660,6 +649,3 @@ class PlayIterator: | |
if not isinstance(fail_state, FailedStates): | |
raise AnsibleAssertionError('Expected fail_state to be a FailedStates but was %s' % (type(fail_state))) | |
self._host_states[hostname].fail_state = fail_state | |
- | |
- def notify_handler(self, host, handler): | |
- return self._host_states[host.name].notify_handler(handler) | |
diff --git a/lib/ansible/playbook/handler.py b/lib/ansible/playbook/handler.py | |
index a68964215cc..9ad8c8a88c9 100644 | |
--- a/lib/ansible/playbook/handler.py | |
+++ b/lib/ansible/playbook/handler.py | |
@@ -28,11 +28,8 @@ class Handler(Task): | |
listen = FieldAttribute(isa='list', default=list, listof=string_types, static=True) | |
- obj_cnt = 0 | |
- | |
def __init__(self, block=None, role=None, task_include=None): | |
- self.lockstep_id = Handler.obj_cnt | |
- Handler.obj_cnt += 1 | |
+ self.notified_hosts = [] | |
self.cached_name = False | |
@@ -47,6 +44,15 @@ class Handler(Task): | |
t = Handler(block=block, role=role, task_include=task_include) | |
return t.load_data(data, variable_manager=variable_manager, loader=loader) | |
+ def notify_host(self, host): | |
+ if not self.is_host_notified(host): | |
+ self.notified_hosts.append(host) | |
+ return True | |
+ return False | |
+ | |
+ def is_host_notified(self, host): | |
+ return host in self.notified_hosts | |
+ | |
def serialize(self): | |
result = super(Handler, self).serialize() | |
result['is_handler'] = True | |
diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py | |
index 903987ec008..a141774252b 100644 | |
--- a/lib/ansible/plugins/strategy/__init__.py | |
+++ b/lib/ansible/plugins/strategy/__init__.py | |
@@ -628,7 +628,7 @@ class StrategyBase: | |
target_handler = search_handler_blocks_by_name(handler_name, iterator._play.handlers) | |
if target_handler is not None: | |
found = True | |
- if iterator.notify_handler(original_host, target_handler): | |
+ if target_handler.notify_host(original_host): | |
self._tqm.send_callback('v2_playbook_on_notify', target_handler, original_host) | |
for listening_handler_block in iterator._play.handlers: | |
@@ -645,7 +645,7 @@ class StrategyBase: | |
else: | |
found = True | |
- if iterator.notify_handler(original_host, listening_handler): | |
+ if listening_handler.notify_host(original_host): | |
self._tqm.send_callback('v2_playbook_on_notify', listening_handler, original_host) | |
# and if none were found, then we raise an error | |
diff --git a/lib/ansible/plugins/strategy/free.py b/lib/ansible/plugins/strategy/free.py | |
index a2515f8e3d7..89e80370252 100644 | |
--- a/lib/ansible/plugins/strategy/free.py | |
+++ b/lib/ansible/plugins/strategy/free.py | |
@@ -147,6 +147,8 @@ class StrategyModule(StrategyBase): | |
# advance the host, mark the host blocked, and queue it | |
self._blocked_hosts[host_name] = True | |
iterator.set_state_for_host(host.name, state) | |
+ if isinstance(task, Handler): | |
+ del task.notified_hosts[task.notified_hosts.index(host)] | |
try: | |
action = action_loader.get(task.action, class_only=True, collection_list=task.collections) | |
@@ -267,6 +269,8 @@ class StrategyModule(StrategyBase): | |
for new_block in new_blocks: | |
if is_handler: | |
+ for task in new_block.block: | |
+ task.notified_hosts = included_file._hosts[:] | |
# TODO filter tags to allow tags on handlers from include_tasks: merge with the else block | |
# also where handlers are inserted from roles/include_role/import_role and regular handlers | |
final_block = new_block | |
diff --git a/lib/ansible/plugins/strategy/linear.py b/lib/ansible/plugins/strategy/linear.py | |
index 6245a886bd1..309f5103dd5 100644 | |
--- a/lib/ansible/plugins/strategy/linear.py | |
+++ b/lib/ansible/plugins/strategy/linear.py | |
@@ -196,9 +196,9 @@ class StrategyModule(StrategyBase): | |
if num_handlers: | |
display.debug("advancing hosts in HANDLERS") | |
- lowest_lockstep_id = min(( | |
- t.lockstep_id for h, (s, t) in host_tasks_to_run | |
- if s.run_state != IteratingStates.COMPLETE and isinstance(t, Handler) | |
+ lowest_handler = min(( | |
+ s.cur_handler_task for h, (s, t) in host_tasks_to_run | |
+ if s.run_state == IteratingStates.HANDLERS | |
)) | |
rvals = [] | |
for host in hosts: | |
@@ -208,8 +208,9 @@ class StrategyModule(StrategyBase): | |
(state, task) = host_state_task | |
if task is None: | |
continue | |
- if getattr(task, 'lockstep_id', -1) == lowest_lockstep_id and state.run_state == IteratingStates.HANDLERS: | |
+ if state.cur_handler_task == lowest_handler and state.run_state == IteratingStates.HANDLERS: | |
iterator.set_state_for_host(host.name, state) | |
+ del task.notified_hosts[task.notified_hosts.index(host)] | |
rvals.append((host, task)) | |
else: | |
rvals.append((host, noop_task)) | |
@@ -394,6 +395,8 @@ class StrategyModule(StrategyBase): | |
display.debug("iterating over new_blocks loaded from include file") | |
for new_block in new_blocks: | |
if is_handler: | |
+ for task in new_block.block: | |
+ task.notified_hosts = included_file._hosts[:] | |
# TODO filter tags to allow tags on handlers from include_tasks: merge with the else block | |
# also where handlers are inserted from roles/include_role/import_role and regular handlers | |
final_block = new_block |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment