Skip to content

Instantly share code, notes, and snippets.

@JosXa
Created September 25, 2020 17:11
Show Gist options
  • Save JosXa/78c018b277b87f8f47aafc22b64a51fa to your computer and use it in GitHub Desktop.
Save JosXa/78c018b277b87f8f47aafc22b64a51fa to your computer and use it in GitHub Desktop.
import re
import traceback
from typing import (
Set,
cast,
)
import black
from haps import Inject
from meval import meval
from pyrogram import filters
from pyrogram.types import Message
from app.abstractions.base_module import BaseModule
from app.modules.repl.helpers import stdout_io
from app.modules.repl.lovers_fun import Joschi, Ruzie
from app.modules.repl.models import ResultViewModel
from app.modules.repl.repl_execution_context import build_execution_context
from app.modules.repl.result_view import ExecutionResultView
from app.services.draftsservice import DraftsService
from app.services.notifications import _UserNotificationService
from botkit.builders import CallbackBuilder, ViewBuilder
from botkit.builtin_modules.system.system_tests import notests
from botkit.core.modules import module
from botkit.future_tgtypes.message_identity import MessageIdentity
from botkit.persistence.callback_store import ICallbackStore
from botkit.routing.pipelines.execution_plan import SendTo
from botkit.routing.pipelines.steps._base import Continue
from botkit.routing.route_builder.builder import RouteBuilder
from botkit.services.companionbotservice import CompanionBotService
from botkit.services.historyservice import HistoryService
from botkit.utils.sentinel import NotSet
from botkit.views.botkit_context import Context
# Characters that may precede a REPL command word. The empty string means the
# bare command word (no prefix at all) is accepted too.
ALLOWED_CMD_PREFIXES = [
    "/",
    "#",
    ".",
    "",
]

# Command words that trigger execution of the Python code following them.
ALLOWED_PY_COMMANDS = [
    "py",
    "python",
    "exec",
    "eval",
    "execute_python",
]
@module
class REPLModule(BaseModule):
    """Chat REPL: runs Python snippets sent as command messages.

    A message matching one of ``ALLOWED_PY_COMMANDS`` (with any prefix from
    ``ALLOWED_CMD_PREFIXES``) is formatted with ``black``, executed through
    ``meval`` and answered with an ``ExecutionResultView``.
    """

    notifs: _UserNotificationService = Inject()
    drafts: DraftsService = Inject()
    callback_store: ICallbackStore = Inject("redis")

    def __init__(self):
        self.ruzie = Ruzie()
        self.joschi = Joschi()
        self.companion = CompanionBotService(self.user_client, self.bot_client)
        self.history = HistoryService(self.user_client)
        # Identities of result messages recorded by ``save_sent``.
        self._sent_messages: Set[MessageIdentity] = set()

    async def load(self) -> None:
        """No asynchronous setup is needed for this module."""
        pass

    def register(self, routes: RouteBuilder):
        """Register the REPL route on the user client.

        The route fires for command messages that are outgoing or sent by the
        user ``ruzikmn``; the result of ``execute_python`` is rendered with
        ``ExecutionResultView`` and the sent message is recorded by
        ``save_sent``.
        """
        routes.use(self.user_client)

        (
            routes.on(
                filters.command(ALLOWED_PY_COMMANDS, ALLOWED_CMD_PREFIXES)
                & (filters.outgoing | filters.user("ruzikmn"))
            )
            .remove_trigger()
            .gather(self.execute_python)
            .then_send(
                ExecutionResultView, to=SendTo.same_chat_quote_replied_to, via_bot=self.bot_client
            )
            .and_collect(self.save_sent)
        )

    @notests
    async def execute_python(self, context: Context) -> ResultViewModel:
        """Execute the Python code contained in the triggering message.

        Args:
            context: Route context whose ``update`` is the command message.

        Returns:
            A ``ResultViewModel`` with the (black-formatted, when possible)
            code, the stringified result or captured stdout, and the error
            message if execution raised.

        Raises:
            Continue: when the executed code modified the view builder; the
                built view is sent by ``commit`` instead of the regular result.
        """
        message = cast(Message, context.update)

        # Separator inserted into the draft below; it is stripped again when
        # the user sends that draft back.
        EDIT_SEP = "——————————\n"

        # ``message.text`` is None for media messages.
        # NOTE(review): assumes the command filter can also match captions —
        # confirm against the Pyrogram version in use.
        text = message.text or message.caption or ""
        text = text.replace(EDIT_SEP, "")

        commands = "|".join(map(re.escape, ALLOWED_PY_COMMANDS))
        prefixes = "|".join(map(re.escape, ALLOWED_CMD_PREFIXES))
        pattern = rf"^\s*({prefixes})({commands})\s"
        # The original passed ``re.MULTILINE`` positionally, where it was
        # taken as ``count`` rather than ``flags``. Only the leading command
        # must be removed, so strip exactly one match anchored at the start
        # of the text; MULTILINE would also eat command-like words at the
        # start of later code lines.
        code = re.sub(pattern, "", text, count=1)

        # Makes ``Message`` objects hashable so they can be looked up in
        # ``self._sent_messages`` below. This patches the class globally.
        Message.__hash__ = lambda x: (x.chat.id << 32) + x.message_id

        # TODO: This needs to be put into routes...
        #
        #     (
        #         routes
        #         .(...)
        #         .respond_with(
        #             render_execution_result,
        #             to=SendTo.same_chat_quote_original,
        #             strategy=Strategy.edit_or_replace,
        #             via_bot=self.bot_client
        #         )
        #     )

        # Currently only used for the debug print below.
        should_replace = None
        should_reply_to = None

        if replied_to_msg := message.reply_to_message:
            if replied_to_msg in self._sent_messages:
                should_replace = replied_to_msg
                if original_replied_to_msg := replied_to_msg.reply_to_message:
                    should_reply_to = original_replied_to_msg
            else:
                should_reply_to = replied_to_msg

        print("should replace:", should_replace, " -- ", "should_reply_to:", should_reply_to)

        # Formatting is best-effort: on failure the unformatted code is run.
        try:
            code = black.format_str(
                code,
                mode=black.FileMode(
                    target_versions={black.TargetVersion.PY38},
                    line_length=27,
                    string_normalization=True,
                ),
            )
        except Exception:
            # Not a bare ``except``: KeyboardInterrupt/SystemExit must propagate.
            traceback.print_exc()

        # NOTE(review): the source indentation was lost; this call is assumed
        # to run on every invocation (not only when formatting fails) — confirm.
        await self.drafts.set_draft(
            context.chat_id, f"/python\n{EDIT_SEP}{code}", reply_to=context.message_id
        )

        reply_to_id = message.reply_to_message.message_id if message.reply_to_message else None

        view_builder = ViewBuilder(CallbackBuilder(state=None, callback_store=self.callback_store))

        # TODO: put this in the state model (already a field for it)
        async def commit():
            """Delete the command message and send the view built by the executed code."""
            await message.delete()
            rendered = view_builder.render()
            await self.companion.send_rendered_message_via(
                context.chat_id, rendered, reply_to=reply_to_id, hide_via=True
            )

        execution_context = build_execution_context(
            self.user_client, context, self.ruzie, self.joschi, message, reply_to_id, view_builder
        )

        error_msg = None
        try:
            with stdout_io() as s:
                meval_globals = {"__name__": "ReplModule", "__package__": "repl_module"}
                result = await meval(code, meval_globals, **execution_context)
                # No return value: fall back to whatever was captured by stdout_io.
                result = s.getvalue() if result is None else result
        except Exception as e:
            error_msg = str(e)
            result = None

        if view_builder.is_dirty:
            await commit()
            raise Continue

        if result is None:
            evaluation_result = "None"
        elif isinstance(result, str) and not result:
            # Only an empty string counts as "no output"; falsy values such as
            # 0, False or [] are real results and are shown as-is.
            evaluation_result = "No output"
        else:
            evaluation_result = result

        return ResultViewModel(code=code, result=str(evaluation_result), error_msg=error_msg)

    def save_sent(self, context: Context):
        """Record the identity of the response message, if there is one."""
        if context.response_identity:
            self._sent_messages.add(context.response_identity)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment