Designing the related experiments.
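The script below benchmarks three ways of moving the contents of lab.t1 from a "raw" PostgreSQL database into a "features" database: COPY through an on-disk temporary file (copy_from_raw_feature), fetchall() plus an in-memory StringIO buffer and copy_from (copy_from_raw_feature2), and COPY through a RAM-backed temporary file from memory_tempfile (copy_from_raw_feature3). Each function is wrapped in the profile decorator, which logs elapsed time and memory use. A minimal setup sketch for the assumed table and environment follows the script.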
import logging
from psycopg2 import connect
import contextlib
import psycopg2
import datetime
import random
import pickle
import os
import time
from functools import wraps
from memory_profiler import memory_usage
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
import io
import tempfile
def profile(fn):
    """Log the elapsed time and the memory delta of each call to fn."""
    @wraps(fn)
    def inner(*args, **kwargs):
        fn_kwargs_str = ', '.join(f'{k}={v}' for k, v in kwargs.items())
        logging.info(f'\n{fn.__name__}({fn_kwargs_str})')
        # Measure time
        t = time.perf_counter()
        retval = fn(*args, **kwargs)
        elapsed = time.perf_counter() - t
        logging.info(f'Time {elapsed:0.4}')
        # Measure memory (note: memory_usage runs fn a second time)
        mem, retval = memory_usage((fn, args, kwargs), retval=True, timeout=200, interval=1e-7)
        logging.info(f'Memory {max(mem) - min(mem)}')
        return retval
    return inner


logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
                    datefmt='%m-%d %H:%M:%S')
def get_raw_conn():
    engine = create_engine(f"postgresql://{os.environ['OTP_DB_USR']}:{os.environ['OTP_DB_PWD']}@{os.environ['OTP_DB_HOST']}:5432/raw")
    return engine


def get_features_conn():
    engine = create_engine(f"postgresql://{os.environ['OTP_DB_USR']}:{os.environ['OTP_DB_PWD']}@{os.environ['OTP_DB_HOST']}:5432/features")
    return engine
#@profile
def check(db, schema='lab', table='t1'):
    """Log the row count of schema.table in the chosen database ('raw' or 'features')."""
    if db == 'raw':
        engine = get_raw_conn()
    else:
        engine = get_features_conn()
    conn = engine.raw_connection()
    with conn.cursor() as cur:
        cur.execute(f"SET search_path TO {schema}")
        cur.execute(f"select count(*) from {schema}.{table}")
        r = cur.fetchall()
        res = np.array(r)
        logging.info(f"total #: {res}")
    conn.close()
def clean_csv_value(value) -> str:
    # Escape helper for CSV rows (kept from the original; copy_stringio below maps str() directly).
    if value is None:
        return r'\N'
    return str(value).replace('\n', '\\n')


#@profile
def copy_stringio(connection, beers) -> None:
    """Write the rows into an in-memory CSV buffer and load it with COPY FROM."""
    schema = 'lab'
    table = 't1'
    with connection.cursor() as cursor:
        csv_file_like_object = io.StringIO()
        for beer in beers:
            csv_file_like_object.write('|'.join(map(str, beer)) + '\n')
        csv_file_like_object.seek(0)
        cursor.execute(f"SET search_path TO {schema}")
        # search_path is already set, so pass the bare table name
        # (psycopg2 >= 2.9 quotes the table argument, so 'lab.t1' would not resolve).
        cursor.copy_from(csv_file_like_object, table, sep='|')
    connection.commit()
@profile
def insert_raws(num=10000):
    #if num > 100000:
    #    raise ValueError('too much')
    arr = np.random.randint(40, size=(num, 8))  # num rows x 8 integer columns
    df = pd.DataFrame(arr)
    logging.info(arr.shape)
    logging.info(df.shape)
    engine = get_raw_conn()
    conn = engine.raw_connection()
    with conn.cursor() as cur:
        cur.execute("TRUNCATE TABLE lab.t1")
    conn.commit()
    #df.to_sql('lab.t1', con=engine, if_exists='append')
    copy_stringio(conn, arr)
    conn.close()


def main():
    logging.info("HI")
@profile
def copy_from_raw_feature(schema='lab', table='t1') -> None:
    """COPY raw -> on-disk temporary file -> COPY into features."""
    raw_engine = get_raw_conn()
    feature_engine = get_features_conn()
    raw_conn = raw_engine.raw_connection()
    feature_conn = feature_engine.raw_connection()
    try:
        with tempfile.TemporaryFile() as tmp, raw_conn.cursor() as cur1, feature_conn.cursor() as cur2:
            cur2.execute("TRUNCATE TABLE lab.t1")
            feature_conn.commit()
            sql = "SELECT * from lab.t1"
            copy_sql = f"COPY ({sql}) TO STDOUT WITH CSV HEADER"
            cur1.copy_expert(copy_sql, tmp)
            tmp.seek(0)
            # (an earlier commented-out variant fetched the rows into Python and
            #  rebuilt the CSV through a StringIO buffer; that approach lives on
            #  as copy_from_raw_feature2 below)
            cur2.copy_expert(f"COPY {schema}.{table} FROM STDIN DELIMITER ',' CSV HEADER", tmp)
            feature_conn.commit()
    finally:
        feature_conn.close()
        raw_conn.close()
@profile
def copy_from_raw_feature2(schema='lab', table='t1') -> None:
    """fetchall() the raw rows, rebuild a CSV in memory, then copy_from into features."""
    raw_engine = get_raw_conn()
    feature_engine = get_features_conn()
    raw_conn = raw_engine.raw_connection()
    feature_conn = feature_engine.raw_connection()
    try:
        with raw_conn.cursor() as cur1, feature_conn.cursor() as cur2:
            cur2.execute("TRUNCATE TABLE lab.t1")
            feature_conn.commit()
            cur1.execute("SELECT * from lab.t1")
            r = cur1.fetchall()
            arr = np.array(r)
            csv_file_like_object = io.StringIO()
            for beer in arr:
                csv_file_like_object.write('|'.join(map(str, beer)) + '\n')
            csv_file_like_object.seek(0)
            cur2.execute(f"SET search_path TO {schema}")
            # bare table name for the same psycopg2 quoting reason as in copy_stringio
            cur2.copy_from(csv_file_like_object, table, sep='|')
            feature_conn.commit()
    finally:
        feature_conn.close()
        raw_conn.close()
# memory_tempfile provides tempfile-compatible files backed by RAM (e.g. /dev/shm) instead of disk
from memory_tempfile import MemoryTempfile

memfile = MemoryTempfile()


@profile
def copy_from_raw_feature3(schema='lab', table='t1') -> None:
    """COPY raw -> RAM-backed temporary file -> COPY into features."""
    raw_engine = get_raw_conn()
    feature_engine = get_features_conn()
    raw_conn = raw_engine.raw_connection()
    feature_conn = feature_engine.raw_connection()
    try:
        with memfile.TemporaryFile() as tmp, raw_conn.cursor() as cur1, feature_conn.cursor() as cur2:
            cur2.execute("TRUNCATE TABLE lab.t1")
            feature_conn.commit()
            sql = "SELECT * from lab.t1"
            copy_sql = f"COPY ({sql}) TO STDOUT WITH CSV HEADER"
            cur1.copy_expert(copy_sql, tmp)
            tmp.seek(0)
            cur2.copy_expert(f"COPY {schema}.{table} FROM STDIN DELIMITER ',' CSV HEADER", tmp)
            feature_conn.commit()
    finally:
        feature_conn.close()
        raw_conn.close()
if __name__ == "__main__":
    main()
    insert_raws(500000)
    check('raw')
    copy_from_raw_feature()
    check('features')
    copy_from_raw_feature2()
    check('features')
    copy_from_raw_feature3()
    check('features')
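The script itself never creates the table it reads and writes. It assumes the OTP_DB_USR, OTP_DB_PWD and OTP_DB_HOST environment variables are set, and that both the raw and the features databases already contain a lab.t1 table matching the eight integer columns generated by insert_raws. A minimal setup sketch under those assumptions (the column names c0..c7 are placeholders; the benchmark never refers to columns by name):

# setup_lab_t1.py -- minimal sketch, assuming eight generic integer columns
import os
from sqlalchemy import create_engine

DDL = """
CREATE SCHEMA IF NOT EXISTS lab;
CREATE TABLE IF NOT EXISTS lab.t1 (
    c0 integer, c1 integer, c2 integer, c3 integer,
    c4 integer, c5 integer, c6 integer, c7 integer
);
"""

def setup(dbname):
    engine = create_engine(
        f"postgresql://{os.environ['OTP_DB_USR']}:{os.environ['OTP_DB_PWD']}"
        f"@{os.environ['OTP_DB_HOST']}:5432/{dbname}"
    )
    conn = engine.raw_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(DDL)  # psycopg2 accepts multiple ;-separated statements in one execute
        conn.commit()
    finally:
        conn.close()

if __name__ == "__main__":
    for dbname in ("raw", "features"):  # same definition in both databases
        setup(dbname)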