Text Classification CNN Keras

A Keras implementation of Kim Yoon's Convolutional Neural Networks for Sentence Classification, adapted from alexander-rakhlin/CNN-for-Sentence-Classification-in-Keras. The gist contains three files: data_helpers.py (data loading and preprocessing), the training script, and w2v.py (word2vec initialization of the embedding layer).
""" | |
Taken from https://github.com/alexander-rakhlin/CNN-for-Sentence-Classification-in-Keras | |
""" | |
import numpy as np | |
import re | |
import itertools | |
from collections import Counter | |
""" | |
Original taken from https://github.com/dennybritz/cnn-text-classification-tf | |
""" | |
def clean_str(string):
    """
    Tokenization/string cleaning for all datasets except for SST.
    Original taken from https://github.com/yoonkim/CNN_sentence/blob/master/process_data.py
    """
    string = re.sub(r"[^A-Za-z0-9(),!?\'\`]", " ", string)
    string = re.sub(r"\'s", " \'s", string)
    string = re.sub(r"\'ve", " \'ve", string)
    string = re.sub(r"n\'t", " n\'t", string)
    string = re.sub(r"\'re", " \'re", string)
    string = re.sub(r"\'d", " \'d", string)
    string = re.sub(r"\'ll", " \'ll", string)
    string = re.sub(r",", " , ", string)
    string = re.sub(r"!", " ! ", string)
    string = re.sub(r"\(", r" \( ", string)
    string = re.sub(r"\)", r" \) ", string)
    string = re.sub(r"\?", r" \? ", string)
    string = re.sub(r"\s{2,}", " ", string)
    return string.strip().lower()
def load_data_and_labels():
    """
    Loads MR polarity data from files, splits the data into words and generates labels.
    Returns split sentences and labels.
    """
    # Load data from files
    positive_examples = list(open("./data/rt-polarity.pos").readlines())
    positive_examples = [s.strip() for s in positive_examples]
    negative_examples = list(open("./data/rt-polarity.neg").readlines())
    negative_examples = [s.strip() for s in negative_examples]
    # Split by words
    x_text = positive_examples + negative_examples
    x_text = [clean_str(sent) for sent in x_text]
    x_text = [s.split(" ") for s in x_text]
    # Generate labels
    positive_labels = [[0, 1] for _ in positive_examples]
    negative_labels = [[1, 0] for _ in negative_examples]
    y = np.concatenate([positive_labels, negative_labels], 0)
    return [x_text, y]
def pad_sentences(sentences, padding_word="<PAD/>"):
    """
    Pads all sentences to the same length. The length is defined by the longest sentence.
    Returns padded sentences.
    """
    sequence_length = max(len(x) for x in sentences)
    padded_sentences = []
    for i in range(len(sentences)):
        sentence = sentences[i]
        num_padding = sequence_length - len(sentence)
        new_sentence = sentence + [padding_word] * num_padding
        padded_sentences.append(new_sentence)
    return padded_sentences
def build_vocab(sentences):
    """
    Builds a vocabulary mapping from word to index based on the sentences.
    Returns vocabulary mapping and inverse vocabulary mapping.
    """
    # Build vocabulary
    word_counts = Counter(itertools.chain(*sentences))
    # Mapping from index to word
    vocabulary_inv = [x[0] for x in word_counts.most_common()]
    # Mapping from word to index
    vocabulary = {x: i for i, x in enumerate(vocabulary_inv)}
    return [vocabulary, vocabulary_inv]
def build_input_data(sentences, labels, vocabulary):
    """
    Maps sentences and labels to vectors based on a vocabulary.
    """
    x = np.array([[vocabulary[word] for word in sentence] for sentence in sentences])
    y = np.array(labels)
    return [x, y]
def load_data():
    """
    Loads and preprocesses data for the MR dataset.
    Returns input vectors, labels, vocabulary, and inverse vocabulary.
    """
    # Load and preprocess data
    sentences, labels = load_data_and_labels()
    sentences_padded = pad_sentences(sentences)
    vocabulary, vocabulary_inv = build_vocab(sentences_padded)
    x, y = build_input_data(sentences_padded, labels, vocabulary)
    return [x, y, vocabulary, vocabulary_inv]
def batch_iter(data, batch_size, num_epochs):
    """
    Generates a batch iterator for a dataset.
    """
    data = np.array(data)
    data_size = len(data)
    # Ceiling division: include the final (possibly smaller) batch without
    # yielding an empty batch when data_size is a multiple of batch_size.
    num_batches_per_epoch = int((data_size - 1) / batch_size) + 1
    for epoch in range(num_epochs):
        # Shuffle the data at each epoch
        shuffle_indices = np.random.permutation(np.arange(data_size))
        shuffled_data = data[shuffle_indices]
        for batch_num in range(num_batches_per_epoch):
            start_index = batch_num * batch_size
            end_index = min((batch_num + 1) * batch_size, data_size)
            yield shuffled_data[start_index:end_index]
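A minimal usage sketch for these helpers, assuming the rt-polarity files are present under ./data/. The index-based batching shown here is only an illustration, not part of the original gist:

import numpy as np
import data_helpers

x, y, vocabulary, vocabulary_inv = data_helpers.load_data()
print(x.shape, y.shape)   # (num_sentences, max_sentence_len), (num_sentences, 2)
print(len(vocabulary))    # vocabulary size

# Iterate over shuffled mini-batches by index for one epoch
for batch_indices in data_helpers.batch_iter(np.arange(len(y)), batch_size=64, num_epochs=1):
    x_batch, y_batch = x[batch_indices], y[batch_indices]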
Training script

import numpy as np
import data_helpers as df
from w2v import train_word2vec
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Dropout, Flatten, Input, MaxPooling1D, Convolution1D, Embedding, concatenate
from tensorflow.keras.datasets import imdb
from tensorflow.keras.preprocessing import sequence

np.random.seed(0)
# ---------------------- Parameters section -------------------
#
# Model type. See Kim Yoon's Convolutional Neural Networks for Sentence Classification, Section 3
model_type = "CNN-non-static"  # CNN-rand|CNN-non-static|CNN-static

# Data source
data_source = "keras_data_set"  # keras_data_set|local_dir

# Model Hyperparameters
embedding_dim = 50
filter_sizes = (3, 8)
num_filters = 10
dropout_prob = (0.5, 0.8)
hidden_dims = 50

# Training parameters
batch_size = 64
num_epochs = 10

# Preprocessing parameters
sequence_length = 400
max_words = 5000

# Word2Vec parameters (see train_word2vec)
min_word_count = 1
context = 10
def load_data(data_source):
    assert data_source in ["keras_data_set", "local_dir"], "Unknown data source"
    if data_source == "keras_data_set":
        (x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=max_words, start_char=None,
                                                              oov_char=None, index_from=None)
        x_train = sequence.pad_sequences(x_train, maxlen=sequence_length, padding="post", truncating="post")
        x_test = sequence.pad_sequences(x_test, maxlen=sequence_length, padding="post", truncating="post")
        vocabulary = imdb.get_word_index()
        vocabulary_inv = dict((v, k) for k, v in vocabulary.items())
        vocabulary_inv[0] = "<PAD/>"
    else:
        x, y, vocabulary, vocabulary_inv_list = df.load_data()
        vocabulary_inv = {key: value for key, value in enumerate(vocabulary_inv_list)}
        y = y.argmax(axis=1)
        # Shuffle data
        shuffle_indices = np.random.permutation(np.arange(len(y)))
        x = x[shuffle_indices]
        y = y[shuffle_indices]
        train_len = int(len(x) * 0.9)
        x_train = x[:train_len]
        y_train = y[:train_len]
        x_test = x[train_len:]
        y_test = y[train_len:]
    return x_train, y_train, x_test, y_test, vocabulary_inv
# Data Preparation
print("Load data...")
x_train, y_train, x_test, y_test, vocabulary_inv = load_data(data_source)

if sequence_length != x_test.shape[1]:
    print("Adjusting sequence length for actual size")
    sequence_length = x_test.shape[1]

print("x_train shape:", x_train.shape)
print("x_test shape:", x_test.shape)
print("Vocabulary Size: {:d}".format(len(vocabulary_inv)))

# Prepare embedding layer weights and convert inputs for static model
print("Model type is", model_type)
if model_type in ["CNN-non-static", "CNN-static"]:
    embedding_weights = train_word2vec(np.vstack((x_train, x_test)), vocabulary_inv, num_features=embedding_dim,
                                       min_word_count=min_word_count, context=context)
    if model_type == "CNN-static":
        x_train = np.stack([np.stack([embedding_weights[word] for word in sentence]) for sentence in x_train])
        x_test = np.stack([np.stack([embedding_weights[word] for word in sentence]) for sentence in x_test])
        print("x_train static shape:", x_train.shape)
        print("x_test static shape:", x_test.shape)
elif model_type == "CNN-rand":
    embedding_weights = None
else:
    raise ValueError("Unknown model type")
# Build model
if model_type == "CNN-static":
    input_shape = (sequence_length, embedding_dim)
else:
    input_shape = (sequence_length,)

model_input = Input(shape=input_shape)

# Static model does not have embedding layer
if model_type == "CNN-static":
    z = model_input
else:
    z = Embedding(len(vocabulary_inv), embedding_dim, input_length=sequence_length, name="embedding")(model_input)

z = Dropout(dropout_prob[0])(z)

# Convolutional block
conv_blocks = []
for sz in filter_sizes:
    conv = Convolution1D(filters=num_filters,
                         kernel_size=sz,
                         padding="valid",
                         activation="relu",
                         strides=1)(z)
    conv = MaxPooling1D(pool_size=2)(conv)
    conv = Flatten()(conv)
    conv_blocks.append(conv)
z = concatenate(inputs=conv_blocks)

z = Dropout(dropout_prob[1])(z)
z = Dense(hidden_dims, activation="relu")(z)
model_output = Dense(1, activation="sigmoid")(z)

model = Model(model_input, model_output)
model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])
# Initialize embedding layer weights with word2vec
if model_type == "CNN-non-static":
    # Order the vectors by vocabulary index so that row i of the embedding
    # matrix corresponds to word index i (iterating embedding_weights.values()
    # directly is not guaranteed to follow index order).
    weights = np.array([embedding_weights[i] for i in range(len(vocabulary_inv))])
    print("Initializing embedding layer with word2vec weights, shape", weights.shape)
    embedding_layer = model.get_layer("embedding")
    embedding_layer.set_weights([weights])

# Train the model
history = model.fit(x_train, y_train, batch_size=batch_size, epochs=num_epochs,
                    validation_data=(x_test, y_test), verbose=2)

results = model.evaluate(x_test, y_test)
print(results)
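Once trained, the model can score raw text. A minimal sketch for the keras_data_set path with the non-static (or rand) model; it reuses the same raw IMDB word-index scheme as load_data above (start_char, oov_char, and index_from are disabled, so raw ids below max_words match the training inputs). The example sentence is made up:

word_index = imdb.get_word_index()
tokens = "a quietly moving and beautifully acted film".lower().split()
ids = [word_index[w] for w in tokens if w in word_index and word_index[w] < max_words]
padded = sequence.pad_sequences([ids], maxlen=sequence_length, padding="post", truncating="post")
print(model.predict(padded))  # sigmoid score, > 0.5 leans positive

For CNN-static the ids would first have to be mapped through embedding_weights, since that model consumes embedding vectors directly rather than word indices.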
""" | |
taken from https://github.com/alexander-rakhlin/CNN-for-Sentence-Classification-in-Keras | |
""" | |
from gensim.models import word2vec | |
from os.path import join, exists, split | |
import os | |
import numpy as np | |
def train_word2vec(sentence_matrix, vocabulary_inv, | |
num_features=300, min_word_count=1, context=10): | |
""" | |
Trains, saves, loads Word2Vec model | |
Returns initial weights for embedding layer. | |
inputs: | |
sentence_matrix # int matrix: num_sentences x max_sentence_len | |
vocabulary_inv # dict {int: str} | |
num_features # Word vector dimensionality | |
min_word_count # Minimum word count | |
context # Context window size | |
""" | |
model_dir = 'models' | |
model_name = "{:d}features_{:d}minwords_{:d}context".format(num_features, min_word_count, context) | |
model_name = join(model_dir, model_name) | |
if exists(model_name): | |
embedding_model = word2vec.Word2Vec.load(model_name) | |
print('Load existing Word2Vec model \'%s\'' % split(model_name)[-1]) | |
    else:
        # Set values for various parameters
        num_workers = 2      # Number of threads to run in parallel
        downsampling = 1e-3  # Downsample setting for frequent words

        # Initialize and train the model
        # (gensim 3.x API: `size=`, `init_sims`, and item access on the model;
        #  a gensim >= 4.0 variant is sketched after this file)
        print('Training Word2Vec model...')
        sentences = [[vocabulary_inv[w] for w in s] for s in sentence_matrix]
        embedding_model = word2vec.Word2Vec(sentences, workers=num_workers,
                                            size=num_features, min_count=min_word_count,
                                            window=context, sample=downsampling)

        # If we don't plan to train the model any further, calling
        # init_sims will make the model much more memory-efficient.
        embedding_model.init_sims(replace=True)

        # Save the model for later use. You can load it later using Word2Vec.load()
        if not exists(model_dir):
            os.mkdir(model_dir)
        print('Saving Word2Vec model \'%s\'' % split(model_name)[-1])
        embedding_model.save(model_name)

    # Add unknown words: fall back to small random vectors for words the
    # word2vec model did not learn.
    embedding_weights = {key: embedding_model[word] if word in embedding_model else
                         np.random.uniform(-0.25, 0.25, embedding_model.vector_size)
                         for key, word in vocabulary_inv.items()}
    return embedding_weights
if __name__ == '__main__':
    import data_helpers
    print("Loading data...")
    x, _, _, vocabulary_inv_list = data_helpers.load_data()
    vocabulary_inv = {key: value for key, value in enumerate(vocabulary_inv_list)}
    w = train_word2vec(x, vocabulary_inv)
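Note that train_word2vec above targets the gensim 3.x API. Under gensim >= 4.0, size was renamed to vector_size, init_sims is no longer needed, and word vectors are accessed via model.wv. A self-contained toy sketch of the equivalent calls (the sentences and vocabulary here are made up for illustration):

from gensim.models import word2vec
import numpy as np

sentences = [["the", "movie", "was", "great"], ["a", "dull", "film"]]
model = word2vec.Word2Vec(sentences, workers=2, vector_size=50,
                          min_count=1, window=10, sample=1e-3)

vocabulary_inv = {0: "<PAD/>", 1: "movie", 2: "unseenword"}
# Fall back to small random vectors for words the model did not learn
embedding_weights = {key: model.wv[word] if word in model.wv
                     else np.random.uniform(-0.25, 0.25, model.vector_size)
                     for key, word in vocabulary_inv.items()}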