Skip to content

Instantly share code, notes, and snippets.

@Elektordi
Last active January 3, 2025 16:05
Show Gist options
  • Save Elektordi/c4e7c7e5ff5b0f31ab9e006972b4b524 to your computer and use it in GitHub Desktop.
Save Elektordi/c4e7c7e5ff5b0f31ab9e006972b4b524 to your computer and use it in GitHub Desktop.
Ansible callback plugin to store results in SQLite
# Plugin metadata, kept as a YAML document inside a module-level string.
# NOTE(review): presumably consumed by Ansible's plugin documentation tooling
# (e.g. ansible-doc) -- confirm. The text is runtime data, so edit it with care.
DOCUMENTATION = '''
name: sqlite
callback_type: store
author: Guillaume "Elektordi" Genty <[email protected]>
requirements:
- enable in configuration
- install sqlite3
short_description: Store playbook results in sqlite
version_added: "1.0" # for collections, use the collection version, not the Ansible version
description:
- Store playbook results in sqlite
'''
import os
import sqlite3
import re
import unicodedata
import json
from ansible.plugins.callback import CallbackBase
def slugify(text):
    """Turn arbitrary text into a lowercase identifier-like slug.

    The text is NFKD-normalized and lowercased, combining marks (accents) are
    dropped, every run of non-word characters or underscores is collapsed
    into a single ``_``, and leading/trailing underscores are stripped.

    Args:
        text: The string to convert (e.g. a task, play or file name).

    Returns:
        The slug; may be an empty string when ``text`` holds no word
        characters.
    """
    decomposed = unicodedata.normalize('NFKD', text).lower()
    # NFKD splits an accented letter into base letter + combining mark. The
    # mark is not a word character, so without this step it would be turned
    # into "_" and cut words apart ("Élève" -> "e_le_ve" instead of "eleve").
    base_chars = ''.join(
        ch for ch in decomposed if not unicodedata.combining(ch)
    )
    return re.sub(r'[\W_]+', '_', base_chars).strip('_')
class CallbackModule(CallbackBase):
    """Ansible callback that stores the latest task results per host in SQLite.

    The database file ``store.sqlite`` is created in the playbook's base
    directory. One table is used per playbook (or per play, when the playbook
    holds exactly one play). The table has one row per host (``host`` is
    unique), a ``last`` timestamp, and two columns per task:

    * ``<task>``: the status string (``OK``, ``CHANGED``, ``FAILED``,
      ``FAILED_IGNORED``, ``SKIPPED`` or ``UNREACHABLE``);
    * ``<task>_result``: the task result serialized as JSON.

    Table and column names are produced by ``slugify()``.
    """

    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'store'
    CALLBACK_NAME = 'sqlite'
    # only needed if you ship it and don't want to enable by default
    CALLBACK_NEEDS_ENABLED = True

    # Columns created together with the table; a task column must never
    # reuse them, otherwise the UPDATE would overwrite the host name or the
    # timestamp with a task status.
    _RESERVED_COLUMNS = frozenset(('host', 'last'))

    # Class-level defaults so that _close() is safe to call before any
    # playbook has been started.
    con = None
    cur = None

    def v2_playbook_on_start(self, playbook):
        """Open the database and make sure the results table exists.

        Args:
            playbook: The playbook object handed over by Ansible; its
                ``_basedir``, ``_file_name`` and ``_entries`` are read.
        """
        # Release the connection of a previous playbook, if any, instead of
        # leaking it when this hook fires more than once.
        self._close()

        dbname = os.path.join(playbook._basedir, 'store.sqlite')
        self.con = sqlite3.connect(dbname)
        self.cur = self.con.cursor()

        # Default table name: the playbook file name without its extension.
        self.table = slugify(os.path.splitext(playbook._file_name)[0])
        if len(playbook._entries) == 1:
            # A single-play playbook is stored under the play's name.
            # get_name() is used rather than the private ``_name`` attribute.
            # NOTE(review): presumably get_name() falls back to the hosts
            # pattern for unnamed plays -- confirm against the Ansible
            # version in use.
            self.table = slugify(playbook._entries[0].get_name())

        # Identifiers cannot be bound as SQL parameters, so they are
        # interpolated. slugify() only emits word characters, hence the
        # table name cannot contain a backtick.
        self.cur.execute(
            'CREATE TABLE IF NOT EXISTS `%s` (`host` TEXT, `last` TEXT);'
            % (self.table)
        )
        self.cur.execute(
            'CREATE UNIQUE INDEX IF NOT EXISTS `%s_host_unique` ON `%s` (`host`);'
            % (self.table, self.table)
        )

        # Column 1 of each table_info row is the column name.
        res = self.cur.execute('PRAGMA table_info(`%s`);' % (self.table))
        self.existing_columns = {row[1] for row in res.fetchall()}

        # Cache: task name -> column name used for that task.
        self.map = {}

    def v2_playbook_on_stats(self, stats):
        """Close the database once the playbook run reports its statistics.

        Args:
            stats: Aggregated run statistics passed by Ansible (unused).
        """
        self._close()

    def v2_runner_on_ok(self, result):
        """Record a successful task as ``CHANGED`` or ``OK``."""
        if result._result.get('changed'):
            self.save_result("CHANGED", result)
        else:
            self.save_result("OK", result)

    def v2_runner_on_failed(self, result, ignore_errors=False):
        """Record a failed task, distinguishing failures that are ignored."""
        if ignore_errors:
            self.save_result("FAILED_IGNORED", result)
        else:
            self.save_result("FAILED", result)

    def v2_runner_on_skipped(self, result):
        """Record a skipped task."""
        self.save_result("SKIPPED", result)

    def v2_runner_on_unreachable(self, result):
        """Record a task whose host could not be reached."""
        self.save_result("UNREACHABLE", result)

    def save_result(self, status, result):
        """Write one task result into the row of the host it ran on.

        Args:
            status: Status string to store in the task's column.
            result: Task result object; ``_result``, ``task_name`` and
                ``_host.name`` are read.
        """
        # Results carrying a "censored" key are not stored at all.
        # NOTE(review): presumably these are no_log tasks -- confirm.
        if "censored" in result._result:
            return

        task_name = result.task_name
        col = self.map.get(task_name)
        # "is None" (not falsiness): an empty slug is still a valid cache hit.
        if col is None:
            col = self._column_for(task_name)
            self.map[task_name] = col
            # _column_for() guarantees the status/result pair is either
            # fully present or fully absent, so checking one is enough.
            if col not in self.existing_columns:
                for name in (col, col + '_result'):
                    self.cur.execute(
                        'ALTER TABLE `%s` ADD COLUMN `%s` TEXT;'
                        % (self.table, name)
                    )
                    self.existing_columns.add(name)

        host = result._host.name
        # Make sure the host row exists. INSERT OR IGNORE behaves like
        # "ON CONFLICT DO NOTHING" here but is also accepted by SQLite
        # releases older than 3.24, which lack the upsert syntax.
        self.cur.execute(
            'INSERT OR IGNORE INTO `%s`(host) VALUES(?);' % (self.table),
            [host],
        )
        self.cur.execute(
            'UPDATE `%s` SET `last` = datetime(), `%s` = ?, `%s_result` = ? WHERE `host` = ?;'
            % (self.table, col, col),
            [status, json.dumps(result._result), host],
        )
        self.con.commit()

    def _column_for(self, task_name):
        """Return a status column name for ``task_name`` that is safe to use."""
        col = slugify(task_name)
        # Prefixing always lengthens the name, so the loop ends once the name
        # is longer than every existing column.
        while self._is_taken(col):
            col = 'task_' + col
        return col

    def _is_taken(self, col):
        """Tell whether ``col`` would clash with a column used for something else."""
        if col in self._RESERVED_COLUMNS:
            return True
        # A genuine task owns both `<col>` and `<col>_result`. If only one of
        # them exists, that column belongs to another task (e.g. tasks "foo"
        # and "foo result"), and reusing it would overwrite unrelated data.
        has_status = col in self.existing_columns
        has_result = (col + '_result') in self.existing_columns
        return has_status != has_result

    def _close(self):
        """Close the current database connection, if one is open."""
        if self.con is not None:
            self.con.close()
            self.con = None
            self.cur = None
@Elektordi
Copy link
Author

Save this file as sqlite.py in a callback_plugins/ directory next to your playbook.
Then enable it in the [defaults] section of ansible.cfg:
callbacks_enabled = sqlite

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment