Last active
November 7, 2016 11:15
-
-
Save lucperkins/f721fe5a9e40889eab8e to your computer and use it in GitHub Desktop.
Migrate SQL table to Riak
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import psycopg2 | |
from riak import RiakClient, RiakObject | |
from riak.datatypes import Set | |
import datetime | |
# Riak connection and set | |
client = RiakClient(pb_port=8087) | |
SETS_BUCKET = client.bucket_type('sets').bucket('key_sets') | |
# Get columns for table | |
def get_table_columns(cursor, table): | |
cursor.execute("SELECT column_name FROM information_schema.columns WHERE table_name='{}'".format(table)) | |
columns = cursor.fetchall() | |
for col in columns: | |
yield col[0] | |
# Convert each row to a dict, excluding the primary key (which will be the Riak object's key) | |
def convert_row_to_dict(row, columns_list): | |
obj = {} | |
for n in range(1, len(columns_list)): | |
if type(row[n]) is datetime.date: | |
obj[columns_list[n]] = row[n].strftime('%m-%d-%Y') | |
else: | |
obj[columns_list[n]] = row[n] | |
return obj | |
# Store Postgres table in Riak by row | |
def store_table_in_riak(database, table): | |
cursor = psycopg2.connect("dbname={}".format(database)).cursor() | |
bucket = client.bucket(table) | |
key_set = Set(SETS_BUCKET, table) | |
cursor.execute("SELECT * FROM {}".format(table)) | |
rows = cursor.fetchall() | |
columns_list = list(get_table_columns(cursor, table)) | |
for row in rows: | |
key = str(row[0]) | |
user_dict = convert_row_to_dict(row, columns_list) | |
obj = RiakObject(client, bucket, key) | |
obj.data = user_dict | |
obj.content_type = 'application/json' | |
obj.store() | |
key_set.add(key) | |
key_set.store() | |
cursor.close() | |
def store_and_drop_table(database, table): | |
cursor = psycopg2.connect("dbname={}".format(database)).cursor() | |
bucket = client.bucket(table) | |
key_set = Set(SETS_BUCKET, table) | |
cursor.execute("SELECT * FROM {}".format(table)) | |
rows = cursor.fetchall() | |
columns_list = list(get_table_columns(table)) | |
for row in rows: | |
key = str(row[0]) | |
user_dict = convert_row_to_dict(row, columns_list) | |
obj = RiakObject(client, bucket, key) | |
obj.data = user_dict | |
obj.content_type = 'application/json' | |
obj.store() | |
key_set.add(key) | |
key_set.store() | |
cursor.execute("DROP TABLE {};".format(table)) | |
connection.commit() | |
def select_star_from_table(table): | |
bucket = client.bucket(table) | |
key_set = Set(SETS_BUCKET, table) | |
for key in key_set.reload().value: | |
yield bucket.get(key).data | |
def select_by_id(table, key): | |
bucket = client.bucket(table) | |
key_set = Set(SETS_BUCKET, table) | |
if not key in key_set.reload().value: | |
raise Exception('No object for this primary key') | |
else: | |
return bucket.get(key).data | |
def get_row_column_value(table, key, field): | |
bucket = client.bucket(table) | |
key_set = Set(SETS_BUCKET, table) | |
if not key in key_set.reload().value: | |
raise Exception | |
else: | |
obj = bucket.get(key).data | |
if field in obj: | |
return obj[field] | |
else: | |
raise Exception('Field does not exist for this object') | |
def migrate_multiple_tables(database, tables): | |
cursor = psycopg2.connect("dbname={}".format(database)).cursor() | |
for table in tables: | |
store_and_drop_table(table) | |
cursor.close() | |
# Run the main function and close our Postgres cursor | |
# store_table_in_riak('users') | |
store_table_in_riak('luc', 'posts') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
CREATE TABLE posts ( | |
id SERIAL PRIMARY KEY, | |
author VARCHAR(30) NOT NULL, | |
title VARCHAR(50) NOT NULL, | |
body TEXT NOT NULL, | |
created DATE NOT NULL | |
); | |
INSERT INTO posts (author, title, body, created) VALUES ('Luc Perkins', 'about stuff', 'stuff and stuff', now()); | |
etc. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment