Skip to content

Instantly share code, notes, and snippets.

@avinash-mishra
Forked from erincerys/mysql2cassandra.py
Created September 23, 2016 09:22
Show Gist options
  • Save avinash-mishra/da24e6252c8b8f8ccc4cc6a68644220b to your computer and use it in GitHub Desktop.
Save avinash-mishra/da24e6252c8b8f8ccc4cc6a68644220b to your computer and use it in GitHub Desktop.
Dumps a MySQL table and reformulates it into JSON to be ingested into a Cassandra table
#! /usr/bin/env python
#
# mysql2cassandra.py
# Dump a MySQL result set to file and then import into a Cassandra column family
#
# Configuration
# mysql_params [host, port, user, password, db] MySQL conenction parameters
# mysql_columns [colname, colname2, ...] Columns for building MySQL query
# The column that will hold values of the row key in the Cassandra column family must be first
# mysql_table 'tablename' Table name for use in MySQL query
# mysql_criteria 'WHERE col=val ...' Additional criteria when constructing MySQL query (without ending semicolon)
# cassandra_params [host, port, user, password] Cassandra connection parameters
# cassandra_model [keyspace, column_family, row_key] Cassandra keyspace/CF to insert into and which MySQL column name to use as the value of the row key
# output_file "/path/to/data.csv" Path in which to save the result set dump to
# validation [0 | 1] Disable or enable validation of data types before attempting to dump and import
# Prompts for specific changes will still be given
#
# debug [0 | 1] Disable or enable debug information in STDOUT
#
# Validation
# This step is recommended to be manually done, if necessary by a database administrator due to limitations in this script:
# An example being that TINYINT(1) will be added as IntegerType rather than BooleanType
# Attempts to modify or add metadata to the column family will take place if:
# a column in the MySQL query is not present in the Cassandra column family
# a column's data type in the MySQL table structure does not match the Cassandra column family
#
# Import process
# The first column in the MySQL query will be used as the row key for each insertion into the Cassandra column family
# If row values of columns in the MySQL resultset are empty, they will be omitted from the insert into the Cassandra column family
#
# Dependencies
# mysqldb
# pycassa
#
# Script configuration options
#
mysql_params = ['', 3306, '', '', '']
mysql_columns = ['id', 'server_ref', 'serverversion', 'systemtime', 'uptime', 'diskfreespace']
mysql_table = 'serverstatuses_history'
mysql_criteria = ''
cassandra_params = ['10.11.11.11', 9160, '', '']
cassandra_model = ['demo', 'serverstatuses_history', 'server_ref']
output_file = "./mysql.csv"
validation = 1
debug = 1
import sys, csv
from distutils.util import strtobool
import MySQLdb, pycassa
from MySQLdb import cursors
# MySQL to Cassandra datatype mappings
datatype_map = {
'bit': 'BooleanType',
'tinyint': 'IntegerType',
'smallint': 'IntegerType',
'int': 'IntegerType',
'mediumint': 'IntegerType',
'bigint': 'LongType',
'decimal': 'DecimalType',
'float': 'FloatType',
'double': 'DoubleType',
'char': 'UTF8Type',
'varchar': 'UTF8Type',
'text': 'UTF8Type',
'blob': 'BytesType',
'timestamp': 'DateType',
'datetime': 'DateType',
'date': 'DateType',
'time': 'UTF8Type',
'year': 'UTF8Type'
}
# Utility functions
def IsNotNull(value):
return value is not None and len(value) > 0
def process_choice():
while True:
try:
choice = strtobool(raw_input())
break
except ValueError:
sys.stdout.write("What was that? [y/n] ")
return choice
def construct_query(colnames, table, criteria):
q = 'SELECT '+(', '.join(colnames))+' FROM '+table
if IsNotNull(criteria):
q += ' '+criteria+';'
else:
q += ';'
return q
# MySQL connection initialization
def init_mysql(params):
try:
mcon = MySQLdb.connect(host=params[0], port=params[1], user=params[2], passwd=params[3], db=params[4], use_unicode=True, cursorclass = MySQLdb.cursors.SSCursor)
mcur = mcon.cursor()
except MySQLdb.Error, e:
print "Error %d: %s" % (e.args[0],e.args[1])
sys.exit(1)
mcur.execute("SET NAMES 'utf8';")
mcur.execute("SET @@NET_WRITE_TIMEOUT = 900;")
return mcur, mcon
# Cassandra connection initialization
def init_cassandra(params, model):
host = params[0]+':'+str(params[1])
if IsNotNull(params[2]) and IsNotNull(params[3]):
credentials = {'username': params[2], 'password': params[3]}
else:
credentials = None
ccon = pycassa.pool.ConnectionPool(keyspace=model[0], server_list=[host], credentials=credentials)
ccur = pycassa.ColumnFamily(ccon,model[1])
csys = pycassa.system_manager.SystemManager(host)
return ccur, csys, ccon
# Cassandra - Column metadata alteration
def cf_alter_metadata(key, datatype):
if cassandra_datatypes.get(key) is False:
sys.stdout.write("[?] Add column '"+key+"' as data type '"+datatype+"'? [y/n] ")
else:
odatatype = cassandra_datatypes.get(key)
sys.stdout.write("[?] Alter column '"+key+"' from data type '"+odatatype+"' to '"+datatype+"'? [y/n] ")
choice = process_choice()
if choice:
csys.alter_column(cassandra_model[0], cassandra_model[1], key, datatype)
# cast a column value as the data type that matches its validator in the Cassandra column family
def cf_cast(cfmetadata, colname, value):
validator = cfmetadata[colname]
if validator == 'IntegerType':
value = int(value)
elif validator == 'FloatType' or validator == 'DoubleType':
value = float(value)
elif validator == 'BytesType':
q = "SELECT character_set_name FROM information_schema.columns WHERE table_schema = '"+mysql_params[4]+"' AND table_name = '"+mysql_table+"' AND column_name = '"+colname+"';"
mcur.execute(q)
charset = mcur.fetchone()
value = bytesarray(value, charset[0])
return value
# Write MySQL result set to CSV line by line
def mysql_dump(colnames, q, f, delim):
print "[!] Executing MySQL query"
mcur.execute(q)
file = open(f, 'w')
writer = csv.writer(file, delimiter = delim, quotechar = '"', quoting = csv.QUOTE_MINIMAL)
print "[!] Writing result set to file '"+output_file+"'"
while True:
# Get one record from result set
row = mcur.fetchone()
# If no records left, break loop
if not row:
break
# Write record to file in comma-delimited format
writer.writerow(row)
file.close()
# Read CSV file line by line and insert into Cassandra column family
def cassandra_import(colnames, f, delim):
rownum = 0
file = open(f, 'r')
reader = csv.reader(file, delimiter = delim, lineterminator = '\n')
print "[!] Reading file and inserting records into Cassandra"
# Read the CSV line by line
for row in reader:
coldat = dict()
# Get the number of columns, excluding the row key
colcount = len(colnames) - 1
# Iterate each column, adding the column name-value pair to a dictionary
for i in range(colcount):
i += 1
# Skip columns that are empty
if IsNotNull(row[i]):
# cast the type of the value for this column according to the column metadata
colvar = cf_cast(cassandra_datatypes, colnames[i], row[i])
coldat[colnames[i]] = colvar
#print row[0], colnames[i], row[i]
# Insert the data into the Cassandra column family
try:
ccur.insert(row[0],coldat)
if debug == 1:
print "[*] Inserted row with key "+row[0]
except:
print "[*] Failed to insert row with key "+row[0]
rownum += 1
file.close()
return rownum
mcur, mcon = init_mysql(mysql_params)
ccur, csys, ccon = init_cassandra(cassandra_params, cassandra_model)
# Data type validation
if validation:
print "[!] Beginning data type validation"
# Get the data types of the columns in the MySQL table
q = "SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = '"+mysql_params[4]+"' AND table_name = '"+mysql_table+"' AND column_name IN ("+(', '.join('"{0}"'.format(col) for col in mysql_columns))+");"
mysql_datatypes = dict()
mcur.execute(q)
for row in mcur:
mysql_datatypes[row[0]] = row[1]
if len(mysql_datatypes) != len(mysql_columns):
print "[?] A number of given columns for MySQL table ("+mysql_table+") does not match information_schema result - you may have typo'd a column in the list"
sys.exit(1)
# Do the same for the Cassandra column family keys
cassandra_datatypes = dict()
cassandra_datatypes[mysql_columns[0]] = ccur.key_validation_class
for key, val in ccur.column_validators.items():
cassandra_datatypes[key] = val
# If number of MySQL columns given do not match what is defined in the Cassandra column family metadata, ask the user if the script should alter the column family
if len(cassandra_datatypes) != len(mysql_columns):
print "[~] A number of MySQL columns specified do not match the number of column's defined in the Cassandra column family ("+cassandra_model[1]+") metadata"
sys.stdout.write("[?] Determine the missing column metadata and modify the column family? (a prompt will be given before each addition) [y/n] ")
choice = process_choice()
if choice:
for mkey, mtype in mysql_datatypes.items():
if cassandra_datatypes.get(mkey) is False:
cf_alter_metadata(mkey, mtype)
# If the data types between MySQL and Cassandra columns do not match, ask the user if the script should modify their definitions
metadata_changes = dict()
for mkey, mtype in mysql_datatypes.items():
# Don't check the key row key data type
if mkey != mysql_columns[0]:
if datatype_map.get(mtype) != cassandra_datatypes.get(mkey):
metadata_changes[mkey] = datatype_map.get(mtype)
if len(metadata_changes) > 0:
print "[~] Some Cassandra column metadata have been determined to be an inappropriate match, given the MySQL columns' datatypes"
sys.stdout.write("[?] Review the Cassandra columns that are proposed to have their data types changed? [y/n] ")
choice = process_choice()
if choice:
for key, type in metadata_changes.items():
cf_alter_metadata(key, type)
mysql_query = construct_query(mysql_columns, mysql_table, mysql_criteria)
print "[~] Below is the MySQL query constructed from the given configuration:"
print mysql_query
sys.stdout.write("[?] Is this correct? [y/n] ")
choice = process_choice()
if choice is False:
sys.exit(1)
print "[!] Working... this may take awhile!\n"
# Stream MySQL result set to file
mysql_dump(mysql_columns, mysql_query, output_file, ',')
print "[!] Completed MySQL dump"
# Import dump into Cassandra row by row
insertions = cassandra_import(mysql_columns, output_file, ',')
print "[!] Completed Cassandra import ("+str(insertions)+" rows processed)"
ccon.dispose()
mcon.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment