Last active
November 8, 2019 16:17
-
-
Save arieltorti/bcc435e517a5f0fde7f4104980f07bb8 to your computer and use it in GitHub Desktop.
PASO 2019 Scraper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# @Author: Ariel Torti | |
# @GitHub: github.com/maks500 | |
import argparse | |
import logging | |
import os | |
import shutil | |
import sys | |
import time | |
import itertools | |
from threading import Thread, Lock | |
import requests | |
from queue import Queue | |
# Module-level logger that writes timestamped records to stderr.
logger = logging.getLogger(__name__)
_console_handler = logging.StreamHandler()
_console_handler.setLevel(logging.DEBUG)
_console_handler.setFormatter(
    logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(_console_handler)

# Directory containing this script; the results folder is created under it.
BASE_DIR = os.path.dirname(os.path.realpath(__file__))
# Number of attempts handle_request makes before giving up on a URL.
MAX_RETRIES = 5
def _client_blocked():
    """Return True when the election site answers 403 on its root page.

    A 403 on the root page (rather than on a specific resource) indicates we
    are being blocked (e.g. by Cloudflare) and should back off before retrying.
    """
    # A timeout keeps a dead connection from hanging the retry loop forever.
    return requests.get('https://resultados2019.gob.ar', timeout=60).status_code == 403
class PasoScrapper:
    """Scraper for the PASO 2019 election telegram images.

    The election process is divided into 6 hierarchical levels:

    1 - Country. Not needed here.
    2 - Province.
    3 - Department.
    4 - Circuit.
    5 - Region.
    6 - Precinct.

    We can get the first 4 levels from a single file. From then on we have
    to request info for each region inside each circuit, and each precinct
    inside each region.
    """

    # Numeric level codes as they appear in the regions.json feed ('l' key).
    PROVINCE, DEPARTMENT, CIRCUIT, REGION, PRECINCT = range(2, 7)
    STARTING_LEVEL = PROVINCE
    LAST_LEVEL = CIRCUIT
    # How many leading characters of a node's 'cc' code identify its parent
    # at each level (province=0 chars, department=2, circuit=5, region=11).
    LEVELS_TO_CC = [0, 2, 5, 11]

    def __init__(self, threads, output, flat=False, info_thread=False, corrupt_file_threshold=150000):
        """Configure the scraper; no network or filesystem work happens here.

        :param threads: number of worker threads of each kind to spawn.
        :param output: name of the folder where results are stored.
        :param flat: if True, save every image directly in the output folder.
        :param info_thread: if True, periodically print request statistics.
        :param corrupt_file_threshold: files smaller than this many bytes are
            considered corrupt and will be re-downloaded.
        """
        self.result_name = output
        self.results_dir = os.path.join(BASE_DIR, self.result_name)
        self.flat = flat
        self.sites_base_url = 'https://resultados2019.gob.ar/assets/data/{}'
        self.image_base_url = 'https://resultados2019.gob.ar/opt/jboss/rct/tally/pages/{}/{}.png'
        self.initial_json = None
        # Maps a node's 'cc' code to the absolute folder created for it.
        self.cc_to_folder = {}
        self.region_queue = Queue()
        self.telegram_queue = Queue()
        self.THREADS = threads
        # Request counters, each guarded by its own lock.
        self.failed = 0
        self.failed_lock = Lock()
        self.info_thread = info_thread
        self.downloaded = 0
        self.downloaded_lock = Lock()
        self.already_downloaded = 0
        self.already_downloaded_lock = Lock()
        # Lets be a little conservative here, better to have
        # a false positive than a false negative.
        self.CORRUPT_FILE_SIZE_THRESHOLD = corrupt_file_threshold

    def requests_information_thread_logger(self):
        """Print request statistics every five minutes, forever.

        Runs in a daemon thread. The counters are read without taking their
        locks, so the reported values may be slightly stale — acceptable for
        progress reporting.
        """
        SLEEP_TIME = 60 * 5
        old_failed = old_downloaded = old_already_downloaded = 0
        while True:
            print("[x] {} failed requests".format(self.failed))
            print("[x] {} succeded requests".format(self.downloaded))
            print("[x] {} skipped images".format(self.already_downloaded))
            failed_diff = self.failed - old_failed
            print("[x] {} New failed requests."
                  " {:.2f} p/min".format(failed_diff, failed_diff / SLEEP_TIME * 60))
            downloaded_diff = self.downloaded - old_downloaded
            print("[x] {} New downloaded images."
                  " {:.2f} p/min".format(downloaded_diff, downloaded_diff / SLEEP_TIME * 60))
            already_downloaded_diff = self.already_downloaded - old_already_downloaded
            print("[x] {} New skipped images."
                  " {:.2f} p/min".format(already_downloaded_diff,
                                         already_downloaded_diff / SLEEP_TIME * 60))
            old_failed = self.failed
            old_downloaded = self.downloaded
            old_already_downloaded = self.already_downloaded
            time.sleep(SLEEP_TIME)

    def start(self):
        """Entry point: fetch the region index, prepare folders, start scraping."""
        if self.info_thread:
            stats_thread = Thread(target=self.requests_information_thread_logger)
            stats_thread.daemon = True
            stats_thread.start()
        # This json contains province, department and circuit data.
        print("[x] Fetching initial data")
        response = self.handle_request('https://resultados2019.gob.ar/assets/data/regions.json')
        self.initial_json = response.json()
        self.create_result_dir()
        self._start()

    def create_result_dir(self):
        """Create the output folder and chdir into it.

        If the folder already exists the user is asked whether to wipe it; any
        answer not starting with 'y' keeps the existing contents (downloads
        are then resumed thanks to should_download_image).
        """
        logger.info("Creating results dir...")
        if os.path.exists(self.result_name):
            response = ''
            # Re-prompt on empty input instead of crashing on response[0].
            while not response:
                print("A folder named {} already exists, want to remove it ? (Y/n)".format(self.result_name), end=' ')
                response = input()
            if response[0].lower() == 'y':
                shutil.rmtree(self.result_name)
        os.makedirs(self.result_name, exist_ok=True)
        os.chdir(self.result_name)

    def _start(self):
        """Build the folder tree (unless flat) and begin requesting regions."""
        if not self.flat:
            self.create_directory_structure()
        print("[x] Requesting regions...")
        self.get_regions()

    def create_directory_structure(self):
        """Create province, department and circuit folders from the index JSON."""
        logger.info("Creating directory structure")
        for level in range(self.STARTING_LEVEL, self.LAST_LEVEL + 1):
            self.create_directories([x for x in self.initial_json if x['l'] == level], level)

    def create_directories(self, iterable, level):
        """Create one folder per node in *iterable*, inside its parent's folder.

        Also records each node's absolute folder in cc_to_folder so children
        can locate it later.
        """
        for node in iterable:
            os.chdir(self.get_folder_from_cc(node['cc'], level))
            name = self.clean(node['n'].capitalize())
            self.cc_to_folder[node['cc']] = os.path.join(os.getcwd(), name)
            os.makedirs(name, exist_ok=True)

    def get_folder_from_cc(self, cc, level):
        """Return the parent folder for a node with code *cc* at *level*.

        The first LEVELS_TO_CC[level - 2] characters of *cc* are the parent's
        code; an empty prefix means a top-level node ('.').
        """
        if self.flat:
            return self.results_dir
        parent_cc = cc[:self.LEVELS_TO_CC[level - 2]]
        if parent_cc:
            return os.path.join(self.results_dir, self.cc_to_folder[parent_cc])
        return '.'

    def get_regions(self):
        """Spawn the worker threads and feed every circuit into the work queue."""
        # In order to get the regions we need to gather info from circuits.
        for _ in range(self.THREADS):
            worker = Thread(target=self.circuit_worker)
            worker.daemon = True
            worker.start()
            worker = Thread(target=self.telegram_worker)
            worker.daemon = True
            worker.start()
        try:
            for node in self.initial_json:
                if node['l'] == self.CIRCUIT:
                    # Second element is the retry count for this item.
                    self.region_queue.put((node, 0))
            self.region_queue.join()
            self.telegram_queue.join()
        except KeyboardInterrupt:
            sys.exit(1)

    def circuit_worker(self):
        """Consume circuits from region_queue and fetch their precinct sites.

        Runs forever in a daemon thread. A failed circuit is re-queued with an
        incremented retry count, and dropped after 10 retries.
        """
        while True:
            circuit, retries = self.region_queue.get()
            try:
                circuit_id = circuit['cc']
                # Each of these is a site ID; each site has many telegram IDs.
                for site in circuit['chd']:
                    pre_cc = site[:-3] or '0'
                    url = "precincts/{}/s{}.json".format(pre_cc, site)
                    self.parse_site(self.sites_base_url.format(url), circuit_id)
            except Exception:
                logger.debug("Thread exception ocurred with argument {}".format(circuit), exc_info=1)
                if retries <= 10:
                    # Put item back into the queue for retrying
                    self.region_queue.put((circuit, retries + 1))
                else:
                    logger.error("Ignoring {} failed to many times.".format(circuit))
            finally:
                # Always mark the fetched item as done — even on failure —
                # otherwise region_queue.join() blocks forever.
                self.region_queue.task_done()

    def parse_site(self, url, circuit_cc):
        """Fetch a site's JSON and enqueue every telegram image ID it lists."""
        response = self.handle_request(url)
        if response.status_code != 200:
            if self.info_thread:
                with self.failed_lock:
                    self.failed += 1
            logger.error("{} returned {}".format(response.url, response.status_code))
            return
        for obj in response.json():
            self.telegram_queue.put((obj['cc'], circuit_cc, 0))  # This is the image ID.

    def telegram_worker(self):
        """Consume telegram IDs from telegram_queue and download their pages.

        Runs forever in a daemon thread. A failed item is re-queued with an
        incremented retry count, and dropped after 10 retries.
        """
        while True:
            image, circuit_cc, retries = self.telegram_queue.get()
            try:
                # Telegram pages are numbered 1..N; probe increasing numbers
                # until the server answers 403.
                for i in itertools.count(1):
                    image_folder, image_path = self.create_image_path(image, circuit_cc, i)
                    if self.should_download_image(image_path):
                        logger.debug('Saving image {} in {}'.format(image, image_folder))
                        response = self.handle_request(self.image_base_url.format(image, i), stream=True)
                        # If we receive 403, then the image doesn't exist and
                        # we stop searching for higher numbers.
                        if response.status_code == 403:
                            break
                        if response.status_code == 200:
                            with open(image_path, 'wb') as f:
                                for chunk in response.iter_content(1024):
                                    f.write(chunk)
                            if self.info_thread:
                                with self.downloaded_lock:
                                    self.downloaded += 1
                        else:
                            logger.error("There was an error downloading the image {}".format(image))
                    elif self.info_thread:
                        # File already present and big enough: count as skipped.
                        with self.already_downloaded_lock:
                            self.already_downloaded += 1
            except Exception:
                logger.debug("Thread exception ocurred with argument {}".format(image), exc_info=1)
                if retries <= 10:
                    # Put item back into the queue for retrying
                    self.telegram_queue.put((image, circuit_cc, retries + 1))
                else:
                    logger.error("Ignoring {} failed to many times.".format(image))
            finally:
                # Always mark the item as done so telegram_queue.join() can finish.
                self.telegram_queue.task_done()

    def create_image_path(self, image, circuit_cc, index):
        """Return (folder, path) where page *index* of *image* is stored.

        In flat mode everything goes straight into the results folder;
        otherwise a folder named after the image is created under its
        circuit's folder.
        """
        if self.flat:
            folder = self.results_dir
            return folder, os.path.join(folder, '{}-{}.png'.format(image, index))
        folder = os.path.join(self.cc_to_folder[circuit_cc], image)
        os.makedirs(folder, exist_ok=True)
        return folder, os.path.join(folder, "{}.png".format(index))

    def should_download_image(self, image_path):
        """Return True when *image_path* is missing or suspiciously small.

        Files under CORRUPT_FILE_SIZE_THRESHOLD bytes are treated as corrupt
        partial downloads and fetched again.
        """
        try:
            file_size = os.path.getsize(image_path)
        except FileNotFoundError:
            file_size = 0
        return file_size < self.CORRUPT_FILE_SIZE_THRESHOLD

    @staticmethod
    def handle_request(url, **kwargs):
        """GET *url*, retrying up to MAX_RETRIES times.

        Sleeps and retries on connection errors and on Cloudflare blocks
        (403 on the root page). Returns the final Response, or raises
        requests.ConnectionError when every attempt failed.
        """
        logger.debug("Requesting url: {}".format(url))
        # Never let a single request hang forever on a dead connection.
        kwargs.setdefault('timeout', 60)
        for _ in range(MAX_RETRIES):
            try:
                response = requests.get(url, **kwargs)
            except requests.ConnectionError:
                time.sleep(2)
                continue
            except Exception:
                logger.debug("Exception ocurred while requesting url {}".format(url), exc_info=1)
                # 'response' is unbound on this path; retry instead of crashing.
                continue
            if response.status_code == 200:
                return response
            if response.status_code == 403 and _client_blocked():
                # Blocked by Cloudflare, sleep for 4 minutes before retrying.
                logger.info("Request {} blocked, sleeping and retrying...".format(url))
                time.sleep(60 * 4)
                continue
            # Any other status code is returned to the caller to handle.
            return response
        # Every attempt failed; signal it the same way requests would.
        raise requests.ConnectionError

    @staticmethod
    def clean(name):
        """Sanitize *name* so it can be used as a folder name."""
        return name.replace('/', '-')
if __name__ == '__main__':
    # Command-line interface: parse the options, configure logging verbosity,
    # then run the scraper with the remaining options as keyword arguments.
    arg_parser = argparse.ArgumentParser(description='Elecciones PASO 2019 Scrapper.')
    arg_parser.add_argument('--threads', type=int, default=10,
                            help='Number of threads.')
    arg_parser.add_argument('--cft', type=int, default=150000, dest='corrupt_file_threshold',
                            help='Corrupt File Threshold. If an image file is smaller than this '
                                 'it will be redownloaded.')
    arg_parser.add_argument('--output', '-o', default='results',
                            help='Output folder.')
    arg_parser.add_argument('--info', action='store_true', dest='info_thread',
                            help='Periodically show requests information.')
    arg_parser.add_argument('--flat', '-f', action='store_true',
                            help='Save all the images in the result folder, no subfolders.')
    arg_parser.add_argument('--verbose', '-v', action='count', default=1,
                            help='Verbosity level.')
    options = vars(arg_parser.parse_args())
    # Logger configuration: -vv enables INFO, -vvv (or more) enables DEBUG.
    verbosity = options.pop('verbose')
    if verbosity >= 3:
        logger.setLevel(logging.DEBUG)
    elif verbosity >= 2:
        logger.setLevel(logging.INFO)
    PasoScrapper(**options).start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment