DuckDB dataset hang
docker run --rm -it python:3.10-buster /bin/bash

## Can't test
pyarrow == 5.0.0

## Happy
???

## Unhappy python:3.10-buster
pa.__version__='10.0.0'
duckdb.__version__='0.5.1'
fsspec.__version__='2022.11.0'
adlfs.__version__='2022.10.0'

## Unhappy r-rrunner (vscode) &
pa.__version__='10.0.0'
duckdb.__version__='0.5.1'
fsspec.__version__='2022.3.0'
adlfs.__version__='2021.10.0'

pa.__version__='10.0.0'
duckdb.__version__='0.5.1'
fsspec.__version__='2021.11.0'
adlfs.__version__='2021.10.0'
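
To recreate the first unhappy combination inside the python:3.10-buster container from the first line, a pinned install along these lines should do it (a sketch; pip's resolver behaviour on that image isn't verified here):

docker run --rm -it python:3.10-buster /bin/bash
# inside the container, pin the versions from the unhappy combo above
pip install pyarrow==10.0.0 duckdb==0.5.1 fsspec==2022.11.0 adlfs==2022.10.0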
# Requires an env var `azure_connection_string_secret` that is acceptable to adlfs.
# Tries four cases: local fs with and without partitioning, then adlfs with and
# without partitioning. It only seems to fail for the last case.
import datetime as dt
import os

import adlfs
import duckdb
import fsspec
import pyarrow as pa
import pyarrow.dataset
from fsspec.implementations.local import LocalFileSystem
from pyarrow.dataset import HivePartitioning

fs_local = LocalFileSystem()
azure_fs = adlfs.AzureBlobFileSystem(
    connection_string=os.getenv("azure_connection_string_secret")
)

# Need to write example files
base_dir = "ds-general-cool/russellrepex"
con = duckdb.connect()

print(f"{pa.__version__=}")
print(f"{duckdb.__version__=}")
print(f"{fsspec.__version__=}")
print(f"{adlfs.__version__=}")

# Create fake data
example_dag_time = dt.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
example_dag_time_not_break = dt.datetime.now().replace(hour=1, minute=30, second=0, microsecond=0)
example_dict = {
    "timestamp": [dt.datetime.now(), dt.datetime.now() - dt.timedelta(days=1), dt.datetime.now()],
    "stringfield": ["a", "b", "dsadsa"],
    "floatfield": [1, None, 3.14],
    "DAG_TIME": [example_dag_time, example_dag_time, example_dag_time_not_break],
}
df_a = pa.Table.from_pydict(example_dict)
example_dict_2 = {
    "timestamp": [dt.datetime.now() - dt.timedelta(days=2), dt.datetime.now() - dt.timedelta(days=3), dt.datetime.now()],
    "stringfield": ["a", None, "example_dict_2"],
    "floatfield": [1, None, 3.14],
    "DAG_TIME": [example_dag_time - dt.timedelta(days=1), example_dag_time - dt.timedelta(days=1), example_dag_time_not_break - dt.timedelta(days=1)],
}
df_b = pa.Table.from_pydict(example_dict_2)

# Define the partitioning schema
pa_schema = pa.schema([("DAG_TIME", pa.timestamp("ms"))])
part_var = HivePartitioning(pa_schema)

for fs, name in zip([fs_local, azure_fs], ["local", "azure"]):
    for do_partition in (False, True):
        part_arg = part_var if do_partition else None
        print(f"trying: {name} with partitioning as {do_partition}")
        print("Writing data")
        # existing_data_behavior arg not available in pyarrow==6.0.0 but is in 10.0.0
        pa.dataset.write_dataset(
            df_a,
            base_dir=base_dir,
            partitioning=part_arg,
            # partitioning=['DAG_TIME'],
            # partitioning_flavor="hive",
            format="parquet",
            existing_data_behavior="delete_matching",
            filesystem=fs,
        )
        pa.dataset.write_dataset(
            df_b,
            base_dir=base_dir,
            partitioning=part_arg,
            # partitioning=['DAG_TIME'],
            # partitioning_flavor="hive",
            format="parquet",
            existing_data_behavior="delete_matching",
            filesystem=fs,
        )
        print("Populating dataset")
        example_ds = pa.dataset.dataset(
            base_dir,
            filesystem=fs,
            partitioning=part_arg,
        )
        # The data is there and loads
        print("demonstrating full table load")
        print(example_ds.to_table())
        # duckdb is there and responsive
        print("demonstrating duckdb connection")
        print(con.execute("select 1").arrow())
        # duckdb's replacement scan resolves `example_ds` to the in-scope pyarrow
        # dataset. This command hangs hard (no escape via Ctrl-C) under some
        # package combinations.
        print("Checking for hang")
        print(con.execute("select * from example_ds").arrow())
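
A possible workaround while the hang is unresolved (a sketch, assuming the dataset fits in memory; `example_tbl` is a name introduced here, not from the script above): since `example_ds.to_table()` completes fine in every case, materialize the dataset to a pyarrow Table and let duckdb's replacement scan read the Table instead of the dataset.

# Hypothetical workaround: to_table() is shown above to complete, and duckdb's
# replacement scan also resolves in-scope pyarrow Tables.
example_tbl = example_ds.to_table()  # pulls all data through adlfs up front
print(con.execute("select * from example_tbl").arrow())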