Created
April 23, 2021 09:06
-
-
Save AnotherTwinkle/9506e83a9eafad64f9e3e742acbf0e17 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''This extension implements RTFM support with discord.py'''
import discord | |
from discord.ext import commands | |
import inspect | |
import re | |
import builtins | |
import zlib | |
import io | |
import os | |
import lxml.etree as etree | |
from collections import Counter | |
from operator import attrgetter | |
from .botutils import utils, fuzzy, checks | |
class SphinxObjectFileReader:
    """In-memory reader for the raw bytes of a Sphinx ``objects.inv`` file.

    The bytes are wrapped in a ``BytesIO`` stream. Header lines are read as
    UTF-8 text with :meth:`readline` / :meth:`skipline`; whatever follows in
    the stream is read as zlib-compressed, newline-separated UTF-8 text with
    :meth:`read_compressed_lines`.
    """
    # Inspired by Sphinx's InventoryFileReader
    BUFSIZE = 16 * 1024

    def __init__(self, buffer):
        """Wrap ``buffer`` (a bytes-like object) in a seekable byte stream."""
        self.stream = io.BytesIO(buffer)

    def readline(self):
        """Return the next raw line, decoded as UTF-8 (newline included)."""
        return self.stream.readline().decode('utf-8')

    def skipline(self):
        """Advance past the next raw line without decoding it."""
        self.stream.readline()

    def read_compressed_chunks(self):
        """Yield decompressed byte chunks for the remainder of the stream.

        The stream is consumed ``BUFSIZE`` bytes at a time; the decompressor
        is flushed once the stream is exhausted.
        """
        decompressor = zlib.decompressobj()
        while True:
            chunk = self.stream.read(self.BUFSIZE)
            if not chunk:
                break
            yield decompressor.decompress(chunk)
        yield decompressor.flush()

    def read_compressed_lines(self):
        """Yield each decompressed line as ``str``, without its newline.

        Lines are decoded only once complete, so a multi-byte character that
        straddles two chunks is never decoded in halves. A final line that is
        not terminated by a newline is yielded as well instead of being
        dropped.
        """
        pending = b''
        for chunk in self.read_compressed_chunks():
            # Everything before the last newline is complete; the tail is
            # carried over until the next chunk completes it.
            *complete, pending = (pending + chunk).split(b'\n')
            for raw_line in complete:
                yield raw_line.decode('utf-8')
        if pending:
            yield pending.decode('utf-8')
class RTFM(commands.Cog):
    """Cog that looks up documentation entries in Sphinx inventories.

    Inventories (``objects.inv``) are downloaded once through
    ``bot.session`` and cached on the instance as ``_rtfm_cache``. Lookups
    go through ``fuzzy.finder``; the "visual" variant additionally tries to
    resolve the best match to a live object in the ``discord`` package and
    shows its docstring and public members.
    """

    # Documentation roots, keyed by the lookup key used by the commands.
    _PAGE_TYPES = {
        'latest': 'https://discordpy.readthedocs.io/en/latest',
        'latest-jp': 'https://discordpy.readthedocs.io/ja/latest',
        'python': 'https://docs.python.org/3',
        'python-jp': 'https://docs.python.org/ja/3',
    }

    # This pattern mostly comes from the Sphinx repository. Compiled once
    # here rather than on every inventory parse.
    _ENTRY_REGEX = re.compile(r'(?x)(.+?)\s+(\S*:\S*)\s+(-?\d+)\s+(\S+)\s+(.*)')

    # Markup fragments stripped from docstrings before they are displayed.
    _DOC_NOISE = ('`', ':class:', ':issue:', ':exec:', ':exc:', ':ref:',
                  ':meth:', ':attr:', '*', '~', '\\')

    def __init__(self, bot):
        self.bot = bot
        self.rtfmtools = VisualRTFMtools()

    def parse_object_inv(self, stream, url):
        """Parse a version-2 Sphinx inventory into a ``{name: url}`` dict.

        Parameters
        ----------
        stream
            Object providing ``readline()`` and ``read_compressed_lines()``
            (e.g. ``SphinxObjectFileReader``).
        url
            Documentation root that every entry location is appended to.

        Raises
        ------
        RuntimeError
            If the version header is not ``# Sphinx inventory version 2`` or
            the fourth header line does not mention ``zlib``.
        """
        # key: URL
        # n.b.: key doesn't have `discord` or `discord.ext.commands` namespaces
        result = {}

        # first line is version info
        inv_version = stream.readline().rstrip()
        if inv_version != '# Sphinx inventory version 2':
            raise RuntimeError('Invalid objects.inv file version.')

        # next line is "# Project: <name>"
        # then after that is "# Version: <version>"
        projname = stream.readline().rstrip()[11:]
        stream.readline()  # the version value itself is not needed

        # next line says if it's a zlib header
        line = stream.readline()
        if 'zlib' not in line:
            raise RuntimeError('Invalid objects.inv file, not z-lib compatible.')

        # URLs always use forward slashes; os.path.join would insert the
        # platform separator (a backslash on Windows).
        base = url.rstrip('/')

        for line in stream.read_compressed_lines():
            match = self._ENTRY_REGEX.match(line.rstrip())
            if not match:
                continue

            name, directive, prio, location, dispname = match.groups()
            domain, _, subdirective = directive.partition(':')
            if directive == 'py:module' and name in result:
                # From the Sphinx Repository:
                # due to a bug in 1.1 and below,
                # two inventory entries are created
                # for Python modules, and the first
                # one is correct
                continue

            # Most documentation pages have a label
            if directive == 'std:doc':
                subdirective = 'label'

            # A trailing "$" is shorthand for the entry's own name.
            if location.endswith('$'):
                location = location[:-1] + name

            key = name if dispname == '-' else dispname
            prefix = f'{subdirective}:' if domain == 'std' else ''

            if projname == 'discord.py':
                key = key.replace('discord.ext.commands.', '').replace('discord.', '')

            result[f'{prefix}{key}'] = f'{base}/{location}'

        return result

    async def build_rtfm_lookup_table(self, page_types):
        """Download and parse ``objects.inv`` for every page in ``page_types``.

        The result is stored on ``self._rtfm_cache`` only after every
        inventory has been fetched successfully.

        Raises
        ------
        RuntimeError
            If any inventory request does not answer with HTTP 200.
        """
        cache = {}
        for key, page in page_types.items():
            async with self.bot.session.get(page + '/objects.inv') as resp:
                if resp.status != 200:
                    raise RuntimeError('Cannot build rtfm lookup table, try again later.')

                stream = SphinxObjectFileReader(await resp.read())
                cache[key] = self.parse_object_inv(stream, page)

        self._rtfm_cache = cache

    def transform_rtfm_language_key(self, ctx, prefix):
        """Return ``prefix`` with ``-jp`` appended for Japanese contexts.

        The suffix is added when the invoking channel belongs to a specific
        category or the guild is one of two hard-coded guild ids; otherwise
        ``prefix`` is returned unchanged.
        """
        if ctx.guild is not None:
            # Japanese-language category
            if ctx.channel.category_id == 490287576670928914:
                return prefix + '-jp'
            # d.py unofficial JP / Discord Bot Portal JP
            elif ctx.guild.id in (463986890190749698, 494911447420108820):
                return prefix + '-jp'
        return prefix

    async def do_rtfm(self, ctx, key, obj):
        """Run a fuzzy lookup for ``obj`` in the inventory selected by ``key``.

        Returns
        -------
        list or None
            Up to eight ``(name, url)`` pairs. ``None`` is returned when a
            reply has already been sent instead: either the bare
            documentation URL (``obj is None``) or a "nothing found" notice.
        """
        page_types = self._PAGE_TYPES

        if obj is None:
            await ctx.send(page_types[key])
            return None

        # The lookup table is built lazily on first use.
        if not hasattr(self, '_rtfm_cache'):
            await ctx.trigger_typing()
            await self.build_rtfm_lookup_table(page_types)

        # Drop a leading discord. / discord.ext. / commands. namespace.
        obj = re.sub(r'^(?:discord\.(?:ext\.)?)?(?:commands\.)?(.+)', r'\1', obj)

        if key.startswith('latest'):
            # point the abc.Messageable types properly:
            q = obj.lower()
            if not q.startswith('_') and q in dir(discord.abc.Messageable):
                obj = f'abc.Messageable.{q}'

        cache = list(self._rtfm_cache[key].items())
        matches = fuzzy.finder(obj, cache, key=lambda t: t[0], lazy=False)[:8]

        if not matches:
            await ctx.send('Could not find anything. Sorry.')
            # Return None rather than the sent Message: callers treat any
            # truthy result as an iterable of (name, url) pairs.
            return None

        return matches

    def get_object(self, url):
        """Resolve the dotted path after ``discord.`` in ``url`` to an object.

        Returns ``None`` when the path does not exist on the ``discord``
        package.

        Raises
        ------
        IndexError
            If ``url`` contains no ``discord.`` segment. ``rtfm`` relies on
            this to fall back to the plain link list.
        """
        # maxsplit=1 keeps the whole dotted path even if "discord." recurs.
        string = url.split('discord.', 1)[1]
        return self.rtfmtools.objectify(module=discord, string=string)

    def getdocstring(self, url: str):
        """Return the docstring of the object behind ``url``, or ``None``."""
        obj = self.get_object(url)
        if obj is None:
            return None
        return self.rtfmtools.get_docstring(obj)

    def allattrs(self, obj):
        """Return ``{'attributes': [...], 'methods': [...]}`` for ``obj``."""
        return {
            'attributes': self.rtfmtools.allattrs(obj),
            'methods': self.rtfmtools.allmethods(obj),
        }

    def parse_docstring(self, string):
        """Trim and de-markup a docstring for display.

        Everything from the first ``Attributes`` or ``.. container`` marker
        onwards is dropped, a fixed set of reST role prefixes and markup
        characters is removed, and the result is cut to 2000 characters
        (plus ``...``). ``None`` is passed through unchanged.
        """
        if string is None:
            return None

        string = string.split('Attributes')[0].split('.. container')[0]
        for noise in self._DOC_NOISE:
            string = string.replace(noise, '')

        if len(string) > 2000:
            return string[:2000] + '...'
        return string

    @staticmethod
    def _format_links(matches):
        """Render ``(name, url)`` pairs as one markdown link per line."""
        return '\n'.join(f'[`{key}`]({url})' for key, url in matches)

    async def _send_links(self, ctx, matches):
        """Send ``matches`` as a links-only embed."""
        e = discord.Embed(colour=discord.Colour.blurple())
        e.description = self._format_links(matches)
        await ctx.send(embed=e)

    def create_embed(self, docstring, attrs, matches):
        """Build the visual embed for the first entry of ``matches``.

        ``attrs`` is a mapping that may carry ``'attributes'`` and
        ``'methods'`` lists; empty or missing entries produce no field.
        """
        # The first match is what we need
        key, url = matches[0][0], matches[0][1]

        e = discord.Embed(color=discord.Color.blurple())
        e.set_footer(text= "Note: This rtfm lookup is very unstable and inconsistent. Please follow the links for proper references.")
        e.set_author(name='Discord.py',icon_url='https://cdn.discordapp.com/icons/336642139381301249/3aa641b21acded468308a37eef43d7b3.webp?size=1024')

        if docstring:
            e.title = f'`{key}`'
            e.url = url
            e.description = f'```yaml\n{docstring}```'

        attributes = attrs.get('attributes')
        methods = attrs.get('methods')

        if attributes:
            attr_string = ' '.join(f'`{attr}`' for attr in attributes)
            e.add_field(name='Attributes', value=attr_string, inline=False)
        if methods:
            method_string = ' '.join(f'`{method}`' for method in methods)
            e.add_field(name='Methods', value=method_string, inline=False)

        e.add_field(name='Matches', value=self._format_links(matches), inline=False)
        return e

    async def do_visual_rtfm(self, ctx, matches):
        """Send the visual embed for the best entry in ``matches``.

        Raises ``IndexError`` (via :meth:`get_object`) when the best match's
        URL cannot be mapped to a ``discord`` path.
        """
        url = matches[0][1]
        # Resolve the object once and reuse it for docstring and members.
        obj = self.get_object(url)
        doc = None if obj is None else self.rtfmtools.get_docstring(obj)
        doc = self.parse_docstring(doc)
        attr_dict = self.allattrs(obj)
        embed = self.create_embed(docstring=doc, attrs=attr_dict, matches=matches)
        await ctx.send(embed=embed)

    @checks.not_blocked()
    @commands.group(aliases=['rtfd'], invoke_without_command=True)
    async def rtfm(self, ctx, *, obj: str = None):
        """Gives you a quick documentation for a discord.py entity.
        Events, objects, and functions are all supported through
        a cruddy fuzzy algorithm.
        """
        key = self.transform_rtfm_language_key(ctx, 'latest')
        matches = await self.do_rtfm(ctx, key, obj)
        if not matches:
            # do_rtfm has already replied.
            return

        try:
            await self.do_visual_rtfm(ctx, matches)
        except (discord.HTTPException, IndexError):
            # Fall back to the links-only reply, reusing the matches already
            # found instead of running the lookup a second time.
            await self._send_links(ctx, matches)

    @checks.not_blocked()
    @rtfm.command(name= 'legacy')
    async def rtfm_legacy(self, ctx, *, obj: str = None):
        """Same as rtfm, but provides links only"""
        key = self.transform_rtfm_language_key(ctx, 'latest')
        matches = await self.do_rtfm(ctx, key, obj)
        if matches:
            await self._send_links(ctx, matches)

    @rtfm.command(name='python', aliases=['py'])
    async def rtfm_python(self, ctx, *, obj: str = None):
        """Gives you a documentation link for a Python entity."""
        key = self.transform_rtfm_language_key(ctx, 'python')
        matches = await self.do_rtfm(ctx, key, obj)
        if matches:
            await self._send_links(ctx, matches)
class VisualRTFMtools:
    """A bunch of functions allowing us to get the stuff we need."""

    def objectify(self, module, string):
        """Resolve the dotted path ``string`` starting from ``module``.

        Returns the resolved object, or ``None`` when any segment of the
        path is missing.
        """
        try:
            return attrgetter(string)(module)
        except AttributeError:
            return None

    def get_docstring(self, obj):
        """Return the cleaned-up docstring of ``obj`` (``inspect.getdoc``)."""
        return inspect.getdoc(obj)

    def _member_names(self, obj, useful, want_callable):
        """List names from ``dir(obj)`` whose value's callability matches.

        The underscore filter runs first so private names are never read
        when ``useful`` is true. A name that ``dir`` lists but that cannot
        be read (``AttributeError``, e.g. an unset ``__slots__`` entry) is
        treated as a non-callable attribute instead of aborting the listing.
        """
        return [
            name for name in dir(obj)
            if not (useful and name.startswith('_'))
            and callable(getattr(obj, name, None)) == want_callable
        ]

    def allmethods(self, obj, useful = True):
        """Returns all methods of a class, when useful is True, it will exclude methods starting with "_" """
        return self._member_names(obj, useful, True)

    def allattrs(self, obj, useful = True):
        """Same as allmethods, but lists attributes instead"""
        return self._member_names(obj, useful, False)

    def getall(self, obj, useful = True):
        """Return attributes and methods together, sorted case-insensitively."""
        return sorted(self.allattrs(obj, useful) + self.allmethods(obj, useful), key=str.lower)
def setup(bot):
    """Create an ``RTFM`` cog bound to ``bot`` and add it to the bot."""
    cog = RTFM(bot)
    bot.add_cog(cog)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment