Created
June 11, 2019 07:58
-
-
Save garanews/d93a8c91dc06d75db93b952977e59756 to your computer and use it in GitHub Desktop.
takagi_dask2.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import asyncpg | |
import cupy as cp | |
import numpy as np | |
from dask import dataframe as dd | |
from dask.distributed import Client | |
from dask_cuda import LocalCUDACluster | |
import time | |
async def read_async():
    """Fetch the reference sample(s) to compare against from PostgreSQL.

    Connects with asyncpg, runs a query limited to a single row of the
    ``malwares`` table and returns the ``data`` field of every returned row.

    Returns:
        A list with the ``data`` value of each fetched row (at most one
        element, because the query uses ``limit 1``).
    """
    # NOTE(review): the DSN holds placeholder credentials; presumably the real
    # ones should come from configuration rather than from the source file.
    conn = await asyncpg.connect('postgresql://user:password@ip_db/malwares')
    try:
        rows = await conn.fetch('SELECT data FROM malwares limit 1')
    finally:
        # Release the connection even when the query fails; the original
        # version never closed it.
        await conn.close()
    return [x['data'] for x in rows]
def ruzicka(matrix_a, vector_b):
    """Score every row of a partition against ``vector_b`` (Ruzicka ratio).

    For each row the score is ``sum(min(a, b)) / sum(max(a, b))`` computed
    element-wise between the row's weighted bit vector ``a`` and ``vector_b``.

    Args:
        matrix_a: DataFrame partition exposing ``.values``. Column 0 of each
            row is used as the label in the output; column 1 must be a
            bytes-like buffer. The buffer is unpacked into bits and multiplied
            by a 1024-element weight vector, so it has to unpack to 1024 bits.
        vector_b: array that broadcasts against the ``(rows, 1024)`` weighted
            bit matrix built from ``matrix_a``.

    Returns:
        A single string of ``label>score`` items joined by commas, or an
        empty string when the partition has no rows.
    """
    mm = matrix_a.values
    if len(mm) == 0:
        # An empty partition would build a shape-(0,) array that cannot be
        # broadcast against the 1024 weights; the join of no items is "".
        return ""

    info = [x[0] for x in mm]
    # Descending positional weights 1023..0, one per unpacked bit.
    weights = cp.arange(1023, -1, -1, dtype=np.int32)
    bits = [np.unpackbits(np.frombuffer(x[1], dtype=np.uint8)) for x in mm]
    mat_a = cp.array(bits) * weights

    numerator = cp.sum(cp.minimum(mat_a, vector_b), axis=1)
    denominator = cp.sum(cp.maximum(mat_a, vector_b), axis=1)
    # NOTE(review): a row whose maximum-sum is 0 (row and vector_b both all
    # zero) divides by zero here; the resulting score is not a finite number.
    ruz = numerator / denominator

    # NOTE(review): "%2f" means minimum width 2 with the default six decimals;
    # "%.2f" may have been intended. Kept unchanged to preserve the output.
    return ",".join(["%s>%2f" % (x, y) for (x, y) in zip(info, cp.asnumpy(ruz))])
if __name__ == "__main__":
    # Script entry point: load the malware table into a local CUDA cluster,
    # fetch one reference sample, score every stored sample against it with
    # ruzicka() and print the timing of each phase.

    # Imported locally so this block does not depend on the file header.
    from dask.distributed import wait

    d_start = time.time()  # start of the whole run
    f_start = d_start      # start of the current phase

    # Context managers make sure the client and the cluster are shut down,
    # including when one of the phases raises.
    with LocalCUDACluster() as cluster, Client(cluster) as client:
        malpedia = dd.read_sql_table(
            "malwares",
            "postgresql+psycopg2://user:passwordV@ip_db/malwares",
            npartitions=8,
            index_col="id",
            columns=["sha256", "data"],
        )
        malpedia = client.persist(malpedia)
        # persist() returns immediately; block until the partitions are
        # loaded so "DB MAIN" measures the load instead of leaking that cost
        # into the "GPU computation time" phase.
        wait(malpedia)
        end = time.time() - f_start
        f_start = time.time()
        print("DB MAIN", end)

        # NOTE(review): on recent Python versions asyncio.run(read_async())
        # is the preferred spelling; kept as-is for older interpreters.
        vector_new = asyncio.get_event_loop().run_until_complete(read_async())
        # Same encoding ruzicka() applies to each row: unpack the buffer into
        # bits and weight them with 1023..0.
        vector_b_np = cp.array(
            [np.unpackbits(np.frombuffer(x, dtype=np.uint8)) for x in vector_new]
        ) * cp.arange(1023, -1, -1, dtype=np.int32)
        end = time.time() - f_start
        f_start = time.time()
        print("DB SINGOLO", end)

        res = malpedia.map_partitions(
            lambda df: ruzicka(df, vector_b_np),
            meta=('result', str)
        ).compute()
        print(res)
        end = time.time() - f_start
        print("GPU computation time", end)

        end = time.time() - d_start
        print("total time", end)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment