""" | |
Yelp review processor. | |
The module clusters Yelp features based on their sentence similarities. | |
This module takes a set of reviews as inputs, then generates two files: | |
- items.csv - item and their properties. | |
- extractions.csv - features and sentiments mined from item reviews. | |
""" | |
import argparse
import os
import subprocess
import sys
import warnings

import numpy as np
import pandas as pd
from logbook import Logger, StreamHandler
from pymongo import MongoClient
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.neighbors import KNeighborsClassifier

warnings.filterwarnings(action='ignore')
StreamHandler(sys.stdout).push_application()
log = Logger(__name__)

AMENITIES = ['Decor', 'Food', 'None', 'Overall', 'Service']
def build_similarities(df_extractions):
    """Compute item-item cosine similarities over TF-IDF vectors of item features."""
    def get_item_features():
        # Concatenate all mined features of an item into one pseudo-document.
        for item_id, df in df_extractions.groupby('item_id'):
            yield (item_id, ' '.join(df.feature.tolist()))

    df_item_features = pd.DataFrame(get_item_features(), columns=['item_id', 'feature'])
    tfidf = TfidfVectorizer()
    item_ids = df_item_features.item_id.tolist()
    item_similarities = pd.DataFrame(cosine_similarity(tfidf.fit_transform(df_item_features.feature)),
                                     columns=item_ids, index=item_ids).fillna(0)
    return item_similarities
def recommend(item_id, item_similarities, recommend_size=9):
    """Return the `recommend_size` most similar items, excluding the item itself."""
    recommendations = item_similarities.loc[item_id].sort_values(ascending=False)
    item_recs = recommendations.index.values[1:recommend_size + 1].tolist()
    item_sims = recommendations.values[1:recommend_size + 1].tolist()
    return item_recs, item_sims
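# A minimal sketch of how build_similarities() and recommend() compose
# (toy extractions with hypothetical item ids; not part of the pipeline):
#
#   df_toy = pd.DataFrame({'item_id': ['a', 'a', 'b', 'c'],
#                          'feature': ['pizza', 'service', 'pizza', 'decor']})
#   sims = build_similarities(df_toy)   # 3x3 item-item cosine similarities
#   recs, scores = recommend('a', sims, recommend_size=2)
#   # recs lists the two items most similar to 'a' (here ['b', 'c'], since
#   # 'a' and 'b' share the 'pizza' feature), and scores holds the
#   # corresponding cosine similarities.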
def get_items_and_recommendations(df_extractions, yelp_business_csv_path):
    """Create a DataFrame of items and their recommendations.

    Parameters
    ----------
    df_extractions: DataFrame
        Extractions obtained using our opinion mining framework.
    yelp_business_csv_path: str
        Path to business records of the Yelp Dataset Challenge in CSV format.
    """
    log.info('Generating recommendations ...')

    def _recommend_all(item_similarities):
        for item_id in df_extractions.item_id.unique():
            item_recs, item_sims = recommend(item_id, item_similarities)
            yield dict(item_id=item_id, related_items=item_recs, related_items_sims=item_sims)
    def get_restaurants(df_extractions, yelp_business_csv_path):
        """Load records of Yelp restaurants corresponding to a set of extractions.

        Args:
            df_extractions (DataFrame): Extractions obtained using our opinion
                mining framework.
            yelp_business_csv_path (str): Path to business records of the Yelp
                Dataset Challenge in CSV format.
        """
        businesses = pd.read_csv(yelp_business_csv_path)
        cols = ['name', 'business_id', 'stars']
        return businesses[businesses.business_id.isin(df_extractions.item_id)][cols]
    df_item_similarities = build_similarities(df_extractions)
    recommendations = pd.DataFrame(_recommend_all(df_item_similarities))
    restaurants = get_restaurants(df_extractions, yelp_business_csv_path)
    items = pd.merge(recommendations, restaurants, how='left',
                     left_on='item_id', right_on='business_id')
    if 'business_id' in items.columns:
        items.drop('business_id', axis=1, inplace=True)
    items['average_rating'] = items.stars.copy()
    return items
def filter_extractions(df_extractions, feature_freq_percentile=0.15, op_percentile=0.25):
    log.info('Filtering extractions ...')

    # remove extractions without opinions.
    df_extractions.dropna(subset=['opinion'], inplace=True)

    # remove infrequent features.
    feature_count = df_extractions.groupby('feature').size()
    if isinstance(feature_freq_percentile, float):
        min_threshold = np.percentile(feature_count, 100 * feature_freq_percentile)
    else:
        min_threshold = feature_freq_percentile
    infrequent_features = feature_count[feature_count <= min_threshold].index
    df_extractions = df_extractions[~df_extractions.feature.isin(infrequent_features)]

    # remove infrequent opinion patterns.
    pattern_count = df_extractions.groupby('opinion_pattern').size()
    if isinstance(op_percentile, float):
        min_threshold = np.percentile(pattern_count, 100 * op_percentile)
    else:
        min_threshold = op_percentile
    infrequent_patterns = pattern_count[pattern_count <= min_threshold].index
    df_extractions = df_extractions[~df_extractions.opinion_pattern.isin(infrequent_patterns)]
    return df_extractions
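# Illustration of the thresholding above (assumed toy numbers): if the
# feature counts are [1, 1, 2, 5, 40], feature_freq_percentile=0.15 puts the
# cutoff at the 15th percentile (1.0), so features seen only once are
# dropped. Passing an int instead (e.g. 3) uses it directly as an absolute
# minimum count.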
def extractions_from_mongodb(collection_name='reviews_Yelp_omf'):
    """Load extractions from MongoDB."""
    log.info('Loading extractions from MongoDB ({}) ...'.format(collection_name))
    client = MongoClient()
    db = client.reviewsanalyser
    assert collection_name in db.list_collection_names()
    cursor = db[collection_name].find()

    def gen():
        for review in cursor:
            for sentence in review['sentences']:
                for feature in sentence['features']:
                    yield {'feature': feature['feature'],
                           'opinion': feature['opinion'],
                           'feature_index': feature['feature_index'],
                           'opinion_index': feature['opinion_index'],
                           'opinion_pattern': feature['opinion_pattern'],
                           'original_feature': feature['original_feature'],
                           'sentiment': feature['sentiment'],
                           'sentence_str': sentence['sentence_str'],
                           'user_id': review['user_id'],
                           'sentence_idx': sentence['sentence_idx'],
                           'item_id': review['item_id'],
                           'rating': review['rating'],
                           'rating_date': review['rating_date'],
                           'review_id': review['review_id']}

    df_extractions = pd.DataFrame(list(gen()))
    return df_extractions
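# Document shape expected by gen() above (inferred from the fields it reads;
# values elided):
#
#   {'user_id': ..., 'item_id': ..., 'rating': ..., 'rating_date': ...,
#    'review_id': ...,
#    'sentences': [{'sentence_str': ..., 'sentence_idx': ...,
#                   'features': [{'feature': ..., 'opinion': ...,
#                                 'feature_index': ..., 'opinion_index': ...,
#                                 'opinion_pattern': ..., 'original_feature': ...,
#                                 'sentiment': ...}]}]}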
def get_restaurants(df_businesses):
    # The Yelp CSV stores categories as a stringified list; this keeps only
    # businesses whose sole category is 'Restaurants'.
    df_businesses['is_restaurant'] = df_businesses.categories.apply(lambda c: c == "['Restaurants']")
    cols = ['name', 'categories', 'city', 'state', 'stars', 'review_count', 'open', 'business_id']
    return df_businesses.query('is_restaurant == True')[cols]
def csv_to_mongodb(data_dir):
    df_businesses = pd.read_csv(data_dir + 'yelp_academic_dataset_business.csv')
    df_reviews = pd.read_csv(data_dir + 'yelp_academic_dataset_review.csv')
    df_restaurants = get_restaurants(df_businesses)
    df_reviews = df_reviews[df_reviews.business_id.isin(df_restaurants.business_id)]
    client = MongoClient()
    db = client.reviewsanalyser
    col_aliases = {'stars': 'rating', 'text': 'review_text', 'business_id': 'item_id', 'date': 'rating_date'}
    # Placeholder columns expected downstream by the opinion-mining pipeline.
    df_reviews['batch_id'] = '(none)'
    df_reviews['url'] = '(none)'
    df_reviews['review_title'] = '(none)'
    df_reviews['site_name'] = '(none)'
    cols = ['rating', 'user_id', 'review_id', 'url', 'review_text', 'item_id', 'review_title',
            'batch_id', 'site_name', 'rating_date']
    if 'reviews_Yelp' in db.list_collection_names():
        db.reviews_Yelp.delete_many({})
    db.reviews_Yelp.insert_many(df_reviews.rename(columns=col_aliases)[cols].to_dict('records'))
def load_clustering_data(file_path):
    """
    Args:
        file_path: Path to training or test file.

    Returns:
        A DataFrame of feature categories. Features are used as the index,
        with a `category` column holding the feature categories.
    """
    with open(file_path) as fp:
        training = [(f[1:].strip(), f[0]) for f in fp.readlines()]
    return pd.DataFrame.from_records(training, columns=['feature', 'category']).set_index('feature')
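# Expected layout of training.txt/test.txt (inferred from the parsing above):
# each line starts with a single category code immediately followed by the
# feature text. The codes match the mapping used in group_features(), e.g.:
#
#   f pizza
#   s waiter
#   d lighting
#   o experience
#   n parking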
def get_feature_sentences(df_extractions):
    """Get features and the sentences they occur in.

    Args:
        df_extractions: DataFrame of extractions.

    Yields:
        Tuples in which the first item is a feature and the second item is a
        string of all the sentences the feature occurs in.
    """
    for feature, df in df_extractions.groupby('feature'):
        sentences = '\n'.join(df.sentence_str)
        yield (feature, sentences)
def group_features(df_extractions, train_dir):
    log.info('Grouping features into amenities ...')
    train_categories = load_clustering_data(train_dir + 'training.txt')
    test_categories = load_clustering_data(train_dir + 'test.txt')
    assert len(set(train_categories.index).intersection(test_categories.index)) == 0
    train_categories = pd.concat([train_categories, test_categories])  # combine test and training data :)
    feature_sentences = pd.DataFrame(get_feature_sentences(df_extractions), columns=['feature', 'sentences'])
    tfidf = TfidfVectorizer()
    feature_sentences_dtm = tfidf.fit_transform(feature_sentences.sentences)
    df_train = feature_sentences[feature_sentences.feature.isin(train_categories.index)].copy()
    df_train['target'] = df_train.feature.apply(lambda f: train_categories.loc[f, 'category'])
    df_test = feature_sentences[feature_sentences.feature.isin(test_categories.index)].copy()
    df_test['target'] = df_test.feature.apply(lambda f: test_categories.loc[f, 'category'])
    train_dtm = tfidf.transform(df_train.sentences)
    # A 1-NN classifier assigns each unlabelled feature the category of its
    # most similar labelled feature in TF-IDF space.
    knn = KNeighborsClassifier(n_neighbors=1)
    knn.fit(train_dtm, df_train.target)
    feature_sentences['category'] = knn.predict(feature_sentences_dtm)
    category_mapping = {'n': 'None', 'f': 'Food', 's': 'Service', 'd': 'Decor', 'o': 'Overall'}
    feature_sentences['category'] = feature_sentences.category.map(category_mapping)
    df_extractions = pd.merge(df_extractions, feature_sentences[['feature', 'category']])
    return df_extractions
def main(mongodb_collection_name, yelp_business_csv_file, output_dir, train_dir):
    df_extractions = extractions_from_mongodb(collection_name=mongodb_collection_name)
    df_extractions = filter_extractions(df_extractions)
    df_extractions = group_features(df_extractions, train_dir)
    log.debug('Extraction columns: {}'.format(df_extractions.columns.tolist()))
    df_items = get_items_and_recommendations(df_extractions, yelp_business_csv_file)
    df_items.to_csv('{}items.csv'.format(output_dir), index=False)
    cols = ['feature', 'review_id', 'user_id', 'item_id', 'sentiment', 'amenity']
    df_extractions.rename(columns={'category': 'amenity'}, inplace=True)
    df_extractions[cols].to_csv('{}extractions.csv'.format(output_dir), index=False)
    cols = ['item_id', 'related_items', 'related_items_sims']
    df_recommendations = df_items[cols].copy()
    # Serialise the list-valued columns as comma-separated strings for CSV.
    for col in ['related_items', 'related_items_sims']:
        df_recommendations[col] = df_recommendations[col].apply(lambda x: ','.join([str(y) for y in x]))
    df_recommendations[cols].to_csv('{}recommendations.csv'.format(output_dir), index=False)
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Convert Yelp Dataset Challenge data from JSON format to explanation input'
    )
    parser.add_argument('--mongodb_collection', type=str, default='reviews_Yelp_omf',
                        help='Collection name for Yelp opinions in MongoDB.')
    parser.add_argument('--yelp_business_csv_file', type=str, required=True,
                        help='Path to business records of the Yelp Dataset Challenge in CSV format.')
    parser.add_argument('--outdir', type=str, required=True,
                        help='Path to output directory to store CSV files.')
    parser.add_argument('--traindir', type=str, default='../data/yelp/labeled_features/',
                        help='Path to directory with training data.')
    args = parser.parse_args()
    assert os.path.isfile(args.yelp_business_csv_file), 'Failed to find CSV file of businesses'

    # fix directory paths & ensure the output directory exists.
    output_dir = args.outdir if args.outdir.endswith('/') else args.outdir + '/'
    train_dir = args.traindir if args.traindir.endswith('/') else args.traindir + '/'
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    assert os.path.isdir(output_dir)
    assert os.path.isdir(train_dir)
    main(mongodb_collection_name=args.mongodb_collection, yelp_business_csv_file=args.yelp_business_csv_file,
         output_dir=output_dir, train_dir=train_dir)
def run_omf_pipeline(n_jobs=2,
                     uri_in='mongodb://127.0.0.1:27017',
                     uri_out='mongodb://127.0.0.1:27017',
                     db_input='reviewsanalyser',
                     db_output='reviewsanalyser',
                     coll_input='reviews_Yelp',
                     coll_output='reviews_Yelp_omf'):
    """Run the external opinion-mining (omf) pipeline over the raw reviews.

    Wrapped in a function so that importing this module does not trigger a run.
    """
    cmd = ('time omf_pipeline -n {n_jobs} --uri_in {uri_in} --uri_out {uri_out} '
           '--db_input {db_input} --db_output {db_output} --find_query={{}} '
           '--coll_input {coll_input} --coll_output {coll_output} '
           '--log 3 > run_omf.log').format(n_jobs=n_jobs, uri_in=uri_in,
                                           uri_out=uri_out, db_input=db_input,
                                           db_output=db_output, coll_input=coll_input,
                                           coll_output=coll_output)
    # shell=True is needed for the `time` keyword and the output redirection.
    subprocess.call(cmd, shell=True)
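# Sketch of the end-to-end order of operations (assumed workflow, based on
# the collection names used above):
#
#   csv_to_mongodb('data/yelp/')   # hypothetical data directory; loads raw
#                                  # reviews into the reviews_Yelp collection
#   run_omf_pipeline(n_jobs=2)     # mines opinions into reviews_Yelp_omf
#   # ... then run this script's CLI to produce the three CSV outputs.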