Created
March 28, 2020 05:35
-
-
Save lionelyoung/d640669f3fdeba36760cee02e37cb8c1 to your computer and use it in GitHub Desktop.
Download the blog's XML feed and parse it into an Org-mode file for reading. Requires the full post content to be present in the RSS feed.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# Download xml and parse it into org mode for reading | |
# Requires the full content to be in the RSS feed | |
import argparse | |
from lxml import etree | |
import logging | |
import os | |
import requests | |
import subprocess | |
import sys | |
# Directory containing this script; all output paths are built under it.
ROOT_DIR = os.path.dirname(os.path.realpath(__file__))
# Root logging config (default handler/format); this module logs at INFO.
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def dl_url(url, fpath):
    """Fetch the content at *url*, using *fpath* as an on-disk cache.

    On a cache hit the file is read back; on a miss the URL is downloaded
    and written to *fpath* for subsequent runs.

    Args:
        url (str): URL to download.
        fpath (str): path of the cache file.

    Returns:
        bytes: raw feed content (same type on cache hit and miss).
    """
    headers = {
        # Present a browser UA: some feed servers reject the default
        # python-requests user agent.
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36",
    }
    try:
        # Read in binary mode so a cache hit returns bytes, matching
        # response.content on the download path (the original opened the
        # cache in text mode and returned str, an inconsistent type).
        with open(fpath, "rb") as f:
            payload = f.read()
        logger.info(f"Found cache {fpath}")
    except FileNotFoundError:
        response = requests.get(url, headers=headers)
        with open(fpath, "wb") as f:
            f.write(response.content)
        logger.info(f"Downloaded {fpath}")
        payload = response.content
    return payload
def main(tag):
    """Download a blogspot Atom feed, split posts into HTML files, convert
    each to Org with pandoc, and combine everything into one .org file.

    Args:
        tag (str): blog name, as in http://<TAG>.blogspot.com/feeds/posts/default
    """
    logger.info(f"Using tag: {tag}")

    # Download (cached) feed XML. Create output/ up front: the original
    # crashed on a fresh checkout because the XML cache was written into a
    # directory that did not exist yet.
    url = f"http://{tag}.blogspot.com/feeds/posts/default"
    output_dir = os.path.join(ROOT_DIR, "output")
    os.makedirs(output_dir, exist_ok=True)
    xml_fpath = os.path.join(output_dir, f"{tag}.xml")
    dl_url(url, xml_fpath)

    # Per-tag directory for the intermediate html/org files
    tag_fdpath = os.path.join(output_dir, tag)
    os.makedirs(tag_fdpath, exist_ok=True)
    logger.debug(f"Created {tag_fdpath}")

    # Parse xml: Atom entries carry the full post content and title
    skipped = 0
    xmlns = "{http://www.w3.org/2005/Atom}"
    element = etree.parse(xml_fpath)
    post_contents = element.findall("{0}entry/{0}content".format(xmlns))
    post_titles = element.findall("{0}entry/{0}title".format(xmlns))
    for i, (ptitle, pcontent) in enumerate(zip(post_titles, post_contents)):
        html_fpath = os.path.join(tag_fdpath, f"{tag}_post_{i:03}.html")
        if os.path.exists(html_fpath):
            logger.debug(f"SKIP {i} {ptitle.text}")
            skipped += 1
            continue
        html = str(pcontent.text)
        with open(html_fpath, "w") as f:
            f.write(html)
        logger.info(f"Wrote <{ptitle.text}> {html_fpath}, skipped {skipped}")
    logger.info(f"Done writing html, skipped {skipped}")

    # Convert html -> org with pandoc. endswith() instead of a substring
    # test so a name like "foo.html.bak" is not picked up.
    fnames = sorted(f for f in os.listdir(tag_fdpath) if f.endswith(".html"))
    skipped = 0
    for i, html_fname in enumerate(fnames):
        html_fpath = os.path.join(tag_fdpath, html_fname)
        org_fpath = os.path.splitext(html_fpath)[0] + ".org"
        if os.path.exists(org_fpath):
            skipped += 1
            continue
        # Argument list with shell=False: the paths embed the user-supplied
        # tag, which must not be interpreted by a shell.
        subprocess.call(["pandoc", html_fpath, "-o", org_fpath])
        logger.info(f"Converted {i} from html to org")
    logger.info(f"Done converting to org, skipped {skipped}")

    # Combine all the org files into one org file with a header per post.
    # NOTE(review): assumes the sorted org filenames line up 1:1 with
    # post_titles order — holds because each name embeds the feed index.
    org_fnames = sorted(f for f in os.listdir(tag_fdpath) if f.endswith(".org"))
    posts = []
    for org_fname in org_fnames:
        with open(os.path.join(tag_fdpath, org_fname), "r") as f:
            posts.append(f.read())

    # Write the combined document: one top-level heading for the tag,
    # one second-level heading per post.
    posts_fpath = os.path.join(output_dir, f"{tag}_posts.org")
    payload = [f"* {tag}"]
    for ptitle, pcontent in zip(post_titles, posts):
        title = ptitle.text or "Post"  # untitled posts get a placeholder
        payload.append(f"** {title}")
        payload.append(pcontent)
    with open(posts_fpath, "w") as f:
        f.write("\n".join(payload))
    logger.info(f"Done creating org: {posts_fpath}")
def make_args():
    """Build the CLI parser and parse the command line.

    Returns:
        argparse.Namespace: parsed arguments with a ``tag`` attribute.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        "-t",
        "--tag",
        action="store",
        required=True,
        help="Tag is the blog name, like this: http://<TAG>.blogspot.com/feeds/posts/default",
    )
    arg_parser.set_defaults()
    return arg_parser.parse_args()
if __name__ == "__main__":
    # Script entry point: parse the CLI and run the pipeline.
    cli_args = make_args()
    main(tag=cli_args.tag)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment