extract text content from a source
# -*- coding: utf-8 -*-
"""
New kind of text extractor:
Phase 1:
    - remove scripts, css, svg, comments, canvas, etc.
    - add a space after each html tag
    - strip tags
    - normalize spaces/newlines
    - keep the result
Phase 2:
    - do x times:
        - sample n pages from N
        - store exact matches with more than y tokens in bad_sequences
Phase 3:
    - for each page:
        - remove everything that is in bad_sequences
        - keep the rest
TODO:
    add some documents from a different source; everything that matches them won't be removed
"""
__title__ = 'extractor'
__author__ = '[email protected]'
__license__ = 'IV'

version_info = (0, 0, 1)
__version__ = '.'.join(map(str, version_info))

import sys

MIN_PYTHON = (3, 7, 1)
assert sys.version_info >= MIN_PYTHON, f"requires Python {'.'.join([str(n) for n in MIN_PYTHON])} or newer"

import logging
import pickle
import re
import requests
import time
from collections import defaultdict
from difflib import SequenceMatcher
from itertools import combinations
from random import sample

logging.basicConfig(
    level=logging.WARNING,
    format='%(asctime)s %(name)s %(levelname)s %(message)s'
)
logger = logging.getLogger('%s.%s' % (__title__, __name__))
logger.setLevel(level=logging.INFO)

URLS_IN, URLS_OUT = [], []
CACHE_IN, CACHE_OUT = [], []

def timeit(func):
    def timed(*args, **kwargs):
        ts = time.time()
        r = func(*args, **kwargs)
        te = time.time()
        logger.info('%s took %.4fs', func.__name__, te - ts)
        return r
    return timed

def download_html():
    logger.info('downloading html from sources...')
    for url in URLS_IN:
        r = requests.get(url)
        CACHE_IN.append(r.text)
    for url in URLS_OUT:
        r = requests.get(url)
        CACHE_OUT.append(r.text)

def test(patterns_file=None, remove_tags=None):
    logger.info('learning patterns from sources...')
    patterns = learn_patterns_from_sources(list(CACHE_IN), list(CACHE_OUT), remove_tags=remove_tags)
    if patterns_file is not None:
        logger.info(f'saving patterns in {patterns_file}...')
        try:
            with open(patterns_file, 'wb') as f:
                pickle.dump(patterns, f)
        except Exception as err:
            logger.error(f'unable to save patterns to {patterns_file}: {err}')
    logger.info('extracting text from sources...')
    return patterns, [text_from_html(html, patterns) for html in CACHE_IN]

def is_html_tag(word):
    return re.fullmatch(r'<\w+>', word) is not None

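# Note (added for illustration): after clean_html() every surviving tag has the
# normalized '<name>' form, so is_html_tag('<p>') is True while is_html_tag('word')
# and is_html_tag('<a href="x">') are False.
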
@timeit
def learn_patterns_from_sources(html_in_source, html_outside_source=[], sample_max_size=30, remove_tags=None):
    """
    html_in_source :: [raw_html, ...] :: raw html strings from the source to scrape
    html_outside_source :: [raw_html, ...] :: raw html strings from other sources
    """
    n_samples = min(len(html_in_source), sample_max_size)
    indexes = sample(range(len(html_in_source)), n_samples)
    texts = [clean_html(html_in_source[i], remove_tags).split(' ') for i in indexes]
    matches = defaultdict(int)
    N = n_samples * (n_samples - 1) // 2
    n = N - 1
    logger.info(f'{N} combinations of documents to be compared')
    # texts is indexed positionally (0..n_samples-1), so iterate over those
    # positions rather than over the sampled document indexes
    for i, j in combinations(range(n_samples), 2):
        s = SequenceMatcher(None, texts[i], texts[j], autojunk=False)
        matches_ = defaultdict(int)
        for m in s.get_matching_blocks():
            if m.size > 0:
                matches_[tuple(texts[i][m.a:m.a + m.size])] += 1
        for k, count in matches_.items():
            k_len = len([w for w in k if not is_html_tag(w)])
            if k_len >= count:
                matches[k] += min(n, k_len)
    logger.debug(f'{len(matches)} patterns to test, those with score >= {N} will be accepted:')
    for k, count in matches.items():
        logger.debug(f' - {len(k)} words, score={count}, accepted={count >= N}')
        logger.debug(summary(str(k)))
    patterns = sorted([(list(k), len(k)) for k, count in matches.items() if count >= N], key=lambda x: -x[1])
    logger.info(f'{len(patterns)} patterns have been found')
    if patterns:
        n_samples = min(len(html_outside_source), sample_max_size)
        texts = [clean_html(html).split(' ') for html in sample(html_outside_source, n_samples)]
        for text in texts:
            i, text_len = 0, len(text)
            while i < text_len:
                pattern_to_remove = None
                word = text[i]
                for j, (pattern, pattern_len) in enumerate(patterns):
                    if word == pattern[0] and text[i:i + pattern_len] == pattern:
                        pattern_to_remove = j
                        i += pattern_len
                        break
                if pattern_to_remove is not None:
                    del patterns[pattern_to_remove]
                    logger.info(f'pattern {pattern_to_remove} of len {pattern_len} removed because it matched an outside source')
                    logger.debug(summary(str(pattern)))
                    continue
                i += 1
    logger.debug(f'{len(patterns)} patterns accepted:')
    for pattern, pattern_len in patterns:
        logger.debug(f' - {summary(str(pattern))} (len={pattern_len})')
    return patterns

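# Hedged usage sketch (not from the original gist): with pages of the target site
# cached in CACHE_IN and unrelated pages in CACHE_OUT, a call such as
#
#     patterns = learn_patterns_from_sources(CACHE_IN, CACHE_OUT, remove_tags=['nav', 'footer'])
#
# returns a list of (token_list, length) pairs sorted by decreasing length, ready to
# be passed to text_from_html() or pickled for later runs.
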
@timeit
def text_from_html(html, patterns, remove_tags=['nav', 'footer']):
    text = clean_html(html, remove_tags=remove_tags).split(' ')
    i, text_len = 0, len(text)
    new_text = []
    while i < text_len:
        removed_sequence = None
        word = text[i]
        for pattern, pattern_len in patterns:
            if word == pattern[0] and text[i:i + pattern_len] == pattern:
                removed_sequence = pattern
                i += pattern_len
                break
        if removed_sequence is not None:
            if len(new_text) and '\n' in removed_sequence:
                if new_text[-1] != '\n':
                    new_text.append('\n')
            continue
        new_text.append(word)
        i += 1
    return normalize_spaces(' '.join([w for w in new_text if not is_html_tag(w)]))

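# Usage sketch (illustrative; the URL is a placeholder): once patterns are learned,
# extracting the main text of a fresh page from the same site looks like
#
#     r = requests.get('https://example.com/some-article')
#     print(text_from_html(r.text, patterns))
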
def summary(text, maxlen=100, separator=' [...] '):
    line = re.sub(r'\n+', '', text)
    maxlen -= len(separator)
    if len(line) <= maxlen:
        return line
    else:
        return line[:maxlen // 2] + separator + line[-maxlen // 2:]

def clean_tag(match):
    paragraph_tags = set(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'div', 'article', 'blockquote', 'aside', 'caption', 'pre', 'code', 'center', 'header', 'footer', 'main', 'section', 'summary', 'table', 'tr', 'tbody', 'tfoot', 'thead', 'label', 'figcaption', 'button', 'nav'])
    tag = re.sub(r'\W+', '', match.group(2)).lower()
    if match.group(1) == '/' and tag in paragraph_tags:
        return f' <{tag}> \n '
    return f' <{tag}> '

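# Note (added for clarity): clean_tag() is only called by the tag-normalizing
# re.sub() in clean_html() below, where group(1) is the optional '/' of a closing
# tag and group(2) is the tag name. For example '</p>' becomes ' <p> \n ' while
# '<a href="y">' becomes ' <a> '.
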
@timeit
def clean_html(html, remove_tags=None):
    tags_to_decompose = set(['script', 'style', 'canvas', 'svg', 'video', 'audio', 'object', 'applet', 'noframes', 'noscript', 'iframe'])
    if remove_tags is not None:
        tags_to_decompose |= set(remove_tags)
    # keep only the body
    html = re.sub(r'^.*?<\s*body.*?>+', '<body>', html, flags=re.I + re.S)
    logger.debug('**** REMOVING ALL BEFORE <BODY> ****')
    logger.debug(summary(html))
    # remove html comments <!-- -->
    html = re.sub(r'<!--.*?-->', ' ', html, flags=re.S)
    logger.debug('**** REMOVING COMMENTS ****')
    logger.debug(summary(html))
    # remove script, style, canvas, svg, video, audio, object, applet, noframes, noscript, iframe and their content
    for tag in tags_to_decompose:
        html = re.sub(rf'<+\s*{tag}.*?>+.*?(<+\s*/\s*{tag}\s*>+)', ' \n ', html, flags=re.I + re.S)
        logger.debug(f'**** REMOVING <{tag}>...</{tag}> ****')
        logger.debug(summary(html))
    # normalize html tags
    html = re.sub(r'<+\s*(/?)\s*(\w+).*?>+', clean_tag, html, flags=re.S)
    logger.debug('**** NORMALIZING HTML TAGS ****')
    logger.debug(summary(html))
    # normalize spaces and newlines
    return normalize_spaces(html)

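# Worked example (added for illustration, simplified): a minimal page such as
#
#     clean_html('<body><p>Hello <b>world</b></p></body>')
#
# comes back roughly as '<body> <p> Hello <b> world <b> <p> \n <body>': every tag is
# reduced to a '<name>' placeholder, closing paragraph-level tags add a newline, and
# the placeholders are later filtered out by is_html_tag().
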
def normalize_spaces(text):
    """
    trim text
    each run of whitespace is replaced by: one newline if the run contains a newline, a single space otherwise
    """
    text = re.sub(r'\s*\n\s*', ' \n ', text.strip())
    parts = re.split(r'(\s+)', text)
    for i in range(1, len(parts), 2):
        if parts[i] != ' \n ':
            parts[i] = ' '
    return ''.join(parts)

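# Illustrative example (added): normalize_spaces('a   b \n\n  c') returns 'a b \n c'.
# The plain run of spaces collapses to a single space, while the run containing
# newlines collapses to the ' \n ' marker used as a paragraph separator above.
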
def main(args=None):
    """
    """
    global CACHE_IN, CACHE_OUT
    patterns_file = f'{__file__}.patterns.pickle'
    remove_tags = ['nav', 'footer']
    cache_loaded = False
    try:
        logger.info('loading data...')
        with open(args.datafile, 'rb') as f:
            CACHE_IN, CACHE_OUT = pickle.load(f)
    except Exception as err:
        logger.warning('unable to load data: %s', err)
        download_html()
        if len(CACHE_IN) or len(CACHE_OUT):
            try:
                with open(args.datafile, 'wb') as f:
                    pickle.dump((CACHE_IN, CACHE_OUT), f)
            except Exception as err:
                logger.error(f'unable to save cached data: {err}')
            else:
                cache_loaded = True
    else:
        cache_loaded = True
    finally:
        logger.debug('len(CACHE_IN)=%d, len(CACHE_OUT)=%d', len(CACHE_IN), len(CACHE_OUT))
    if args.html:
        logger.setLevel(logging.DEBUG)
        for i, cleaned_html in enumerate([clean_html(html) for html in CACHE_OUT]):
            print(f'---------- CLEANED HTML {i} ----------')
            print(cleaned_html)
            print('--' * 50)
    elif args.addin or args.addout:
        items_added = 0
        if args.addin:
            try:
                r = requests.get(args.addin)
            except Exception as err:
                logger.error(f'unable to retrieve html: {err}')
            else:
                if r.text not in CACHE_IN:
                    CACHE_IN.append(r.text)
                    items_added += 1
                else:
                    logger.warning(f'{args.addin} is already cached')
        if args.addout:
            try:
                r = requests.get(args.addout)
            except Exception as err:
                logger.error(f'unable to retrieve html: {err}')
            else:
                if r.text not in CACHE_OUT:
                    CACHE_OUT.append(r.text)
                    items_added += 1
                else:
                    logger.warning(f'{args.addout} is already cached')
        if items_added:
            try:
                with open(args.datafile, 'wb') as f:
                    pickle.dump((CACHE_IN, CACHE_OUT), f)
            except Exception as err:
                logger.error(f'unable to save cached data: {err}')
            else:
                logger.info('cached data saved')
    elif args.test:
        try:
            r = requests.get(args.test)
        except Exception as err:
            logger.error(f'unable to download test url: {err}')
        else:
            try:
                with open(patterns_file, 'rb') as f:
                    patterns = pickle.load(f)
            except Exception as err:
                logger.error(f'unable to load patterns from {patterns_file}: {err}')
                logger.info('regenerating patterns from known sources...')
                patterns = learn_patterns_from_sources(list(CACHE_IN), list(CACHE_OUT), remove_tags=remove_tags)
            logger.info(f'extracting text from {r.url}...')
            res = text_from_html(r.text, patterns)
            logger.info('done')
            print('---------- EXTRACTED TEXT ----------')
            print(res)
            print('--' * 50)
    else:
        pat, ret = test(patterns_file, remove_tags=remove_tags)
        print('*** PATTERNS ***')
        for pattern, pattern_len in pat:
            print(f'  pattern with len {pattern_len}: {pattern}')
        print(f'{len(pat)} patterns')
        print('*** TEXTS EXTRACTED ***')
        for i, text in enumerate(ret):
            print(f'---------- TEXT {i} ----------')
            print(text)
            print('--' * 50)

if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('datafile', type=str, help='file used to store and retrieve raw html downloaded from sources')
    parser.add_argument('--addin', type=str, help='add a url to download to the source list')
    parser.add_argument('--addout', type=str, help='add a url to download to the outside source list')
    parser.add_argument('--test', type=str, help='test content extraction on a new url from the source site')
    parser.add_argument('--debug', '-d', action='store_true', help='verbose mode')
    parser.add_argument('--html', action='store_true', help='check the cleaned html of outside sources')
    args = parser.parse_args()
    if args.debug:
        logger.setLevel(logging.DEBUG)
    main(args)