Skip to content

Instantly share code, notes, and snippets.

@levivm
Created July 20, 2018 04:24
Show Gist options
  • Save levivm/5f103dde2c12346a1626a5632134f1fd to your computer and use it in GitHub Desktop.
Save levivm/5f103dde2c12346a1626a5632134f1fd to your computer and use it in GitHub Desktop.
import sys
import os
import time
import boto3
import psycopg2
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
class LoanScraper():
    """Push pending loan files through the BIM partner web UI.

    For every row of ``partner_loanfileprocess`` whose ``status`` is
    ``NO_PROCESSED_FILE_LOAN_STATUS`` the scraper downloads the file from S3,
    uploads it through the "Transferencias por lote" wizard with a remote
    Selenium browser, approves the resulting batch, waits for it to show
    ``Completado`` and finally moves the S3 object from the pending prefix to
    the done prefix, updating the row's status along the way.
    """

    # Values written to / read from the ``status`` column of
    # ``partner_loanfileprocess``.
    NO_PROCESSED_FILE_LOAN_STATUS = 0
    UPLOADED_LOAN_FILE_STATUS = 1
    CONFIRMED_LOAN_STATUS = 2
    MOVED_LOAN_FILE_STATUS = 3

    # Local directory prefix the S3 files are downloaded to.
    DEST_PATH = "./"

    # S3 key prefixes for files waiting to be processed / already processed.
    PENDING_PREFIX = 'SALIDA/PENDING/'
    DONE_PREFIX = 'SALIDA/DONE/'

    # Upper bound on the one-second polls done while waiting for the batch
    # table to show more than two rows.
    LOAN_TABLE_LOAD_ATTEMPTS = 30

    def __init__(self, username=None, password=None):
        """Open the remote browser session and the database connection.

        :param username: BIM user typed into the login form.
        :param password: BIM password typed into the login form.
        """
        options = webdriver.ChromeOptions()
        prefs = {
            "download.default_directory": "/tmp",
            "download.prompt_for_download": False,
            "download.directory_upgrade": True,
            "safebrowsing.enabled": False
        }
        options.add_experimental_option('prefs', prefs)
        self.bucket_name = os.environ.get(
            'BUCKET_NAME',
            'scotia-alpha-mo-data-transaction'
        )
        self.bim_url = os.environ.get('BIM_URL', '190.216.114.103')
        self.username = username
        self.password = password
        # NOTE(review): ``desired_capabilities`` is the Selenium 3 calling
        # convention; confirm the pinned Selenium version before upgrading.
        self.driver = webdriver.Remote(
            command_executor="http://hub:4444/wd/hub",
            desired_capabilities=options.to_capabilities()
        )
        self.driver.implicitly_wait(30)
        self.driver.set_window_position(0, 0)
        self.driver.set_window_size(1920, 1080)
        self.db_connection = self._get_db_connection()

    def process_loan_files(self):
        """Log in and process every loan file that is still unprocessed."""
        self.login()
        pending_loan_files = self.get_all_files_from_status(
            self.NO_PROCESSED_FILE_LOAN_STATUS
        )
        if not pending_loan_files:
            # The lookup returns None when nothing matches, so stop here
            # instead of trying to iterate it.
            print("Nothing to process")
            return
        for loan_file in pending_loan_files:
            complete_file_name = loan_file.get('output_file_name')
            if not complete_file_name:
                print("Skipping record without output_file_name")
                continue
            print("Starting to process file {}".format(complete_file_name))
            # splitext only strips the last extension, so names holding
            # several dots (or none) no longer break the unpacking.
            file_name, _extension = os.path.splitext(complete_file_name)
            s3_file_key = "{}{}".format(
                self.PENDING_PREFIX,
                complete_file_name
            )
            self.download_file_from_s3(
                s3_file_key,
                bucket_name=self.bucket_name
            )
            self.process_loan_file_name(file_name)

    def _get_db_connection(self):
        """Return a psycopg2 connection built from the ``DATABASE_*`` env vars.

        Best effort: on failure the error is printed and ``None`` is returned.
        """
        try:
            return psycopg2.connect(
                database=os.environ.get('DATABASE_NAME', 'mydatabase'),
                user=os.environ.get('DATABASE_USER', 'mydatabaseuser'),
                host=os.environ.get('DATABASE_HOST', '127.0.0.1'),
                password=os.environ.get('DATABASE_PASSWORD', 'mypassword'),
            )
        except Exception as e:
            print("Uh oh, can't connect. Invalid dbname, user or password?", e)
            return None

    def _parse_records(self, result, cursor):
        """Turn fetched rows into dicts keyed by the cursor's column names."""
        fields = [column[0] for column in cursor.description]
        return [dict(zip(fields, row)) for row in result]

    def _generic_where_query(self, select_query, from_query, where_query, values=None, extra=None):
        """Run ``SELECT <select> FROM <from> WHERE <where> [extra]``.

        ``select_query``, ``from_query``, ``where_query`` and ``extra`` are
        interpolated into the SQL text as-is, so they must be trusted
        constants; run-time data belongs in ``values`` (bound by the driver).

        :return: list of row dicts, or ``None`` when nothing matched or the
            query failed.
        """
        values = {} if values is None else values
        query = "SELECT {} FROM {}".format(select_query, from_query)
        query += " WHERE {}".format(where_query)
        if extra:
            query += " {} ".format(extra)
        with self.db_connection.cursor() as cursor:
            try:
                cursor.execute(query, values)
                result = cursor.fetchall()
            except Exception as error:
                # psycopg2 exposes the last statement sent as ``cursor.query``.
                print(getattr(cursor, 'query', None))
                print(error)
                # Leave the connection usable for the next statement.
                self.db_connection.rollback()
                return None
            return self._parse_records(result, cursor) if result else None

    def update_file_name_status(self, file_name, status):
        """Set ``status`` and the matching timestamp column for ``file_name``.

        :param file_name: value matched against ``output_file_name``.
        :param status: new status; must be a key of the timestamp map below.
        :return: ``True`` when at least one row was updated, otherwise ``None``.
        """
        status_time_attribute_map = {
            1: 'uploaded_at',
            2: 'confirmed_at',
            3: 'moved_at',
            4: 'downloaded_at'
        }
        time_attribute_to_update = status_time_attribute_map.get(status)
        if time_attribute_to_update is None:
            # Without a column name the statement would read "None=DATE(...)".
            print(
                "Unknown status {} for file {}, nothing updated".format(
                    status, file_name
                )
            )
            return None
        values = {
            'file_name': file_name,
            'now': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'status': status
        }
        # Only the column name (taken from the fixed map above) is formatted
        # into the SQL text; every value is bound by the driver.
        query = "UPDATE partner_loanfileprocess"
        query += " SET status=%(status)s, {}=DATE(%(now)s) ".format(
            time_attribute_to_update
        )
        query += "WHERE output_file_name=%(file_name)s;"
        with self.db_connection.cursor() as cursor:
            try:
                cursor.execute(query, values)
                self.db_connection.commit()
                updated_rows = cursor.rowcount
            except Exception as error:
                print(error)
                self.db_connection.rollback()
                return None
        if not updated_rows:
            return None
        print(
            "Filename {} was updated to status {}".format(
                file_name, status
            )
        )
        return True

    def get_all_files_from_status(self, status):
        """Return every ``partner_loanfileprocess`` row with ``status``.

        :return: list of row dicts, or ``None`` when there are none.
        """
        file_names = self._generic_where_query(
            "*",
            'partner_loanfileprocess',
            ' status = %(status)s ',
            {'status': status}
        )
        print(file_names)
        return file_names

    def get_s3_connection(self):
        """Return a boto3 S3 resource."""
        return boto3.resource('s3')

    def move_file_in_s3(self, from_prefix, file_name, to_prefix, bucket_name=None):
        """Copy ``from_prefix + file_name`` to ``to_prefix + file_name`` and
        delete the source object.

        :return: ``True`` on success, ``False`` when the bucket name is
            missing or any S3 call fails (the reason is printed).
        """
        if not bucket_name:
            print(
                "File {} could not be moved in s3, reason: {}".format(
                    file_name,
                    "No bucket name"
                )
            )
            return False
        key = "{}{}".format(from_prefix, file_name)
        # Build the destination directly: str.replace() would also rewrite
        # the prefix wherever else it occurs in the key.
        new_key = "{}{}".format(to_prefix, file_name)
        try:
            bucket = self.get_s3_connection().Bucket(bucket_name)
            bucket.copy({'Bucket': bucket_name, 'Key': key}, new_key)
            bucket.delete_objects(Delete={'Objects': [{'Key': key}]})
        except Exception as e:
            print(
                "File {} could not be moved in s3, reason: {}".format(
                    file_name,
                    e
                )
            )
            return False
        return True

    def download_file_from_s3(self, key, bucket_name=None):
        """Download ``key`` from ``bucket_name`` into ``DEST_PATH``.

        :return: local path of the downloaded file, or ``None`` when no
            bucket name was given.
        """
        if not bucket_name:
            print(' - {0} couldn\'t be find bucket '
                  '(No bucket name)'.format(bucket_name))
            return None
        bucket = self.get_s3_connection().Bucket(bucket_name)
        file_name = key.split('/').pop()
        dest_path = '{0}{1}'.format(self.DEST_PATH, file_name)
        bucket.download_file(key, dest_path)
        # Report only after the download succeeded, naming the bucket used.
        print("File {} downloaded from s3 {}".format(key, bucket_name))
        return dest_path

    def login(self):
        """Open the partner page, submit the credentials and wait for the
        account-holder view to be present."""
        self.driver.get("https://{}/partner".format(self.bim_url))
        username_element = self.driver.find_element(
            By.XPATH, "//*[contains(@id,'#USERNAME')]"
        )
        username_element.send_keys(self.username)
        password_element = self.driver.find_element(
            By.XPATH, "//*[contains(@id,'#PASSWORD')]"
        )
        password_element.send_keys(self.password)
        password_element.send_keys(Keys.RETURN)
        WebDriverWait(self.driver, 10).until(
            EC.presence_of_element_located(
                (By.XPATH, "//*[contains(@id,'#VIEW_ACCOUNTHOLDER')]")
            )
        )

    def get_element_by_content_and_type(self, element_content, type='text()'):
        """Return the last element whose ``type`` contains ``element_content``.

        :param element_content: substring to look for.
        :param type: XPath node test or attribute, e.g. ``text()``, ``@id``.
        :return: the last matching element, or ``None`` when none matches.
        """
        elements_found = self.driver.find_elements(
            By.XPATH,
            "//*[contains({}, '{}')]".format(type, element_content)
        )
        if not elements_found:
            return None
        return elements_found.pop()

    def process_loan_file_name(self, file_name):
        """Upload, approve and archive the loan file ``<file_name>.csv``.

        :param file_name: file name without its extension.
        """
        self.hover_on_an_element_by_content('Financiero')
        self.hover_on_an_element_by_content('Transferencias')
        batch_transfer_button = self.get_element_by_content_and_type(
            'Transferencias por lote'
        )
        if batch_transfer_button is None:
            print("Could not upload file {} to bim due to {}".format(
                file_name,
                "batch transfer menu entry not found"
            ))
            return
        batch_transfer_button.click()

        complete_file_name = "{}.csv".format(file_name)
        complete_file_path = "{}{}".format(self.DEST_PATH, complete_file_name)
        try:
            uploaded = self.upload_loan_file(complete_file_path)
            if not uploaded:
                return
            self.update_file_name_status(
                complete_file_name,
                self.UPLOADED_LOAN_FILE_STATUS
            )
        except Exception as e:
            print("Could not upload file {} to bim due to {}".format(
                file_name,
                e
            ))
            return

        self.wait_for_element_by_content_and_type(
            'VIEW_BATCH_TRANSFER_LINK_ADVANCED_SEARCH',
            '@id'
        )
        self.click_on_js_element_by_content_and_type(
            'VIEW_BATCH_TRANSFER_LINK_ADVANCED_SEARCH',
            '@id'
        )
        last_loan_uploaded = self.search_last_uploaded_loan_by_file_name(
            file_name
        )
        if not last_loan_uploaded:
            print(
                "Could not get last loan uploaded info from table "
                "for file {}".format(file_name)
            )
            return
        self.approve_last_uploaded_loan(file_name)
        self.update_file_name_status(
            complete_file_name,
            self.CONFIRMED_LOAN_STATUS
        )

        last_loan_proccesed = self.wait_until_last_uploaded_loan_is_approved(
            file_name
        )
        time.sleep(5)
        if last_loan_proccesed:
            self.download_file_from_last_uploaded_loan()
            print(
                "File {} downloaded".format(
                    file_name
                )
            )
        # NOTE(review): the S3 move runs whether or not the batch reached
        # "Completado"; confirm that this is the intended nesting.
        file_moved = self.move_file_in_s3(
            self.PENDING_PREFIX,
            complete_file_name,
            self.DONE_PREFIX,
            bucket_name=self.bucket_name
        )
        if file_moved:
            self.update_file_name_status(
                complete_file_name,
                self.MOVED_LOAN_FILE_STATUS
            )

    def hover_on_an_element_by_content(self, element_content):
        """Move the mouse over the element whose text contains
        ``element_content``."""
        element_to_hover_over = self.get_element_by_content_and_type(
            element_content
        )
        hover = ActionChains(self.driver).move_to_element(element_to_hover_over)
        hover.perform()

    def click_on_js_element_by_content_and_type(self, element_content, type="text()"):
        """Move to the matching element and click it through an action chain."""
        element_to_click = self.get_element_by_content_and_type(
            element_content,
            type
        )
        hover = ActionChains(self.driver).move_to_element(element_to_click)
        hover.click(element_to_click)
        hover.perform()

    def wait_for_element_by_content_and_type(self, element_content, type="text()"):
        """Wait up to 5 seconds for a matching element to be present."""
        WebDriverWait(self.driver, 5).until(
            EC.presence_of_element_located(
                (
                    By.XPATH,
                    "//*[contains({},'{}')]".format(type, element_content)
                )
            )
        )

    def load_loan_table(self):
        """Run the advanced search and return the rows of the batch table.

        Polls once per second, at most ``LOAN_TABLE_LOAD_ATTEMPTS`` times,
        until more than two rows are found; whatever was found last is
        returned, so the result may be short or empty.
        """
        self.wait_for_element_by_content_and_type(
            'VIEW_BATCH_TRANSFER_LINK_ADVANCED_SEARCH',
            '@id'
        )
        self.click_on_js_element_by_content_and_type(
            'VIEW_BATCH_TRANSFER_LINK_ADVANCED_SEARCH',
            '@id'
        )
        self.wait_for_element_by_content_and_type(
            'VIEW_BATCH_TRANSFER_FIELD_START_DATE',
            '@id'
        )
        self.click_on_js_element_by_content_and_type(
            'VIEW_BATCH_TRANSFER_BUTTON_SEARCH',
            '@id'
        )
        self.wait_for_element_by_content_and_type(
            'VIEW_BATCH_TRANSFER_TABLE',
            '@id'
        )
        time.sleep(2)
        loan_rows = []
        for _attempt in range(self.LOAN_TABLE_LOAD_ATTEMPTS):
            table_body = self.get_element_by_content_and_type(
                'v-table-body',
                '@class'
            )
            if table_body is None:
                loan_rows = []
            else:
                # NOTE(review): '//tr' is document-wide even when evaluated
                # from an element ('.//tr' would be relative); kept as-is.
                loan_rows = table_body.find_elements(By.XPATH, '//tr')
            time.sleep(1)
            if len(loan_rows) > 2:
                break
        return loan_rows

    def search_last_uploaded_loan_by_file_name(self, loan_file_name):
        """Return the ``td`` cells of the first table row whose file-name
        column contains ``loan_file_name``.

        The table is reloaded up to five times.

        :return: list of cell elements, or ``None`` when no row matches.
        """
        FILE_NAME_COLUMN_INDEX = 3
        for _attempt in range(5):
            for row in self.load_loan_table():
                cells = row.find_elements(By.TAG_NAME, 'td')
                if len(cells) <= FILE_NAME_COLUMN_INDEX:
                    # Rows without enough cells cannot hold a file name.
                    continue
                file_name_html = cells[FILE_NAME_COLUMN_INDEX].get_attribute(
                    'innerHTML'
                )
                if file_name_html and loan_file_name in file_name_html:
                    return cells
        return None

    def approve_last_uploaded_loan(self, file_name=None):
        """Approve the first batch in the table and confirm the dialog.

        :param file_name: only used in the failure message.

        Exits the process with status 1 when the approve control raises
        ``AttributeError`` while being clicked.
        """
        try:
            self.click_on_js_element_by_content_and_type(
                'VIEW_BATCH_TRANSFER_TABLE_0_batch.transactions.approve',
                '@id'
            )
        except AttributeError:
            print(
                "File {} can not be approved".format(
                    file_name
                )
            )
            sys.exit(1)
        self.wait_for_element_by_content_and_type(
            'DIALOG_CONFIRM_BUTTON_YES',
            '@id'
        )
        self.click_on_js_element_by_content_and_type(
            'DIALOG_CONFIRM_BUTTON_YES',
            '@id'
        )
        self.wait_for_element_by_content_and_type(
            'VIEW_BATCH_TRANSFER_BUTTON_SEARCH',
            '@id'
        )
        time.sleep(2)

    def wait_until_last_uploaded_loan_is_approved(self, file_name):
        """Poll the batch table until ``file_name`` shows ``Completado``.

        :return: ``True`` once the status column contains ``Completado``,
            ``False`` after eight unsuccessful attempts.
        """
        LOAN_STATUS_COLUMN_INDEX = 2
        STATUS_COMPLETED = 'Completado'
        TIME_TO_WAIT_BT_ATTEMPS = 5
        attemps = 8
        print(
            "Sleeping befor searching last uploaded file {}".format(
                file_name
            )
        )
        time.sleep(10)
        while attemps:
            print(
                "Attemps {} to search last uploaded file {}".format(
                    attemps,
                    file_name
                )
            )
            cells = self.search_last_uploaded_loan_by_file_name(file_name)
            # The search returns None when the row is not (yet) listed.
            if cells and len(cells) > LOAN_STATUS_COLUMN_INDEX:
                status_html = cells[LOAN_STATUS_COLUMN_INDEX].get_attribute(
                    'innerHTML'
                )
                if status_html and STATUS_COMPLETED in status_html:
                    return True
            attemps -= 1
            time.sleep(TIME_TO_WAIT_BT_ATTEMPS)
        return False

    def upload_loan_file(self, file_path):
        """Upload ``file_path`` through the batch-transfer wizard.

        :return: ``True`` when no error dialog appears; ``None`` when one
            does (the dialog and the wizard are then closed).
        """
        print(
            "Uploading file {}".format(file_path)
        )
        self.wait_for_element_by_content_and_type(
            'VIEW_BATCH_TRANSFER_BUTTON_UPLOAD',
            '@id'
        )
        self.click_on_js_element_by_content_and_type(
            'VIEW_BATCH_TRANSFER_BUTTON_UPLOAD',
            '@id'
        )
        self.wait_for_element_by_content_and_type(
            'Seleccione el archivo.',
            'text()'
        )
        file_input = self.get_element_by_content_and_type(
            'gwt-FileUpload',
            '@class'
        )
        file_input.send_keys(file_path)
        time.sleep(5)
        # The "next" button is clicked twice, waiting for it in between.
        self.click_on_js_element_by_content_and_type(
            'VIEW_WIZARD_BUTTON_NEXT',
            '@id'
        )
        self.wait_for_element_by_content_and_type(
            'VIEW_WIZARD_BUTTON_NEXT',
            '@id'
        )
        self.click_on_js_element_by_content_and_type(
            'VIEW_WIZARD_BUTTON_NEXT',
            '@id'
        )
        error = self.get_element_by_content_and_type(
            'DIALOG_ERROR',
            '@id'
        )
        if not error:
            print(
                "File uploaded successful {}".format(
                    file_path
                )
            )
            return True
        # Close the error dialog first, then the wizard window.
        for element_id in (
            'ACTION_CLOSE',
            'VIEW_BATCH_TRANSFER_WIZARD_window_close',
        ):
            self.wait_for_element_by_content_and_type(element_id, '@id')
            self.click_on_js_element_by_content_and_type(element_id, '@id')
        print(
            "There was an error uploading file {}".format(
                file_path
            )
        )
        return None

    def download_file_from_last_uploaded_loan(self):
        """Click the id link of the first batch row in the table."""
        self.wait_for_element_by_content_and_type(
            'VIEW_BATCH_TRANSFER_TABLE_0_batch.transactions.id',
            '@id'
        )
        self.click_on_js_element_by_content_and_type(
            'VIEW_BATCH_TRANSFER_TABLE_0_batch.transactions.id',
            '@id'
        )
# Script entry point: the BIM credentials are read from the environment,
# falling back to the test account below.
scraper = LoanScraper(
    username=os.environ.get('BIM_USERNAME', 'TEST'),
    password=os.environ.get('BIM_PASSWORD', 'TEST(F'),
)
try:
    scraper.process_loan_files()
finally:
    # Always release the remote browser session and the database connection,
    # even when processing fails or exits early; otherwise the session stays
    # open on the Selenium hub.
    scraper.driver.quit()
    if scraper.db_connection is not None:
        scraper.db_connection.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment