Last active
July 3, 2021 04:51
-
-
Save PandaWhoCodes/ed847c54d6ffe6236ba2caae8a3e27ef to your computer and use it in GitHub Desktop.
Parse a WordPress XML export and save its posts, optionally filtered by title keywords. Usage: python filter_posts.py [XML File name] [txt or html] [keyword1 keyword2 ....]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from parse import find_posts, set_filename | |
import sys | |
import os | |
import string | |
from bs4 import BeautifulSoup | |
def format_filename(s):
    """Build a filesystem-friendly filename from an arbitrary string.

    Every character other than ASCII letters, digits and ``-_.() `` is
    dropped; the spaces that remain are then replaced by underscores.
    """
    allowed = "-_.() " + string.ascii_letters + string.digits
    kept = [ch for ch in s if ch in allowed]
    # Spaces pass the whitelist above and are only converted at the very end.
    return "".join(kept).replace(" ", "_")
def create_folder(folder_name):
    """Make sure the directory ``folder_name`` exists.

    Missing parent directories are created as well, and an already existing
    directory is left untouched.  Using ``exist_ok=True`` avoids the race
    between checking for the directory and creating it.

    Args:
        folder_name: Path of the directory to create.

    Raises:
        OSError: If the directory cannot be created (for example because a
            regular file with that name already exists).
    """
    os.makedirs(folder_name, exist_ok=True)
def get_html(post_title, post_body):
    """Wrap a post title and body in a minimal HTML document.

    The title and the body are inserted verbatim (no escaping), separated by
    a line break.

    Args:
        post_title: Title text placed at the top of the document.
        post_body: HTML markup of the post.

    Returns:
        The complete document as a string.
    """
    # Close the tags in the reverse order they were opened, and use a real
    # line-break element ("</br>" is not a valid tag).
    return "<html><body>" + post_title + "<br>" + post_body + "</body></html>"
def get_text(html):
    """Return the text content of an HTML string with the markup removed."""
    return BeautifulSoup(html, "html.parser").get_text()
def save_posts(posts, folder_name, html):
    """Write every post to its own numbered file inside ``folder_name``.

    Args:
        posts: Iterable of objects exposing ``title`` and ``body``.
        folder_name: Output directory; it is created when missing.
        html: When truthy each post is saved as an ``.html`` document,
            otherwise as a ``.txt`` file holding the title, a newline and the
            body converted to plain text.
    """
    create_folder(folder_name)
    extension = ".html" if html else ".txt"
    for position, post in enumerate(posts, start=1):
        # The numbered prefix keeps the files in the order of ``posts``.
        raw_name = "( " + str(position) + " ) " + post.title + extension
        target = folder_name + "/" + format_filename(raw_name)
        with open(target, "w", encoding="utf-8") as out:
            if not html:
                out.write(post.title)
                out.write("\n")
                out.write(get_text(post.body))
            else:
                out.write(get_html(post.title, post.body))
def get_posts(posts, filter_words):
    """Select the posts whose title contains at least one keyword.

    Matching is a case-insensitive substring test: both the title and the
    keywords are lowercased before comparing (previously only the title was,
    so a keyword such as ``"Python"`` could never match).

    Args:
        posts: Iterable of objects exposing a ``title`` string.  A falsy
            value (``None``, ``False``, empty list) is treated as "no posts".
        filter_words: Keywords to look for.  When empty, every post is
            selected.

    Returns:
        A new list with the selected posts, in their original order.
    """
    if not posts:
        return []
    if not filter_words:
        return list(posts)

    # Lowercase the keywords once instead of on every comparison.
    needles = [word.lower() for word in filter_words]
    selected = []
    for post in posts:
        title = post.title.lower()
        if any(needle in title for needle in needles):
            selected.append(post)
    return selected
