#!/usr/bin/python
# -*- coding: utf-8 -*-

import logging
import argparse
import codecs
import configparser
import errno
import glob
from operator import itemgetter
import json
import logging.config
import hashlib
import os
import re
import sys
import textwrap
import time

try:
    from urllib.parse import urlparse
except ImportError:
    from urlparse import urlparse

import warnings
import threading
import concurrent.futures

import requests
import tqdm

from instagram_scraper.constants import *

try:
    reload(sys)  # Python 2.7
    sys.setdefaultencoding("UTF8")
except NameError:
    pass

warnings.filterwarnings('ignore')

input_lock = threading.RLock()


class LockedStream(object):
    file = None

    def __init__(self, file):
        self.file = file

    def write(self, x):
        with input_lock:
            self.file.write(x)

    def flush(self):
        return getattr(self.file, 'flush', lambda: None)()


original_stdout, original_stderr = sys.stdout, sys.stderr
sys.stdout, sys.stderr = map(LockedStream, (sys.stdout, sys.stderr))


def threaded_input(prompt):
    with input_lock:
        try:
            with tqdm.external_write_mode():
                original_stdout.write(prompt)
                original_stdout.flush()
                return sys.stdin.readline()
        except AttributeError:
            original_stdout.write('\n')
            original_stdout.write(prompt)
            original_stdout.flush()
            return sys.stdin.readline()

input = threaded_input
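
# Note on the wrappers above: every thread writes through LockedStream, so a
# tqdm progress-bar refresh and an interactive prompt cannot interleave
# mid-line. Illustrative sequence (not part of the actual control flow):
#
#   sys.stdout.write('progress...\r')   # worker thread, holds input_lock
#   input('Enter code received: ')      # main thread, waits for the lock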


class PartialContentException(Exception):
    pass


class InstagramScraper(object):
    """InstagramScraper scrapes and downloads an instagram user's photos and videos"""

    def __init__(self, **kwargs):
        # try:  # for Python 3
        #     from http.client import HTTPConnection
        # except ImportError:
        #     from httplib import HTTPConnection
        # HTTPConnection.debuglevel = 1
        # logging.basicConfig()
        # logging.getLogger().setLevel(logging.DEBUG)
        # requests_log = logging.getLogger("urllib3")
        # requests_log.setLevel(logging.DEBUG)
        # requests_log.propagate = True
        default_attr = dict(username='', usernames=[], filename=None,
                            login_user=None, login_pass=None,
                            destination='./', retain_username=False, interactive=False,
                            quiet=False, maximum=0, media_metadata=False, latest=False,
                            latest_stamps=False,
                            media_types=['image', 'video', 'story-image', 'story-video'],
                            tag=False, location=False, search_location=False, comments=False,
                            verbose=0, include_location=False, filter=None,
                            template='{urlname}')

        allowed_attr = list(default_attr.keys())
        default_attr.update(kwargs)

        for key in default_attr:
            if key in allowed_attr:
                self.__dict__[key] = default_attr.get(key)

        # story media type means story-image & story-video
        if 'story' in self.media_types:
            self.media_types.remove('story')
            if 'story-image' not in self.media_types:
                self.media_types.append('story-image')
            if 'story-video' not in self.media_types:
                self.media_types.append('story-video')

        # Read latest_stamps file with ConfigParser
        self.latest_stamps_parser = None
        if self.latest_stamps:
            parser = configparser.ConfigParser()
            parser.read(self.latest_stamps)
            self.latest_stamps_parser = parser
            # If we have a latest_stamps file, latest must be true as it's the common flag
            self.latest = True

        # Set up a logger
        self.logger = InstagramScraper.get_logger(level=logging.DEBUG, verbose=default_attr.get('verbose'))

        self.posts = []

        self.session = requests.Session()
        self.session.headers = {'user-agent': CHROME_WIN_UA}
        self.session.cookies.set('ig_pr', '1')
        self.rhx_gis = None

        self.cookies = None
        self.logged_in = False
        self.last_scraped_filemtime = 0
        if default_attr['filter']:
            self.filter = list(self.filter)

        self.quit = False

    def sleep(self, secs):
        # Sleep in one-second slices so a pending self.quit can interrupt the wait.
        min_delay = 1
        for _ in range(secs // min_delay):
            time.sleep(min_delay)
            if self.quit:
                return
        time.sleep(secs % min_delay)

    def _retry_prompt(self, url, exception_message):
        """Show prompt and return True: retry, False: ignore, None: abort"""
        answer = input('Repeated error {0}\n(A)bort, (I)gnore, (R)etry or retry (F)orever?'.format(exception_message))
        if answer:
            answer = answer[0].upper()
            if answer == 'I':
                self.logger.info('The user has chosen to ignore {0}'.format(url))
                return False
            elif answer == 'R':
                return True
            elif answer == 'F':
                self.logger.info('The user has chosen to retry forever')
                global MAX_RETRIES
                MAX_RETRIES = sys.maxsize
                return True
            else:
                self.logger.info('The user has chosen to abort')
                return None

    def safe_get(self, *args, **kwargs):
        # The out-of-the-box solution
        #   session.mount('https://', HTTPAdapter(max_retries=...))
        # only covers failed DNS lookups, socket connections and connection
        # timeouts. It does not help when the server terminates the connection
        # while the response is being downloaded.
        retry = 0
        retry_delay = RETRY_DELAY
        while True:
            if self.quit:
                return
            try:
                response = self.session.get(timeout=CONNECT_TIMEOUT, cookies=self.cookies, *args, **kwargs)
                if response.status_code == 404:
                    return
                response.raise_for_status()
                content_length = response.headers.get('Content-Length')
                if content_length is None or len(response.content) != int(content_length):
                    # If Content-Length is missing we retry anyway, to get the size and be confident
                    raise PartialContentException('Partial response')
                return response
            except KeyboardInterrupt:
                raise
            except (requests.exceptions.RequestException, PartialContentException) as e:
                if 'url' in kwargs:
                    url = kwargs['url']
                elif len(args) > 0:
                    url = args[0]
                if retry < MAX_RETRIES:
                    self.logger.warning('Retry after exception {0} on {1}'.format(repr(e), url))
                    self.sleep(retry_delay)
                    retry_delay = min(2 * retry_delay, MAX_RETRY_DELAY)
                    retry = retry + 1
                    continue
                else:
                    keep_trying = self._retry_prompt(url, repr(e))
                    if keep_trying == True:
                        retry = 0
                        continue
                    elif keep_trying == False:
                        return
                raise

    def get_json(self, *args, **kwargs):
        """Retrieve text from url. Return text as string, or None if no data present."""
        resp = self.safe_get(*args, **kwargs)
        if resp is not None:
            return resp.text

    def login(self):
        """Logs in to instagram."""
        self.session.headers = {'user-agent': CHROME_WIN_UA}
        self.rhx_gis = self.get_shared_data()['rhx_gis']

        req = self.session.get(BASE_URL)
        self.session.headers.update({'X-CSRFToken': req.cookies['csrftoken']})
        login_data = {'username': self.login_user, 'password': self.login_pass}
        login = self.session.post(LOGIN_URL, data=login_data, allow_redirects=True)
        self.session.headers.update({'X-CSRFToken': login.cookies['csrftoken']})
        self.cookies = login.cookies
        login_text = json.loads(login.text)

        if login_text.get('authenticated') and login.status_code == 200:
            self.logged_in = True
            self.rhx_gis = self.get_shared_data()['rhx_gis']
        else:
            self.logger.error('Login failed for ' + self.login_user)

            if 'checkpoint_url' in login_text:
                checkpoint_url = login_text.get('checkpoint_url')
                self.logger.error('Please verify your account at ' + BASE_URL[0:-1] + checkpoint_url)

                if self.interactive is True:
                    self.login_challenge(checkpoint_url)
            elif 'errors' in login_text:
                for count, error in enumerate(login_text['errors'].get('error')):
                    count += 1
                    self.logger.debug('Session error %(count)s: "%(error)s"' % locals())
            else:
                self.logger.error(json.dumps(login_text))

    def login_challenge(self, checkpoint_url):
        self.session.headers.update({'Referer': BASE_URL})
        req = self.session.get(BASE_URL[:-1] + checkpoint_url)
        self.session.headers.update({'X-CSRFToken': req.cookies['csrftoken'], 'X-Instagram-AJAX': '1'})

        self.session.headers.update({'Referer': BASE_URL[:-1] + checkpoint_url})
        mode = input('Choose a challenge mode (0 - SMS, 1 - Email): ')
        challenge_data = {'choice': mode}
        challenge = self.session.post(BASE_URL[:-1] + checkpoint_url, data=challenge_data, allow_redirects=True)
        self.session.headers.update({'X-CSRFToken': challenge.cookies['csrftoken'], 'X-Instagram-AJAX': '1'})

        code = input('Enter code received: ')
        code_data = {'security_code': code}
        code = self.session.post(BASE_URL[:-1] + checkpoint_url, data=code_data, allow_redirects=True)
        self.session.headers.update({'X-CSRFToken': code.cookies['csrftoken']})
        self.cookies = code.cookies
        code_text = json.loads(code.text)

        if code_text.get('status') == 'ok':
            self.logged_in = True
        elif 'errors' in code.text:
            for count, error in enumerate(code_text['challenge']['errors']):
                count += 1
                self.logger.error('Session error %(count)s: "%(error)s"' % locals())
        else:
            self.logger.error(json.dumps(code_text))

    def logout(self):
        """Logs out of instagram."""
        if self.logged_in:
            try:
                logout_data = {'csrfmiddlewaretoken': self.cookies['csrftoken']}
                self.session.post(LOGOUT_URL, data=logout_data)
                self.logged_in = False
            except requests.exceptions.RequestException:
                self.logger.warning('Failed to log out ' + self.login_user)

    def get_dst_dir(self, username):
        """Gets the destination directory and last scraped file time."""
        if self.destination == './':
            dst = './' + username
        else:
            if self.retain_username:
                dst = self.destination + '/' + username
            else:
                dst = self.destination

        # Resolve last scraped filetime
        if self.latest_stamps_parser:
            self.last_scraped_filemtime = self.get_last_scraped_timestamp(username)
        elif os.path.isdir(dst):
            self.last_scraped_filemtime = self.get_last_scraped_filemtime(dst)

        return dst

    def make_dir(self, dst):
        try:
            os.makedirs(dst)
        except OSError as err:
            if err.errno == errno.EEXIST and os.path.isdir(dst):
                # Directory already exists
                pass
            else:
                # Target dir exists as a file, or a different error
                raise

    def get_last_scraped_timestamp(self, username):
        if self.latest_stamps_parser:
            try:
                return self.latest_stamps_parser.getint(LATEST_STAMPS_USER_SECTION, username)
            except configparser.Error:
                pass
        return 0

    def set_last_scraped_timestamp(self, username, timestamp):
        if self.latest_stamps_parser:
            if not self.latest_stamps_parser.has_section(LATEST_STAMPS_USER_SECTION):
                self.latest_stamps_parser.add_section(LATEST_STAMPS_USER_SECTION)
            self.latest_stamps_parser.set(LATEST_STAMPS_USER_SECTION, username, str(timestamp))
            with open(self.latest_stamps, 'w') as f:
                self.latest_stamps_parser.write(f)

    def get_last_scraped_filemtime(self, dst):
        """Returns the last modified time of the newest file in a directory."""
        list_of_files = []
        file_types = ('*.jpg', '*.mp4')

        for type in file_types:
            list_of_files.extend(glob.glob(dst + '/' + type))

        if list_of_files:
            latest_file = max(list_of_files, key=os.path.getmtime)
            return int(os.path.getmtime(latest_file))
        return 0

    def query_comments_gen(self, shortcode, end_cursor=''):
        """Generator for comments."""
        comments, end_cursor = self.__query_comments(shortcode, end_cursor)

        if comments:
            try:
                while True:
                    for item in comments:
                        yield item

                    if end_cursor:
                        comments, end_cursor = self.__query_comments(shortcode, end_cursor)
                    else:
                        return
            except ValueError:
                self.logger.exception('Failed to query comments for shortcode ' + shortcode)

    def __query_comments(self, shortcode, end_cursor=''):
        params = QUERY_COMMENTS_VARS.format(shortcode, end_cursor)
        self.update_ig_gis_header(params)

        resp = self.get_json(QUERY_COMMENTS.format(params))

        if resp is not None:
            payload = json.loads(resp)['data']['shortcode_media']

            if payload:
                container = payload['edge_media_to_comment']
                comments = [node['node'] for node in container['edges']]
                end_cursor = container['page_info']['end_cursor']
                return comments, end_cursor

        return None, None
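
    # A minimal sketch of the cursor-pagination pattern shared by the
    # generators in this class ('Bxxxxxxxxxx' is a hypothetical shortcode):
    #
    #   comments = scraper.query_comments_gen('Bxxxxxxxxxx')
    #   first_ten = [next(comments) for _ in range(10)]
    #
    # Each __query_comments call returns (items, end_cursor); the generator
    # keeps re-querying with the last cursor until Instagram returns an
    # empty one.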

    def scrape_hashtag(self):
        self.__scrape_query(self.query_hashtag_gen)

    def scrape_location(self):
        self.__scrape_query(self.query_location_gen)

    def worker_wrapper(self, fn, *args, **kwargs):
        try:
            if self.quit:
                return
            return fn(*args, **kwargs)
        except:
            self.logger.debug("Exception in worker thread", exc_info=sys.exc_info())
            raise

    def __scrape_query(self, media_generator, executor=concurrent.futures.ThreadPoolExecutor(max_workers=MAX_CONCURRENT_DOWNLOADS)):
        """Scrapes the specified value for posted media."""
        self.quit = False
        try:
            for value in self.usernames:
                self.posts = []
                self.last_scraped_filemtime = 0
                greatest_timestamp = 0
                future_to_item = {}

                dst = self.get_dst_dir(value)

                if self.include_location:
                    media_exec = concurrent.futures.ThreadPoolExecutor(max_workers=5)

                iter = 0
                for item in tqdm.tqdm(media_generator(value), desc='Searching {0} for posts'.format(value), unit=" media",
                                      disable=self.quiet):
                    if ((item['is_video'] is False and 'image' in self.media_types) or
                            (item['is_video'] is True and 'video' in self.media_types)) and self.is_new_media(item):
                        future = executor.submit(self.worker_wrapper, self.download, item, dst)
                        future_to_item[future] = item

                    if self.include_location and 'location' not in item:
                        media_exec.submit(self.worker_wrapper, self.__get_location, item)

                    if self.comments:
                        item['edge_media_to_comment']['data'] = list(self.query_comments_gen(item['shortcode']))

                    if self.media_metadata or self.comments or self.include_location:
                        self.posts.append(item)

                    iter = iter + 1
                    if self.maximum != 0 and iter >= self.maximum:
                        break

                if future_to_item:
                    for future in tqdm.tqdm(concurrent.futures.as_completed(future_to_item),
                                            total=len(future_to_item),
                                            desc='Downloading', disable=self.quiet):
                        item = future_to_item[future]

                        if future.exception() is not None:
                            self.logger.warning(
                                'Media for {0} at {1} generated an exception: {2}'.format(value, item['urls'],
                                                                                          future.exception()))
                        else:
                            timestamp = self.__get_timestamp(item)
                            if timestamp > greatest_timestamp:
                                greatest_timestamp = timestamp
                # Only update the stamp if we actually saw something newer
                if greatest_timestamp > self.last_scraped_filemtime:
                    self.set_last_scraped_timestamp(value, greatest_timestamp)

                if (self.media_metadata or self.comments or self.include_location) and self.posts:
                    self.save_json(self.posts, '{0}/{1}.json'.format(dst, value))
        finally:
            self.quit = True

    def query_hashtag_gen(self, hashtag):
        return self.__query_gen(QUERY_HASHTAG, QUERY_HASHTAG_VARS, 'hashtag', hashtag)

    def query_location_gen(self, location):
        return self.__query_gen(QUERY_LOCATION, QUERY_LOCATION_VARS, 'location', location)

    def __query_gen(self, url, variables, entity_name, query, end_cursor=''):
        """Generator for hashtag and location."""
        nodes, end_cursor = self.__query(url, variables, entity_name, query, end_cursor)

        if nodes:
            try:
                while True:
                    for node in nodes:
                        yield node

                    if end_cursor:
                        nodes, end_cursor = self.__query(url, variables, entity_name, query, end_cursor)
                    else:
                        return
            except ValueError:
                self.logger.exception('Failed to query ' + query)

    def __query(self, url, variables, entity_name, query, end_cursor):
        params = variables.format(query, end_cursor)
        self.update_ig_gis_header(params)

        resp = self.get_json(url.format(params))

        if resp is not None:
            payload = json.loads(resp)['data'][entity_name]

            if payload:
                nodes = []

                if end_cursor == '':
                    top_posts = payload['edge_' + entity_name + '_to_top_posts']
                    nodes.extend(self._get_nodes(top_posts))

                posts = payload['edge_' + entity_name + '_to_media']

                nodes.extend(self._get_nodes(posts))
                end_cursor = posts['page_info']['end_cursor']
                return nodes, end_cursor

        return None, None

    def _get_nodes(self, container):
        return [self.augment_node(node['node']) for node in container['edges']]

    def augment_node(self, node):
        self.extract_tags(node)

        details = None
        if self.include_location and 'location' not in node:
            details = self.__get_media_details(node['shortcode'])
            node['location'] = details.get('location') if details else None

        if 'urls' not in node:
            node['urls'] = []

        if node['is_video'] and 'video_url' in node:
            node['urls'] = [node['video_url']]
        elif '__typename' in node and node['__typename'] == 'GraphImage':
            node['urls'] = [self.get_original_image(node['display_url'])]
        else:
            if details is None:
                details = self.__get_media_details(node['shortcode'])

            if details:
                if '__typename' in details and details['__typename'] == 'GraphVideo':
                    node['urls'] = [details['video_url']]
                elif '__typename' in details and details['__typename'] == 'GraphSidecar':
                    urls = []
                    for carousel_item in details['edge_sidecar_to_children']['edges']:
                        urls += self.augment_node(carousel_item['node'])['urls']
                    node['urls'] = urls
                else:
                    node['urls'] = [self.get_original_image(details['display_url'])]

        return node

    def __get_media_details(self, shortcode):
        resp = self.get_json(VIEW_MEDIA_URL.format(shortcode))

        if resp is not None:
            try:
                return json.loads(resp)['graphql']['shortcode_media']
            except ValueError:
                self.logger.warning('Failed to get media details for ' + shortcode)
        else:
            self.logger.warning('Failed to get media details for ' + shortcode)

    def __get_location(self, item):
        code = item.get('shortcode', item.get('code'))

        if code:
            details = self.__get_media_details(code)
            item['location'] = details.get('location')

    def scrape(self, executor=concurrent.futures.ThreadPoolExecutor(max_workers=MAX_CONCURRENT_DOWNLOADS)):
        """Crawls through and downloads user's media"""
        self.session.headers = {'user-agent': STORIES_UA}
        try:
            for username in self.usernames:
                self.posts = []
                self.last_scraped_filemtime = 0
                greatest_timestamp = 0
                future_to_item = {}

                dst = self.get_dst_dir(username)

                # Get the user metadata.
                shared_data = self.get_shared_data(username)
                user = self.deep_get(shared_data, 'entry_data.ProfilePage[0].graphql.user')

                if not user:
                    self.logger.error(
                        'Error getting user details for {0}. Please verify that the user exists.'.format(username))
                    continue
                elif user and user['is_private'] and user['edge_owner_to_timeline_media']['count'] > 0 and not \
                        user['edge_owner_to_timeline_media']['edges']:
                    self.logger.error('User {0} is private'.format(username))

                self.rhx_gis = shared_data['rhx_gis']

                self.get_profile_pic(dst, executor, future_to_item, user, username)
                self.get_stories(dst, executor, future_to_item, user, username)

                # Crawls the media and sends it to the executor.
                try:
                    self.get_media(dst, executor, future_to_item, user)

                    # Displays the progress bar of completed downloads. Might not even
                    # pop up if all media is downloaded while the above loop finishes.
                    if future_to_item:
                        for future in tqdm.tqdm(concurrent.futures.as_completed(future_to_item), total=len(future_to_item),
                                                desc='Downloading', disable=self.quiet):
                            item = future_to_item[future]

                            if future.exception() is not None:
                                self.logger.error(
                                    'Media at {0} generated an exception: {1}'.format(item['urls'], future.exception()))
                            else:
                                timestamp = self.__get_timestamp(item)
                                if timestamp > greatest_timestamp:
                                    greatest_timestamp = timestamp
                    # Only update the stamp if we actually saw something newer
                    if greatest_timestamp > self.last_scraped_filemtime:
                        self.set_last_scraped_timestamp(username, greatest_timestamp)

                    if (self.media_metadata or self.comments or self.include_location) and self.posts:
                        self.save_json(self.posts, '{0}/{1}.json'.format(dst, username))
                except ValueError:
                    self.logger.error("Unable to scrape user - %s" % username)
        finally:
            self.quit = True
            self.logout()

    def get_profile_pic(self, dst, executor, future_to_item, user, username):
        if 'image' not in self.media_types:
            return

        url = USER_INFO.format(user['id'])
        resp = self.get_json(url)

        if resp is None:
            self.logger.error('Error getting user info for {0}'.format(username))
            return

        user_info = json.loads(resp)['user']

        if user_info['has_anonymous_profile_picture']:
            return

        try:
            profile_pic_urls = [
                user_info['hd_profile_pic_url_info']['url'],
                user_info['hd_profile_pic_versions'][-1]['url'],
            ]
            profile_pic_url = next(url for url in profile_pic_urls if url is not None)
        except (KeyError, IndexError, StopIteration):
            self.logger.warning('Failed to get high resolution profile picture for {0}'.format(username))
            profile_pic_url = user['profile_pic_url_hd']

        item = {'urls': [profile_pic_url], 'username': username, 'shortcode': '', 'created_time': 1286323200, '__typename': 'GraphProfilePic'}

        if self.latest is False or os.path.isfile(dst + '/' + item['urls'][0].split('/')[-1]) is False:
            for item in tqdm.tqdm([item], desc='Searching {0} for profile pic'.format(username), unit=" images",
                                  ncols=0, disable=self.quiet):
                future = executor.submit(self.worker_wrapper, self.download, item, dst)
                future_to_item[future] = item

    def get_stories(self, dst, executor, future_to_item, user, username):
        """Scrapes the user's stories."""
        if self.logged_in and \
                ('story-image' in self.media_types or 'story-video' in self.media_types):
            # Get the user's stories.
            stories = self.fetch_stories(user['id'])

            # Downloads the user's stories and sends them to the executor.
            iter = 0
            for item in tqdm.tqdm(stories, desc='Searching {0} for stories'.format(username), unit=" media",
                                  disable=self.quiet):
                if self.story_has_selected_media_types(item) and self.is_new_media(item):
                    item['username'] = username
                    item['shortcode'] = ''
                    future = executor.submit(self.worker_wrapper, self.download, item, dst)
                    future_to_item[future] = item

                iter = iter + 1
                if self.maximum != 0 and iter >= self.maximum:
                    break

    def get_media(self, dst, executor, future_to_item, user):
        """Scrapes the user's posts for media."""
        if 'image' not in self.media_types and 'video' not in self.media_types and 'none' not in self.media_types:
            return

        username = user['username']

        if self.include_location:
            media_exec = concurrent.futures.ThreadPoolExecutor(max_workers=5)

        iter = 0
        for item in tqdm.tqdm(self.query_media_gen(user), desc='Searching {0} for posts'.format(username),
                              unit=' media', disable=self.quiet):
            # --filter command line option
            if self.filter:
                if 'tags' in item:
                    filtered = any(x in item['tags'] for x in self.filter)
                    if self.has_selected_media_types(item) and self.is_new_media(item) and filtered:
                        item['username'] = username
                        future = executor.submit(self.worker_wrapper, self.download, item, dst)
                        future_to_item[future] = item
                else:
                    # Filter is on, but the media contains no tags: skip it.
                    pass
            else:
                if self.has_selected_media_types(item) and self.is_new_media(item):
                    item['username'] = username
                    future = executor.submit(self.worker_wrapper, self.download, item, dst)
                    future_to_item[future] = item

            if self.include_location:
                item['username'] = username
                media_exec.submit(self.worker_wrapper, self.__get_location, item)

            if self.comments:
                item['username'] = username
                item['comments'] = {'data': list(self.query_comments_gen(item['shortcode']))}

            if self.media_metadata or self.comments or self.include_location:
                item['username'] = username
                self.posts.append(item)

            iter = iter + 1
            if self.maximum != 0 and iter >= self.maximum:
                break

    def get_shared_data(self, username=''):
        """Fetches the user's metadata."""
        resp = self.get_json(BASE_URL + username)

        if resp is not None and '_sharedData' in resp:
            try:
                shared_data = resp.split("window._sharedData = ")[1].split(";</script>")[0]
                return json.loads(shared_data)
            except (TypeError, KeyError, IndexError):
                pass

    def fetch_stories(self, user_id):
        """Fetches the user's stories."""
        resp = self.get_json(STORIES_URL.format(user_id))

        if resp is not None:
            retval = json.loads(resp)
            if retval['data'] and 'reels_media' in retval['data'] and len(retval['data']['reels_media']) > 0 and len(retval['data']['reels_media'][0]['items']) > 0:
                return [self.set_story_url(item) for item in retval['data']['reels_media'][0]['items']]
        return []

    def query_media_gen(self, user, end_cursor=''):
        """Generator for media."""
        media, end_cursor = self.__query_media(user['id'], end_cursor)

        if media:
            try:
                while True:
                    for item in media:
                        if not self.is_new_media(item):
                            return
                        yield item

                    if end_cursor:
                        media, end_cursor = self.__query_media(user['id'], end_cursor)
                    else:
                        return
            except ValueError:
                self.logger.exception('Failed to query media for user ' + user['username'])

    def __query_media(self, id, end_cursor=''):
        params = QUERY_MEDIA_VARS.format(id, end_cursor)
        self.update_ig_gis_header(params)

        resp = self.get_json(QUERY_MEDIA.format(params))

        if resp is not None:
            payload = json.loads(resp)['data']['user']

            if payload:
                container = payload['edge_owner_to_timeline_media']
                nodes = self._get_nodes(container)
                end_cursor = container['page_info']['end_cursor']
                return nodes, end_cursor

        return None, None

    def get_ig_gis(self, rhx_gis, params):
        data = rhx_gis + ":" + params
        if sys.version_info.major >= 3:
            return hashlib.md5(data.encode('utf-8')).hexdigest()
        else:
            return hashlib.md5(data).hexdigest()

    def update_ig_gis_header(self, params):
        self.session.headers.update({
            'x-instagram-gis': self.get_ig_gis(
                self.rhx_gis,
                params
            )
        })
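
    # How the signature is built (illustrative values): with
    # rhx_gis = 'abc123' and params = '{"id":"42","first":50,"after":""}',
    # the x-instagram-gis header is
    # md5(b'abc123:{"id":"42","first":50,"after":""}').hexdigest().
    # The web API of this era checked the hash against the query variables,
    # so the header must be refreshed before every GraphQL request.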

    def has_selected_media_types(self, item):
        filetypes = {'jpg': 0, 'mp4': 0}

        for url in item['urls']:
            ext = self.__get_file_ext(url)
            if ext not in filetypes:
                filetypes[ext] = 0
            filetypes[ext] += 1

        if ('image' in self.media_types and filetypes['jpg'] > 0) or \
                ('video' in self.media_types and filetypes['mp4'] > 0):
            return True

        return False

    def story_has_selected_media_types(self, item):
        # GraphStoryImage is an image story, GraphStoryVideo a video story
        if item['__typename'] == 'GraphStoryImage' and 'story-image' in self.media_types:
            return True
        if item['__typename'] == 'GraphStoryVideo' and 'story-video' in self.media_types:
            return True
        return False

    def extract_tags(self, item):
        """Extracts the hashtags from the caption text."""
        caption_text = ''
        if 'caption' in item and item['caption']:
            if isinstance(item['caption'], dict):
                caption_text = item['caption']['text']
            else:
                caption_text = item['caption']
        elif 'edge_media_to_caption' in item and item['edge_media_to_caption'] and item['edge_media_to_caption']['edges']:
            caption_text = item['edge_media_to_caption']['edges'][0]['node']['text']

        if caption_text:
            # include words and emojis
            item['tags'] = re.findall(
                r"(?<!&)#(\w+|(?:[\xA9\xAE\u203C\u2049\u2122\u2139\u2194-\u2199\u21A9\u21AA\u231A\u231B\u2328\u2388\u23CF\u23E9-\u23F3\u23F8-\u23FA\u24C2\u25AA\u25AB\u25B6\u25C0\u25FB-\u25FE\u2600-\u2604\u260E\u2611\u2614\u2615\u2618\u261D\u2620\u2622\u2623\u2626\u262A\u262E\u262F\u2638-\u263A\u2648-\u2653\u2660\u2663\u2665\u2666\u2668\u267B\u267F\u2692-\u2694\u2696\u2697\u2699\u269B\u269C\u26A0\u26A1\u26AA\u26AB\u26B0\u26B1\u26BD\u26BE\u26C4\u26C5\u26C8\u26CE\u26CF\u26D1\u26D3\u26D4\u26E9\u26EA\u26F0-\u26F5\u26F7-\u26FA\u26FD\u2702\u2705\u2708-\u270D\u270F\u2712\u2714\u2716\u271D\u2721\u2728\u2733\u2734\u2744\u2747\u274C\u274E\u2753-\u2755\u2757\u2763\u2764\u2795-\u2797\u27A1\u27B0\u27BF\u2934\u2935\u2B05-\u2B07\u2B1B\u2B1C\u2B50\u2B55\u3030\u303D\u3297\u3299]|\uD83C[\uDC04\uDCCF\uDD70\uDD71\uDD7E\uDD7F\uDD8E\uDD91-\uDD9A\uDE01\uDE02\uDE1A\uDE2F\uDE32-\uDE3A\uDE50\uDE51\uDF00-\uDF21\uDF24-\uDF93\uDF96\uDF97\uDF99-\uDF9B\uDF9E-\uDFF0\uDFF3-\uDFF5\uDFF7-\uDFFF]|\uD83D[\uDC00-\uDCFD\uDCFF-\uDD3D\uDD49-\uDD4E\uDD50-\uDD67\uDD6F\uDD70\uDD73-\uDD79\uDD87\uDD8A-\uDD8D\uDD90\uDD95\uDD96\uDDA5\uDDA8\uDDB1\uDDB2\uDDBC\uDDC2-\uDDC4\uDDD1-\uDDD3\uDDDC-\uDDDE\uDDE1\uDDE3\uDDEF\uDDF3\uDDFA-\uDE4F\uDE80-\uDEC5\uDECB-\uDED0\uDEE0-\uDEE5\uDEE9\uDEEB\uDEEC\uDEF0\uDEF3]|\uD83E[\uDD10-\uDD18\uDD80-\uDD84\uDDC0]|(?:0\u20E3|1\u20E3|2\u20E3|3\u20E3|4\u20E3|5\u20E3|6\u20E3|7\u20E3|8\u20E3|9\u20E3|#\u20E3|\\*\u20E3|\uD83C(?:\uDDE6\uD83C(?:\uDDEB|\uDDFD|\uDDF1|\uDDF8|\uDDE9|\uDDF4|\uDDEE|\uDDF6|\uDDEC|\uDDF7|\uDDF2|\uDDFC|\uDDE8|\uDDFA|\uDDF9|\uDDFF|\uDDEA)|\uDDE7\uD83C(?:\uDDF8|\uDDED|\uDDE9|\uDDE7|\uDDFE|\uDDEA|\uDDFF|\uDDEF|\uDDF2|\uDDF9|\uDDF4|\uDDE6|\uDDFC|\uDDFB|\uDDF7|\uDDF3|\uDDEC|\uDDEB|\uDDEE|\uDDF6|\uDDF1)|\uDDE8\uD83C(?:\uDDF2|\uDDE6|\uDDFB|\uDDEB|\uDDF1|\uDDF3|\uDDFD|\uDDF5|\uDDE8|\uDDF4|\uDDEC|\uDDE9|\uDDF0|\uDDF7|\uDDEE|\uDDFA|\uDDFC|\uDDFE|\uDDFF|\uDDED)|\uDDE9\uD83C(?:\uDDFF|\uDDF0|\uDDEC|\uDDEF|\uDDF2|\uDDF4|\uDDEA)|\uDDEA\uD83C(?:\uDDE6|\uDDE8|\uDDEC|\uDDF7|\uDDEA|\uDDF9|\uDDFA|\uDDF8|\uDDED)|\uDDEB\uD83C(?:\uDDF0|\uDDF4|\uDDEF|\uDDEE|\uDDF7|\uDDF2)|\uDDEC\uD83C(?:\uDDF6|\uDDEB|\uDDE6|\uDDF2|\uDDEA|\uDDED|\uDDEE|\uDDF7|\uDDF1|\uDDE9|\uDDF5|\uDDFA|\uDDF9|\uDDEC|\uDDF3|\uDDFC|\uDDFE|\uDDF8|\uDDE7)|\uDDED\uD83C(?:\uDDF7|\uDDF9|\uDDF2|\uDDF3|\uDDF0|\uDDFA)|\uDDEE\uD83C(?:\uDDF4|\uDDE8|\uDDF8|\uDDF3|\uDDE9|\uDDF7|\uDDF6|\uDDEA|\uDDF2|\uDDF1|\uDDF9)|\uDDEF\uD83C(?:\uDDF2|\uDDF5|\uDDEA|\uDDF4)|\uDDF0\uD83C(?:\uDDED|\uDDFE|\uDDF2|\uDDFF|\uDDEA|\uDDEE|\uDDFC|\uDDEC|\uDDF5|\uDDF7|\uDDF3)|\uDDF1\uD83C(?:\uDDE6|\uDDFB|\uDDE7|\uDDF8|\uDDF7|\uDDFE|\uDDEE|\uDDF9|\uDDFA|\uDDF0|\uDDE8)|\uDDF2\uD83C(?:\uDDF4|\uDDF0|\uDDEC|\uDDFC|\uDDFE|\uDDFB|\uDDF1|\uDDF9|\uDDED|\uDDF6|\uDDF7|\uDDFA|\uDDFD|\uDDE9|\uDDE8|\uDDF3|\uDDEA|\uDDF8|\uDDE6|\uDDFF|\uDDF2|\uDDF5|\uDDEB)|\uDDF3\uD83C(?:\uDDE6|\uDDF7|\uDDF5|\uDDF1|\uDDE8|\uDDFF|\uDDEE|\uDDEA|\uDDEC|\uDDFA|\uDDEB|\uDDF4)|\uDDF4\uD83C\uDDF2|\uDDF5\uD83C(?:\uDDEB|\uDDF0|\uDDFC|\uDDF8|\uDDE6|\uDDEC|\uDDFE|\uDDEA|\uDDED|\uDDF3|\uDDF1|\uDDF9|\uDDF7|\uDDF2)|\uDDF6\uD83C\uDDE6|\uDDF7\uD83C(?:\uDDEA|\uDDF4|\uDDFA|\uDDFC|\uDDF8)|\uDDF8\uD83C(?:\uDDFB|\uDDF2|\uDDF9|\uDDE6|\uDDF3|\uDDE8|\uDDF1|\uDDEC|\uDDFD|\uDDF0|\uDDEE|\uDDE7|\uDDF4|\uDDF8|\uDDED|\uDDE9|\uDDF7|\uDDEF|\uDDFF|\uDDEA|\uDDFE)|\uDDF9\uD83C(?:\uDDE9|\uDDEB|\uDDFC|\uDDEF|\uDDFF|\uDDED|\uDDF1|\uDDEC|\uDDF0|\uDDF4|\uDDF9|\uDDE6|\uDDF3|\uDDF7|\uDDF2|\uDDE8|\uDDFB)|\uDDFA\uD83C(?:\uDDEC|\uDDE6|\uDDF8|\uDDFE|\uDDF2|\uDDFF)|\uDDFB\uD83C(?:\uDDEC|\uDDE8|\uDDEE|\uDDFA|\uDDE6|\uDDEA|\uDDF3)|\uDDFC\uD83C(?:\uDDF8|\uDDEB)|\uDDFD\uD83C\uDDF0|\uDDFE\uD83C(?:\uDDF9|\uDDEA)|\uDDFF\uD83C(?:\uDDE6|\uDDF2|\uDDFC))))[\ufe00-\ufe0f\u200d]?)+",
                caption_text, re.UNICODE)
            item['tags'] = list(set(item['tags']))

        return item
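
    # Example (hypothetical caption): for an item whose caption text is
    # "sunset at the beach #sunset #beach", extract_tags leaves item['tags']
    # containing 'sunset' and 'beach' (order is unspecified because of the
    # set() dedup above).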

    def get_original_image(self, url):
        """Gets the full-size image from the specified url."""
        # these path parts somehow prevent us from changing the rest of media url
        # url = re.sub(r'/vp/[0-9A-Fa-f]{32}/[0-9A-Fa-f]{8}/', '/', url)
        # remove dimensions to get largest image
        # url = re.sub(r'/[sp]\d{3,}x\d{3,}/', '/', url)
        # get non-square image if one exists
        # url = re.sub(r'/c\d{1,}.\d{1,}.\d{1,}.\d{1,}/', '/', url)
        return url

    def set_story_url(self, item):
        """Sets the story url."""
        urls = []
        if 'video_resources' in item:
            urls.append(item['video_resources'][-1]['src'])
        if 'display_resources' in item:
            urls.append(item['display_resources'][-1]['src'].split('?')[0])
        item['urls'] = urls
        return item

    def download(self, item, save_dir='./'):
        """Downloads the media file."""
        for full_url, base_name in self.templatefilename(item):
            url = full_url.split('?')[0]  # try the static url first, stripping parameters

            file_path = os.path.join(save_dir, base_name)

            if not os.path.exists(os.path.dirname(file_path)):
                self.make_dir(os.path.dirname(file_path))

            if not os.path.isfile(file_path):
                headers = {'Host': urlparse(url).hostname}

                part_file = file_path + '.part'
                downloaded = 0
                total_length = None
                with open(part_file, 'wb') as media_file:
                    try:
                        retry = 0
                        retry_delay = RETRY_DELAY
                        while True:
                            if self.quit:
                                return
                            try:
                                downloaded_before = downloaded
                                headers['Range'] = 'bytes={0}-'.format(downloaded_before)

                                with self.session.get(url, cookies=self.cookies, headers=headers, stream=True, timeout=CONNECT_TIMEOUT) as response:
                                    if response.status_code == 404:
                                        # Instagram does not lie about 404s
                                        break
                                    if response.status_code == 403 and url != full_url:
                                        # see issue #254
                                        url = full_url
                                        continue
                                    response.raise_for_status()

                                    if response.status_code == 206:
                                        try:
                                            match = re.match(r'bytes (?P<first>\d+)-(?P<last>\d+)/(?P<size>\d+)', response.headers['Content-Range'])
                                            range_file_position = int(match.group('first'))
                                            if range_file_position != downloaded_before:
                                                raise Exception()
                                            total_length = int(match.group('size'))
                                            media_file.truncate(total_length)
                                        except Exception:
                                            raise requests.exceptions.InvalidHeader('Invalid range response "{0}" for requested "{1}"'.format(
                                                response.headers.get('Content-Range'), headers.get('Range')))
                                    elif response.status_code == 200:
                                        if downloaded_before != 0:
                                            downloaded_before = 0
                                            downloaded = 0
                                            media_file.seek(0)
                                        content_length = response.headers.get('Content-Length')
                                        if content_length is None:
                                            self.logger.warning('No Content-Length in response, the file {0} may be partially downloaded'.format(base_name))
                                        else:
                                            total_length = int(content_length)
                                            media_file.truncate(total_length)
                                    else:
                                        raise PartialContentException('Wrong status code {0}'.format(response.status_code))

                                    for chunk in response.iter_content(chunk_size=64 * 1024):
                                        if chunk:
                                            downloaded += len(chunk)
                                            media_file.write(chunk)
                                        if self.quit:
                                            return

                                if downloaded != total_length and total_length is not None:
                                    raise PartialContentException('Got first {0} bytes from {1}'.format(downloaded, total_length))

                                break

                            # In case of exception, part_file is not removed on purpose:
                            # it is easier to examine it later when analysing logs.
                            # Please do not add os.remove here.
                            except KeyboardInterrupt:
                                raise
                            except (requests.exceptions.RequestException, PartialContentException) as e:
                                if downloaded - downloaded_before > 0:
                                    # if we got some data on this iteration, do not count it as a failure
                                    self.logger.warning('Continue after exception {0} on {1}'.format(repr(e), url))
                                    retry = 0  # the next fail will be first in a row with no data
                                    continue
                                if retry < MAX_RETRIES:
                                    self.logger.warning('Retry after exception {0} on {1}'.format(repr(e), url))
                                    self.sleep(retry_delay)
                                    retry_delay = min(2 * retry_delay, MAX_RETRY_DELAY)
                                    retry = retry + 1
                                    continue
                                else:
                                    keep_trying = self._retry_prompt(url, repr(e))
                                    if keep_trying == True:
                                        retry = 0
                                        continue
                                    elif keep_trying == False:
                                        break
                                raise
                    finally:
                        media_file.truncate(downloaded)

                if downloaded == total_length or total_length is None:
                    os.rename(part_file, file_path)
                    timestamp = self.__get_timestamp(item)
                    file_time = int(timestamp if timestamp else time.time())
                    os.utime(file_path, (file_time, file_time))

    def templatefilename(self, item):
        for url in item['urls']:
            filename, extension = os.path.splitext(os.path.split(url.split('?')[0])[1])
            try:
                template = self.template
                template_values = {
                    'username': item['username'],
                    'urlname': filename,
                    'shortcode': str(item['shortcode']),
                    'mediatype': item['__typename'][5:],
                    'datetime': time.strftime('%Y%m%d %Hh%Mm%Ss',
                                              time.localtime(self.__get_timestamp(item))),
                    'date': time.strftime('%Y%m%d', time.localtime(self.__get_timestamp(item))),
                    'year': time.strftime('%Y', time.localtime(self.__get_timestamp(item))),
                    'month': time.strftime('%m', time.localtime(self.__get_timestamp(item))),
                    'day': time.strftime('%d', time.localtime(self.__get_timestamp(item))),
                    'h': time.strftime('%Hh', time.localtime(self.__get_timestamp(item))),
                    'm': time.strftime('%Mm', time.localtime(self.__get_timestamp(item))),
                    's': time.strftime('%Ss', time.localtime(self.__get_timestamp(item)))}

                customfilename = str(template.format(**template_values) + extension)
                yield url, customfilename
            except KeyError:
                customfilename = str(filename + extension)
                yield url, customfilename
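
    # Example (illustrative values): with --template '{username}_{date}_{shortcode}',
    # a .jpg post by user 'jane' with shortcode 'Bxyz' taken on 2018-01-01
    # yields 'jane_20180101_Bxyz.jpg'. An unknown token raises KeyError, which
    # falls back to the plain url-derived filename.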

    def is_new_media(self, item):
        """Returns True if the media is new."""
        if self.latest is False or self.last_scraped_filemtime == 0:
            return True

        current_timestamp = self.__get_timestamp(item)
        return current_timestamp > 0 and current_timestamp > self.last_scraped_filemtime

    @staticmethod
    def __get_timestamp(item):
        if item:
            for key in ['taken_at_timestamp', 'created_time', 'taken_at', 'date']:
                found = item.get(key, 0)
                try:
                    found = int(found)
                    if found > 1:  # >1 to ignore any boolean casts
                        return found
                except ValueError:
                    pass
        return 0

    @staticmethod
    def __get_file_ext(url):
        return os.path.splitext(urlparse(url).path)[1][1:].strip().lower()
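
    # e.g. __get_file_ext('https://host/p/photo.jpg?ig_cache_key=1') -> 'jpg'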

    @staticmethod
    def __search(query):
        resp = requests.get(SEARCH_URL.format(query))
        return json.loads(resp.text)

    def search_locations(self):
        query = ' '.join(self.usernames)
        result = self.__search(query)

        if len(result['places']) == 0:
            raise ValueError("No locations found for query '{0}'".format(query))

        sorted_places = sorted(result['places'], key=itemgetter('position'))

        for item in sorted_places[0:5]:
            place = item['place']
            print('location-id: {0}, title: {1}, subtitle: {2}, city: {3}, lat: {4}, lng: {5}'.format(
                place['location']['pk'],
                place['title'],
                place['subtitle'],
                place['location']['city'],
                place['location']['lat'],
                place['location']['lng']
            ))

    @staticmethod
    def save_json(data, dst='./'):
        """Saves the data to a json file."""
        if data:
            with open(dst, 'wb') as f:
                json.dump(data, codecs.getwriter('utf-8')(f), indent=4, sort_keys=True, ensure_ascii=False)

    @staticmethod
    def get_logger(level=logging.DEBUG, verbose=0):
        """Returns a logger."""
        logger = logging.getLogger(__name__)

        fh = logging.FileHandler('instagram-scraper.log', 'w')
        fh.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
        fh.setLevel(level)
        logger.addHandler(fh)

        sh = logging.StreamHandler(sys.stdout)
        sh.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
        sh_lvls = [logging.ERROR, logging.WARNING, logging.INFO]
        sh.setLevel(sh_lvls[verbose])
        logger.addHandler(sh)

        logger.setLevel(level)
        return logger

    @staticmethod
    def parse_file_usernames(usernames_file):
        """Parses a file containing a list of usernames."""
        users = []

        try:
            with open(usernames_file) as user_file:
                for line in user_file.readlines():
                    # Find all usernames delimited by ,; or whitespace
                    users += re.findall(r'[^,;\s]+', line.split("#")[0])
        except IOError as err:
            raise ValueError('File not found: ' + str(err))

        return users

    @staticmethod
    def parse_delimited_str(input):
        """Parse the string input as a list of delimited tokens."""
        return re.findall(r'[^,;\s]+', input)
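
    # e.g. parse_delimited_str('alice,bob; carol dave')
    #   -> ['alice', 'bob', 'carol', 'dave']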

    def deep_get(self, dict, path):
        def _split_indexes(key):
            split_array_index = re.compile(r'[.\[\]]+')  # ['foo', '0']
            return filter(None, split_array_index.split(key))

        ends_with_index = re.compile(r'\[(.*?)\]$')  # foo[0]

        keylist = path.split('.')

        val = dict

        for key in keylist:
            try:
                if ends_with_index.search(key):
                    for prop in _split_indexes(key):
                        if prop.isdigit():
                            val = val[int(prop)]
                        else:
                            val = val[prop]
                else:
                    val = val[key]
            except (KeyError, IndexError, TypeError):
                return None

        return val
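

# A usage sketch for deep_get (assuming shared_data holds the parsed
# window._sharedData JSON of a profile page, as in scrape() above):
#
#   user = scraper.deep_get(shared_data, 'entry_data.ProfilePage[0].graphql.user')
#   count = scraper.deep_get(user, 'edge_owner_to_timeline_media.count')
#
# Dotted segments index dict keys; a [n] suffix indexes into a list.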


def main():
    parser = argparse.ArgumentParser(
        description="instagram-scraper scrapes and downloads an instagram user's photos and videos.",
        epilog=textwrap.dedent("""
        You can hide your credentials from the history by reading your
        username from a local file:

        $ instagram-scraper @insta_args.txt user_to_scrape

        with insta_args.txt looking like this:
        -u=my_username
        -p=my_password

        You can add all the arguments you want to that file, just remember to
        have one argument per line.

        Customize the filename by adding the option --template or -T.
        The default is: {urlname}

        Available template tokens:
        {username}: Instagram user(s) to scrape.
        {shortcode}: post shortcode; empty for profile pics and stories.
        {urlname}: filename from the url.
        {mediatype}: type of media.
        {datetime}: date and time the photo/video was posted,
                    format: 20180101 01h01m01s
        {date}: date the photo/video was posted,
                format: 20180101
        {year}: format: 2018
        {month}: format: 01-12
        {day}: format: 01-31
        {h}: hour, format: 00-23h
        {m}: minute, format: 00-59m
        {s}: second, format: 00-59s
        """),
        formatter_class=argparse.RawDescriptionHelpFormatter,
        fromfile_prefix_chars='@')

    parser.add_argument('username', help='Instagram user(s) to scrape', nargs='*')
    parser.add_argument('--destination', '-d', default='./', help='Download destination')
    parser.add_argument('--login-user', '--login_user', '-u', default=None, help='Instagram login user', required=True)
    parser.add_argument('--login-pass', '--login_pass', '-p', default=None, help='Instagram login password', required=True)
    parser.add_argument('--filename', '-f', help='Path to a file containing a list of users to scrape')
    parser.add_argument('--quiet', '-q', default=False, action='store_true', help='Be quiet while scraping')
    parser.add_argument('--maximum', '-m', type=int, default=0, help='Maximum number of items to scrape')
    parser.add_argument('--retain-username', '--retain_username', '-n', action='store_true', default=False,
                        help='Creates username subdirectory when destination flag is set')
    parser.add_argument('--media-metadata', '--media_metadata', action='store_true', default=False,
                        help='Save media metadata to json file')
    parser.add_argument('--include-location', '--include_location', action='store_true', default=False,
                        help='Include location data when saving media metadata')
    parser.add_argument('--media-types', '--media_types', '-t', nargs='+', default=['image', 'video', 'story'],
                        help='Specify media types to scrape')
    parser.add_argument('--latest', action='store_true', default=False, help='Scrape new media since the last scrape')
    parser.add_argument('--latest-stamps', '--latest_stamps', default=None,
                        help='Scrape new media since timestamps by user in specified file')
    parser.add_argument('--tag', action='store_true', default=False, help='Scrape media using a hashtag')
    parser.add_argument('--filter', default=None, help='Filter by tags in user posts', nargs='*')
    parser.add_argument('--location', action='store_true', default=False, help='Scrape media using a location-id')
    parser.add_argument('--search-location', action='store_true', default=False, help='Search for locations by name')
    parser.add_argument('--comments', action='store_true', default=False, help='Save post comments to json file')
    parser.add_argument('--interactive', '-i', action='store_true', default=False,
                        help='Enable interactive login challenge solving')
    parser.add_argument('--retry-forever', action='store_true', default=False,
                        help='Retry download attempts endlessly when errors are received')
    parser.add_argument('--verbose', '-v', type=int, default=0, help='Logging verbosity level')
    parser.add_argument('--template', '-T', type=str, default='{urlname}', help='Customize filename template')

    args = parser.parse_args()

    if (args.login_user and args.login_pass is None) or (args.login_user is None and args.login_pass):
        parser.print_help()
        raise ValueError('Must provide login user AND password')

    if not args.username and args.filename is None:
        parser.print_help()
        raise ValueError('Must provide username(s) OR a file containing a list of username(s)')
    elif args.username and args.filename:
        parser.print_help()
        raise ValueError('Must provide only one of the following: username(s) OR a filename containing username(s)')

    if args.tag and args.location:
        parser.print_help()
        raise ValueError('Must provide only one of the following: hashtag OR location')

    if args.tag and args.filter:
        parser.print_help()
        raise ValueError('Filters apply to user posts')

    if args.filename:
        args.usernames = InstagramScraper.parse_file_usernames(args.filename)
    else:
        args.usernames = InstagramScraper.parse_delimited_str(','.join(args.username))

    if args.media_types and len(args.media_types) == 1 and re.compile(r'[,;\s]+').findall(args.media_types[0]):
        args.media_types = InstagramScraper.parse_delimited_str(args.media_types[0])

    if args.retry_forever:
        global MAX_RETRIES
        MAX_RETRIES = sys.maxsize

    scraper = InstagramScraper(**vars(args))
    scraper.login()

    if args.tag:
        scraper.scrape_hashtag()
    elif args.location:
        scraper.scrape_location()
    elif args.search_location:
        scraper.search_locations()
    else:
        scraper.scrape()


if __name__ == '__main__':
    main()
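
# Example invocations (a sketch, using the flags defined above):
#   instagram-scraper -u my_login -p my_password user_to_scrape
#   instagram-scraper -u my_login -p my_password --tag nature --comments
#   instagram-scraper -u my_login -p my_password --latest-stamps stamps.ini jane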