Created
September 18, 2024 12:58
-
-
Save jpic/49dc2d1bb078a7d83786375571fa4897 to your computer and use it in GitHub Desktop.
Base action plugin class for ansible
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import fcntl | |
import hashlib | |
import os | |
import re | |
import subprocess | |
from ansible.plugins.action import ActionBase, display | |
from ansible.plugins.filter.core import to_nice_yaml | |
# colors: | |
# black | |
# bright gray | |
# blue | |
# white | |
# green | |
# cyan | |
# bright green | |
# red | |
# bright cyan | |
# purple | |
# bright red | |
# yellow | |
# bright purple | |
# dark gray | |
# magenta | |
# bright magenta | |
# normal | |
# 7-bit C1 ANSI sequences | |
ansi_escape = re.compile(r''' | |
\x1B # ESC | |
(?: # 7-bit C1 Fe (except CSI) | |
[@-Z\\-_] | |
| # or [ for CSI, followed by a control sequence | |
\[ | |
[0-?]* # Parameter bytes | |
[ -/]* # Intermediate bytes | |
[@-~] # Final byte | |
) | |
''', re.VERBOSE) | |
class ActionBase(ActionBase): | |
display = display | |
class Locker: | |
def __init__(self, lock_path, blocking=True): | |
self.lock_path = lock_path | |
self.blocking = blocking | |
self.acquired = True | |
def __enter__(self): | |
display.vv(f'Aquiring lock for {self.lock_path}...') | |
self.fp = open(self.lock_path, 'w+') | |
if self.blocking: | |
display.vv(f'Acquired lock for {self.lock_path}...') | |
fcntl.flock(self.fp.fileno(), fcntl.LOCK_EX) | |
else: | |
try: | |
fcntl.flock( | |
self.fp.fileno(), | |
fcntl.LOCK_EX | fcntl.LOCK_NB, | |
) | |
except BlockingIOError: | |
self.acquired = False | |
else: | |
display.vv(f'Acquired lock for {self.lock_path}...') | |
return self | |
def __exit__(self, _type, value, tb): | |
display.vv(f'Releasing lock for {self.lock_path}...') | |
if self.acquired or not self.blocking: | |
fcntl.flock(self.fp.fileno(), fcntl.LOCK_UN) | |
self.fp.close() | |
display.vv(f'Released lock for {self.lock_path}...') | |
if self.acquired: | |
try: | |
os.unlink(self.lock_path) | |
except FileNotFoundError: | |
pass # already deleted, or locked by another process | |
else: | |
display.vv(f'Deleted {self.lock_path}...') | |
namespace = '' | |
@property | |
def path_separator(self): | |
if self.task_vars['ansible_os_family'] == 'Windows': | |
return '\\' | |
return '/' | |
def path_join(self, *parts): | |
return self.path_separator.join(parts) | |
@property | |
def win(self): | |
if self.task_vars['ansible_system'].lower() != 'linux': | |
return 'win_' | |
return '' | |
def run_action(self, name, task_vars=None, **kwargs): | |
new_task = self._task.copy() | |
new_task.args = kwargs | |
task_vars = task_vars or self.task_vars.copy() | |
action = self._shared_loader_obj.action_loader.get( | |
name, | |
task=new_task, | |
connection=self._connection, | |
play_context=self._play_context, | |
loader=self._loader, | |
templar=self._templar, | |
shared_loader_obj=self._shared_loader_obj | |
) | |
result = action.run(task_vars=task_vars) | |
self.print_result(name, result) | |
return result | |
def subprocess(self, *args, change=True, **kwargs): | |
if isinstance(args[0], (list, tuple)): | |
msg = " ".join(args[0]) | |
else: | |
msg = args[0] | |
self.print_starting_local(msg) | |
kwargs.setdefault('stdout', subprocess.PIPE) | |
kwargs.setdefault('stderr', subprocess.PIPE) | |
result = dict(stdout_lines=[], stderr_lines=[], args=args) | |
proc = subprocess.Popen( | |
*args, | |
**kwargs, | |
) | |
os.set_blocking(proc.stdout.fileno(), False) | |
os.set_blocking(proc.stderr.fileno(), False) | |
while True: | |
rc = proc.poll() | |
for handler in ('stdout', 'stderr'): | |
read = getattr(proc, handler).read() | |
if read: | |
read = read.decode() | |
if read: | |
display.display(read, newline=False) | |
result[f'{handler}_lines'].append(read) | |
if rc is not None: | |
break # subprocess has terminated | |
for handler in ('stdout', 'stderr'): | |
result[handler] = ''.join(result.pop(f'{handler}_lines')) | |
stdout, stderr = proc.communicate() | |
result['rc'] = proc.returncode | |
if proc.returncode: | |
result['failed'] = True | |
else: | |
if change: | |
result['changed'] = True | |
self.print_starting_local(f'\n{msg.split(" ")[0]} rc: {result["rc"]}') | |
return result | |
def wget(self, url): | |
self.print_starting_local(f'wget({url})') | |
if not os.path.isdir(self.local_path): | |
os.makedirs(self.local_path) | |
file_name = url.split('/')[-1] | |
local_file = os.path.join(self.local_path, file_name) | |
lock_file = os.path.join(self.local_path, f'.{file_name}.lock') | |
# TODO: is this pre-locker era code? why no context manager here?? | |
if os.path.exists(local_file) and not os.path.exists(lock_file): | |
if not os.path.exists(lock_file): | |
# consider the file is not currently being downloaded | |
self.display.v(f'{local_file} already downloaded') | |
return dict(changed=False) | |
with self.Locker(lock_file): | |
if os.path.exists(local_file): | |
# the file is currently being written by another process, wait | |
# for lock and return | |
self.display.v(f'{local_file} downloaded by another process') | |
return dict(changed=False) | |
tries = 10 | |
while tries: | |
result = self._wget(url, local_file) | |
if result == 'retry': | |
tries -= 1 | |
else: | |
break | |
return result | |
def _wget(self, url, local_file): | |
cmd = [ | |
'/usr/bin/wget', | |
'--output-document', | |
local_file, | |
'--continue', | |
'--progress=bar:force', | |
] | |
if self.headers: | |
for key, value in self.headers.items(): | |
cmd += ['--header', f'{key}: {value}'] | |
cmd.append(url) | |
result = self.subprocess(cmd) | |
if result['rc']: | |
# remove output file if wget failed | |
os.unlink(local_file) | |
return result | |
# check file if possible | |
self.print_starting_local('checking downloaded file md5...') | |
curl = self.subprocess(['/usr/bin/curl', '--fail', '-I', url]) | |
md5_checksum = None | |
for line in curl['stdout'].split('\n'): | |
if 'checksum' not in line: | |
continue | |
key, value = line.split(': ') | |
if key.lower() == 'x-checksum-md5': | |
md5_checksum = value.strip() | |
if not md5_checksum: | |
# can't check | |
return result | |
md5 = hashlib.md5() | |
# for large files | |
# https://stackoverflow.com/questions/22058048/hashing-a-file-in-python | |
BUF_SIZE = 65536 # lets read stuff in 64kb chunks! | |
with open(local_file, 'rb') as f: | |
while True: | |
data = f.read(BUF_SIZE) | |
if not data: | |
break | |
md5.update(data) | |
if md5_checksum != md5.hexdigest(): | |
self.print_starting_local( | |
f'Downloaded {md5_checksum} != {md5.hexdigest()}', | |
) | |
self.print_starting_local(f'Removing {local_file}') | |
os.unlink(local_file) | |
return 'retry' | |
result['md5'] = md5_checksum | |
return result | |
def subprocess_remote(self, cmd, **kwargs): | |
new_task = self._task.copy() | |
new_task.args = dict(_raw_params=cmd, **kwargs) | |
self.print_config(shell=cmd) | |
shell_action = self._shared_loader_obj.action_loader.get( | |
'ansible.builtin.shell', | |
task=new_task, | |
connection=self._connection, | |
play_context=self._play_context, | |
loader=self._loader, | |
templar=self._templar, | |
shared_loader_obj=self._shared_loader_obj, | |
) | |
_result = shell_action.run(task_vars=self.task_vars.copy()) | |
self.print_shell_result(_result) | |
return _result | |
def print(self, *args, **kwargs): | |
display.display(*args, **kwargs) | |
def print_result(self, module, result): | |
self.print_yaml( | |
{module: result}, | |
color='yellow' if result.get('changed') else 'green' | |
) | |
def print_shell_result(self, result): | |
self.print_starting_remote(result['cmd']) | |
if 'stderr_lines' in result: | |
self.print('\n'.join(result['stderr_lines'])) | |
if 'stdout_lines' in result: | |
self.print('\n'.join(result['stdout_lines'])) | |
def print_config(self, **kwargs): | |
self.print_yaml(kwargs, color='blue') | |
def print_yaml(self, result, color=None): | |
if 'invocation' in result: | |
result.pop('invocation') | |
display.display(to_nice_yaml(result), color=color) | |
def print_starting_remote(self, content): | |
display.display(content, color='blue') | |
def print_starting_local(self, content): | |
display.display(content, color='bright purple') | |
def print_ongoing_remote(self, content): | |
display.display(content, color='cyan') | |
def print_ongoing_local(self, content): | |
display.display(content, color='purple') | |
def print_changer(self, content): | |
display.display(content, color='yellow') | |
def print_ok(self, content): | |
display.display(content, color='green') | |
def print_error(self, content): | |
display.display(content, color='red') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment