Last active
September 6, 2024 16:05
-
-
Save hugsy/714e0038d5d0b1deb7fad1907928252f to your computer and use it in GitHub Desktop.
Run Binary Ninja headlessly using RPyC
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import binaryninja | |
import threading | |
import typing | |
import logging | |
import rpyc | |
import rpyc.utils.helpers | |
import rpyc.utils.server | |
if typing.TYPE_CHECKING: | |
import rpyc.core.protocol | |
SERVICE_NAME: str = "Binja-RPyC" | |
DEBUG: bool = False | |
HOST: str = "0.0.0.0" | |
PORT: int = 18812 | |
PAGE_SIZE: int = 0x1000 | |
g_ServiceThread = None | |
g_Server = None | |
__bv = None | |
class BinjaRpycService(rpyc.Service): | |
ALIASES = ["binja", ] | |
def __init__(self, bv): | |
self.bv = bv | |
return | |
def on_connect(self, conn: rpyc.core.protocol.Connection): | |
logging.info("connect open: {}".format(conn,)) | |
return | |
def on_disconnect(self, conn: rpyc.core.protocol.Connection): | |
print("connection closed: {}".format(conn,)) | |
return | |
exposed_binaryninja = binaryninja | |
def exposed_bv(self): | |
return self.bv | |
def exposed_eval(self, cmd): | |
return eval(cmd) | |
def is_service_started(): | |
global g_ServiceThread | |
return g_ServiceThread is not None | |
def start_service(host: str, port: int): | |
"""Starting the RPyC server""" | |
global g_Server, __bv | |
g_Server = None | |
__bv = bv | |
for i in range(1): | |
p: int = port + i | |
try: | |
service = rpyc.utils.helpers.classpartial(BinjaRpycService, bv) | |
g_Server = rpyc.utils.server.ThreadedServer( | |
service(), | |
hostname=host, | |
port=p, | |
protocol_config={'allow_public_attrs': True, } | |
) | |
break | |
except OSError as e: | |
logging.error(f"OSError: {str(e)}") | |
g_Server = None | |
if not g_Server: | |
logging.error("failed to start server...") | |
return | |
logging.info("server successfully started") | |
g_Server.start() | |
return | |
def rpyc_start(): | |
global g_ServiceThread | |
logging.debug("Starting background service...") | |
g_ServiceThread = threading.Thread( | |
target=start_service, args=(HOST, PORT)) | |
g_ServiceThread.daemon = True | |
g_ServiceThread.start() | |
time.sleep(0.2) | |
if g_ServiceThread: | |
binaryninja.show_message_box( | |
SERVICE_NAME, | |
"Service successfully started, you can use any RPyC client to connect to this instance of Binary Ninja", | |
binaryninja.MessageBoxButtonSet.OKButtonSet, | |
binaryninja.MessageBoxIcon.InformationIcon | |
) | |
return | |
def shutdown_service() -> bool: | |
if not g_Server: | |
logging.warning("Service not started") | |
return False | |
try: | |
logging.debug("Shutting down service") | |
g_Server.close() | |
except Exception as e: | |
logging.error(f"Exception: {str(e)}") | |
return False | |
return True | |
def stop_service() -> bool: | |
global g_ServiceThread | |
if not g_ServiceThread: | |
return False | |
logging.debug("Stopping service thread") | |
if shutdown_service(): | |
g_ServiceThread.join() | |
g_ServiceThread = None | |
logging.info("Service thread stopped") | |
return True | |
def rpyc_stop(bv: binaryninja.BinaryView): | |
if stop_service(): | |
binaryninja.show_message_box( | |
SERVICE_NAME, | |
"Service successfully stopped", | |
binaryninja.MessageBoxButtonSet.OKButtonSet, | |
binaryninja.MessageBoxIcon.InformationIcon | |
) | |
else: | |
binaryninja.show_message_box( | |
SERVICE_NAME, | |
"An error occured while stopping the service, check logs", | |
binaryninja.MessageBoxButtonSet.OKButtonSet, | |
binaryninja.MessageBoxIcon.ErrorIcon | |
) | |
return | |
if is_service_started(): | |
logging.info("Trying to start the service") | |
rpyc_stop() | |
else: | |
logging.info("Trying to stop the service") | |
rpyc_start() |
Awesome work!
Appreciated, thanks but rpyc
is really the hero here :)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Awesome work!