import os
from collections import defaultdict, OrderedDict
import pprint
import sys

import requests
import extruct
from w3lib.html import get_base_url

from wikidatarefisland.external_identifier import ExternalIdentifier
from wikidatarefisland.wdqs_reader import WdqsReader
from wikidatarefisland.storage import Storage
from wikidatarefisland.config import BLACKLISTED_EXTERNAL_IDENTIFIERS
# Shared helpers: per-script storage, a WDQS client, and the whitelist of
# external-identifier properties to scan.
storage = Storage.newFromScript(os.path.realpath(__file__))
wdqs_reader = WdqsReader()
whitelisted_ext_idefs = storage.get('whitelisted_ext_idefs.json')
external_identifier = ExternalIdentifier()
pp = pprint.PrettyPrinter(indent=4)

# Map schema.org property URLs to the Wikidata properties they correspond to.
schemaorg_mapping = wdqs_reader.get_schemaorg_mapping()
wtf_mapping = defaultdict(list)
for case in schemaorg_mapping:
    wtf_mapping[case['url']['value']].append(case['property']['value'])

# Counts schema.org properties found in pages that have no Wikidata mapping.
non_existing_schemaorg_types = defaultdict(int)
class hashabledict(dict):
    """A dict that can be stored in a set (hashed by its sorted items)."""

    def __hash__(self):
        return hash(tuple(sorted(self.items())))
def check_pid(pid, extracted_data):
    """Fetch pages for one external identifier and extract mapped microdata."""
    formatter_urls = external_identifier.get_formatter(pid)
    if not formatter_urls:
        return extracted_data
    for case in wdqs_reader.get_usecases(pid):
        value = case['value']['value']
        item_id = case['item']['value'].replace('http://www.wikidata.org/entity/', '')
        url = formatter_urls[0].replace('$1', value)
        try:
            r = requests.get(url, timeout=30)
            base_url = get_base_url(r.text, r.url)
            data = extruct.extract(r.text, base_url=base_url)
            if not data.get('microdata'):
                return extracted_data
        except Exception as err:
            # Give up on this identifier if the page cannot be fetched or parsed.
            # print(err)
            return extracted_data
        for datum in data['microdata']:
            for property_ in datum.get('properties', []):
                if not datum['properties'][property_]:
                    continue
                if 'http://schema.org/' + property_ in wtf_mapping:
                    # Collect values per item; wrap nested dicts so they can
                    # live in a set.
                    data_set = extracted_data[item_id].get(property_, set())
                    if isinstance(datum['properties'][property_], dict):
                        datum['properties'][property_] = hashabledict(
                            datum['properties'][property_])
                    data_set.add(datum['properties'][property_])
                    extracted_data[item_id][property_] = data_set
                else:
                    non_existing_schemaorg_types[property_] += 1
    ordered_stats = OrderedDict(
        sorted(non_existing_schemaorg_types.items(), key=lambda t: t[1],
               reverse=True))
    storage.store('non_existing_schemaorg_types.json', ordered_stats)
    return extracted_data
def main():
    extracted_data = defaultdict(dict)
    for pid in whitelisted_ext_idefs:
        if pid in BLACKLISTED_EXTERNAL_IDENTIFIERS:
            continue
        extracted_data = check_pid(pid, extracted_data)
    # Sets are not JSON-serializable; convert them to lists before storing.
    serializable_extracted_data = defaultdict(dict)
    for qid in extracted_data:
        for prop in extracted_data[qid]:
            serializable_extracted_data[qid][prop] = list(extracted_data[qid][prop])
    storage.store('extracted_data.json', serializable_extracted_data)
    print(non_existing_schemaorg_types)


if __name__ == '__main__':
    main()