Created
July 20, 2018 04:24
-
-
Save levivm/5f103dde2c12346a1626a5632134f1fd to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import os | |
import time | |
import boto3 | |
import psycopg2 | |
from datetime import datetime | |
from selenium import webdriver | |
from selenium.webdriver.common.by import By | |
from selenium.webdriver.common.keys import Keys | |
from selenium.webdriver.common.action_chains import ActionChains | |
from selenium.webdriver.support.ui import WebDriverWait | |
from selenium.webdriver.support import expected_conditions as EC | |
class LoanScraper(): | |
NO_PROCESSED_FILE_LOAN_STATUS = 0 | |
UPLOADED_LOAN_FILE_STATUS = 1 | |
CONFIRMED_LOAN_STATUS = 2 | |
MOVED_LOAN_FILE_STATUS = 3 | |
DEST_PATH = "./" | |
def __init__(self, username=None, password=None): | |
options = webdriver.ChromeOptions() | |
download_path = "/tmp" | |
prefs = { | |
"download.default_directory": download_path, | |
"download.prompt_for_download": False, | |
"download.directory_upgrade": True, | |
"safebrowsing.enabled": False | |
} | |
options.add_experimental_option('prefs', prefs) | |
self.bucket_name = os.environ.get( | |
'BUCKET_NAME', | |
'scotia-alpha-mo-data-transaction' | |
) | |
self.bim_url = os.environ.get( | |
'BIM_URL', | |
'190.216.114.103' | |
) | |
self.username = username | |
self.password = password | |
self.driver = webdriver.Remote( | |
command_executor="http://hub:4444/wd/hub", | |
desired_capabilities=options.to_capabilities() | |
) | |
self.driver.implicitly_wait(30) | |
self.driver.set_window_position(0, 0) | |
self.driver.set_window_size(1920, 1080) | |
self.db_connection = self._get_db_connection() | |
def process_loan_files(self): | |
self.login() | |
pending_loan_files = self.get_all_files_from_status(self.NO_PROCESSED_FILE_LOAN_STATUS) | |
if not pending_loan_files: | |
print("Nothing to process") | |
for loan_file in pending_loan_files: | |
complete_file_name = loan_file.get('output_file_name') | |
print("Starting to process file {}".format(complete_file_name)) | |
file_name, extension = complete_file_name.split(".") | |
s3_file_key = "{}{}".format( | |
'SALIDA/PENDING/', | |
complete_file_name | |
) | |
self.download_file_from_s3( | |
s3_file_key, | |
bucket_name=self.bucket_name | |
) | |
self.process_loan_file_name(file_name) | |
def _get_db_connection(self): | |
try: | |
conn = psycopg2.connect( | |
database=os.environ.get('DATABASE_NAME', 'mydatabase'), | |
user=os.environ.get('DATABASE_USER', 'mydatabaseuser'), | |
host=os.environ.get('DATABASE_HOST', '127.0.0.1'), | |
password=os.environ.get('DATABASE_PASSWORD', 'mypassword'), | |
) | |
return conn | |
except Exception as e: | |
print("Uh oh, can't connect. Invalid dbname, user or password?", e) | |
def _parse_records(self, result, cursor): | |
# Get query fields | |
fields = list(map( | |
lambda x: x[0], | |
cursor.description | |
)) | |
data = [ | |
dict( | |
zip( | |
fields, | |
row | |
) | |
) for row in result | |
] | |
return data | |
def _generic_where_query(self, select_query, from_query, where_query, values=None, extra=None): | |
values = {} if values is None else values | |
# if self.db_connection: | |
# self.db_connection.close() | |
# Open cursor | |
with self.db_connection.cursor() as cursor: | |
try: | |
# Generic query made up using table, lookup field and value | |
query = """SELECT %s FROM %s""" % ( | |
select_query, | |
from_query | |
) | |
query += """ WHERE {}""".format(where_query,) | |
query += """ {} """.format(extra,) if extra else '' | |
# Execute select query | |
cursor.execute( | |
query, | |
values | |
) | |
result = cursor.fetchall() | |
return self._parse_records( | |
result, | |
cursor | |
) if result else None | |
except Exception as error: | |
print(cursor._last_executed) | |
print(error) | |
return | |
def update_file_name_status(self, file_name, status): | |
status_time_attribute_map = { | |
1: 'uploaded_at', | |
2: 'confirmed_at', | |
3: 'moved_at', | |
4: 'downloaded_at' | |
} | |
time_attribute_to_update = status_time_attribute_map.get(status) | |
values = { | |
'file_name': file_name, | |
'now': datetime.now().strftime("%Y-%m-%d %H:%M:%S"), | |
'status': status | |
} | |
with self.db_connection.cursor() as cursor: | |
try: | |
# Generic query made up using table, lookup field and value | |
query = """UPDATE %s""" % ( | |
'partner_loanfileprocess' | |
) | |
query += " SET status=%(status)s, {}=DATE(%(now)s) ".format( | |
time_attribute_to_update | |
) | |
query += "WHERE output_file_name=%(file_name)s;" | |
# Execute select query | |
cursor.execute( | |
query, | |
values | |
) | |
self.db_connection.commit() | |
result = cursor.rowcount | |
if result: | |
print( | |
"Filename {} was updated to status {}".format( | |
file_name, status | |
) | |
) | |
return True if result else None | |
except Exception as error: | |
print(error) | |
return | |
def get_all_files_from_status(self, status): | |
select_query = "*" | |
from_query = 'partner_loanfileprocess' | |
values = { | |
'status': status | |
} | |
where_query = ' status = %(status)s ' | |
file_names = self._generic_where_query( | |
select_query, | |
from_query, | |
where_query, | |
values | |
) | |
print(file_names) | |
return file_names | |
def get_s3_connection(self): | |
return boto3.resource('s3') | |
def move_file_in_s3(self, from_prefix, file_name, to_prefix, bucket_name=None): | |
try: | |
s3 = self.get_s3_connection() | |
bucket_name = bucket_name | |
bucket = s3.Bucket(bucket_name) | |
key = "{}{}".format( | |
from_prefix, | |
file_name | |
) | |
copy_source = { | |
'Bucket': bucket_name, | |
'Key': key | |
} | |
new_key = key.replace( | |
from_prefix, | |
to_prefix | |
) | |
bucket.copy( | |
copy_source, | |
new_key | |
) | |
bucket.delete_objects( | |
Delete={ | |
'Objects': [ | |
{ | |
'Key': key | |
}, | |
] | |
} | |
) | |
return True | |
except Exception as e: | |
print( | |
"File {} could not be moved in s3, reason: {}".format( | |
file_name, | |
e | |
) | |
) | |
return False | |
def download_file_from_s3(self, key, bucket_name=None): | |
s3 = self.get_s3_connection() | |
bucket_name = bucket_name | |
if not bucket_name: | |
print(' - {0} couldn\'t be find bucket ' | |
'(No bucket name)'.format(bucket_name)) | |
return None | |
bucket = s3.Bucket(bucket_name) | |
file_name = key.split('/').pop() | |
dest_path = '{0}{1}'.format( | |
self.DEST_PATH, | |
file_name | |
) | |
print("File {} downloaded from s3 {}".format( | |
key, | |
self.bucket_name | |
)) | |
bucket.download_file(key, dest_path) | |
return dest_path | |
def login(self): | |
# self.driver.get("https://190.216.114.103/partner") | |
self.driver.get("https://{}/partner".format(self.bim_url)) | |
self.driver.find_element_by_xpath("//*[contains(@id,'#USERNAME')]").send_keys(self.username) | |
password_element = self.driver.find_element_by_xpath("//*[contains(@id,'#PASSWORD')]") | |
password_element.send_keys(self.password) | |
password_element.send_keys(Keys.RETURN) | |
WebDriverWait(self.driver, 10).until( | |
EC.presence_of_element_located( | |
(By.XPATH, "//*[contains(@id,'#VIEW_ACCOUNTHOLDER')]") | |
) | |
) | |
def get_element_by_content_and_type(self, element_content, type='text()'): | |
elements_found = self.driver.find_elements_by_xpath( | |
"//*[contains({}, '{}')]".format( | |
type, | |
element_content | |
) | |
) | |
if not elements_found: | |
return | |
return elements_found.pop() | |
def process_loan_file_name(self, file_name): | |
self.hover_on_an_element_by_content('Financiero') | |
self.hover_on_an_element_by_content('Transferencias') | |
batch_transfer_button = self.get_element_by_content_and_type('Transferencias por lote') | |
batch_transfer_button.click() | |
complete_file_name = "{}.csv".format( | |
file_name | |
) | |
complete_file_path = "{}{}.csv".format( | |
self.DEST_PATH, | |
file_name | |
) | |
try: | |
uploaded = self.upload_loan_file(complete_file_path) | |
if not uploaded: | |
return | |
self.update_file_name_status( | |
complete_file_name, | |
self.UPLOADED_LOAN_FILE_STATUS | |
) | |
except Exception as e: | |
print("Could not upload file {} to bim due to {}".format( | |
file_name, | |
e | |
)) | |
return | |
self.wait_for_element_by_content_and_type( | |
'VIEW_BATCH_TRANSFER_LINK_ADVANCED_SEARCH', | |
'@id' | |
) | |
self.click_on_js_element_by_content_and_type( | |
'VIEW_BATCH_TRANSFER_LINK_ADVANCED_SEARCH', | |
'@id' | |
) | |
last_loan_uploaded = self.search_last_uploaded_loan_by_file_name( | |
file_name | |
) | |
if last_loan_uploaded: | |
self.approve_last_uploaded_loan() | |
self.update_file_name_status( | |
complete_file_name, | |
self.CONFIRMED_LOAN_STATUS | |
) | |
if not last_loan_uploaded: | |
print("Could not get last loan uploaded info from table".format( | |
file_name, | |
)) | |
return | |
last_loan_proccesed = self.wait_until_last_uploaded_loan_is_approved(file_name) | |
time.sleep(5) | |
if last_loan_proccesed: | |
self.download_file_from_last_uploaded_loan() | |
print( | |
"File {} downloaded".format( | |
file_name | |
) | |
) | |
file_moved = self.move_file_in_s3( | |
'SALIDA/PENDING/', | |
complete_file_name, | |
'SALIDA/DONE/', | |
bucket_name=self.bucket_name | |
) | |
if file_moved: | |
self.update_file_name_status( | |
complete_file_name, | |
self.MOVED_LOAN_FILE_STATUS | |
) | |
def hover_on_an_element_by_content(self, element_content): | |
element_to_hover_over = self.get_element_by_content_and_type(element_content) | |
hover = ActionChains(self.driver).move_to_element(element_to_hover_over) | |
hover.perform() | |
def click_on_js_element_by_content_and_type(self, element_content, type="text()"): | |
element_to_click = self.get_element_by_content_and_type( | |
element_content, | |
type | |
) | |
hover = ActionChains(self.driver).move_to_element(element_to_click) | |
hover.click(element_to_click) | |
hover.perform() | |
def wait_for_element_by_content_and_type(self, element_content, type="text()"): | |
WebDriverWait(self.driver, 5).until( | |
EC.presence_of_element_located( | |
( | |
By.XPATH, | |
"//*[contains({},'{}')]".format( | |
type, | |
element_content | |
) | |
) | |
) | |
) | |
def load_loan_table(self): | |
loan_table_loaded = False | |
self.wait_for_element_by_content_and_type( | |
'VIEW_BATCH_TRANSFER_LINK_ADVANCED_SEARCH', | |
'@id' | |
) | |
self.click_on_js_element_by_content_and_type( | |
'VIEW_BATCH_TRANSFER_LINK_ADVANCED_SEARCH', | |
'@id' | |
) | |
self.wait_for_element_by_content_and_type( | |
'VIEW_BATCH_TRANSFER_FIELD_START_DATE', | |
'@id' | |
) | |
self.click_on_js_element_by_content_and_type( | |
'VIEW_BATCH_TRANSFER_BUTTON_SEARCH', | |
'@id' | |
) | |
self.wait_for_element_by_content_and_type( | |
'VIEW_BATCH_TRANSFER_TABLE', | |
'@id' | |
) | |
time.sleep(2) | |
loan_rows = [] | |
while not loan_table_loaded: | |
table_body = self.get_element_by_content_and_type( | |
'v-table-body', | |
'@class' | |
) | |
loan_rows = table_body.find_elements_by_xpath('//tr') | |
loan_table_loaded = True if len(loan_rows) > 2 else False | |
time.sleep(1) | |
return loan_rows | |
def search_last_uploaded_loan_by_file_name(self, loan_file_name): | |
FILE_NAME_COLUMN_INDEX = 3 | |
attempts_left = 5 | |
while attempts_left: | |
loan_rows = self.load_loan_table() | |
for row in loan_rows: | |
last_loan_in_table = row | |
last_loan_in_table_attributes = last_loan_in_table.find_elements_by_tag_name('td') | |
last_loan_in_table_file_name_div = last_loan_in_table_attributes[ | |
FILE_NAME_COLUMN_INDEX | |
] | |
last_loan_in_table_file_name_html = last_loan_in_table_file_name_div.get_attribute( | |
'innerHTML' | |
) | |
if loan_file_name in last_loan_in_table_file_name_html: | |
return last_loan_in_table_attributes | |
attempts_left -= 1 | |
def approve_last_uploaded_loan(self): | |
try: | |
self.click_on_js_element_by_content_and_type( | |
'VIEW_BATCH_TRANSFER_TABLE_0_batch.transactions.approve', | |
'@id' | |
) | |
except AttributeError: | |
print( | |
"File {} can not be approved".format( | |
self.file_name | |
) | |
) | |
sys.exit() | |
self.wait_for_element_by_content_and_type( | |
'DIALOG_CONFIRM_BUTTON_YES', | |
'@id' | |
) | |
self.click_on_js_element_by_content_and_type( | |
'DIALOG_CONFIRM_BUTTON_YES', | |
'@id' | |
) | |
self.wait_for_element_by_content_and_type( | |
'VIEW_BATCH_TRANSFER_BUTTON_SEARCH', | |
'@id' | |
) | |
time.sleep(2) | |
def wait_until_last_uploaded_loan_is_approved(self, file_name): | |
LOAN_STATUS_COLUMN_INDEX = 2 | |
STATUS_COMPLETED = 'Completado' | |
TIME_TO_WAIT_BT_ATTEMPS = 5 | |
attemps = 8 | |
last_uploaded_loan_completed = False | |
time.sleep(10) | |
print( | |
"Sleeping befor searching last uploaded file {}".format( | |
file_name | |
) | |
) | |
while attemps: | |
print( | |
"Attemps {} to search last uploaded file {}".format( | |
attemps, | |
file_name | |
) | |
) | |
last_loan_in_table_attributes = self.search_last_uploaded_loan_by_file_name( | |
file_name | |
) | |
last_loan_in_table_status_div = last_loan_in_table_attributes[ | |
LOAN_STATUS_COLUMN_INDEX | |
] | |
last_loan_in_table_status_html = last_loan_in_table_status_div.get_attribute( | |
'innerHTML' | |
) | |
if STATUS_COMPLETED in last_loan_in_table_status_html: | |
last_uploaded_loan_completed = True | |
return last_uploaded_loan_completed | |
attemps -= 1 | |
time.sleep(TIME_TO_WAIT_BT_ATTEMPS) | |
return last_uploaded_loan_completed | |
def upload_loan_file(self, file_path): | |
print( | |
"Uploading file {}".format(file_path) | |
) | |
self.wait_for_element_by_content_and_type( | |
'VIEW_BATCH_TRANSFER_BUTTON_UPLOAD', | |
'@id' | |
) | |
self.click_on_js_element_by_content_and_type( | |
'VIEW_BATCH_TRANSFER_BUTTON_UPLOAD', | |
'@id' | |
) | |
self.wait_for_element_by_content_and_type( | |
'Seleccione el archivo.', | |
'text()' | |
) | |
file_input = self.get_element_by_content_and_type( | |
'gwt-FileUpload', | |
'@class' | |
) | |
file_input.send_keys(file_path) | |
time.sleep(5) | |
self.click_on_js_element_by_content_and_type( | |
'VIEW_WIZARD_BUTTON_NEXT', | |
'@id' | |
) | |
self.wait_for_element_by_content_and_type( | |
'VIEW_WIZARD_BUTTON_NEXT', | |
'@id' | |
) | |
self.click_on_js_element_by_content_and_type( | |
'VIEW_WIZARD_BUTTON_NEXT', | |
'@id' | |
) | |
error = self.get_element_by_content_and_type( | |
'DIALOG_ERROR', | |
'@id' | |
) | |
if error: | |
self.wait_for_element_by_content_and_type( | |
'ACTION_CLOSE', | |
'@id' | |
) | |
self.click_on_js_element_by_content_and_type( | |
'ACTION_CLOSE', | |
'@id' | |
) | |
self.wait_for_element_by_content_and_type( | |
'VIEW_BATCH_TRANSFER_WIZARD_window_close', | |
'@id' | |
) | |
self.click_on_js_element_by_content_and_type( | |
'VIEW_BATCH_TRANSFER_WIZARD_window_close', | |
'@id' | |
) | |
print( | |
"There was an error uploading file {}".format( | |
file_path | |
) | |
) | |
return | |
print( | |
"File uploaded successful {}".format( | |
file_path | |
) | |
) | |
return True | |
def download_file_from_last_uploaded_loan(self): | |
self.wait_for_element_by_content_and_type( | |
'VIEW_BATCH_TRANSFER_TABLE_0_batch.transactions.id', | |
'@id' | |
) | |
self.click_on_js_element_by_content_and_type( | |
'VIEW_BATCH_TRANSFER_TABLE_0_batch.transactions.id', | |
'@id' | |
) | |
scraper = LoanScraper( | |
username=os.environ.get( | |
'BIM_USERNAME', | |
'TEST' | |
), | |
password=os.environ.get( | |
'BIM_PASSWORD', | |
'TEST(F' | |
) | |
) | |
scraper.process_loan_files() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment