Turns Ansible log outputs into plain JSON strings and sends them to an Elasticsearch cluster.
Place the script in your playbook's plugins/callbacks/ directory.
[defaults] | |
callback_plugins = plugins/callbacks/ |
# adapted from https://github.com/petems/ansible-json | |
import os | |
import time | |
import datetime | |
import httplib2 | |
import json | |
import uuid | |
from datetime import datetime | |
from httplib2 import Http | |
from json import JSONEncoder | |
class CallbackModule(object): | |
@staticmethod | |
def convert_string_to_timestamp_ms(value, format): | |
dt = datetime.strptime(value, format) | |
return int(time.mktime(dt.timetuple()) * 1000 + round(dt.microsecond / 1000)) | |
@staticmethod | |
def convert_ansible_datetime_string_to_timestamp_ms(value): | |
# Example input: '2015-02-23 13:48:21.316040' | |
return CallbackModule.convert_string_to_timestamp_ms(value, '%Y-%m-%d %H:%M:%S.%f') | |
@staticmethod | |
def convert_ansible_time_string_to_timestamp_ms(value): | |
# Example input: '0:00:00.004156' | |
return CallbackModule.convert_string_to_timestamp_ms('1970-01-01 0' + value, '%Y-%m-%d %H:%M:%S.%f') | |
def __init__(self): | |
self.uuid = uuid.uuid1() | |
self.http = Http() | |
self.http_headers = {'Content-type': 'application/json'} | |
self.http_method = 'PUT' | |
self.http_uri = 'http://localhost:9200/ansible-deployments-' + time.strftime('%Y') + '/playbook-run-' + str(self.uuid) | |
def json_create_message(self, host, status, res): | |
if type(res) == type(dict()): | |
if 'verbose_override' not in res: | |
for i in ['start', 'end']: | |
if i in res: | |
res[i] = self.convert_ansible_datetime_string_to_timestamp_ms(res[i]) | |
for i in ['delta']: | |
if i in res: | |
res[i] = self.convert_ansible_time_string_to_timestamp_ms(res[i]) | |
res.update({unicode('host'): unicode(host)}) | |
res.update({unicode('status'): unicode(status)}) | |
if 'start' in res: | |
timestamp = res['start'] | |
else: | |
timestamp = int(time.time() * 1000) | |
res.update({'timestamp': timestamp}) | |
return (str(timestamp), JSONEncoder().encode(res)) | |
return (None, None) | |
def json_post_message(self, id, msg): | |
response, content = self.http.request(self.http_uri + '/' + id, self.http_method, msg, self.http_headers) | |
print('Data available at: ' + self.http_uri + '/' + id + '?pretty=1') | |
def json_post_result(self, host, status, res): | |
id, msg = self.json_create_message(host, status, res) | |
if id != None: | |
self.json_post_message(id, msg) | |
def on_any(self, *args, **kwargs): | |
pass | |
def runner_on_failed(self, host, res, ignore_errors=False): | |
self.json_post_result(host, 'failed', res) | |
def runner_on_ok(self, host, res): | |
self.json_post_result(host, 'ok', res) | |
def runner_on_error(self, host, msg): | |
json_post_result(host, 'error', {unicode('msg'): msg}) | |
pass | |
def runner_on_skipped(self, host, item=None): | |
pass | |
def runner_on_unreachable(self, host, res): | |
self.json_post_result(host, 'unreachable', res) | |
def runner_on_no_hosts(self): | |
pass | |
def runner_on_async_poll(self, host, res, jid, clock): | |
self.json_post_result(host, 'async_poll', res) | |
def runner_on_async_ok(self, host, res, jid): | |
self.json_post_result(host, 'async_ok', res) | |
def runner_on_async_failed(self, host, res, jid): | |
self.json_post_result(host, 'async_failed', res) | |
def playbook_on_start(self): | |
pass | |
def playbook_on_notify(self, host, handler): | |
pass | |
def playbook_on_no_hosts_matched(self): | |
pass | |
def playbook_on_no_hosts_remaining(self): | |
pass | |
def playbook_on_task_start(self, name, is_conditional): | |
pass | |
def playbook_on_vars_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None): | |
pass | |
def playbook_on_setup(self): | |
pass | |
def playbook_on_import_for_host(self, host, imported_file): | |
pass | |
def playbook_on_not_import_for_host(self, host, missing_file): | |
pass | |
def playbook_on_play_start(self, pattern): | |
pass | |
def playbook_on_stats(self, stats): | |
pass |
Hi,
I faced the same issue,
I changed the
class CallbackModule(object):
TO class CallbackModule(CallbackBase):
After that the error disappeared
ansible-playbook -i hosts main.yml
PLAY [Checking for the file and creating it if not present] ********************
TASK [setup] *******************************************************************
ok: [NEO-LABLINUXSCRIPT]
Data available at: http://192.168.1.19:9200/ansible-deployments-2016/playbook-run-f3e7b2e2-e54e-11e5-92e3-005056b04d43/1457456306127?pretty=1
TASK [filetester : Check if a file exists] *************************************
ok: [NEO-LABLINUXSCRIPT]
Data available at: http://192.168.1.19:9200/ansible-deployments-2016/playbook-run-f3e7b2e2-e54e-11e5-92e3-005056b04d43/1457456308776?pretty=1
TASK [filetester : Display messages] *******************************************
skipping: [NEO-LABLINUXSCRIPT]
TASK [filetester : Copying files] **********************************************
changed: [NEO-LABLINUXSCRIPT]
Data available at: http://192.168.1.19:9200/ansible-deployments-2016/playbook-run-f3e7b2e2-e54e-11e5-92e3-005056b04d43/1457456310722?pretty=1
PLAY RECAP *********************************************************************
NEO-LABLINUXSCRIPT : ok=3 changed=1 unreachable=0 failed=0
Hope it helps
Might help to note that you'll need to import it:
from ansible.plugins.callback import CallbackBase
I am confused by this. Does this feed data in to elastic using logstash or an ingest node?
I am confused by this. Does this feed data in to elastic using logstash or an ingest node?
No, this is making a call to a Elastic index directly.
# adapted from https://github.com/petems/ansible-json
import os
import time
import datetime
import httplib2
import json
import uuid
from datetime import datetime
from httplib2 import Http
from json import JSONEncoder
class CallbackModule(CallbackBase):
@staticmethod
def convert_string_to_timestamp_ms(value, format):
dt = datetime.strptime(value, format)
return int(time.mktime(dt.timetuple()) * 1000 + round(dt.microsecond / 1000))
@staticmethod
def convert_ansible_datetime_string_to_timestamp_ms(value):
# Example input: '2015-02-23 13:48:21.316040'
return CallbackModule.convert_string_to_timestamp_ms(value, '%Y-%m-%d %H:%M:%S.%f')
@staticmethod
def convert_ansible_time_string_to_timestamp_ms(value):
# Example input: '0:00:00.004156'
return CallbackModule.convert_string_to_timestamp_ms('1970-01-01 0' + value, '%Y-%m-%d %H:%M:%S.%f')
def __init__(self):
self.uuid = uuid.uuid1()
self.http = Http()
self.http_headers = {'Content-type': 'application/json'}
self.http_method = 'PUT'
self.http_uri = 'http://localhost:9200/ansible-deployments-' + time.strftime('%Y') + '/playbook-run-' + str(self.uuid)
def json_create_message(self, host, status, res):
if type(res) == type(dict()):
if 'verbose_override' not in res:
for i in ['start', 'end']:
if i in res:
res[i] = self.convert_ansible_datetime_string_to_timestamp_ms(res[i])
for i in ['delta']:
if i in res:
res[i] = self.convert_ansible_time_string_to_timestamp_ms(res[i])
res.update({unicode('host'): unicode(host)})
res.update({unicode('status'): unicode(status)})
if 'start' in res:
timestamp = res['start']
else:
timestamp = int(time.time() * 1000)
res.update({'timestamp': timestamp})
return (str(timestamp), JSONEncoder().encode(res))
return (None, None)
def json_post_message(self, id, msg):
response, content = self.http.request(self.http_uri + '/' + id, self.http_method, msg, self.http_headers)
print('Data available at: ' + self.http_uri + '/' + id + '?pretty=1')
def json_post_result(self, host, status, res):
id, msg = self.json_create_message(host, status, res)
if id != None:
self.json_post_message(id, msg)
def on_any(self, *args, **kwargs):
pass
def runner_on_failed(self, host, res, ignore_errors=False):
self.json_post_result(host, 'failed', res)
def runner_on_ok(self, host, res):
self.json_post_result(host, 'ok', res)
def runner_on_error(self, host, msg):
json_post_result(host, 'error', {unicode('msg'): msg})
pass
def runner_on_skipped(self, host, item=None):
pass
def runner_on_unreachable(self, host, res):
self.json_post_result(host, 'unreachable', res)
def runner_on_no_hosts(self):
pass
def runner_on_async_poll(self, host, res, jid, clock):
self.json_post_result(host, 'async_poll', res)
def runner_on_async_ok(self, host, res, jid):
self.json_post_result(host, 'async_ok', res)
def runner_on_async_failed(self, host, res, jid):
self.json_post_result(host, 'async_failed', res)
def playbook_on_start(self):
pass
def playbook_on_notify(self, host, handler):
pass
def playbook_on_no_hosts_matched(self):
pass
def playbook_on_no_hosts_remaining(self):
pass
def playbook_on_task_start(self, name, is_conditional):
pass
def playbook_on_vars_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None):
pass
def playbook_on_setup(self):
pass
def playbook_on_import_for_host(self, host, imported_file):
pass
def playbook_on_not_import_for_host(self, host, missing_file):
pass
def playbook_on_play_start(self, pattern):
pass
def playbook_on_stats(self, stats):
pass
Hi there,
I am trying to use this call back script but I get this error:
[WARNING]: Failure when attempting to use callback plugin (</usr/share/ansible_plugins/callback_plugins/ansible_log_to_elasticsearch.CallbackModule object at
0x1ffca50>): runner_on_ok() takes exactly 3 arguments (2 given)
[WARNING]: Failure when attempting to use callback plugin (</usr/share/ansible_plugins/callback_plugins/inventory.CallbackModule object at 0x20197d0>):
runner_on_ok() takes exactly 3 arguments (2 given)