@jctanner
Created February 1, 2019 14:42
testinfra ansible_runner port
# coding: utf-8
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=import-error,no-name-in-module,no-member
# pylint: disable=unexpected-keyword-arg,no-value-for-parameter
# pylint: disable=arguments-differ
from __future__ import unicode_literals
from __future__ import absolute_import
import glob
import json
import os
import pprint
import shutil
import subprocess
import tempfile
import yaml
# the real ansible-runner
import ansible_runner
__all__ = ['AnsibleRunner', 'to_bytes']
def to_bytes(data):
    '''Coerce text to bytes for callers that expect a bytes payload'''
    if isinstance(data, bytes):
        return data
    return data.encode('utf-8')

class AnsibleInventoryException(Exception):
    def __init__(self, message):
        super(AnsibleInventoryException, self).__init__(message)
        self.message = message

class AnsibleRunnerV2(object):

    # testinfra api: cache of runner instances keyed by inventory path
    _runners = {}

    # testinfra api: the inventory file path (ansible's -i)
    host_list = None

    # per-host variable cache
    variables = {}

    def __init__(self, host_list=None):
        # host_list is the inventory file, aka -i
        self.host_list = host_list

    @classmethod
    def get_runner(cls, inventory):
        # store a copy of the runner in a dict keyed by inventory path
        if inventory not in cls._runners:
            cls._runners[inventory] = cls(inventory)
        return cls._runners[inventory]
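
    # Illustrative use from a test suite (hypothetical inventory path):
    #   runner = AnsibleRunnerV2.get_runner('molecule/default/inventory')
    #   runner is AnsibleRunnerV2.get_runner('molecule/default/inventory')  # True, cached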

    def fetch_inventory(self, host=None):
        '''Helper around the ansible-inventory command'''
        cmd = 'ansible-inventory -i %s' % self.host_list
        if host is not None:
            cmd += ' --host=%s' % host
        else:
            cmd += ' --list'
        p = subprocess.Popen(
            cmd,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        (so, se) = p.communicate()
        if p.returncode != 0:
            msg = 'ansible-inventory failed: %s' % se
            raise AnsibleInventoryException(msg)
        inv = json.loads(so)
        return inv
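
    # For reference, ansible-inventory returns JSON roughly shaped like this
    # (illustrative; actual keys depend on the inventory):
    #   --list:       {"_meta": {"hostvars": {"host1": {...}}}, "all": {"children": [...]}}
    #   --host host1: {"ansible_connection": "docker", ...}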

    def get_hosts(self, pattern=None):
        '''Return the list of host names from inventory (pattern is accepted
        for API compatibility but not currently applied)'''
        inv = self.fetch_inventory()
        return list(inv['_meta']['hostvars'].keys())

    def get_variables(self, host, refresh=True):
        '''Get a mixture of inventory vars and magic vars for a host'''
        if host not in self.variables or refresh:
            _vars = self.variables.get(host, {})
            # inventory vars
            _vars.update(self.fetch_inventory(host=host))
            # hack: run the debug module to expose the magic vars
            res = self.run(host, 'debug', 'var=hostvars')
            _vars.update(res.get('hostvars', {}).get(host, {}))
            # one of the unit tests insists this key is always present
            _vars['inventory_hostname'] = host
            self.variables[host] = _vars
        return self.variables[host]
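
    # Illustrative merged result for a host (keys vary with the inventory):
    #   {'ansible_connection': 'docker', 'inventory_hostname': 'instance', ...}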

    def run(self, host, module_name, module_args=None, **kwargs):
        '''Invoke a single module on a single host and return a dict of results'''
        # ansible-runner wants a directory based payload (its private data dir)
        data_dir = tempfile.mkdtemp()

        # ansible-runner also wants an inventory file inside that payload
        inv_dir = os.path.join(data_dir, 'inventory')
        if not os.path.exists(inv_dir):
            os.makedirs(inv_dir)
        inv_file = os.path.join(inv_dir, os.path.basename(self.host_list))
        shutil.copy(self.host_list, inv_file)

        # molecule inventories use env lookups, so pass the MOLECULE_* vars through
        this_env = os.environ.copy()
        env_keys = list(this_env.keys())
        for ekey in env_keys:
            if not ekey.startswith('MOLECULE'):
                this_env.pop(ekey)
        env_dir = os.path.join(data_dir, 'env')
        if not os.path.exists(env_dir):
            os.makedirs(env_dir)
        env_file = os.path.join(env_dir, 'envvars')
        with open(env_file, 'w') as f:
            f.write('---\n')
            f.write(yaml.dump(this_env))

        # build the kwarg payload ansible-runner requires
        runner_kwargs = {
            'private_data_dir': data_dir,
            'host_pattern': host,
            'module': module_name,
            'module_args': module_args
        }

        # ansible-runner has no kwargs for these, so pass them on the cmdline
        for opt in ['become', 'check']:
            if kwargs.get(opt, False):
                if 'cmdline' not in runner_kwargs:
                    runner_kwargs['cmdline'] = '--%s' % opt
                else:
                    runner_kwargs['cmdline'] += ' --%s' % opt
        # invoke ansible-runner -> ansible adhoc
        r = ansible_runner.run(**runner_kwargs)

        # find the ok/failed event for this task; events are not always
        # captured in this environment, so get_events_from_disk() exists as a
        # fallback that reads them out of the artifacts directory
        this_event = None
        failed = False
        events = [x for x in r.events]
        #events = self.get_events_from_disk(data_dir)
        for event in events:
            if event.get('event') in ['runner_on_ok', 'runner_on_failed']:
                if event['event'] == 'runner_on_failed':
                    failed = True
                this_event = event
                break

        # extract the module return data from the event
        result = {}
        if this_event is not None:
            if 'res' in this_event['event_data']:
                result = this_event['event_data']['res'].copy()

        # normalize the keys the callers expect
        if 'rc' not in result:
            result['rc'] = 0 if not failed else -1
        if 'stdout' not in result:
            result['stdout'] = ''
        if 'stderr' not in result:
            result['stderr'] = ''

        # cleanup
        #shutil.rmtree(data_dir)
        return result
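
    # Illustrative normalized return value for a successful 'ping' call
    # (hypothetical host):
    #   {'ping': 'pong', 'changed': False, 'rc': 0, 'stdout': '', 'stderr': ''}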

    def get_events_from_disk(self, datadir):
        '''Fallback: read job events from the artifacts dir when r.events is empty'''
        eventfs = glob.glob('%s/artifacts/*/job_events/*.json' % datadir)
        events = []
        for eventf in eventfs:
            with open(eventf, 'r') as f:
                events.append(json.loads(f.read()))
        return events
AnsibleRunner = AnsibleRunnerV2
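
# Illustrative usage sketch: assumes an inventory file at ./inventory.ini and
# that ansible, ansible-runner and the target hosts are reachable; adjust as needed.
if __name__ == '__main__':
    runner = AnsibleRunner.get_runner('inventory.ini')
    for hostname in runner.get_hosts():
        # run the ping module against each host and show the normalized result
        pprint.pprint(runner.run(hostname, 'ping'))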