Skip to content

Instantly share code, notes, and snippets.

@billcrook
Created April 9, 2018 15:56
Show Gist options
  • Save billcrook/44eef8b5ca18d9236c5502077e3db6d5 to your computer and use it in GitHub Desktop.
Save billcrook/44eef8b5ca18d9236c5502077e3db6d5 to your computer and use it in GitHub Desktop.
An SFTP util class I created for use in my airflow pipelines. Not beautiful, but it works.
import logging
import pexpect
from airflow.hooks.base_hook import BaseHook
class SFTP(object):
    """
    Download files over SFTP by driving the OpenSSH ``sftp`` command-line client.

    Requires openssh_client. Each transfer spawns ``/usr/bin/sftp`` as a child
    process under pexpect and answers its interactive prompts (host-key
    confirmation and password) with the credentials of an airflow connection.
    """

    def __init__(self, connection_id):
        """
        :param connection_id: The airflow connection id whose login, host, port
            and password are used when running the sftp command
        """
        connection = BaseHook.get_connection(connection_id)
        self.username = connection.login
        self.host = connection.host
        self.port = connection.port
        self.password = connection.password

    def get_files(self, remote_path, local_path):
        """
        Connect using the configured connection and recursively download
        everything under the remote path.

        :param remote_path: The remote path to sync
        :param local_path: The local dir to store files
        :return: None
        :raises Exception: if the sftp process finishes with a status above zero
        """
        logging.info(f'sftp get {remote_path} to: {local_path}')

        # Pass the arguments as a list instead of one command string: pexpect
        # splits a command string on whitespace/quotes, so a path containing a
        # space or a quote character would corrupt the command line.
        args = ['-r']
        if self.port:
            # Only pass -P when a port is configured; otherwise let sftp pick
            # its default rather than sending the literal text "None".
            args += ['-P', str(self.port)]
        args += [f'{self.username}@{self.host}:{remote_path}', str(local_path)]

        child = None
        try:
            logging.debug('spawning child')
            # The timeout (in seconds) bounds every expect() call on the child,
            # including the wait for the transfer to finish.
            child = pexpect.spawn('/usr/bin/sftp', args=args, timeout=14400)
            self._handle_sftp_prompt(child)
            # sftp exits once the transfer is done, which shows up as EOF.
            child.expect(pexpect.EOF)
        finally:
            # Always release the child, even if a prompt/timeout error occurred.
            if child is not None:
                child.close()
                if child.isalive():
                    logging.warning('Child did not exit gracefully.')
                else:
                    logging.debug('Child exited gracefully.')

        # Only reached when no exception is propagating, so a failure raised
        # above is never masked by the status check below.
        if child.status > 0:
            raise Exception(f'sftp command exited with'
                            f' status: {child.status}'
                            f' sigstatus: {child.signalstatus}'
                            f' exitstatus: {child.exitstatus}')
        logging.info('download complete')

    def _handle_sftp_prompt(self, child):
        """
        Answer the interactive prompts of the spawned sftp process until it
        reports that it is connected.

        :param child: The spawned process; must provide ``expect`` and ``sendline``
        :return: None
        """
        prompts = ['.*password:.*', '.*continue connecting.*', '.*Connected.*']
        # Loop (rather than recurse) so that repeated prompts cannot exhaust
        # the interpreter's recursion limit.
        while True:
            logging.debug('expecting prompt...')
            i = child.expect(prompts)
            if i == 0:
                logging.info('supplying pass to sftp server')
                child.sendline(self.password)
                logging.debug('sent pw')
            elif i == 1:
                logging.info('answering yes to host check')
                child.sendline('yes')
                logging.debug('sent yes')
            elif i == 2:
                logging.info('connected to sftp server')
                return
@lioneltran
Copy link

Thanks so much, it works well. But what about putting (uploading) files? :D

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment