Skip to content

Instantly share code, notes, and snippets.

@aanari
Last active August 15, 2023 18:35
Show Gist options
  • Save aanari/f47fe5e79ac041d88e06697edb05acf6 to your computer and use it in GitHub Desktop.
Save aanari/f47fe5e79ac041d88e06697edb05acf6 to your computer and use it in GitHub Desktop.
Airflow Dropbox Hook
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function

import logging
import sys

import dropbox
from airflow.hooks.base_hook import BaseHook
from dropbox.exceptions import ApiError, AuthError, HttpError
# Endpoint-specific error types are defined in dropbox.files; at runtime they
# arrive wrapped in dropbox.exceptions.ApiError (as its ``error`` attribute).
from dropbox.files import (
    CreateFolderError,
    DeleteError,
    GetMetadataError,
    ListFolderError,
    RelocationError,
    UploadError,
    WriteMode,
)
class DropboxHook(BaseHook):
    """
    Interact with the Dropbox API.

    The Dropbox access token is read from the password field of the Airflow
    connection identified by ``dropbox_conn_id``.

    ``ApiError`` exceptions raised by the Dropbox operations below are logged
    and not re-raised; the affected method then returns ``None``.
    """

    def __init__(self, dropbox_conn_id):
        """
        :param dropbox_conn_id: id of the Airflow connection holding the token
        :type dropbox_conn_id: str
        """
        self.dropbox_conn_id = dropbox_conn_id
        # Lazily created by get_conn(); None means "not connected yet".
        self.conn = None

    def get_conn(self):
        """
        Returns a Dropbox connection object.

        The client is created on first use, validated with a call to
        ``users_get_current_account`` and then cached on ``self.conn``.
        An invalid access token terminates the process via ``sys.exit``.
        """
        if self.conn is None:
            params = self.get_connection(self.dropbox_conn_id)
            # Use the connection password as Dropbox token
            client = dropbox.Dropbox(params.password)
            # Check that the access token is valid
            try:
                client.users_get_current_account()
            except AuthError as err:
                sys.exit('ERROR: Invalid access token; try re-generating an access token from the app console on the web: ' + str(err))
            # Cache the client only once its token has been verified, so a
            # rejected client is never handed out by a later call.
            self.conn = client
        return self.conn

    def close_conn(self):
        """
        Drops the cached connection object.

        Safe to call even if the connection was never opened; the next
        call to ``get_conn`` creates a new client.
        """
        self.conn = None

    def describe_directory(self, path):
        """
        Returns the metadata for the specified Dropbox file or folder, as
        returned by ``files_get_metadata``. Returns ``None`` if the API
        call fails.

        :param path: full path to the remote directory
        :type path: str
        """
        conn = self.get_conn()
        logging.info('Describing folder on Dropbox: %s', path)
        try:
            metadata = conn.files_get_metadata(path)
        except ApiError as err:
            # The SDK wraps GetMetadataError in ApiError.
            logging.error('Dropbox API error: %s', err)
            return None
        logging.info('Finished describing folder on Dropbox: %s', path)
        return metadata

    def list_directory(self, path, recursive=False):
        """
        Returns the result of ``files_list_folder`` for a Dropbox folder.
        Returns ``None`` if the API call fails.

        NOTE(review): only the first response is returned; if the listing
        is paginated the caller has to continue it -- confirm against the
        Dropbox SDK documentation.

        :param path: full path to the remote directory to list
        :type path: str
        :param recursive: If true, the list folder operation will be applied
            recursively to all subfolders and the response will contain
            contents of all subfolders.
        :type recursive: bool
        """
        conn = self.get_conn()
        logging.info('Listing folder on Dropbox: %s', path)
        try:
            files = conn.files_list_folder(path, recursive)
        except ApiError as err:
            # The SDK wraps ListFolderError in ApiError.
            logging.error('Dropbox API error: %s', err)
            return None
        logging.info('Finished listing folder on Dropbox: %s', path)
        return files

    def create_directory(self, path):
        """
        Creates a directory on Dropbox at the given path.

        :param path: full path to the remote directory to create
        :type path: str
        """
        conn = self.get_conn()
        logging.info('Creating folder on Dropbox: %s', path)
        try:
            conn.files_create_folder(path)
        except ApiError as err:
            # The SDK wraps CreateFolderError in ApiError.
            logging.error('Dropbox API error: %s', err)
            return
        logging.info('Finished creating folder on Dropbox: %s', path)

    def copy(self, remote_source_path, remote_destination_path):
        """
        Copies a remote file or directory from source to destination on Dropbox.

        :param remote_source_path: full path to the remote source to copy from
        :type remote_source_path: str
        :param remote_destination_path: full path to the remote destination to copy to
        :type remote_destination_path: str
        """
        conn = self.get_conn()
        logging.info('Copying files on Dropbox: %s to %s',
                     remote_source_path, remote_destination_path)
        try:
            conn.files_copy(remote_source_path, remote_destination_path)
        except ApiError as err:
            # The SDK wraps RelocationError in ApiError.
            logging.error('Dropbox API error: %s', err)
            return
        logging.info('Finished copying files on Dropbox: %s to %s',
                     remote_source_path, remote_destination_path)

    def move(self, remote_source_path, remote_destination_path):
        """
        Moves a remote file or directory from source to destination on Dropbox.

        :param remote_source_path: full path to the remote source to move from
        :type remote_source_path: str
        :param remote_destination_path: full path to the remote destination to move to
        :type remote_destination_path: str
        """
        conn = self.get_conn()
        logging.info('Moving files on Dropbox: %s to %s',
                     remote_source_path, remote_destination_path)
        try:
            conn.files_move(remote_source_path, remote_destination_path)
        except ApiError as err:
            # The SDK wraps RelocationError in ApiError.
            logging.error('Dropbox API error: %s', err)
            return
        logging.info('Finished moving files on Dropbox: %s to %s',
                     remote_source_path, remote_destination_path)

    def retrieve_file(self, remote_full_path, local_full_path):
        """
        Transfers the remote file to a local location.

        :param remote_full_path: full path to the remote source to retrieve from
        :type remote_full_path: str
        :param local_full_path: full path to the local destination to save to
        :type local_full_path: str
        """
        conn = self.get_conn()
        logging.info('Retrieving file from Dropbox: %s', remote_full_path)
        try:
            # files_download_to_file takes the local path first, then the remote path.
            conn.files_download_to_file(local_full_path, remote_full_path)
        except (ApiError, HttpError) as err:
            logging.error('Dropbox API error: %s', err)
            return
        logging.info('Finished retrieving file from Dropbox: %s', remote_full_path)

    def store_file(self, remote_full_path, local_full_path):
        """
        Transfers a local file to the remote location, overwriting any
        existing remote file. The whole local file is read into memory
        before it is uploaded.

        :param remote_full_path: full path to the remote destination to save to
        :type remote_full_path: str
        :param local_full_path: full path to the local source to read from
        :type local_full_path: str
        """
        conn = self.get_conn()
        logging.info('Uploading file to Dropbox: %s', remote_full_path)
        with open(local_full_path, 'rb') as f:
            try:
                # We use WriteMode=overwrite to make sure that the settings in the file are changed on upload
                conn.files_upload(f.read(), remote_full_path, mode=WriteMode('overwrite'))
            except ApiError as err:
                # The SDK wraps UploadError in ApiError.
                logging.error('Dropbox API error: %s', err)
                return
        logging.info('Finished uploading file to Dropbox: %s', remote_full_path)

    def delete(self, path):
        """
        Permanently deletes a file or directory on Dropbox.

        NOTE(review): ``files_permanently_delete`` is, per the Dropbox
        documentation, restricted to Dropbox Business apps -- confirm this
        matches the accounts the hook is used with; ``files_delete`` is the
        recoverable alternative.

        :param path: full path to the remote file or directory to delete
        :type path: str
        """
        conn = self.get_conn()
        logging.info('Deleting file on Dropbox: %s', path)
        try:
            # The client exposes this endpoint as files_permanently_delete;
            # there is no plain permanently_delete method.
            conn.files_permanently_delete(path)
        except ApiError as err:
            # The SDK wraps DeleteError in ApiError.
            logging.error('Dropbox API error: %s', err)
            return
        logging.info('Finished deleting file on Dropbox: %s', path)
@aanari
Copy link
Author

aanari commented Mar 8, 2018

Installation Instructions

  1. Copy dropbox_hook.py to Airflow worker container (as root) to this location: /usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/dropbox_hook.py
  2. Modify the _hooks object in /usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/__init__.py and add an entry for 'dropbox_hook': ['DropboxHook'] to it
  3. Use Docker compose to restart all Airflow containers (using Celery)

@DustinKLo
Copy link

👍 LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment