read_tf_records bug
============================= test session starts ==============================
platform darwin -- Python 3.9.12, pytest-7.1.2, pluggy-1.0.0
rootdir: /Users/bveeramani/GitHub/ray/python
plugins: anyio-3.6.1, lazy-fixture-0.6.3
collected 1 item
python/ray/data/tests/test_dataset_formats.py F                          [100%]
=================================== FAILURES ===================================
_____________________________ test_read_tf_records _____________________________
ray_start_regular_shared = RayContext(dashboard_url='127.0.0.1:8265', python_version='3.9.12', ray_version='3.0.0.dev0', ray_commit='{{RAY_COMMIT...1:63110', 'dashboard_agent_listen_port': 52365, 'node_id': 'b54ebfb4d748c8ad87515e023baf6fd414fca4cd9d6ef2e9b3413c8a'})
tmp_path = PosixPath('/private/var/folders/j8/553rq3812fv573lghv2m98n80000gn/T/pytest-of-bveeramani/pytest-120/test_read_tf_records0')
    def test_read_tf_records(ray_start_regular_shared, tmp_path):
        import tensorflow as tf
        features = tf.train.Features(
            feature={
                "int64": tf.train.Feature(int64_list=tf.train.Int64List(value=[1])),
                "int64_list": tf.train.Feature(
                    int64_list=tf.train.Int64List(value=[1, 2, 3, 4])
                ),
                "float": tf.train.Feature(float_list=tf.train.FloatList(value=[1.0])),
                "float_list": tf.train.Feature(
                    float_list=tf.train.FloatList(value=[1.0, 2.0, 3.0, 4.0])
                ),
                "bytes": tf.train.Feature(bytes_list=tf.train.BytesList(value=[b"abc"])),
                "bytes_list": tf.train.Feature(
                    bytes_list=tf.train.BytesList(value=[b"abc", b"1234"])
                ),
            }
        )
        example = tf.train.Example(features=features)
        path = os.path.join(tmp_path, "data.tfrecords")
        with tf.io.TFRecordWriter(path=path) as writer:
            writer.write(example.SerializeToString())
>       ds = ray.data.read_tf_records(path)
python/ray/data/tests/test_dataset_formats.py:3302:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
python/ray/data/read_api.py:776: in read_tf_records
    return read_datasource(
python/ray/data/read_api.py:315: in read_datasource
    block_list.ensure_metadata_for_first_block()
python/ray/data/_internal/lazy_block_list.py:373: in ensure_metadata_for_first_block
    metadata = ray.get(metadata_ref)
python/ray/_private/client_mode_hook.py:105: in wrapper
    return func(*args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
object_refs = [ObjectRef(32d950ec0ccf9d2affffffffffffffffffffffff0100000002000000)]
    @PublicAPI
    @client_mode_hook(auto_init=True)
    def get(
        object_refs: Union[ray.ObjectRef, Sequence[ray.ObjectRef]],
        *,
        timeout: Optional[float] = None,
    ) -> Union[Any, List[Any]]:
        """Get a remote object or a list of remote objects from the object store.
        This method blocks until the object corresponding to the object ref is
        available in the local object store. If this object is not in the local
        object store, it will be shipped from an object store that has it (once the
        object has been created). If object_refs is a list, then the objects
        corresponding to each object in the list will be returned.
        Ordering for an input list of object refs is preserved for each object
        returned. That is, if an object ref to A precedes an object ref to B in the
        input list, then A will precede B in the returned list.
        This method will issue a warning if it's running inside async context,
        you can use ``await object_ref`` instead of ``ray.get(object_ref)``. For
        a list of object refs, you can use ``await asyncio.gather(*object_refs)``.
        Args:
            object_refs: Object ref of the object to get or a list of object refs
                to get.
            timeout (Optional[float]): The maximum amount of time in seconds to
                wait before returning.
        Returns:
            A Python object or a list of Python objects.
        Raises:
            GetTimeoutError: A GetTimeoutError is raised if a timeout is set and
                the get takes longer than timeout to return.
            Exception: An exception is raised if the task that created the object
                or that created one of the objects raised an exception.
        """
        worker = global_worker
        worker.check_connected()
        if hasattr(worker, "core_worker") and worker.core_worker.current_actor_is_asyncio():
            global blocking_get_inside_async_warned
            if not blocking_get_inside_async_warned:
                logger.warning(
                    "Using blocking ray.get inside async actor. "
                    "This blocks the event loop. Please use `await` "
                    "on object ref with asyncio.gather if you want to "
                    "yield execution to the event loop instead."
                )
                blocking_get_inside_async_warned = True
        with profiling.profile("ray.get"):
            is_individual_id = isinstance(object_refs, ray.ObjectRef)
            if is_individual_id:
                object_refs = [object_refs]
            if not isinstance(object_refs, list):
                raise ValueError(
                    "'object_refs' must either be an object ref "
                    "or a list of object refs."
                )
            # TODO(ujvl): Consider how to allow user to retrieve the ready objects.
            values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
            for i, value in enumerate(values):
                if isinstance(value, RayError):
                    if isinstance(value, ray.exceptions.ObjectLostError):
                        worker.core_worker.dump_object_store_memory_usage()
                    if isinstance(value, RayTaskError):
>                       raise value.as_instanceof_cause()
E ray.exceptions.RayTaskError(ReferenceError): ray::_execute_read_task() (pid=40369, ip=127.0.0.1)
E   File "/Users/bveeramani/GitHub/ray/python/ray/cloudpickle/cloudpickle_fast.py", line 73, in dumps
E     cp.dump(obj)
E   File "/Users/bveeramani/GitHub/ray/python/ray/cloudpickle/cloudpickle_fast.py", line 620, in dump
E     return Pickler.dump(self, obj)
E   File "/Users/bveeramani/GitHub/ray/python/ray/cloudpickle/cloudpickle_fast.py", line 714, in reducer_override
E     elif isinstance(obj, types.FunctionType):
E ReferenceError: weakly-referenced object no longer exists
python/ray/_private/worker.py:2279: RayTaskError(ReferenceError)
---------------------------- Captured stderr setup -----------------------------
2022-09-10 23:26:14,221 INFO worker.py:1508 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
----------------------------- Captured stdout call -----------------------------
(_execute_read_task pid=40369) Metal device set to: Apple M1 Pro
----------------------------- Captured stderr call -----------------------------
(_execute_read_task pid=40369) 2022-09-10 23:26:16.360923: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:305] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
(_execute_read_task pid=40369) 2022-09-10 23:26:16.361063: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:271] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)
=========================== short test summary info ============================
FAILED python/ray/data/tests/test_dataset_formats.py::test_read_tf_records - ray.exceptions.RayTaskError(ReferenceError): ray::_execute_read_task() (pid=40369, ip=127.0.0.1)
======================= 1 failed, 107 warnings in 8.75s ======================== |
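
For reference, a minimal standalone sketch of the same repro outside pytest. This is an illustration, not the original test: the temporary-directory handling, the explicit ray.init(), and the plain-TensorFlow read-back are additions for context, and it assumes the environment shown in the log above (Ray 3.0.0.dev0 with TensorFlow plus the Metal plugin on macOS). The read_tf_records name is taken verbatim from this dev build's traceback and may differ in released Ray versions.

import os
import tempfile

import ray
import tensorflow as tf

# Build one tf.train.Example, mirroring the feature types used in the failing test.
features = tf.train.Features(
    feature={
        "int64": tf.train.Feature(int64_list=tf.train.Int64List(value=[1])),
        "float": tf.train.Feature(float_list=tf.train.FloatList(value=[1.0])),
        "bytes": tf.train.Feature(bytes_list=tf.train.BytesList(value=[b"abc"])),
    }
)
example = tf.train.Example(features=features)

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "data.tfrecords")
    with tf.io.TFRecordWriter(path=path) as writer:
        writer.write(example.SerializeToString())

    # Sanity check with plain TensorFlow: if this prints the example, the file
    # itself is readable and the failure is specific to Ray's read path.
    for record in tf.data.TFRecordDataset([path]):
        print(tf.train.Example.FromString(record.numpy()))

    ray.init()
    # On the setup in the log, this call is where the read task died with
    # ReferenceError: weakly-referenced object no longer exists (raised in cloudpickle).
    ds = ray.data.read_tf_records(path)
    print(ds.take(1))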