TweetPreprocessor
from multiprocessing import Pool, cpu_count

import pandas as pd
import numpy as np
from transformers import AutoTokenizer
import spacy

class TweetProcessor:

    def __init__(self, df, tokenizer, nlp, text_col='text', label_col='sentiment', max_len=80):
        """
        Preprocessor constructor
        :param df: a pandas data frame with two columns only (text and sentiment)
        :param tokenizer: a transformers tokenizer specific to the language of the dataset
        :param nlp: an instance of a SpaCy model specific to the language of the dataset
        :param text_col: the name of the column containing the sentences
        :param label_col: the name of the column containing the labels (i.e. sentiment)
        :param max_len: the maximum length of each sequence in the pre-processed dataset
        """
        self.df = df
        self.nlp = nlp
        self.label_col = label_col
        self.text_col = text_col
        self.max_len = max_len
        self.tokenizer = tokenizer

    def clean(self, doc):
        """
        A cleaning utility specifically designed for tweets
        :param doc: a SpaCy Doc
        :return: a cleaned SpaCy Doc
        """
        out = ''
        for t in doc:
            bad = t.like_url
            bad |= t.text.startswith('@')
            bad |= t.like_email
            if not bad:
                out += t.text_with_ws
        return self.nlp(out)

    def process(self, df_row):
        """
        Process a single row of the data frame.
        :param df_row: the data frame row that the worker will manipulate and process
        :return: a tuple of four lists, representing the model input
        """
        # Parse the text using SpaCy
        doc = self.nlp(df_row[self.text_col])
        # Clean text (remove mentions, URLs, e-mails)
        doc = self.clean(doc)
        # Keep only the first max_len tokens
        doc = doc[:self.max_len]
        # Tokenize text and add cls / sep
        input_ids = self.tokenizer.encode(doc.text, add_special_tokens=True)
        # How long is this sequence?
        size = len(input_ids)
        # Check whether the encoding made the sequence longer than max_len
        if size > self.max_len:
            input_ids = input_ids[:self.max_len]
            size = len(input_ids)
        # Add pad tokens if the sequence is too short
        if size < self.max_len:
            # Pad
            input_ids += [self.tokenizer.pad_token_id] * (self.max_len - size)
        # This should always be true
        assert len(input_ids) == self.max_len
        # BERT - Create token_type_ids (padding positions are marked with 1)
        token_type_ids = [0] * size + [1] * (self.max_len - size)
        # BERT - Create attention mask (padding positions are masked out)
        attention_mask = [1] * size + [0] * (self.max_len - size)
        # BERT - Map the textual label to a numerical one
        label = df_row[self.label_col]
        labels = 1 if label == 'neutral' else 2 if label == 'positive' else 0
        # Return the output as a tuple
        return input_ids, attention_mask, token_type_ids, labels

    def transform(self, df):
        """
        Transforms the data frame into a pre-processed frame
        :param df: the worker data frame
        :return: the pre-processed data frame
        """
        processed = []
        for _, row in df.iterrows():
            processed.append(self.process(row))
        return pd.DataFrame(data=processed, columns=['input_ids', 'attention_mask', 'token_type_ids', 'labels'])

    @staticmethod
    def balance(df):
        """
        Utility to balance the dataset
        :param df: the data frame to balance
        :return: the balanced data frame
        """
        # Keep the same amount of samples in each class
        max_samples = min(df.sentiment.value_counts())
        neg = df[df.sentiment == 'negative'].sample(n=max_samples)
        neu = df[df.sentiment == 'neutral'].sample(n=max_samples)
        pos = df[df.sentiment == 'positive'].sample(n=max_samples)
        return pd.concat([pos, neu, neg]).sample(frac=1)

    def run(self, save_path):
        """
        Run the pre-processor and save the result to disk.
        The work is split across all detected CPU cores.
        :param save_path: path of the output JSON file
        """
        # Balance the dataset
        self.df = self.balance(self.df)
        # Split the data frame across the workers
        df_split = np.array_split(self.df, cpu_count())
        # Make a worker pool
        pool = Pool(cpu_count())
        # Invoke the pool and assign a data frame chunk to each worker
        df = pd.concat(pool.map(self.transform, df_split))
        # Close the pool
        pool.close()
        # Wait for the workers to terminate
        pool.join()
        # Save the pre-processed data frame as JSON
        df.to_json(save_path, orient='values')


if __name__ == '__main__':

    # Your CSV dataset path goes here
    csv_path = 'path/to/your/well/formatted/csv/goes/here.csv'
    # Your pre-trained BERT model name goes here (e.g. bert-base-cased)
    transformer_model_name = ''
    # Your SpaCy model name goes here (e.g. it_core_news_sm)
    spacy_model_name = ''
    # Your pre-processed file path goes here. Specify a .json extension
    preproc_path = 'path/to/your/preprocessed/file.json'

    df = pd.read_csv(csv_path)
    tokenizer = AutoTokenizer.from_pretrained(transformer_model_name)
    nlp = spacy.load(spacy_model_name)

    processor = TweetProcessor(
        df=df,
        tokenizer=tokenizer,
        nlp=nlp,
        text_col='text',
        label_col='sentiment',
        max_len=80
    )

    processor.run(preproc_path)
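
As a rough usage sketch (not part of the original gist), the pre-processed JSON can be loaded back for training along these lines. The snippet assumes PyTorch is installed and reuses the placeholder output path from the script above; since the file is written with orient='values', the column names are dropped and the columns come back as 0-3 in the order input_ids, attention_mask, token_type_ids, labels.

import pandas as pd
import torch
from torch.utils.data import TensorDataset

# Placeholder path, matching the script above
preproc_path = 'path/to/your/preprocessed/file.json'

# orient='values' drops the column names, so the columns come back as 0..3
# in the order: input_ids, attention_mask, token_type_ids, labels
data = pd.read_json(preproc_path, orient='values')

input_ids = torch.tensor(data[0].tolist())       # (n_samples, max_len)
attention_mask = torch.tensor(data[1].tolist())  # (n_samples, max_len)
token_type_ids = torch.tensor(data[2].tolist())  # (n_samples, max_len)
labels = torch.tensor(data[3].tolist())          # (n_samples,)

dataset = TensorDataset(input_ids, attention_mask, token_type_ids, labels)

From here the TensorDataset can be wrapped in a DataLoader and fed to a BERT-style sequence classifier.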