Last active
October 25, 2021 16:00
-
-
Save Nachtalb/991411b7c59846162099c93d43e5ed2b to your computer and use it in GitHub Desktop.
IRC ZNC module for posting danbooru pics on demand by any user with configurable commands for each IRC channel
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from contextlib import contextmanager | |
from datetime import datetime | |
from random import choice | |
from requests import Session | |
from yarl import URL | |
import inspect | |
import json | |
import os | |
import pprint | |
import sys | |
import traceback | |
import znc | |
pp = pprint.PrettyPrinter() | |
def _is_self(*args): | |
if len(args) > 1 and type(args[0]) == danbooru: | |
return args[0] | |
return None | |
def catchfail(return_code): | |
func = return_code if callable(return_code) else None | |
return_code = None if func else return_code | |
def outer_wrapper(fn): | |
def wrapper(*args, **kwargs): | |
try: | |
return fn(*args, **kwargs) | |
except Exception as e: | |
exc_type, exc_value, exc_traceback = sys.exc_info() | |
s = _is_self(*args) | |
if s: | |
s.PutModule('Failed with %s' % (e)) | |
lines = traceback.format_exception(exc_type, exc_value, | |
exc_traceback) | |
s.internal_log.error(*lines) | |
for line in lines: | |
s.PutModule(line) | |
return return_code | |
return wrapper | |
if func: | |
return outer_wrapper(func) | |
return outer_wrapper | |
class danbooru(znc.Module): | |
_base_url = URL('https://danbooru.donmai.us/') | |
_session = None | |
loaded_aliases = [] | |
description = 'Auto posts danbooru content by defined aliases' | |
has_args = True | |
args_help_text = 'API Key and Login space separated in this order' | |
hook_debugging = False | |
module_types = [znc.CModInfo.NetworkModule] | |
def OnLoad(self, args: str, message): | |
self.internal_log = InternalLog(self.GetSavePath()) | |
self.debug_hook() | |
try: | |
return self.setup(args, message) | |
except Exception as error: | |
self.internal_log.error(error) | |
raise error | |
def setup(self, args, message): | |
self._api_key = self.nv.get('api_key') | |
self._login = self.nv.get('login') | |
self._session = Session() | |
arguments = args.strip().split() | |
if len(args) > 1 and not self._api_key and not self._login: | |
self._api_key, self._login = arguments | |
self.nv['api_key'], self.nv['login'] = self._api_key, self._login | |
return znc.CONTINUE | |
def _request(self, endpoint, params={}): | |
data = { | |
'api_key': self._api_key, | |
'login': self._login | |
} | |
data.update(params) | |
for key, value in data.items(): | |
if value is None: | |
del data[key] | |
url = self._base_url / endpoint % data | |
response = self._session.get(str(url)) | |
if response.status_code == 200: | |
return response.json() | |
else: | |
self._request_error(response) | |
def _request_error(self, response): | |
error_msg = response.reason | |
try: | |
data = response.json() | |
error_msg = data.get('message', error_msg) | |
except json.JSONDecodeError: | |
try: | |
data.raise_for_status() | |
except Exception as error: | |
error_msg = error.args.join(', ') | |
self.internal_log.error(error_msg) | |
def posts(self, tags=None, limit=None): | |
return self._request('posts.json', {'tags': tags, 'limit': limit if limit else 200}) | |
@property | |
def config_key(self): | |
return self.GetUser().GetUsername() | |
def get_client_options(self, clientname=None): | |
clientname = clientname or self.config_key | |
config = self.nv.get(clientname, '{"alias": {}}') | |
return json.loads(config) if config else None | |
def set_client_options(self, config, clientname=None): | |
clientname = clientname or self.config_key | |
self.nv[clientname] = json.dumps(config) | |
def add_alias(self, alias, tags, channels, clientname=None): | |
config = self.get_client_options(clientname) | |
updated = alias in config['alias'] | |
config['alias'].update({alias: { | |
'tags': tags, | |
'channels': channels, | |
}}) | |
self.set_client_options(config, clientname) | |
return updated | |
def get_alias(self, alias, clientname=None): | |
config = self.get_client_options(clientname) | |
return config['alias'].get(alias) | |
def remove_alias(self, alias, clientname=None): | |
config = self.get_client_options(clientname) | |
if alias in config['alias']: | |
del config['alias'][alias] | |
self.set_client_options(config, clientname) | |
return True | |
def add_alias_command(self, args): | |
self.debug_hook() | |
name = args[0] | |
channels = [item for item in args[1:] if item.startswith('#')] | |
tags = [item for item in args[1:] if not item.startswith('#')] | |
if (not tags and channels) or (not channels and tags): | |
current_config = self.get_alias(name) | |
if not current_config: | |
self.inform_user('You have to at least specify one tag and one channel', True) | |
return | |
if not tags: | |
tags = current_config['tags'].split() | |
channels += current_config['channels'] | |
if not channels: | |
tags += current_config['tags'].split() | |
channels = current_config['channels'] | |
channels = sorted(set(channels)) | |
tags = ' '.join(sorted(set(tags))) | |
updated = self.add_alias(name, tags, list(channels)) | |
if not channels: | |
self.inform_user('No channels defined. This alias is not available anywhere!', notice=True) | |
if updated: | |
self.inform_user(f'Updated {name}: {tags}, {" ".join(channels)}') | |
else: | |
self.inform_user(f'Added {name}: {tags}, {" ".join(channels)}') | |
def add_channel_to_all_command(self, args): | |
channels = [c for c in args if c.startswith('#')] | |
if not channels: | |
self.inform_user('You have to specify at least one channel', True) | |
return | |
client_config = self.get_client_options() | |
if not client_config['alias']: | |
self.inform_user('No aliases configured', True) | |
return | |
for alias, config in client_config['alias'].items(): | |
client_config['alias'][alias]['channels'] = sorted(set(client_config['alias'][alias]['channels'] + channels)) | |
self.set_client_options(client_config) | |
self.inform_user('All aliases have been update') | |
self.list_config_command() | |
def remove_channel_from_all(self, args): | |
channels = set([c for c in args if c.startswith('#')]) | |
if not channels: | |
self.inform_user('You have to specify at least one channel', True) | |
return | |
client_config = self.get_client_options() | |
if not client_config['alias']: | |
self.inform_user('No aliases configured', True) | |
return | |
for alias, config in client_config['alias'].items(): | |
current_channels = set(client_config['alias'][alias]['channels']) | |
client_config['alias'][alias]['channels'] = sorted(current_channels - channels) | |
self.set_client_options(client_config) | |
self.inform_user('All aliases have been update') | |
self.list_config_command() | |
def remove_alias_command(self, args): | |
for name in args: | |
removed = self.remove_alias(name) | |
if removed: | |
self.inform_user(f'Removed {name}') | |
else: | |
self.inform_user(f'{name} does not exist') | |
def list_config_command(self): | |
config = self.get_client_options() | |
if not config['alias']: | |
self.inform_user('There are no aliases configured') | |
table_data = [(alias, v['tags'], ', '.join(v['channels'])) for alias, v in config['alias'].items()] | |
table = self.create_table(['alias', 'tags', 'channels'], table_data) | |
self.inform_user(table) | |
def create_table(self, header, rows): | |
table = znc.CTable() | |
for title in header: | |
table.AddColumn(title) | |
for row in rows: | |
table.AddRow() | |
for title, value in zip(header, row): | |
table.SetCell(title, value) | |
return table | |
def show_usage_command(self): | |
self.inform_user(self.create_table( | |
['command', 'arguments', 'function'], | |
[ | |
['help', '', 'Show list of commands'], | |
['add/update', 'NAME TAGS... CHANNELS...', 'Add / Update an alias. "{args}" will be replaced with the users input. The triggers will only work in the defined channels.'], | |
['add-channel', 'CHANNELS...', 'Add channel to all aliases'], | |
['remove-channel', 'CHANNELS...', 'Remove channel from all aliases'], | |
['set-help', 'CHANNELS...', '!!help command for any users in those channels'], | |
['list-help', '', 'Show where !!help is enabled'], | |
['remove', 'NAME...', 'Remove the given aliases'], | |
['list', '', 'List all aliases'], | |
] | |
)) | |
def send_both_ways(self, message, target): | |
self.debug_hook() | |
self.send_to_irc(message, target) | |
self.send_to_client(message, target) | |
def pass_through(self, message): | |
self.debug_hook() | |
if self.GetClient(): | |
self.send_to_irc(message.GetText(), message.GetTarget()) | |
self.send_to_client(message.GetText(), message.GetTarget(), current=False) | |
else: | |
self.send_to_client(message.GetText(), message.GetTarget(), from_mask=message.GetNick().GetHostMask()) | |
def send_to_irc(self, message, target): | |
self.PutIRC(f'PRIVMSG {target} :{message}') | |
def send_to_client(self, message, target, current=True, from_mask=None): | |
self.debug_hook() | |
current_client = self.GetClient() | |
current_network = self.GetNetwork() | |
for client in self.GetUser().GetAllClients(): | |
if (not current and current_client == client) or current_network != client.GetNetwork(): | |
continue | |
if not from_mask: | |
from_mask = client.GetNickMask() | |
client.PutClient(f':{from_mask} PRIVMSG {target} :{message}') | |
def inform_user(self, message, notice=False): | |
if notice: | |
self.PutModNotice(message) | |
else: | |
self.PutModule(message) | |
def send_random(self, tags, target): | |
posts = self.posts(tags) | |
post = None | |
while not post: | |
post = choice(posts or [None]) | |
if not post: | |
self.send_both_ways(f'Didn\'t find anything for the tags: {tags}', target) | |
return | |
if 'loli' in tags or 'shota' in tags: | |
url = post.get('file_url') | |
else: | |
url = self._base_url / 'posts' / str(post['id']) % {'q': tags} | |
if not url: | |
post = None | |
continue | |
self.send_both_ways(f'Post: {post["id"]}: {url}', target) | |
return True | |
@contextmanager | |
def check_and_send(self, message): | |
target = message.GetTarget() | |
msg_args = message.GetText().split() | |
alias = self.get_alias(msg_args[0].lower()) | |
if not alias: | |
yield | |
return | |
if target.startswith('#') and target.lower() not in alias['channels']: | |
yield | |
return | |
tags = alias['tags'] | |
if '{args}' in tags: | |
tags = tags.replace('{args}', ' '.join(msg_args[1:])) | |
try: | |
yield True | |
finally: | |
self.send_random(tags, target) | |
@contextmanager | |
def user_help(self, message): | |
args = list(filter(None, message.GetText().split())) | |
target = message.GetTarget() | |
if len(args) == 0 or not args[0].startswith('!!help'): | |
yield | |
return | |
config = self.get_client_options() | |
allowed_channels = config.get('help_in_channels') | |
if not allowed_channels or target.lower() not in allowed_channels: | |
yield | |
return | |
text = '' | |
if len(args) > 1: | |
alias = config['alias'].get(args[1]) | |
if not alias: | |
text = f'Could not find alisa {args[1]}, use "!!help" to list all available aliases' | |
else: | |
text = f'The alias "{args[1]}" is available in the channels: {", ".join(alias["channels"])} and uses these tags: "{alias["tags"]}"' | |
else: | |
if not config['alias']: | |
text = 'No aliases are configured atm' | |
else: | |
text = 'Available aliases: %s' % ', '.join(config['alias']) | |
try: | |
yield True | |
finally: | |
self.send_both_ways(text, target) | |
def message_hook(self, message): | |
hooks = [self.user_help, self.check_and_send] | |
for hook in hooks: | |
with hook(message) as action: | |
if action: | |
self.pass_through(message) | |
return True | |
def update_help_in_channels_command(self, args): | |
channels = sorted(set([c.lower() for c in args if c.startswith('#')])) | |
config = self.get_client_options() | |
config['help_in_channels'] = channels | |
self.set_client_options(config) | |
if not channels: | |
self.inform_user('!!help: Disabled') | |
else: | |
self.inform_user(f'!!help: Enabled for: {", ".join(channels)}') | |
def show_help_channels(self): | |
config = self.get_client_options() | |
channels = config.get('help_in_channels') | |
if not channels: | |
self.inform_user('!!help: Disabled') | |
else: | |
self.inform_user(f'!!help: Enabled for: {", ".join(channels)}') | |
@catchfail | |
def OnModCommand(self, command): | |
self.debug_hook() | |
args = command.lower().split() | |
if args[0] in ['add', 'update'] and len(args) > 1: | |
self.add_alias_command(args[1:]) | |
elif args[0] == 'add-channel' and len(args) > 1: | |
self.add_channel_to_all_command(args[1:]) | |
elif args[0] == 'remove-channel' and len(args) > 1: | |
self.remove_channel_from_all(args[1:]) | |
elif args[0] == 'set-help': | |
self.update_help_in_channels_command(args[1:]) | |
elif args[0] == 'list-help': | |
self.show_help_channels() | |
elif args[0] == 'remove' and len(args) > 1: | |
self.remove_alias_command(args[1:]) | |
elif args[0] == 'list': | |
self.list_config_command() | |
else: | |
self.show_usage_command() | |
return znc.CONTINUE | |
@catchfail(znc.CONTINUE) | |
def OnUserTextMessage(self, message): | |
""" | |
This module hook is called when a user sends a PRIVMSG message. | |
""" | |
self.debug_hook() | |
return znc.HALT if self.message_hook(message) else znc.CONTINUE | |
@catchfail(znc.CONTINUE) | |
def OnChanTextMessage(self, message): | |
""" | |
Called when we receive a channel PRIVMSG message from IRC. | |
""" | |
self.debug_hook() | |
return znc.HALT if self.message_hook(message) else znc.CONTINUE | |
# DEBUGGING HOOKS | |
# =============== | |
def debug_hook(self): | |
""" | |
Dumps parent calling method name and its arguments to debug logfile. | |
""" | |
if self.hook_debugging is not True: | |
return | |
frameinfo = inspect.stack()[1] | |
argvals = frameinfo.frame.f_locals | |
messages = [] | |
messages.append('Called method: ' + frameinfo.function + '()') | |
for argname in argvals: | |
if argname == 'self': | |
continue | |
messages.append(' ' + argname + ' -> ' + pprint.pformat(argvals[argname])) | |
messages.append('') | |
self.internal_log.debug(*messages) | |
class InternalLog: | |
def __init__(self, save_path: str): | |
self.save_path = save_path | |
def _write_to(self, file, *text): | |
with self.open(file) as file: | |
text = map(lambda t: str(t).rstrip('\n') + '\n', text) | |
file.writelines(text) | |
def debug(self, *text): | |
self._write_to('debug', *text) | |
def error(self, *text): | |
self._write_to('error', *text) | |
def open(self, level: str): | |
target = open(os.path.join(self.save_path, level + '.log'), 'a') | |
line = 'Log opened at: {} UTC\n'.format(datetime.utcnow()) | |
target.write(line) | |
target.write('=' * len(line) + '\n\n') | |
return target |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment