Skip to content

Instantly share code, notes, and snippets.

@lucperkins
Last active November 7, 2016 11:15
Show Gist options
  • Save lucperkins/f721fe5a9e40889eab8e to your computer and use it in GitHub Desktop.
Save lucperkins/f721fe5a9e40889eab8e to your computer and use it in GitHub Desktop.
Migrate SQL table to Riak
import psycopg2
from riak import RiakClient, RiakObject
from riak.datatypes import Set
import datetime
# Riak connection and set
client = RiakClient(pb_port=8087)
SETS_BUCKET = client.bucket_type('sets').bucket('key_sets')
# Get columns for table
def get_table_columns(cursor, table):
cursor.execute("SELECT column_name FROM information_schema.columns WHERE table_name='{}'".format(table))
columns = cursor.fetchall()
for col in columns:
yield col[0]
# Convert each row to a dict, excluding the primary key (which will be the Riak object's key)
def convert_row_to_dict(row, columns_list):
obj = {}
for n in range(1, len(columns_list)):
if type(row[n]) is datetime.date:
obj[columns_list[n]] = row[n].strftime('%m-%d-%Y')
else:
obj[columns_list[n]] = row[n]
return obj
# Store Postgres table in Riak by row
def store_table_in_riak(database, table):
cursor = psycopg2.connect("dbname={}".format(database)).cursor()
bucket = client.bucket(table)
key_set = Set(SETS_BUCKET, table)
cursor.execute("SELECT * FROM {}".format(table))
rows = cursor.fetchall()
columns_list = list(get_table_columns(cursor, table))
for row in rows:
key = str(row[0])
user_dict = convert_row_to_dict(row, columns_list)
obj = RiakObject(client, bucket, key)
obj.data = user_dict
obj.content_type = 'application/json'
obj.store()
key_set.add(key)
key_set.store()
cursor.close()
def store_and_drop_table(database, table):
cursor = psycopg2.connect("dbname={}".format(database)).cursor()
bucket = client.bucket(table)
key_set = Set(SETS_BUCKET, table)
cursor.execute("SELECT * FROM {}".format(table))
rows = cursor.fetchall()
columns_list = list(get_table_columns(table))
for row in rows:
key = str(row[0])
user_dict = convert_row_to_dict(row, columns_list)
obj = RiakObject(client, bucket, key)
obj.data = user_dict
obj.content_type = 'application/json'
obj.store()
key_set.add(key)
key_set.store()
cursor.execute("DROP TABLE {};".format(table))
connection.commit()
def select_star_from_table(table):
bucket = client.bucket(table)
key_set = Set(SETS_BUCKET, table)
for key in key_set.reload().value:
yield bucket.get(key).data
def select_by_id(table, key):
bucket = client.bucket(table)
key_set = Set(SETS_BUCKET, table)
if not key in key_set.reload().value:
raise Exception('No object for this primary key')
else:
return bucket.get(key).data
def get_row_column_value(table, key, field):
bucket = client.bucket(table)
key_set = Set(SETS_BUCKET, table)
if not key in key_set.reload().value:
raise Exception
else:
obj = bucket.get(key).data
if field in obj:
return obj[field]
else:
raise Exception('Field does not exist for this object')
def migrate_multiple_tables(database, tables):
cursor = psycopg2.connect("dbname={}".format(database)).cursor()
for table in tables:
store_and_drop_table(table)
cursor.close()
# Run the main function and close our Postgres cursor
# store_table_in_riak('users')
store_table_in_riak('luc', 'posts')
CREATE TABLE posts (
id SERIAL PRIMARY KEY,
author VARCHAR(30) NOT NULL,
title VARCHAR(50) NOT NULL,
body TEXT NOT NULL,
created DATE NOT NULL
);
INSERT INTO posts (author, title, body, created) VALUES ('Luc Perkins', 'about stuff', 'stuff and stuff', now());
etc.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment