Repro for Ray Data LLM failure with a Parquet sink
import ray
from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor

config = vLLMEngineProcessorConfig(model_source="unsloth/Llama-3.2-1B-Instruct")
processor = build_llm_processor(
    config,
    preprocess=lambda row: {
        "messages": [
            {"role": "system", "content": "You are a bot that responds with haikus."},
            {"role": "user", "content": row["item"]},
        ],
        "sampling_params": {"temperature": 0.3, "max_tokens": 250},
    },
    postprocess=lambda row: {"answer": row["generated_text"]},
)

ds = ray.data.from_items(["Start of the haiku is: Complete this for me..."])
ds = processor(ds)
# ds.show(1)        <-- works
# ds.materialize()  <-- works
ds.write_parquet("local:///tmp/data/")  # See error below
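My reading of the failure (not confirmed): ds.show(1) and ds.materialize() complete fine, but the local:// scheme pins the Write tasks to a specific node via NodeAffinitySchedulingStrategy with soft=False, and by the time the write is scheduled that node has apparently been scaled down or become infeasible, so the task is unschedulable (see the TaskUnschedulableError in the log below). A minimal, unverified workaround sketch, assuming shared or cloud storage reachable from every worker (the bucket name is a placeholder):

# Unverified workaround sketch: skip the local:// scheme so the Write tasks are not
# pinned to a single node; any URI reachable from all workers should do.
ds.write_parquet("s3://my-bucket/haiku-output/")  # placeholder bucket, swap in real storage
# Or, on a single-node cluster, a plain local path without the local:// prefix:
# ds.write_parquet("/tmp/data/")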
(autoscaler +2m58s) [autoscaler] [1xL40S:4CPU-32GB] Attempting to add 1 node to the cluster (increasing from 1 to 2).
(autoscaler +3m3s) [autoscaler] [1xL40S:4CPU-32GB] Launched 1 instance.
Running Dataset: dataset_93_0. Active & requested resources: 4/4 CPU, 1/1 GPU, 256.0MB/6.7GB object store: : 0.00 row [04:02, ? row/s]2025-07-16 15:44:54,199 INFO actor_pool_map_operator.py:661 -- Scaled down actor pool by 1 (reason=None; running=0, restarting=0, pending=0) Queued blocks: 0; Resources: 1.0 CPU, 256.0MB object store: : 0.00 row [01:12, ? row/s]
2025-07-16 15:44:54,200 INFO actor_pool_map_operator.py:661 -- Scaled down actor pool by 1 (reason=None; running=0, restarting=0, pending=0) [01:12, ? row/s]
2025-07-16 15:44:54,269 ERROR streaming_executor_state.py:504 -- An exception was raised from a task of operator "Write". Dataset execution will now abort. To ignore this exception and continue, set DataContext.max_errored_blocks.MEngineStageUDF): Tasks: 0; Actors: 1; Queued blocks: 0; Resources: 0.0 CPU, 1.0 GPU, 0.0B object store; [all objects local]: : 0.00 row [01:12, ? row/s]
Traceback (most recent call last):: 0; Actors: 1; Queued blocks: 0; Resources: 1.0 CPU, 0.0B object store; [all objects local]: : 0.00 row [01:12, ? row/s]
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 472, in process_completed_tasks
bytes_read = task.on_data_ready(ks]; Actors: 0; Queued blocks: 0; Resources: 0.0 CPU, 0.0B object store: : 0.00 row [01:12, ? row/s]
^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 138, in on_data_ready
raise ex from None
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 134, in on_data_ready
ray.get(block_ref)
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/_private/auto_init_hook.py", line 22, in auto_init_wrapper
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/_private/client_mode_hook.py", line 104, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/_private/worker.py", line 2849, in get
values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/_private/worker.py", line 939, in get_objects
raise value
ray.exceptions.TaskUnschedulableError: The task is not schedulable: The node specified via NodeAffinitySchedulingStrategy doesn't exist any more or is infeasible, and soft=False was specified. task_id=06212aaf682b1a145290da49b6c1c33fb540bffe0d000000, task_name=Write
2025-07-16 15:44:54,274 INFO streaming_executor.py:227 -- ⚠️ Dataset dataset_93_0 execution failed
⚠️ Dataset dataset_93_0 execution failed: : 0.00 row [04:03, ? row/s]
- Map(_preprocess): Tasks: 0; Actors: 0; Queued blocks: 0; Resources: 0.0 CPU, 182.0B object store: : 1.00 row [01:13, 73.3s/ row]
- MapBatches(ChatTemplateUDF): Tasks: 0; Actors: 0; Queued blocks: 0; Resources: 0.0 CPU, 505.0B object store; [0/1 objects local]: : 1.00 row [01:13, 73.3s/ row]
- MapBatches(TokenizeUDF): Tasks: 0; Actors: 0; Queued blocks: 0; Resources: 0.0 CPU, 981.0B object store; [0/1 objects local]: : 1.00 row [01:13, 73.3s/ row]
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=2533, ip=10.0.89.151) [vLLM] Elapsed time for batch f0bc6a706cdb4b42a6ea5b761ead54ca with size 1: 0.12248441300002355
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=2533, ip=10.0.89.151) Shutting down vLLM engine
- MapBatches(vLLMEngineStageUDF): Tasks: 0; Actors: 1; Queued blocks: 0; Resources: 0.0 CPU, 1.0 GPU, 1.7KB object store; [0/1 objects local]: : 1.00 row [01:15, 76.0s/ row]
- MapBatches(DetokenizeUDF): Tasks: 0; Actors: 1; Queued blocks: 0; Resources: 1.0 CPU, 1.7KB object store; [0/1 objects local]: : 1.00 row [01:15, 76.0s/ row]
- Map(_postprocess): Tasks: 1; Actors: 0; Queued blocks: 0; Resources: 1.0 CPU, 246.0B object store: : 1.00 row [01:15, 76.0s/ row]
- Write: Tasks: 1; Actors: 0; Queued blocks: 0; Resources: 1.0 CPU, 256.0MB object store: : 0.00 row [01:15, ? row/s]
2025-07-16 15:44:57,023 ERROR exceptions.py:73 -- Exception occurred in Ray Data or Ray Core internal code. If you continue to see this error, please open an issue on the Ray project GitHub page with the full stack trace below: https://github.com/ray-project/ray/issues/new/choose
2025-07-16 15:44:57,023 ERROR exceptions.py:81 -- Full stack trace:
Traceback (most recent call last):
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/exceptions.py", line 49, in handle_trace
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/plan.py", line 508, in execute
blocks = execute_to_legacy_block_list(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/legacy_compat.py", line 121, in execute_to_legacy_block_list
block_list = _bundles_to_block_list(bundles)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/legacy_compat.py", line 169, in _bundles_to_block_list
for ref_bundle in bundles:
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/executor.py", line 34, in __next__
return self.get_next()
^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py", line 577, in get_next
bundle = state.get_output_blocking(output_split_idx)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 331, in get_output_blocking
raise self._exception
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py", line 278, in run
continue_sched = self._scheduling_loop_step(self._topology)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py", line 336, in _scheduling_loop_step
num_errored_blocks = process_completed_tasks(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 505, in process_completed_tasks
raise e from None
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 472, in process_completed_tasks
bytes_read = task.on_data_ready(
^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 138, in on_data_ready
raise ex from None
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 134, in on_data_ready
ray.get(block_ref)
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/_private/auto_init_hook.py", line 22, in auto_init_wrapper
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/_private/client_mode_hook.py", line 104, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/_private/worker.py", line 2849, in get
values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/_private/worker.py", line 939, in get_objects
raise value
ray.exceptions.TaskUnschedulableError: The task is not schedulable: The node specified via NodeAffinitySchedulingStrategy doesn't exist any more or is infeasible, and soft=False was specified. task_id=06212aaf682b1a145290da49b6c1c33fb540bffe0d000000, task_name=Write
ray.data.exceptions.SystemException
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/ray/default/final_minimal.py", line 19, in <module>
ds.write_parquet("local:///tmp/data/")
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/dataset.py", line 3449, in write_parquet
self.write_datasink(
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/dataset.py", line 4598, in write_datasink
self._write_ds = Dataset(plan, logical_plan).materialize()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/dataset.py", line 5637, in materialize
bundle = copy._plan.execute()
^^^^^^^^^^^^^^^^^^^^
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/exceptions.py", line 89, in handle_trace
raise e.with_traceback(None) from SystemException()
ray.exceptions.TaskUnschedulableError: The task is not schedulable: The node specified via NodeAffinitySchedulingStrategy doesn't exist any more or is infeasible, and soft=False was specified. task_id=06212aaf682b1a145290da49b6c1c33fb540bffe0d000000, task_name=Write
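The error message itself points at DataContext.max_errored_blocks as a way to keep executing instead of aborting. A hedged sketch of that knob (it only tolerates the failed Write blocks, it does not address the node-affinity scheduling problem, so output may be incomplete):

# Unverified sketch: let Ray Data tolerate errored blocks instead of aborting execution.
import ray

ctx = ray.data.DataContext.get_current()
ctx.max_errored_blocks = -1  # assumption: -1 means "no limit"; check the Ray Data docs
ds.write_parquet("local:///tmp/data/")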