Skip to content

Instantly share code, notes, and snippets.

@AnotherTwinkle
Created April 23, 2021 09:06
Show Gist options
  • Save AnotherTwinkle/9506e83a9eafad64f9e3e742acbf0e17 to your computer and use it in GitHub Desktop.
Save AnotherTwinkle/9506e83a9eafad64f9e3e742acbf0e17 to your computer and use it in GitHub Desktop.
'''This extension implements RTFM support with discord.py'''
import discord
from discord.ext import commands
import inspect
import re
import builtins
import zlib
import io
import os
import lxml.etree as etree
from collections import Counter
from operator import attrgetter
from .botutils import utils, fuzzy, checks
class SphinxObjectFileReader:
    """In-memory reader for the raw bytes of a Sphinx ``objects.inv`` file.

    Inspired by Sphinx's InventoryFileReader. The plain-text header lines are
    read with :meth:`readline` / :meth:`skipline`; everything after them is a
    zlib stream consumed through :meth:`read_compressed_lines`.
    """

    # Number of compressed bytes pulled from the stream per read.
    BUFSIZE = 16 * 1024

    def __init__(self, buffer):
        """Wrap ``buffer`` (a bytes-like object) in a seekable byte stream."""
        self.stream = io.BytesIO(buffer)

    def readline(self):
        """Return the next raw line, newline included, decoded as UTF-8."""
        return self.stream.readline().decode('utf-8')

    def skipline(self):
        """Advance past the next raw line without decoding it."""
        self.stream.readline()

    def read_compressed_chunks(self):
        """Yield decompressed byte chunks for the remainder of the stream."""
        decompressor = zlib.decompressobj()
        while True:
            chunk = self.stream.read(self.BUFSIZE)
            if len(chunk) == 0:
                break
            yield decompressor.decompress(chunk)
        # Emit whatever the decompressor is still holding back.
        yield decompressor.flush()

    def read_compressed_lines(self):
        """Yield each decompressed line as ``str`` with the newline removed.

        A final line that is not newline-terminated is yielded as well
        instead of being silently discarded.
        """
        pending = b''
        for chunk in self.read_compressed_chunks():
            # Everything before the last b'\n' is complete; the tail may be
            # continued by the next chunk, so carry it over.
            *complete, pending = (pending + chunk).split(b'\n')
            for raw_line in complete:
                yield raw_line.decode('utf-8')
        if pending:
            yield pending.decode('utf-8')
class RTFM(commands.Cog):
    # Cog exposing the ``rtfm`` command group: fuzzy lookups against the
    # Sphinx inventories of discord.py and CPython, answered either as a rich
    # embed (docstring, attributes, methods) or as a plain list of links.
    #
    # NOTE: documented with comments rather than a class docstring because
    # discord.py surfaces a cog's docstring as its help description, and the
    # original cog deliberately had none.

    # Documentation roots; '<root>/objects.inv' is the Sphinx inventory.
    _PAGE_TYPES = {
        'latest': 'https://discordpy.readthedocs.io/en/latest',
        'latest-jp': 'https://discordpy.readthedocs.io/ja/latest',
        'python': 'https://docs.python.org/3',
        'python-jp': 'https://docs.python.org/ja/3',
    }

    # One inventory entry: name, domain:role, priority, location, display name.
    # This pattern mostly comes from the Sphinx repository.
    _ENTRY_REGEX = re.compile(r'(?x)(.+?)\s+(\S*:\S*)\s+(-?\d+)\s+(\S+)\s+(.*)')

    def __init__(self, bot):
        self.bot = bot
        self.rtfmtools = VisualRTFMtools()

    def parse_object_inv(self, stream, url):
        """Parse a version 2 Sphinx inventory into ``{key: absolute URL}``.

        ``stream`` must offer ``readline()`` and ``read_compressed_lines()``
        (see ``SphinxObjectFileReader``); ``url`` is the documentation root
        the entry locations are relative to.

        n.b.: for the discord.py project the keys don't carry the ``discord``
        or ``discord.ext.commands`` namespaces.

        Raises RuntimeError when the header is not a zlib-compressed
        version 2 inventory.
        """
        result = {}

        # first line is version info
        inv_version = stream.readline().rstrip()
        if inv_version != '# Sphinx inventory version 2':
            raise RuntimeError('Invalid objects.inv file version.')

        # next line is "# Project: <name>"
        projname = stream.readline().rstrip()[11:]
        # then "# Version: <version>", which is not needed but must be consumed
        stream.readline()

        # next line says if it's a zlib header
        line = stream.readline()
        if 'zlib' not in line:
            raise RuntimeError('Invalid objects.inv file, not z-lib compatible.')

        # URLs always use forward slashes; os.path.join would produce
        # backslashes on Windows.
        base_url = url.rstrip('/')

        for line in stream.read_compressed_lines():
            match = self._ENTRY_REGEX.match(line.rstrip())
            if not match:
                continue

            name, directive, _prio, location, dispname = match.groups()
            domain, _, subdirective = directive.partition(':')
            if directive == 'py:module' and name in result:
                # From the Sphinx Repository:
                # due to a bug in 1.1 and below,
                # two inventory entries are created
                # for Python modules, and the first
                # one is correct
                continue

            # Most documentation pages have a label
            if directive == 'std:doc':
                subdirective = 'label'

            # A trailing '$' is inventory shorthand for "append the name".
            if location.endswith('$'):
                location = location[:-1] + name

            key = name if dispname == '-' else dispname
            prefix = f'{subdirective}:' if domain == 'std' else ''

            if projname == 'discord.py':
                key = key.replace('discord.ext.commands.', '').replace('discord.', '')

            result[f'{prefix}{key}'] = f'{base_url}/{location}'

        return result

    async def build_rtfm_lookup_table(self, page_types):
        """Download and parse every inventory in ``page_types``.

        ``page_types`` maps a lookup key to a documentation root URL. The
        result is stored on ``self._rtfm_cache`` only once every inventory
        was fetched, so a failure never leaves a partial cache behind.

        Raises RuntimeError when any inventory does not answer with HTTP 200.
        """
        cache = {}
        for key, page in page_types.items():
            async with self.bot.session.get(page + '/objects.inv') as resp:
                if resp.status != 200:
                    raise RuntimeError('Cannot build rtfm lookup table, try again later.')

                stream = SphinxObjectFileReader(await resp.read())
                cache[key] = self.parse_object_inv(stream, page)

        self._rtfm_cache = cache

    def transform_rtfm_language_key(self, ctx, prefix):
        """Return ``prefix`` with ``-jp`` appended for the Japanese contexts."""
        if ctx.guild is not None:
            # Japanese-language channel category
            if ctx.channel.category_id == 490287576670928914:
                return prefix + '-jp'
            # d.py unofficial JP, Discord Bot Portal JP
            elif ctx.guild.id in (463986890190749698, 494911447420108820):
                return prefix + '-jp'
        return prefix

    async def do_rtfm(self, ctx, key, obj):
        """Search the ``key`` inventory for ``obj``.

        Returns a list of up to eight ``(name, url)`` tuples. Returns None
        when there is nothing further to show because the reply was already
        sent here: either ``obj`` was omitted (the documentation root is
        sent) or nothing matched (an apology is sent).
        """
        if obj is None:
            await ctx.send(self._PAGE_TYPES[key])
            return None

        if not hasattr(self, '_rtfm_cache'):
            await ctx.trigger_typing()
            await self.build_rtfm_lookup_table(self._PAGE_TYPES)

        # Inventory keys are stored without these namespaces, so drop them
        # from the query as well.
        obj = re.sub(r'^(?:discord\.(?:ext\.)?)?(?:commands\.)?(.+)', r'\1', obj)

        if key.startswith('latest'):
            # point the abc.Messageable types properly:
            q = obj.lower()
            for name in dir(discord.abc.Messageable):
                if name[0] == '_':
                    continue
                if q == name:
                    obj = f'abc.Messageable.{name}'
                    break

        cache = list(self._rtfm_cache[key].items())
        matches = fuzzy.finder(obj, cache, key=lambda t: t[0], lazy=False)[:8]

        if not matches:
            await ctx.send('Could not find anything. Sorry.')
            # Deliberately not returning the sent message: callers test the
            # result for truthiness and then iterate it.
            return None

        return matches

    async def _send_links(self, ctx, matches):
        """Send ``matches`` as an embed containing one markdown link per line."""
        e = discord.Embed(colour=discord.Colour.blurple())
        e.description = '\n'.join(f'[`{key}`]({url})' for key, url in matches)
        await ctx.send(embed=e)

    def get_object(self, url):
        """Resolve the object a documentation URL points at.

        Everything after the first ``discord.`` in ``url`` is treated as a
        dotted attribute path below the ``discord`` package. Returns None
        when that path does not resolve.

        Raises IndexError when ``url`` contains no ``discord.`` at all; the
        ``rtfm`` command relies on this to fall back to plain links.
        """
        string = url.split('discord.', 1)[1]
        return self.rtfmtools.objectify(module=discord, string=string)

    def getdocstring(self, url: str):
        """Return the docstring of the object behind ``url``, or None."""
        obj = self.get_object(url)
        if obj is None:
            return None
        return self.rtfmtools.get_docstring(obj)

    def allattrs(self, obj):
        """Return ``{'attributes': [...], 'methods': [...]}`` of public names."""
        return {
            'attributes': self.rtfmtools.allattrs(obj),
            'methods': self.rtfmtools.allmethods(obj),
        }

    def parse_docstring(self, string):
        """Reduce a reST docstring to a short plain-text summary.

        Everything from the first ``Attributes`` / ``.. container`` marker on
        is dropped, the listed markup tokens are removed, and the result is
        cut to 2000 characters plus ``...``. None is passed through.
        """
        if string is None:
            return None

        string = string.split('Attributes')[0].split('.. container')[0]
        for char in ['`', ':class:', ':issue:', ':exec:', ':exc:', ':ref:', ':meth:', ':attr:', '*', '~', '\\']:
            string = string.replace(char, '')

        if len(string) > 2000:
            return string[:2000] + '...'
        return string

    def create_embed(self, docstring, attrs, matches):
        """Build the rich lookup embed.

        ``matches`` is a non-empty list of ``(name, url)`` tuples whose first
        entry is the object being described; ``attrs`` may provide the
        ``'attributes'`` and ``'methods'`` name lists; ``docstring`` is the
        cleaned summary or None.
        """
        # The first match is what we need
        key, url = matches[0][0], matches[0][1]

        e = discord.Embed(color=discord.Color.blurple())
        e.set_footer(text="Note: This rtfm lookup is very unstable and inconsistent. Please follow the links for proper references.")
        e.set_author(name='Discord.py', icon_url='https://cdn.discordapp.com/icons/336642139381301249/3aa641b21acded468308a37eef43d7b3.webp?size=1024')

        if docstring:
            e.title = f'`{key}`'
            e.url = url
            e.description = f'```yaml\n{docstring}```'

        attributes = attrs.get('attributes')
        methods = attrs.get('methods')

        # NOTE: long name lists can exceed Discord's embed field limits; the
        # resulting HTTPException is what makes ``rtfm`` fall back to links.
        if attributes:
            attr_string = ' '.join(f'`{attr}`' for attr in attributes)
            e.add_field(name='Attributes', value=attr_string, inline=False)

        if methods:
            method_string = ' '.join(f'`{method}`' for method in methods)
            e.add_field(name='Methods', value=method_string, inline=False)

        e.add_field(name='Matches', value='\n'.join(f'[`{key}`]({url})' for key, url in matches), inline=False)
        return e

    async def do_visual_rtfm(self, ctx, matches):
        """Send the rich embed describing the first entry of ``matches``.

        May raise IndexError (see ``get_object``) or discord.HTTPException
        when the embed is rejected.
        """
        url = matches[0][1]
        # Resolve the object once and reuse it for both docstring and members.
        obj = self.get_object(url)
        raw_doc = None if obj is None else self.rtfmtools.get_docstring(obj)
        doc = self.parse_docstring(raw_doc)
        attr_dict = self.allattrs(obj)
        embed = self.create_embed(docstring=doc, attrs=attr_dict, matches=matches)
        await ctx.send(embed=embed)

    # The docstrings of the three commands below are their user-facing help
    # text, so they are kept verbatim and extra notes live in comments.
    @checks.not_blocked()
    @commands.group(aliases=['rtfd'], invoke_without_command=True)
    async def rtfm(self, ctx, *, obj: str = None):
        """Gives you a quick documentation for a discord.py entity.
        Events, objects, and functions are all supported through a
        a cruddy fuzzy algorithm.
        """
        key = self.transform_rtfm_language_key(ctx, 'latest')
        matches = await self.do_rtfm(ctx, key, obj)
        if not matches:
            # do_rtfm has already replied.
            return

        try:
            await self.do_visual_rtfm(ctx, matches)
        except (discord.HTTPException, IndexError):
            # Rich embed impossible (unresolvable URL or rejected embed):
            # degrade to the plain link list for the same matches.
            await self._send_links(ctx, matches)

    @checks.not_blocked()
    @rtfm.command(name='legacy')
    async def rtfm_legacy(self, ctx, *, obj: str = None):
        """Same as rtfm, but provides links only"""
        key = self.transform_rtfm_language_key(ctx, 'latest')
        matches = await self.do_rtfm(ctx, key, obj)
        if not matches:
            return
        await self._send_links(ctx, matches)

    @rtfm.command(name='python', aliases=['py'])
    async def rtfm_python(self, ctx, *, obj: str = None):
        """Gives you a documentation link for a Python entity."""
        key = self.transform_rtfm_language_key(ctx, 'python')
        matches = await self.do_rtfm(ctx, key, obj)
        if not matches:
            return
        await self._send_links(ctx, matches)
class VisualRTFMtools:
    """A bunch of functions allowing us to get the stuff we need."""

    def objectify(self, module, string):
        """Resolve the dotted attribute path ``string`` starting at ``module``.

        Returns None when any step of the path does not exist.
        """
        try:
            return attrgetter(string)(module)
        except AttributeError:
            return None

    def get_docstring(self, obj):
        """Return the cleaned-up docstring of ``obj`` (None if it has none)."""
        return inspect.getdoc(obj)

    def _members(self, obj, want_callable, useful):
        """List the ``dir(obj)`` names whose value's callability equals ``want_callable``.

        Names starting with ``_`` are skipped when ``useful`` is true, before
        their value is looked up. A listed name whose lookup raises
        AttributeError is treated as a non-callable attribute rather than
        aborting the whole listing.
        """
        names = []
        for name in dir(obj):
            if useful and name.startswith('_'):
                continue
            if callable(getattr(obj, name, None)) == want_callable:
                names.append(name)
        return names

    def allmethods(self, obj, useful=True):
        """Returns all methods of a class, when useful is True, it will exclude methods starting with "_"."""
        return self._members(obj, True, useful)

    def allattrs(self, obj, useful=True):
        """Same as allmethods, but lists attributes instead."""
        return self._members(obj, False, useful)

    def getall(self, obj, useful=True):
        """Return attributes and methods together, sorted case-insensitively."""
        return sorted(self.allattrs(obj, useful) + self.allmethods(obj, useful), key=str.lower)
def setup(bot):
    """Extension entry point: create the RTFM cog and register it on ``bot``."""
    cog = RTFM(bot)
    bot.add_cog(cog)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment