@mrocklin
Last active December 22, 2023 08:12
from threading import Thread
from time import sleep
import uuid

from dask.distributed import LocalCluster, Client
import dask.dataframe as dd
import pandas as pd
import pyspark

# NOTE: this relies on the experimental 'channels' API of older dask.distributed releases.


def start_worker(address, channel_name, df):
    """Run inside a Spark task: start a Dask worker and publish ``df`` on a channel."""
    import distributed
    from dask.distributed import Worker, Client
    from tornado.ioloop import IOLoop
    from tornado import gen

    loop = IOLoop.current()
    w = Worker(address, loop=loop)
    w.start(0)
    print("Started worker")

    async def add_dataframe():
        async with Client(address, start=False) as c:
            [future] = await c._scatter([df])  # register local dataframe as remote data
            chan = c.channel(channel_name)
            chan.append(future)  # inform other clients that it exists

    w.loop.add_callback(add_dataframe)

    async def block_until_closed():
        # keep the Spark task alive until the Dask worker shuts down
        while w.status != 'closed':
            await gen.sleep(0.1)

    loop.run_sync(block_until_closed)
    distributed.global_worker = False
    return ['completed']


def spark_to_dask_dataframe(df, loop=None):
    """ Convert a Spark cluster/DataFrame to a Dask cluster/dataframe

    Parameters
    ----------
    df: pyspark DataFrame

    Examples
    --------
    >>> import pyspark
    >>> sc = pyspark.SparkContext('local[2]')  # doctest: +SKIP
    >>> spark = pyspark.sql.SparkSession(sc)
    >>> import pandas as pd
    >>> df = pd.DataFrame({'x': [1, 2, 3], 'y': [10, 20, 30.]}, index=[1, 1, 1])
    >>> sdf = spark.createDataFrame(df)
    >>> ddf = spark_to_dask_dataframe(sdf)  # doctest: +SKIP

    See Also
    --------
    spark_to_dask
    dask_to_spark
    """
    cluster = LocalCluster(n_workers=0, loop=loop)
    client = Client(cluster, loop=cluster.loop)
    channel_name = 'spark-partitions-' + uuid.uuid4().hex

    # Start a long-running Spark job: each partition is handed to start_worker
    # as a pandas DataFrame, which turns it into a Dask worker holding that data
    address = cluster.scheduler.address
    func = lambda df: start_worker(address, channel_name, df)
    start_workers = lambda: df.mapPartitionsAsPandas(func).count()

    thread = Thread(target=start_workers)
    thread.daemon = True
    thread.start()

    # Collect one future per Spark partition from the channel
    channel = client.channel(channel_name)
    seq = iter(channel)
    futures = []
    for i in range(df.rdd.getNumPartitions()):
        futures.append(next(seq))

    # Use the head of the first partition to infer metadata for the Dask dataframe
    head = client.submit(pd.DataFrame.head, futures[0]).result()
    ddf = dd.from_delayed(futures, meta=head)

    return client, ddf


if __name__ == '__main__':
    sc = pyspark.SparkContext('local[2]')
    spark = pyspark.sql.SparkSession(sc)

    df = pd.DataFrame({'x': range(10), 'y': [10] * 10})
    sdf = spark.createDataFrame(df)
    print(sdf)
    print(sdf.show())

    client, ddf = spark_to_dask_dataframe(sdf)
    print(ddf)
    print(ddf.head())
@mrocklin
Author

Nope. I'm not maintaining this.

@wpopielarski

Hello Matthew, I'm quite interested in this piece of code and would like to ask why you abandoned it. Was it a lack of time, or did you run into problems with how PySpark DataFrames can be translated into Dask dataframes (perhaps this approach is inefficient compared to reading/writing files in HDFS)? I'd appreciate any word from you.

@hayou8

hayou8 commented May 28, 2021

How can we convert a Dask dataframe to a Spark dataframe?
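
One straightforward route (a sketch, not from this gist, and only workable when the collected data fits in driver memory) is to compute the Dask dataframe to pandas and hand the result to Spark:

import dask.dataframe as dd
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[2]').getOrCreate()

# small example Dask dataframe
ddf = dd.from_pandas(pd.DataFrame({'x': range(10), 'y': [10] * 10}), npartitions=2)

# collect to a single pandas DataFrame on the driver, then let Spark ingest it
pdf = ddf.compute()
sdf = spark.createDataFrame(pdf)
sdf.show()

For larger data, writing Parquet from Dask and reading it back with Spark avoids collecting everything on the driver.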

@holdenk

holdenk commented Feb 20, 2022

I think the easiest way to do this with existing, maintained code might be to use Dask on Ray and Spark on Ray, and then use Ray's Dataset library to move data between them.

https://docs.ray.io/en/latest/data/dataset.html
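
A rough sketch of that route, assuming Ray with Dask-on-Ray and RayDP (Spark on Ray) installed; the exact Ray Data API names have shifted between Ray releases, so treat this as illustrative rather than definitive:

import ray
import raydp
import dask.dataframe as dd
import pandas as pd
from ray.util.dask import ray_dask_get

ray.init()

# Spark session running on the Ray cluster via RayDP
spark = raydp.init_spark(app_name='dask_spark_bridge',
                         num_executors=1,
                         executor_cores=2,
                         executor_memory='1GB')

# Dask -> Ray Dataset -> Spark
ddf = dd.from_pandas(pd.DataFrame({'x': range(10), 'y': [10] * 10}), npartitions=2)
ds = ray.data.from_dask(ddf)
sdf = ds.to_spark(spark)  # requires the RayDP-created Spark session
sdf.show()

# Spark -> Ray Dataset -> Dask
ddf2 = ray.data.from_spark(sdf).to_dask()
print(ddf2.compute(scheduler=ray_dask_get))  # compute with the Dask-on-Ray scheduler

raydp.stop_spark()
ray.shutdown()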
