Skip to content

Instantly share code, notes, and snippets.

@jcrubino
Created April 19, 2019 22:54
Show Gist options
  • Save jcrubino/6075469390e41f5499c2356046768bc2 to your computer and use it in GitHub Desktop.
Save jcrubino/6075469390e41f5499c2356046768bc2 to your computer and use it in GitHub Desktop.
from bs4 import BeautifulSoup
from requests import get
import json
import re
import emoji
# Selector string for <script> elements. NOTE(review): nothing visible in this
# file reads this name; extract_tag_data passes the literal 'script' instead.
css = "script"
def get_page(url, timeout=30):
    """Download ``url`` and return its body parsed into a BeautifulSoup tree.

    Args:
        url: Address of the page to fetch.
        timeout: Seconds ``requests`` waits for the server before giving up.
            Pass ``None`` to wait indefinitely (the previous behaviour).

    Returns:
        A ``BeautifulSoup`` object built from the response text using the
        ``lxml`` parser.

    Raises:
        requests.exceptions.RequestException: If the connection fails or the
            timeout expires.
    """
    # Without an explicit timeout a stalled server blocks the script forever.
    response = get(url, timeout=timeout)
    try:
        markup = response.text
    finally:
        # Release the connection even if decoding the body fails.
        response.close()
    return BeautifulSoup(markup, features="lxml")
def extract_tag_data(url):
    """Fetch a tag page and return the hashtag record embedded in its scripts.

    The page at ``url`` is downloaded with ``get_page``; the text of its
    fourth ``<script>`` element is split on ``window._sharedData = `` and the
    remainder (minus its final character) is decoded as JSON.

    Args:
        url: Address of the tag page to download.

    Returns:
        The value stored under
        ``entry_data -> TagPage -> [0] -> graphql -> hashtag`` in the decoded
        JSON document.
    """
    page = get_page(url)
    script_tags = page.select('script')
    # The shared-data assignment is expected in the fourth <script> element.
    shared_script = script_tags[3].string
    after_marker = shared_script.split("window._sharedData = ")[-1]
    # Drop the last character (presumably the trailing ';') before decoding.
    payload = json.loads(after_marker[:-1])
    tag_page = payload['entry_data']['TagPage'][0]
    return tag_page['graphql']['hashtag']
class IGPage(object):
    """Wrapper around the hashtag record returned by ``extract_tag_data``.

    Attributes set by the constructor:
        keys: Callable returning the keys of the wrapped record.
        page_info: The ``page_info`` entry of ``edge_hashtag_to_media``.
        edges: The raw ``edges`` list of ``edge_hashtag_to_media``.
        nodes: ``IGMediaNode`` objects for ``edges``, most-liked first.
        top_posts: ``IGMediaNode`` objects for ``edge_hashtag_to_top_posts``.
    """

    def __init__(self, tag_data):
        self._tag_data = tag_data
        self.keys = lambda: self._tag_data.keys()

        media = tag_data['edge_hashtag_to_media']
        self.page_info = media['page_info']
        self.edges = media['edges']

        # Sort ascending by like count, then flip so the most-liked come first.
        recent = [IGMediaNode(edge) for edge in self.edges]
        recent.sort(key=lambda node: node.likes_count)
        recent.reverse()
        self.nodes = recent

        top_edges = tag_data['edge_hashtag_to_top_posts']['edges']
        self.top_posts = [IGMediaNode(edge) for edge in top_edges]
class Comment(object):
    """Caption text of a media node plus the tokens extracted from it.

    Attributes:
        text: Caption text, or ``''`` when the node carries no caption.
        hashtags: Every ``#word`` found in ``text`` (leading ``#`` included).
        emojis: Every single character of ``text`` recognised as an emoji.
        user_refs: Every ``@word`` found in ``text`` (leading ``@`` included).

    All four attributes are always set, so a caption-less node yields an
    object with empty values instead of a partially initialised one.
    """

    def __init__(self, node):
        self._node = node
        try:
            caption = node['edge_media_to_caption']['edges'][0]['node']['text']
        except (KeyError, IndexError, TypeError):
            # No caption edge (or an unexpected node shape): treat as empty.
            caption = ''
        # A null caption would make re.findall raise TypeError below.
        self.text = caption or ''
        self.hashtags = ['#' + x for x in re.findall(r"#(\w+)", self.text)]
        # Checked one character at a time, so an emoji built from several
        # code points is not matched as a whole.
        self.emojis = [c for c in self.text if self._is_emoji(c)]
        self.user_refs = ['@' + x for x in re.findall(r"@(\w+)", self.text)]

    @staticmethod
    def _is_emoji(char):
        """Return True if ``char`` is an emoji known to the ``emoji`` package."""
        checker = getattr(emoji, 'is_emoji', None)
        if checker is not None:
            # emoji >= 2.0 removed UNICODE_EMOJI in favour of is_emoji().
            return checker(char)
        table = emoji.UNICODE_EMOJI
        # emoji 1.x nests the table per language; older releases keep it flat.
        return char in table.get('en', table)
class IGMediaNode(object):
    """Read-only view over one media node of a hashtag page.

    Keys observed on a node:
    'comments_disabled', '__typename', 'id', 'edge_media_to_caption',
    'shortcode', 'edge_media_to_comment', 'taken_at_timestamp', 'dimensions',
    'display_url', 'edge_liked_by', 'edge_media_preview_like', 'owner', 'thumbnail_src',
    'thumbnail_resources', 'is_video', 'accessibility_caption'

    The constructor accepts either the node dict itself or an edge wrapper of
    the form ``{'node': {...}}``. Each property raises ``KeyError`` when the
    underlying key is absent.
    """

    def __init__(self, edgeNode):
        keys = edgeNode.keys()
        # An edge wrapper has exactly one key, 'node'; unwrap it.
        if len(keys) == 1 and 'node' in keys:
            self._node = edgeNode['node']
        else:
            self._node = edgeNode

    def keys(self):
        """Return the keys of the wrapped node dict."""
        return self._node.keys()

    @property
    def id(self):
        return self._node['id']

    @property
    def shortcode(self):
        return self._node['shortcode']

    @property
    def unixTimeStamp(self):
        return self._node['taken_at_timestamp']

    @property
    def image_url(self):
        return self._node['display_url']

    @property
    def thumbnail_src(self):
        return self._node['thumbnail_src']

    @property
    def thumnail_src(self):
        # Misspelled name kept for existing callers. It used to read the
        # nonexistent key 'thumnail_src' and therefore always raised KeyError.
        return self._node['thumbnail_src']

    @property
    def is_video(self):
        return self._node['is_video']

    @property
    def accessibility_caption(self):
        return self._node['accessibility_caption']

    @property
    def likes_count(self):
        return self._node['edge_liked_by']['count']

    @property
    def dimensions(self):
        return self._node['dimensions']

    @property
    def comment(self):
        # A new Comment is built on every access.
        return Comment(self._node)
class MarkvovStructs(object):
    """Storage shared by the hashtag scorers.

    Attributes:
        _struct: ``{tag: {other_tag: count}}`` co-occurrence tallies.
        _pos_struct: ``{tag: {later_tag: score}}`` positional scores.
    """

    def __init__(self):
        self._struct = {}
        self._pos_struct = {}


class MarkovChain(MarkvovStructs):
    """
    classes might be needed for each hashtag
    and topics: cohashtags, position scores, mentions, emojis
    """

    def add_hashtags(self, hashtag_list):
        """Fold one post's hashtags into the co-occurrence and positional tables.

        Args:
            hashtag_list: Hashtags in the order they appear in the post.
                Repeated tags are counted once, at their first position.

        For every tag, each other tag in the list gets +1 in ``_struct``
        (order-independent). For every tag except the last, each tag that
        follows it gets a score in ``_pos_struct`` that shrinks with distance:
        ``1 - distance / (len - 1 + epsilon)``.
        """
        # Deduplicate while keeping first-seen order. A plain set() would
        # scramble the order that the positional scores below depend on.
        seen = set()
        tags = []
        for tag in hashtag_list:
            if tag not in seen:
                seen.add(tag)
                tags.append(tag)

        # Positional base score; the epsilon keeps the divisor non-zero and
        # the farthest tag's score marginally above zero.
        bscore = (len(tags) - 1) + .00000001

        for idx, ht in enumerate(tags):
            # Binary co-occurrence tally (order not important).
            neighbors = self._struct.setdefault(ht, {})
            for tag in tags:
                if tag != ht:
                    neighbors[tag] = neighbors.get(tag, 0) + 1

            # The last tag has nothing after it, so it gets no positional entry.
            pos_tags = tags[idx + 1:]
            if not pos_tags:
                continue

            scorebook = self._pos_struct.setdefault(ht, {})
            for distance, tag in enumerate(pos_tags, start=1):
                score = 1 - (distance / bscore)
                scorebook[tag] = scorebook.get(tag, 0) + score
def catch_keywords(keyword):
    """Scrape one hashtag page and rank the hashtags that follow ``keyword``.

    Downloads the tag page for ``keyword``, feeds the hashtags of every top
    and recent post into a ``MarkovChain``, and prints the ten best-scoring
    tags (without ``#``) on one line.

    Args:
        keyword: Hashtag to look up, with or without a leading ``#``.

    Returns:
        Up to ten ``(hashtag, score)`` tuples, highest positional score first.
        Hashtags include the leading ``#``. The list is empty when no
        positional score was recorded for ``keyword``.
    """
    # A leading '#' would turn the rest of the URL into a fragment and make
    # the lookup key '##tag', so normalise to the bare word first.
    keyword = keyword.lstrip('#')
    url = "https://www.instagram.com/explore/tags/{0}/".format(keyword)
    keyword_posts = IGPage(extract_tag_data(url))
    mc = MarkovChain()
    for post in keyword_posts.top_posts + keyword_posts.nodes:
        try:
            hashtags = post.comment.hashtags
        except AttributeError:
            # A post without a usable caption may expose no hashtags.
            continue
        if hashtags:
            mc.add_hashtags(hashtags)
    # The keyword has no positional entry when no post lists it before
    # another tag; report an empty ranking instead of raising KeyError.
    positional = mc._pos_struct.get('#{0}'.format(keyword), {})
    scores = sorted(positional.items(), key=lambda x: x[1])[-10:]
    scores.reverse()
    print(' '.join(tag[1:] for tag, _ in scores))
    return scores
def compute_toptags(keyword, lim=3):
    """Rank hashtags related to ``keyword`` across its own and neighbouring pages.

    Runs ``catch_keywords`` for ``keyword``, then once for every tag it
    returned, and sums the scores each tag collected across those runs.

    Args:
        keyword: Starting hashtag, without the leading ``#``.
        lim: Not used by the current implementation; kept so existing calls
            that pass it keep working.

    Returns:
        ``(hashtag, total_score)`` tuples sorted by total score, highest
        first. Hashtags include the leading ``#``.
    """
    scores = {}
    mainkw = catch_keywords(keyword)
    scores[keyword] = mainkw
    # catch_keywords returns tags as '#tag'; strip the '#' before using them
    # as keywords, otherwise the tag-page URL and the lookup key are wrong.
    fetchlist = [tag.lstrip('#') for tag, _ in mainkw]
    for kw in fetchlist:
        try:
            scores[kw] = catch_keywords(kw)
        except (AttributeError, KeyError):
            # Best effort: skip related tags whose page could not be scored.
            continue
    scoretally = {}
    for results in scores.values():
        for tag, rank in results:
            scoretally[tag] = scoretally.get(tag, 0) + rank
    return sorted(scoretally.items(), key=lambda x: x[1], reverse=True)
# Script entry point: rank the hashtags related to "sailing" when run directly.
if __name__ == '__main__':
    compute_toptags('sailing')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment