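"""Scrape neighbourhood policing team contact details from the West Mercia
Police CMS and write them to standard output as CSV.

Subcommands (IDs come from the CMS article/person URLs):

    person PAGE_ID PERSON_ID
    neighbourhood PAGE_ID
    policing_area PAGE_ID
"""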
import argparse
import csv
import logging
import re
import sys
from urllib.parse import parse_qs, urlparse

import requests
from bs4 import BeautifulSoup


PAGE_URL = "https://cms.westmercia.police.uk/article/{page_id}"
# Person page headings combine role, numeric ID and name, in that order.
PERSON_TITLE_RE = re.compile(r"(?P<role>.+)\s+(?P<id>\d+)\s+(?P<name>.+)")
NEIGHBOURHOOD_TEAM_RE = re.compile(r"(?P<name>.*)\s+Team\s+Information")
POLICING_AREA_RE = re.compile(r"(?P<name>.*)\s+policing\s+area")

log = logging.getLogger(__name__)


class Person:
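    """A single team member and their contact details."""
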
    def __init__(self, id, url, name, role, telephone, mobile, email):
        self.id = int(id)
        self.url = str(url)
        # Parsed values may be None; store "" rather than the string "None",
        # which would otherwise be written into the CSV output.
        self.name = str(name) if name is not None else ""
        self.role = str(role) if role is not None else ""
        self.telephone = str(telephone) if telephone is not None else ""
        self.mobile = str(mobile) if mobile is not None else ""
        self.email = str(email) if email is not None else ""

    @classmethod
    def fields(cls):
        return [
            "id",
            "url",
            "name",
            "role",
            "telephone",
            "mobile",
            "email",
        ]


class Neighbourhood:
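    """A neighbourhood policing team and the people on it."""
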
    def __init__(self, id, url, name, team=None):
        self.id = int(id)
        self.url = str(url)
        self.name = str(name)
        # Default to None rather than []: a mutable default argument is
        # created once and shared between every instance.
        self.team = team if team is not None else []

    @classmethod
    def fields(cls):
        return [
            "id",
            "url",
            "name",
        ]


class PolicingArea:
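    """A policing area covering one or more neighbourhood teams."""
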
    def __init__(self, id, url, name, teams):
        self.id = int(id)
        self.url = str(url)
        self.name = str(name)
        self.teams = teams

    @classmethod
    def fields(cls):
        return [
            "id",
            "url",
            "name",
        ]


def get_person(page_id, person_id):
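    """Fetch a person's page from the CMS and parse it into a Person."""
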
    def get_definition_list_value(title):
        try:
            return soup.find("dt", string=title).find_next("dd").string
        except AttributeError:
            # The definition list doesn't contain a title element with this name
            log.warning(f"No title \"{title}\" in the definition list")
            return None

    def match_group_or_none(match, group):
        try:
            return match.group(group)
        except AttributeError:
            # The pattern didn't match
            log.warning(f"Failed to parse values from title; {group} will be None")
            return None

    log.info(f"Getting person {page_id}/{person_id}")

    url = PAGE_URL.format(page_id=page_id)
    res = requests.get(url, params={"personid": person_id})
    soup = BeautifulSoup(res.text, features="html.parser")

    raw_title = soup.find(id="headingtext").get_text(strip=True)
    title = PERSON_TITLE_RE.match(raw_title)
    name = match_group_or_none(title, "name")
    role = match_group_or_none(title, "role")

    telephone = get_definition_list_value("Telephone")
    mobile = get_definition_list_value("Mobile")
    email = get_definition_list_value("Email")

    return Person(
        person_id, res.url, name, role,
        telephone, mobile, email)


def get_neighbourhood(page_id):
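    """Fetch a team page and parse it, including its members, into a
    Neighbourhood."""
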
    def extract_member_id(member):
        # Member tiles link to the person page with the ID in the query string.
        raw_url = member.find(class_="tile").get("href")
        query = parse_qs(urlparse(raw_url).query)
        return query["personid"][0]

    log.info(f"Getting neighbourhood {page_id}")

    url = PAGE_URL.format(page_id=page_id)
    res = requests.get(url)
    soup = BeautifulSoup(res.text, features="html.parser")

    # If this page links off to a separate "Team contacts" page, follow that
    # link and scrape the members from there instead.
    team_contacts = soup.find(class_="boxheading", string="Team contacts")
    if team_contacts is not None:
        url = team_contacts.parent.get("href")
        log.warning(
            "Asked for a neighbourhood, but this isn't the team contacts "
            f"page; fetching {url}")
        res = requests.get(url)
        soup = BeautifulSoup(res.text, features="html.parser")

    raw_title = soup.find(id="headingtext").get_text(strip=True)
    title = NEIGHBOURHOOD_TEAM_RE.match(raw_title)

    raw_members = soup.find_all(class_="person")
    member_ids = [extract_member_id(m) for m in raw_members]
    log.info(f"Found {len(member_ids)} members ({member_ids})")
    members = [get_person(page_id, m) for m in member_ids]

    return Neighbourhood(page_id, url, title.group("name"), members)


def get_policing_area(page_id):
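    """Fetch a policing area page and parse it, including all of its teams,
    into a PolicingArea."""
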
    def extract_team_id(team):
        # Team links point at /article/<id>/...; because the path starts
        # with a slash, the ID is the third component after splitting.
        raw_url = team.get("href")
        path = urlparse(raw_url).path.split("/")
        return path[2]

    log.info(f"Getting policing area {page_id}")

    url = PAGE_URL.format(page_id=page_id)
    res = requests.get(url)
    soup = BeautifulSoup(res.text, features="html.parser")

    raw_title = soup.find(id="headingtext").get_text(strip=True)
    title = POLICING_AREA_RE.match(raw_title)

    raw_teams = soup.find(class_="maplinks").find_all("a")
    team_ids = [extract_team_id(t) for t in raw_teams]
    log.info(f"Found {len(team_ids)} teams ({team_ids})")
    teams = [get_neighbourhood(t) for t in team_ids]

    return PolicingArea(page_id, url, title.group("name"), teams)


if __name__ == "__main__":
    logging.basicConfig()

    parser = argparse.ArgumentParser(description="Get police team member data")
    parser.add_argument("-l", "--log-level",
                        help="logging level name, e.g. debug, info, warning")
    subparsers = parser.add_subparsers(dest="action", required=True)

    person = subparsers.add_parser("person")
    person.add_argument("page_id", type=int)
    person.add_argument("person_id", type=int)

    neighbourhood = subparsers.add_parser("neighbourhood")
    neighbourhood.add_argument("page_id", type=int)

    policing_area = subparsers.add_parser("policing_area")
    policing_area.add_argument("page_id", type=int)

    args = parser.parse_args()
    if args.log_level is not None:
        log.setLevel(args.log_level.upper())

    if args.action == "person":
        writer = csv.DictWriter(
            sys.stdout, fieldnames=Person.fields(), dialect="excel")
        writer.writeheader()
        writer.writerow(vars(get_person(args.page_id, args.person_id)))
    elif args.action == "neighbourhood":
        writer = csv.DictWriter(
            sys.stdout,
            fieldnames=Person.fields()
            + [f"neighbourhood_{f}" for f in Neighbourhood.fields()],
            dialect="excel")
        writer.writeheader()
        neighbourhood = get_neighbourhood(args.page_id)
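        # Flatten the nested structure: one CSV row per person, with the
        # parent neighbourhood's fields prefixed to avoid column clashes.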
        neighbourhood_fields = {
            f"neighbourhood_{f}": getattr(neighbourhood, f)
            for f in Neighbourhood.fields()
        }
        for member in neighbourhood.team:
            writer.writerow({
                **neighbourhood_fields,
                **vars(member),
            })
    elif args.action == "policing_area":
        writer = csv.DictWriter(
            sys.stdout,
            fieldnames=Person.fields()
            + [f"neighbourhood_{f}" for f in Neighbourhood.fields()]
            + [f"policing_area_{f}" for f in PolicingArea.fields()],
            dialect="excel")
        writer.writeheader()
        policing_area = get_policing_area(args.page_id)
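        # Same flattening as the neighbourhood case, one level deeper: each
        # row carries both its policing area and neighbourhood fields.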
        policing_area_fields = {
            f"policing_area_{f}": getattr(policing_area, f)
            for f in PolicingArea.fields()
        }
        for neighbourhood in policing_area.teams:
            neighbourhood_fields = {
                f"neighbourhood_{f}": getattr(neighbourhood, f)
                for f in Neighbourhood.fields()
            }
            for member in neighbourhood.team:
                writer.writerow({
                    **policing_area_fields,
                    **neighbourhood_fields,
                    **vars(member),
                })