Created
June 18, 2013 15:01
-
-
Save eloycoto/5806105 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def start_server(host, port, keyfile, level): | |
paramiko_level = getattr(paramiko.common, level) | |
paramiko.common.logging.basicConfig(level=paramiko_level) | |
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) | |
server_socket.bind((host, port)) | |
server_socket.listen(BACKLOG) | |
while True: | |
conn, addr = server_socket.accept() | |
host_key = paramiko.RSAKey.from_private_key_file(keyfile) | |
transport = paramiko.Transport(conn) | |
transport.add_server_key(host_key) | |
transport.set_subsystem_handler( | |
'sftp', paramiko.SFTPServer, StubSFTPServer) | |
server = StubServer() | |
transport.start_server(server=server) | |
channel = transport.accept() | |
while transport.is_active(): | |
time.sleep(1) | |
start_server("localhost",222,"/tmp/host.key","INFO"): |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from paramiko import ServerInterface, SFTPServerInterface, SFTPServer, SFTPAttributes, \ | |
SFTPHandle, SFTP_OK, AUTH_SUCCESSFUL, OPEN_SUCCEEDED | |
class StubServer (ServerInterface): | |
def check_auth_password(self, username, password): | |
# all are allowed | |
if username == 'eloy' | |
return AUTH_SUCCESSFUL | |
def check_channel_request(self, kind, chanid): | |
return OPEN_SUCCEEDED | |
class StubSFTPHandle (SFTPHandle): | |
def stat(self): | |
try: | |
return SFTPAttributes.from_stat(os.fstat(self.readfile.fileno())) | |
except OSError, e: | |
return SFTPServer.convert_errno(e.errno) | |
def chattr(self, attr): | |
# python doesn't have equivalents to fchown or fchmod, so we have to | |
# use the stored filename | |
try: | |
SFTPServer.set_file_attr(self.filename, attr) | |
return SFTP_OK | |
except OSError, e: | |
return SFTPServer.convert_errno(e.errno) | |
class StubSFTPServer (SFTPServerInterface): | |
# assume current folder is a fine root | |
# (the tests always create and eventualy delete a subfolder, so there shouldn't be any mess) | |
ROOT = os.getcwd() | |
def _realpath(self, path): | |
return self.ROOT + self.canonicalize(path) | |
def list_folder(self, path): | |
path = self._realpath(path) | |
try: | |
out = [ ] | |
flist = os.listdir(path) | |
for fname in flist: | |
attr = SFTPAttributes.from_stat(os.stat(os.path.join(path, fname))) | |
attr.filename = fname | |
out.append(attr) | |
return out | |
except OSError, e: | |
return SFTPServer.convert_errno(e.errno) | |
def stat(self, path): | |
path = self._realpath(path) | |
try: | |
return SFTPAttributes.from_stat(os.stat(path)) | |
except OSError, e: | |
return SFTPServer.convert_errno(e.errno) | |
def lstat(self, path): | |
path = self._realpath(path) | |
try: | |
return SFTPAttributes.from_stat(os.lstat(path)) | |
except OSError, e: | |
return SFTPServer.convert_errno(e.errno) | |
def open(self, path, flags, attr): | |
path = self._realpath(path) | |
try: | |
binary_flag = getattr(os, 'O_BINARY', 0) | |
flags |= binary_flag | |
mode = getattr(attr, 'st_mode', None) | |
if mode is not None: | |
fd = os.open(path, flags, mode) | |
else: | |
# os.open() defaults to 0777 which is | |
# an odd default mode for files | |
fd = os.open(path, flags, 0666) | |
except OSError, e: | |
return SFTPServer.convert_errno(e.errno) | |
if (flags & os.O_CREAT) and (attr is not None): | |
attr._flags &= ~attr.FLAG_PERMISSIONS | |
SFTPServer.set_file_attr(path, attr) | |
if flags & os.O_WRONLY: | |
if flags & os.O_APPEND: | |
fstr = 'ab' | |
else: | |
fstr = 'wb' | |
elif flags & os.O_RDWR: | |
if flags & os.O_APPEND: | |
fstr = 'a+b' | |
else: | |
fstr = 'r+b' | |
else: | |
# O_RDONLY (== 0) | |
fstr = 'rb' | |
try: | |
f = os.fdopen(fd, fstr) | |
except OSError, e: | |
return SFTPServer.convert_errno(e.errno) | |
fobj = StubSFTPHandle(flags) | |
fobj.filename = path | |
fobj.readfile = f | |
fobj.writefile = f | |
return fobj | |
def remove(self, path): | |
path = self._realpath(path) | |
try: | |
os.remove(path) | |
except OSError, e: | |
return SFTPServer.convert_errno(e.errno) | |
return SFTP_OK | |
def rename(self, oldpath, newpath): | |
oldpath = self._realpath(oldpath) | |
newpath = self._realpath(newpath) | |
try: | |
os.rename(oldpath, newpath) | |
except OSError, e: | |
return SFTPServer.convert_errno(e.errno) | |
return SFTP_OK | |
def mkdir(self, path, attr): | |
path = self._realpath(path) | |
try: | |
os.mkdir(path) | |
if attr is not None: | |
SFTPServer.set_file_attr(path, attr) | |
except OSError, e: | |
return SFTPServer.convert_errno(e.errno) | |
return SFTP_OK | |
def rmdir(self, path): | |
path = self._realpath(path) | |
try: | |
os.rmdir(path) | |
except OSError, e: | |
return SFTPServer.convert_errno(e.errno) | |
return SFTP_OK | |
def chattr(self, path, attr): | |
path = self._realpath(path) | |
try: | |
SFTPServer.set_file_attr(path, attr) | |
except OSError, e: | |
return SFTPServer.convert_errno(e.errno) | |
return SFTP_OK | |
def symlink(self, target_path, path): | |
path = self._realpath(path) | |
if (len(target_path) > 0) and (target_path[0] == '/'): | |
# absolute symlink | |
target_path = os.path.join(self.ROOT, target_path[1:]) | |
if target_path[:2] == '//': | |
# bug in os.path.join | |
target_path = target_path[1:] | |
else: | |
# compute relative to path | |
abspath = os.path.join(os.path.dirname(path), target_path) | |
if abspath[:len(self.ROOT)] != self.ROOT: | |
# this symlink isn't going to work anyway -- just break it immediately | |
target_path = '<error>' | |
try: | |
os.symlink(target_path, path) | |
except OSError, e: | |
return SFTPServer.convert_errno(e.errno) | |
return SFTP_OK | |
def readlink(self, path): | |
path = self._realpath(path) | |
try: | |
symlink = os.readlink(path) | |
except OSError, e: | |
return SFTPServer.convert_errno(e.errno) | |
# if it's absolute, remove the root | |
if os.path.isabs(symlink): | |
if symlink[:len(self.ROOT)] == self.ROOT: | |
symlink = symlink[len(self.ROOT):] | |
if (len(symlink) == 0) or (symlink[0] != '/'): | |
symlink = '/' + symlink | |
else: | |
symlink = '<error>' | |
return symlink |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment