Last active
January 3, 2025 16:05
-
-
Save Elektordi/c4e7c7e5ff5b0f31ab9e006972b4b524 to your computer and use it in GitHub Desktop.
Ansible callback plugin to store results in SQLite
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Plugin documentation block (YAML inside a Python string).
DOCUMENTATION = '''
    name: sqlite
    callback_type: store
    author: Guillaume "Elektordi" Genty <[email protected]>
    requirements:
      - enable in configuration
      - install sqlite3
    short_description: Store playbook results in sqlite
    version_added: "1.0" # for collections, use the collection version, not the Ansible version
    description:
      - Store playbook results in sqlite
'''
import os | |
import sqlite3 | |
import re | |
import unicodedata | |
import json | |
from ansible.plugins.callback import CallbackBase | |
def slugify(text):
    """Turn *text* into a lowercase identifier made only of word characters.

    The text is NFKD-normalized and lowercased, every run of non-word
    characters and/or underscores is collapsed into a single ``_``, and
    leading/trailing underscores are dropped.  The result may be empty.
    """
    normalized = unicodedata.normalize('NFKD', text)
    collapsed = re.sub(r'[\W_]+', '_', normalized.lower())
    return collapsed.strip('_')
class CallbackModule(CallbackBase):
    """Ansible callback that stores per-host task results in a SQLite file.

    The database is ``store.sqlite`` in the playbook's base directory.  One
    table is used per playbook (named after the playbook file, or after the
    play when the playbook has exactly one entry).  Each host gets one row;
    each task gets two TEXT columns: ``<slug>`` holding the status keyword
    and ``<slug>_result`` holding the JSON dump of the raw result.
    """

    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'store'
    CALLBACK_NAME = 'sqlite'

    # only needed if you ship it and don't want to enable by default
    CALLBACK_NEEDS_ENABLED = True

    # Columns created together with the table.  A task column must never
    # reuse these names, or the UPDATE would overwrite the row key/timestamp.
    RESERVED_COLUMNS = ('host', 'last')

    # No database is open until a playbook starts.
    con = None
    cur = None

    def v2_playbook_on_start(self, playbook):
        """Open the database next to the playbook and prepare its table."""
        # A previous playbook of the same run may still hold a connection.
        self._close_db()

        dbname = os.path.join(playbook._basedir, "store.sqlite")
        self.con = sqlite3.connect(dbname)
        self.cur = self.con.cursor()

        table = slugify(os.path.splitext(playbook._file_name)[0])
        if len(playbook._entries) == 1:
            table = slugify(playbook._entries[0]._name)
        # An empty slug would produce an invalid (empty) SQL identifier.
        self.table = table or 'playbook'

        # Identifiers cannot be bound as SQL parameters; they are safe to
        # interpolate because slugify() leaves only word characters.
        self.cur.execute('CREATE TABLE IF NOT EXISTS `%s` (`host` TEXT, `last` TEXT);' % (self.table))
        self.cur.execute('CREATE UNIQUE INDEX IF NOT EXISTS `%s_host_unique` ON `%s` (`host`);' % (self.table, self.table))
        res = self.cur.execute('PRAGMA table_info(`%s`);' % (self.table))
        # Second field of each table_info row is the column name.
        self.existing_columns = {row[1] for row in res.fetchall()}
        # Cache: task name -> column name.
        self.map = {}

    def v2_playbook_on_stats(self, stats):
        """Release the database once the playbook has finished."""
        self._close_db()

    def v2_runner_on_ok(self, result):
        """Record a successful task as ``CHANGED`` or ``OK``."""
        if result._result.get('changed'):
            self.save_result("CHANGED", result)
        else:
            self.save_result("OK", result)

    def v2_runner_on_failed(self, result, ignore_errors=False):
        """Record a failed task, noting whether the failure was ignored."""
        if ignore_errors:
            self.save_result("FAILED_IGNORED", result)
        else:
            self.save_result("FAILED", result)

    def v2_runner_on_skipped(self, result):
        """Record a skipped task."""
        self.save_result("SKIPPED", result)

    def v2_runner_on_unreachable(self, result):
        """Record a task whose host was unreachable."""
        self.save_result("UNREACHABLE", result)

    def save_result(self, status, result):
        """Write *status* and the JSON-encoded result into the host's row.

        Results containing a ``censored`` key are skipped (presumably
        ``no_log`` output, which carries nothing worth storing).  The change
        is committed immediately so an aborted run keeps what was recorded.
        """
        if "censored" in result._result:
            return
        if self.con is None:
            # No open database (playbook not started, or already finished).
            return

        col = self._column_for(result.task_name)
        # Check each column on its own: a table that already has only one of
        # the pair is completed instead of failing on a duplicate column.
        self._ensure_column(col)
        self._ensure_column('%s_result' % col)

        host = result._host.name
        self.cur.execute('INSERT INTO `%s`(host) VALUES(?) ON CONFLICT DO NOTHING;' % (self.table), [host])
        self.cur.execute('UPDATE `%s` SET `last` = datetime(), `%s` = ?, `%s_result` = ? WHERE `host` = ?;' % (self.table, col, col), [status, json.dumps(result._result), host])
        self.con.commit()

    def _column_for(self, task_name):
        """Return the (cached) status column name used for *task_name*."""
        col = self.map.get(task_name)
        if col is None:
            col = slugify(task_name)
            if not col:
                # Name had no word characters: empty identifier is invalid SQL.
                col = 'task'
            elif col in self.RESERVED_COLUMNS:
                col = 'task_%s' % col
            self.map[task_name] = col
        return col

    def _ensure_column(self, column):
        """Add TEXT column *column* to the table unless it already exists."""
        if column in self.existing_columns:
            return
        self.cur.execute('ALTER TABLE `%s` ADD COLUMN `%s` TEXT;' % (self.table, column))
        self.existing_columns.add(column)

    def _close_db(self):
        """Close the current connection, if any."""
        if self.con is not None:
            self.con.close()
            self.con = None
            self.cur = None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Store sqlite.py in a callback_plugins/ directory next to your playbook.
Then add the following to the [defaults] section of ansible.cfg:
callbacks_enabled = sqlite