if __name__ == '__main__':
    # Both the export file and the output format are required; without them
    # the indexing below would fail with a bare IndexError.
    if len(sys.argv) < 3:
        print("Usage: python filter_posts.py [XML File name] [txt or html] [keyword1 keyword2 ....]")
        sys.exit(1)

    filename = sys.argv[1]
    print("Loading file:", filename)
    tree, namespaces = set_filename(filename)
    # find_posts() returns False (not an empty list) when it finds nothing.
    posts = find_posts(tree) or []
    # Any format other than "html" results in plain-text output.
    html = sys.argv[2] == "html"

    keywords = sys.argv[3:]
    if not keywords:
        # No keywords means "export everything", so ask for confirmation.
        print("Do you want to extract all the blog posts?")
        inp = input("Enter 'y' for yes and 'n' for no.")
        if inp.lower() != 'y':
            sys.exit(1)
    print(keywords)

    wordpress_posts = get_posts(posts, keywords)
    save_posts(wordpress_posts, 'posts', html)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from lxml import etree | |
from progressbar import ProgressBar, Percentage, Bar | |
from dateutil import parser | |
import re | |
import unidecode | |
import datetime | |
import pytz | |
import requests | |
import os | |
# When true, Post.adjust_paths prints a "[DEBG]" line for every URL it rewrites.
# Change it through set_debug().
DEBUG = False
# FILENAME, tree and namespaces are module-level globals too, but they only
# come into existence when set_filename() runs.  find_authors, find_tags,
# find_posts and find_attachments read `namespaces`, so set_filename() has to
# be called before any of them.
def set_filename(name):
    """Parse the XML file ``name`` and remember it at module level.

    The path, the parsed tree and the root element's namespace map are stored
    in the module globals ``FILENAME``, ``tree`` and ``namespaces``.

    Returns:
        A ``(tree, namespaces)`` tuple.
    """
    global FILENAME, tree, namespaces
    FILENAME = name
    tree = etree.parse(name)
    root = tree.getroot()
    namespaces = root.nsmap
    return tree, namespaces
def set_debug(status):
    """Store ``status`` in the module-level ``DEBUG`` flag."""
    global DEBUG
    DEBUG = status
def slugify(string):
    """Turn ``string`` into a lowercase, hyphen-separated ASCII slug.

    The text is transliterated with ``unidecode``, lowercased, and every run
    of non-word characters is replaced by a single ``-``.  ``None`` yields an
    empty string.
    """
    if string is None:
        return ""
    ascii_lower = unidecode.unidecode(string).lower()
    return re.sub(r'\W+', '-', ascii_lower)
class Post:
    """A single post read from a WordPress export.

    Omitted from the XML standard:
        pubDate
        guid
        excerpt:encoded
        post_date_gmt
        post_type
        post_password
        is_sticky
    """

    def __init__(self, id=None, title=None):
        """Create a post with the given id and title and default metadata.

        The slug is derived from ``title`` via ``slugify``; ``post_date``
        starts out as the current local time.
        """
        self.id = id
        self.title = title
        self.description = None
        self.creator = None
        self.body = None
        self.url = None
        self.post_date = datetime.datetime.now()
        self.comment_status = "open"
        self.ping_status = "open"
        self.slug = slugify(title)
        self.status = "publish"
        self.parent = None
        self.menu_order = 0
        self.tags = []
        self.categories = []
        self.comments = []

    def adjust_paths(self, attachments=None, prefix=''):
        """Rewrite attachment URLs found in the body to live under ``prefix``.

        Each occurrence of an attachment's URL in ``self.body`` is replaced
        by ``prefix`` followed by the last path segment of that URL.

        Args:
            attachments: Iterable of objects exposing a ``url`` attribute, or
                ``None`` to do nothing.
            prefix: New location of the attachments; when non-empty it must
                end with ``/``.

        Returns:
            ``False`` when ``prefix`` is non-empty and lacks the trailing
            slash (nothing is changed in that case), otherwise ``None``.
        """
        # Compare by value: ``is not ''`` tests object identity and triggers
        # a SyntaxWarning on Python 3.8+.
        if prefix != '' and not prefix.endswith('/'):
            print("[ERRR] Your attachment prefix does not end in a trailing slash")
            return False
        if self.body is None or attachments is None:
            return
        for attachment in attachments:
            # A missing URL cannot be searched for, and an empty one would
            # "match" between every pair of characters in the body.
            if not attachment.url or attachment.url not in self.body:
                continue
            new_url = prefix + attachment.url.split('/')[-1]
            self.body = self.body.replace(attachment.url, new_url)
            if DEBUG:
                print("[DEBG] Replaced " + attachment.url + " with " + new_url)

    def fix_paragraphs(self):
        """Convert newline-separated text in the body into ``<p>`` elements.

        Empty paragraphs produced by blank lines are collapsed.  A post
        without a body is left untouched.
        """
        if self.body is None:
            return
        fixed = self.body.replace('\n', '</p><p>')
        fixed = '<p>' + fixed + '</p>'
        fixed = fixed.replace('</p><p></p><p>', '</p><p>')
        self.body = fixed

    def fix_more(self):
        """Replace the ``<!--more-->`` marker in the body with ``[[MORE]]``.

        A post without a body is left untouched.
        """
        if self.body is None:
            return
        self.body = self.body.replace('<!--more-->', '[[MORE]]')
class Attachment:
    """A media attachment listed in a WordPress export."""

    def __init__(self, id=None, title=None, url=None):
        """Store the attachment's id, title and source URL."""
        self.id = id
        self.title = title
        self.url = url

    def download(self, path='attachments'):
        """Fetch the attachment and save it inside the directory ``path``.

        The file is named after the last path segment of the URL.  Nothing
        happens when the attachment has no URL.

        Args:
            path: Target directory; it is created when missing.

        Raises:
            requests.HTTPError: Propagated from ``raise_for_status()`` when
                the response is not OK and represents an HTTP error.
        """
        if self.url is None:
            return
        file_name = self.url.split('/')[-1]
        response = requests.get(self.url)
        if response.status_code != requests.codes.ok:
            # Only error statuses raise here; any other non-OK status is
            # skipped without writing a file, as before.
            response.raise_for_status()
            return
        # The target directory is not guaranteed to exist yet.
        os.makedirs(path, exist_ok=True)
        # ``with`` closes the file even if the write fails.
        with open(os.path.join(path, file_name), 'wb') as out:
            out.write(response.content)
def find_blog(tree):
    """Announce the blog contained in ``tree``.

    Looks up the first ``title`` element anywhere in the tree and, when one
    exists, prints ``Found <title>``.  Nothing is returned.

    Args:
        tree: Parsed XML tree (or element) supporting ``find``.
    """
    # Look the element up once.  The link/description/pubDate/language values
    # were read but never used, and reading them crashed on exports that lack
    # one of those elements.
    title_elem = tree.find(".//title")
    if title_elem is not None:
        print("Found %s" % title_elem.text)
def find_authors(tree):
    """Collect the ``wp:author`` entries of the export.

    Relies on the module-level ``namespaces`` mapping.

    Returns:
        A list with one dict per author (keys ``login``, ``email``,
        ``username``, ``first_name``, ``last_name``).  Each value is the
        result of ``find`` for the matching child, i.e. the element itself
        rather than its text, or ``None`` when absent.  ``False`` is returned
        when the export has no authors.
    """
    fields = (
        ('login', "./wp:author_login"),
        ('email', "./wp:author_email"),
        ('username', "./wp:author_display_name"),
        ('first_name', "./wp:author_first_name"),
        ('last_name', "./wp:author_last_name"),
    )
    authors = [
        {key: elem.find(path, namespaces=namespaces) for key, path in fields}
        for elem in tree.findall(".//wp:author", namespaces=namespaces)
    ]
    if not authors:
        print("[WARN] Found no authors!")
        return False
    print("Found %i authors" % len(authors))
    return authors
def find_tags(tree):
    """Collect the ``wp:tag`` entries of the export.

    Relies on the module-level ``namespaces`` mapping.

    Returns:
        A list with one dict per tag (keys ``slug`` and ``name``).  Each
        value is the result of ``find`` for the matching child, i.e. the
        element itself rather than its text, or ``None`` when absent.
        ``False`` is returned when the export has no tags.
    """
    tags = [
        {
            'slug': elem.find("./wp:tag_slug", namespaces=namespaces),
            'name': elem.find("./wp:tag_name", namespaces=namespaces),
        }
        for elem in tree.findall(".//wp:tag", namespaces=namespaces)
    ]
    if not tags:
        print("[WARN] Found no tags!")
        return False
    print("Found %i tags" % len(tags))
    return tags
def _element_text(parent, path, ns=None):
    """Return the text of the child at ``path``, or "" when missing or empty."""
    found = parent.find(path, namespaces=ns)
    if found is None or found.text is None:
        return ""
    return str(found.text)


def find_posts(tree, published=True, timezone="America/Chicago"):
    """Extract the blog posts from a parsed WordPress export.

    Relies on the module-level ``namespaces`` mapping.

    Args:
        tree: Parsed export tree.
        published: When true only items with ``wp:status`` equal to
            ``publish`` are returned; otherwise every item of type ``post``.
        timezone: Name of the timezone in which the ``wp:post_date`` values
            are interpreted before being converted to UTC.  Defaults to the
            previously hard-coded ``"America/Chicago"``.

    Returns:
        A list of ``Post`` objects with id, title, url, body, UTC
        ``post_date`` and tag nicenames filled in, or ``False`` when no post
        was found.

    Raises:
        AttributeError: If an item has no ``wp:post_date`` element.
        pytz.exceptions.InvalidTimeError: If a post date is ambiguous or
            non-existent in ``timezone`` (``is_dst=None`` is strict).
    """
    if published:
        xpath = ".//item[wp:post_type='post' and wp:status='publish']"
        item_elems = tree.xpath(xpath, namespaces=namespaces)
    else:
        item_elems = tree.findall(".//item[wp:post_type='post']", namespaces=namespaces)

    # The timezone is the same for every post, so build it only once.
    local = pytz.timezone(timezone)
    posts = []
    for post_elem in item_elems:
        # _element_text yields "" for missing/empty elements; the previous
        # str(elem.text) turned an empty element into the literal "None".
        post = Post(_element_text(post_elem, "./wp:post_id", namespaces),
                    _element_text(post_elem, "./title"))
        post.url = _element_text(post_elem, "./link")
        post.body = _element_text(post_elem, "./content:encoded", namespaces)

        post_stamp = parser.parse(post_elem.find("./wp:post_date", namespaces=namespaces).text)
        local_stamp = local.localize(post_stamp, is_dst=None)
        post.post_date = local_stamp.astimezone(pytz.utc)

        # xpath() always returns a list, so no None check is needed.
        post.tags = [tag.get('nicename')
                     for tag in post_elem.xpath("./category[@domain='post_tag']")]
        posts.append(post)

    if not posts:
        print("[WARN] Found no posts!")
        return False
    return posts
def find_attachments(tree, download=True):
    """Collect the attachment items of the export and optionally fetch them.

    Relies on the module-level ``namespaces`` mapping.

    Args:
        tree: Parsed export tree.
        download: When true every attachment is downloaded into the
            ``attachments`` directory while a progress bar is shown.

    Returns:
        A list of ``Attachment`` objects, or ``False`` when the export
        contains no attachments.
    """
    xpath = ".//item[wp:post_type='attachment']"
    attachments = [
        Attachment(elem.find("./wp:post_id", namespaces=namespaces).text,
                   str(elem.find("./title").text),
                   elem.find("./wp:attachment_url", namespaces=namespaces).text)
        for elem in tree.xpath(xpath, namespaces=namespaces)
    ]
    if not attachments:
        print("[WARN] Found no attachments!")
        return False

    print("Found %i attachments" % len(attachments))
    if download:
        print("Downloading %i attachments" % len(attachments))
        # Make sure the target directory exists before the first file is
        # written into it.
        os.makedirs('attachments', exist_ok=True)
        progress = ProgressBar(widgets=[Percentage(), Bar()], maxval=len(attachments)).start()
        # Count from 1 so the bar reports the number of finished downloads
        # (the zero-based index lagged one item behind).
        for done, attachment in enumerate(attachments, start=1):
            attachment.download('attachments')
            progress.update(done)
        progress.finish()
    return attachments
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment