import dask
import dask.dataframe as dd
import pandas as pd
import pandas as pd
import numpy as np
from pandas.tseries.holiday import USFederalHolidayCalendar
import os
import time
import pyarrow.dataset as ds
from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait
from dask.utils import parse_bytes
import dask_cudf
# Start a local Dask CUDA cluster and connect a distributed client to it.
cluster = LocalCUDACluster()
client = Client(cluster)
# Ask the cluster for two workers; the rest of the notebook assumes exactly two.
cluster.scale(2)
# Bare expression: in a notebook this renders the cluster summary.
cluster
# Single parquet part of the public NYC taxi dataset used for the experiment.
taxi_parquet_path = "gs://anaconda-public-data/nyc-taxi/nyc.parquet/part.12.parquet"
# Target one partition per worker: has_what() is keyed by worker address, so
# its length is the number of workers the scheduler currently knows about.
npartitions = len(client.has_what())
print(npartitions)
2
# Load the remote parquet part as a dask_cudf DataFrame.
# NOTE(review): the wait() output below lists a single read-parquet future, so
# `npartitions` does not appear to split the read — confirm whether
# read_parquet honours this keyword at all.
taxi_df = dask_cudf.read_parquet(taxi_parquet_path, npartitions=npartitions)
# Persist the DataFrame on the cluster. The annotation names every worker the
# scheduler currently knows about as an allowed location for these tasks.
with dask.annotate(workers=set(client.has_what())):
    taxi_df = client.persist(taxi_df)
# Block until all partitions have finished computing.
wait(taxi_df)
DoneAndNotDoneFutures(done={<Future: finished, type: cudf.DataFrame, key: ('read-parquet-349d4269f1fa93859fff5e4a928c999b', 0)>}, not_done=set())
# Show which worker address holds each persisted partition of taxi_df.
client.who_has(taxi_df)
{"('read-parquet-349d4269f1fa93859fff5e4a928c999b', 0)": ('tcp://127.0.0.1:39893',)}
Check whether repartitioning into two partitions, writing to disk, and then reading back from disk still yields two partitions.
# Split into two partitions. The count is passed by keyword so it cannot be
# mistaken for the first positional parameter (`divisions`).
taxi_df = taxi_df.repartition(npartitions=2)
# Round-trip through parquet on local disk, relative to the working directory.
taxi_df.to_parquet("test.parquet")
new_df = dask_cudf.read_parquet("test.parquet")
# Bare expression: in a notebook this renders the DataFrame structure,
# including its npartitions.
new_df
Dask DataFrame Structure:
<style scoped>
.dataframe tbody tr th:only-of-type {
vertical-align: middle;
}
.dataframe tbody tr th {
vertical-align: top;
}
.dataframe thead th {
text-align: right;
}
</style>
tpep_pickup_datetime | VendorID | tpep_dropoff_datetime | passenger_count | trip_distance | pickup_longitude | pickup_latitude | RateCodeID | store_and_fwd_flag | dropoff_longitude | dropoff_latitude | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
npartitions=2 | |||||||||||||||||||
0 | datetime64[ns] | int64 | datetime64[ns] | int64 | float64 | float64 | float64 | int64 | object | float64 | float64 | int64 | float64 | float64 | float64 | float64 | float64 | float64 | float64 |
403876 | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
807751 | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
Dask Name: read-parquet, 2 tasks
# Persist the re-read DataFrame. The annotation names every worker the
# scheduler currently knows about as an allowed location for these tasks.
with dask.annotate(workers=set(client.has_what())):
    new_df = client.persist(new_df)
# Block until all partitions have finished computing.
wait(new_df)
DoneAndNotDoneFutures(done={<Future: finished, type: cudf.DataFrame, key: ('read-parquet-84d2d65d597c1a9f79b61d1d66c44af5', 0)>, <Future: finished, type: cudf.DataFrame, key: ('read-parquet-84d2d65d597c1a9f79b61d1d66c44af5', 1)>}, not_done=set())
# Show which worker address holds each persisted partition of new_df.
client.who_has(new_df)
{"('read-parquet-84d2d65d597c1a9f79b61d1d66c44af5', 0)": ('tcp://127.0.0.1:35709',),
"('read-parquet-84d2d65d597c1a9f79b61d1d66c44af5', 1)": ('tcp://127.0.0.1:39893',)}
Check whether a DataFrame with a single partition, written to disk and read back without repartitioning, still has a single partition, and whether Dask then persists it to a single worker.
# Collapse to a single partition. The count is passed by keyword so it cannot
# be mistaken for the first positional parameter (`divisions`).
one_partition = taxi_df.repartition(npartitions=1)
# Bare expression: in a notebook this renders the DataFrame structure.
one_partition
Dask DataFrame Structure:
<style scoped>
.dataframe tbody tr th:only-of-type {
vertical-align: middle;
}
.dataframe tbody tr th {
vertical-align: top;
}
.dataframe thead th {
text-align: right;
}
</style>
tpep_pickup_datetime | VendorID | tpep_dropoff_datetime | passenger_count | trip_distance | pickup_longitude | pickup_latitude | RateCodeID | store_and_fwd_flag | dropoff_longitude | dropoff_latitude | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
npartitions=1 | |||||||||||||||||||
datetime64[ns] | int64 | datetime64[ns] | int64 | float64 | float64 | float64 | int64 | object | float64 | float64 | int64 | float64 | float64 | float64 | float64 | float64 | float64 | float64 | |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
Dask Name: repartition, 5 tasks
# Write the one-partition DataFrame to local disk, then read it back.
one_partition.to_parquet("test_single.parquet") # has a single partition
# NOTE(review): npartitions=2 is requested here, but the structure shown below
# still reports npartitions=1 — the keyword does not appear to take effect.
new_single_partition_df = dask_cudf.read_parquet("test_single.parquet", npartitions=2)
# Bare expression: in a notebook this renders the DataFrame structure.
new_single_partition_df
Dask DataFrame Structure:
<style scoped>
.dataframe tbody tr th:only-of-type {
vertical-align: middle;
}
.dataframe tbody tr th {
vertical-align: top;
}
.dataframe thead th {
text-align: right;
}
</style>
tpep_pickup_datetime | VendorID | tpep_dropoff_datetime | passenger_count | trip_distance | pickup_longitude | pickup_latitude | RateCodeID | store_and_fwd_flag | dropoff_longitude | dropoff_latitude | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
npartitions=1 | |||||||||||||||||||
0 | datetime64[ns] | int64 | datetime64[ns] | int64 | float64 | float64 | float64 | int64 | object | float64 | float64 | int64 | float64 | float64 | float64 | float64 | float64 | float64 | float64 |
807751 | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
Dask Name: read-parquet, 1 tasks
# Persist the single-partition DataFrame. The annotation names every worker
# the scheduler currently knows about as an allowed location for these tasks.
with dask.annotate(workers=set(client.has_what())):
    new_single_partition_df = client.persist(new_single_partition_df)
# Block until the partition has finished computing.
wait(new_single_partition_df)
DoneAndNotDoneFutures(done={<Future: finished, type: cudf.DataFrame, key: ('read-parquet-5832fa7d00a750180cacf3b4810f6d85', 0)>}, not_done=set())
# Show which worker address holds the single persisted partition.
client.who_has(new_single_partition_df)
{"('read-parquet-5832fa7d00a750180cacf3b4810f6d85', 0)": ('tcp://127.0.0.1:35709',)}
# List the worker addresses known to the scheduler, for comparison with the
# who_has() output above.
client.has_what().keys()
dict_keys(['tcp://127.0.0.1:35709', 'tcp://127.0.0.1:39893'])