Skip to content

Instantly share code, notes, and snippets.

@jpic
Created September 18, 2024 12:58
Show Gist options
  • Save jpic/49dc2d1bb078a7d83786375571fa4897 to your computer and use it in GitHub Desktop.
Save jpic/49dc2d1bb078a7d83786375571fa4897 to your computer and use it in GitHub Desktop.
Base action plugin class for Ansible
import fcntl
import hashlib
import os
import re
import subprocess
from ansible.plugins.action import ActionBase, display
from ansible.plugins.filter.core import to_nice_yaml
# colors:
# black
# bright gray
# blue
# white
# green
# cyan
# bright green
# red
# bright cyan
# purple
# bright red
# yellow
# bright purple
# dark gray
# magenta
# bright magenta
# normal
# Compiled pattern for 7-bit C1 ANSI escape sequences: ESC followed either by
# a single Fe byte or by a complete CSI control sequence.
_ANSI_ESCAPE_PATTERN = r'''
\x1B # ESC
(?: # 7-bit C1 Fe (except CSI)
[@-Z\\-_]
| # or [ for CSI, followed by a control sequence
\[
[0-?]* # Parameter bytes
[ -/]* # Intermediate bytes
[@-~] # Final byte
)
'''
ansi_escape = re.compile(_ANSI_ESCAPE_PATTERN, flags=re.VERBOSE)
class ActionBase(ActionBase):
display = display
class Locker:
    """Context manager holding an exclusive ``flock`` on a lock file.

    Args:
        lock_path: path of the lock file; it is created on entry and
            deleted on exit by the holder of the lock.
        blocking: when true (default) entering waits until the lock is
            obtained. When false, entering never waits: check
            ``acquired`` inside the ``with`` body to know whether the
            lock is actually held.

    Attributes:
        acquired: False only after a non-blocking attempt found the lock
            already taken.
        fp: the open lock file object while inside the ``with`` body.
    """

    def __init__(self, lock_path, blocking=True):
        self.lock_path = lock_path
        self.blocking = blocking
        self.acquired = True
        self.fp = None

    def _is_current(self, fp):
        """Tell whether *fp* is still the file found at ``lock_path``."""
        try:
            on_disk = os.stat(self.lock_path)
        except FileNotFoundError:
            return False
        return os.path.samestat(os.fstat(fp.fileno()), on_disk)

    def __enter__(self):
        display.vv(f'Acquiring lock for {self.lock_path}...')
        operation = fcntl.LOCK_EX
        if not self.blocking:
            operation |= fcntl.LOCK_NB

        while True:
            fp = open(self.lock_path, 'w+')
            try:
                fcntl.flock(fp.fileno(), operation)
            except BlockingIOError:
                if self.blocking:
                    fp.close()
                    raise
                # non-blocking attempt lost: keep the handle so that
                # __exit__ has something to close
                self.fp = fp
                self.acquired = False
                return self
            except BaseException:
                # do not leak the file handle when locking fails
                fp.close()
                raise

            if self._is_current(fp):
                break
            # The previous holder deleted the lock file while we were
            # waiting: our lock is on an orphaned inode that no newcomer
            # will ever contend on, so start over with the current file.
            fp.close()

        self.fp = fp
        self.acquired = True
        display.vv(f'Acquired lock for {self.lock_path}...')
        return self

    def __exit__(self, _type, value, tb):
        display.vv(f'Releasing lock for {self.lock_path}...')
        if self.fp is None:
            return
        try:
            if self.acquired:
                # Delete the file while still holding the lock, so nobody
                # else can lock this file between release and deletion.
                try:
                    os.unlink(self.lock_path)
                except FileNotFoundError:
                    pass  # already deleted
                else:
                    display.vv(f'Deleted {self.lock_path}...')
                fcntl.flock(self.fp.fileno(), fcntl.LOCK_UN)
        finally:
            self.fp.close()
        display.vv(f'Released lock for {self.lock_path}...')
namespace = ''
@property
def path_separator(self):
    """Path separator for the target host.

    Returns a backslash when the ``ansible_os_family`` fact equals
    ``Windows`` and a forward slash for any other value.
    """
    on_windows = self.task_vars['ansible_os_family'] == 'Windows'
    return '\\' if on_windows else '/'
def path_join(self, *parts):
    """Join *parts* into one string using ``self.path_separator``."""
    separator = self.path_separator
    return separator.join(parts)
@property
def win(self):
    """Module-name prefix for the target host.

    Returns ``'win_'`` when the ``ansible_system`` fact names a Windows
    system (its value starts with ``win``, case-insensitively, e.g.
    ``Win32NT``) and ``''`` otherwise. Non-Linux POSIX systems such as
    Darwin or FreeBSD therefore get no prefix, instead of being treated
    as Windows.
    """
    system = self.task_vars['ansible_system'].lower()
    return 'win_' if system.startswith('win') else ''
def run_action(self, name, task_vars=None, **kwargs):
    """Run the action plugin called *name* and return its result.

    Args:
        name: name handed to the shared action loader.
        task_vars: variables passed to the action's ``run``; when empty
            or omitted a copy of ``self.task_vars`` is used.
        **kwargs: become the ``args`` of a copy of the current task.

    Returns:
        Whatever the loaded action's ``run`` returned, after it has been
        shown with ``print_result``.
    """
    task = self._task.copy()
    task.args = kwargs
    if not task_vars:
        task_vars = self.task_vars.copy()

    loader = self._shared_loader_obj
    plugin = loader.action_loader.get(
        name,
        task=task,
        connection=self._connection,
        play_context=self._play_context,
        loader=self._loader,
        templar=self._templar,
        shared_loader_obj=loader,
    )

    outcome = plugin.run(task_vars=task_vars)
    self.print_result(name, outcome)
    return outcome
def subprocess(self, *args, change=True, **kwargs):
    """Run a local command, echoing its output while it runs.

    Args:
        *args: positional arguments for ``subprocess.Popen``; the first
            one is the command (string, list or tuple).
        change: when true (default) a zero exit status marks the result
            as ``changed``.
        **kwargs: keyword arguments for ``subprocess.Popen``; ``stdout``
            and ``stderr`` default to pipes.

    Returns:
        dict with ``args``, the decoded ``stdout`` and ``stderr``, ``rc``
        and either ``failed=True`` (non-zero exit status) or, when
        *change* is true, ``changed=True``.
    """
    # Function-scope imports so the file's import block stays untouched.
    import codecs
    import selectors

    command = args[0]
    if isinstance(command, (list, tuple)):
        msg = " ".join(command)
    else:
        msg = command
    self.print_starting_local(msg)

    # Inside this method ``subprocess`` is the module imported by the file.
    kwargs.setdefault('stdout', subprocess.PIPE)
    kwargs.setdefault('stderr', subprocess.PIPE)
    result = dict(stdout_lines=[], stderr_lines=[], args=args)
    proc = subprocess.Popen(*args, **kwargs)

    decoders = {}

    def record(handler, data, final=False):
        # Incremental decoding: a multi-byte character split across two
        # reads must not raise nor be corrupted.
        text = decoders[handler].decode(data or b'', final)
        if text:
            display.display(text, newline=False)
            result[f'{handler}_lines'].append(text)

    with selectors.DefaultSelector() as selector:
        for handler in ('stdout', 'stderr'):
            pipe = getattr(proc, handler)
            if pipe is None:
                continue  # the caller redirected this stream elsewhere
            os.set_blocking(pipe.fileno(), False)
            decoders[handler] = codecs.getincrementaldecoder('utf-8')(
                errors='replace',
            )
            selector.register(pipe, selectors.EVENT_READ, handler)

        while selector.get_map():
            rc = proc.poll()
            if rc is None:
                # Sleep until output shows up instead of spinning a core.
                selector.select(timeout=0.1)
            for key in list(selector.get_map().values()):
                data = key.fileobj.read()
                if data:
                    record(key.data, data)
                elif data is not None:
                    # b'' is end-of-file; None only means "nothing yet"
                    selector.unregister(key.fileobj)
            if rc is not None:
                break  # subprocess has terminated

    # Waits for the process and collects whatever is still in the pipes.
    leftovers = proc.communicate()
    for handler, data in zip(('stdout', 'stderr'), leftovers):
        if handler in decoders:
            record(handler, data, final=True)

    for handler in ('stdout', 'stderr'):
        result[handler] = ''.join(result.pop(f'{handler}_lines'))

    result['rc'] = proc.returncode
    if proc.returncode:
        result['failed'] = True
    elif change:
        result['changed'] = True

    self.print_starting_local(f'\n{msg.split(" ")[0]} rc: {result["rc"]}')
    return result
def wget(self, url):
    """Download *url* into ``self.local_path`` unless it is already there.

    The file name is the last ``/``-separated segment of *url*. A lock
    file named ``.<file name>.lock`` in the same directory guards the
    download.

    Returns:
        ``dict(changed=False)`` when the file already exists, the result
        dict of ``_wget`` once a download attempt does not ask for a
        retry, or a ``failed`` dict when all attempts asked for a retry
        (previously the internal ``'retry'`` marker leaked to callers).
    """
    max_tries = 10
    self.print_starting_local(f'wget({url})')
    # exist_ok avoids failing when the directory appears between a check
    # and its creation
    os.makedirs(self.local_path, exist_ok=True)

    file_name = url.split('/')[-1]
    local_file = os.path.join(self.local_path, file_name)
    lock_file = os.path.join(self.local_path, f'.{file_name}.lock')

    if os.path.exists(local_file) and not os.path.exists(lock_file):
        # no lock file: consider the file is not currently being
        # downloaded
        self.display.v(f'{local_file} already downloaded')
        return dict(changed=False)

    with self.Locker(lock_file):
        if os.path.exists(local_file):
            # the file was being written by another process while we
            # waited for the lock
            self.display.v(f'{local_file} downloaded by another process')
            return dict(changed=False)

        for _attempt in range(max_tries):
            result = self._wget(url, local_file)
            if result != 'retry':
                return result

    return dict(
        failed=True,
        changed=False,
        msg=f'{url}: checksum mismatch after {max_tries} download attempts',
    )
def _wget(self, url, local_file):
cmd = [
'/usr/bin/wget',
'--output-document',
local_file,
'--continue',
'--progress=bar:force',
]
if self.headers:
for key, value in self.headers.items():
cmd += ['--header', f'{key}: {value}']
cmd.append(url)
result = self.subprocess(cmd)
if result['rc']:
# remove output file if wget failed
os.unlink(local_file)
return result
# check file if possible
self.print_starting_local('checking downloaded file md5...')
curl = self.subprocess(['/usr/bin/curl', '--fail', '-I', url])
md5_checksum = None
for line in curl['stdout'].split('\n'):
if 'checksum' not in line:
continue
key, value = line.split(': ')
if key.lower() == 'x-checksum-md5':
md5_checksum = value.strip()
if not md5_checksum:
# can't check
return result
md5 = hashlib.md5()
# for large files
# https://stackoverflow.com/questions/22058048/hashing-a-file-in-python
BUF_SIZE = 65536 # lets read stuff in 64kb chunks!
with open(local_file, 'rb') as f:
while True:
data = f.read(BUF_SIZE)
if not data:
break
md5.update(data)
if md5_checksum != md5.hexdigest():
self.print_starting_local(
f'Downloaded {md5_checksum} != {md5.hexdigest()}',
)
self.print_starting_local(f'Removing {local_file}')
os.unlink(local_file)
return 'retry'
result['md5'] = md5_checksum
return result
def subprocess_remote(self, cmd, **kwargs):
    """Run *cmd* through the ``ansible.builtin.shell`` action.

    Args:
        cmd: command line, passed as the ``_raw_params`` task argument.
        **kwargs: further task arguments for the shell action.

    Returns:
        The result of the shell action's ``run``, after it has been shown
        with ``print_shell_result``.
    """
    shell_task = self._task.copy()
    shell_task.args = dict(_raw_params=cmd, **kwargs)
    self.print_config(shell=cmd)

    loader = self._shared_loader_obj
    runner = loader.action_loader.get(
        'ansible.builtin.shell',
        task=shell_task,
        connection=self._connection,
        play_context=self._play_context,
        loader=self._loader,
        templar=self._templar,
        shared_loader_obj=loader,
    )

    variables = self.task_vars.copy()
    outcome = runner.run(task_vars=variables)
    self.print_shell_result(outcome)
    return outcome
def print(self, *args, **kwargs):
    """Pass every argument straight through to ``display.display``."""
    show = display.display
    show(*args, **kwargs)
def print_result(self, module, result):
    """Show *result* as YAML under the key *module*.

    The output is yellow when the result is marked ``changed`` and green
    otherwise. The ``invocation`` entry is left out of the output: it has
    to be removed here because the result is nested under *module*, where
    ``print_yaml`` does not look for it. *result* itself is not modified.
    """
    shown = {
        key: value for key, value in result.items() if key != 'invocation'
    }
    self.print_yaml(
        {module: shown},
        color='yellow' if shown.get('changed') else 'green',
    )
def print_shell_result(self, result):
    """Show the command of a shell result followed by its output.

    Prints ``result['cmd']`` as a starting remote step, then the stderr
    lines and the stdout lines, each group joined with newlines. Every
    part is optional: a result without ``cmd`` (for instance one that
    only carries an error message) no longer raises ``KeyError``.
    """
    if 'cmd' in result:
        self.print_starting_remote(result['cmd'])
    for stream in ('stderr_lines', 'stdout_lines'):
        if stream in result:
            self.print('\n'.join(result[stream]))
def print_config(self, **kwargs):
    """Show the given keyword arguments as blue YAML."""
    settings = kwargs
    self.print_yaml(settings, color='blue')
def print_yaml(self, result, color=None):
    """Display *result* rendered with ``to_nice_yaml``.

    Args:
        result: data to show; when it is a dict, its top-level
            ``invocation`` key is left out of the output.
        color: colour name handed to ``display.display``.

    The dict passed by the caller is not modified: the key is dropped
    from a copy.
    """
    if isinstance(result, dict) and 'invocation' in result:
        result = {
            key: value
            for key, value in result.items()
            if key != 'invocation'
        }
    display.display(to_nice_yaml(result), color=color)
def print_starting_remote(self, content):
    """Display *content* in blue."""
    colour = 'blue'
    display.display(content, color=colour)
def print_starting_local(self, content):
    """Display *content* in bright purple."""
    colour = 'bright purple'
    display.display(content, color=colour)
def print_ongoing_remote(self, content):
    """Display *content* in cyan."""
    colour = 'cyan'
    display.display(content, color=colour)
def print_ongoing_local(self, content):
    """Display *content* in purple."""
    colour = 'purple'
    display.display(content, color=colour)
def print_changer(self, content):
    """Display *content* in yellow."""
    colour = 'yellow'
    display.display(content, color=colour)
def print_ok(self, content):
    """Display *content* in green."""
    colour = 'green'
    display.display(content, color=colour)
def print_error(self, content):
    """Display *content* in red."""
    colour = 'red'
    display.display(content, color=colour)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment