Skip to content

Instantly share code, notes, and snippets.

@mdellavo
Last active December 16, 2015 17:45
Show Gist options
  • Save mdellavo/d349cbc9ffc3b30c1862 to your computer and use it in GitHub Desktop.
Save mdellavo/d349cbc9ffc3b30c1862 to your computer and use it in GitHub Desktop.
import time
import socket
import threading
import paramiko
class ServerInterface(paramiko.ServerInterface):
    """Permissive server policy for local experiments.

    Advertises only the "none" auth method, accepts every such login, and
    permits "session" channels while refusing all other channel kinds.
    """

    def get_allowed_auths(self, username):
        """Return the auth methods offered to ``username``: always "none"."""
        return "none"

    def check_auth_none(self, username):
        """Log ``username`` and accept the unauthenticated login."""
        # Single-argument print() emits the same text on Python 2 and 3;
        # the one-element tuple keeps %-formatting safe for any value.
        print("auth %s" % (username,))
        return paramiko.AUTH_SUCCESSFUL

    def check_channel_request(self, kind, chanid):
        """Allow a channel of ``kind`` "session"; prohibit anything else.

        ``chanid`` is accepted for interface compatibility and not used.
        """
        if kind == "session":
            return paramiko.OPEN_SUCCEEDED
        return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
# Throwaway host key, regenerated on every run.
# NOTE(review): 1024-bit RSA is short by current standards; kept as in the
# original experiment -- raise the size if this is reused beyond localhost.
key = paramiko.RSAKey.generate(1024)

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client = None
transport = None
try:
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("127.0.0.1", 2222))
    sock.listen(10)
    # Single-argument print() calls produce identical output on Python 2 and 3.
    print("listening...")

    # Serve exactly one incoming connection.
    client, address = sock.accept()
    transport = paramiko.Transport(client)
    print("transport = %s" % (transport,))

    # Return value ignored: the script proceeds whether or not moduli load.
    transport.load_server_moduli()
    transport.add_server_key(key)
    transport.set_subsystem_handler(
        "sftp", paramiko.SFTPServer, paramiko.SFTPServerInterface
    )
    transport.start_server(server=ServerInterface())

    # Author's observation: Transport.server_accepts is empty.
    # It is printed before and after accept() so the two states can be compared.
    print("BEFORE transport.server_accepts = %s" % (transport.server_accepts,))
    print("accept() = %s" % (transport.accept(),))

    # Do work...
    # Poll once a second until the transport reports it is no longer active.
    while transport.is_active():
        time.sleep(1)

    print("AFTER transport.server_accepts = %s" % (transport.server_accepts,))
finally:
    # Release network resources even if setup or the session raised.
    if transport is not None:
        transport.close()
    elif client is not None:
        # Transport was never created, so close the accepted socket directly.
        client.close()
    sock.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment