Skip to content

Instantly share code, notes, and snippets.

@52617365
Created August 31, 2025 17:24
Show Gist options
  • Select an option

  • Save 52617365/cf2769a3c2055552f7a98c0b459454c8 to your computer and use it in GitHub Desktop.

Select an option

Save 52617365/cf2769a3c2055552f7a98c0b459454c8 to your computer and use it in GitHub Desktop.
Turning Ansible changes into errors.
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import annotations
from ansible.plugins.callback import CallbackBase
from ansible import constants as C
import sys
class CallbackModule(CallbackBase):
    """Callback plugin that reports every changed task as an error.

    Each successful task result that is flagged as changed is recorded as an
    "[ERROR]" message. When the stats hook fires, all recorded messages are
    displayed and the process is terminated with exit status 1; if nothing
    was recorded, a success message is displayed instead.
    """

    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'aggregate'
    CALLBACK_NAME = 'check_monitor_state'

    def __init__(self):
        super().__init__()
        # One formatted message per changed task result; reported together
        # in v2_playbook_on_stats so every offender is listed, not just the
        # first one encountered.
        self.task_changed_errors = []

    def v2_runner_on_ok(self, result):
        """Record an error message when a successful task reports a change.

        :param result: task result object; only ``is_changed()`` and
            ``_task.name`` are read here, plus whatever ``host_label`` uses.
        """
        if not result.is_changed():
            return

        host_label = self.host_label(result)
        task_name = result._task.name
        # Deferred reporting: collect now, display everything at the end.
        self.task_changed_errors.append(
            f"[ERROR]: Task: [{task_name}] - [{host_label}] was not in desired state"
        )

    def v2_playbook_on_play_start(self, playbook):
        """Announce that the idempotency checks are starting.

        :param playbook: unused; kept to preserve the hook's signature.
        """
        self._display.display("Running idempotent checks on destination machines")

    def v2_runner_on_skipped(self, result):
        """Display a success message for the host of a skipped task.

        NOTE(review): this reports success for every skipped task result,
        regardless of what other tasks on the same host did - confirm that
        this is the intended meaning of "skipped" for the monitored playbooks.
        """
        host_label = self.host_label(result)
        self._display.display(
            f"[SUCCESS]: idempotent checks were successful on destination machine {host_label}",
            color=C.COLOR_OK,
        )

    def v2_playbook_on_stats(self, stats):
        """Display the collected errors and exit non-zero, or report success.

        :param stats: unused; kept to preserve the hook's signature.
        :raises SystemExit: with code 1 when at least one change was recorded.
        """
        if not self.task_changed_errors:
            self._display.display("Idempotent checks were successful on all hosts")
            return

        self._display.display("Idempotent checks failed on the following hosts:")
        for error in self.task_changed_errors:
            self._display.display(error, color=C.COLOR_ERROR)
        # Terminate with a failing status so callers (e.g. CI) see the drift.
        sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment