TweetPreprocessor
from multiprocessing import Pool, cpu_count

import pandas as pd
import numpy as np
from transformers import AutoTokenizer
import spacy

class TweetProcessor:

    def __init__(self, df, tokenizer, nlp, text_col='text', label_col='sentiment', max_len=80):
        """
        Preprocessor constructor
        :param df: a pandas data frame with two columns only (text and sentiment)
        :param tokenizer: a transformers tokenizer specific to the language of the dataset
        :param nlp: an instance of a spaCy model specific to the language of the dataset
        :param text_col: the name of the column containing the sentences
        :param label_col: the name of the column containing the labels (i.e. sentiment)
        :param max_len: the maximum length of each sequence in the pre-processed dataset
        """
        self.df = df
        self.nlp = nlp
        self.label_col = label_col
        self.text_col = text_col
        self.max_len = max_len
        self.tokenizer = tokenizer

    def clean(self, doc):
        """
        A cleaning utility specifically designed for tweets: it drops
        URLs, @mentions and e-mail addresses.
        :param doc: a spaCy Doc
        :return: a cleaned spaCy Doc
        """
        out = ''
        for t in doc:
            bad = t.like_url
            bad |= t.text.startswith('@')
            bad |= t.like_email
            if not bad:
                out += t.text_with_ws
        return self.nlp(out)
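
    # Illustration only (the tweet below is made up, not taken from any dataset):
    # cleaning a doc built from "Loving the new update @somebody https://t.co/abc"
    # keeps just "Loving the new update ", since the mention and URL tokens are dropped.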

    def process(self, df_row):
        """
        Process a single row of the data frame. Rows are handled in parallel
        across all detected cores (see run()).
        :param df_row: the data frame row that the worker will manipulate and process
        :return: a tuple (input_ids, attention_mask, token_type_ids, label) representing the model input
        """
        # Parse the text with spaCy
        doc = self.nlp(df_row[self.text_col])
        # Clean the text (remove mentions, URLs, e-mails)
        doc = self.clean(doc)
        # Keep only the first max_len tokens
        doc = doc[:self.max_len]
        # Tokenize the text and add the special tokens (CLS / SEP)
        input_ids = self.tokenizer.encode(doc.text, add_special_tokens=True)
        # How long is this sequence?
        size = len(input_ids)
        # The encoding may have grown past max_len: truncate again if needed
        if size > self.max_len:
            input_ids = input_ids[:self.max_len]
            size = len(input_ids)
        # Add pad tokens if the sequence is too short
        if size < self.max_len:
            # Pad
            input_ids += [self.tokenizer.pad_token_id] * (self.max_len - size)
        # This should always be true
        assert len(input_ids) == self.max_len
        # BERT - create the token_type_ids (padding positions get segment id 1)
        token_type_ids = [0] * size + [1] * (self.max_len - size)
        # BERT - create the attention mask (padding positions are masked out)
        attention_mask = [1] * size + [0] * (self.max_len - size)
        # BERT - map the textual label to an integer
        label = df_row[self.label_col]
        labels = 1 if label == 'neutral' else 2 if label == 'positive' else 0
        # Return the output as a tuple
        return input_ids, attention_mask, token_type_ids, labels
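
    # As a rough illustration (the token ids below are made up): with max_len = 8,
    # a two-token tweet encoded to [101, 7592, 2088, 102] would come out as
    #   input_ids      = [101, 7592, 2088, 102, 0, 0, 0, 0]
    #   attention_mask = [1, 1, 1, 1, 0, 0, 0, 0]
    #   token_type_ids = [0, 0, 0, 0, 1, 1, 1, 1]
    # with labels = 2 if the row's sentiment is 'positive'.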

    def transform(self, df):
        """
        Transforms the data frame into a pre-processed frame
        :param df: the worker data frame
        :return: the pre-processed data frame
        """
        processed = []
        for _, row in df.iterrows():
            processed.append(self.process(row))
        return pd.DataFrame(data=processed, columns=['input_ids', 'attention_mask', 'token_type_ids', 'labels'])

    @staticmethod
    def balance(df):
        """
        Utility to balance the dataset by downsampling every class
        to the size of the smallest one
        :param df: the data frame to balance
        :return: the balanced data frame
        """
        # Use the size of the least represented class as the per-class sample count
        n_samples = min(df.sentiment.value_counts())
        neg = df[df.sentiment == 'negative'].sample(n=n_samples)
        neu = df[df.sentiment == 'neutral'].sample(n=n_samples)
        pos = df[df.sentiment == 'positive'].sample(n=n_samples)
        # Concatenate the three classes and shuffle the result
        return pd.concat([pos, neu, neg]).sample(frac=1)
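
    # E.g. (made-up counts): with 500 negative, 300 neutral and 200 positive rows,
    # each class is sampled down to 200 and the resulting 600 rows are returned shuffled.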

    def run(self, save_path):
        """
        Run the pre-processor and save the result to disk
        :param save_path: the path of the output JSON file
        """
        # Balance the dataset
        self.df = self.balance(self.df)
        # Split the data frame across the workers
        df_split = np.array_split(self.df, cpu_count())
        # Make a worker pool
        pool = Pool(cpu_count())
        # Invoke the pool, assigning a data frame chunk to each worker
        df = pd.concat(pool.map(self.transform, df_split))
        # Close the pool
        pool.close()
        # Join the processes & wait for their termination
        pool.join()
        # Persist the pre-processed data frame
        df.to_json(save_path, orient='values')

if __name__ == '__main__':

    # Your CSV dataset path goes here
    csv_path = 'path/to/your/well/formatted/csv/goes/here.csv'
    # Your pre-trained BERT model name goes here (e.g. bert-base-cased)
    transformer_model_name = ''
    # Your spaCy model name goes here (e.g. it_core_news_sm)
    spacy_model_name = ''
    # Your pre-processed file path goes here. Specify a .json extension
    preproc_path = 'path/to/your/preprocessed/file.json'

    df = pd.read_csv(csv_path)
    tokenizer = AutoTokenizer.from_pretrained(transformer_model_name)
    nlp = spacy.load(spacy_model_name)

    processor = TweetProcessor(
        df=df,
        tokenizer=tokenizer,
        nlp=nlp,
        text_col='text',
        label_col='sentiment',
        max_len=80
    )

    processor.run(preproc_path)
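
A minimal sketch of what comes next, assuming PyTorch is available (the gist itself does not use it): the saved JSON can be read back and turned into tensors for fine-tuning. Since orient='values' drops the column names, the columns come back as 0..3, in the order written by process().

import torch
import pandas as pd
from torch.utils.data import TensorDataset, DataLoader

# Read the pre-processed file back; columns 0..3 map to
# input_ids, attention_mask, token_type_ids, labels
frame = pd.read_json('path/to/your/preprocessed/file.json', orient='values')

input_ids = torch.tensor(frame[0].tolist(), dtype=torch.long)
attention_mask = torch.tensor(frame[1].tolist(), dtype=torch.long)
token_type_ids = torch.tensor(frame[2].tolist(), dtype=torch.long)
labels = torch.tensor(frame[3].tolist(), dtype=torch.long)

# Wrap everything into a dataset and batch it for training
dataset = TensorDataset(input_ids, attention_mask, token_type_ids, labels)
loader = DataLoader(dataset, batch_size=32, shuffle=True)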