@crypdick
Last active July 17, 2025 20:23
Repro for a Ray Data LLM failure with a Parquet sink
import ray
from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor

config = vLLMEngineProcessorConfig(model_source="unsloth/Llama-3.2-1B-Instruct")
processor = build_llm_processor(
    config,
    preprocess=lambda row: {
        "messages": [
            {"role": "system", "content": "You are a bot that responds with haikus."},
            {"role": "user", "content": row["item"]},
        ],
        "sampling_params": {"temperature": 0.3, "max_tokens": 250},
    },
    postprocess=lambda row: {"answer": row["generated_text"]},
)

ds = ray.data.from_items(["Start of the haiku is: Complete this for me..."])
ds = processor(ds)
# ds.show(1) <-- works
# ds.materialize() <-- works
ds.write_parquet("local:///tmp/data/")  # See error below
(autoscaler +2m58s) [autoscaler] [1xL40S:4CPU-32GB] Attempting to add 1 node to the cluster (increasing from 1 to 2).
(autoscaler +3m3s) [autoscaler] [1xL40S:4CPU-32GB] Launched 1 instance.
Running Dataset: dataset_93_0. Active & requested resources: 4/4 CPU, 1/1 GPU, 256.0MB/6.7GB object store: : 0.00 row [04:02, ? row/s]
2025-07-16 15:44:54,199 INFO actor_pool_map_operator.py:661 -- Scaled down actor pool by 1 (reason=None; running=0, restarting=0, pending=0)
2025-07-16 15:44:54,200 INFO actor_pool_map_operator.py:661 -- Scaled down actor pool by 1 (reason=None; running=0, restarting=0, pending=0)
2025-07-16 15:44:54,269 ERROR streaming_executor_state.py:504 -- An exception was raised from a task of operator "Write". Dataset execution will now abort. To ignore this exception and continue, set DataContext.max_errored_blocks.
Traceback (most recent call last):
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 472, in process_completed_tasks
bytes_read = task.on_data_ready(
^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 138, in on_data_ready
raise ex from None
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 134, in on_data_ready
ray.get(block_ref)
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/_private/auto_init_hook.py", line 22, in auto_init_wrapper
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/_private/client_mode_hook.py", line 104, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/_private/worker.py", line 2849, in get
values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/_private/worker.py", line 939, in get_objects
raise value
ray.exceptions.TaskUnschedulableError: The task is not schedulable: The node specified via NodeAffinitySchedulingStrategy doesn't exist any more or is infeasible, and soft=False was specified. task_id=06212aaf682b1a145290da49b6c1c33fb540bffe0d000000, task_name=Write
2025-07-16 15:44:54,274 INFO streaming_executor.py:227 -- ⚠️ Dataset dataset_93_0 execution failed
⚠️ Dataset dataset_93_0 execution failed: : 0.00 row [04:03, ? row/s]
- Map(_preprocess): Tasks: 0; Actors: 0; Queued blocks: 0; Resources: 0.0 CPU, 182.0B object store: : 1.00 row [01:13, 73.3s/ row]
- MapBatches(ChatTemplateUDF): Tasks: 0; Actors: 0; Queued blocks: 0; Resources: 0.0 CPU, 505.0B object store; [0/1 objects local]: : 1.00 row [01:13, 73.3s/ row]
- MapBatches(TokenizeUDF): Tasks: 0; Actors: 0; Queued blocks: 0; Resources: 0.0 CPU, 981.0B object store; [0/1 objects local]: : 1.00 row [01:13, 73.3s/ row]
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=2533, ip=10.0.89.151) [vLLM] Elapsed time for batch f0bc6a706cdb4b42a6ea5b761ead54ca with size 1: 0.12248441300002355
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=2533, ip=10.0.89.151) Shutting down vLLM engine
- MapBatches(vLLMEngineStageUDF): Tasks: 0; Actors: 1; Queued blocks: 0; Resources: 0.0 CPU, 1.0 GPU, 1.7KB object store; [0/1 objects local]: : 1.00 row [01:15, 76.0s/ row]
- MapBatches(DetokenizeUDF): Tasks: 0; Actors: 1; Queued blocks: 0; Resources: 1.0 CPU, 1.7KB object store; [0/1 objects local]: : 1.00 row [01:15, 76.0s/ row]
- Map(_postprocess): Tasks: 1; Actors: 0; Queued blocks: 0; Resources: 1.0 CPU, 246.0B object store: : 1.00 row [01:15, 76.0s/ row]
- Write: Tasks: 1; Actors: 0; Queued blocks: 0; Resources: 1.0 CPU, 256.0MB object store: : 0.00 row [01:15, ? row/s]
2025-07-16 15:44:57,023 ERROR exceptions.py:73 -- Exception occurred in Ray Data or Ray Core internal code. If you continue to see this error, please open an issue on the Ray project GitHub page with the full stack trace below: https://github.com/ray-project/ray/issues/new/choose
2025-07-16 15:44:57,023 ERROR exceptions.py:81 -- Full stack trace:
Traceback (most recent call last):
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/exceptions.py", line 49, in handle_trace
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/plan.py", line 508, in execute
blocks = execute_to_legacy_block_list(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/legacy_compat.py", line 121, in execute_to_legacy_block_list
block_list = _bundles_to_block_list(bundles)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/legacy_compat.py", line 169, in _bundles_to_block_list
for ref_bundle in bundles:
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/executor.py", line 34, in __next__
return self.get_next()
^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py", line 577, in get_next
bundle = state.get_output_blocking(output_split_idx)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 331, in get_output_blocking
raise self._exception
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py", line 278, in run
continue_sched = self._scheduling_loop_step(self._topology)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py", line 336, in _scheduling_loop_step
num_errored_blocks = process_completed_tasks(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 505, in process_completed_tasks
raise e from None
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 472, in process_completed_tasks
bytes_read = task.on_data_ready(
^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 138, in on_data_ready
raise ex from None
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 134, in on_data_ready
ray.get(block_ref)
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/_private/auto_init_hook.py", line 22, in auto_init_wrapper
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/_private/client_mode_hook.py", line 104, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/_private/worker.py", line 2849, in get
values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/_private/worker.py", line 939, in get_objects
raise value
ray.exceptions.TaskUnschedulableError: The task is not schedulable: The node specified via NodeAffinitySchedulingStrategy doesn't exist any more or is infeasible, and soft=False was specified. task_id=06212aaf682b1a145290da49b6c1c33fb540bffe0d000000, task_name=Write
ray.data.exceptions.SystemException
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/ray/default/final_minimal.py", line 19, in <module>
ds.write_parquet("local:///tmp/data/")
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/dataset.py", line 3449, in write_parquet
self.write_datasink(
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/dataset.py", line 4598, in write_datasink
self._write_ds = Dataset(plan, logical_plan).materialize()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/dataset.py", line 5637, in materialize
bundle = copy._plan.execute()
^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/exceptions.py", line 89, in handle_trace
raise e.with_traceback(None) from SystemException()
ray.exceptions.TaskUnschedulableError: The task is not schedulable: The node specified via NodeAffinitySchedulingStrategy doesn't exist any more or is infeasible, and soft=False was specified. task_id=06212aaf682b1a145290da49b6c1c33fb540bffe0d000000, task_name=Write
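One plausible reading of the failure (an assumption, not confirmed by this log alone): the `local://` scheme makes the Write task target one specific node's local filesystem via NodeAffinitySchedulingStrategy with soft=False, so once the autoscaler removes that node, the task becomes unschedulable. A minimal workaround sketch below swaps the output URI for a path assumed to be shared storage mounted on every node (`/mnt/cluster_storage` is a hypothetical mount point, not taken from this repro):

```python
# Sketch of a possible workaround, not a confirmed fix: avoid the
# `local://` scheme, which pins the Write task to a single node's
# filesystem with a hard (soft=False) node-affinity constraint.
output_uri = "local:///tmp/data/"

if output_uri.startswith("local://"):
    # Assumption: /mnt/cluster_storage is shared storage reachable from
    # every node, so the Write task is free to run anywhere.
    output_uri = "/mnt/cluster_storage/data/"

# On a live cluster you would then write with:
# ds.write_parquet(output_uri)
print(output_uri)
```

Cloud storage (e.g. an `s3://` URI) would serve the same purpose; the point of the sketch is only that the sink must not require a particular node to stay alive.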