Last active
August 28, 2024 13:58
-
-
Save atul161/0b2a0263dd33cf77e51ecfd647cb46f3 to your computer and use it in GitHub Desktop.
Data-extract
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import time | |
import uuid | |
import requests | |
from PIL import Image | |
from selenium import webdriver | |
from selenium.webdriver.common.by import By | |
from selenium.common.exceptions import NoSuchElementException, TimeoutException, StaleElementReferenceException | |
from selenium.webdriver.support.ui import WebDriverWait | |
from selenium.webdriver.support import expected_conditions as EC | |
from watchdog.observers import Observer | |
from watchdog.events import FileSystemEventHandler | |
from lxml import html | |
# Constants

# State / union-territory names; select_state() matches each one against the
# visible option text of the state dropdown, so spelling and casing matter.
STATES_AND_UTS = [
    "Andaman And Nicobar Islands",
    "Andhra Pradesh",
    "Arunachal Pradesh",
    "Assam",
    "Bihar",
    "Chandigarh",
    "Chhattisgarh",
    "Delhi",
    "Goa",
    "Gujarat",
    "Haryana",
    "Himachal Pradesh",
    "Jammu And Kashmir",
    "Jharkhand",
    "Karnataka",
    "Kerala",
    "Ladakh",
    "Lakshadweep",
    "Madhya Pradesh",
    "Maharashtra",
    "Manipur",
    "Meghalaya",
    "Mizoram",
    "Nagaland",
    "Odisha",
    "Puducherry",
    "Punjab",
    "Rajasthan",
    "Sikkim",
    "Tamil Nadu",
    "Telangana",
    "The Dadra And Nagar Haveli And Daman And Diu",
    "Tripura",
    "Uttarakhand",
    "Uttar Pradesh",
    "West Bengal",
]

# Site to scrape and the directory Chrome is configured to download into.
URL = "https://lgdirectory.gov.in"
DOWNLOAD_DIR = "/Users/atulanand/Desktop/etl-temporal/csv"

# Browser window size in pixels.
WIDTH = 1200
HEIGHT = 800

# Passed to driver.implicitly_wait(), i.e. seconds to wait for elements.
WAIT_TIME = 50

FILE_EXTENSION = ".csv"

# NOTE(review): not referenced anywhere in the visible code.
COMPLETED = []
# Setup WebDriver with Chrome options

# Chrome preference that routes downloads into DOWNLOAD_DIR.
prefs = {}
prefs["download.default_directory"] = DOWNLOAD_DIR

options = webdriver.ChromeOptions()
options.add_experimental_option("prefs", prefs)

# NOTE: the browser is started here, at module load, and the resulting
# `driver` is shared as a global by every function below.
driver = webdriver.Chrome(options=options)
driver.set_window_size(WIDTH, HEIGHT)
driver.implicitly_wait(WAIT_TIME)
driver.delete_all_cookies()
def open_browser_with_size():
    """Open the site, switch to the Urban view and process every state.

    Loads URL, follows the "Urban" link, ticks the "search by hierarchy"
    input and then calls select_state() for each entry of STATES_AND_UTS.
    If the Urban link cannot be found the driver is quit and the function
    returns without processing anything.
    """
    driver.get(URL)
    time.sleep(2)

    # Navigate to Urban view
    urban_link_locator = (
        By.XPATH,
        "//a[contains(@href, 'globalviewvillageforcitizen') and contains(text(),'Urban')]",
    )
    try:
        urban_href = driver.find_element(*urban_link_locator).get_attribute("href")
        driver.get(urban_href)
    except NoSuchElementException:
        print("Urban view link not found.")
        driver.quit()
        return

    hierarchy_locator = (By.XPATH, "//input[@id='searchByHierarchy']")
    driver.find_element(*hierarchy_locator).click()

    # Every state is processed. To stop after the first state (e.g. while
    # debugging), add a `break` after the select_state() call.
    for state_name in STATES_AND_UTS:
        select_state(state_name)
def select_state(state_name):
    """Pick ``state_name`` in the state dropdown, then process its districts.

    Args:
        state_name: Visible option text of the state to select.

    A NoSuchElementException raised anywhere in the sequence (including
    one escaping from select_districts_in_state) is reported with a
    message and swallowed.
    """
    try:
        # Open the dropdown first, then click the option whose text matches.
        state_dropdown = driver.find_element(By.ID, 'ddSourceState')
        state_dropdown.click()
        time.sleep(1)

        option_xpath = f"//select[@id='ddSourceState']/option[text()='{state_name}']"
        state_option = driver.find_element(By.XPATH, option_xpath)
        state_option.click()
        time.sleep(1)

        select_districts_in_state(state_name)
    except NoSuchElementException:
        print(f"State selection elements not found for {state_name}.")
def check_element_by_xpath(xpath_expression):
    """Report whether the current page source contains a match for an XPath.

    The page source is parsed with lxml instead of being queried through
    ``driver.find_element``, so an absent element is reported right away
    rather than after the driver's implicit wait.

    Args:
        xpath_expression: XPath evaluated against the parsed page source.

    Returns:
        bool: True when at least one node matches, False otherwise
        (including when the lookup fails with a request error).
    """
    try:
        tree = html.fromstring(driver.page_source)
        elements = tree.xpath(xpath_expression)
    except requests.exceptions.RequestException as e:
        # Previously this path returned the message as a non-empty string,
        # which callers using the result in an `if` treated as "found".
        # NOTE(review): nothing in the try body visibly uses `requests`;
        # confirm whether this handler is still needed.
        print(f'Error fetching the page: {e}')
        return False

    if elements:
        print(f'{len(elements)} element(s) found.')
        return True

    print('Element not found.')
    return False
def select_districts_in_state(state_name):
    """Select a district of the current state and run the CAPTCHA/download step.

    Iterates over the options of the district dropdown, skipping any
    option whose text contains "Select", and calls enter_captcha_code()
    for the chosen district. When that call reports success the driver is
    quit and the process exits with status 0.

    Args:
        state_name: Name of the state currently selected; forwarded to
            enter_captcha_code().
    """
    try:
        select_element_district = driver.find_element(By.ID, 'ddSourceDistrict')
        districts = select_element_district.find_elements(By.TAG_NAME, 'option')
        for district in districts:
            # Read the label once, before clicking: re-reading `.text` after
            # the click and the wait risks a StaleElementReferenceException
            # if the page re-renders the dropdown.
            district_name = district.text
            if "Select" in district_name:
                continue

            print(f"Processing district: {district_name}")
            district.click()
            time.sleep(5)

            file_name = f"{district_name}.csv"
            result = enter_captcha_code(
                file_name, district_name=district_name, state_name=state_name
            )
            if result:
                # TODO: currently stops the whole run after the first
                # successful download.
                driver.quit()
                # SystemExit directly instead of the site-provided exit().
                raise SystemExit(0)

            # NOTE(review): this unconditional break means only the first
            # real district of each state is attempted. The original
            # indentation was lost, so confirm whether it belongs here or
            # inside the `if result:` branch.
            break
    except NoSuchElementException:
        print("District selection elements not found.")
def image_to_text(image_path):
    """Run OCR on an image through the API Ninjas image-to-text endpoint.

    Args:
        image_path: Path of the image file to upload.

    Returns:
        str: The recognised text fragments concatenated together, with the
        words "Captcha" and "Audio" and all line breaks removed.
        dict: ``{"error": <message>}`` when the file is missing, the
        request fails, or the response status is not 200. Callers must
        check the type before treating the result as text.
    """
    # SECURITY(review): this credential is committed in source. Prefer
    # supplying it through the API_NINJAS_KEY environment variable and
    # rotating the value below.
    # NOTE(review): presumably the API Ninjas key; it was defined but never
    # sent with the request, so it is now passed as the X-Api-Key header.
    LicenseCode = 'CMt4/BxR1sW1Af6JUykUtQ==EXpqHE0B3W1B1tp2'
    api_key = os.environ.get('API_NINJAS_KEY', LicenseCode)
    api_url = 'https://api.api-ninjas.com/v1/imagetotext'

    try:
        with open(image_path, 'rb') as image_file_descriptor:
            response = requests.post(
                api_url,
                files={'image': image_file_descriptor},
                headers={'X-Api-Key': api_key},
                # Without a timeout a stalled connection blocks forever.
                timeout=30,
            )
        response.raise_for_status()

        if response.status_code != 200:
            return {"error": f"Unexpected status code {response.status_code}: {response.text}"}

        fragments = []
        for resp in response.json():
            text = str(resp["text"]).strip().replace("Captcha", "").replace("Audio", "")
            text = ''.join(text.splitlines())
            if text:
                fragments.append(text)
        final_txt = ''.join(fragments)
        print(final_txt)
        return final_txt
    except FileNotFoundError:
        print("The image file was not found.")
        return {"error": "The image file was not found."}
    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {e}")
        return {"error": str(e)}
def capture_and_crop_captcha(driver, crop_height_percentage=75):
    """Screenshot the browser window and keep only its bottom strip.

    A full screenshot is written to a uniquely named PNG in the current
    working directory, everything above ``crop_height_percentage`` percent
    of the image height is cut away, and the remainder is saved as
    ``<uuid>_cropped.png``. The full screenshot is always deleted, even
    when cropping or saving fails.

    Args:
        driver: WebDriver used to take the screenshot.
        crop_height_percentage: Vertical position, as a percentage of the
            image height, where the kept region starts.

    Returns:
        str: File name of the cropped image; the caller is responsible
        for deleting it.
    """
    image_name = uuid.uuid4()
    screenshot_path_full = f'{image_name}.png'
    crop_image_name = f"{image_name}_cropped.png"

    driver.save_screenshot(screenshot_path_full)
    try:
        with Image.open(screenshot_path_full) as image:
            width, height = image.size
            crop_height = height * crop_height_percentage / 100
            cropped_image = image.crop((0, crop_height, width, height))
            cropped_image.save(crop_image_name)
    finally:
        # Do not leave the full-size screenshot behind on failure.
        if os.path.exists(screenshot_path_full):
            os.remove(screenshot_path_full)
    return crop_image_name
def enter_captcha_code(file_name, state_name, district_name):
    """Solve the page CAPTCHA via OCR and click the spreadsheet download.

    Makes up to 20 attempts. Each attempt screenshots the page, runs OCR
    on the cropped image, and only submits the text when it is exactly six
    characters from ``A-Z0-9``. Otherwise a new CAPTCHA is requested. After
    submitting, the attempt is retried when the page shows the "entered
    incorrectly" message or when no "Spreadsheet" button is present.

    Args:
        file_name: Accepted for interface compatibility; not used here.
        state_name: Accepted for interface compatibility; not used here.
        district_name: Accepted for interface compatibility; not used here.

    Returns:
        bool: True once the download button has been clicked, False when
        all attempts are exhausted.
    """
    import re

    max_attempts = 20
    captcha_pattern = re.compile(r'[A-Z0-9]{6}')
    refresh_xpath = "//i[@class='fa fa-retweet fa-2x']"
    strong_error_xpath = "//strong[contains(text(), 'CAPTCHA image code was entered incorrectly')]"

    def request_new_captcha(pause_after_click=0):
        # Click the "change captcha" icon; its absence is reported, not fatal.
        try:
            driver.find_element(By.XPATH, refresh_xpath).click()
            if pause_after_click:
                time.sleep(pause_after_click)
        except NoSuchElementException:
            print("Change captcha button not found.")

    for _ in range(max_attempts):
        driver.execute_script("window.scrollTo(0, 0);")
        crop_image_name = capture_and_crop_captcha(driver)
        try:
            captcha_text = image_to_text(crop_image_name)
        finally:
            # The cropped image is only needed for the OCR call. Removing it
            # here covers every path, including success, which used to
            # leave the file behind.
            if os.path.exists(crop_image_name):
                os.remove(crop_image_name)
        print("Got captcha text", captcha_text)

        # image_to_text returns an error dict on failure; passing that to
        # the regex would raise TypeError, so check the type first.
        if not (isinstance(captcha_text, str) and captcha_pattern.fullmatch(captcha_text)):
            request_new_captcha(pause_after_click=2)
            time.sleep(3)
            print("Captcha changed....")
            continue

        captcha_input = driver.find_element(By.XPATH, '//input[@id="captchaAnswer"]')
        # Clear any text left from a previous attempt so retries do not
        # append to it.
        captcha_input.clear()
        captcha_input.send_keys(captcha_text)
        time.sleep(3)
        driver.execute_script("window.scrollTo(0, 0);")
        driver.find_element(By.XPATH, '//button[@id="actionFetchDetails"]').click()
        time.sleep(2)

        if check_element_by_xpath(strong_error_xpath):
            time.sleep(1)
            print("Captcha changed....")
            continue

        try:
            download_element = driver.find_element(By.XPATH, "//button[contains(text(), 'Spreadsheet')]")
        except NoSuchElementException:
            print("Download button not found.")
            driver.execute_script("window.scrollTo(0, 0);")
            request_new_captcha()
            time.sleep(1)
            continue

        time.sleep(2)
        download_element.click()
        time.sleep(3)
        print("Successfully downloaded.....")
        return True

    return False
class NewFileHandler(FileSystemEventHandler):
    """File-system event handler that announces newly created files."""

    def on_created(self, event):
        """Print the path of a created file; directory events are ignored."""
        if event.is_directory:
            return
        print(f"New file created: {event.src_path}")
def rename_downloaded_file(download_dir, new_file_name, file_extension, retry_limit=5, retry_delay=2):
    """Wait for a new file to appear in ``download_dir`` and rename it.

    The directory listing is snapshotted on entry, so this must be called
    before the download finishes. It then polls ``retry_limit`` times,
    sleeping ``retry_delay`` seconds before each check. Only new files
    whose name ends with ``file_extension`` are considered, so partial
    downloads (for example ``*.crdownload``) are never renamed.

    Args:
        download_dir: Directory to watch.
        new_file_name: Target file name without extension.
        file_extension: Extension (including the dot) that the downloaded
            file must have; also appended to ``new_file_name``.
        retry_limit: Maximum number of directory checks.
        retry_delay: Seconds to sleep before each check.

    Returns:
        str | None: Path of the renamed file, or None if no matching file
        appeared within the retry budget.
    """
    before_files = set(os.listdir(download_dir))
    target_name = new_file_name + file_extension

    for attempt in range(retry_limit):
        time.sleep(retry_delay)
        new_files = set(os.listdir(download_dir)) - before_files
        # Sort so the choice is deterministic when several files match.
        finished = sorted(name for name in new_files if name.endswith(file_extension))
        if finished:
            old_file_path = os.path.join(download_dir, finished[0])
            new_file_path = os.path.join(download_dir, target_name)
            os.rename(old_file_path, new_file_path)
            print(f"File renamed to: {target_name}")
            return new_file_path
        print(f"Attempt {attempt + 1} of {retry_limit}: File not found, retrying in {retry_delay} seconds...")

    print("File not found after maximum retries.")
    return None
# Start the process
try:
    open_browser_with_size()
except StaleElementReferenceException:
    # This failure happens during scraping, not during shutdown; the
    # message used to be a copy of the one in the `finally` block.
    print("Element became stale while processing.")
finally:
    # Ensure driver.quit() is called in all scenarios
    try:
        driver.quit()
    except StaleElementReferenceException:
        print("Element became stale while quitting.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment