Created
December 29, 2024 02:04
-
-
Save jikamens/1951c4193fa00715d3fac48c1bc36550 to your computer and use it in GitHub Desktop.
Ansible module for editing CUPS config files such as printers.conf
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# Copyright (c) 2024, Jonathan Kamens <[email protected]> | |
# GNU General Public License v3.0+ | |
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) | |
# SPDX-License-Identifier: GPL-3.0-or-later | |
import copy | |
import os | |
import re | |
from stat import S_IMODE | |
import sys | |
from tempfile import NamedTemporaryFile | |
from ansible.module_utils.basic import AnsibleModule | |
__metaclass__ = type | |
# Module documentation in the YAML format read by ansible-doc.
DOCUMENTATION = '''
---
module: cups_conf
short_description: Administer CUPS configuration file
requirements:
  - CUPS _must not be running_ when you use this module to modify one of its
    configuration files
description:
  - Query or modify contents of a CUPS configuration file such as
    C(/etc/cups/printers.conf).
  - You may want to separately deploy PPD files into C(/etc/cups/ppd), if CUPS
    can't figure out how to do driverless printing to your printer.
  - Be careful when making changes to existing printers that stuff in
    C(/var/cache/cups) may obscure your changes unless you clean it out.
  - Assumes that there are no settings with the same names as sections.
options:
  path:
    description:
      - File to operate on.
    type: path
    required: true
  section:
    description:
      - Config section to operate on.
      - Leave blank to operate on settings at top level of file.
      - Specify generic (e.g., "Printer") or specific ("Printer name") section
        name.
      - First matching section is selected.
      - For nested section, list section headers from outer to inner.
      - Do not specify angle brackets around section names.
    type: list
    elements: str
  option:
    description:
      - Config option to operate on.
      - Leave blank to operate on entire section.
      - Specify generic (e.g., "Option") or specific ("Option sides") option
        name.
      - First matching option is selected.
    type: str
  value:
    description:
      - Value to set for option.
      - Specify if and only if O(option) is specified and O(state=present).
    type: str
  state:
    description:
      - Whether the section or option should be there or not.
      - Missing sections will be created automatically when adding options.
      - Matching section or option will be replaced when O(state=present), so be
        careful about specifying a generic section or option name which might
        cause the wrong thing to be replaced.
    type: str
    choices: [ absent, present ]
    default: present
  multi:
    description:
      - Specify multiple changes in a single module invocation.
      - List of dicts, each of which should contain O(section), O(option),
        O(value), and/or O(state) as appropriate.
      - When O(multi) is specified, then O(section), O(state), O(option), and/or
        O(value) may also be specified at the module level as a default for all
        the O(multi) items.
    type: list
    elements: dict
author: Jonathan Kamens (@jikamens)
'''
# Usage example shown by ansible-doc: check whether changes are needed, and
# only then stop CUPS, clear its cache, apply the changes and restart it.
EXAMPLES = '''
---
- name: Add my printer to CUPS
  vars:
    conf_settings:
      - section: Printer MyPrinter
        option: DeviceURI
        value: dnssd://MyPrinterModel-501C89._ipp._tcp.local
      - section: Printer MyPrinter
        option: Option sides
        value: two-sided-long-edge
  tasks:
    - name: Check if CUPS settings need to be updated
      cups_conf:
        path: /etc/cups/printers.conf
        multi: "{{conf_settings}}"
      check_mode: true
      register: need_conf
    - when: need_conf.changed
      block:
        - name: Stop CUPS
          service: name=cups state=stopped
        - name: Search for cache files
          find:
            paths: /var/cache/cups
            patterns: MyPrinter.*
          register: cache_files
        - name: Remove cache files
          file:
            path: "{{item}}"
            state: absent
          loop: "{{cache_files.files|map(attribute='path')}}"
        - name: Deploy PPD file
          copy:
            src: MyPrinter.ppd
            dest: /etc/cups/ppd/MyPrinter.ppd
        - name: Update printers.conf
          cups_conf:
            path: /etc/cups/printers.conf
            multi: "{{conf_settings}}"
        - name: Start CUPS
          service: name=cups state=started
'''
# Return-value documentation in the YAML format read by ansible-doc.
RETURN = '''
---
old:
  description: Old value of section or option
  type: str
  returned: When O(multi) is omitted
new:
  description: New value of section or option
  type: str
  returned: When O(multi) is omitted
multi:
  description: List of dicts with C(old) and C(new) elements as above.
  type: list
  elements: dict
  returned: When O(multi) is specified
'''
class ConfigItem:
    """Common base class for the pieces a config file is parsed into.

    Every method here raises NotImplementedError; subclasses supply the
    real behaviour.
    """

    def __init__(self, lines, debug=False):
        """Build the item from the front of ``lines`` (subclass-defined)."""
        raise NotImplementedError

    @staticmethod
    def isa(lines):
        """Tell whether ``lines[0]`` begins this kind of item."""
        raise NotImplementedError

    def __str__(self):
        """Return the item's text as written to the config file."""
        raise NotImplementedError
class ConfigNoop(ConfigItem):
    """A blank or comment-only line, kept so it can be written back as-is."""

    def __init__(self, lines, debug=False):
        """Remove the first entry of ``lines`` and keep it as this item."""
        text = lines.pop(0)
        # Stored lines are always newline-terminated so items can simply be
        # concatenated when the file is rendered.
        self.line = text if text.endswith('\n') else text + '\n'

    @staticmethod
    def isa(lines):
        """Return True if ``lines[0]`` is empty, whitespace or a # comment."""
        return re.match(r'\s*(?:#.*)?$', lines[0]) is not None

    def __str__(self):
        return self.line

    def __repr__(self):
        return self.line.strip()
class ConfigSetting(ConfigItem):
    """A single setting line, e.g. ``DeviceURI ipp://host/printer``.

    Attributes:
        line: The newline-terminated text of the setting.
        keys: Every word prefix of the line joined by single spaces, shortest
            first (``['Option', 'Option sides', 'Option sides two-sided']``),
            so the setting can be matched by a generic or a specific name.
    """

    def __init__(self, lines, debug=False):
        """Build a setting from the first entry of ``lines``.

        ``lines`` may be a list, whose first entry is removed, or a single
        string. Raises ValueError if the line contains no words.
        """
        # Be forgiving: accept a bare string as well as a list of lines.
        if isinstance(lines, str):
            lines = [lines]
        self.line = lines.pop(0)
        if not self.line.endswith('\n'):
            self.line += '\n'
        words = self.line.split()
        if not words:
            raise ValueError('setting line is empty')
        self.keys = [words.pop(0)]
        for word in words:
            self.keys.append(self.keys[-1] + ' ' + word)

    # Declared static like ConfigItem.isa so it also works on instances.
    @staticmethod
    def isa(lines):
        """Return True if ``lines[0]`` starts with something other than
        whitespace, ``#`` or ``<``."""
        return re.match(r'\s*[^\s#<]', lines[0]) is not None

    @classmethod
    def new(cls, option, value):
        """Create a setting from an option name and its value."""
        return cls(f'{option} {value}\n')

    def __str__(self):
        return self.line

    def __repr__(self):
        # The longest key is the whole line with whitespace normalised.
        return self.keys[-1]
class ConfigSection(ConfigItem):
    """A ``<Type name>`` ... ``</Type>`` block and the items nested in it.

    Attributes:
        header: The opening ``<...>`` line, newline-terminated.
        footer: The closing ``</...>`` line, newline-terminated.
        section_type: First word of the header (e.g. ``Printer``).
        keys: Every word prefix of the header text, shortest first, so the
            section can be matched by a generic or a specific name.
        children: Parsed items between header and footer, in file order.

    Children are looked up by key with ``section[key]``; the first child
    whose ``keys`` contain the normalised key wins.
    """

    def __init__(self, lines, debug=False):
        """Parse one section from the front of ``lines``.

        A list is consumed in place (so nested sections share it with their
        parent); any other iterable is copied into a list first.

        Raises Exception on a malformed header, a mismatched closing tag, an
        unparseable line, or a missing closing tag.
        """
        if not isinstance(lines, list):
            lines = list(lines)
        self.header = lines.pop(0)
        if not self.header.endswith('\n'):
            self.header += '\n'
        # Tolerate trailing whitespace after the closing bracket.
        match = re.match(r'\s*<(.*)>\s*$', self.header)
        words = match[1].split() if match else []
        if not words:
            raise Exception(f'Malformed section header {self.header}')
        self.section_type = words[0]
        self.keys = [words.pop(0)]
        for word in words:
            self.keys.append(self.keys[-1] + ' ' + word)
        if debug:
            print(f'Start section {self.keys[-1]}', file=sys.stderr)
        self.children = []
        self.footer = ''
        while lines:
            # A closing tag ends this section; it must match our own type.
            match = re.match(r'\s*<\s*/\s*([^\s>]+)', lines[0])
            if match:
                if match[1] != self.section_type:
                    raise Exception(f'{self!r} ended with {lines[0]}')
                self.footer = lines.pop(0)
                if not self.footer.endswith('\n'):
                    self.footer += '\n'
                return
            if ConfigNoop.isa(lines):
                self.children.append(ConfigNoop(lines, debug=debug))
            elif ConfigSetting.isa(lines):
                self.children.append(ConfigSetting(lines, debug=debug))
            elif ConfigSection.isa(lines):
                self.children.append(ConfigSection(lines, debug=debug))
            else:
                raise Exception(f'Could not parse {lines[0]}')
        raise Exception(f'Unterminated section {self.header}')

    # Declared static like ConfigItem.isa so it also works on instances.
    @staticmethod
    def isa(lines):
        """Return True if ``lines[0]`` opens a section (``<`` not followed
        by ``/``)."""
        return re.match(r'\s*<\s*[^/]', lines[0]) is not None

    @classmethod
    def new(cls, name):
        """Create an empty section; ``name`` is the header text without
        angle brackets."""
        return cls((f'<{name}>\n', f'</{name.split()[0]}>\n'))

    def find_all(self, key):
        '''Returns an iterator over every child matching ``key``'''
        return (self.children[i] for i in self._find(key))

    def _find(self, key):
        '''Returns an iterator over the indexes of children matching ``key``'''
        # Normalise the key the same way ``keys`` are built: no surrounding
        # angle brackets, words separated by single spaces.
        key = ' '.join(key.strip().strip('<>').split())
        return (i for i, child in enumerate(self.children)
                if key in getattr(child, 'keys', ()))

    def __str__(self):
        return (self.header +
                ''.join(str(c) for c in self.children) +
                self.footer)

    def __repr__(self):
        return self.keys[-1]

    def __getitem__(self, key):
        """Return the first child matching ``key``; KeyError if none."""
        try:
            i = next(self._find(key))
        except StopIteration:
            raise KeyError(key) from None
        return self.children[i]

    def __setitem__(self, key, value):
        """Replace the first child matching ``key``, or append ``value`` if
        nothing matches. Raises ValueError if ``value`` is not a ConfigItem."""
        if not isinstance(value, ConfigItem):
            raise ValueError('value must be a config object')
        try:
            i = next(self._find(key))
        except StopIteration:
            self.children.append(value)
            return
        self.children[i] = value

    def __delitem__(self, key):
        """Remove the first child matching ``key``; KeyError if none."""
        try:
            i = next(self._find(key))
        except StopIteration:
            raise KeyError(key) from None
        self.children.pop(i)

    def __add__(self, other):
        """Return a deep copy of this section with ``other`` appended."""
        if not isinstance(other, ConfigItem):
            return NotImplemented
        new = copy.deepcopy(self)
        new.children.append(other)
        return new

    def __iadd__(self, other):
        """Append ``other`` to this section in place."""
        if not isinstance(other, ConfigItem):
            return NotImplemented
        self.children.append(other)
        return self
class ConfigFile(ConfigSection):
    """Top level of a config file, handled as a section with empty header
    and footer."""

    def __init__(self, lines, debug=False):
        '''lines can be any iterable, including an open file handle'''
        # Wrap the content in a throwaway section so the section parser can
        # be reused for the top level of the file.
        wrapped = ['<placeholder>\n', *lines, '</placeholder>\n']
        super().__init__(wrapped, debug=debug)
        # The placeholder tags must never be written back out.
        self.header = self.footer = ''
def run_module():
    """Parse the module arguments, apply the requested change(s) and exit.

    The config file at ``path`` is parsed into memory, each requested change
    is applied by do_setting(), and, if anything changed and the module is
    not running in check mode, the file is rewritten through a temporary
    file in the same directory that is renamed over the original.
    """
    module_args = dict(
        path=dict(type='path', required=True),
        section=dict(type='list', elements='str'),
        option=dict(type='str'),
        value=dict(type='str'),
        state=dict(type='str', default='present',
                   choices=('present', 'absent')),
        multi=dict(type='list', elements='dict', default=[]),
    )
    result = dict(changed=False)
    module = AnsibleModule(
        argument_spec=module_args, supports_check_mode=True)
    # Drop unset/empty parameters so only supplied values act as defaults.
    params = {k: v for k, v in module.params.items() if v}
    path = params['path']
    # Close the handle as soon as the file has been parsed.
    with open(path) as conf:
        cf = ConfigFile(conf)
    # 'multi' is filtered out of params when empty, so it must be looked up
    # with .get() rather than indexed.
    multi = params.get('multi')
    if multi:
        result['multi'] = []
        for multi_setting in multi:
            # Module-level values are defaults; each item may override them.
            setting = copy.deepcopy(params)
            setting.update(multi_setting)
            old_new = dict()
            result['multi'].append(old_new)
            do_setting(cf, module, result, old_new, setting)
    else:
        do_setting(cf, module, result, result, params)
    if result['changed'] and not module.check_mode:
        realpath = os.path.realpath(path)
        tf = NamedTemporaryFile('w', dir=os.path.dirname(realpath),
                                prefix=os.path.basename(realpath) + '.',
                                delete=False)
        try:
            with tf:
                tf.write(str(cf))
                # Make owner, group, and permissions of temporary file match
                # original.
                metadata = os.stat(realpath)
                fileno = tf.fileno()
                os.chmod(fileno, S_IMODE(metadata.st_mode))
                os.chown(fileno, metadata.st_uid, metadata.st_gid)
            os.rename(tf.name, realpath)
        except Exception:
            # Best-effort cleanup so a failed update leaves no stray temp
            # file next to the config; the original error is re-raised.
            try:
                os.unlink(tf.name)
            except OSError:
                pass
            raise
    module.exit_json(**result)
def do_setting(cf, module, result, old_new, setting):
    """Apply one section/option change to the parsed config ``cf``.

    Args:
        cf: Parsed config supporting ``cf[key]``, ``cf[key] = item`` and
            ``del cf[key]``, with nested sections supporting the same.
        module: The AnsibleModule; accepted for the caller's convenience and
            not used here.
        result: Module result dict; ``result['changed']`` is set to True
            when the config is modified.
        old_new: Dict that receives ``'old'`` and ``'new'`` text describing
            the affected section or option.
        setting: Dict with optional ``section`` (str or list of str),
            ``option``, ``value`` and ``state`` (defaults to ``'present'``).

    Raises:
        Exception: If the combination of section/option/value/state is
            invalid.
    """
    # A missing section means "top level of the file": no levels to descend.
    section_names = setting.get('section') or []
    if isinstance(section_names, str):
        section_names = [section_names]
    option = setting.get('option')
    value = setting.get('value')
    state = setting.get('state', 'present')

    if not (section_names or option):
        raise Exception('Must specify section or option')
    if value and not option:
        raise Exception('value specified without corresponding option')
    # A value is only required when the option is being set; removing an
    # option must be possible without one.
    if option and value is None and state == 'present':
        raise Exception('option specified without value')
    if value is not None and state == 'absent':
        raise Exception('Do not specify value with state==absent')

    current_level = cf
    if state == 'present':
        # Descend to the target section, creating missing levels on the way.
        for section_name in section_names:
            try:
                current_level = current_level[section_name]
            except KeyError:
                current_level[section_name] = ConfigSection.new(section_name)
                current_level = current_level[section_name]
                old_new['old'] = ''
                old_new['new'] = str(current_level)
                result['changed'] = True
        if not option:
            return
    elif option:
        # Deleting the option, not the section
        for section_name in section_names:
            try:
                current_level = current_level[section_name]
            except KeyError:
                # Section is absent, so the option is absent too.
                return
    else:
        # Deleting the section if it exists
        for section_name in section_names[:-1]:
            try:
                current_level = current_level[section_name]
            except KeyError:
                return
        try:
            section = current_level[section_names[-1]]
        except KeyError:
            return
        old_new['old'] = str(section)
        old_new['new'] = ''
        del current_level[section_names[-1]]
        result['changed'] = True
        return

    if state == 'absent':
        try:
            existing = current_level[option]
        except KeyError:
            return
        old_new['old'] = str(existing)
        old_new['new'] = ''
        del current_level[option]
        result['changed'] = True
        return

    new_setting = ConfigSetting.new(option, value)
    try:
        old_setting = current_level[option]
    except KeyError:
        # Option does not exist yet: add it.
        current_level[option] = new_setting
        old_new['old'] = ''
        old_new['new'] = str(new_setting)
        result['changed'] = True
        return
    old_new['old'] = str(old_setting)
    # repr() is the whitespace-normalised line, so formatting-only
    # differences do not count as a change.
    if repr(old_setting) == repr(new_setting):
        old_new['new'] = str(old_setting)
        return
    # Replace the existing option and report the change so the file is
    # actually rewritten.
    current_level[option] = new_setting
    old_new['new'] = str(new_setting)
    result['changed'] = True
def main():
    """Script entry point; hands off to run_module()."""
    run_module()


# Only run the module when this file is executed, not when it is imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment