Last active
February 2, 2021 18:17
-
-
Save svagionitis/c198dd990429258528821a23f1d8f1bd to your computer and use it in GitHub Desktop.
An example of web scraping in Python with requests and BeautifulSoup
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
An example of webscraping in Python with requests an BeuatifulSoup | |
Scraping lab test data from https://labtestsonline.org.uk/tests-index | |
This script will get a list of the lab tests from the above link and then get | |
more information for each test following the appropriate link. The web page stores | |
the data in Drupal 8. The lab tests are saved in JSON files. | |
""" | |
import logging | |
from datetime import datetime | |
import json | |
import random | |
import time | |
import os | |
import re | |
import sqlite3 | |
from sqlite3 import Error | |
import requests | |
from bs4 import BeautifulSoup | |
# Module-level logger named after this module (standard logging convention).
LOGGER = logging.getLogger(__name__)
def parseLinkTags(soup):
    """
    Extract <link> tag information from the page head.

    Collects the "canonical" and "revision" hrefs directly, and gathers all
    "alternate" links as a list of {hreflang: href} dicts, e.g. for

    <link rel="canonical" href="https://labtestsonline.org.uk/tests/17-hydroxyprogesterone" />
    <link rel="alternate" hreflang="en-gb" href="https://labtestsonline.org.uk/tests/17-hydroxyprogesterone" />
    <link rel="revision" href="https://labtestsonline.org.uk/tests/17-hydroxyprogesterone" />

    soup: BeautifulSoup document to search.
    Returns a dict of the form {"link_info": {...}}.
    """
    data = {}
    alternate_list = []
    for tag in soup.find_all("link"):
        if "rel" not in tag.attrs:
            continue
        rel = tag["rel"][0]
        if rel in ("canonical", "revision"):
            data[rel] = tag["href"]
        elif rel == "alternate":
            alternate_list.append({tag["hreflang"]: tag["href"]})
            data[rel] = alternate_list
    LOGGER.debug(data)
    return {"link_info": data}
def parseMetaTags(soup):
    """
    Extract <meta> tag information from the page head.

    Handles three shapes of meta tag: a bare charset declaration, the common
    name/content pair, and http-equiv/content pairs, e.g.

    <meta name="title" content="17-Hydroxyprogesterone" />
    <meta http-equiv="ImageToolbar" content="false" />

    soup: BeautifulSoup document to search.
    Returns a dict of the form {"meta_info": {...}}.
    """
    data = {}
    for tag in soup.find_all("meta"):
        attrs = tag.attrs
        if "charset" in attrs:
            data["charset"] = tag["charset"]
        elif "name" in attrs:
            data[tag["name"]] = tag["content"]
        elif "http-equiv" in attrs:
            data[tag["http-equiv"]] = tag["content"]
    LOGGER.debug(data)
    return {"meta_info": data}
def parseFieldWrapperWithTwoFieldItems(soup):
    """
    Parse a pair of "field-wrapper" divs into a single key/value entry.

    The first wrapper's field-item text is the key, the second wrapper's is
    the value; runs of whitespace are collapsed to a single character in both.

    <div class="field-wrapper ...">
      <div class="field-items"><div class="field-item">key text</div></div>
    </div>
    <div class="field-wrapper ...">
      <div class="field-items"><div class="field-item">value text</div></div>
    </div>

    soup: BeautifulSoup tag expected to contain (at least) two field-wrappers.
    Returns a one-entry dict {key_text: value_text}.
    """
    wrappers = soup.find_all("div", class_=["field-wrapper"])
    LOGGER.debug(wrappers)

    def extractText(wrapper):
        # Drill into field-items > field-item, then squeeze repeated
        # whitespace (see https://stackoverflow.com/questions/30936020).
        item = wrapper.find("div", class_=["field-items"]).find(
            "div", class_=["field-item"]
        )
        return re.sub(r"(\s)(?=\1)", "", item.text.strip())

    data = {extractText(wrappers[0]): extractText(wrappers[1])}
    LOGGER.debug(data)
    return data
def parseRelatedContentDivWithFieldWrapper(soup):
    """
    Parse the "Related_Content" section of a lab-test page, if present.

    The section is a div with id "Related_Content" holding the usual pair of
    field-wrapper divs (key wrapper + value wrapper).

    soup: BeautifulSoup document to search.
    Returns a one-entry dict, or an empty dict when the section is absent.
    """
    section = soup.find("div", id="Related_Content")
    if section is None:
        return {}
    data = parseFieldWrapperWithTwoFieldItems(section)
    LOGGER.debug(data)
    return data
def parseCommonQuestionsWithAccordionElements(soup):
    """
    Parse the "Common_Questions" accordion section into question/answer pairs.

    Expected markup (abridged):
    <div id="Common_Questions" class="field-item">
      <ul class="accordion accordion-element">
        <li class="accordion-item">
          <div class="field-wrapper ...">question (key)</div>
          <div class="accordion-content">
            <div class="field-wrapper ...">answer (value)</div>
          </div>
        </li>
      </ul>
    </div>

    soup: BeautifulSoup document to search.
    Returns a dict of question -> answer, empty when the section (or its
    accordion list) is missing.
    """
    data = {}
    common_questions_div = soup.find("div", id="Common_Questions")
    if common_questions_div is None:
        return data
    # It should be only one accordion ul
    accordion_ul = common_questions_div.find(
        "ul", class_=["accordion accordion-element"]
    )
    if accordion_ul is None:
        # Fix: the section div may exist with unexpected/empty markup; without
        # this guard find_all() below raises AttributeError on None.
        return data
    LOGGER.debug(accordion_ul)
    accordion_items_li = accordion_ul.find_all("li", class_=["accordion-item"])
    for accordion_item_li in accordion_items_li:
        data.update(parseFieldWrapperWithTwoFieldItems(accordion_item_li))
    LOGGER.debug(data)
    return data
def parseWhatIsBeingTestedDivWithExpandableTextWrapper(soup):
    """
    Parse the "What_is_being_tested_" section of a lab-test page.

    The section div (id "What_is_being_tested_") holds one or more
    "expandable-text-wrapper" divs, each containing a key/value pair of
    field-wrappers.

    soup: BeautifulSoup document to search.
    Returns a dict of the collected pairs; empty when the section is absent.
    """
    data = {}
    section = soup.find("div", id="What_is_being_tested_")
    if section is None:
        return data
    wrappers = section.find_all("div", class_=["expandable-text-wrapper"])
    LOGGER.debug(wrappers)
    for wrapper in wrappers:
        data.update(parseFieldWrapperWithTwoFieldItems(wrapper))
    LOGGER.debug(data)
    return data
def parseFeaturetteWrapperWithBlockContentWrapper(soup):
    """
    Parse the featurette ("looking for") buttons of a lab-test page.

    Each "block-content-wrapper" div holds an expandable button whose
    "button-title" text becomes the key, and a "button-content" div whose
    field-item text becomes the value.

    soup: BeautifulSoup document to search.
    Returns a dict of title -> content; empty when the featurette wrapper
    is missing.
    """
    data = {}
    featurette_wrapper_div = soup.find("div", class_=["featurette-wrapper"])
    if featurette_wrapper_div is None:
        return data
    block_content_wrapper_divs = featurette_wrapper_div.find_all(
        "div", class_=["block-content-wrapper"]
    )
    LOGGER.debug(block_content_wrapper_divs)
    for block_content_wrapper_div in block_content_wrapper_divs:
        LOGGER.debug(block_content_wrapper_div)
        button_tag = block_content_wrapper_div.find(
            "button", class_=["button expandable-button"]
        )
        button_content_div = block_content_wrapper_div.find(
            "div", class_=["button-content"]
        )
        if button_tag is None or button_content_div is None:
            # Fix: skip wrappers missing the expected button or content div
            # instead of raising AttributeError on None below.
            continue
        button_title_div = button_tag.find("div", class_=["button-title"])
        if button_title_div is None:
            continue
        field_wrapper_div = button_content_div.find("div", class_=["field-wrapper"])
        if field_wrapper_div is None:
            continue
        button_content_field_item_div = field_wrapper_div.find(
            "div", class_=["field-items"]
        ).find("div", class_=["field-item"])
        data[button_title_div.text.strip()] = button_content_field_item_div.text
    LOGGER.debug(data)
    return data
def parseAtAGlanceDivWithGridWrapper(soup):
    """
    Parse the "At_a_Glance" section of a lab-test page.

    The section div (id "At_a_Glance") holds one or more "grid-wrapper"
    divs, each containing a key/value pair of field-wrappers.

    soup: BeautifulSoup document to search.
    Returns a dict of the collected pairs; empty when the section is absent.
    """
    data = {}
    section = soup.find("div", id="At_a_Glance")
    if section is None:
        return data
    grids = section.find_all("div", class_=["grid-wrapper"])
    LOGGER.debug(grids)
    for grid in grids:
        data.update(parseFieldWrapperWithTwoFieldItems(grid))
    LOGGER.debug(data)
    return data
def parseReviewInfoWithFieldWrapperAndFieldRevisionData(soup):
    """
    Parse the review-info footer into a dict mapping each tooltip span's text
    (e.g. "last reviewed", "last modified") to the corresponding date text.

    Expected markup (abridged):
    <div class="review-info">
      <div class="field-wrapper ...">
        Some text <span ...>last reviewed</span> some other text
        <div class="field-items">
          <div class="field-item"><time datetime="...">22 June 2015.</time></div>
        </div>
      </div>
      <div class="field-revision-date">
        Some text <span ...>last modified</span>
        on 21 October 2020.
      </div>
    </div>

    soup: BeautifulSoup document to search.
    Returns an empty dict when no review-info div is present.
    """
    data = {}
    # It should be only one review-info div
    review_info_div = soup.find("div", class_=["review-info"])
    if review_info_div is None:
        return data
    field_wrapper_div = review_info_div.find("div", class_=["field-wrapper"])
    # The tooltip span's text ("last reviewed") becomes the dict key.
    span_field_wrapper_div = field_wrapper_div.find("span")
    LOGGER.debug(span_field_wrapper_div)
    field_item_div = field_wrapper_div.find("div", class_=["field-items"]).find(
        "div", class_=["field-item"]
    )
    # The review date lives in a <time> tag; strip trailing " ." characters.
    time_tag = field_item_div.find("time")
    LOGGER.debug(time_tag)
    data[span_field_wrapper_div.text] = time_tag.text.strip(" .")
    # It should be only one field-revision-date
    field_revision_date_div = review_info_div.find(
        "div", class_=["field-revision-date"]
    )
    span_field_revision_date_div = field_revision_date_div.find("span")
    LOGGER.debug(span_field_revision_date_div)
    # NOTE(review): relies on the revision date sitting on the third text line
    # of the div (split("\n")[2]) — fragile against any markup/whitespace
    # change on the site; verify against the live page before reuse.
    data[span_field_revision_date_div.text] = field_revision_date_div.text.split("\n")[
        2
    ].strip(" .")
    LOGGER.debug(data)
    return data
def parseFieldWrapperWithFieldLabelsAndFieldItems(soup):
    """
    Parse field-wrappers that carry a visible field-label plus field-items.

    Each matching wrapper contributes one entry mapping the label text to the
    list of its field-item texts, e.g.

    <div class="field-wrapper ... field-label-inline clearfix">
      <div class="field-label">key</div>
      <div class="field-items">
        <div class="field-item">list member</div>
        ...
      </div>
    </div>

    Wrappers whose label div carries extra classes (e.g. "field-label-hidden")
    are skipped, because only an exact ["field-label"] class list matches.

    soup: BeautifulSoup document to search.
    Returns a dict of label -> list of item texts.
    """
    data = {}
    for wrapper in soup.find_all("div", class_=["field-wrapper"]):
        # Exact class-list match filters out hidden labels.
        label_div = wrapper.find(
            lambda tag: tag.name == "div" and tag.get("class") == ["field-label"]
        )
        if label_div is None:
            # No visible label: move on to the next field-wrapper.
            continue
        LOGGER.debug("FIELD-LABEL: %s", label_div)
        values = data[label_div.text] = []
        items_div = wrapper.find("div", class_=["field-items"])
        if items_div is None:
            continue
        for item in items_div.find_all("div", class_=["field-item"]):
            LOGGER.debug("FIELD-ITEM: %s", item)
            values.append(item.text)
    LOGGER.debug(data)
    return data
def parseLabtestsonlineTestPage(html):
    """
    Parse one lab-test page by running every section parser over it.

    The parsers run in a fixed order and their results are merged into a
    single dict (later parsers win on duplicate keys, as before).

    html: HTML text of the lab-test page.
    Returns the merged dict of lab-test information.
    """
    LOGGER.debug("html: %s", html)
    soup = BeautifulSoup(html, "html.parser")
    # Section parsers, in the original merge order.
    section_parsers = (
        parseLinkTags,
        parseMetaTags,
        parseFieldWrapperWithFieldLabelsAndFieldItems,
        parseReviewInfoWithFieldWrapperAndFieldRevisionData,
        parseAtAGlanceDivWithGridWrapper,
        parseFeaturetteWrapperWithBlockContentWrapper,
        parseWhatIsBeingTestedDivWithExpandableTextWrapper,
        parseCommonQuestionsWithAccordionElements,
        parseRelatedContentDivWithFieldWrapper,
    )
    labtest_info = {}
    for parser in section_parsers:
        labtest_info.update(parser(soup))
    LOGGER.debug(labtest_info)
    return labtest_info
def parseLabtestsonlineTestsIndexPage(html):
    """
    Parse the link of the lab tests index and get a list of lab tests.

    Next following some examples of the lab tests indexing page
    <div class="field-content"><a href="/tests/17-hydroxyprogesterone" hreflang="en">17-Hydroxyprogesterone</a></div>
    <div class="field-content"><a href="/tests/17-hydroxyprogesterone" hreflang="en">17-OHP</a></div>

    html: The HTML text of the lab tests index page
    Returns a list of dicts with labtest_name, labtest_href and labtest_key
    (the last path segment of the href, used as a unique name).
    """
    LOGGER.debug("html: %s", html)
    soup = BeautifulSoup(html, "html.parser")
    # Get all the div tags which point to a lab test
    field_content_divs = soup.find_all("div", class_=["field-content"])
    labtests_list = []
    for field_content_div in field_content_divs:
        # There should be only one a tag
        a_tag = field_content_div.a
        if a_tag is None or a_tag.get("href") is None:
            # Fix: stray field-content divs without a link would otherwise
            # raise AttributeError/TypeError; skip them.
            continue
        labtest = {
            "labtest_name": a_tag.text,
            "labtest_href": a_tag.get("href"),
            # A unique name for the tests
            "labtest_key": a_tag.get("href").split("/")[-1],
        }
        LOGGER.debug(labtest)
        labtests_list.append(labtest)
    LOGGER.debug(labtests_list)
    return labtests_list
def getHTMLText(
    url,
    user_agent="https://gist.github.com/svagionitis/c198dd990429258528821a23f1d8f1bd",
    timeout=30,
):
    """
    Get the HTML text of a URL.

    url: The URL to get the HTML text.
    user_agent: Value sent as the User-Agent header.
    timeout: Seconds to wait for the server (new, backward-compatible);
             without a timeout requests.get can block forever on an
             unresponsive host.
    Returns the decoded HTML text, or None on any request error.
    """
    LOGGER.info("url: %s, user_agent: %s", url, user_agent)
    headers = {"User-Agent": user_agent}
    try:
        get_url = requests.get(url, headers=headers, timeout=timeout)
        LOGGER.debug(get_url)
        get_url.raise_for_status()
        # Guess the encoding from the body, not only the response headers.
        get_url.encoding = get_url.apparent_encoding
        return get_url.text
    except requests.exceptions.RequestException as req_ex:
        # Timeout is a RequestException subclass, so it is handled here too.
        LOGGER.error("Error getting the URL %s: %s", url, req_ex)
        return None
def saveDataToSqliteDb(
    data, db_dir, db_filename="lab-tests.db", table_name="lab_tests"
):
    """
    Save data to an Sqlite DB.

    Currently the schema of the DB is very simple. There is only one table
    with labtest_id, labtest_date_added and labtest_data where labtest_id and
    labtest_date_added are the primary key.
    TODO: Add more fields

    data: The data to save.
    db_dir: The directory to save the DB.
    db_filename: The filename of the DB. The default value is "lab-tests.db".
    table_name: The table in DB to save the data. The default value is "lab_tests".
    """
    LOGGER.debug(
        "data: %s, db_dir: %s, db_filename: %s, table_name: %s",
        data,
        db_dir,
        db_filename,
        table_name,
    )
    if not os.path.exists(db_dir):
        os.makedirs(db_dir)
    data_db_filename = os.path.join(db_dir, db_filename)
    # Create the table if it does not exist
    create_table_sql = (
        "CREATE TABLE IF NOT EXISTS {0} ( "
        "labtest_id TEXT NOT NULL, "
        "labtest_date_added TEXT NOT NULL, "
        "labtest_data JSON, "
        "PRIMARY KEY (labtest_id, labtest_date_added) "
        ")"
    ).format(table_name)
    insert_into_sql = "INSERT INTO {0} VALUES (?, ?, ?)".format(table_name)
    # Fix: sqlite3's "with connect(...)" commits/rolls back the transaction
    # but does NOT close the connection — close it explicitly.
    conn = sqlite3.connect(data_db_filename)
    try:
        curs = conn.cursor()
        try:
            curs.execute(create_table_sql)
            if data:
                curs.execute(
                    insert_into_sql,
                    [data["labtest_key"], data["labtest_date_added"], json.dumps(data)],
                )
            conn.commit()
        except Error as err:
            # Lazy %-style logging instead of eager str.format.
            LOGGER.error("Error in DB: %s", err.args[0])
            conn.rollback()
        finally:
            curs.close()
    finally:
        conn.close()
def saveDataToJsonFile(data, data_dir, data_filename):
    """
    Save data to a JSON file.

    data: The data to save.
    data_dir: The directory to save the data.
    data_filename: The filename of the data.
    """
    LOGGER.debug(
        "data: %s, data_dir: %s, data_filename: %s",
        data,
        data_dir,
        data_filename,
    )
    if not os.path.exists(data_dir):
        os.makedirs(data_dir)
    data_json_filename = os.path.join(data_dir, data_filename)
    # Fix: pin the encoding — open() without one uses a platform-dependent
    # default, which can corrupt non-ASCII output on some systems.
    with open(data_json_filename, "w", encoding="utf-8") as data_json_file:
        json.dump(data, data_json_file, indent=2)
def getLabtestDataFromLabtestsonlineOrgUk(
    host="labtestsonline.org.uk",
    prot="https",
    labtests_index="tests-index",
    save_to_json=True,
    labtest_dir="labtests",
):
    """
    Get the Lab Tests from the labtestsonline.org.uk

    host: The host. The default value is labtestsonline.org.uk.
    prot: The protocol. The default value is https.
    labtests_index: The index page for the lab tests. The default value is tests-index.
    save_to_json: Flag to save to a json file. The default value is True.
    labtest_dir: Directory where the JSON files and the sqlite DB are written.
    """
    LOGGER.debug(
        "host: %s, prot: %s, labtests_index: %s, save_to_json: %s, labtest_dir: %s",
        host,
        prot,
        labtests_index,
        save_to_json,
        labtest_dir,
    )
    prot_host = prot + "://" + host
    labtestsonline_index_url = prot_host + "/" + labtests_index
    labtestsonline_index_html_txt = getHTMLText(labtestsonline_index_url)
    if labtestsonline_index_html_txt is None:
        # Fix: getHTMLText returns None on a request error; without this
        # guard BeautifulSoup would crash on None input.
        LOGGER.error("Could not fetch the index page %s", labtestsonline_index_url)
        return
    labtests = parseLabtestsonlineTestsIndexPage(labtestsonline_index_html_txt)
    visited_labtest_urls = []
    for labtest in labtests:
        labtest_url = prot_host + labtest["labtest_href"]
        if labtest_url in visited_labtest_urls:
            LOGGER.info("Labtest URL %s already visited.", labtest_url)
            continue
        labtest_html_txt = getHTMLText(labtest_url)
        if labtest_html_txt is None:
            # Fix: skip a test whose page failed to download (and do not mark
            # it visited, so a retry remains possible) instead of crashing.
            LOGGER.warning("Skipping labtest URL %s after fetch error.", labtest_url)
            continue
        # When we get successfully a url, add it to the visited list
        visited_labtest_urls.append(labtest_url)
        labtest_info = parseLabtestsonlineTestPage(labtest_html_txt)
        # Add here the date retrieved
        labtest["labtest_date_added"] = datetime.utcnow().isoformat()
        labtest["labtest_info"] = labtest_info
        # Save each labtest to a separate json file with name labtest["labtest_key"].json
        # Also save it to DB
        if save_to_json:
            saveDataToJsonFile(labtest, labtest_dir, labtest["labtest_key"] + ".json")
        saveDataToSqliteDb(labtest, labtest_dir)
        LOGGER.info(labtest)
        # Sleep for 60 to 120 seconds to stay polite to the server
        time.sleep(random.randrange(60, 120))
    # Save the visited labtest pages to a json file at the end.
    saveDataToJsonFile(visited_labtest_urls, labtest_dir, "visited-labtest-urls.json")
def setupLogging(level):
    """
    Configure root logging with a timestamped format at the given level.

    level: Logging level name to set (case-insensitive), e.g. "info".
    """
    LOGGER.debug("level: %s", level)
    log_format = (
        "%(asctime)s %(levelname)s: %(message)s "
        "[%(name)s:%(funcName)s:%(lineno)d] "
    )
    logging.basicConfig(
        level=logging.getLevelName(str(level).upper()), format=log_format
    )
    LOGGER.info("Log level set to %s", level)
def main():
    """Entry point: set up logging at INFO, then run the full scrape."""
    setupLogging("info")
    getLabtestDataFromLabtestsonlineOrgUk()
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment