Sample code: a script that extracts data from an XML file exported from Blogger, converts it to Markdown, and saves the result.
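A note on prerequisites: the script reads the Blogger export as `./blog.xml` and writes the Markdown files and images under `./out` (the `FILE_IN` and `DIR_OUT` constants below). Judging from the imports, it depends on the third-party packages aiofiles, aiohttp, html2markdown, beautifulsoup4, and jinja2; BeautifulSoup's `'xml'` parser additionally requires lxml. Assuming those are installed (for example with `pip install aiofiles aiohttp beautifulsoup4 html2markdown jinja2 lxml`), the script can be run directly with `python <script>.py`.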
import asyncio
import re
from pathlib import Path
from typing import Dict, Generator, Iterable, List
from urllib.parse import urlparse

import aiofiles
import aiohttp
import html2markdown
from bs4 import BeautifulSoup
from bs4.element import Tag
from jinja2 import Template

FILE_IN = './blog.xml'
DIR_OUT = Path('./out')


async def main() -> None:
    with open(FILE_IN) as f:
        soup = BeautifulSoup(f.read(), 'xml')

    # Download the images referenced in page and post bodies
    urls = []
    for entry in soup.find_all('entry'):
        if is_page(entry) or is_post(entry):
            urls.extend(extract_image_urls_in_body(get_body(entry)))
    await download_files(urls)

    for entry in soup.find_all('entry'):
        # The export also contains entries that are neither pages nor posts,
        # so check the kind before extracting
        if is_page(entry):
            extract_page(entry)
        if is_post(entry):
            extract_post(entry)


def extract_image_urls_in_body(body: str) -> List[str]:
    """Extract all image URLs from a post body."""
    soup = BeautifulSoup(body, 'html.parser')
    return [x.attrs['src'] for x in soup.find_all('img')]


async def download_files(urls: List[str]) -> None:
    """Download all images referenced in post bodies.

    Files are saved in the `images` directory under `DIR_OUT`.
    """
    parent = DIR_OUT / 'images'
    if not parent.is_dir():
        parent.mkdir(parents=True)

    def filepath(url):
        return parent / Path(url).name

    await asyncio.gather(*[download_file(url, filepath(url)) for url in urls])


async def download_file(url: str, filepath: Path) -> None:
    """Download a single file."""
    print('Downloading {}...'.format(filepath.name))
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            if resp.status == 200:
                async with aiofiles.open(filepath, mode='wb') as f:
                    await f.write(await resp.read())


def extract_page(entry: Tag):
    """Extract the data of a page:

    - title
    - body
    - published date
    - updated date
    - URL
    """
    title = entry.find('title', type='text').text.strip()
    body = h2m(get_body(entry))
    published = entry.find('published').text.strip()
    updated = entry.find('updated').text.strip()
    slug = path_from_url(get_url(entry))
    template = """---
title: >-
  {{ title }}
published: {{ published }}
updated: {{ updated }}
slug: {{ slug }}
---
{{ body }}
"""
    write(template, locals(), DIR_OUT / 'pages', slug)


def extract_post(entry: Tag):
    """Extract the data of a post:

    - title
    - body
    - tags
    - published date
    - updated date
    - URL
    - whether it is a draft
    """
    title = entry.find('title', type='text').text.strip()
    body = h2m(get_body(entry))
    published = entry.find('published').text.strip()
    updated = entry.find('updated').text.strip()
    term_scheme = 'http://www.blogger.com/atom/ns#'
    terms = [x.attrs['term'] for x in entry.find_all('category', scheme=term_scheme)]
    slug = path_from_url(get_url(entry))
    draft = entry.find('app:draft') is not None
    template = """---
title: >-
  {{ title }}
published: {{ published }}
updated: {{ updated }}
slug: {{ slug }}
terms: {{ terms }}
---
{{ body }}
"""
    write(template, locals(), DIR_OUT / 'posts', slug)


def is_page(entry: Tag) -> bool:
    """Check whether an entry is a page."""
    category = entry.find(
        'category',
        scheme='http://schemas.google.com/g/2005#kind',
        term='http://schemas.google.com/blogger/2008/kind#page',
    )
    return category is not None


def is_post(entry: Tag) -> bool:
    """Check whether an entry is a post."""
    category = entry.find(
        'category',
        scheme='http://schemas.google.com/g/2005#kind',
        term='http://schemas.google.com/blogger/2008/kind#post',
    )
    url = entry.find('link', type='text/html', rel='alternate')
    return (category is not None) and (url is not None)


def h2m(html: str) -> str:
    """Convert the HTML of a Blogger post body to Markdown."""
    soup = BeautifulSoup(html, 'html.parser')
    # Remove inline styles from styled `ul` tags
    for ul_tag in soup.find_all('ul', style=True):
        del ul_tag.attrs['style']
    # Convert the `div` wrappers around images to `p`
    for div_tag in soup.find_all('div', class_='separator'):
        change_tag(soup, div_tag, 'p')
    for div_tag in soup.find_all('div', dir='ltr'):
        change_tag(soup, div_tag, 'p')
    # Remove whitespace-only text nodes (such as the `&nbsp;` runs the
    # WYSIWYG editor generates for line breaks)
    for text_element in soup.find_all(string=re.compile(r'^\s+$')):
        text_element.replace_with('')
    # Unwrap `div` tags that are effectively empty (contain only `br`)
    for div_tag in soup.find_all('div', class_=False):
        if not div_tag.get_text():
            div_tag.unwrap()
    # Convert `div` tags directly under the root to `p`
    for div_tag in soup.find_all('div', recursive=False, class_=False):
        change_tag(soup, div_tag, 'p')
    # Replace `br` tags not wrapped in any element with newlines
    for br_tag in soup.find_all('br', recursive=False):
        br_tag.replace_with('\n')
    # Remove the links wrapped around `img` tags
    for a_tag in soup.select('a[href*=".bp.blogspot.com"]'):
        a_tag.unwrap()
    # Rewrite `img` `src` values to relative paths and drop sizing attributes
    for img_tag in soup.select('img[src*=".bp.blogspot.com"]'):
        src = Path(img_tag.attrs['src'])
        # Cast to `str`: bs4 attribute values must be strings, not `Path`s
        img_tag.attrs['src'] = str(Path('../images') / src.name)
        for key in [
            'border',
            'width',
            'height',
            'data-original-width',
            'data-original-height',
        ]:
            if key in img_tag.attrs:
                del img_tag.attrs[key]
    # Remove inline `style` attributes (keep them on `iframe`)
    for tag in soup.find_all(style=True):
        if tag.name == 'iframe':
            continue
        del tag.attrs['style']
    # Convert `br` tags inside `pre` to newlines
    for pre_tag in soup.find_all('pre'):
        for br_tag in pre_tag.find_all('br'):
            br_tag.replace_with('\n')
    # Convert `pre` tags to ``` code fences
    for pre_tag in soup.find_all('pre'):
        pre_tag.insert(0, '\n```\n')
        pre_tag.append('```\n\n')
        pre_tag.unwrap()
    # Shift heading levels:
    #   h4 -> h2
    #   h5 -> h3
    #   h6 -> h4
    tag_conversion = [('h4', 'h2'), ('h5', 'h3'), ('h6', 'h4')]
    for tag_from, tag_to in tag_conversion:
        for h_tag in soup.find_all(tag_from):
            change_tag(soup, h_tag, tag_to)
    # Merge adjacent text nodes, then squeeze extra blank lines
    soup.smooth()
    result = html2markdown.convert(str(soup))
    result = re.compile(r'\n +\n', flags=re.MULTILINE).sub('\n', result)
    result = re.compile(r'\n{2,}', flags=re.MULTILINE).sub('\n\n', result)
    return result


def get_body(entry: Tag) -> str:
    """Extract the body from an entry."""
    return entry.find('content', type='html').text


def get_url(entry: Tag) -> str:
    """Extract the URL from an entry."""
    return entry.find('link', type='text/html', rel='alternate').attrs['href']


def path_from_url(url: str) -> str:
    """Extract the path component of a URL."""
    return urlparse(url).path


def write(template: str, data: Dict[str, str], parent: Path, slug: str):
    """Render the Markdown and write it to a file."""
    # Replace `/` in the slug with `_`
    slug_stripped = slug.lstrip('/').replace('/', '_')
    filepath = parent / Path(slug_stripped).with_suffix('.md')
    if not parent.is_dir():
        parent.mkdir(parents=True)
    with filepath.open('w') as f:
        md = Template(template).render(data)
        f.write(md)


def strip_prefixes(items: Iterable[str], prefix: str) -> Generator[str, None, None]:
    """Yield each `str` item with the given prefix stripped."""
    for item in items:
        if item.startswith(prefix):
            yield item[len(prefix) :]
        else:
            yield item


def change_tag(soup, tag, new_tag_name: str):
    """Replace a tag with a tag of a different name, keeping its contents."""
    tag.wrap(soup.new_tag(new_tag_name))
    tag.unwrap()


if __name__ == '__main__':
    asyncio.run(main())
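
For a quick sense of what `h2m` does to a typical Blogger image wrapper, here is a hypothetical sketch (the URL and dimensions are made up, and the exact output depends on the html2markdown version):

# Hypothetical example, not part of the script above
fragment = (
    '<div class="separator">'
    '<a href="https://1.bp.blogspot.com/a/b/s1600/photo.png">'
    '<img src="https://1.bp.blogspot.com/a/b/s320/photo.png" width="320" />'
    '</a>'
    '</div>'
)
print(h2m(fragment))
# The wrapping link is unwrapped, the `div` becomes a `p`, and the `src`
# is rewritten to a relative path, giving roughly:
# ![](../images/photo.png)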