Last active
August 15, 2023 18:35
-
-
Save aanari/f47fe5e79ac041d88e06697edb05acf6 to your computer and use it in GitHub Desktop.
Airflow Dropbox Hook
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function

import logging
import sys

import dropbox
from airflow.hooks.base_hook import BaseHook
from dropbox.exceptions import ApiError, AuthError, HttpError
# The *Error names below are API error payload types (carried inside an
# ApiError), not exception classes; they are defined in dropbox.files.
from dropbox.files import (
    CreateFolderError,
    DeleteError,
    GetMetadataError,
    ListFolderError,
    RelocationError,
    UploadError,
    WriteMode,
)


class DropboxHook(BaseHook):
    """
    Interact with the Dropbox API.

    The Dropbox access token is taken from the ``password`` field of the
    Airflow connection identified by ``dropbox_conn_id``.

    Every remote operation logs its start and completion. Dropbox API
    failures are logged and re-raised so that the calling Airflow task
    fails instead of silently reporting success.
    """

    def __init__(self, dropbox_conn_id):
        """
        :param dropbox_conn_id: id of the Airflow connection holding the token
        :type dropbox_conn_id: str
        """
        self.dropbox_conn_id = dropbox_conn_id
        # Created lazily by get_conn() and reset by close_conn().
        self.conn = None

    def get_conn(self):
        """
        Returns a Dropbox client, creating and validating it on first use.

        :raises AuthError: if Dropbox rejects the access token
        """
        if self.conn is None:
            params = self.get_connection(self.dropbox_conn_id)
            # Use the connection password as Dropbox token
            client = dropbox.Dropbox(params.password)
            # Check that the access token is valid
            try:
                client.users_get_current_account()
            except AuthError as err:
                logging.error(
                    'ERROR: Invalid access token; try re-generating an access '
                    'token from the app console on the web: %s', err)
                raise
            # Cache the client only once it is known to be usable, so a
            # rejected token is never handed out by a later call.
            self.conn = client
        return self.conn

    def close_conn(self):
        """
        Drops the cached client; the next get_conn() call creates a new one.
        Safe to call even if the connection was never opened.
        """
        self.conn = None

    def describe_directory(self, path):
        """
        Returns the metadata of the specified Dropbox path, as returned by
        the client's ``files_get_metadata`` call.

        :param path: full path to the remote directory
        :type path: str
        :raises ApiError: if the Dropbox API rejects the request
        """
        conn = self.get_conn()
        logging.info('Describing folder on Dropbox: %s', path)
        try:
            metadata = conn.files_get_metadata(path)
        except ApiError:
            logging.exception('Dropbox API error describing: %s', path)
            raise
        logging.info('Finished describing folder on Dropbox: %s', path)
        return metadata

    def list_directory(self, path, recursive=False):
        """
        Returns the result of the client's ``files_list_folder`` call for
        the given folder.

        NOTE(review): only the first page of results is requested here; if
        the listing is paginated, the caller has to continue from the
        returned result object - confirm against the SDK documentation.

        :param path: full path to the remote directory to list
        :type path: str
        :param recursive: If true, the list folder operation will be applied
            recursively to all subfolders and the response will contain
            contents of all subfolders.
        :type recursive: bool
        :raises ApiError: if the Dropbox API rejects the request
        """
        conn = self.get_conn()
        logging.info('Listing folder on Dropbox: %s', path)
        try:
            files = conn.files_list_folder(path, recursive)
        except ApiError:
            logging.exception('Dropbox API error listing: %s', path)
            raise
        logging.info('Finished listing folder on Dropbox: %s', path)
        return files

    def create_directory(self, path):
        """
        Creates a directory on Dropbox at the given path.

        :param path: full path to the remote directory to create
        :type path: str
        :raises ApiError: if the Dropbox API rejects the request
        """
        conn = self.get_conn()
        logging.info('Creating folder on Dropbox: %s', path)
        try:
            conn.files_create_folder(path)
        except ApiError:
            logging.exception('Dropbox API error creating folder: %s', path)
            raise
        logging.info('Finished creating folder on Dropbox: %s', path)

    def copy(self, remote_source_path, remote_destination_path):
        """
        Copies a remote file or directory from source to destination on Dropbox.

        :param remote_source_path: full path to the remote source to copy from
        :type remote_source_path: str
        :param remote_destination_path: full path to the remote destination to copy to
        :type remote_destination_path: str
        :raises ApiError: if the Dropbox API rejects the request
        """
        conn = self.get_conn()
        logging.info('Copying files on Dropbox: %s to %s',
                     remote_source_path, remote_destination_path)
        try:
            conn.files_copy(remote_source_path, remote_destination_path)
        except ApiError:
            logging.exception('Dropbox API error copying: %s to %s',
                              remote_source_path, remote_destination_path)
            raise
        # Both paths are logged; the original message had a single
        # placeholder and silently dropped the destination.
        logging.info('Finished copying files on Dropbox: %s to %s',
                     remote_source_path, remote_destination_path)

    def move(self, remote_source_path, remote_destination_path):
        """
        Moves a remote file or directory from source to destination on Dropbox.

        :param remote_source_path: full path to the remote source to move from
        :type remote_source_path: str
        :param remote_destination_path: full path to the remote destination to move to
        :type remote_destination_path: str
        :raises ApiError: if the Dropbox API rejects the request
        """
        conn = self.get_conn()
        logging.info('Moving files on Dropbox: %s to %s',
                     remote_source_path, remote_destination_path)
        try:
            conn.files_move(remote_source_path, remote_destination_path)
        except ApiError:
            logging.exception('Dropbox API error moving: %s to %s',
                              remote_source_path, remote_destination_path)
            raise
        # Both paths are logged; the original message had a single
        # placeholder and silently dropped the destination.
        logging.info('Finished moving files on Dropbox: %s to %s',
                     remote_source_path, remote_destination_path)

    def retrieve_file(self, remote_full_path, local_full_path):
        """
        Transfers the remote file to a local location.

        :param remote_full_path: full path to the remote source to retrieve from
        :type remote_full_path: str
        :param local_full_path: full path to the local destination to save to
        :type local_full_path: str
        :raises ApiError: if the Dropbox API rejects the request
        :raises HttpError: if the download fails at the HTTP level
        """
        conn = self.get_conn()
        logging.info('Retrieving file from Dropbox: %s', remote_full_path)
        try:
            # The client call takes the local path first, then the remote one.
            conn.files_download_to_file(local_full_path, remote_full_path)
        except (ApiError, HttpError):
            logging.exception('Dropbox API error retrieving: %s',
                              remote_full_path)
            raise
        logging.info('Finished retrieving file from Dropbox: %s',
                     remote_full_path)

    def store_file(self, remote_full_path, local_full_path):
        """
        Transfers a local file to the remote location, replacing any file
        already present at that remote path.

        The whole local file is read into memory and sent in one request.

        :param remote_full_path: full path to the remote destination to save to
        :type remote_full_path: str
        :param local_full_path: full path to the local source file to upload
        :type local_full_path: str
        :raises ApiError: if the Dropbox API rejects the request
        """
        conn = self.get_conn()
        logging.info('Uploading file to Dropbox: %s', remote_full_path)
        with open(local_full_path, 'rb') as f:
            try:
                # WriteMode 'overwrite' is requested so the upload replaces
                # an existing remote file at the same path.
                conn.files_upload(f.read(), remote_full_path,
                                  mode=WriteMode('overwrite'))
            except ApiError:
                logging.exception('Dropbox API error uploading: %s',
                                  remote_full_path)
                raise
        logging.info('Finished uploading file to Dropbox: %s', remote_full_path)

    def delete(self, path):
        """
        Permanently deletes a file or directory on Dropbox.

        NOTE(review): the original code called ``permanently_delete``, which
        is not a client method; ``files_permanently_delete`` is the matching
        SDK call. The Dropbox documentation restricts that endpoint to
        Dropbox Business apps - confirm this is the intended behaviour, or
        switch to the client's regular delete call.

        :param path: full path to the remote file or directory to delete
        :type path: str
        :raises ApiError: if the Dropbox API rejects the request
        """
        conn = self.get_conn()
        logging.info('Deleting file on Dropbox: %s', path)
        try:
            conn.files_permanently_delete(path)
        except ApiError:
            logging.exception('Dropbox API error deleting: %s', path)
            raise
        logging.info('Finished deleting file on Dropbox: %s', path)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Installation Instructions
1. Copy dropbox_hook.py to the Airflow worker container (as root) to this location: /usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/dropbox_hook.py
2. Edit the _hooks object in /usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/__init__.py and add an entry for 'dropbox_hook': ['DropboxHook'] to it.