Last active
February 17, 2022 09:39
-
-
Save maxnoe/bdea2715daf0e04beba2c7fbfdfb27f9 to your computer and use it in GitHub Desktop.
Download shared directory from a seafile server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
''' | |
Download files from a password protected seafile share. | |
Converted to Python from https://gist.github.com/vmx/ed6f91b93bc3c4f7e152dcd5f377ecef
''' | |
import requests | |
from argparse import ArgumentParser | |
import logging | |
from urllib.parse import urlparse | |
from pathlib import Path | |
from tqdm import tqdm | |
from getpass import getpass | |
import sys | |
# Named logger used by the functions of this script.
log = logging.getLogger('seafile')

# Command line interface of the script.
parser = ArgumentParser(
    description='Download files from a password protected seafile share.',
)
parser.add_argument(
    'url',
    help=(
        'URL of the share link; its last path component is used as the'
        ' share token'
    ),
)
parser.add_argument(
    'directory',
    help='path of the directory inside the share whose files are downloaded',
)
parser.add_argument(
    'outputdir',
    type=Path,
    help='local directory the files are written to (created if missing)',
)
parser.add_argument(
    '--password',
    help='password of the share; asked for interactively when not given',
)
def download_file(session, url, path, chunk_size=10240, progress=False, **kwargs):
    """Stream ``url`` into the file ``path`` using ``session``.

    The data is first written to ``<path>.part`` and only moved to ``path``
    once the transfer finished, so an aborted download never leaves a
    truncated file at the final location.

    Parameters
    ----------
    session
        Object providing ``get(url, stream=True, **kwargs)`` whose result is
        a context manager with ``raise_for_status()``, ``headers`` and
        ``iter_content()`` (e.g. a ``requests.Session``).
    url : str
        Address to download.
    path : str or Path
        Destination file; missing parent directories are created.
    chunk_size : int
        Number of bytes requested per iteration of the response body.
    progress : bool
        Show a tqdm progress bar when true.
    **kwargs
        Passed on to ``session.get``. ``timeout`` defaults to 5 when not
        given.

    Raises
    ------
    Whatever ``raise_for_status()``, the transfer or the file operations
    raise; the ``.part`` file is removed before the exception propagates.
    """
    log.info(f"Downloading {url} to {path}")
    path = Path(path)

    # The last URL segment is empty for URLs ending in "/"; fall back to
    # the target file name so the progress bar is still labelled.
    name = urlparse(url).path.split("/")[-1] or path.name

    # Keep the historic default but let callers pass their own timeout
    # without triggering a duplicate keyword argument error.
    kwargs.setdefault("timeout", 5)

    # open a .part file to avoid creating
    # a broken file at the intended location
    part_file = path.with_suffix(path.suffix + ".part")

    with session.get(url, stream=True, **kwargs) as r:
        # make sure the request is successful
        r.raise_for_status()

        # total=None makes tqdm show a bar without a known end
        content_length = r.headers.get("Content-Length")
        total = float(content_length) if content_length is not None else None

        try:
            part_file.parent.mkdir(parents=True, exist_ok=True)
            # the bar is a context manager, so it is closed on every exit path
            with part_file.open("wb") as f, tqdm(
                total=total,
                disable=not progress,
                unit="B",
                unit_scale=True,
                desc=f"Downloading {name}",
            ) as pbar:
                for chunk in r.iter_content(chunk_size=chunk_size):
                    f.write(chunk)
                    pbar.update(len(chunk))
        except BaseException:  # we really want to catch everything here
            # cleanup part file if something goes wrong
            if part_file.is_file():
                part_file.unlink()
            raise

    # when successful, move to intended location; replace() overwrites an
    # existing target on every platform, rename() fails for that on Windows
    part_file.replace(path)
def main():
    """Log in to a password protected share and download one directory.

    Steps: parse the command line, fetch the share page to obtain the
    ``sfcsrftoken`` cookie, post the password, request the listing of
    ``args.directory`` from the share-links API and download every listed
    file into ``args.outputdir`` (flat, using only the file name).

    Exits with status 1 when the CSRF cookie is missing, the login does not
    yield a ``sessionid`` cookie, or the directory is not found (HTTP 404).
    Other HTTP errors of the listing request raise ``requests.HTTPError``.
    """
    args = parser.parse_args()
    password = args.password or getpass()
    logging.basicConfig(level=logging.INFO)

    # make sure we have a single '/' at the end
    url = args.url.rstrip('/') + '/'
    parsed_url = urlparse(args.url.rstrip('/'))
    token = parsed_url.path.split('/')[-1]

    # seconds; keeps the login and listing requests from hanging forever
    request_timeout = 30

    with requests.session() as session:
        session.headers['referer'] = url

        # get the csrf token cookie
        session.get(url, timeout=request_timeout)
        csrf_token = session.cookies.get('sfcsrftoken')
        if csrf_token is None:
            log.critical('Did not receive a csrf token, is the URL a seafile share link?')
            sys.exit(1)

        # login
        session.post(
            url,
            data={
                'csrfmiddlewaretoken': csrf_token,
                'token': token,
                'password': password,
            },
            timeout=request_timeout,
        )

        if "sessionid" not in session.cookies:
            log.critical('Login failed')
            sys.exit(1)

        # get file listing; netloc (unlike hostname) keeps a non-default port
        api_url = f'{parsed_url.scheme}://{parsed_url.netloc}/api/v2.1/share-links/{token}/dirents/'
        ret = session.get(
            api_url,
            params=dict(path=args.directory),
            timeout=request_timeout,
        )

        if ret.status_code == 404:
            log.critical(f"Did not find path {args.directory} for given URL")
            sys.exit(1)
        # any other error status would otherwise surface as a confusing
        # JSON / KeyError further down
        ret.raise_for_status()

        # Only entries carrying a 'file_path' can be downloaded; presumably
        # the others are sub-directories -- verify against the seafile API.
        paths = [
            Path(entry['file_path'])
            for entry in ret.json()['dirent_list']
            if 'file_path' in entry
        ]

        args.outputdir.mkdir(parents=True, exist_ok=True)
        for path in paths:
            download_file(
                session,
                f'{url}files/',
                args.outputdir / path.name,
                params=dict(p=str(path), dl=1),
                progress=True,
            )
# Run the downloader only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment