Last active
April 8, 2021 22:38
-
-
Save Jeffwan/237652be53a5cfbe8dbccbfe2e3c468e to your computer and use it in GitHub Desktop.
raydp-spark-remote.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import ray | |
| import raydp | |
# Names of the environment variables holding the Ray head service host and
# client port (presumably injected by Kubernetes for the example cluster's
# head service -- confirm against the deployment).
HEAD_SERVICE_IP_ENV = "EXAMPLE_CLUSTER_RAY_HEAD_SERVICE_HOST"
HEAD_SERVICE_CLIENT_PORT_ENV = "EXAMPLE_CLUSTER_RAY_HEAD_SERVICE_PORT_CLIENT"

# Both variables are required: a missing one raises KeyError right here.
head_service_ip, client_port = (
    os.environ[HEAD_SERVICE_IP_ENV],
    os.environ[HEAD_SERVICE_CLIENT_PORT_ENV],
)

# Connect to the remote cluster at "<host>:<port>".
ray.util.connect(":".join((head_service_ip, client_port)))
@ray.remote
class SparkCluster:
    """Ray actor that owns a RayDP Spark session and runs a word-count demo.

    Lifecycle: call ``start()`` once, then ``run()`` any number of times,
    then ``stop()``. The session lives inside the actor process, so every
    method must be invoked through ``.remote()`` on the actor handle.
    """

    def __init__(self):
        # No Spark session exists until start() is called.
        self.spark = None

    def start(self):
        """Create the Spark session through RayDP.

        The application is named ``word_count`` and requests 2 executors
        with 1 core and 1G of memory each.
        """
        self.spark = raydp.init_spark(
            'word_count',
            num_executors=2,
            executor_cores=1,
            executor_memory='1G',
        )

    def run(self):
        """Build a small single-column DataFrame and show per-word counts.

        Raises:
            RuntimeError: if the session has not been started (or has
                already been stopped).
        """
        # Fail with a clear message instead of an AttributeError on None.
        if self.spark is None:
            raise RuntimeError(
                "Spark session is not running; call start() before run()"
            )
        df = self.spark.createDataFrame(
            [('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )],
            ['word'],
        )
        df.show()
        word_count = df.groupBy('word').count()
        word_count.show()

    def stop(self):
        """Stop the RayDP Spark session and drop the reference to it."""
        raydp.stop_spark()
        # Drop the stale handle so a later run() hits the guard above rather
        # than using a stopped session.
        self.spark = None
# Create the actor on the cluster and drive the start -> run -> stop lifecycle.
# Each ray.get() blocks until the corresponding remote call has finished.
spark = SparkCluster.remote()
ray.get(spark.start.remote())
try:
    ray.get(spark.run.remote())
finally:
    # Always stop Spark, even when the job fails, so its executors do not
    # stay allocated on the cluster.
    ray.get(spark.stop.remote())
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
In conclusion, I should
`ray.util.connect(f"{head_service_ip}:{client_port}")` with `@ray.remote` and use `ray.init(address="auto")` with the original code. Does the XGBoost side need any changes?