Last active
July 18, 2018 13:58
-
-
Save derickson/ff0aeb22630433008372d19487725f89 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/python3
import json
import os

import moment
from elasticsearch import Elasticsearch
from elasticsearch import helpers
## Elasticsearch connection. The URL (including credentials) can be supplied via
## the ES_URL environment variable so secrets need not be kept in source; the
## default is the URL this script has always used.
esConnString = os.environ.get('ES_URL', 'http://elastic:changeme@localhost:9200')
## NOTE(review): whether the client constructor honors request_timeout (vs. timeout)
## depends on the installed elasticsearch-py version -- confirm.
esTo = Elasticsearch([esConnString], request_timeout=100)
## utility function
def prettyPrint(doc):
    """Write *doc* to stdout as JSON, indented 4 spaces, with keys sorted."""
    rendered = json.dumps(doc, sort_keys=True, indent=4)
    print(rendered)
def loadObjFromJSONFile(fn):
    """Load and return the JSON value stored in the file at path *fn*.

    The file is decoded as UTF-8 explicitly. Relying on the platform's locale
    encoding can mis-decode or reject non-ASCII text (accented beer or brewery
    names, for example) on systems whose default encoding is not UTF-8.

    Raises:
        OSError: if the file cannot be opened.
        UnicodeDecodeError: if the file is not valid UTF-8.
        json.JSONDecodeError: if the contents are not valid JSON.
    """
    with open(fn, 'r', encoding='utf-8') as infile:
        return json.load(infile)
### Untappd checkin download: a JSON list of objects, one per checkin.
### The path defaults to the original export file name and can be overridden
### with the UNTAPPD_CHECKIN_FILE environment variable.
fn = os.environ.get('UNTAPPD_CHECKIN_FILE', 'checkin-report_07_27_17.json')
checkins = loadObjFromJSONFile(fn)
# Index and mapping-type names.
BEER_INDEX_NAME = "beer"
BEER_TYPE_NAME = "checkins"
BEER_PERCOLATE_NAME = "checkinspercolate"

# Field names shared by the mapping, the percolator matchers and the indexing loop.
BREWERY_COUNTRY_FIELD = "brewery_country"
LOCATION_FIELD = "location"
PERCOLATE_FIELD = "query"
SECURITY_TAGS_FIELD = "securityTags"
CREATED_AT_FIELD = "created_at"

# Explicit mappings for the checkin documents; other fields are left to dynamic mapping.
_checkinProperties = {
    "@timestamp": {
        "type": "date",
        "format": "strict_date_optional_time||epoch_millis",
    },
    BREWERY_COUNTRY_FIELD: {"type": "keyword"},
    LOCATION_FIELD: {"type": "geo_point"},
}

# Mappings for the stored percolator queries and the tag each one assigns.
_percolateProperties = {
    PERCOLATE_FIELD: {"type": "percolator"},
    SECURITY_TAGS_FIELD: {"type": "keyword"},
}

# Index-creation body: single shard, no replicas, and one mapping type per document kind.
# NOTE(review): two mapping types in one index requires Elasticsearch 5.x or earlier --
# confirm the target cluster version.
beerBody = {
    "settings": {"number_of_shards": 1, "number_of_replicas": 0},
    "mappings": {
        BEER_TYPE_NAME: {"properties": _checkinProperties},
        BEER_PERCOLATE_NAME: {"properties": _percolateProperties},
    },
}
## Recreate the index from scratch on every run. ignore=[404] keeps the very first
## run (when the index does not exist yet) from aborting with NotFoundError.
esTo.indices.delete(index=BEER_INDEX_NAME, ignore=[404])
esTo.indices.create(index=BEER_INDEX_NAME, body=beerBody)
def _indexMatcher(matcher, message):
    """Store one percolator document in the beer index, then print *message*."""
    esTo.index(index=BEER_INDEX_NAME, doc_type=BEER_PERCOLATE_NAME, body=matcher)
    print(message)


## Matches every checkin.
allBeerMatcher = {
    SECURITY_TAGS_FIELD: "Beer",
    PERCOLATE_FIELD: {
        "match_all": {}
    }
}
_indexMatcher(allBeerMatcher, "insert all beer matcher")

## Matches checkins whose brewery is in the United States.
domesticMatcher = {
    SECURITY_TAGS_FIELD: "DomesticBeer",
    PERCOLATE_FIELD: {
        "match": {
            BREWERY_COUNTRY_FIELD: "United States"
        }
    }
}
_indexMatcher(domesticMatcher, "insert domestic matcher")

## The location being protected, read from the HOME_LON / HOME_LAT environment
## variables so the coordinates stay out of source (a missing variable raises
## KeyError naming it). [lon, lat] order, the same order prepCheckin uses for
## the checkin location array.
homeLocation = [float(os.environ["HOME_LON"]), float(os.environ["HOME_LAT"])]
## Matches checkins within 100m of homeLocation.
homeMatcher = {
    SECURITY_TAGS_FIELD: "HomeDrinking",
    PERCOLATE_FIELD: {
        "geo_distance": {
            "distance": "100m",
            LOCATION_FIELD: homeLocation
        }
    }
}
_indexMatcher(homeMatcher, "insert home matcher")

## A single refresh after all matchers are stored makes them searchable before
## any percolate query runs (per-document refreshes are unnecessary).
esTo.indices.refresh(index=BEER_INDEX_NAME)
def genPercolateQuery(doc):
    """Build a search body that percolates *doc* against the stored matchers.

    *doc* is submitted as a document of mapping type BEER_TYPE_NAME, and the
    percolator queries stored in PERCOLATE_FIELD are evaluated against it.
    """
    percolateClause = {
        "field": PERCOLATE_FIELD,
        "document_type": BEER_TYPE_NAME,
        "document": doc,
    }
    return {"query": {"percolate": percolateClause}}
## Each stored percolator document carries a security tag in its _source;
## collect the tags of the matchers that hit, in hit order.
def tagsFromPercolateHits(returnObj):
    """Return the SECURITY_TAGS_FIELD value of every hit that has one.

    Hits without a ``_source``, or whose ``_source`` lacks the tag field,
    are skipped.
    """
    return [
        hit["_source"][SECURITY_TAGS_FIELD]
        for hit in returnObj["hits"]["hits"]
        if "_source" in hit and SECURITY_TAGS_FIELD in hit["_source"]
    ]
## convert an Untappd "YYYY-MM-DD HH:mm:ss" timestamp string into a datetime
def dateFromUntappdString( dateString ):
    """Parse an Untappd checkin timestamp.

    Args:
        dateString: timestamp text in ``YYYY-MM-DD HH:mm:ss`` form, or an
            empty string / ``None`` when the export has no value.

    Returns:
        The ``.date`` attribute of the parsed ``moment`` object, which unwraps
        it to the underlying ``datetime`` (callers call ``isoformat()`` on it).
        Returns ``None`` when *dateString* is empty or ``None``.
    """
    # Treat None like "" rather than handing it to moment.date.
    if not dateString:
        return None
    return moment.date(dateString, "YYYY-MM-DD HH:mm:ss").date
## Transformations applied to each checkin before it is percolated and indexed
def prepCheckin(checkin):
    """Normalize one Untappd checkin dict in place and return it.

    - Sets LOCATION_FIELD to a ``[lon, lat]`` float pair when both
      ``venue_lat`` and ``venue_lng`` are present and not None.
    - Sets ``@timestamp`` to the ISO-8601 form of CREATED_AT_FIELD when that
      value parses. An empty timestamp is skipped instead of raising.
    - Converts ``beer_abv`` / ``beer_ibu`` to float when present and not None.

    Args:
        checkin: one checkin object from the Untappd JSON export (mutated).

    Returns:
        The same, now-mutated, dict.
    """
    ## Location: both coordinates are required; geo_point arrays are [lon, lat]
    lat = checkin.get("venue_lat")
    lng = checkin.get("venue_lng")
    if lat is not None and lng is not None:
        checkin[LOCATION_FIELD] = [float(lng), float(lat)]
    ## Clean timestamp; dateFromUntappdString returns None for an empty value,
    ## so guard before calling .isoformat()
    if CREATED_AT_FIELD in checkin:
        created = dateFromUntappdString(checkin[CREATED_AT_FIELD])
        if created is not None:
            checkin["@timestamp"] = created.isoformat()
    ## Coerce numeric beer stats to float so they are indexed as numbers
    for field in ("beer_abv", "beer_ibu"):
        if checkin.get(field) is not None:
            checkin[field] = float(checkin[field])
    return checkin
## the bulk buffer and bulk action size
bulkActions = []
writeBatchSize = 250


def _flushBulk(actions):
    """Send *actions* in one helpers.bulk call and report how many were sent."""
    helpers.bulk(esTo, actions)
    print("executed " + str(len(actions)) + " actions")


for doc in checkins:
    ## static transforms
    doc = prepCheckin(doc)
    ## percolate tagging: each stored matcher that matches this checkin contributes
    ## its security tag (size=100 caps how many matching matchers are returned)
    returnObj = esTo.search(
        index=BEER_INDEX_NAME,
        body=genPercolateQuery(doc),
        size=100,
        _source_include=SECURITY_TAGS_FIELD,
    )
    tags = tagsFromPercolateHits(returnObj)
    doc[SECURITY_TAGS_FIELD] = tags
    doc["securityTag_Count"] = len(tags)
    ## insert this document (build up bulk actions)
    bulkActions.append({
        "_index": BEER_INDEX_NAME,
        "_type": BEER_TYPE_NAME,
        "_source": doc,
    })
    ## if the bulk buffer is full, execute the bulk
    if len(bulkActions) >= writeBatchSize:
        _flushBulk(bulkActions)
        bulkActions = []

## execute any remaining items in the bulk buffer. Testing the buffer itself (not a
## count left over from the last loop iteration) avoids a NameError on an empty
## export and an empty bulk when the total is an exact multiple of writeBatchSize.
if bulkActions:
    _flushBulk(bulkActions)
    bulkActions = []
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment