Last active
June 19, 2019 21:08
-
-
Save ripiuk/be3bfe334b31306f8921acbee2ecc371 to your computer and use it in GitHub Desktop.
Download the Instagram images/videos of a given user.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import os | |
import time | |
import uuid | |
import json | |
import asyncio | |
import typing as typ | |
import urllib.parse as urlparse | |
from argparse import ArgumentParser | |
from aiohttp import ClientSession | |
def flatten(li: typ.List[typ.Union[str, typ.List[str]]]) -> typ.Iterable[str]:
    """Yield every string from *li*, descending into nested lists.

    Non-list entries are yielded as-is; list entries are flattened
    recursively, so arbitrarily deep nesting is supported.
    """
    for entry in li:
        if not isinstance(entry, list):
            yield entry
        else:
            yield from flatten(entry)
async def download_imgs(imgs: typ.List[typ.Union[str, typ.List[str]]],
                        download_dir: str, session: ClientSession) -> None:
    """Download every media URL in *imgs* (flattened) into *download_dir*.

    Each file is saved under a random UUID name; URLs containing "mp4" get
    a .mp4 extension, everything else is assumed to be a .jpg image.

    :param imgs: list of URLs, possibly with nested lists (album posts).
    :param download_dir: target directory, created if missing.
    :param session: open aiohttp session used for all requests.
    """
    # exist_ok avoids the check-then-create race of the original
    # `if not os.path.exists(...)` / `os.makedirs(...)` pair.
    os.makedirs(download_dir, exist_ok=True)

    async def _download_image(data_url: str) -> None:
        # Fetch the whole body, then write it out under a fresh UUID name.
        async with session.get(data_url) as response:
            resp_data = await response.read()
        # Original heuristic kept: "mp4" anywhere in the URL means video.
        extension = "mp4" if "mp4" in data_url else "jpg"
        with open(f"{download_dir}/{uuid.uuid4()}.{extension}", 'wb') as file:
            file.write(resp_data)

    # All downloads run concurrently; gather propagates the first failure.
    await asyncio.gather(*(_download_image(img) for img in flatten(imgs)))
async def get_imgs(session: ClientSession, user: str, limit: int,
                   with_child: bool = True, with_video: bool = True) -> typ.List[typ.Union[str, typ.List[str]]]:
    """Scrape up to *limit* post media URLs from an Instagram profile.

    Loads the profile page to obtain the user id, then pages through the
    GraphQL timeline endpoint 50 posts at a time.

    :param session: open aiohttp session.
    :param user: Instagram username.
    :param limit: maximum number of posts to return.
    :param with_child: keep album (sidecar) posts as nested URL lists.
    :param with_video: return video URLs; when False, use the cover image.
    :returns: list of URLs; album posts appear as nested lists of URLs.
    :raises KeyError, IndexError, json.JSONDecodeError: on unexpected
        markup/JSON (handled by the caller's top-level except).
    """
    data: typ.List[typ.Union[str, typ.List[str]]] = []
    posts_num, has_next_page = 0, True
    headers = {
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;"
                  "q=0.8,application/signed-exchange;v=b3",
        "accept-encoding": "gzip, deflate, br",
        "accept-language": "uk-UA,uk;q=0.9,ru;q=0.8,en-US;q=0.7,en;q=0.6",
        "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
                      "(KHTML, like Gecko) Chrome/75.0.3770.80 Safari/537.36",
    }
    async with session.get("https://www.instagram.com/" + user, headers=headers) as response:
        resp = await response.text()
        # The profile page embeds its data as `window._sharedData = {...};`.
        shared_data = re.search(r'(?<=window._sharedData = ).*?(?=;<)', resp).group(0)
        shared_data = json.loads(shared_data)
        user_info = shared_data['entry_data']['ProfilePage'][0]['graphql']['user']
        user_id = user_info['id']
        timeline_media = user_info['edge_owner_to_timeline_media']
        print(f"Max img in the profile: {timeline_media['count']}")

    def get_data(edges, aim=None):
        """Append each edge's media URL(s) to *aim* (outer `data` by default)."""
        aim = data if not isinstance(aim, list) else aim
        for el in edges:
            if with_child and el['node'].get('edge_sidecar_to_children'):
                # Album post: collect its children into a nested list.
                aim.append(list())
                get_data(el['node']['edge_sidecar_to_children']['edges'], aim=aim[-1])
            elif with_video and el['node']['is_video']:
                aim.append(el['node']['video_url'])
            else:
                aim.append(el['node']['display_url'])

    # BUG FIX: build `variables` ONCE, outside the loop. The original
    # re-created this dict at the top of every iteration, so the `after`
    # cursor assigned at the bottom was discarded and every request
    # refetched the same first page of posts.
    variables: typ.Dict[str, typ.Any] = dict(id=user_id, first=50)
    # Request headers are loop-invariant; hoisted out of the loop.
    headers = {
        "accept": "*/*",
        "accept-encoding": "gzip, deflate, br",
        "accept-language": "uk-UA,uk;q=0.9,ru;q=0.8,en-US;q=0.7,en;q=0.6",
        "referer": f"https://www.instagram.com/{user}/",
        "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
                      "(KHTML, like Gecko) Chrome/75.0.3770.80 Safari/537.36",
        "x-requested-with": "XMLHttpRequest"
    }
    while posts_num < limit:
        if not has_next_page:
            return data[:limit]
        params = {
            "query_hash": "f2405b236d85e8296cf30347c9f08c2a",
            "variables": json.dumps(variables)
        }
        query = urlparse.urlencode(params)
        url = "https://www.instagram.com/graphql/query/?" + query
        async with session.get(url, headers=headers) as response:
            resp = await response.json()
            timeline_media = resp['data']['user']['edge_owner_to_timeline_media']
            has_next_page = timeline_media['page_info']['has_next_page']
            # Persisting the cursor here now actually advances pagination.
            variables['after'] = timeline_media['page_info']['end_cursor']
            get_data(timeline_media['edges'])
            posts_num += len(timeline_media['edges'])
    return data[:limit]
async def main(args) -> None:
    """Fetch a user's post URLs, print them, and optionally download them.

    :param args: parsed CLI namespace (user, limit, save, dir, video, child).
    """
    semaphore = asyncio.Semaphore(100)
    async with ClientSession() as session:
        async with semaphore:
            data = await get_imgs(session, args.user, limit=args.limit, with_child=args.child, with_video=args.video)
        total = len(data)
        # Column wide enough for "i.j" sub-numbering of album posts.
        width = len(str(total)) * 2
        print(f"Got {total} posts (limit: {args.limit}):")
        print(f"{'post №':>{width}} | url")
        for idx, entry in enumerate(data, 1):
            if isinstance(entry, list):
                # Album post: number its items as idx.1, idx.2, ...
                for sub_idx, sub_url in enumerate(entry, 1):
                    print(f"{str(idx)+'.'+str(sub_idx):>{width}} | {sub_url}")
            else:
                print(f"{idx:>{width}} | {entry}")
        if args.save:
            args.dir = f"{args.dir}/{args.user}"
            await download_imgs(data, args.dir, session)
            print(f"Saved {total} posts to the {args.dir} directory")
if __name__ == "__main__":
    # CLI: positional username plus flags controlling count, saving and
    # which media kinds are collected (-v / -c are store_false toggles,
    # i.e. passing them DISABLES videos / album children).
    parser = ArgumentParser()
    parser.add_argument("user", type=str, help="username in instagram")
    parser.add_argument("-l", "--limit", type=int, default=12, help="number of posts")
    parser.add_argument("-s", "--save", action="store_true", help="download images")
    parser.add_argument("-d", "--dir", type=str, help="save images to the directory", default="imgs")
    parser.add_argument("-v", "--video", action="store_false", help="ignore videos, store only cover images for them")
    parser.add_argument("-c", "--child", action="store_false", help="ignore images from collections")
    arguments = parser.parse_args()
    start = time.time()
    try:
        # asyncio.run (3.7+) replaces the deprecated get_event_loop /
        # ensure_future / run_until_complete boilerplate and guarantees
        # the loop is closed afterwards.
        asyncio.run(main(arguments))
    except (KeyError, IndexError, json.JSONDecodeError) as err:
        # Scraping errors (markup changes, private profiles) end up here.
        print("Got an error:", type(err), err)
    print("Time:", time.time() - start)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment