Skip to content

Instantly share code, notes, and snippets.

@eloycoto
Created June 18, 2013 15:01
Show Gist options
  • Save eloycoto/5806105 to your computer and use it in GitHub Desktop.
Save eloycoto/5806105 to your computer and use it in GitHub Desktop.
def start_server(host, port, keyfile, level):
    """Run a blocking SFTP server that handles one client at a time.

    Args:
        host: Address to bind the listening socket to.
        port: TCP port to listen on.
        keyfile: Path to the RSA private key file used as the SSH host key.
        level: Name of a logging-level attribute looked up on
            ``paramiko.common`` (the module-level call in this file passes
            ``"INFO"``).

    The function loops forever accepting connections; it only exits by
    raising. Each connection is served to completion before the next one is
    accepted.

    NOTE(review): ``BACKLOG`` is not defined in the visible part of this
    file; it has to be provided at module level.
    """
    paramiko_level = getattr(paramiko.common, level)
    paramiko.common.logging.basicConfig(level=paramiko_level)

    # The host key does not change between connections, so read it once
    # instead of re-parsing the key file for every client. This also makes a
    # bad key path fail at startup rather than on the first connection.
    host_key = paramiko.RSAKey.from_private_key_file(keyfile)

    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
        server_socket.bind((host, port))
        server_socket.listen(BACKLOG)

        while True:
            conn, _addr = server_socket.accept()
            transport = paramiko.Transport(conn)
            try:
                transport.add_server_key(host_key)
                transport.set_subsystem_handler(
                    'sftp', paramiko.SFTPServer, StubSFTPServer)
                transport.start_server(server=StubServer())

                # Wait for the client to open a channel; the channel object
                # itself is not needed here.
                transport.accept()

                # Keep this thread parked while the session is alive.
                while transport.is_active():
                    time.sleep(1)
            finally:
                # Release the session (and its socket) even if negotiation
                # or the session itself raised.
                transport.close()
    finally:
        server_socket.close()
# Start the SFTP server on localhost:222 with the host key at /tmp/host.key
# and INFO-level logging. This call blocks for the lifetime of the process.
# NOTE(review): port 222 is below 1024, which typically requires elevated
# privileges to bind on Unix-like systems.
start_server("localhost", 222, "/tmp/host.key", "INFO")
import os

from paramiko import (
    AUTH_FAILED,
    AUTH_SUCCESSFUL,
    OPEN_SUCCEEDED,
    SFTP_OK,
    SFTPAttributes,
    SFTPHandle,
    SFTPServer,
    SFTPServerInterface,
    ServerInterface,
)
class StubServer(ServerInterface):
    """SSH server policy: password login for one fixed user, any channel."""

    def check_auth_password(self, username, password):
        """Decide whether a password login attempt is accepted.

        Only the user name is checked; ``password`` is ignored.

        Returns:
            ``AUTH_SUCCESSFUL`` when ``username`` is ``'eloy'``, otherwise
            ``AUTH_FAILED``.
        """
        # NOTE(review): the password is never verified, so anyone who knows
        # the user name can log in. Acceptable only for a test stub.
        if username == 'eloy':
            return AUTH_SUCCESSFUL
        # Reject explicitly instead of falling off the end and returning None.
        return AUTH_FAILED

    def check_channel_request(self, kind, chanid):
        """Allow every channel request, whatever its kind or id."""
        return OPEN_SUCCEEDED
class StubSFTPHandle(SFTPHandle):
    """Handle for a file opened through the SFTP server.

    The methods rely on two attributes that the creator of the handle must
    assign: ``readfile`` (a file object, used by ``stat``) and ``filename``
    (the file's path, used by ``chattr``).
    """

    def stat(self):
        """Return the attributes of the open file.

        Returns:
            An ``SFTPAttributes`` built from ``os.fstat`` on the underlying
            descriptor, or the SFTP error code for the ``OSError`` raised.
        """
        try:
            return SFTPAttributes.from_stat(os.fstat(self.readfile.fileno()))
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)

    def chattr(self, attr):
        """Apply ``attr`` to the file behind this handle.

        Returns:
            ``SFTP_OK`` on success, or the SFTP error code for the
            ``OSError`` raised.
        """
        # SFTPServer.set_file_attr is given a path rather than a descriptor,
        # so the attributes are applied through the stored filename.
        try:
            SFTPServer.set_file_attr(self.filename, attr)
            return SFTP_OK
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
class StubSFTPServer(SFTPServerInterface):
    """SFTP backend that serves the local filesystem underneath ``ROOT``.

    Every operation first maps the client-supplied path to a local path with
    ``_realpath``. An ``OSError`` raised by the underlying ``os`` call is
    turned into an SFTP error code with ``SFTPServer.convert_errno`` and
    returned instead of being propagated.
    """

    # assume current folder is a fine root
    # (the tests always create and eventually delete a subfolder, so there
    # shouldn't be any mess)
    ROOT = os.getcwd()

    def _realpath(self, path):
        """Map a client path to a local path by prefixing ``ROOT``.

        The path is passed through ``self.canonicalize`` first (inherited;
        presumably it normalizes the path to an absolute form -- confirm
        against the paramiko documentation).
        """
        return self.ROOT + self.canonicalize(path)

    def list_folder(self, path):
        """Return a list of ``SFTPAttributes``, one per entry in ``path``.

        Each entry's ``filename`` is set to its bare name. Returns an SFTP
        error code if listing or stat-ing fails.
        """
        path = self._realpath(path)
        try:
            out = []
            for fname in os.listdir(path):
                attr = SFTPAttributes.from_stat(
                    os.stat(os.path.join(path, fname)))
                attr.filename = fname
                out.append(attr)
            return out
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)

    def stat(self, path):
        """Return ``SFTPAttributes`` for ``path`` (via ``os.stat``) or an
        SFTP error code."""
        path = self._realpath(path)
        try:
            return SFTPAttributes.from_stat(os.stat(path))
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)

    def lstat(self, path):
        """Return ``SFTPAttributes`` for ``path`` (via ``os.lstat``) or an
        SFTP error code."""
        path = self._realpath(path)
        try:
            return SFTPAttributes.from_stat(os.lstat(path))
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)

    def open(self, path, flags, attr):
        """Open ``path`` and wrap it in a ``StubSFTPHandle``.

        Args:
            path: Client-supplied path.
            flags: ``os.open``-style flag bits.
            attr: Requested attributes; its ``st_mode``, when present, is
                used as the creation mode.

        Returns:
            A ``StubSFTPHandle`` whose ``filename``, ``readfile`` and
            ``writefile`` are populated, or an SFTP error code if opening
            the file fails.
        """
        path = self._realpath(path)
        try:
            # O_BINARY only exists on some platforms; fall back to 0.
            flags |= getattr(os, 'O_BINARY', 0)
            mode = getattr(attr, 'st_mode', None)
            if mode is not None:
                fd = os.open(path, flags, mode)
            else:
                # os.open() defaults to 0777 which is
                # an odd default mode for files
                fd = os.open(path, flags, 0o666)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)

        if (flags & os.O_CREAT) and (attr is not None):
            # The mode was already applied by os.open(); clear the
            # permissions flag so set_file_attr does not apply it again.
            attr._flags &= ~attr.FLAG_PERMISSIONS
            SFTPServer.set_file_attr(path, attr)

        # Translate the os.open() flags into the matching fdopen() mode.
        if flags & os.O_WRONLY:
            fstr = 'ab' if flags & os.O_APPEND else 'wb'
        elif flags & os.O_RDWR:
            fstr = 'a+b' if flags & os.O_APPEND else 'r+b'
        else:
            # O_RDONLY (== 0)
            fstr = 'rb'

        try:
            f = os.fdopen(fd, fstr)
        except OSError as e:
            # Do not leak the raw descriptor when wrapping it fails. The
            # close is best-effort: the descriptor may already be invalid.
            try:
                os.close(fd)
            except OSError:
                pass
            return SFTPServer.convert_errno(e.errno)

        fobj = StubSFTPHandle(flags)
        fobj.filename = path
        # The same file object serves both directions.
        fobj.readfile = f
        fobj.writefile = f
        return fobj

    def remove(self, path):
        """Delete the file at ``path``; return ``SFTP_OK`` or an SFTP error
        code."""
        path = self._realpath(path)
        try:
            os.remove(path)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        return SFTP_OK

    def rename(self, oldpath, newpath):
        """Rename ``oldpath`` to ``newpath``; return ``SFTP_OK`` or an SFTP
        error code."""
        oldpath = self._realpath(oldpath)
        newpath = self._realpath(newpath)
        try:
            os.rename(oldpath, newpath)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        return SFTP_OK

    def mkdir(self, path, attr):
        """Create directory ``path`` and, when ``attr`` is given, apply it.

        Returns ``SFTP_OK`` or an SFTP error code.
        """
        path = self._realpath(path)
        try:
            os.mkdir(path)
            if attr is not None:
                SFTPServer.set_file_attr(path, attr)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        return SFTP_OK

    def rmdir(self, path):
        """Remove directory ``path``; return ``SFTP_OK`` or an SFTP error
        code."""
        path = self._realpath(path)
        try:
            os.rmdir(path)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        return SFTP_OK

    def chattr(self, path, attr):
        """Apply ``attr`` to ``path``; return ``SFTP_OK`` or an SFTP error
        code."""
        path = self._realpath(path)
        try:
            SFTPServer.set_file_attr(path, attr)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        return SFTP_OK

    def symlink(self, target_path, path):
        """Create a symlink at ``path`` pointing to ``target_path``.

        An absolute target is re-rooted under ``ROOT``. A relative target
        that would resolve to a location not prefixed by ``ROOT`` is
        replaced by the literal ``'<error>'`` so the link is created broken.

        Returns ``SFTP_OK`` or an SFTP error code.
        """
        path = self._realpath(path)
        if target_path.startswith('/'):
            # absolute symlink
            target_path = os.path.join(self.ROOT, target_path[1:])
            if target_path.startswith('//'):
                # bug in os.path.join
                target_path = target_path[1:]
        else:
            # compute relative to path
            abspath = os.path.join(os.path.dirname(path), target_path)
            if not abspath.startswith(self.ROOT):
                # this symlink isn't going to work anyway -- just break it
                # immediately
                target_path = '<error>'
        try:
            os.symlink(target_path, path)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        return SFTP_OK

    def readlink(self, path):
        """Return the target of the symlink at ``path``.

        An absolute target under ``ROOT`` is returned with the ``ROOT``
        prefix stripped (and a leading ``/`` guaranteed). An absolute target
        outside ``ROOT`` is reported as ``'<error>'``. A relative target is
        returned unchanged. Returns an SFTP error code if the link cannot be
        read.
        """
        path = self._realpath(path)
        try:
            target = os.readlink(path)
        except OSError as e:
            return SFTPServer.convert_errno(e.errno)
        # if it's absolute, remove the root
        if os.path.isabs(target):
            if target.startswith(self.ROOT):
                target = target[len(self.ROOT):]
                if not target.startswith('/'):
                    target = '/' + target
            else:
                target = '<error>'
        return target
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment