Skip to content

Instantly share code, notes, and snippets.

@jikamens
Created December 29, 2024 02:04
Show Gist options
  • Save jikamens/1951c4193fa00715d3fac48c1bc36550 to your computer and use it in GitHub Desktop.
Save jikamens/1951c4193fa00715d3fac48c1bc36550 to your computer and use it in GitHub Desktop.
Ansible module for editing CUPS config files such as printers.conf
#!/usr/bin/python3
# Copyright (c) 2024, Jonathan Kamens <[email protected]>
# GNU General Public License v3.0+
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
import copy
import os
import re
from stat import S_IMODE
import sys
from tempfile import NamedTemporaryFile
from ansible.module_utils.basic import AnsibleModule
__metaclass__ = type
# Module documentation consumed by ansible-doc (YAML; indentation is
# significant).
DOCUMENTATION = '''
---
module: cups_conf
short_description: Administer CUPS configuration file
requirements:
  - CUPS _must not be running_ when you use this module to modify one of its
    configuration files
description:
  - Query or modify contents of a CUPS configuration file such as
    C(/etc/cups/printers.conf).
  - You may want to separately deploy PPD files into C(/etc/cups/ppd), if CUPS
    can't figure out how to do driverless printing to your printer.
  - Be careful when making changes to existing printers that stuff in
    C(/var/cache/cups) may obscure your changes unless you clean it out.
  - Assumes that there are no settings with the same names as sections.
options:
  path:
    description:
      - File to operate on.
    type: path
    required: true
  section:
    description:
      - Config section to operate on.
      - Leave blank to operate on settings at top level of file.
      - Specify generic (e.g., "Printer") or specific ("Printer name") section
        name.
      - First matching section is selected.
      - For nested section, list section headers from outer to inner.
      - Do not specify angle brackets around section names.
    type: list
    elements: str
  option:
    description:
      - Config option to operate on.
      - Leave blank to operate on entire section.
      - Specify generic (e.g., "Option") or specific ("Option sides") option
        name.
      - First matching option is selected.
    type: str
  value:
    description:
      - Value to set for option.
      - Specify if and only if O(option) is specified and O(state=present).
    type: str
  state:
    description:
      - Whether the section or option should be there or not.
      - Missing sections will be created automatically when adding options.
      - Matching section or option will be replaced when O(state=present), so be
        careful about specifying a generic section or option name which might
        cause the wrong thing to be replaced.
    type: str
    choices: [ absent, present ]
    default: present
  multi:
    description:
      - Specify multiple changes in a single module invocation.
      - List of dicts, each of which should contain O(section), O(option),
        O(value), and/or O(state) as appropriate.
      - When O(multi) is specified, then O(section), O(state), O(option), and/or
        O(value) may also be specified at the module level as a default for all
        the O(multi) items.
    type: list
    elements: dict
author: Jonathan Kamens (@jikamens)
'''
# Example playbook shown by ansible-doc (YAML; indentation is significant).
EXAMPLES = '''
---
- name: Add my printer to CUPS
  vars:
    conf_settings:
      - section: Printer MyPrinter
        option: DeviceURI
        value: dnssd://MyPrinterModel-501C89._ipp._tcp.local
      - section: Printer MyPrinter
        option: Option sides
        value: two-sided-long-edge
  tasks:
    - name: Check if CUPS settings need to be updated
      cups_conf:
        path: /etc/cups/printers.conf
        multi: "{{conf_settings}}"
      check_mode: true
      register: need_conf
    - when: need_conf.changed
      block:
        - name: Stop CUPS
          service: name=cups state=stopped
        - name: Search for cache files
          find:
            paths: /var/cache/cups
            patterns: MyPrinter.*
          register: cache_files
        - name: Remove cache files
          file:
            path: "{{item}}"
            state: absent
          loop: "{{cache_files.files|map(attribute='path')}}"
        - name: Deploy PPD file
          copy:
            src: MyPrinter.ppd
            dest: /etc/cups/ppd/MyPrinter.ppd
        - name: Update printers.conf
          cups_conf:
            path: /etc/cups/printers.conf
            multi: "{{conf_settings}}"
        - name: Start CUPS
          service: name=cups state=started
'''
# Return-value documentation consumed by ansible-doc (YAML; indentation is
# significant).
RETURN = '''
---
old:
  description: Old value of section or option
  type: str
  returned: When O(multi) is omitted
new:
  description: New value of section or option
  type: str
  returned: When O(multi) is omitted
multi:
  description: List of dicts with C(old) and C(new) elements as above.
  type: list
  elements: dict
  returned: When O(multi) is specified
'''
class ConfigItem:
    """Common base type for the pieces of a parsed configuration file.

    Every method here raises NotImplementedError; concrete item types are
    expected to override them.
    """

    def __init__(self, lines, debug=False):
        """Build the item from ``lines``; must be overridden."""
        raise NotImplementedError

    @staticmethod
    def isa(lines):
        """Tell whether ``lines`` begins with this kind of item; must be
        overridden."""
        raise NotImplementedError

    def __str__(self):
        """Render the item as text; must be overridden."""
        raise NotImplementedError
class ConfigNoop(ConfigItem):
    """A blank or comment-only line, stored verbatim in ``self.line``."""

    def __init__(self, lines, debug=False):
        """Take the first entry off ``lines`` and keep it, making sure the
        stored text ends with a newline."""
        text = lines.pop(0)
        self.line = text if text.endswith('\n') else text + '\n'

    @staticmethod
    def isa(lines):
        """True when ``lines[0]`` holds only whitespace and/or a ``#``
        comment."""
        return re.match(r'\s*(?:#.*)?$', lines[0]) is not None

    def __str__(self):
        """Return the stored line, newline included."""
        return self.line

    def __repr__(self):
        """Return the stored line with surrounding whitespace removed."""
        return self.line.strip()
class ConfigSetting(ConfigItem):
    """A single setting line such as ``Option sides two-sided-long-edge``.

    Attributes:
        line: the original text of the line, newline-terminated.
        keys: every whitespace-normalized word prefix of the line, shortest
            first (``['Option', 'Option sides', 'Option sides two-...']``),
            so the setting can be looked up by a generic or specific name.
    """

    def __init__(self, lines, debug=False):
        """Take the first entry off ``lines`` and index it by word prefix.

        ``lines`` is normally the shared list of remaining file lines, which
        is consumed in place; a bare string is also accepted.
        """
        # Be forgiving: treat a bare string as a one-line list.
        if isinstance(lines, str):
            lines = [lines]
        self.line = lines.pop(0)
        if not self.line.endswith('\n'):
            self.line += '\n'
        words = self.line.split()
        self.keys = [words.pop(0)]
        for word in words:
            self.keys.append(self.keys[-1] + ' ' + word)

    # Declared static like ConfigItem.isa / ConfigNoop.isa so that it also
    # works when called through an instance.
    @staticmethod
    def isa(lines):
        """True when ``lines[0]`` starts (after optional whitespace) with
        something other than ``#`` or ``<``."""
        return bool(re.match(r'\s*[^\s#<]', lines[0]))

    @classmethod
    def new(cls, option, value):
        """Build a setting from an option name and its value."""
        return cls(f'{option} {value}\n')

    def __str__(self):
        """Return the original line, newline included."""
        return self.line

    def __repr__(self):
        """Return the whole line with whitespace runs collapsed to single
        spaces (the longest key)."""
        return self.keys[-1]
class ConfigSection(ConfigItem):
    """A ``<Type name>`` ... ``</Type>`` block together with its contents.

    Attributes:
        header: the opening-tag line, newline-terminated.
        footer: the closing-tag line, newline-terminated.
        section_type: first word inside the opening tag.
        keys: every whitespace-normalized word prefix of the opening tag's
            contents, shortest first, used for lookups.
        children: the parsed items inside the section, in file order.

    Indexing (``section[key]``, assignment, ``del``) acts on the first child
    whose ``keys`` contain the normalized ``key``.
    """

    def __init__(self, lines, debug=False):
        """Parse one section off the front of ``lines``.

        When ``lines`` is a list it is consumed in place, so the caller's
        list resumes right after this section's closing tag; any other
        iterable is copied into a list first.

        Raises:
            Exception: if the header is malformed, the closing tag names a
                different section type, a line cannot be classified, or the
                input ends before the closing tag.
        """
        if not isinstance(lines, list):
            lines = list(lines)
        self.header = lines.pop(0)
        if not self.header.endswith('\n'):
            self.header += '\n'
        # Tolerate trailing whitespace after '>' and report a malformed or
        # empty tag explicitly instead of failing on a None match.
        match = re.match(r'\s*<(.*)>\s*$', self.header)
        words = match[1].split() if match else []
        if not words:
            raise Exception(f'Could not parse section header {self.header}')
        self.section_type = words[0]
        self.keys = [words.pop(0)]
        for word in words:
            self.keys.append(self.keys[-1] + ' ' + word)
        if debug:
            print(f'Start section {self.keys[-1]}', file=sys.stderr)
        self.children = []
        self.footer = ''
        while lines:
            # The closing-tag check must come first: isa() below would
            # otherwise be asked to classify the footer line.
            match = re.match(r'\s*<\s*/\s*([^\s>]+)', lines[0])
            if match:
                if match[1] != self.section_type:
                    raise Exception(f'{repr(self)} ended with {lines[0]}')
                self.footer = lines.pop(0)
                if not self.footer.endswith('\n'):
                    self.footer += '\n'
                return
            if ConfigNoop.isa(lines):
                self.children.append(ConfigNoop(lines, debug=debug))
            elif ConfigSetting.isa(lines):
                self.children.append(ConfigSetting(lines, debug=debug))
            elif ConfigSection.isa(lines):
                self.children.append(ConfigSection(lines, debug=debug))
            else:
                raise Exception(f'Could not parse {lines[0]}')
        raise Exception(f'Unterminated section {self.header}')

    # Declared static like ConfigItem.isa / ConfigNoop.isa so that it also
    # works when called through an instance.
    @staticmethod
    def isa(lines):
        """True when ``lines[0]`` looks like an opening ``<...`` tag."""
        return bool(re.match(r'\s*<\s*[^/]', lines[0]))

    @classmethod
    def new(cls, name):
        """Build an empty section whose opening tag contains ``name``."""
        return cls((f'<{name}>\n', f'</{name.split()[0]}>\n'))

    def find_all(self, key):
        '''Returns an iterator over all children matching key'''
        return (self.children[i] for i in self._find(key))

    def _find(self, key):
        '''Returns an iterator over indexes of children matching key'''
        # Normalize the same way keys are built: no angle brackets, single
        # spaces between words.
        wanted = ' '.join(key.strip().strip('<>').split())
        # ConfigNoop children have no ``keys`` and therefore never match.
        return (i for i, child in enumerate(self.children)
                if wanted in getattr(child, 'keys', ()))

    def __str__(self):
        """Return header, all children and footer concatenated."""
        return (self.header +
                ''.join(str(c) for c in self.children) +
                self.footer)

    def __repr__(self):
        """Return the normalized contents of the opening tag."""
        return self.keys[-1]

    def __getitem__(self, key):
        """Return the first child matching ``key``; KeyError if none."""
        try:
            i = next(self._find(key))
        except StopIteration:
            raise KeyError(key) from None
        return self.children[i]

    def __setitem__(self, key, value):
        """Replace the first child matching ``key`` with ``value``, or
        append ``value`` when nothing matches.

        Raises:
            ValueError: if ``value`` is not a ConfigItem.
        """
        if not isinstance(value, ConfigItem):
            raise ValueError('value must be a config object')
        try:
            i = next(self._find(key))
        except StopIteration:
            self += value
            return
        self.children[i] = value

    def __delitem__(self, key):
        """Remove the first child matching ``key``; KeyError if none."""
        try:
            i = next(self._find(key))
        except StopIteration:
            raise KeyError(key) from None
        self.children.pop(i)

    def __add__(self, other):
        """Return a deep copy of this section with ``other`` appended."""
        if not isinstance(other, ConfigItem):
            return NotImplemented
        new = copy.deepcopy(self)
        new.children.append(other)
        return new

    def __iadd__(self, other):
        """Append ``other`` to this section's children in place."""
        if not isinstance(other, ConfigItem):
            return NotImplemented
        self.children.append(other)
        return self
class ConfigFile(ConfigSection):
    """An entire configuration file, handled as one implicit outer section.

    The content is wrapped in a synthetic ``<placeholder>`` section for
    parsing; the synthetic header and footer are then blanked so they do not
    show up when the file is rendered with ``str()``.
    """

    def __init__(self, lines, debug=False):
        '''lines can be any iterable, including an open file handle'''
        wrapped = ['<placeholder>\n', *lines, '</placeholder>\n']
        super().__init__(wrapped, debug=debug)
        self.header = self.footer = ''
def run_module():
    """Ansible entry point: parse the file, apply the changes, write back.

    Reads the module parameters, parses ``path`` into a ConfigFile, applies
    either the single requested change or every item of ``multi`` through
    do_setting(), and, when something changed and check mode is off,
    replaces the file with the new content while keeping the original's
    owner, group and permission bits. Always finishes through
    ``module.exit_json`` or ``module.fail_json``.
    """
    module_args = dict(
        path=dict(type='path', required=True),
        section=dict(type='list', elements='str'),
        option=dict(type='str'),
        value=dict(type='str'),
        state=dict(type='str', default='present',
                   choices=('present', 'absent')),
        multi=dict(type='list', elements='dict', default=[]),
    )
    result = dict(changed=False)
    module = AnsibleModule(
        argument_spec=module_args, supports_check_mode=True)
    # Unset/empty parameters are dropped so that the remaining ones can act
    # as defaults underneath each ``multi`` item.
    params = {k: v for k, v in module.params.items() if v}
    path = params['path']
    try:
        # ConfigFile copies the lines into a list, so the handle can be
        # closed as soon as the constructor returns.
        with open(path) as config_fh:
            cf = ConfigFile(config_fh)
        # ``multi`` defaults to an empty list, which the filter above
        # removes, so it must be looked up with .get().
        multi = params.get('multi')
        if multi:
            result['multi'] = []
            for multi_setting in multi:
                setting = copy.deepcopy(params)
                setting.update(multi_setting)
                old_new = dict()
                result['multi'].append(old_new)
                do_setting(cf, module, result, old_new, setting)
        else:
            do_setting(cf, module, result, result, params)
    except Exception as exc:
        # Report unreadable files, parse errors and invalid parameter
        # combinations as a regular module failure.
        module.fail_json(msg=f'{path}: {exc}')
    if result['changed'] and not module.check_mode:
        realpath = os.path.realpath(path)
        tf = NamedTemporaryFile('w', dir=os.path.dirname(realpath),
                                prefix=os.path.basename(realpath) + '.',
                                delete=False)
        try:
            with tf:
                print(str(cf), file=tf, end='')
                # Make owner, group, and permissions of temporary file match
                # original.
                metadata = os.stat(realpath)
                mode = S_IMODE(metadata.st_mode)
                owner = metadata.st_uid
                group = metadata.st_gid
                fileno = tf.file.fileno()
                os.chmod(fileno, mode)
                os.chown(fileno, owner, group)
            # Rename only after the temporary file is fully written and
            # closed.
            os.rename(tf.name, realpath)
        except BaseException:
            # Do not leave a stray temporary file next to the config file.
            try:
                os.unlink(tf.name)
            except OSError:
                pass
            raise
    module.exit_json(**result)
def do_setting(cf, module, result, old_new, setting):
    """Apply one requested change to the parsed configuration ``cf``.

    Args:
        cf: the top-level section to modify in place.
        module: unused here; kept so the call signature stays stable.
        result: dict whose ``'changed'`` entry is set to True when ``cf`` is
            modified.
        old_new: dict that receives ``'old'`` and ``'new'`` text for the
            affected section or option (may be the same object as
            ``result``).
        setting: dict with optional ``'section'`` (string or list of names,
            outermost first), ``'option'``, ``'value'`` and ``'state'``
            (``'present'`` by default) entries.

    Raises:
        Exception: if the combination of section/option/value/state is
            invalid.
    """
    # No section means "operate on the top level of the file".
    section_names = setting.get('section', None) or []
    if isinstance(section_names, str):
        section_names = [section_names]
    option = setting.get('option', None)
    value = setting.get('value', None)
    state = setting.get('state', 'present')
    if not (section_names or option):
        raise Exception('Must specify section or option')
    if value and not option:
        raise Exception('value specified without corresponding option')
    # A value is only required when the option is being set; removing an
    # option is requested with an option name and no value.
    if option and value is None and state == 'present':
        raise Exception('option specified without value')
    if value is not None and state == 'absent':
        raise Exception('Do not specify value with state==absent')
    current_level = cf

    if state == 'present':
        # Walk down to the target section, creating missing levels.
        for section_name in section_names:
            try:
                current_level = current_level[section_name]
            except KeyError:
                current_level[section_name] = ConfigSection.new(section_name)
                current_level = current_level[section_name]
                old_new['old'] = ''
                old_new['new'] = str(current_level)
                result['changed'] = True
        if not option:
            return
        new_setting = ConfigSetting.new(option, value)
        try:
            old_setting = current_level[option]
        except KeyError:
            old_text = ''
        else:
            old_text = str(old_setting)
            # repr() is the whitespace-normalized line, so differences in
            # spacing alone do not count as a change.
            if repr(old_setting) == repr(new_setting):
                old_new['old'] = old_text
                old_new['new'] = old_text
                return
        # Add the option, or replace the existing one, and report it.
        current_level[option] = new_setting
        old_new['old'] = old_text
        old_new['new'] = str(new_setting)
        result['changed'] = True
        return

    # state == 'absent': remove an option, or a whole section when no
    # option was named. Nothing to do if any part of the path is missing.
    if option:
        parents, target = section_names, option
    else:
        parents, target = section_names[:-1], section_names[-1]
    for section_name in parents:
        try:
            current_level = current_level[section_name]
        except KeyError:
            return
    try:
        doomed = current_level[target]
    except KeyError:
        return
    old_new['old'] = str(doomed)
    old_new['new'] = ''
    del current_level[target]
    result['changed'] = True
def main():
    """Run the module; exists so the script has a conventional entry point."""
    run_module()


# Execute only when run as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment