Created
April 19, 2019 22:54
-
-
Save jcrubino/6075469390e41f5499c2356046768bc2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import re

import emoji
from bs4 import BeautifulSoup
from requests import get
# CSS selector that matches <script> elements.
# NOTE(review): no code visible in this file reads this constant (the scraper
# passes the literal 'script' to select()) -- confirm before relying on it.
css = "script"
def get_page(url, timeout=30):
    """Download *url* and return its body parsed as a BeautifulSoup tree.

    Args:
        url: address handed to ``requests.get``.
        timeout: seconds passed to ``requests.get`` as ``timeout``. Without a
            timeout the request may wait indefinitely on a stalled server.

    Returns:
        A ``BeautifulSoup`` object built from the response text with the
        ``lxml`` parser.

    Raises:
        Whatever ``requests.get`` raises (connection errors, timeouts). The
        HTTP status is deliberately not checked: callers in this file treat a
        page without the expected content as "skip", not as a hard failure.
    """
    response = get(url, timeout=timeout)
    try:
        html = response.text
    finally:
        # Release the connection even if decoding the body fails.
        response.close()
    return BeautifulSoup(html, features="lxml")
def extract_tag_data(url):
    """Return the hashtag record embedded in an Instagram tag page.

    The page is fetched with ``get_page``; the text of its fourth ``<script>``
    element is expected to contain ``window._sharedData = {...};``. Everything
    after that marker is decoded as JSON.

    Args:
        url: address of the tag page.

    Returns:
        The value at ``['entry_data']['TagPage'][0]['graphql']['hashtag']`` of
        the decoded JSON document.

    Raises:
        IndexError: the page has fewer than four ``<script>`` elements.
        AttributeError: the fourth ``<script>`` has no string content.
        ValueError: the text after the marker is not valid JSON.
        KeyError: the decoded document lacks the expected path.
    """
    marker = "window._sharedData = "
    script_tags = soup_scripts = get_page(url).select('script')
    script_text = soup_scripts[3].string
    payload = script_text.split(marker)[-1]
    # Drop the statement terminator after the JSON literal. Trimming only
    # whitespace and ';' (instead of blindly cutting one character) keeps the
    # closing brace intact when the terminator is absent or followed by a
    # newline.
    raw = payload.rstrip().rstrip(";")
    data = json.loads(raw)
    return data['entry_data']['TagPage'][0]['graphql']['hashtag']
class IGPage(object):
    """Wrapper around the hashtag record of one Instagram tag page.

    Attributes:
        page_info: ``tag_data['edge_hashtag_to_media']['page_info']``.
        edges: ``tag_data['edge_hashtag_to_media']['edges']`` (raw edge dicts).
        nodes: one ``IGMediaNode`` per entry of ``edges``, ordered from most
            to fewest likes.
        top_posts: one ``IGMediaNode`` per entry of
            ``tag_data['edge_hashtag_to_top_posts']['edges']``, in page order.
    """

    def __init__(self, tag_data):
        """Build the page view from *tag_data*.

        Args:
            tag_data: mapping holding ``'edge_hashtag_to_media'`` (with
                ``'page_info'`` and ``'edges'``) and
                ``'edge_hashtag_to_top_posts'`` (with ``'edges'``).

        Raises:
            KeyError: one of those keys is missing.
        """
        self._tag_data = tag_data
        media = tag_data['edge_hashtag_to_media']
        self.page_info = media['page_info']
        self.edges = media['edges']

        # Ascending stable sort followed by an in-place reverse: this yields
        # descending likes with ties in reverse input order, exactly as the
        # original did (``reverse=True`` would order ties differently).
        self.nodes = sorted(
            [IGMediaNode(edge) for edge in self.edges],
            key=lambda node: node.likes_count,
        )
        self.nodes.reverse()

        self.top_posts = [
            IGMediaNode(edge)
            for edge in tag_data['edge_hashtag_to_top_posts']['edges']
        ]

    def keys(self):
        """Return the keys of the underlying tag-data mapping."""
        return self._tag_data.keys()
class Comment(object):
    """Caption of a media node together with the tokens extracted from it.

    All four public attributes are always set; they are empty when the node
    carries no caption, so callers can read them without guarding against a
    half-initialised object.

    Attributes:
        text: caption string, or ``""`` when the node has no caption.
        hashtags: ``'#word'`` strings found in ``text``, in order of appearance.
        emojis: characters of ``text`` that are members of
            ``emoji.UNICODE_EMOJI``.
        user_refs: ``'@word'`` strings found in ``text``, in order of
            appearance.
    """

    def __init__(self, node):
        """Extract the caption from *node*.

        Args:
            node: media-node mapping; the caption is read from
                ``node['edge_media_to_caption']['edges'][0]['node']['text']``.
        """
        self._node = node
        try:
            self.text = node['edge_media_to_caption']['edges'][0]['node']['text']
        except (KeyError, IndexError, TypeError):
            # Missing key, empty edge list or a non-subscriptable value:
            # the post simply has no caption.
            self.text = ""

        self.hashtags = ['#' + word for word in re.findall(r"#(\w+)", self.text)]
        # NOTE(review): assumes emoji.UNICODE_EMOJI is a container keyed by
        # single emoji characters -- confirm against the installed emoji
        # package version.
        self.emojis = [char for char in self.text if char in emoji.UNICODE_EMOJI]
        self.user_refs = ['@' + word for word in re.findall(r"@(\w+)", self.text)]
class IGMediaNode(object):
    """Read-only view over one media node of a tag page.

    Keys observed on a node:
    'comments_disabled', '__typename', 'id', 'edge_media_to_caption',
    'shortcode', 'edge_media_to_comment', 'taken_at_timestamp', 'dimensions',
    'display_url', 'edge_liked_by', 'edge_media_preview_like', 'owner',
    'thumbnail_src', 'thumbnail_resources', 'is_video',
    'accessibility_caption'

    Every property reads its key on access and raises ``KeyError`` when the
    node does not carry that key.
    """

    def __init__(self, edgeNode):
        """Wrap *edgeNode*.

        Args:
            edgeNode: either the node mapping itself or an edge of the form
                ``{'node': {...}}``; the latter is unwrapped.
        """
        if len(edgeNode) == 1 and 'node' in edgeNode:
            self._node = edgeNode['node']
        else:
            self._node = edgeNode

    def keys(self):
        """Return the keys of the wrapped node mapping."""
        return self._node.keys()

    @property
    def id(self):
        """Value of the node's ``'id'`` key."""
        return self._node['id']

    @property
    def shortcode(self):
        """Value of the node's ``'shortcode'`` key."""
        return self._node['shortcode']

    @property
    def unixTimeStamp(self):
        """Value of the node's ``'taken_at_timestamp'`` key."""
        return self._node['taken_at_timestamp']

    @property
    def image_url(self):
        """Value of the node's ``'display_url'`` key."""
        return self._node['display_url']

    @property
    def thumbnail_src(self):
        """Value of the node's ``'thumbnail_src'`` key."""
        return self._node['thumbnail_src']

    @property
    def thumnail_src(self):
        """Misspelled alias of ``thumbnail_src``, kept for existing callers.

        The original read the non-existent key ``'thumnail_src'`` and always
        raised ``KeyError``; it now returns the ``'thumbnail_src'`` value.
        """
        return self.thumbnail_src

    @property
    def is_video(self):
        """Value of the node's ``'is_video'`` key."""
        return self._node['is_video']

    @property
    def accessibility_caption(self):
        """Value of the node's ``'accessibility_caption'`` key."""
        return self._node['accessibility_caption']

    @property
    def likes_count(self):
        """``'count'`` entry of the node's ``'edge_liked_by'`` mapping."""
        return self._node['edge_liked_by']['count']

    @property
    def dimensions(self):
        """Value of the node's ``'dimensions'`` key."""
        return self._node['dimensions']

    @property
    def comment(self):
        """A new ``Comment`` built from the wrapped node on every access."""
        return Comment(self._node)
class MarkvovStructs(object):
    """Storage for the two hashtag tallies.

    Attributes:
        _struct: ``{hashtag: {other hashtag: co-occurrence count}}``.
        _pos_struct: ``{hashtag: {later hashtag: accumulated positional
            score}}``.
    """

    def __init__(self):
        self._struct = {}
        self._pos_struct = {}


class MarkovChain(MarkvovStructs):
    """Accumulates hashtag co-occurrence counts and positional scores.

    Design note carried over from the original: separate classes might be
    needed for each hashtag and for topics (co-hashtags, position scores,
    mentions, emojis).
    """

    def add_hashtags(self, hashtag_list):
        """Fold the hashtags of one post into both tallies.

        Duplicates are dropped while keeping first-occurrence order, so that
        the positional scores reflect where each hashtag appeared in
        *hashtag_list*. (Deduplicating through ``set`` would discard that
        order and make the scores arbitrary.)

        For every hashtag ``h`` of the deduplicated list:

        * ``_struct[h][t]`` grows by 1 for every other hashtag ``t``.
        * ``_pos_struct[h][t]`` grows by ``1 - k / bscore`` for the hashtag
          ``t`` found ``k`` places after ``h``, where ``bscore`` is the number
          of unique hashtags minus one plus a small epsilon. The last hashtag
          has no followers and gets no ``_pos_struct`` entry.

        Args:
            hashtag_list: iterable of hashtag strings in caption order.

        Returns:
            None; ``_struct`` and ``_pos_struct`` are updated in place.
        """
        # Order-preserving deduplication.
        seen = set()
        tags = []
        for tag in hashtag_list:
            if tag not in seen:
                seen.add(tag)
                tags.append(tag)

        # Positional base score; the epsilon keeps the most distant follower's
        # score slightly above zero and avoids dividing by zero for a single
        # hashtag.
        bscore = (len(tags) - 1) + .00000001

        for position, current in enumerate(tags):
            # Binary co-occurrence tally (order not important).
            cooccurring = self._struct.setdefault(current, {})
            for other in tags:
                if other != current:
                    cooccurring[other] = cooccurring.get(other, 0) + 1

            followers = tags[position + 1:]
            if not followers:
                # Last hashtag of the list: nothing follows it.
                continue

            # Positional tally: nearer followers score higher.
            scorebook = self._pos_struct.setdefault(current, {})
            for distance, follower in enumerate(followers, 1):
                score = 1 - (distance / bscore)
                scorebook[follower] = scorebook.get(follower, 0) + score
def catch_keywords(keyword):
    """Scrape the tag page of *keyword* and rank the hashtags that follow it.

    The captions of the page's top posts and regular posts are fed into a
    fresh ``MarkovChain``; the positional scores recorded for ``'#<keyword>'``
    are then ranked. The ten best tags are printed on one line (without the
    leading ``#``) and returned.

    Args:
        keyword: hashtag name. A leading ``#`` is removed first, because it
            would otherwise turn the rest of the URL into a fragment and make
            the score lookup search for ``'##...'``.

    Returns:
        Up to ten ``('#tag', score)`` tuples, best score first. Empty when no
        positional scores were recorded for the keyword.
    """
    keyword = keyword.lstrip('#')
    url = "https://www.instagram.com/explore/tags/{0}/".format(keyword)
    keyword_posts = IGPage(extract_tag_data(url))

    mc = MarkovChain()
    for post in keyword_posts.top_posts + keyword_posts.nodes:
        try:
            hashtags = post.comment.hashtags
        except AttributeError:
            # The caption could not be read for this post; skip it.
            continue
        if hashtags:
            mc.add_hashtags(hashtags)

    # A keyword that never had a hashtag after it has no entry; treat that as
    # "no results" instead of failing with KeyError.
    recorded = mc._pos_struct.get('#{0}'.format(keyword), {})
    scores = sorted(recorded.items(), key=lambda item: item[1])[-10:]
    scores.reverse()

    print(' '.join(tag[1:] for tag, _ in scores))
    return scores
def compute_toptags(keyword, lim=3):
    """Rank hashtags related to *keyword* across its own and related pages.

    ``catch_keywords`` is run for *keyword* and then once for every tag it
    returned. All ``(tag, score)`` results are summed per tag.

    Args:
        keyword: hashtag name to start from.
        lim: not used by the implementation; kept so existing calls keep
            working. NOTE(review): the intended meaning is not visible in
            this file -- confirm before wiring it in.

    Returns:
        List of ``(tag, total score)`` tuples sorted by total score, highest
        first.
    """
    scores = {}
    mainkw = catch_keywords(keyword)
    scores[keyword] = mainkw

    # Results carry a leading '#'; the follow-up lookups need the bare name.
    fetchlist = [tag[1:] if tag.startswith('#') else tag for tag, _ in mainkw]
    for kw in fetchlist:
        try:
            scores[kw] = catch_keywords(kw)
        except (AttributeError, KeyError, IndexError):
            # Follow-up pages are best effort: a page that lacks the expected
            # data is skipped so the results gathered so far are not lost.
            continue

    scoretally = {}
    for results in scores.values():
        for tag, rank in results:
            scoretally[tag] = scoretally.get(tag, 0) + rank

    return sorted(scoretally.items(), key=lambda item: item[1], reverse=True)
# Script entry point: rank the tags related to 'sailing'. The return value is
# discarded; the visible output is the line catch_keywords prints per page.
if __name__ == '__main__':
    compute_toptags('sailing')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment