I am using Ray 2.6.3 and would like to rename a column in a dataset. map runs fine when I leave out my data column of images, but as soon as I include data it fails with "TypeError: buffer is too small for requested array". What should I do to increase the size of the buffer? Also, is there a better way to rename a column than mapping over every row?
import ray

# `r`, `timestamps`, and `images` are defined earlier: frame indices, float
# timestamps, and a stack of (720, 1280, 3) uint8 frames, respectively.
metadata = ray.data.from_items(
    [{'index': index, 'timestamp': timestamp} for index, timestamp in zip(r, timestamps)]
)
image_data = ray.data.from_numpy(images)
full_ds = metadata.zip(image_data)
full_ds.schema()
Column Type
------ ----
index int64
timestamp double
data numpy.ndarray(shape=(720, 1280, 3), dtype=uint8)
from typing import Any, Dict

def rename_row(row: Dict[str, Any]) -> Dict[str, Any]:
    # Move the image tensor from 'data' to 'image'.
    row['image'] = row.pop('data')
    return row

renamed_ds = full_ds.map(rename_row)
renamed_ds.schema()
2023-09-24 19:35:09,776 INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input], InputDataBuffer[Input] -> ZipOperator[Zip] -> TaskPoolMapOperator[Map(rename_row)] -> LimitOperator[limit=1]
2023-09-24 19:35:09,777 INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
2023-09-24 19:35:09,777 INFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
(Map(rename_row) pid=1177384) Task failed with retryable exception: TaskID(6012944da5356edfffffffffffffffffffffffff01000000).
(Map(rename_row) pid=1177384) Traceback (most recent call last):
(Map(rename_row) pid=1177384) File "python/ray/_raylet.pyx", line 1191, in ray._raylet.execute_dynamic_generator_and_store_task_outputs
(Map(rename_row) pid=1177384) File "python/ray/_raylet.pyx", line 3684, in ray._raylet.CoreWorker.store_task_outputs
(Map(rename_row) pid=1177384) File "/mnt/md0/.anaconda3/envs/python311news/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 415, in _map_task
(Map(rename_row) pid=1177384) for b_out in fn(iter(blocks), ctx):
(Map(rename_row) pid=1177384) File "/mnt/md0/.anaconda3/envs/python311news/lib/python3.11/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 76, in do_map
(Map(rename_row) pid=1177384) yield from transform_fn(blocks, ctx, *fn_args, **fn_kwargs)
(Map(rename_row) pid=1177384) File "/mnt/md0/.anaconda3/envs/python311news/lib/python3.11/site-packages/ray/data/_internal/planner/map_rows.py", line 25, in fn
(Map(rename_row) pid=1177384) for row in block.iter_rows(public_row_format=True):
(Map(rename_row) pid=1177384) File "/mnt/md0/.anaconda3/envs/python311news/lib/python3.11/site-packages/ray/data/_internal/table_block.py", line 196, in __next__
(Map(rename_row) pid=1177384) return row.as_pydict()
(Map(rename_row) pid=1177384) ^^^^^^^^^^^^^^^
(Map(rename_row) pid=1177384) File "/mnt/md0/.anaconda3/envs/python311news/lib/python3.11/site-packages/ray/data/row.py", line 33, in as_pydict
(Map(rename_row) pid=1177384) return dict(self.items())
(Map(rename_row) pid=1177384) ^^^^^^^^^^^^^^^^^^
(Map(rename_row) pid=1177384) File "<frozen _collections_abc>", line 861, in __iter__
(Map(rename_row) pid=1177384) File "/mnt/md0/.anaconda3/envs/python311news/lib/python3.11/site-packages/ray/data/_internal/arrow_block.py", line 87, in __getitem__
(Map(rename_row) pid=1177384) return ArrowBlockAccessor._build_tensor_row(self._row, col_name=key)
(Map(rename_row) pid=1177384) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(Map(rename_row) pid=1177384) File "/mnt/md0/.anaconda3/envs/python311news/lib/python3.11/site-packages/ray/data/_internal/arrow_block.py", line 201, in _build_tensor_row
(Map(rename_row) pid=1177384) element = element.as_py()
(Map(rename_row) pid=1177384) ^^^^^^^^^^^^^^^
(Map(rename_row) pid=1177384) File "/mnt/md0/.anaconda3/envs/python311news/lib/python3.11/site-packages/ray/air/util/tensor_extensions/arrow.py", line 193, in as_py
(Map(rename_row) pid=1177384) return self.type._extension_scalar_to_ndarray(self)
(Map(rename_row) pid=1177384) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(Map(rename_row) pid=1177384) File "/mnt/md0/.anaconda3/envs/python311news/lib/python3.11/site-packages/ray/air/util/tensor_extensions/arrow.py", line 135, in _extension_scalar_to_ndarray
(Map(rename_row) pid=1177384) return _to_ndarray_helper(shape, value_type, offset, data_buffer)
(Map(rename_row) pid=1177384) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(Map(rename_row) pid=1177384) File "/mnt/md0/.anaconda3/envs/python311news/lib/python3.11/site-packages/ray/air/util/tensor_extensions/arrow.py", line 892, in _to_ndarray_helper
(Map(rename_row) pid=1177384) return np.ndarray(shape, dtype=ext_dtype, buffer=data_buffer, offset=data_offset)
(Map(rename_row) pid=1177384) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(Map(rename_row) pid=1177384) TypeError: buffer is too small for requested array
... (more task tracebacks of the same form follow)
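Reading the traceback, the failure happens while each row is converted to a Python dict (as_pydict → _build_tensor_row), so I suspect a batch-level rename might sidestep the per-row tensor conversion entirely. A minimal sketch of what I have in mind (untested on my data; assumes map_batches with pandas batches preserves the tensor column):

def rename_batch(batch):
    # `batch` is a pandas.DataFrame; renaming a column only relabels it and
    # never touches the tensor buffers, unlike the per-row dict conversion.
    return batch.rename(columns={'data': 'image'})

renamed_ds = full_ds.map_batches(rename_batch, batch_format='pandas')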
Update: instead of going through ray.data.Dataset, switching to plain Ray actors and @ray.remote eliminated the memory problems.
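A minimal sketch of that actor-based approach (the class name, worker count, and per-frame payload are illustrative, not the exact code that was used):

import ray

@ray.remote
class FrameProcessor:
    def process(self, index, timestamp, image):
        # `image` arrives as a plain numpy array via the object store, so no
        # Arrow tensor-extension conversion (and no buffer error) is involved.
        return {'index': index, 'timestamp': timestamp, 'image': image}

# Fan the frames out across a small pool of actors and gather the results.
workers = [FrameProcessor.remote() for _ in range(4)]
refs = [
    workers[i % len(workers)].process.remote(i, ts, img)
    for i, (ts, img) in enumerate(zip(timestamps, images))
]
results = ray.get(refs)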