Last active
April 8, 2021 22:38
-
-
Save Jeffwan/237652be53a5cfbe8dbccbfe2e3c468e to your computer and use it in GitHub Desktop.
raydp-spark-remote.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import ray | |
import raydp | |
# Names of the environment variables that hold the Ray head service's host
# and Ray Client port.
HEAD_SERVICE_IP_ENV = "EXAMPLE_CLUSTER_RAY_HEAD_SERVICE_HOST"
HEAD_SERVICE_CLIENT_PORT_ENV = "EXAMPLE_CLUSTER_RAY_HEAD_SERVICE_PORT_CLIENT"

# Both lookups raise KeyError when the corresponding variable is not set.
head_service_ip = os.environ[HEAD_SERVICE_IP_ENV]
client_port = os.environ[HEAD_SERVICE_CLIENT_PORT_ENV]

# Connect this process to the remote Ray cluster at "<host>:<port>".
ray_client_address = f"{head_service_ip}:{client_port}"
ray.util.connect(ray_client_address)
@ray.remote
class SparkCluster:
    """Ray actor that owns a RayDP Spark session and runs a word-count demo.

    Call ``start()`` first, then ``run()`` any number of times, then
    ``stop()`` to release the Spark resources.
    """

    def __init__(self):
        # Set by start(); None means no Spark session is currently active.
        self.spark = None

    def start(self):
        """Create the 'word_count' Spark session through RayDP."""
        self.spark = raydp.init_spark('word_count',
                                      num_executors=2,
                                      executor_cores=1,
                                      executor_memory='1G')

    def run(self):
        """Build a small single-column DataFrame and show per-word counts.

        Raises:
            RuntimeError: if no Spark session is active, i.e. ``start()``
                has not been called or ``stop()`` has already been called.
        """
        # Fail with a clear message instead of an AttributeError on None.
        if self.spark is None:
            raise RuntimeError(
                "Spark session is not initialized; call start() before run()"
            )

        df = self.spark.createDataFrame(
            [('look',), ('spark',), ('tutorial',), ('spark',), ('look',), ('python',)],
            ['word'],
        )
        df.show()

        word_count = df.groupBy('word').count()
        word_count.show()

    def stop(self):
        """Stop the RayDP Spark session and drop the stored handle."""
        raydp.stop_spark()
        # Clear the handle so a later run() cannot use a stopped session.
        self.spark = None
# Create the actor on the cluster and drive the demo from the client side.
# Each ray.get() blocks until the corresponding remote call has finished.
spark = SparkCluster.remote()
ray.get(spark.start.remote())
try:
    ray.get(spark.run.remote())
finally:
    # Always stop Spark once it has been started, even if run() fails,
    # so the executors are not left running on the cluster.
    ray.get(spark.stop.remote())
In conclusion, I should use ray.util.connect(f"{head_service_ip}:{client_port}")
together with @ray.remote,
and use ray.init(address="auto")
with the original code.
Does the XGBoost side need any changes?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
If I stick to this code, I get errors.