Bench different implementations of running chdb directly on a DataFrame
import os
import time
import pandas as pd
import pyarrow as pa
import chdb
import subprocess

# file size 117 MB
data_path = '/home/Clickhouse/bench/hits_0.parquet'
# Approach 1: run the query directly on the parquet file
t = time.time()
res = chdb.query(f"""SELECT RegionID, SUM(AdvEngineID), COUNT(*) AS c, AVG(ResolutionWidth), COUNT(DISTINCT UserID) \
FROM file('{data_path}', Parquet) GROUP BY RegionID ORDER BY c DESC LIMIT 10""", output_format="Dataframe")
# print(res)
print(f"Run query on parquet file in {time.time() - t} seconds")
# read the dataframe from the parquet file
t = time.time()
df = pd.read_parquet(data_path, engine='pyarrow', dtype_backend='pyarrow')
print(f"Read parquet file in {time.time() - t} seconds")

t = time.time()
data = pa.BufferOutputStream()
table = pa.Table.from_pandas(df)
print(f"Convert Dataframe to Arrow stream in {time.time() - t} seconds")

t = time.time()
with pa.RecordBatchStreamWriter(data, table.schema) as writer:
    writer.write_table(table)
data = data.getvalue()
print(f"Write Arrow stream to buffer in {time.time() - t} seconds")
query = '''
CREATE TABLE table ENGINE = File(ArrowStream, 0);
SELECT RegionID, SUM(AdvEngineID), COUNT(*) AS c, AVG(ResolutionWidth), COUNT(DISTINCT UserID)
FROM table GROUP BY RegionID ORDER BY c DESC LIMIT 10;
'''

# Spawn the subprocess and pass custom input from the parent process
def spawn_chdb(query, stdin, output_format):
    proc = subprocess.Popen(['python3', '-m', 'chdb', query, output_format],
                            stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    stdout = proc.communicate(input=stdin)[0].decode('utf-8')
    return stdout
t = time.time()
# Call the function to spawn the subprocess with custom input
res = spawn_chdb(query, data, "Dataframe")
print(f"Run query in {time.time() - t} seconds")
# print(res)
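# Approach 3: write the table as an Arrow file into an anonymous in-memory file
# (memfd) and let chdb query it in-process through /dev/fd/{fd}.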
# create memfd
fd = os.memfd_create("./tmp_mem_fd", flags=os.MFD_CLOEXEC)
# fd = open("./hits_0.arrow", "wb")
# time.sleep(1000)

# create an Arrow file writer on top of the file descriptor
t = time.time()
# with pa.RecordBatchFileWriter(fd, table.schema) as writer:
#     writer.write_table(table)
ffd = os.fdopen(fd, "wb")
with pa.RecordBatchFileWriter(ffd, table.schema) as writer:
    writer.write_table(table)
ffd.flush()
# truncate the last byte
# print(fd.tell())
# fd.truncate(fd.tell() - 1)
# fd.close()
print(f"Write Arrow file to memfd in {time.time() - t} seconds")
os.lseek(fd, 0, os.SEEK_SET)
# get file size from fd
# print(os.fstat(fd))
# print(os.lseek(fd, 0, os.SEEK_END))
# copy the file content to another file
# with os.fdopen(fd, "rb") as f:
#     f.seek(0)
#     with open("./hits_0_memfd.arrow", "wb") as f2:
#         while True:
#             buf = f.read(1024)
#             if buf:
#                 f2.write(buf)
#             else:
#                 break
#     f.seek(0)
t = time.time()
# Query with the Arrow file descriptor
os.lseek(fd, 0, os.SEEK_SET)
# query = f'''CREATE TABLE table ENGINE = File(Arrow, {fd});
# SELECT RegionID, SUM(AdvEngineID), COUNT(*) AS c, AVG(ResolutionWidth), COUNT(DISTINCT UserID)
# FROM table GROUP BY RegionID ORDER BY c DESC LIMIT 10;'''
query = f'''SELECT RegionID, SUM(AdvEngineID), COUNT(*) AS c, AVG(ResolutionWidth), COUNT(DISTINCT UserID)
FROM file('/dev/fd/{fd}', Arrow) GROUP BY RegionID ORDER BY c DESC LIMIT 10'''
# query = f'''SELECT *
# FROM file('hits_0.arrow', Arrow) LIMIT 10'''
ret = chdb.query(query, output_format="Dataframe")
# os.close(fd)
# fd.close()
print(f"Run query in {time.time() - t} seconds")
print(ret)
# Output
# Run query on parquet file in 0.11854410171508789 seconds
# Read parquet file in 0.503279447555542 seconds
# Convert Dataframe to Arrow stream in 0.01825261116027832 seconds
# Write Arrow stream to buffer in 1.214848279953003 seconds
# Run query in 2.638005495071411 seconds
# Write Arrow file to memfd in 0.5541248321533203 seconds
# Run query in 0.4575684070587158 seconds
#    RegionID  sum(AdvEngineID)       c  avg(ResolutionWidth)  uniqExact(UserID)
# 0       229             38044  426435           1612.787187              27961
# 1         2             12801  148193           1593.870891              10413
# 2       208              2673   30614           1490.615111               3073
# 3         1              1802   28577           1623.851699               1720
# 4        34               508   14329           1592.897201               1428
# 5        47              1041   13661           1637.851914                943
# 6       158                78   13294           1576.340605               1110
# 7         7              1166   11679           1627.319034                647
# 8        42               642   11547           1625.601022                956
# 9       184                30   10157           1614.693807                987
Use Arrow, not ArrowStream, and query with FROM file('/dev/fd/{fd}', Arrow):
CREATE TABLE table ENGINE = File(Arrow, {fd}) runs much slower than SELECT ... FROM file('/dev/fd/{fd}', Arrow).
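For reference, a minimal self-contained sketch of that faster path (memfd + Arrow file + file('/dev/fd/{fd}', Arrow)). The tiny inline DataFrame and the memfd name "arrow_buf" are placeholders, not the 117 MB hits_0.parquet data used in the benchmark above:

import os
import pandas as pd
import pyarrow as pa
import chdb

# Placeholder data; the benchmark above reads hits_0.parquet instead.
df = pd.DataFrame({"RegionID": [1, 1, 2], "AdvEngineID": [0, 3, 5]})
table = pa.Table.from_pandas(df)

# Write the table in the Arrow file format (not ArrowStream) into an
# anonymous in-memory file descriptor.
fd = os.memfd_create("arrow_buf", flags=os.MFD_CLOEXEC)
with os.fdopen(fd, "wb", closefd=False) as sink:
    with pa.RecordBatchFileWriter(sink, table.schema) as writer:
        writer.write_table(table)
os.lseek(fd, 0, os.SEEK_SET)

# chdb runs in-process, so it can read the memfd through /dev/fd/{fd}.
print(chdb.query(
    f"SELECT RegionID, SUM(AdvEngineID) FROM file('/dev/fd/{fd}', Arrow) GROUP BY RegionID",
    output_format="Dataframe"))
os.close(fd)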