Skip to content

Instantly share code, notes, and snippets.

@atul161
Last active August 28, 2024 13:58
Show Gist options
  • Save atul161/0b2a0263dd33cf77e51ecfd647cb46f3 to your computer and use it in GitHub Desktop.
Save atul161/0b2a0263dd33cf77e51ecfd647cb46f3 to your computer and use it in GitHub Desktop.
Data-extract
import os
import time
import uuid
import requests
from PIL import Image
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.common.exceptions import NoSuchElementException, TimeoutException, StaleElementReferenceException
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from lxml import html
# ---------------------------------------------------------------------------
# Configuration constants
# ---------------------------------------------------------------------------

# Names offered in the site's state drop-down; each one is matched against
# the <option> text verbatim, so spelling and capitalisation matter.
STATES_AND_UTS = [
    "Andaman And Nicobar Islands",
    "Andhra Pradesh",
    "Arunachal Pradesh",
    "Assam",
    "Bihar",
    "Chandigarh",
    "Chhattisgarh",
    "Delhi",
    "Goa",
    "Gujarat",
    "Haryana",
    "Himachal Pradesh",
    "Jammu And Kashmir",
    "Jharkhand",
    "Karnataka",
    "Kerala",
    "Ladakh",
    "Lakshadweep",
    "Madhya Pradesh",
    "Maharashtra",
    "Manipur",
    "Meghalaya",
    "Mizoram",
    "Nagaland",
    "Odisha",
    "Puducherry",
    "Punjab",
    "Rajasthan",
    "Sikkim",
    "Tamil Nadu",
    "Telangana",
    "The Dadra And Nagar Haveli And Daman And Diu",
    "Tripura",
    "Uttarakhand",
    "Uttar Pradesh",
    "West Bengal",
]

# Landing page the browser is pointed at first.
URL = "https://lgdirectory.gov.in"

# Directory handed to Chrome as its default download location.
DOWNLOAD_DIR = "/Users/atulanand/Desktop/etl-temporal/csv"

# Browser window size in pixels.
WIDTH = 1200
HEIGHT = 800

# Value passed to the driver's implicit wait.
WAIT_TIME = 50

# Extension appended to renamed downloads.
FILE_EXTENSION = '.csv'

# Not read or written anywhere in the visible code.
COMPLETED = []
# Setup WebDriver with Chrome options
# NOTE: this runs at import time, so importing the module creates a Chrome
# WebDriver session immediately.
options = webdriver.ChromeOptions()
# Point Chrome's default download directory at DOWNLOAD_DIR.
prefs = {"download.default_directory": DOWNLOAD_DIR}
options.add_experimental_option("prefs", prefs)
# Module-level driver shared by every function below.
driver = webdriver.Chrome(options=options)
driver.set_window_size(WIDTH, HEIGHT)
# Implicit wait: element lookups keep polling for up to WAIT_TIME before
# raising NoSuchElementException.
driver.implicitly_wait(WAIT_TIME)
driver.delete_all_cookies()
def open_browser_with_size():
    """Open the site, switch to the Urban view and walk through every state.

    Loads ``URL``, follows the link whose href contains
    ``globalviewvillageforcitizen`` and whose text contains ``Urban``, ticks
    the "search by hierarchy" input and then calls :func:`select_state` for
    each entry of ``STATES_AND_UTS``.

    If the Urban link cannot be found, a message is printed, the driver is
    quit and the function returns without processing any state.
    """
    driver.get(URL)
    time.sleep(2)

    # Navigate to Urban view
    try:
        urban_link = driver.find_element(
            By.XPATH,
            "//a[contains(@href, 'globalviewvillageforcitizen') and contains(text(),'Urban')]",
        )
    except NoSuchElementException:
        print("Urban view link not found.")
        driver.quit()
        return
    driver.get(urban_link.get_attribute("href"))

    # Switch the search form to hierarchy mode before touching the drop-downs.
    driver.find_element(By.XPATH, "//input[@id='searchByHierarchy']").click()

    for state in STATES_AND_UTS:
        select_state(state)
        # Adding a `break` here would stop after the first state, which is
        # handy while debugging; without it every state is processed.
def select_state(state_name):
    """Pick *state_name* in the state drop-down and process its districts.

    Opens the ``ddSourceState`` select, clicks the option whose text equals
    *state_name* and then hands over to :func:`select_districts_in_state`.

    Any ``NoSuchElementException`` raised along the way (including one that
    escapes from the district processing) is reported with a printed message
    rather than propagated.
    """
    option_xpath = f"//select[@id='ddSourceState']/option[text()='{state_name}']"
    try:
        # Open the drop-down first, then click the matching option.
        driver.find_element(By.ID, 'ddSourceState').click()
        time.sleep(1)
        driver.find_element(By.XPATH, option_xpath).click()
        time.sleep(1)
        select_districts_in_state(state_name)
    except NoSuchElementException:
        print(f"State selection elements not found for {state_name}.")
def check_element_by_xpath(xpath_expression):
    """Report whether *xpath_expression* matches anything in the current page.

    The check runs against a snapshot of ``driver.page_source`` parsed with
    lxml, not through the driver's own element lookup, so it is not subject
    to the driver's implicit wait.

    Args:
        xpath_expression: XPath to evaluate against the parsed page.

    Returns:
        ``True`` when the expression yields a non-empty result, ``False``
        when it yields nothing or when the page or the expression cannot be
        parsed. The result is always a ``bool`` so callers can use it
        directly in a condition.
    """
    # Local import keeps this block self-contained; lxml is already a
    # dependency of the file (`from lxml import html`).
    from lxml import etree

    page_source = driver.page_source
    try:
        matches = html.fromstring(page_source).xpath(xpath_expression)
    except (etree.LxmlError, ValueError) as e:
        # Parsing or evaluating the XPath failed (e.g. empty document or a
        # malformed expression). No network request is made here, so the
        # previous `requests` exception handler could never fire, and its
        # string return value would have been truthy for the caller.
        print(f'Error parsing the page: {e}')
        return False

    if matches:
        print(f'{len(matches)} element(s) found.')
        return True
    print('Element not found.')
    return False
def select_districts_in_state(state_name):
    """Select a district of the current state and try to download its data.

    Iterates over the options of the ``ddSourceDistrict`` select, skipping
    placeholder entries whose text contains ``Select``. For the first real
    district it clicks the option, waits, and calls
    :func:`enter_captcha_code`. When that call reports success the driver is
    quit and the process exits with status 0.

    A ``NoSuchElementException`` while locating the drop-down or its options
    is reported with a printed message.
    """
    try:
        dropdown = driver.find_element(By.ID, 'ddSourceDistrict')
        for option in dropdown.find_elements(By.TAG_NAME, 'option'):
            if "Select" in option.text:
                continue

            print(f"Processing district: {option.text}")
            option.click()
            time.sleep(5)

            csv_name = f"{option.text}.csv"
            downloaded = enter_captcha_code(
                csv_name,
                district_name=option.text,
                state_name=state_name,
            )
            if downloaded:
                # todo
                driver.quit()
                exit(0)
            # NOTE(review): the original indentation was lost; this `break`
            # is assumed to sit at loop level, so only the first real
            # district is attempted - confirm against the author's intent.
            break
    except NoSuchElementException:
        print("District selection elements not found.")
def image_to_text(image_path):
    """Send an image to the API Ninjas image-to-text endpoint and return its text.

    Every text fragment in the response is stripped, has the words
    ``Captcha`` and ``Audio`` removed and its line breaks dropped; the
    fragments are then concatenated in response order.

    Args:
        image_path: Path of the image file to upload.

    Returns:
        The concatenated text as a ``str`` on success. On failure a dict of
        the form ``{"error": "<message>"}`` is returned instead, so callers
        must check the type before treating the result as text.
    """
    # NOTE(review): a credential is hard-coded in source. The environment
    # variable takes precedence so the literal can be removed later; it is
    # kept as a fallback so existing runs behave as before.
    api_key = os.environ.get('API_NINJAS_KEY', 'CMt4/BxR1sW1Af6JUykUtQ==EXpqHE0B3W1B1tp2')
    api_url = 'https://api.api-ninjas.com/v1/imagetotext'
    try:
        with open(image_path, 'rb') as image_file_descriptor:
            response = requests.post(
                api_url,
                files={'image': image_file_descriptor},
                # The key was previously defined but never sent.
                headers={'X-Api-Key': api_key},
                # Without a timeout a stalled connection blocks forever.
                timeout=60,
            )
        response.raise_for_status()
        if response.status_code != 200:
            return {"error": f"Unexpected status code {response.status_code}: {response.text}"}

        fragments = []
        for resp in response.json():
            text = str(resp["text"]).strip().replace("Captcha", "").replace("Audio", "")
            fragments.append(''.join(text.splitlines()))
        final_txt = ''.join(fragments)
        print(final_txt)
        return final_txt
    except FileNotFoundError:
        print("The image file was not found.")
        return {"error": "The image file was not found."}
    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {e}")
        return {"error": str(e)}
def capture_and_crop_captcha(driver, crop_height_percentage=75):
    """Screenshot the browser window and keep only its lower part.

    A full screenshot is saved under a random UUID-based name in the current
    working directory, the region below ``crop_height_percentage`` percent of
    the image height is cropped out and saved as ``<uuid>_cropped.png``, and
    the full screenshot is deleted again.

    Args:
        driver: WebDriver used to take the screenshot.
        crop_height_percentage: Share of the image height (from the top) to
            discard.

    Returns:
        The file name of the cropped image. Deleting it is left to the caller.
    """
    stem = uuid.uuid4()
    full_shot_path = f'{stem}.png'
    cropped_path = f"{stem}_cropped.png"

    driver.save_screenshot(full_shot_path)
    with Image.open(full_shot_path) as shot:
        shot_width, shot_height = shot.size
        top_edge = shot_height * crop_height_percentage / 100
        # Crop box is (left, upper, right, lower): full width, bottom strip.
        shot.crop((0, top_edge, shot_width, shot_height)).save(cropped_path)

    os.remove(full_shot_path)
    return cropped_path
def enter_captcha_code(file_name, state_name, district_name):
    """Solve the captcha via OCR and trigger the spreadsheet download.

    Makes up to 20 attempts. Each attempt screenshots the page, runs OCR on
    the cropped captcha, and - when the text looks like six upper-case
    letters/digits - submits it and clicks the ``Spreadsheet`` button.

    Args:
        file_name: Accepted for interface compatibility; not used here.
        state_name: Accepted for interface compatibility; not used here.
        district_name: Accepted for interface compatibility; not used here.

    Returns:
        ``True`` once the download button has been clicked, ``False`` when
        all attempts are exhausted.

    Raises:
        NoSuchElementException: If the captcha answer input or the fetch
            button cannot be located.
    """
    import re

    max_attempts = 20
    captcha_pattern = re.compile(r'[A-Z0-9]{6}')
    refresh_xpath = "//i[@class='fa fa-retweet fa-2x']"
    strong_error_xpath = "//strong[contains(text(), 'CAPTCHA image code was entered incorrectly')]"

    for _ in range(max_attempts):
        driver.execute_script("window.scrollTo(0, 0);")
        crop_image_name = capture_and_crop_captcha(driver)
        try:
            captcha_text = image_to_text(crop_image_name)
        finally:
            # The crop is only needed for OCR; deleting it here covers every
            # path, including the success path that used to leave it behind.
            os.remove(crop_image_name)
        print("Got captcha text", captcha_text)

        # image_to_text returns an error dict on failure; passing that to the
        # regex would raise TypeError, so treat it like an unreadable captcha.
        if not isinstance(captcha_text, str) or not captcha_pattern.fullmatch(captcha_text):
            try:
                driver.find_element(By.XPATH, refresh_xpath).click()
                time.sleep(2)
            except NoSuchElementException:
                print("Change captcha button not found.")
            time.sleep(3)
            print("Captcha changed....")
            continue

        answer_box = driver.find_element(By.XPATH, '//input[@id="captchaAnswer"]')
        # Clear leftovers from a previous attempt so guesses do not pile up.
        answer_box.clear()
        answer_box.send_keys(captcha_text)
        time.sleep(3)
        driver.execute_script("window.scrollTo(0, 0);")
        driver.find_element(By.XPATH, '//button[@id="actionFetchDetails"]').click()
        time.sleep(2)

        if check_element_by_xpath(strong_error_xpath):
            # The site rejected the answer; retry with a new screenshot.
            time.sleep(1)
            print("Captcha changed....")
            continue

        try:
            download_element = driver.find_element(By.XPATH, "//button[contains(text(), 'Spreadsheet')]")
        except NoSuchElementException:
            print("Download button not found.")
            driver.execute_script("window.scrollTo(0, 0);")
            try:
                driver.find_element(By.XPATH, refresh_xpath).click()
            except NoSuchElementException:
                print("Change captcha button not found.")
            time.sleep(1)
            continue

        time.sleep(2)
        download_element.click()
        time.sleep(3)
        print("Successfully downloaded.....")
        return True

    return False
class NewFileHandler(FileSystemEventHandler):
    """File-system event handler that logs newly created files."""

    def on_created(self, event):
        """Print the path of a created file; directory events are ignored."""
        if event.is_directory:
            return
        print(f"New file created: {event.src_path}")
def rename_downloaded_file(download_dir, new_file_name, file_extension, retry_limit=5, retry_delay=2):
    """Wait for a new file to appear in *download_dir* and rename it.

    The directory listing is snapshotted on entry, so this must be called
    before (or while) the download is in flight; files already present at
    that moment are never considered.

    Args:
        download_dir: Directory to watch.
        new_file_name: Target file name without extension.
        file_extension: Extension (including the dot) appended to
            *new_file_name*.
        retry_limit: Number of polling attempts.
        retry_delay: Seconds to sleep before each attempt.

    Returns:
        The full path of the renamed file, or ``None`` when no completed new
        file showed up within the retry budget.
    """
    # Chrome writes in-progress downloads under these suffixes; renaming one
    # of them would grab a half-written file.
    partial_suffixes = ('.crdownload', '.tmp')
    target_name = new_file_name + file_extension

    before_files = set(os.listdir(download_dir))
    for attempt in range(retry_limit):
        time.sleep(retry_delay)
        new_files = set(os.listdir(download_dir)) - before_files
        # Sort so the choice is deterministic when several files appear.
        finished = sorted(name for name in new_files if not name.endswith(partial_suffixes))
        if finished:
            old_file_path = os.path.join(download_dir, finished[0])
            new_file_path = os.path.join(download_dir, target_name)
            os.rename(old_file_path, new_file_path)
            print(f"File renamed to: {target_name}")
            return new_file_path
        print(f"Attempt {attempt + 1} of {retry_limit}: File not found, retrying in {retry_delay} seconds...")

    print("File not found after maximum retries.")
    return None
# Start the process
try:
    open_browser_with_size()
except StaleElementReferenceException:
    # Raised during the scraping run itself; the message used to claim it
    # happened while quitting.
    print("Element became stale while scraping.")
finally:
    # Ensure driver.quit() is called in all scenarios
    try:
        driver.quit()
    except StaleElementReferenceException:
        print("Element became stale while quitting.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment