Last active
April 8, 2021 22:38
-
-
Save Jeffwan/237652be53a5cfbe8dbccbfe2e3c468e to your computer and use it in GitHub Desktop.
raydp-spark-remote.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import ray | |
import raydp | |
# Names of the environment variables that carry the Ray head service host
# and its client port (as the variable names suggest).
HEAD_SERVICE_IP_ENV = "EXAMPLE_CLUSTER_RAY_HEAD_SERVICE_HOST"
HEAD_SERVICE_CLIENT_PORT_ENV = "EXAMPLE_CLUSTER_RAY_HEAD_SERVICE_PORT_CLIENT"

# Both variables are mandatory: a missing one raises KeyError right here.
head_service_ip = os.environ[HEAD_SERVICE_IP_ENV]
client_port = os.environ[HEAD_SERVICE_CLIENT_PORT_ENV]

# Connect to the "<host>:<port>" address assembled from the two values.
ray.util.connect(":".join((head_service_ip, client_port)))
@ray.remote
class SparkCluster:
    """Ray actor that owns a RayDP Spark session and runs a word-count demo.

    Lifecycle: ``start()`` creates the session, ``run()`` uses it, and
    ``stop()`` shuts it down. ``run()`` is only valid between ``start()``
    and ``stop()``.
    """

    def __init__(self):
        # No session exists until start() is called.
        self.spark = None

    def start(self):
        """Create the Spark session through RayDP and keep it on the actor."""
        self.spark = raydp.init_spark('word_count',
                                      num_executors=2,
                                      executor_cores=1,
                                      executor_memory='1G')

    def run(self):
        """Build a small word DataFrame, print it, then print per-word counts.

        Raises:
            RuntimeError: if no session is active (``start()`` was not
                called, or ``stop()`` has already been called).
        """
        # Fail with a clear message instead of an AttributeError on None.
        if self.spark is None:
            raise RuntimeError(
                "Spark session is not running; call start() before run()")

        df = self.spark.createDataFrame(
            [('look',), ('spark',), ('tutorial',),
             ('spark',), ('look',), ('python',)],
            ['word'])
        df.show()

        word_count = df.groupBy('word').count()
        word_count.show()

    def stop(self):
        """Stop Spark through RayDP and drop the session reference."""
        raydp.stop_spark()
        # Clear the handle so a later run() cannot use a stopped session.
        self.spark = None
# Create the actor, then drive its lifecycle. Each ray.get() blocks until
# the remote call finishes and re-raises any error from the actor.
spark = SparkCluster.remote()
try:
    ray.get(spark.start.remote())
    ray.get(spark.run.remote())
finally:
    # Always request shutdown, even when start() or run() failed, so a
    # partially initialised Spark context is not left behind.
    ray.get(spark.stop.remote())
If I stick to this code, I get the following errors:
(base) ray@example-cluster-ray-head-xnk64:~$ python3 ray-spark-remote.py
(pid=None, ip=192.168.13.23) Setting default log level to "WARN".
(pid=None, ip=192.168.13.23) To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
File "ray-spark-remote.py", line 34, in <module>
ray.get(spark.start.remote())
File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 47, in wrapper
return getattr(ray, func.__name__)(*args, **kwargs)
File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/util/client/api.py", line 35, in get
return self.worker.get(vals, timeout=timeout)
File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/util/client/worker.py", line 198, in get
res = self._get(obj_ref, op_timeout)
File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/util/client/worker.py", line 221, in _get
raise err
types.RayTaskError(Py4JError): ray::SparkCluster.start() (pid=350, ip=192.168.13.23)
File "python/ray/_raylet.pyx", line 505, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 449, in ray._raylet.execute_task.function_executor
File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/_private/function_manager.py", line 566, in actor_method_executor
return method(__ray_actor, *args, **kwargs)
File "ray-spark-remote.py", line 21, in start
File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/context.py", line 122, in init_spark
return _global_spark_context.get_or_create_session()
File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/context.py", line 68, in get_or_create_session
spark_cluster = self._get_or_create_spark_cluster()
File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/context.py", line 62, in _get_or_create_spark_cluster
self._spark_cluster = SparkCluster()
File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster.py", line 32, in __init__
self._set_up_master(None, None)
File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster.py", line 38, in _set_up_master
self._app_master_bridge.start_up()
File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster_master.py", line 57, in start_up
self._create_app_master(extra_classpath)
File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster_master.py", line 166, in _create_app_master
self._app_master_java_bridge.startUpAppMaster(extra_classpath)
File "/home/ray/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1305, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/home/ray/anaconda3/lib/python3.7/site-packages/py4j/protocol.py", line 336, in get_return_value
format(target_id, ".", name))
py4j.protocol.Py4JError: An error occurred while calling o0.startUpAppMaster
In conclusion, I should use ray.util.connect(f"{head_service_ip}:{client_port}")
together with @ray.remote,
and use ray.init(address="auto")
with the original code.
Does XGBoost side need any changes?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
ray.util.connect(f"{head_service_ip}:{client_port}")
ray.init(address="auto")
and I get the following errors. It also shows the errors above as well, so the behavior is not consistent.