-
-
Save jwest75674/e71a2634226a755781179ade4b3d598e to your computer and use it in GitHub Desktop.
Plain common crawl pre-processing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gc | |
import gzip | |
import time | |
import json | |
import shutil | |
import os,sys | |
import tldextract | |
import collections | |
import pandas as pd | |
from tqdm import tqdm | |
import urllib.request | |
# Local directory that downloaded index files are stored in.
storage_folder = 'data/'
# Base URL that relative Common Crawl paths (index files, WET files) are
# appended to in order to form a full download URL.
file_prefix = 'https://commoncrawl.s3.amazonaws.com/'
def read_every_line(fname,
                    max_lines=-1):
    """Read a UTF-8 text file and return its lines as a list.

    Args:
        fname: Path of the file to read.
        max_lines: Maximum number of lines to return. Zero or a negative
            value (the default) means "no limit". Floats such as ``1e8``
            are accepted and truncated to an integer.

    Returns:
        A list of lines, each still carrying its trailing line break.
    """
    from itertools import islice

    with open(fname, encoding='utf-8') as f:
        if max_lines > 0:
            # The previous loop appended a line before testing the counter
            # (and tested with ">"), so it returned max_lines + 2 lines.
            # islice stops exactly at the requested limit.
            return list(islice(f, int(max_lines)))
        return f.readlines()
def reporthook(count, block_size, total_size):
    """Progress callback for ``urllib.request.urlretrieve``.

    Writes a single, continuously overwritten status line to stdout with
    the percentage done, megabytes transferred, speed and elapsed seconds.

    Args:
        count: Number of blocks transferred so far. The first call
            (``count == 0``) only starts the timer and prints nothing.
        block_size: Size of one block in bytes.
        total_size: Total size of the download in bytes; a non-positive
            value means the size is unknown and 0% is reported.
    """
    global start_time
    if count == 0:
        start_time = time.time()
        return
    duration = time.time() - start_time
    progress_size = int(count * block_size)
    # A coarse clock can report zero elapsed time on the first block.
    speed = int(progress_size / (1024 * duration)) if duration > 0 else 0
    if total_size > 0:
        # The last block is usually partial, so count * block_size can
        # overshoot the real size; never report more than 100%.
        percent = min(100, int(progress_size * 100 / total_size))
    else:
        # Size unknown (or empty file): avoid dividing by zero and avoid
        # printing a negative percentage.
        percent = 0
    sys.stdout.write("\r...%d%%, %d MB, %d KB/s, %d seconds passed" %
                     (percent, progress_size / (1024 * 1024), speed, duration))
    sys.stdout.flush()
def save(url, filename):
    """Download ``url`` into the local file ``filename``.

    Progress is reported through ``reporthook`` while the transfer runs.
    """
    urllib.request.urlretrieve(url=url, filename=filename, reporthook=reporthook)
def process_index_file_line(line):
    """Parse one index line into a flat tuple of the fields used later.

    The line is expected to consist of three whitespace-separated parts:
    a key, a timestamp and a JSON object.

    Args:
        line: One raw line of an index file.

    Returns:
        ``(ts, url, tld_suffix, length, offset, filename, language)`` for
        records whose ``status`` is ``'200'``; ``language`` falls back to
        ``'none'`` when the record has no ``languages`` field. An empty
        tuple ``()`` is returned for every line that cannot be parsed, has
        another status, or lacks a required field.

    Raises:
        TypeError: If ``line`` is not a string.
    """
    if not isinstance(line, str):
        raise TypeError('line must be a str, got {}'.format(type(line).__name__))

    # Split on the first two whitespace runs only. Cutting the line at the
    # timestamp text instead breaks whenever the same digits occur again
    # inside the JSON payload.
    parts = line.split(None, 2)
    if len(parts) < 3:
        return ()
    ts = parts[1]

    try:
        data = json.loads(parts[2])
    except ValueError:
        return ()

    # A missing status (or a payload that is not an object) is treated like
    # any other unusable line instead of raising inside the worker.
    if not isinstance(data, dict) or data.get('status') != '200':
        return ()

    language = data.get('languages', 'none')

    try:
        _tldextract = tldextract.extract(data['url'])
        return (ts,
                data['url'],
                _tldextract.suffix,
                data['length'],
                data['offset'],
                data['filename'],
                language)
    except Exception:
        # Deliberately broad: a single malformed record must not abort the
        # processing of a whole index file. Unlike a bare ``except`` this
        # still lets KeyboardInterrupt / SystemExit through.
        return ()
def process_index_file(file_name):
    """Turn one downloaded, gzipped index file into a feather file.

    The archive is decompressed next to itself, every line is parsed with
    ``process_index_file_line`` in a pool of 8 workers, rows whose
    ``language`` equals ``'rus'`` are kept, a ``wet`` column holding the
    download URL of the matching WET file is derived from the ``warc``
    path, and the result is written to ``<file_name with .gz -> .feather>``.
    The archive and its decompressed copy are deleted afterwards.

    Args:
        file_name: Path of the downloaded ``.gz`` index file.
    """
    print('Unzipping index file ... ')
    df_name = file_name.replace('.gz', '.feather')
    file_unzipped = file_name.split('.gz')[0]

    with gzip.open(file_name, 'rb') as f_in, open(file_unzipped, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)

    lines = read_every_line(file_unzipped,
                            1e8)

    print('{} lines extracted'.format(len(lines)))

    print('Pre-processing index lines ... ')
    out = list_multiprocessing(lines,
                               process_index_file_line,
                               workers=8)

    # filter out blank results (lines that could not be used)
    rows = [row for row in out if row != ()]

    print('Index pre-processed ... ')

    print('Processing index dataframe ... ')

    cols = ['ts', 'url', 'tld', 'length', 'offset', 'warc', 'language']
    # Each row already has its fields in column order, so the frame can be
    # built directly instead of slicing the rows once per column.
    df = pd.DataFrame(rows, columns=cols)

    # .copy() so the new column below is added to an independent frame and
    # not to a view of the unfiltered one (SettingWithCopyWarning).
    df = df[df.language == 'rus'].copy()
    df['wet'] = df.warc.apply(
        lambda x: file_prefix + x.replace('/warc/', '/wet/').replace('.warc.', '.warc.wet.'))

    print('Index dataframe is ready ... ')

    df = df.dropna().drop_duplicates().reset_index(drop=True)
    df.to_feather(df_name)

    print('Df saved ... ')

    # Remove the inputs only after the result is safely on disk; deleting
    # them first would force a new download if writing the feather fails.
    os.remove(file_name)
    os.remove(file_unzipped)

    print('Files removed ... ')
def list_multiprocessing(param_lst,
                         func,
                         **kwargs):
    """Apply ``func`` to every element of ``param_lst`` in a process pool.

    Args:
        param_lst: Items to process; each one is passed to ``func`` as its
            single positional argument.
        func: Callable executed in the worker processes.
        **kwargs: Must contain ``workers``, the pool size. All remaining
            keyword arguments are forwarded to every call of ``func``.

    Returns:
        The results of ``func`` as a list, ordered like ``param_lst``.

    Raises:
        KeyError: If ``workers`` is not supplied.
    """
    # This script never imports Pool at module level, so the name has to
    # be brought into scope here; without it the call fails with NameError.
    from multiprocessing import Pool

    workers = kwargs.pop('workers')

    with Pool(workers) as p:
        apply_lst = [([params], func, i, kwargs) for i, params in enumerate(param_lst)]
        result = list(tqdm(p.imap(_apply_lst, apply_lst), total=len(apply_lst)))

    # lists do not need such sorting, but this can be useful later
    result = sorted(result, key=lambda x: x[0])
    return [_[1] for _ in result]
def _apply_lst(args):
    """Run one packed task and tag its result with the task index.

    ``args`` is a 4-tuple ``(positional_args, callable, index, keyword_args)``;
    the return value is ``(index, callable(*positional_args, **keyword_args))``.
    """
    positional, target, index, keywords = args
    outcome = target(*positional, **keywords)
    return index, outcome
# Read the list of index paths, drop the last two entries (meta-data /
# technical lines) and strip the line breaks.
cc_indexes = [
    raw_path.replace('\n', '')
    for raw_path in read_every_line('data/cc-index.paths')[:-2]
]

# Map "local target path" -> "download URL" for every index file whose
# position in the list is greater than 75; earlier entries are skipped.
file_dict = collections.OrderedDict(
    (os.path.join(storage_folder, cc_index.split('/')[-1]), file_prefix + cc_index)
    for i, cc_index in enumerate(cc_indexes)
    if i > 75
)

# Download and pre-process the selected index files one after another.
for i, (file_name, url) in enumerate(tqdm(file_dict.items())):
    print('PROCESSING INDEX FILE [{}]/[{}] ...'.format(i, len(file_dict)))
    print('Downloading an index file {} ...'.format(file_name))
    save(url, file_name)
    process_index_file(file_name)
    gc.collect()
    print('Downloaded an index file ...')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gc | |
import time | |
import pickle | |
import pandas as pd | |
def pckl(obj,path):
    """Pickle ``obj`` into the file at ``path`` using the highest protocol."""
    with open(path, mode='wb') as sink:
        pickle.Pickler(sink, pickle.HIGHEST_PROTOCOL).dump(obj)
def upkl(path):
    """Load and return the object pickled in the file at ``path``."""
    with open(path, mode='rb') as source:
        return pickle.Unpickler(source).load()
# Load the ten index chunks (index_269_0 ... index_269_9) one by one.
dfs = []
for i in range(10):
    chunk_path = 'index_269_{}.feather'.format(i)
    dfs.append(pd.read_feather(chunk_path))
    print('Index {} loaded'.format(i))

# Merge the chunks and drop the list of parts before continuing.
df = pd.concat(dfs)
del dfs
gc.collect()
print('Memory released')
time.sleep(10)

# rank wet files by their popularity within Russian websites
wet_counts = df.wet.value_counts()
wet_urls = list(wet_counts.index)
url_set = set(df.url.unique())

pckl(wet_urls, 'wet_urls.pickle')
pckl(url_set, 'url_set.pickle')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gc | |
import re | |
import gzip | |
import time | |
# just git clone https://github.com/erroneousboat/warc3.git | |
# you will need a warc subfolder from there | |
import warc | |
import nltk | |
import pickle | |
# you will need to download this | |
nltk.download('punkt') | |
import shutil | |
import os,sys | |
import tldextract | |
import pandas as pd | |
from tqdm import tqdm | |
import urllib.request | |
from multiprocessing import Pool | |
def pckl(obj,path):
    """Pickle ``obj`` into the file at ``path`` using the highest protocol."""
    with open(path, mode='wb') as sink:
        pickle.Pickler(sink, pickle.HIGHEST_PROTOCOL).dump(obj)
def upkl(path):
    """Load and return the object pickled in the file at ``path``."""
    with open(path, mode='rb') as source:
        return pickle.Unpickler(source).load()
def reporthook(count, block_size, total_size):
    """Progress callback for ``urllib.request.urlretrieve``.

    Writes a single, continuously overwritten status line to stdout with
    the percentage done, megabytes transferred, speed and elapsed seconds.

    Args:
        count: Number of blocks transferred so far. The first call
            (``count == 0``) only starts the timer and prints nothing.
        block_size: Size of one block in bytes.
        total_size: Total size of the download in bytes; a non-positive
            value means the size is unknown and 0% is reported.
    """
    global start_time
    if count == 0:
        start_time = time.time()
        return
    duration = time.time() - start_time
    progress_size = int(count * block_size)
    # A coarse clock can report zero elapsed time on the first block.
    speed = int(progress_size / (1024 * duration)) if duration > 0 else 0
    if total_size > 0:
        # The last block is usually partial, so count * block_size can
        # overshoot the real size; never report more than 100%.
        percent = min(100, int(progress_size * 100 / total_size))
    else:
        # Size unknown (or empty file): avoid dividing by zero and avoid
        # printing a negative percentage.
        percent = 0
    sys.stdout.write("\r...%d%%, %d MB, %d KB/s, %d seconds passed" %
                     (percent, progress_size / (1024 * 1024), speed, duration))
    sys.stdout.flush()
def save(url, filename):
    """Download ``url`` into the local file ``filename``.

    Progress is reported through ``reporthook`` while the transfer runs.
    """
    urllib.request.urlretrieve(url=url, filename=filename, reporthook=reporthook)
def read_every_line(fname,
                    max_lines=-1):
    """Read a UTF-8 text file and return its lines as a list.

    Args:
        fname: Path of the file to read.
        max_lines: Maximum number of lines to return. Zero or a negative
            value (the default) means "no limit". Floats such as ``1e8``
            are accepted and truncated to an integer.

    Returns:
        A list of lines, each still carrying its trailing line break.
    """
    from itertools import islice

    with open(fname, encoding='utf-8') as f:
        if max_lines > 0:
            # The previous loop appended a line before testing the counter
            # (and tested with ">"), so it returned max_lines + 2 lines.
            # islice stops exactly at the requested limit.
            return list(islice(f, int(max_lines)))
        return f.readlines()
def remove_special_chars(text,char_list):
    """Delete every entry of ``char_list`` from ``text``.

    After the removals, each non-breaking space (``\\xa0``) is replaced by
    a regular space. Returns the cleaned string.
    """
    cleaned = text
    for unwanted in char_list:
        cleaned = cleaned.replace(unwanted, '')
    cleaned = cleaned.replace(u'\xa0', u' ')
    return cleaned
def remove_html_tags(text):
    """Return ``text`` with every ``<...>`` span on a single line removed."""
    import re
    tag_pattern = re.compile('<.*?>')
    return tag_pattern.sub('', text)
def _remove_non_printed_chars(string):
    """Keep only Latin and Cyrillic letters of ``string``; drop the rest."""
    return re.sub('[^a-zA-Zа-яА-ЯёЁ]', '', string)
def process_web_text(text):
    """Split web page text into sentences, skipping lines with few letters.

    HTML tags are stripped first. The text is then processed line by line:
    a line is kept only when its letter count (as left over by
    ``_remove_non_printed_chars``) minus 2 exceeds half of the line length.
    Kept lines are split with ``nltk.sent_tokenize`` and all resulting
    sentences are returned as one flat list, in document order.
    """
    # first remove any remaining HTML
    stripped = remove_html_tags(text)
    sentences = []
    # then go through the text line by line
    for line in stripped.split('\n'):
        letter_count = len(_remove_non_printed_chars(line))
        # skip lines that consist mostly of non-letter characters
        if len(line) // 2 < letter_count - 2:
            sentences.extend(nltk.sent_tokenize(line))
    return sentences
def list_multiprocessing(param_lst,
                         func,
                         **kwargs):
    """Apply ``func`` to every element of ``param_lst`` in a process pool.

    ``kwargs`` must contain ``workers`` (the pool size); all remaining
    keyword arguments are forwarded to each call of ``func``. Every element
    is passed to ``func`` as its single positional argument. The results
    are returned as a list ordered like ``param_lst``.
    """
    pool_size = kwargs.pop('workers')

    with Pool(pool_size) as pool:
        # Pack each item with its index so results can be re-ordered.
        tasks = []
        for index, item in enumerate(param_lst):
            tasks.append(([item], func, index, kwargs))
        progress = tqdm(pool.imap(_apply_lst, tasks), total=len(tasks))
        indexed = list(progress)

    # lists do not need such sorting, but this can be useful later
    indexed.sort(key=lambda pair: pair[0])
    return [pair[1] for pair in indexed]
def _apply_lst(args):
    """Run one packed task and tag its result with the task index.

    ``args`` is a 4-tuple ``(positional_args, callable, index, keyword_args)``;
    the return value is ``(index, callable(*positional_args, **keyword_args))``.
    """
    positional, target, index, keywords = args
    outcome = target(*positional, **keywords)
    return index, outcome
def process_wet_file(wet_url):
    """Download one WET archive and save the sentences of known URLs.

    The archive is downloaded into the working directory, decompressed,
    and read record by record. For every record whose URL is contained in
    the module-level ``url_set`` the payload is decoded as UTF-8, split
    into sentences with ``process_web_text`` and stored together with the
    URL, its domain and its TLD suffix. The rows are de-duplicated and
    written to ``<archive name with .warc.wet.gz -> .feather>``.

    Any error is printed and swallowed so that one failing archive does
    not stop the surrounding pool; the downloaded and decompressed files
    are removed in every case.

    Args:
        wet_url: Download URL of the ``.warc.wet.gz`` archive.
    """
    global url_set

    cols = ['url', 'domain', 'tld', 'sentence']
    # Known only once the URL has been parsed; kept outside the try block
    # so the cleanup below can tell which files may exist.
    file_name = None
    file_unzipped = None

    try:
        print('Downloading WET file {} ... '.format(wet_url))

        file_name = wet_url.split('/')[-1]
        file_unzipped = file_name.replace('.warc.wet.gz', '.warc')
        df_name = file_name.replace('.warc.wet.gz', '.feather')

        save(wet_url, file_name)
        print('Download complete {} ... '.format(file_name))

        print('Unzipping WET file ... ')
        with gzip.open(file_name, 'rb') as f_in, open(file_unzipped, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
        print('File unzipped ... ')

        print('Processing WET file ... ')
        # Collect plain tuples and build the frame once at the end;
        # appending a DataFrame per record copies all previous rows every
        # time and DataFrame.append no longer exists in pandas 2.x.
        rows = []
        with warc.open(file_unzipped) as f:
            for record in f:
                if record.url not in url_set:
                    continue
                _tldextract = tldextract.extract(record.url)
                text = record.payload.read().decode("utf-8")
                rows.extend(
                    (record.url, _tldextract.domain, _tldextract.suffix, sentence)
                    for sentence in process_web_text(text)
                )
        print('WET file processed ... ')

        df = pd.DataFrame(rows, columns=cols)
        df = df.dropna().drop_duplicates().reset_index(drop=True)
        df.to_feather(df_name)
        print('Df saved ... ')
    except Exception as e:
        print('{} is not processed'.format(wet_url))
        print(e)
    finally:
        # Always clean up, also after a failure, so that broken downloads
        # do not pile up on disk.
        removed = False
        for path in (file_name, file_unzipped):
            if path is not None and os.path.exists(path):
                os.remove(path)
                removed = True
        if removed:
            print('Files removed ... ')
# Load the results of the previous step: the WET file URLs (ranked by their
# popularity within Russian websites) and the set of page URLs to keep.
wet_urls = upkl('wet_urls.pickle')
url_set = upkl('url_set.pickle')

wet_total = len(wet_urls)
url_total = len(url_set)
print('Total {} WET files, Total {} URLs'.format(wet_total, url_total))
print('Downloads starting now ...')

# Process a copy of the full URL list with four worker processes.
list_multiprocessing(wet_urls[:], process_wet_file, workers=4)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
# Merge the per-index feather files into one frame and re-split it into
# ten evenly sized chunks named index_269_0 ... index_269_9.
dfs = []
cols = ['url','tld','wet']

# The file names carry a five-digit, zero-padded index, so one loop with a
# single format covers both the two-digit and the three-digit range.
# NOTE(review): cdx-00000 is not loaded (the range starts at 1) - confirm
# that this is intentional.
for i in range(1, 269):
    dfs.append(pd.read_feather('../common_crawl/data/cdx-{:05d}.feather'.format(i))[cols])

df = pd.concat(dfs, ignore_index=True)

n_chunks = 10
n_rows = len(df)
# Chunk boundaries computed up front: every row lands in exactly one chunk
# (iloc excludes the end position, so neighbouring chunks share a bound),
# the remainder is spread over the chunks instead of being dropped, and a
# frame with fewer than n_chunks rows cannot cause an endless loop.
bounds = [n_rows * k // n_chunks for k in range(n_chunks + 1)]

for c in range(n_chunks):
    chunk = df.iloc[bounds[c]:bounds[c + 1]].reset_index(drop=True)
    try:
        chunk.to_feather('index_269_{}.feather'.format(str(c)))
    except Exception as e:
        # Best effort: report the failing chunk and go on with the next.
        print(e)
        print(chunk)
        print(chunk.info())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment