Skip to content

Instantly share code, notes, and snippets.

@uluQulu
Created August 23, 2018 01:05
Show Gist options
  • Save uluQulu/75d5d114cdbdac05ce7ceeee3061bbbc to your computer and use it in GitHub Desktop.
Save uluQulu/75d5d114cdbdac05ce7ceeee3061bbbc to your computer and use it in GitHub Desktop.
Uncertain state handling for @tompicca
"""Module which handles the follow features like unfollowing and following"""
import time
import os
import random
import json
import csv
import sqlite3
from datetime import datetime, timedelta
from math import ceil
from selenium.common.exceptions import WebDriverException
from selenium.common.exceptions import NoSuchElementException
from selenium.common.exceptions import StaleElementReferenceException
from .time_util import sleep
from .util import delete_line_from_file
from .util import scroll_bottom
from .util import format_number
from .util import update_activity
from .util import add_user_to_blacklist
from .util import click_element
from .util import web_adress_navigator
from .util import interruption_handler
from .util import get_relationship_counts
from .util import emergency_exit
from .print_log_writer import log_followed_pool
from .print_log_writer import log_uncertain_unfollowed_pool
from .print_log_writer import log_record_all_unfollowed
from .relationship_tools import get_followers
from .relationship_tools import get_following
from .relationship_tools import get_nonfollowers
from .database_engine import get_database
def set_automated_followed_pool(username, unfollow_after, logger, logfolder, pool='followedPool'):
automatedFollowedPool = {"all":[], "eligible":[]}
try:
with open('{0}{1}_{2}.csv'.format(logfolder, username, pool), 'r+') as followedPoolFile:
reader = csv.reader(followedPoolFile)
for row in reader:
if unfollow_after is not None:
try:
ftime = datetime.strptime(row[0].split(' ~ ')[0], '%Y-%m-%d %H:%M')
ftimestamp = (ftime - datetime(1970, 1, 1)).total_seconds()
realtimestamp = (datetime.now() - datetime(1970, 1, 1)).total_seconds()
fword = row[0].split(' ~ ')[1]
if realtimestamp - ftimestamp > unfollow_after:
automatedFollowedPool["eligible"].append(fword)
automatedFollowedPool["all"].append(fword)
except ValueError:
fword = row[0]
automatedFollowedPool["all"].append(fword)
automatedFollowedPool["eligible"].append(fword)
else:
try:
fword = row[0].split(' ~ ')[1]
except IndexError:
fword = row[0]
automatedFollowedPool["all"].append(fword)
automatedFollowedPool["eligible"].append(fword)
followedPoolFile.close()
except BaseException as e:
logger.error("set_automated_followed_pool error {}".format(str(e)))
return automatedFollowedPool
def get_following_status(browser, person, logger):
following = False
follow_button = None
try:
follow_button = browser.find_element_by_xpath(
"//*[contains(text(), 'Follow')]")
if follow_button.text == 'Following':
following = "Following"
else:
if follow_button.text in ['Follow', 'Follow Back']:
following = False
else:
follow_button = browser.find_element_by_xpath(
"//*[contains(text(), 'Requested')]")
if follow_button.text == "Requested":
following = "Requested"
except:
logger.error(
'--> Unfollow error with {},'
' maybe no longer exists...'
.format(person.encode('utf-8')))
following = None
return following, follow_button
def unfollow(browser,
username,
amount,
customList,
InstapyFollowed,
nonFollowers,
allFollowing,
style,
automatedFollowedPool,
relationship_data,
dont_include,
white_list,
sleep_delay,
logger,
logfolder):
""" Unfollows the given amount of users"""
if (customList is not None and
type(customList) in [tuple, list] and
len(customList) == 3 and
customList[0] == True and
type(customList[1]) in [list, tuple, set] and
len(customList[1]) > 0 and
customList[2] in ["all", "nonfollowers"]):
customList_data = customList[1]
if type(customList_data) != list:
customList_data = list(customList_data)
unfollow_track = customList[2]
customList = True
else:
customList = False
if (InstapyFollowed is not None and
type(InstapyFollowed) in [tuple, list] and
len(InstapyFollowed) == 2 and
InstapyFollowed[0] == True and
InstapyFollowed[1] in ["all", "nonfollowers"]):
unfollow_track = InstapyFollowed[1]
InstapyFollowed = True
else:
InstapyFollowed = False
unfollowNum = 0
user_link ='https://www.instagram.com/{}/'.format(username)
#Check URL of the webpage, if it already is the one to be navigated, then do not navigate to it again
web_adress_navigator(browser, user_link)
#check how many poeple we are following
allfollowers, allfollowing = get_relationship_counts(browser, username, logger)
if allfollowing is None:
logger.warning("Unable to find the count of users followed ~leaving unfollow feature")
return 0
elif allfollowing == 0:
logger.warning("There are 0 people to unfollow ~leaving unfollow feature")
return 0
if amount > allfollowing:
logger.info("There are less users to unfollow than you have requested: "
"{}/{} ~using available amount\n".format(allfollowing, amount))
amount = allfollowing
if (customList == True or
InstapyFollowed == True or
nonFollowers == True):
if customList == True:
logger.info("Unfollowing from the list of pre-defined usernames\n")
unfollow_list = customList_data
elif InstapyFollowed == True:
logger.info("Unfollowing the users followed by InstaPy\n")
unfollow_list = automatedFollowedPool["eligible"]
elif nonFollowers == True:
logger.info("Unfollowing the users who do not follow back\n")
""" Unfollow only the users who do not follow you back """
unfollow_list = get_nonfollowers(browser,
username,
relationship_data,
False,
True,
logger,
logfolder)
#pick only the users in the right track- ["all" or "nonfollowers"] for `customList` and `InstapyFollowed` technics
if customList == True or InstapyFollowed == True:
if unfollow_track == "nonfollowers":
all_followers = get_followers(browser, username, "full", relationship_data, False, True, logger, logfolder)
loyal_users = [user for user in unfollow_list if user in all_followers]
logger.info("Found {} loyal followers! ~will not unfollow them".format(len(loyal_users)))
unfollow_list = [user for user in unfollow_list if user not in loyal_users]
elif unfollow_track != "all":
logger.info("Unfollow track is not specified! ~choose \"all\" or \"nonfollowers\"")
return 0
#re-generate unfollow list according to the `unfollow_after` parameter for `customList` and `nonFollowers` technics
if customList == True or nonFollowers == True:
not_found = []
non_eligible = []
for person in unfollow_list:
if person not in automatedFollowedPool["all"]:
not_found.append(person)
elif (person in automatedFollowedPool["all"] and
person not in automatedFollowedPool["eligible"]):
non_eligible.append(person)
unfollow_list = [user for user in unfollow_list if user not in non_eligible]
logger.info("Total {} users available to unfollow"
" ~not found in 'followedPool.csv': {} | didn't pass `unfollow_after`: {}\n".format(
len(unfollow_list), len(not_found), len(non_eligible)))
elif InstapyFollowed == True:
non_eligible = [user for user in automatedFollowedPool["all"] if user not in automatedFollowedPool["eligible"]]
logger.info("Total {} users available to unfollow ~didn't pass `unfollow_after`: {}\n".format(len(unfollow_list), len(non_eligible)))
if len(unfollow_list) < 1:
logger.info("There are no any users available to unfollow")
return 0
#choose the desired order of the elements
if style == "LIFO":
unfollow_list = list(reversed(unfollow_list))
elif style == "RANDOM":
random.shuffle(unfollow_list)
if amount > len(unfollow_list):
logger.info("You have requested more amount: {} than {} of users available to unfollow ~using available amount".format(
amount, len(unfollow_list)))
amount = len(unfollow_list)
# unfollow loop
try:
sleep_counter = 0
sleep_after = random.randint(8, 12)
for person in unfollow_list:
if unfollowNum >= amount:
logger.warning(
"--> Total unfollows reached it's amount given {}\n"
.format(unfollowNum))
break
if sleep_counter >= sleep_after:
delay_random = random.randint(ceil(sleep_delay*0.85), ceil(sleep_delay*1.14))
logger.info("Unfollowed {} new users ~sleeping about {}\n".format(sleep_counter,
'{} seconds'.format(delay_random) if delay_random < 60 else
'{} minutes'.format(float("{0:.2f}".format(delay_random/60)))))
sleep(delay_random)
sleep_counter = 0
sleep_after = random.randint(8, 12)
pass
if person not in dont_include:
browser.get('https://www.instagram.com/' + person)
sleep(2)
following, follow_button = get_following_status(browser, person, logger)
if following is None:
sirens_wailing = emergency_exit(browser, username, logger)
if sirens_wailing == True:
break
if following:
# click the button
click_element(browser, follow_button) # follow_button.click()
sleep(4)
try:
browser.find_element_by_xpath("//button[contains(text(), 'Unfollow')]").click()
sleep(4)
except Exception:
pass
# double check not following
follow_button = browser.find_element_by_xpath(
"//*[contains(text(), 'Follow')]")
if follow_button.text in ['Follow','Follow Back']:
unfollowNum += 1
sleep_counter += 1
update_activity('unfollows')
if person in relationship_data[username]["all_following"]:
relationship_data[username]["all_following"].remove(person)
logger.info(
'--> Ongoing Unfollow From InstaPy {}/{},'
' now unfollowing: {}'
.format(str(unfollowNum), amount, person.encode('utf-8')))
delete_line_from_file('{0}{1}_followedPool.csv'.format(logfolder, username), person +
",\n", logger)
print('')
sleep(15)
else:
logger.error("Unfollow error! ~username {} might be blocked\n".format(username))
# stop the loop
break
else:
# this user found in our list of unfollow but is not followed
if follow_button is None or follow_button.text not in ['Follow', 'Follow Back']:
log_uncertain_unfollowed_pool(username, person, logger, logfolder)
# save any unfollowed person
log_record_all_unfollowed(username, person, logger, logfolder)
logger.warning(
"--> Cannot Unfollow From InstaPy {}"
", maybe {} was unfollowed before..."
.format(str(unfollowNum), person.encode('utf-8')))
delete_line_from_file('{0}{1}_followedPool.csv'.format(logfolder, username),
person + ",\n", logger)
print('')
sleep(2)
else:
# if the user in dont include (should not be) we shall remove him from the follow list
# if he is a white list user (set at init and not during run time)
if person in white_list:
delete_line_from_file('{0}{1}_followedPool.csv'.format(logfolder, username),
person + ",\n", logger)
list_type = 'whitelist'
else:
list_type = 'dont_include'
logger.info("Not unfollowing '{}'! ~user is in the list {}\n".format(person, list_type))
continue
except BaseException as e:
logger.error("Unfollow loop error: {}\n".format(str(e)))
elif allFollowing == True:
logger.info("Unfollowing the users you are following")
# unfollow from profile
try:
following_link = browser.find_elements_by_xpath(
'//section//ul//li[3]')
click_element(browser, following_link[0]) # following_link[0].click()
# update server calls
update_activity()
except BaseException as e:
logger.error("following_link error {}".format(str(e)))
return 0
#scroll down the page to get sufficient amount of usernames
get_users_through_dialog(browser, None, username, amount,
allfollowing, False, None, None,
None, {"enabled":False, "percentage":0},
"Unfollow", logger, logfolder)
# find dialog box
dialog = browser.find_element_by_xpath(
"//div[text()='Following']/following-sibling::div")
sleep(3)
# get persons, unfollow buttons, and length of followed pool
person_list_a = dialog.find_elements_by_tag_name("a")
person_list = []
for person in person_list_a:
if person and hasattr(person, 'text') and person.text:
person_list.append(person.text)
follow_buttons = dialog.find_elements_by_tag_name('button')
#re-generate person list to unfollow according to the `unfollow_after` parameter
user_info = list(zip(follow_buttons, person_list))
non_eligible = []
not_found = []
for button, person in user_info:
if person not in automatedFollowedPool["all"]:
not_found.append(person)
elif (person in automatedFollowedPool["all"] and
person not in automatedFollowedPool["eligible"]):
non_eligible.append(person)
user_info = [pair for pair in user_info if pair[1] not in non_eligible]
logger.info("Total {} users available to unfollow"
" ~not found in 'followedPool.csv': {} | didn't pass `unfollow_after`: {}".format(
len(user_info), len(not_found), len(non_eligible)))
if len(user_info) < 1:
logger.info("There are no any users to unfollow")
return 0
elif len(user_info) < amount:
logger.info("Could not grab requested amount of usernames to unfollow: "
"{}/{} ~using available amount".format(len(user_info), amount))
amount = len(user_info)
if style == "LIFO":
user_info = list(reversed(user_info))
elif style == "RANDOM":
random.shuffle(user_info)
# unfollow loop
try:
hasSlept = False
for button, person in user_info:
if unfollowNum >= amount:
logger.info(
"--> Total unfollowNum reached it's amount given: {}"
.format(unfollowNum))
break
if (unfollowNum != 0 and
hasSlept is False and
unfollowNum % 10 == 0):
logger.info('sleeping for about {}min'
.format(int(sleep_delay/60)))
sleep(sleep_delay)
hasSlept = True
pass
if person not in dont_include:
unfollowNum += 1
click_element(browser, button) # button.click()
sleep(4)
try:
browser.find_element_by_xpath("//button[contains(text(), 'Unfollow')]").click()
sleep(4)
except Exception:
pass
update_activity('unfollows')
if person in relationship_data[username]["all_following"]:
relationship_data[username]["all_following"].remove(person)
logger.info(
'--> Ongoing Unfollow {}/{}, now unfollowing: {}'
.format(str(unfollowNum), amount, person.encode('utf-8')))
delete_line_from_file('{0}{1}_followedPool.csv'.format(logfolder, username), person +
",\n", logger)
print('')
sleep(15)
# To only sleep once until there is the next unfollow
if hasSlept:
hasSlept = False
continue
else:
logger.info("Not unfollowing '{}'! ~user is in the whitelist\n".format(person))
continue
except Exception as exc:
logger.error("Unfollow loop error:\n\n{}\n\n".format(str(exc).encode('utf-8')))
else:
logger.info("Please select a proper unfollow method! ~leaving unfollow activity\n")
return unfollowNum
def follow_user(browser, login, user_name, blacklist, logger, logfolder):
"""Follows the user of the currently opened image"""
follow_xpath = "//button[text()='Follow']"
try:
sleep(2)
follow_button = browser.find_element_by_xpath(follow_xpath)
if follow_button.is_displayed():
click_element(browser, follow_button) # follow_button.click()
else:
browser.execute_script(
"arguments[0].style.visibility = 'visible'; "
"arguments[0].style.height = '10px'; "
"arguments[0].style.width = '10px'; "
"arguments[0].style.opacity = 1", follow_button)
click_element(browser, follow_button) # follow_button.click()
update_activity('follows')
logger.info('--> Now following')
logtime = datetime.now().strftime('%Y-%m-%d %H:%M')
log_followed_pool(login, user_name, logger, logfolder, logtime)
follow_restriction("write", user_name, None, logger)
if blacklist['enabled'] is True:
action = 'followed'
add_user_to_blacklist(
user_name, blacklist['campaign'], action, logger, logfolder
)
sleep(3)
return 1
except NoSuchElementException:
logger.info('--> Already following')
sleep(1)
return 0
except StaleElementReferenceException:
# https://stackoverflow.com/questions/16166261/selenium-webdriver-how-to-resolve-stale-element-reference-exception
# 1. An element that is found on a web page referenced as a WebElement in WebDriver then the DOM changes
# (probably due to JavaScript functions) that WebElement goes stale.
# 2. The element has been deleted entirely.
logger.error('--> element that is found on a web page referenced while the DOM changes')
sleep(1)
return 0
def unfollow_user(browser, username, person, relationship_data, logger, logfolder):
"""Unfollows the user of the currently opened image"""
try:
unfollow_button = browser.find_element_by_xpath(
"//*[text()='Following' or text()='Requested']")
#"//*[contains(text(), 'Following')]") # or Requested
except NoSuchElementException:
logger.error("Could not locate \"Following\" or \"Requested\" button in order to unfollow '{}'!".format(person))
return 0
if unfollow_button.text in ['Following', 'Requested']:
click_element(browser, unfollow_button) # unfollow_button.send_keys("\n")
logger.warning("--> Unfollowed '{}' due to Inappropriate Content".format(person))
delete_line_from_file('{0}{1}_followedPool.csv'.format(logfolder, username), person +
",\n", logger)
if person in relationship_data[username]["all_following"]:
relationship_data[username]["all_following"].remove(person)
update_activity('unfollows')
sleep(3)
return 1
def follow_given_user(browser,
login,
acc_to_follow,
blacklist,
logger,
logfolder):
"""Follows a given user"""
user_link = "https://www.instagram.com/{}/".format(acc_to_follow)
#Check URL of the webpage, if it already is user's profile page, then do not navigate to it again
web_adress_navigator(browser, user_link)
logger.info('--> {} instagram account is opened...'.format(acc_to_follow))
try:
sleep(10)
follow_button = browser.find_element_by_xpath(
"//*[text()='Follow' or text()='Follow Back']")
click_element(browser, follow_button) # unfollow_button.send_keys("\n")
update_activity('follows')
logger.info('---> Now following: {}'.format(acc_to_follow))
logtime = datetime.now().strftime('%Y-%m-%d %H:%M')
log_followed_pool(login, acc_to_follow, logger, logfolder, logtime)
follow_restriction("write", acc_to_follow, None, logger)
if blacklist['enabled'] is True:
action = 'followed'
add_user_to_blacklist(
acc_to_follow, blacklist['campaign'], action, logger, logfolder
)
sleep(3)
return 1
except NoSuchElementException:
logger.warning('---> {} is already followed'.format(acc_to_follow))
sleep(3)
return 0
def get_users_through_dialog(browser,
login,
user_name,
amount,
users_count,
randomize,
dont_include,
blacklist,
follow_times,
simulation,
channel,
logger,
logfolder):
sleep(2)
person_followed = []
real_amount = amount
if randomize and amount >= 3:
# expanding the popultaion for better sampling distribution
amount = amount * 3
if amount > int(users_count*0.85): #taking 85 percent of possible amounts is a safe study
amount = int(users_count*0.85)
try_again = 0
sc_rolled = 0
# find dialog box
dialog = browser.find_element_by_xpath(
"//div[text()='Followers' or text()='Following']/following-sibling::div")
if channel == "Follow":
# get follow buttons. This approach will find the follow buttons and
# ignore the Unfollow/Requested buttons.
buttons = dialog.find_elements_by_xpath(
"//button[text()='Follow']")
elif channel == "Unfollow":
buttons = dialog.find_elements_by_xpath(
"//button[text() = 'Following']")
abort = False
person_list = []
total_list = len(buttons)
simulated_list = []
simulator_counter = 0
# scroll down if the generated list of user to follow is not enough to
# follow amount set
while (total_list < amount) and not abort:
before_scroll = total_list
for i in range(3):
scroll_bottom(browser, dialog, 2)
sc_rolled += 1
simulator_counter += 1
if channel == "Follow":
buttons = dialog.find_elements_by_xpath(
"//button[text()='Follow']")
elif channel == "Unfollow":
buttons = dialog.find_elements_by_xpath(
"//button[text() = 'Following']")
total_list = len(buttons)
abort = (before_scroll == total_list)
if abort:
if total_list < real_amount:
logger.info("Failed to load desired amount of users")
if sc_rolled > 85: #you may want to use up to 100
if total_list < amount:
logger.info("Too many requests sent! attempt: {} | gathered links: {} ~sleeping a bit ".format(try_again+1, total_list))
sleep(random.randint(600, 655))
try_again += 1
sc_rolled = 0
# Will follow a little bit of users in order to simulate real interaction
if (simulation["enabled"] == True and
simulation["percentage"] >= random.randint(1, 100) and
(simulator_counter > random.randint(5, 17) or
abort == True or
total_list >= amount or
sc_rolled == random.randint(3, 5)) and
len(buttons) > 0):
quick_amount = 1 if not total_list >= amount else random.randint(1, 4)
for i in range(0, quick_amount):
logger.info("Simulated follow : {}".format(len(simulated_list)+1))
quick_index = random.randint(0, len(buttons)-1)
quick_button = buttons[quick_index]
quick_username = dialog_username_extractor(quick_button)
if quick_username[0] not in simulated_list:
quick_follow = follow_through_dialog(browser,
login,
quick_username,
quick_button,
quick_amount,
dont_include,
blacklist,
follow_times,
logger,
logfolder)
simulated_list.extend(quick_follow)
simulator_counter = 0
person_list = dialog_username_extractor(buttons)
if randomize:
random.shuffle(person_list)
person_list = person_list[:(real_amount-len(simulated_list))]
for user in simulated_list: #add simulated users to the `person_list` in random index
if user not in person_list:
person_list.insert(random.randint(0, abs(len(person_list)-1)), user)
return person_list, simulated_list
def dialog_username_extractor(buttons):
""" Extract username of a follow button from a dialog box """
if not isinstance(buttons, list):
buttons = [buttons]
person_list = []
for person in buttons:
if person and hasattr(person, 'text') and person.text:
try:
person_list.append(person.find_element_by_xpath("../../../*")
.find_elements_by_tag_name("a")[1].text)
except IndexError:
pass # Element list is too short to have a [1] element
return person_list
def follow_through_dialog(browser,
login,
person_list,
buttons,
amount,
dont_include,
blacklist,
follow_times,
logger,
logfolder):
""" Will follow username directly inside a dialog box """
if not isinstance(person_list, list):
person_list = [person_list]
if not isinstance(buttons, list):
buttons = [buttons]
person_followed = []
try:
for person, button in zip(person_list, buttons):
if (person not in dont_include and
not follow_restriction("read", person, follow_times, logger)):
# Register this session's followed user for further interaction
person_followed.append(person)
click_element(browser, button)
sleep(1)
logtime = datetime.now().strftime('%Y-%m-%d %H:%M')
log_followed_pool(login, person, logger, logfolder, logtime)
update_activity('follows')
follow_restriction("write", person, None, logger)
logger.info('--> Followed {}'.format(person.encode('utf-8')))
if blacklist['enabled'] is True:
action = 'followed'
add_user_to_blacklist(
person, blacklist['campaign'], action, logger, logfolder
)
sleep(3)
else:
logger.info("Not followed '{}' ~inappropriate user".format(person))
except BaseException as e:
logger.error("Error occured while following through dialog box:\n{}".format(str(e)))
return person_followed
def get_given_user_followers(browser,
login,
user_name,
amount,
dont_include,
randomize,
blacklist,
follow_times,
simulation,
logger,
logfolder):
"""
For the given username, follow their followers.
:param browser: webdriver instance
:param login:
:param user_name: given username of account to follow
:param amount: the number of followers to follow
:param dont_include: ignore these usernames
:param random: randomly select from users' followers
:param blacklist:
:param follow_times:
:param logger: the logger instance
:param logfolder: the logger folder
:return: list of user's followers also followed
"""
user_name = user_name.strip()
browser.get('https://www.instagram.com/{}/'.format(user_name))
update_activity()
# check how many people are following this user.
try:
allfollowers = format_number(browser.find_element_by_xpath("//a[contains"
"(@href,'followers')]/span").text)
except NoSuchElementException:
try:
allfollowers = browser.execute_script(
"return window._sharedData.entry_data."
"ProfilePage[0].graphql.user.edge_followed_by.count")
except WebDriverException:
try:
browser.execute_script("location.reload()")
allfollowers = browser.execute_script(
"return window._sharedData.entry_data."
"ProfilePage[0].graphql.user.edge_followed_by.count")
except WebDriverException:
try:
topCount_elements = browser.find_elements_by_xpath(
"//span[contains(@class,'g47SY')]")
if topCount_elements:
allfollowers = format_number(topCount_elements[1].text)
else:
logger.info("Failed to get followers count of '{}' ~empty list".format(user_name))
allfollowers = None
except NoSuchElementException:
logger.error("Error occured during getting the followers count of '{}'\n".format(user_name))
return [], []
# skip early for no followers
if not allfollowers:
logger.info('{} has no followers'.format(user_name))
return [], []
elif allfollowers < amount:
logger.warning('{} has less followers than given amount of {}'.format(
allfollowers, amount))
# locate element to user's followers
try:
followers_link = browser.find_elements_by_xpath(
'//a[@href="/{}/followers/"]'.format(user_name))
click_element(browser, followers_link[0])
# update server calls
update_activity()
except NoSuchElementException:
logger.error('Could not find followers\' link for {}'.format(user_name))
return [], []
except BaseException as e:
logger.error("`followers_link` error {}".format(str(e)))
return [], []
channel = "Follow"
person_list, simulated_list = get_users_through_dialog(browser, login, user_name, amount,
allfollowers, randomize, dont_include,
blacklist, follow_times, simulation,
channel, logger, logfolder)
return person_list, simulated_list
def get_given_user_following(browser,
login,
user_name,
amount,
dont_include,
randomize,
blacklist,
follow_times,
simulation,
logger,
logfolder):
user_name = user_name.strip()
browser.get('https://www.instagram.com/{}/'.format(user_name))
# update server calls
update_activity()
# check how many poeple are following this user.
# throw RuntimeWarning if we are 0 people following this user
try:
allfollowing = format_number(browser.find_element_by_xpath("//a[contains"
"(@href,'following')]/span").text)
except NoSuchElementException:
try:
allfollowing = browser.execute_script(
"return window._sharedData.entry_data."
"ProfilePage[0].graphql.user.edge_follow.count")
except WebDriverException:
try:
browser.execute_script("location.reload()")
allfollowing = browser.execute_script(
"return window._sharedData.entry_data."
"ProfilePage[0].graphql.user.edge_follow.count")
except WebDriverException:
try:
topCount_elements = browser.find_elements_by_xpath(
"//span[contains(@class,'g47SY')]")
if topCount_elements:
allfollowing = format_number(topCount_elements[2].text)
else:
logger.info("Failed to get following count of '{}' ~empty list".format(user_name))
allfollowing = None
except NoSuchElementException:
logger.error("\nError occured during getting the following count of '{}'\n".format(user_name))
return [], []
# skip early for no followers
if not allfollowing:
logger.info('{} has no any following'.format(user_name))
return [], []
elif allfollowing < amount:
logger.warning('{} has less following than given amount of {}'.format(
allfollowing, amount))
try:
following_link = browser.find_elements_by_xpath(
'//a[@href="/{}/following/"]'.format(user_name))
click_element(browser, following_link[0])
# update server calls
update_activity()
except NoSuchElementException:
logger.error('Could not find following\'s link for {}'.format(user_name))
return [], []
except BaseException as e:
logger.error("`following_link` error {}".format(str(e)))
return [], []
channel = "Follow"
person_list, simulated_list = get_users_through_dialog(browser, login, user_name, amount,
allfollowing, randomize, dont_include,
blacklist, follow_times, simulation,
channel, logger, logfolder)
return person_list, simulated_list
def dump_follow_restriction(profile_name, logger, logfolder):
""" Dump follow restriction data to a local human-readable JSON """
try:
# get a DB and start a connection
db, id = get_database()
conn = sqlite3.connect(db)
with conn:
conn.row_factory = sqlite3.Row
cur = conn.cursor()
cur.execute("SELECT * FROM followRestriction WHERE profile_id=:var", {"var":id})
data = cur.fetchall()
if data:
# get the existing data
filename = "{}followRestriction.json".format(logfolder)
if os.path.isfile(filename):
with open(filename) as followResFile:
current_data = json.load(followResFile)
else:
current_data = {}
# pack the new data
follow_data = {user_data[1]: user_data[2] for user_data in data or []}
current_data[profile_name] = follow_data
# dump the fresh follow data to a local human readable JSON
with open(filename, 'w') as followResFile:
json.dump(current_data, followResFile)
except Exception as exc:
logger.error("Pow! Error occured while dumping follow restriction data to a local JSON:\n\t{}".format(str(exc).encode("utf-8")))
finally:
if conn:
# close the open connection
conn.close()
def follow_restriction(operation, username, limit, logger):
""" Keep track of the followed users and help avoid excessive follow of the same user """
try:
# get a DB and start a connection
db, id = get_database()
conn = sqlite3.connect(db)
with conn:
conn.row_factory = sqlite3.Row
cur = conn.cursor()
cur.execute("SELECT * FROM followRestriction WHERE profile_id=:id_var AND username=:name_var", {"id_var":id, "name_var":username})
data = cur.fetchone()
follow_data = dict(data) if data else None
if operation == "write":
if follow_data is None:
# write a new record
cur.execute("INSERT INTO followRestriction (profile_id, username, times) VALUES (?, ?, ?)",
(id, username, 1))
else:
# update the existing record
follow_data["times"] += 1
sql = ("UPDATE followRestriction set times = ? WHERE profile_id=? AND username = ?")
cur.execute(sql, (follow_data["times"], id, username))
# commit the latest changes
conn.commit()
elif operation == "read":
if follow_data is None:
return False
elif follow_data["times"] < limit:
return False
else:
exceed_msg = "" if follow_data["times"] == limit else "more than "
logger.info("---> {} has already been followed {}{} times"
.format(username, exceed_msg, str(limit)))
return True
except Exception as exc:
logger.error("Dap! Error occured with follow Restriction:\n\t{}".format(str(exc).encode("utf-8")))
finally:
if conn:
#close the open connection
conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment