Created
November 9, 2017 16:01
-
-
Save vaibhavmule/e650a83b125ca8e006f0e0e1f0bb1aec to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Develop an ETL system that will extract data from a CSV file,
apply some transformation and output the transformed
data to another CSV.

ETL stands for Extract, Transform and Load - this system should be divided into these modules:

Extract module - should be able to retrieve data from a CSV file
Transform module - should accept the input stream from the Extract module and,
    for each column, add an additional column
    indicating the data type of that column
Load module - should load this transformed data to another CSV file

Expected output CSV:

Name, Name datatype, Age, Age datatype, Join Date, Join Date datatype, Bank Balance, Bank Balance datatype
Ramesh, str, 32, int, 2017-07-21, date, 26790.60, float
Ashok, str, 37, int, 2017-01-19, date, 132870.48, float
""" | |
import csv | |
import datetime | |
def extract(filename):
    """Lazily read the rows of a CSV file.

    This is a generator, so the filename check and the ``open`` call only
    happen when the first row is requested.

    Args:
        filename: Path of the CSV file to read. It must contain the
            substring ``'.csv'`` (a substring test, not a strict extension
            check).

    Yields:
        One list of strings per CSV record, as produced by ``csv.reader``.

    Raises:
        ValueError: If ``filename`` does not contain ``'.csv'``.
    """
    if '.csv' not in filename:
        raise ValueError('Please use correct file!')

    # newline='' leaves newline handling to the csv module, so line breaks
    # embedded in quoted fields are returned unchanged instead of being
    # rewritten by universal-newline translation.
    with open(filename, newline='') as f:
        yield from csv.reader(f)
def transform(extracted_data):
    """Insert a datatype column after every column of the extracted rows.

    The first row is treated as the header: each stripped column name is
    followed by ``'<name> datatype'``. In every later row each stripped
    value is followed by the label returned by ``get_type`` for it.

    Args:
        extracted_data: Iterable of rows, each row an iterable of strings.
            Any iterable is accepted, not only an iterator.

    Yields:
        The widened header row first, then one widened list per data row.
        Nothing is yielded when ``extracted_data`` is empty.
    """
    rows = iter(extracted_data)

    # A bare next() on an exhausted iterator would raise StopIteration
    # inside this generator, which Python turns into RuntimeError (PEP 479).
    # Use a default so empty input simply produces empty output.
    header = next(rows, None)
    if header is None:
        return

    header_row = []
    for name in header:
        name = name.strip()
        header_row.extend((name, '{} datatype'.format(name)))
    yield header_row

    for row in rows:
        typed_row = []
        for cell in row:
            value = cell.strip()
            typed_row.extend((value, get_type(value)))
        yield typed_row
def get_type(column):
    """Return the datatype label for a single cell value.

    The checks run in the order integer, float, date. The first one whose
    result is truthy decides the label; ``'str'`` is the fallback when
    none of them matches.
    """
    # Order matters: every integer string also parses as a float, so the
    # integer check has to come first.
    checks = (
        (is_integer, 'int'),
        (is_float, 'float'),
        (is_date, 'date'),
    )
    for check, label in checks:
        if check(column):
            return label
    return 'str'
def is_integer(s):
    """Report whether ``s`` can be parsed by ``int()``.

    Args:
        s: The string to test.

    Returns:
        ``True`` if ``int(s)`` succeeds, ``False`` if it raises
        ``ValueError``.
    """
    try:
        int(s)
    except ValueError:
        return False
    # Return an explicit True rather than the parsed number: int('0') is 0,
    # which is falsy and would make a valid integer look like a failed check.
    return True
def is_float(s):
    """Report whether ``s`` can be parsed by ``float()``.

    Note that ``float()`` also accepts spellings such as ``'nan'`` and
    ``'inf'``, so those count as floats here.

    Args:
        s: The string to test.

    Returns:
        ``True`` if ``float(s)`` succeeds, ``False`` if it raises
        ``ValueError``.
    """
    try:
        float(s)
    except ValueError:
        return False
    # Return an explicit True rather than the parsed number: float('0.0') is
    # 0.0, which is falsy and would make a valid float look like a failed
    # check.
    return True
def is_date(s):
    """Parse ``s`` as a ``year-month-day`` date.

    The value must consist of exactly three ``-``-separated integer parts
    that form a valid calendar date. Zero padding is optional, so
    ``'2017-7-1'`` is accepted.

    Args:
        s: The string to test.

    Returns:
        The parsed ``datetime.date`` (always truthy) on success, otherwise
        ``False``.
    """
    try:
        # Unpacking enforces exactly three parts, so values with trailing
        # extra parts such as '2017-07-21-99' are rejected instead of
        # silently ignoring everything after the third part.
        year, month, day = s.split('-')
        return datetime.date(year=int(year), month=int(month), day=int(day))
    except (AttributeError, TypeError, ValueError, OverflowError):
        # AttributeError/TypeError: s is not a string.
        # ValueError: wrong part count, non-numeric part, or out-of-range date.
        # OverflowError: a numeric part too large for datetime.date.
        return False
def load_to_csv(transformed_data, out_filename):
    """Write the transformed rows to a CSV file.

    The file is created, or truncated if it already exists, before any row
    is pulled from ``transformed_data``.

    Args:
        transformed_data: Iterable of rows, each row an iterable of values.
        out_filename: Path of the CSV file to write.
    """
    # newline='' stops the text layer from translating the '\r\n' that
    # csv.writer emits; without it, platforms that translate '\n' on write
    # (Windows) produce '\r\r\n' and the file appears to have blank rows.
    with open(out_filename, 'w', newline='') as f:
        csv.writer(f).writerows(transformed_data)
def _prompt_csv_filename(prompt):
    """Ask via ``input(prompt)`` until the answer ends with ``'.csv'``."""
    while True:
        filename = input(prompt)
        # Test the actual extension; a substring test would also accept
        # names such as 'data.csv.bak'.
        if filename.endswith('.csv'):
            return filename
        print('Please correct extension of the file')


def main():
    """Interactively run the extract -> transform -> load pipeline.

    Prompts for an input and an output CSV filename, re-asking until each
    ends with ``'.csv'``, then streams the input through ``extract`` and
    ``transform`` into ``load_to_csv`` and prints a confirmation.
    """
    filename = _prompt_csv_filename('Enter file name: ')
    output_filename = _prompt_csv_filename('Enter output file name: ')

    # extract and transform are generators, so the input file is only read
    # while load_to_csv is consuming rows.
    extracted_data = extract(filename)
    transformed_data = transform(extracted_data)
    load_to_csv(transformed_data, output_filename)
    print("Extracted and transformed data to", output_filename)
# Start the interactive ETL run only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment