Created
September 25, 2020 17:11
-
-
Save JosXa/78c018b277b87f8f47aafc22b64a51fa to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import traceback | |
from typing import ( | |
Set, | |
cast, | |
) | |
import black | |
from haps import Inject | |
from meval import meval | |
from pyrogram import filters | |
from pyrogram.types import Message | |
from app.abstractions.base_module import BaseModule | |
from app.modules.repl.helpers import stdout_io | |
from app.modules.repl.lovers_fun import Joschi, Ruzie | |
from app.modules.repl.models import ResultViewModel | |
from app.modules.repl.repl_execution_context import build_execution_context | |
from app.modules.repl.result_view import ExecutionResultView | |
from app.services.draftsservice import DraftsService | |
from app.services.notifications import _UserNotificationService | |
from botkit.builders import CallbackBuilder, ViewBuilder | |
from botkit.builtin_modules.system.system_tests import notests | |
from botkit.core.modules import module | |
from botkit.future_tgtypes.message_identity import MessageIdentity | |
from botkit.persistence.callback_store import ICallbackStore | |
from botkit.routing.pipelines.execution_plan import SendTo | |
from botkit.routing.pipelines.steps._base import Continue | |
from botkit.routing.route_builder.builder import RouteBuilder | |
from botkit.services.companionbotservice import CompanionBotService | |
from botkit.services.historyservice import HistoryService | |
from botkit.utils.sentinel import NotSet | |
from botkit.views.botkit_context import Context | |
ALLOWED_CMD_PREFIXES = ["/", "#", ".", ""] | |
ALLOWED_PY_COMMANDS = ["py", "python", "exec", "eval", "execute_python"] | |
@module | |
class REPLModule(BaseModule): | |
notifs: _UserNotificationService = Inject() | |
drafts: DraftsService = Inject() | |
callback_store: ICallbackStore = Inject("redis") | |
def __init__(self): | |
self.ruzie = Ruzie() | |
self.joschi = Joschi() | |
self.companion = CompanionBotService(self.user_client, self.bot_client) | |
self.history = HistoryService(self.user_client) | |
self._sent_messages: Set[MessageIdentity] = set() | |
async def load(self) -> None: | |
pass | |
def register(self, routes: RouteBuilder): | |
routes.use(self.user_client) | |
( | |
routes.on( | |
filters.command(ALLOWED_PY_COMMANDS, ALLOWED_CMD_PREFIXES) | |
& (filters.outgoing | filters.user("ruzikmn")) | |
) | |
.remove_trigger() | |
.gather(self.execute_python) | |
.then_send( | |
ExecutionResultView, to=SendTo.same_chat_quote_replied_to, via_bot=self.bot_client | |
) | |
.and_collect(self.save_sent) | |
) | |
@notests | |
async def execute_python(self, context: Context) -> ResultViewModel: | |
message = cast(Message, context.update) | |
EDIT_SEP = "——————————\n" | |
text = message.text | |
text = text.replace(EDIT_SEP, "") | |
joined_commands = rf"({'|'.join(map(re.escape, ALLOWED_PY_COMMANDS))})" | |
joined_prefixes = rf"({'|'.join(map(re.escape, ALLOWED_CMD_PREFIXES))})" | |
pattern = rf"^\s*{joined_prefixes}{joined_commands}\s" | |
code = re.sub(pattern, "", text, re.MULTILINE) | |
Message.__hash__ = lambda x: (x.chat.id << 32) + x.message_id | |
""" | |
TODO: This needs to be put into routes... | |
``` | |
( | |
routes | |
.(...) | |
.respond_with( | |
render_execution_result, | |
to=SendTo.same_chat_quote_original, | |
strategy=Strategy.edit_or_replace, | |
via_bot=self.bot_client | |
) | |
) | |
``` | |
""" | |
should_replace = None | |
should_reply_to = None | |
if replied_to_msg := message.reply_to_message: | |
if replied_to_msg in self._sent_messages: | |
should_replace = replied_to_msg | |
if original_replied_to_msg := replied_to_msg.reply_to_message: | |
should_reply_to = original_replied_to_msg | |
else: | |
should_reply_to = replied_to_msg | |
print("should replace:", should_replace, " -- ", "should_reply_to:", should_reply_to) | |
try: | |
code = black.format_str( | |
code, | |
mode=black.FileMode( | |
target_versions={black.TargetVersion.PY38}, | |
line_length=27, | |
string_normalization=True, | |
), | |
) | |
except: | |
traceback.print_exc() | |
await self.drafts.set_draft( | |
context.chat_id, f"/python\n{EDIT_SEP}{code}", reply_to=context.message_id | |
) | |
reply_to_id = message.reply_to_message.message_id if message.reply_to_message else None | |
view_builder = ViewBuilder(CallbackBuilder(state=None, callback_store=self.callback_store)) | |
# TODO: put this in the state model (already a field for it) | |
async def commit(): | |
await message.delete() | |
rendered = view_builder.render() | |
await self.companion.send_rendered_message_via( | |
context.chat_id, rendered, reply_to=reply_to_id, hide_via=True | |
) | |
execution_context = build_execution_context( | |
self.user_client, context, self.ruzie, self.joschi, message, reply_to_id, view_builder | |
) | |
error_msg = None | |
result = NotSet | |
if result is NotSet: | |
try: | |
with stdout_io() as s: | |
meval_globals = {"__name__": "ReplModule", "__package__": "repl_module"} | |
result = await meval(code, meval_globals, **execution_context) | |
result = s.getvalue() if result is None else result | |
except Exception as e: | |
error_msg = str(e) | |
result = None | |
if view_builder.is_dirty: | |
await commit() | |
raise Continue | |
# if result is not None and not result and "print" not in code: | |
# try: | |
# result = eval(code, execution_context) | |
# error_msg = None | |
# except: | |
# pass | |
# | |
# if view_builder.is_dirty: | |
# await commit() | |
# raise Continue | |
# | |
# # If eval failed, try exec | |
# if result is NotSet: | |
# try: | |
# result = exec(code, execution_context) | |
# except: | |
# pass | |
# | |
# if view_builder.is_dirty: | |
# await commit() | |
# raise Continue | |
if result is None: | |
evaluation_result = "None" | |
elif result: | |
evaluation_result = result | |
else: | |
evaluation_result = "No output" | |
return ResultViewModel(code=code, result=str(evaluation_result), error_msg=error_msg) | |
def save_sent(self, context: Context): | |
if context.response_identity: | |
self._sent_messages.add(context.response_identity) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment