Created
January 18, 2021 21:18
-
-
Save tomowarkar/bfc18fa9137051d2cab441e9a3733de4 to your computer and use it in GitHub Desktop.
自動的に拡張子を設定して保存してくれるとラクだと思った
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import time | |
| import requests | |
class DownloaderError(Exception):
    """Error raised by the downloader classes in this module."""
class Downloader:
    """
    Base class for downloaders.

    Callers use ``download()``; subclasses provide the actual work by
    overriding ``_download()``.

    params:
        verbose [bool]: print detailed progress messages
        write_pages [bool]: save downloaded content to a file
        y [bool]: yes option (answer every y/N prompt with yes)
    """

    # Class-level defaults. Each instance receives its own copy in __init__.
    params = dict()

    def __init__(self):
        # Copy the defaults so set_param() on one instance does not leak into
        # every other Downloader (the class-level dict would be shared).
        self.params = dict(self.params)

    def set_param(self, key, value):
        """Set option ``key`` to ``value``."""
        self.params.update({key: value})

    def get_param(self, key, default=None):
        """Return option ``key``, or ``default`` when it is not set."""
        return self.params.get(key, default)

    @property
    def NAME(self):
        """Log tag: the class name without a trailing "Downloader", plus "DL"."""
        name = type(self).__name__
        suffix = "Downloader"
        # str.rstrip() removes a *set of characters*, not a suffix, and would
        # turn "ImageDownloader" into "Imag"; slice the suffix off instead.
        if name.endswith(suffix):
            name = name[: -len(suffix)]
        return name + "DL"

    @staticmethod
    def to_screen(msg):
        """Print ``msg`` to standard output."""
        print(msg)

    def _debug_verbose(self, text):
        """Print ``text`` prefixed with ``[NAME]`` when the verbose option is on."""
        if self.params.get("verbose", False):
            self.to_screen(f"[{self.NAME}] {text}")

    @staticmethod
    def wait_input(msg=""):
        """Prompt with ``msg`` and return the entered line, stripped."""
        return input(msg).strip()

    def wait_input_yN(self, msg):
        """
        Ask a yes/no question and return True for yes.

        Returns True without prompting when the ``y`` option is set.
        Any answer other than y/yes (case-insensitive) counts as no.
        """
        if self.params.get("y", False):
            return True
        answer = self.wait_input(f"{msg} [y/N]: ")
        return answer.lower() in ("y", "yes")

    def _save_as_file(self, filename, bytes_content):
        """
        Write ``bytes_content`` to ``filename`` when ``write_pages`` is on.

        Does nothing when the option is off, or when the file already exists
        and the overwrite prompt is declined.
        """
        if not self.params.get("write_pages", False):
            return
        # overwrite option
        if os.path.isfile(filename):
            if not self.wait_input_yN(f"File '{filename}' already exists. Overwrite?"):
                return
        self.to_screen(f"[download] Destination: '{filename}'")
        with open(filename, "wb") as f:
            f.write(bytes_content)

    def download(self, **info_dict):
        """
        Download any data using info_dict
        Return a download content as bytes
        """
        content = self._download(**info_dict)
        # Internal contract check on the subclass implementation.
        assert isinstance(content, bytes)
        return content

    def _download(self, **info_dict):
        """Placeholder implementation; returns empty bytes. Subclasses override this."""
        self._debug_verbose("_download() This method must be implemented by subclasses")
        return b""
class HttpDownloader(Downloader):
    """Downloader that fetches a URL with an HTTP GET via ``requests``."""

    def _request_webpage(self, url, params=None, **kwargs):
        """
        Send a GET request and return the Response object.

        ``params`` and any extra ``kwargs`` are forwarded to ``requests.get``.
        """
        # https://github.com/psf/requests/blob/master/requests/api.py
        # NOTE(review): no timeout is set here; callers that need one must
        # pass ``timeout=...`` through kwargs.
        r = requests.get(url, params, **kwargs)
        self._debug_verbose(f"Response {r.status_code}: {r.url}")
        return r

    @staticmethod
    def _guess_ext_from_content_type(content_type):
        """
        Map a Content-Type header value to a file extension.

        Returns "" when the header is missing/empty or the type is unknown.
        """
        # The header may be absent, in which case there is nothing to match.
        if not content_type:
            return ""
        pattern = {
            "text/plain": ".txt",
            "text/html": ".html",
            "text/csv": ".csv",
            "image/jpeg": ".jpeg",
            "image/png": ".png",
            "image/gif": ".gif",
        }
        # Media types are case-insensitive; substring match tolerates
        # parameters such as "; charset=utf-8".
        lowered = content_type.lower()
        for ct, ext in pattern.items():
            if ct in lowered:
                return ext
        return ""

    def _download_webpage(self, **kwargs):
        """
        Return a tuple(page content as bytes, Response object)

        kwargs:
            url (required): address to fetch
            params (optional): query parameters for the request
            filename (optional): base name for the saved file; defaults to
                the current time in nanoseconds. The guessed extension is
                appended to it.

        Raises TypeError when ``url`` is not given.
        """
        # required kwargs
        url = kwargs.pop("url", None)
        if url is None:
            raise TypeError("_download() missing 1 required kwargs: 'url'")
        # optional kwargs
        params = kwargs.pop("params", None)
        filename = kwargs.pop("filename", str(time.time_ns()))
        self._debug_verbose("Downloading webpage")
        r = self._request_webpage(url, params)
        # Default to "" so the extension guesser always receives a string,
        # even when the server sends no Content-Type header.
        content_type = r.headers.get("Content-Type", "")
        ext = self._guess_ext_from_content_type(content_type)
        filename += ext
        # Saving is a no-op unless the write_pages option is enabled.
        self._save_as_file(filename, r.content)
        return (r.content, r)

    def _download(self, **kwargs):
        """Fetch the page described by ``kwargs`` and return its content as bytes."""
        content, _ = self._download_webpage(**kwargs)
        return content
class ImageDownloader(HttpDownloader):
    """HTTP downloader that accepts only image responses and always saves them."""

    @staticmethod
    def _guess_ext_from_content_type(content_type):
        """
        Map an image Content-Type header value to a file extension.

        Raises DownloaderError when the header is missing or is not one of
        the supported image types.
        """
        pattern = {
            "image/jpeg": ".jpeg",
            "image/png": ".png",
            "image/gif": ".gif",
        }
        # A missing header (None) must end in DownloaderError, not in a
        # TypeError from the ``in`` test below.
        if content_type:
            # Media types are case-insensitive.
            lowered = content_type.lower()
            for ct, ext in pattern.items():
                if ct in lowered:
                    return ext
        raise DownloaderError(f"Undefined or Non image content-type: {content_type}")

    def _download(self, **kwargs):
        """Download an image, forcing ``write_pages`` on, and return its bytes."""
        # Rebind a copy on the instance instead of updating in place:
        # ``params`` may be the dict shared at class level, and mutating it
        # would switch write_pages on for every other downloader as well.
        self.params = {**self.params, "write_pages": True}
        content, _ = self._download_webpage(**kwargs)
        return content
if __name__ == "__main__":
    # Demo run: verbose image download with a fixed URL and base filename.
    demo_request = {"url": "https://example.com", "filename": "test"}
    idl = ImageDownloader()
    idl.set_param("verbose", True)
    idl.download(**demo_request)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment