Created
February 3, 2020 16:18
-
-
Save gcsfred/82093324e623ed582b603cb3d89543ad to your computer and use it in GitHub Desktop.
Sample integration between Elasticsearch and Amazon Personalize - entire file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[DEFAULT]
product_ranking_start = 10
product_ranking_steps_down = 0.2
cat_ranking_start = 10
cat_ranking_steps_down = 0.2
product_recommendations_campaignArn=arn:aws:personalize:us-east-2:11123456:campaign/es-test03-hrnn
product_rankings_campaignArn=arn:aws:personalize:us-east-2:222789:campaign/es-test03-rank
property1_recommendations_campaignArn=arn:aws:personalize:us-east-2:3333456:campaign/es-test03-cat-hrnn
property1_rankings_campaignArn=arn:aws:personalize:us-east-2:444789:campaign/es-test03-cat-rank
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
from elasticsearch import Elasticsearch | |
import sys | |
import getopt | |
import configparser | |
import os | |
# Force the AWS profile and default region environment variables for this
# process, overriding whatever the caller's environment had set.
os.environ.update({
    'AWS_PROFILE': "default",
    'AWS_DEFAULT_REGION': "us-east-2",
})
# Call with arguments -u <user> -s <search_text>
def main(argv):
    """Parse the command-line options and run a personalized search.

    Args:
        argv: Argument list without the program name. Recognised options
            are ``-u <user>`` and ``-s <search_text>``.

    Either option falls back to the placeholder "_" when it is not given;
    when an option is repeated, the last occurrence wins.
    """
    parsed_options, _ = getopt.getopt(argv, "u:s:")
    values = {'-u': "_", '-s': "_"}
    for flag, value in parsed_options:
        if flag in values:
            values[flag] = value
    search_with_personalization(values['-u'], values['-s'])
def search_with_personalization(user, search):
    """Run a search whose scoring is driven by per-user rankings.

    Reads ``config.conf`` from the current working directory, fetches and
    ranks category recommendations, then fetches and ranks product
    recommendations, and finally hands both rankings to ``query_es``.

    Args:
        user: User id forwarded to the recommendation and ranking helpers.
        search: Free-text search string forwarded to ``query_es``.
    """
    settings = configparser.ConfigParser()
    settings.read('config.conf')

    # Categories are resolved before products; the nested calls keep the
    # same request order as fetching and ranking step by step.
    category_boosts = get_category_ranking(
        settings, user, get_category_recommendations(settings, user))
    product_weights = get_product_ranking(
        settings, user, get_product_recommendations(settings, user))

    query_es(search, category_boosts, product_weights)
def _log_info(msg):
    """Write *msg* to standard output via ``print``."""
    print(msg)
def get_product_recommendations(config, user):
    """Return the recommended product item ids for *user*.

    Args:
        config: Mapping with a ``DEFAULT`` section that holds
            ``product_recommendations_campaignArn``.
        user: User id to request recommendations for.

    Returns:
        The list produced by ``_get_recommendations`` for the product campaign.
    """
    _log_info('Retrieving product recommendations')
    campaign_arn = config['DEFAULT']['product_recommendations_campaignArn']
    return _get_recommendations(user, campaign_arn)
def get_category_recommendations(config, user):
    """Return the recommended category item ids for *user*.

    Args:
        config: Mapping with a ``DEFAULT`` section that holds the category
            recommendations campaign ARN.
        user: User id to request recommendations for.

    Returns:
        The list produced by ``_get_recommendations`` for the category campaign.

    Raises:
        KeyError: If neither the ``category_`` nor the ``property1_`` key is
            present in the ``DEFAULT`` section.
    """
    _log_info('Retrieving category (category) recommendations')
    defaults = config['DEFAULT']
    # The sample config.conf stores this ARN under the "property1_" prefix,
    # so looking up only the "category_" key raised KeyError. Prefer the
    # "category_" key and fall back to the name the sample config uses.
    campaign_arn = defaults.get('category_recommendations_campaignArn')
    if campaign_arn is None:
        campaign_arn = defaults['property1_recommendations_campaignArn']
    return _get_recommendations(user, campaign_arn)
def _get_recommendations(user, campaign):
    """Fetch recommendations for *user* from the given campaign.

    Args:
        user: User id passed as ``userId`` to ``get_recommendations``.
        campaign: Campaign ARN passed as ``campaignArn``.

    Returns:
        A list of the ``itemId`` values found in the response's ``itemList``,
        in response order. Each id is also logged.
    """
    runtime = boto3.client('personalize-runtime', 'us-east-2')
    result = runtime.get_recommendations(campaignArn=campaign, userId=user)

    _log_info('Recommended items:')
    item_ids = []
    for entry in result['itemList']:
        item_ids.append(entry['itemId'])
        _log_info('itemId:' + item_ids[-1])
    return item_ids
def get_product_ranking(config, user, input_list):
    """Rank *input_list* of product ids for *user* and attach weights.

    Args:
        config: Mapping with a ``DEFAULT`` section that holds
            ``product_rankings_campaignArn``, ``product_ranking_start`` and
            ``product_ranking_steps_down``.
        user: User id to rank for.
        input_list: Item ids to be ranked.

    Returns:
        The ``{item_id: weight}`` dict produced by ``_get_ranking``.
    """
    _log_info('Retrieving product ranking')
    defaults = config['DEFAULT']
    campaign_arn = defaults['product_rankings_campaignArn']
    start = float(defaults['product_ranking_start'])
    steps_down = float(defaults['product_ranking_steps_down'])
    return _get_ranking(campaign_arn, user, input_list, start, steps_down)
def get_category_ranking(config, user, input_list):
    """Rank *input_list* of category ids for *user* and attach boosts.

    Args:
        config: Mapping with a ``DEFAULT`` section that holds the category
            rankings campaign ARN, ``cat_ranking_start`` and
            ``cat_ranking_steps_down``.
        user: User id to rank for.
        input_list: Item ids to be ranked.

    Returns:
        The ``{item_id: boost}`` dict produced by ``_get_ranking``.

    Raises:
        KeyError: If neither the ``category_`` nor the ``property1_`` ARN key
            is present in the ``DEFAULT`` section.
    """
    _log_info('Retrieving category (category) ranking')
    defaults = config['DEFAULT']
    # The sample config.conf stores this ARN under the "property1_" prefix,
    # so looking up only the "category_" key raised KeyError. Prefer the
    # "category_" key and fall back to the name the sample config uses.
    campaign_arn = defaults.get('category_rankings_campaignArn')
    if campaign_arn is None:
        campaign_arn = defaults['property1_rankings_campaignArn']
    start = float(defaults['cat_ranking_start'])
    # The step is fractional in the sample config (0.2); parsing it with
    # int() raised ValueError, so parse it as float like the product variant.
    steps_down = float(defaults['cat_ranking_steps_down'])
    return _get_ranking(campaign_arn, user, input_list, start, steps_down)
def _get_ranking(campaign, user, input_list, start, steps_down):
    """Request a personalized ranking and convert positions into scores.

    Args:
        campaign: Campaign ARN passed as ``campaignArn``.
        user: User id passed as ``userId``.
        input_list: Item ids passed as ``inputList``.
        start: Score given to the first item in the response.
        steps_down: Amount subtracted from the score for each later position.

    Returns:
        A dict mapping each ``itemId`` in the response's
        ``personalizedRanking`` to ``start - position * steps_down``, where
        position counts from zero in response order.
    """
    runtime = boto3.client('personalize-runtime', 'us-east-2')
    result = runtime.get_personalized_ranking(
        campaignArn=campaign, userId=user, inputList=input_list)

    _log_info('Ranked items:')
    scores = {}
    for position, entry in enumerate(result['personalizedRanking']):
        score = start - (position * steps_down)
        scores[entry['itemId']] = score
        _log_info('Ranking ' + entry['itemId'] + ' to ' + str(score))
    return scores
def transform_category_boost(category_boost_pairs):
    """Build one boosted ``match`` clause on ``category`` per mapping entry.

    Args:
        category_boost_pairs: Mapping of category value to boost.

    Returns:
        A list of ``{"match": {"category": {"query": ..., "boost": ...}}}``
        dicts, in the mapping's iteration order.
    """
    return [
        {"match": {"category": {"query": category, "boost": boost}}}
        for category, boost in category_boost_pairs.items()
    ]
def transform_product_id_weights(product_id_weight_pairs):
    """Build one weighted ``ids`` filter function per mapping entry.

    Args:
        product_id_weight_pairs: Mapping of document id to weight.

    Returns:
        A list of ``{"filter": {"ids": {"values": [id]}}, "weight": w}``
        dicts, in the mapping's iteration order.
    """
    return [
        {
            "filter": {"ids": {"values": [product_id]}},
            "weight": weight,
        }
        for product_id, weight in product_id_weight_pairs.items()
    ]
def arrange_json_array(a, b):
    """Append *b* to the list *a* in place and return the same list.

    Returning *a* lets the call be used inline while building a nested
    literal, as ``query_es`` does.
    """
    a.append(b)
    return a
def query_es(text_search, category_boost_pairs, product_id_weight_pairs):
    """Run a ``function_score`` search against the ``bank`` index and log hits.

    The inner ``bool`` query's ``should`` list holds one boosted category
    clause per entry of *category_boost_pairs* plus a ``match`` on
    ``keywords`` for *text_search*. Each entry of *product_id_weight_pairs*
    becomes a weighted ``ids`` filter function.

    Args:
        text_search: Text matched against the ``keywords`` field.
        category_boost_pairs: Mapping of category value to boost.
        product_id_weight_pairs: Mapping of document id to weight.
    """
    es_client = Elasticsearch()

    # Category clauses come first; the free-text clause is appended last.
    should_clauses = arrange_json_array(
        transform_category_boost(category_boost_pairs),
        {"match": {"keywords": text_search}},
    )
    function_score = {
        "query": {"bool": {"should": should_clauses}},
        "boost": "5",
        "functions": transform_product_id_weights(product_id_weight_pairs),
        "score_mode": "max",
        "boost_mode": "multiply",
    }
    the_body = {"query": {"function_score": function_score}}

    response = es_client.search(index="bank", body=the_body)

    hits = None
    if response is not None and response['hits'] is not None:
        hits = response['hits']['hits']

    if hits is None or len(hits) == 0:
        _log_info('No search results.')
        return

    _log_info('There are ' + str(len(hits)) + ' search results.')
    for position, hit in enumerate(hits, start=1):
        _log_info('#' + str(position) + ': Score:' + str(hit['_score'])
                  + ', search result:' + str(hit['_source']))
# Script entry point: pass the command-line arguments, minus the program
# name, to main().
if __name__ == "__main__":
    main(sys.argv[1:])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment