Last active
January 18, 2023 20:24
-
-
Save orendon/e18e54b65c101817cb54b0f8c8561152 to your computer and use it in GitHub Desktop.
Visa appointment (renewal)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[USVISA] | |
; Account and current appointment info from https://ais.usvisa-info.com | |
USERNAME = YOUR_EMAIL | |
PASSWORD = YOUR_PASSWORD | |
SCHEDULE_ID = YOUR_ID | |
MY_SCHEDULE_DATE = YYYY-MM-DD | |
; Spanish - Colombia | |
COUNTRY_CODE = es-co | |
; Bogotá | |
FACILITY_ID = 26 | |
[CHROMEDRIVER] | |
; Details for the script to control Chrome | |
LOCAL_USE = True | |
; Optional: HUB_ADDRESS is mandatory only when LOCAL_USE = False | |
HUB_ADDRESS = http://localhost:9515/wd/hub | |
[PUSHOVER] | |
; Get push notifications via https://pushover.net/ (optional) | |
PUSH_TOKEN = | |
PUSH_USER = | |
[SENDGRID] | |
; Get email notifications via https://sendgrid.com/ (optional) | |
SENDGRID_API_KEY = |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
selenium==4.2.0 | |
webdriver-manager==3.7.0 | |
requests==2.27.1 | |
sendgrid==6.9.7 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf8 -*- | |
import time | |
import json | |
import random | |
import platform | |
import configparser | |
from datetime import datetime | |
import requests | |
from selenium import webdriver | |
from selenium.webdriver.chrome.service import Service | |
from selenium.webdriver.support import expected_conditions as EC | |
from selenium.webdriver.support.ui import WebDriverWait as Wait | |
from selenium.webdriver.common.by import By | |
from webdriver_manager.chrome import ChromeDriverManager | |
from sendgrid import SendGridAPIClient | |
from sendgrid.helpers.mail import Mail | |
config = configparser.ConfigParser() | |
config.read('config.ini') | |
USERNAME = config['USVISA']['USERNAME'] | |
PASSWORD = config['USVISA']['PASSWORD'] | |
SCHEDULE_ID = config['USVISA']['SCHEDULE_ID'] | |
MY_SCHEDULE_DATE = config['USVISA']['MY_SCHEDULE_DATE'] | |
COUNTRY_CODE = config['USVISA']['COUNTRY_CODE'] | |
FACILITY_ID = config['USVISA']['FACILITY_ID'] | |
SENDGRID_API_KEY = config['SENDGRID']['SENDGRID_API_KEY'] | |
PUSH_TOKEN = config['PUSHOVER']['PUSH_TOKEN'] | |
PUSH_USER = config['PUSHOVER']['PUSH_USER'] | |
LOCAL_USE = config['CHROMEDRIVER'].getboolean('LOCAL_USE') | |
HUB_ADDRESS = config['CHROMEDRIVER']['HUB_ADDRESS'] | |
REGEX_CONTINUE = "//a[contains(text(),'Continuar')]" | |
def MY_CONDITION(year, month, day): | |
if int(year) == 2022 and int(month) == 8: | |
return False | |
return True | |
STEP_TIME = 0.5 # time between steps (interactions with forms): 0.5 seconds | |
RETRY_TIME = 60*5 # wait time between retries/checks for available dates: 5 minutes | |
EXCEPTION_TIME = 60*15 # wait time when an exception occurs: 15 minutes | |
COOLDOWN_TIME = 60*90 # wait time when temporary banned (empty list): 90 minutes | |
DATE_URL = f"https://ais.usvisa-info.com/{COUNTRY_CODE}/niv/schedule/{SCHEDULE_ID}/appointment/days/{FACILITY_ID}.json?appointments[expedite]=false" | |
TIME_URL = f"https://ais.usvisa-info.com/{COUNTRY_CODE}/niv/schedule/{SCHEDULE_ID}/appointment/times/{FACILITY_ID}.json?date=%s&appointments[expedite]=false" | |
APPOINTMENT_URL = f"https://ais.usvisa-info.com/{COUNTRY_CODE}/niv/schedule/{SCHEDULE_ID}/appointment" | |
EXIT = False | |
def send_notification(msg): | |
print(f"Sending notification: {msg}") | |
if SENDGRID_API_KEY: | |
message = Mail( | |
from_email=USERNAME, | |
to_emails=USERNAME, | |
subject=msg, | |
html_content=msg) | |
try: | |
sg = SendGridAPIClient(SENDGRID_API_KEY) | |
response = sg.send(message) | |
print(response.status_code) | |
print(response.body) | |
print(response.headers) | |
except Exception as e: | |
print(e.message) | |
if PUSH_TOKEN: | |
url = "https://api.pushover.net/1/messages.json" | |
data = { | |
"token": PUSH_TOKEN, | |
"user": PUSH_USER, | |
"message": msg | |
} | |
requests.post(url, data) | |
def get_driver(): | |
if LOCAL_USE: | |
dr = webdriver.Chrome(service=Service(ChromeDriverManager().install())) | |
else: | |
dr = webdriver.Remote(command_executor=HUB_ADDRESS, options=webdriver.ChromeOptions()) | |
return dr | |
driver = get_driver() | |
def login(): | |
# Bypass reCAPTCHA | |
driver.get(f"https://ais.usvisa-info.com/{COUNTRY_CODE}/niv") | |
time.sleep(STEP_TIME) | |
a = driver.find_element(By.XPATH, '//a[@class="down-arrow bounce"]') | |
a.click() | |
time.sleep(STEP_TIME) | |
print("Login start...") | |
href = driver.find_element(By.XPATH, '//*[@id="header"]/nav/div[2]/div[1]/ul/li[3]/a') | |
href.click() | |
time.sleep(STEP_TIME) | |
Wait(driver, 60).until(EC.presence_of_element_located((By.NAME, "commit"))) | |
print("\tclick bounce") | |
a = driver.find_element(By.XPATH, '//a[@class="down-arrow bounce"]') | |
a.click() | |
time.sleep(STEP_TIME) | |
do_login_action() | |
def do_login_action(): | |
print("\tinput email") | |
user = driver.find_element(By.ID, 'user_email') | |
user.send_keys(USERNAME) | |
time.sleep(random.randint(1, 3)) | |
print("\tinput pwd") | |
pw = driver.find_element(By.ID, 'user_password') | |
pw.send_keys(PASSWORD) | |
time.sleep(random.randint(1, 3)) | |
print("\tclick privacy") | |
box = driver.find_element(By.CLASS_NAME, 'icheckbox') | |
box .click() | |
time.sleep(random.randint(1, 3)) | |
print("\tcommit") | |
btn = driver.find_element(By.NAME, 'commit') | |
btn.click() | |
time.sleep(random.randint(1, 3)) | |
Wait(driver, 60).until( | |
EC.presence_of_element_located((By.XPATH, REGEX_CONTINUE))) | |
print("\tlogin successful!") | |
def get_date(): | |
driver.get(DATE_URL) | |
if not is_logged_in(): | |
login() | |
return get_date() | |
else: | |
content = driver.find_element(By.TAG_NAME, 'pre').text | |
date = json.loads(content) | |
return date | |
def get_time(date): | |
time_url = TIME_URL % date | |
driver.get(time_url) | |
content = driver.find_element(By.TAG_NAME, 'pre').text | |
data = json.loads(content) | |
time = data.get("available_times")[-1] | |
print(f"Got time successfully! {date} {time}") | |
return time | |
def reschedule(date): | |
global EXIT | |
print(f"Starting Reschedule ({date})") | |
time = get_time(date) | |
driver.get(APPOINTMENT_URL) | |
data = { | |
"utf8": driver.find_element(by=By.NAME, value='utf8').get_attribute('value'), | |
"authenticity_token": driver.find_element(by=By.NAME, value='authenticity_token').get_attribute('value'), | |
"confirmed_limit_message": driver.find_element(by=By.NAME, value='confirmed_limit_message').get_attribute('value'), | |
"appointments[asc_appointment][facility_id]": FACILITY_ID, | |
"appointments[asc_appointment][date]": date, | |
"appointments[asc_appointment][time]": time, | |
} | |
headers = { | |
"User-Agent": driver.execute_script("return navigator.userAgent;"), | |
"Referer": APPOINTMENT_URL, | |
"Cookie": "_yatri_session=" + driver.get_cookie("_yatri_session")["value"] | |
} | |
r = requests.post(APPOINTMENT_URL, headers=headers, data=data) | |
print(r.text) | |
if(r.text.find('Successfully Scheduled') != -1 or r.text.find('programado exitosamente')): | |
msg = f"Rescheduled Successfully! {date} {time}" | |
send_notification(msg) | |
EXIT = True | |
else: | |
msg = f"Reschedule Failed. {date} {time}" | |
send_notification(msg) | |
def is_logged_in(): | |
content = driver.page_source | |
if(content.find("error") != -1): | |
return False | |
return True | |
def print_dates(dates): | |
print("Available dates:") | |
for d in dates: | |
print("%s \t business_day: %s" % (d.get('date'), d.get('business_day'))) | |
print() | |
last_seen = None | |
def get_available_date(dates): | |
global last_seen | |
def is_earlier(date): | |
my_date = datetime.strptime(MY_SCHEDULE_DATE, "%Y-%m-%d") | |
new_date = datetime.strptime(date, "%Y-%m-%d") | |
result = my_date > new_date | |
print(f'Is {my_date} > {new_date}:\t{result}') | |
return result | |
print("Checking for an earlier date:") | |
for d in dates: | |
date = d.get('date') | |
if is_earlier(date) and date != last_seen: | |
year, month, day = date.split('-') | |
if MY_CONDITION(year, month, day): | |
last_seen = date | |
return date | |
def push_notification(dates): | |
msg = "date: " | |
for d in dates: | |
msg = msg + d.get('date') + '; ' | |
send_notification(msg) | |
if __name__ == "__main__": | |
login() | |
retry_count = 0 | |
while 1: | |
if retry_count > 6: | |
break | |
try: | |
print("------------------") | |
print(datetime.today()) | |
print(f"Retry count: {retry_count}") | |
print() | |
dates = get_date()[:5] | |
if not dates: | |
msg = "List is empty (ban?)" | |
send_notification(msg) | |
# EXIT = True | |
else: | |
print_dates(dates) | |
date = get_available_date(dates) | |
print() | |
print(f"New date: {date}") | |
if date: | |
reschedule(date) | |
push_notification(dates) | |
if(EXIT): | |
print("------------------exit") | |
break | |
if not dates: | |
time.sleep(COOLDOWN_TIME) | |
else: | |
time.sleep(RETRY_TIME) | |
except Exception as err: | |
retry_count += 1 | |
time.sleep(EXCEPTION_TIME) | |
if(not EXIT): | |
send_notification("HELP! Crashed.") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment