
@bveeramani
Created February 18, 2026 23:02
How to replicate the behavior of `locality_with_outputs`
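```python
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

# Use `ray.nodes()` or `ray.get_runtime_context().get_node_id()` to get the IDs of
# the nodes you want locality with.
consumer_node_ids = ["e1aa424056dc5cdfad1c3d068c42040906a493a046f9ddb42b8a11ca", ...]


class RayRemoteArgsFn:
    """Returns Ray remote args that pin each new task or worker to the next
    node in `node_ids`, cycling through the list in round-robin order."""

    def __init__(self, node_ids):
        if not node_ids:
            raise ValueError("node_ids must contain at least one node ID")
        self._node_ids = node_ids
        self._call_index = 0

    def __call__(self):
        scheduling_strategy = NodeAffinitySchedulingStrategy(
            self._node_ids[self._call_index],
            # Prefer the target node, but let Ray schedule elsewhere instead of
            # failing if that node can't be used.
            soft=True,
            # Also spill to other nodes when the target node has no available
            # resources, rather than waiting for it.
            _spill_on_unavailable=True,
        )
        # Advance to the next node, wrapping around at the end of the list.
        self._call_index += 1
        self._call_index %= len(self._node_ids)
        return {"scheduling_strategy": scheduling_strategy}


ds = (
    ...
    # Pass `RayRemoteArgsFn` wherever you want to replicate the behavior of
    # `DataContext.get_current().execution_options.locality_with_outputs`.
    .map_batches(..., ray_remote_args_fn=RayRemoteArgsFn(consumer_node_ids))
    ...
)
```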
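To see what `RayRemoteArgsFn` returns on successive calls without starting a cluster, here is a minimal pure-Python sketch of the same round-robin logic. It assumes that `ray.nodes()` returns dicts with `"NodeID"` and `"Alive"` keys. The names `alive_node_ids` and `RoundRobinArgs` are hypothetical, and a plain dict stands in for `NodeAffinitySchedulingStrategy` so the sketch runs without Ray installed.

```python
from typing import Any, Dict, List


def alive_node_ids(nodes: List[Dict[str, Any]]) -> List[str]:
    """Hypothetical helper: keep the IDs of live nodes from records shaped like
    `ray.nodes()` output (assumed to carry "NodeID" and "Alive" keys)."""
    return [node["NodeID"] for node in nodes if node.get("Alive", False)]


class RoundRobinArgs:
    """Hypothetical stand-in for RayRemoteArgsFn that returns plain dicts.

    Each call targets the next node ID and wraps around at the end of the
    list, which is the same rotation RayRemoteArgsFn performs.
    """

    def __init__(self, node_ids: List[str]) -> None:
        if not node_ids:
            raise ValueError("node_ids must contain at least one node ID")
        self._node_ids = list(node_ids)
        self._call_index = 0

    def __call__(self) -> Dict[str, Any]:
        # A plain dict replaces NodeAffinitySchedulingStrategy so this runs without Ray.
        strategy = {"node_id": self._node_ids[self._call_index], "soft": True}
        self._call_index = (self._call_index + 1) % len(self._node_ids)
        return {"scheduling_strategy": strategy}


fake_nodes = [
    {"NodeID": "node-a", "Alive": True},
    {"NodeID": "node-b", "Alive": False},  # dead nodes are skipped
    {"NodeID": "node-c", "Alive": True},
]
args_fn = RoundRobinArgs(alive_node_ids(fake_nodes))
targets = [args_fn()["scheduling_strategy"]["node_id"] for _ in range(5)]
print(targets)  # ['node-a', 'node-c', 'node-a', 'node-c', 'node-a']
```

Each instance keeps its own counter. Creating one instance per `map_batches` call therefore gives each operator an independent rotation that starts at the first node.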