Skip to content

Instantly share code, notes, and snippets.

@NellyWhads
Last active December 17, 2024 03:29
Show Gist options
  • Save NellyWhads/fdfb261a027be7e7bc87bec91d9e9035 to your computer and use it in GitHub Desktop.
Save NellyWhads/fdfb261a027be7e7bc87bec91d9e9035 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from pprint import pprint\n",
"\n",
"import numpy as np\n",
"import pandas as pd\n",
"import pyarrow as pa\n",
"import pyarrow.parquet as pq\n",
"import ray.data\n"
]
},
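{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Flat DataFrame\n",
"\n",
"Write a flat table with one datetime column per NumPy unit (`D`, `s`, `ms`, `us`, `ns`) to Parquet, read it back with Ray Data, and pass it through identity `map_batches` and `map` transforms. In the recorded outputs, only the row-based `ds.map` loses the nanosecond offsets in the `ns` column (every value collapses to `2024-01-01`)."
]
},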
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# Rows generated for each test table; also the size passed to take_batch.\n",
"num_samples = 5"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>D</th>\n",
" <th>s</th>\n",
" <th>ms</th>\n",
" <th>us</th>\n",
" <th>ns</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>2026-05-01</td>\n",
" <td>2024-01-01 00:14:11</td>\n",
" <td>2024-01-01 00:00:00.851</td>\n",
" <td>2024-01-01 00:00:00.000851</td>\n",
" <td>2024-01-01 00:00:00.000000851</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>2025-08-22</td>\n",
" <td>2024-01-01 00:09:59</td>\n",
" <td>2024-01-01 00:00:00.599</td>\n",
" <td>2024-01-01 00:00:00.000599</td>\n",
" <td>2024-01-01 00:00:00.000000599</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>2024-12-04</td>\n",
" <td>2024-01-01 00:05:38</td>\n",
" <td>2024-01-01 00:00:00.338</td>\n",
" <td>2024-01-01 00:00:00.000338</td>\n",
" <td>2024-01-01 00:00:00.000000338</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>2026-03-17</td>\n",
" <td>2024-01-01 00:13:26</td>\n",
" <td>2024-01-01 00:00:00.806</td>\n",
" <td>2024-01-01 00:00:00.000806</td>\n",
" <td>2024-01-01 00:00:00.000000806</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>2025-05-25</td>\n",
" <td>2024-01-01 00:08:30</td>\n",
" <td>2024-01-01 00:00:00.510</td>\n",
" <td>2024-01-01 00:00:00.000510</td>\n",
" <td>2024-01-01 00:00:00.000000510</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" D s ms \\\n",
"0 2026-05-01 2024-01-01 00:14:11 2024-01-01 00:00:00.851 \n",
"1 2025-08-22 2024-01-01 00:09:59 2024-01-01 00:00:00.599 \n",
"2 2024-12-04 2024-01-01 00:05:38 2024-01-01 00:00:00.338 \n",
"3 2026-03-17 2024-01-01 00:13:26 2024-01-01 00:00:00.806 \n",
"4 2025-05-25 2024-01-01 00:08:30 2024-01-01 00:00:00.510 \n",
"\n",
" us ns \n",
"0 2024-01-01 00:00:00.000851 2024-01-01 00:00:00.000000851 \n",
"1 2024-01-01 00:00:00.000599 2024-01-01 00:00:00.000000599 \n",
"2 2024-01-01 00:00:00.000338 2024-01-01 00:00:00.000000338 \n",
"3 2024-01-01 00:00:00.000806 2024-01-01 00:00:00.000000806 \n",
"4 2024-01-01 00:00:00.000510 2024-01-01 00:00:00.000000510 "
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Create random timestamps with different precisions: one column per NumPy unit.\n",
"# A seeded generator makes re-running the notebook reproduce the same offsets.\n",
"rng = np.random.default_rng(seed=0)\n",
"base_timestamp = np.datetime64(\"2024-01-01\")\n",
"random_offsets = rng.integers(0, 1000, size=num_samples)\n",
"\n",
"df = pd.DataFrame(\n",
"    {\n",
"        unit: [base_timestamp + np.timedelta64(offset, unit) for offset in random_offsets]\n",
"        for unit in (\"D\", \"s\", \"ms\", \"us\", \"ns\")\n",
"    }\n",
")\n",
"df"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"# Persist the flat frame (via an explicit Arrow table) for Ray Data to read back.\n",
"flat_table = pa.Table.from_pandas(df)\n",
"parquet_file = \"test.parquet\"\n",
"pq.write_table(flat_table, where=parquet_file)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-12-16 22:29:21,211\tINFO worker.py:1743 -- Started a local Ray instance. View the dashboard at \u001b[1m\u001b[32m127.0.0.1:8266 \u001b[39m\u001b[22m\n"
]
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "84ea75106e8947e6867375122aedcc3f",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"Parquet Files Sample 0: 0%| | 0/1 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-12-16 22:29:22,284\tINFO streaming_executor.py:115 -- Starting execution of Dataset. Full log is in /tmp/ray/session_2024-12-16_22-29-19_508831_9390/logs/ray-data.log\n",
"2024-12-16 22:29:22,284\tINFO streaming_executor.py:116 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=5]\n",
"\n"
]
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "681f02513e8a4c49a66eae7359caa9b6",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"- ReadParquet->SplitBlocks(22) 1: 0%| | 0/1 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "0ef4e388dbce49409f0eb6b33ccc2a0c",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"- limit=5 2: 0%| | 0/1 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "f7f8e787ae0243eab732e9497fece637",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"Running 0: 0%| | 0/1 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>D</th>\n",
" <th>s</th>\n",
" <th>ms</th>\n",
" <th>us</th>\n",
" <th>ns</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>2026-05-01</td>\n",
" <td>2024-01-01 00:14:11</td>\n",
" <td>2024-01-01 00:00:00.851</td>\n",
" <td>2024-01-01 00:00:00.000851</td>\n",
" <td>2024-01-01 00:00:00.000000851</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>2025-08-22</td>\n",
" <td>2024-01-01 00:09:59</td>\n",
" <td>2024-01-01 00:00:00.599</td>\n",
" <td>2024-01-01 00:00:00.000599</td>\n",
" <td>2024-01-01 00:00:00.000000599</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>2024-12-04</td>\n",
" <td>2024-01-01 00:05:38</td>\n",
" <td>2024-01-01 00:00:00.338</td>\n",
" <td>2024-01-01 00:00:00.000338</td>\n",
" <td>2024-01-01 00:00:00.000000338</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>2026-03-17</td>\n",
" <td>2024-01-01 00:13:26</td>\n",
" <td>2024-01-01 00:00:00.806</td>\n",
" <td>2024-01-01 00:00:00.000806</td>\n",
" <td>2024-01-01 00:00:00.000000806</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>2025-05-25</td>\n",
" <td>2024-01-01 00:08:30</td>\n",
" <td>2024-01-01 00:00:00.510</td>\n",
" <td>2024-01-01 00:00:00.000510</td>\n",
" <td>2024-01-01 00:00:00.000000510</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" D s ms \\\n",
"0 2026-05-01 2024-01-01 00:14:11 2024-01-01 00:00:00.851 \n",
"1 2025-08-22 2024-01-01 00:09:59 2024-01-01 00:00:00.599 \n",
"2 2024-12-04 2024-01-01 00:05:38 2024-01-01 00:00:00.338 \n",
"3 2026-03-17 2024-01-01 00:13:26 2024-01-01 00:00:00.806 \n",
"4 2025-05-25 2024-01-01 00:08:30 2024-01-01 00:00:00.510 \n",
"\n",
" us ns \n",
"0 2024-01-01 00:00:00.000851 2024-01-01 00:00:00.000000851 \n",
"1 2024-01-01 00:00:00.000599 2024-01-01 00:00:00.000000599 \n",
"2 2024-01-01 00:00:00.000338 2024-01-01 00:00:00.000000338 \n",
"3 2024-01-01 00:00:00.000806 2024-01-01 00:00:00.000000806 \n",
"4 2024-01-01 00:00:00.000510 2024-01-01 00:00:00.000000510 "
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Read the file back through Ray Data and materialize one pandas batch.\n",
"ds = ray.data.read_parquet(parquet_file)\n",
"ds_df = ds.take_batch(num_samples, batch_format=\"pandas\")\n",
"ds_df"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-12-16 22:29:22,375\tINFO streaming_executor.py:115 -- Starting execution of Dataset. Full log is in /tmp/ray/session_2024-12-16_22-29-19_508831_9390/logs/ray-data.log\n",
"2024-12-16 22:29:22,375\tINFO streaming_executor.py:116 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[MapBatches(<lambda>)] -> LimitOperator[limit=5]\n",
"\n"
]
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "ad3c934bd9d24c298df039e40c6500cd",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"- ReadParquet->SplitBlocks(22) 1: 0%| | 0/1 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "cc066808ae404b7a9698c66a2f6d3663",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"- MapBatches(<lambda>) 2: 0%| | 0/1 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "89f5391354dc4de58bfd15f36ca24e58",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"- limit=5 3: 0%| | 0/1 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "b8da76afc62b4b3a94953df6f7fd963b",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"Running 0: 0%| | 0/1 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>D</th>\n",
" <th>s</th>\n",
" <th>ms</th>\n",
" <th>us</th>\n",
" <th>ns</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>2026-05-01</td>\n",
" <td>2024-01-01 00:14:11</td>\n",
" <td>2024-01-01 00:00:00.851</td>\n",
" <td>2024-01-01 00:00:00.000851</td>\n",
" <td>2024-01-01 00:00:00.000000851</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>2025-08-22</td>\n",
" <td>2024-01-01 00:09:59</td>\n",
" <td>2024-01-01 00:00:00.599</td>\n",
" <td>2024-01-01 00:00:00.000599</td>\n",
" <td>2024-01-01 00:00:00.000000599</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>2024-12-04</td>\n",
" <td>2024-01-01 00:05:38</td>\n",
" <td>2024-01-01 00:00:00.338</td>\n",
" <td>2024-01-01 00:00:00.000338</td>\n",
" <td>2024-01-01 00:00:00.000000338</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>2026-03-17</td>\n",
" <td>2024-01-01 00:13:26</td>\n",
" <td>2024-01-01 00:00:00.806</td>\n",
" <td>2024-01-01 00:00:00.000806</td>\n",
" <td>2024-01-01 00:00:00.000000806</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>2025-05-25</td>\n",
" <td>2024-01-01 00:08:30</td>\n",
" <td>2024-01-01 00:00:00.510</td>\n",
" <td>2024-01-01 00:00:00.000510</td>\n",
" <td>2024-01-01 00:00:00.000000510</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" D s ms \\\n",
"0 2026-05-01 2024-01-01 00:14:11 2024-01-01 00:00:00.851 \n",
"1 2025-08-22 2024-01-01 00:09:59 2024-01-01 00:00:00.599 \n",
"2 2024-12-04 2024-01-01 00:05:38 2024-01-01 00:00:00.338 \n",
"3 2026-03-17 2024-01-01 00:13:26 2024-01-01 00:00:00.806 \n",
"4 2025-05-25 2024-01-01 00:08:30 2024-01-01 00:00:00.510 \n",
"\n",
" us ns \n",
"0 2024-01-01 00:00:00.000851 2024-01-01 00:00:00.000000851 \n",
"1 2024-01-01 00:00:00.000599 2024-01-01 00:00:00.000000599 \n",
"2 2024-01-01 00:00:00.000338 2024-01-01 00:00:00.000000338 \n",
"3 2024-01-01 00:00:00.000806 2024-01-01 00:00:00.000000806 \n",
"4 2024-01-01 00:00:00.000510 2024-01-01 00:00:00.000000510 "
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Identity map_batches operating on pandas batches.\n",
"map_batches_ds = ds.map_batches(lambda batch: batch, batch_format=\"pandas\")\n",
"map_batches_ds_df = map_batches_ds.take_batch(num_samples, batch_format=\"pandas\")\n",
"map_batches_ds_df"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-12-16 22:29:23,065\tINFO streaming_executor.py:115 -- Starting execution of Dataset. Full log is in /tmp/ray/session_2024-12-16_22-29-19_508831_9390/logs/ray-data.log\n",
"2024-12-16 22:29:23,066\tINFO streaming_executor.py:116 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[MapBatches(<lambda>)] -> LimitOperator[limit=5]\n",
"\n"
]
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "a5ee77a3ec904f83ac418bcdd12d8879",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"- ReadParquet->SplitBlocks(22) 1: 0%| | 0/1 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "b59430b34ef24925b855f52fecbc1acc",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"- MapBatches(<lambda>) 2: 0%| | 0/1 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "f7ae7506eefa4b09a11e0134aa67a756",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"- limit=5 3: 0%| | 0/1 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "7a67b1e71cab460dbcbbcb6d22d90a29",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"Running 0: 0%| | 0/1 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>D</th>\n",
" <th>s</th>\n",
" <th>ms</th>\n",
" <th>us</th>\n",
" <th>ns</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>2026-05-01</td>\n",
" <td>2024-01-01 00:14:11</td>\n",
" <td>2024-01-01 00:00:00.851</td>\n",
" <td>2024-01-01 00:00:00.000851</td>\n",
" <td>2024-01-01 00:00:00.000000851</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>2025-08-22</td>\n",
" <td>2024-01-01 00:09:59</td>\n",
" <td>2024-01-01 00:00:00.599</td>\n",
" <td>2024-01-01 00:00:00.000599</td>\n",
" <td>2024-01-01 00:00:00.000000599</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>2024-12-04</td>\n",
" <td>2024-01-01 00:05:38</td>\n",
" <td>2024-01-01 00:00:00.338</td>\n",
" <td>2024-01-01 00:00:00.000338</td>\n",
" <td>2024-01-01 00:00:00.000000338</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>2026-03-17</td>\n",
" <td>2024-01-01 00:13:26</td>\n",
" <td>2024-01-01 00:00:00.806</td>\n",
" <td>2024-01-01 00:00:00.000806</td>\n",
" <td>2024-01-01 00:00:00.000000806</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>2025-05-25</td>\n",
" <td>2024-01-01 00:08:30</td>\n",
" <td>2024-01-01 00:00:00.510</td>\n",
" <td>2024-01-01 00:00:00.000510</td>\n",
" <td>2024-01-01 00:00:00.000000510</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" D s ms \\\n",
"0 2026-05-01 2024-01-01 00:14:11 2024-01-01 00:00:00.851 \n",
"1 2025-08-22 2024-01-01 00:09:59 2024-01-01 00:00:00.599 \n",
"2 2024-12-04 2024-01-01 00:05:38 2024-01-01 00:00:00.338 \n",
"3 2026-03-17 2024-01-01 00:13:26 2024-01-01 00:00:00.806 \n",
"4 2025-05-25 2024-01-01 00:08:30 2024-01-01 00:00:00.510 \n",
"\n",
" us ns \n",
"0 2024-01-01 00:00:00.000851 2024-01-01 00:00:00.000000851 \n",
"1 2024-01-01 00:00:00.000599 2024-01-01 00:00:00.000000599 \n",
"2 2024-01-01 00:00:00.000338 2024-01-01 00:00:00.000000338 \n",
"3 2024-01-01 00:00:00.000806 2024-01-01 00:00:00.000000806 \n",
"4 2024-01-01 00:00:00.000510 2024-01-01 00:00:00.000000510 "
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Identity map_batches with Ray's default batch_format. Distinct variable names keep the\n",
"# pandas-format results of the previous cell available for comparison instead of overwriting them.\n",
"map_batches_default_ds = ds.map_batches(lambda batch: batch)\n",
"map_batches_default_ds_df = map_batches_default_ds.take_batch(num_samples, batch_format=\"pandas\")\n",
"map_batches_default_ds_df"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-12-16 22:29:23,131\tINFO streaming_executor.py:115 -- Starting execution of Dataset. Full log is in /tmp/ray/session_2024-12-16_22-29-19_508831_9390/logs/ray-data.log\n",
"2024-12-16 22:29:23,131\tINFO streaming_executor.py:116 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[Map(<lambda>)] -> LimitOperator[limit=5]\n",
"\n"
]
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "69b9d68e14bb4c06ba789c16fd0b88b6",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"- ReadParquet->SplitBlocks(22) 1: 0%| | 0/1 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "f62c45e7b7dc48bea2ae01774fd2ed3a",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"- Map(<lambda>) 2: 0%| | 0/1 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "1034b5f238cf4697914bfc0e31e2ff9c",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"- limit=5 3: 0%| | 0/1 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "1f76479c48cf47809562628e6b83b503",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"Running 0: 0%| | 0/1 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>D</th>\n",
" <th>s</th>\n",
" <th>ms</th>\n",
" <th>us</th>\n",
" <th>ns</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>2026-05-01</td>\n",
" <td>2024-01-01 00:14:11</td>\n",
" <td>2024-01-01 00:00:00.851</td>\n",
" <td>2024-01-01 00:00:00.000851</td>\n",
" <td>2024-01-01</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>2025-08-22</td>\n",
" <td>2024-01-01 00:09:59</td>\n",
" <td>2024-01-01 00:00:00.599</td>\n",
" <td>2024-01-01 00:00:00.000599</td>\n",
" <td>2024-01-01</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>2024-12-04</td>\n",
" <td>2024-01-01 00:05:38</td>\n",
" <td>2024-01-01 00:00:00.338</td>\n",
" <td>2024-01-01 00:00:00.000338</td>\n",
" <td>2024-01-01</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>2026-03-17</td>\n",
" <td>2024-01-01 00:13:26</td>\n",
" <td>2024-01-01 00:00:00.806</td>\n",
" <td>2024-01-01 00:00:00.000806</td>\n",
" <td>2024-01-01</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>2025-05-25</td>\n",
" <td>2024-01-01 00:08:30</td>\n",
" <td>2024-01-01 00:00:00.510</td>\n",
" <td>2024-01-01 00:00:00.000510</td>\n",
" <td>2024-01-01</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" D s ms \\\n",
"0 2026-05-01 2024-01-01 00:14:11 2024-01-01 00:00:00.851 \n",
"1 2025-08-22 2024-01-01 00:09:59 2024-01-01 00:00:00.599 \n",
"2 2024-12-04 2024-01-01 00:05:38 2024-01-01 00:00:00.338 \n",
"3 2026-03-17 2024-01-01 00:13:26 2024-01-01 00:00:00.806 \n",
"4 2025-05-25 2024-01-01 00:08:30 2024-01-01 00:00:00.510 \n",
"\n",
" us ns \n",
"0 2024-01-01 00:00:00.000851 2024-01-01 \n",
"1 2024-01-01 00:00:00.000599 2024-01-01 \n",
"2 2024-01-01 00:00:00.000338 2024-01-01 \n",
"3 2024-01-01 00:00:00.000806 2024-01-01 \n",
"4 2024-01-01 00:00:00.000510 2024-01-01 "
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Identity row-wise map. In the recorded output the `ns` column loses its nanosecond offsets.\n",
"map_ds = ds.map(lambda row: row)\n",
"map_ds_df = map_ds.take_batch(num_samples, batch_format=\"pandas\")\n",
"map_ds_df"
]
},
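{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Nested DataFrame (struct column)\n",
"\n",
"The same precisions packed into one struct column. In the recorded outputs: (1) Parquet has no second-resolution timestamp type, so the `s` field is read back as `timestamp[ms]`; (2) converting the struct column to pandas yields `date`/`datetime` objects for most fields but a plain integer (epoch nanoseconds) for `ns`. Point (2) also happens with `pyarrow.Table.to_pandas()` alone, so it is not specific to Ray Data."
]
},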
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"# Create random timestamps with different precisions using pandas first.\n",
"# A fixed seed makes re-running the notebook reproduce the same offsets.\n",
"rng = np.random.default_rng(seed=0)\n",
"base_timestamp = pd.Timestamp(\"2024-01-01\")\n",
"random_offsets = rng.integers(0, 1000, size=num_samples)\n",
"\n",
"# Struct field name -> (pd.Timedelta keyword, Arrow type) for each precision.\n",
"precisions = {\n",
"    \"D\": (\"days\", pa.date32()),\n",
"    \"s\": (\"seconds\", pa.timestamp(\"s\")),\n",
"    \"ms\": (\"milliseconds\", pa.timestamp(\"ms\")),\n",
"    \"us\": (\"microseconds\", pa.timestamp(\"us\")),\n",
"    \"ns\": (\"nanoseconds\", pa.timestamp(\"ns\")),\n",
"}\n",
"\n",
"# Build pandas timestamps per precision and convert them to pyarrow arrays of the matching type.\n",
"arrays = [\n",
"    pa.array(\n",
"        [base_timestamp + pd.Timedelta(**{kwarg: offset}) for offset in random_offsets],\n",
"        type=arrow_type,\n",
"    )\n",
"    for kwarg, arrow_type in precisions.values()\n",
"]\n",
"\n",
"# Create struct array; field types come from the arrays, so they cannot drift apart.\n",
"struct_array = pa.StructArray.from_arrays(arrays, names=list(precisions))\n",
"\n",
"# Create table\n",
"table = pa.Table.from_arrays([struct_array], [\"timestamps\"])"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'D': <pyarrow.Date32Scalar: datetime.date(2024, 9, 16)>,\n",
" 'ms': <pyarrow.TimestampScalar: '2024-01-01T00:00:00.259'>,\n",
" 'ns': <pyarrow.TimestampScalar: '2024-01-01T00:00:00.000000259'>,\n",
" 's': <pyarrow.TimestampScalar: '2024-01-01T00:04:19'>,\n",
" 'us': <pyarrow.TimestampScalar: '2024-01-01T00:00:00.000259'>}\n"
]
}
],
"source": [
"# Inspect the first struct value as built in memory (before any Parquet round trip).\n",
"first_row = dict(table[\"timestamps\"][0])\n",
"pprint(first_row)"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"# Parquet has no second-resolution timestamp type, so pyarrow stores the `s` field as milliseconds.\n",
"parquet_file_2 = \"test_2.parquet\"\n",
"pq.write_table(table, where=parquet_file_2)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"pyarrow.Table\n",
"timestamps: struct<D: date32[day], s: timestamp[ms], ms: timestamp[ms], us: timestamp[us], ns: timestamp[ns]>\n",
" child 0, D: date32[day]\n",
" child 1, s: timestamp[ms]\n",
" child 2, ms: timestamp[ms]\n",
" child 3, us: timestamp[us]\n",
" child 4, ns: timestamp[ns]\n",
"----\n",
"timestamps: [\n",
" -- is_valid: all not null\n",
" -- child 0 type: date32[day]\n",
"[2024-09-16,2025-07-31,2025-11-03,2026-05-26,2025-10-09]\n",
" -- child 1 type: timestamp[ms]\n",
"[2024-01-01 00:04:19.000,2024-01-01 00:09:37.000,2024-01-01 00:11:12.000,2024-01-01 00:14:36.000,2024-01-01 00:10:47.000]\n",
" -- child 2 type: timestamp[ms]\n",
"[2024-01-01 00:00:00.259,2024-01-01 00:00:00.577,2024-01-01 00:00:00.672,2024-01-01 00:00:00.876,2024-01-01 00:00:00.647]\n",
" -- child 3 type: timestamp[us]\n",
"[2024-01-01 00:00:00.000259,2024-01-01 00:00:00.000577,2024-01-01 00:00:00.000672,2024-01-01 00:00:00.000876,2024-01-01 00:00:00.000647]\n",
" -- child 4 type: timestamp[ns]\n",
"[2024-01-01 00:00:00.000000259,2024-01-01 00:00:00.000000577,2024-01-01 00:00:00.000000672,2024-01-01 00:00:00.000000876,2024-01-01 00:00:00.000000647]]"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Read the struct table back with plain pyarrow; in the recorded schema the `s` field is timestamp[ms].\n",
"table_2 = pq.read_table(parquet_file_2)\n",
"table_2"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'D': <pyarrow.Date32Scalar: datetime.date(2024, 9, 16)>,\n",
" 'ms': <pyarrow.TimestampScalar: '2024-01-01T00:00:00.259'>,\n",
" 'ns': <pyarrow.TimestampScalar: '2024-01-01T00:00:00.000000259'>,\n",
" 's': <pyarrow.TimestampScalar: '2024-01-01T00:04:19.000'>,\n",
" 'us': <pyarrow.TimestampScalar: '2024-01-01T00:00:00.000259'>}\n"
]
}
],
"source": [
"# First struct value after the Parquet round trip.\n",
"first_row_2 = dict(table_2[\"timestamps\"][0])\n",
"pprint(first_row_2)"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "9e807375ab984ebfa3cd8ebc297ec529",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"Parquet Files Sample 0: 0%| | 0/1 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-12-16 22:29:23,336\tINFO streaming_executor.py:115 -- Starting execution of Dataset. Full log is in /tmp/ray/session_2024-12-16_22-29-19_508831_9390/logs/ray-data.log\n",
"2024-12-16 22:29:23,336\tINFO streaming_executor.py:116 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=5]\n",
"\n"
]
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "b1345fb2084843f0b514ba9892c695e7",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"- ReadParquet->SplitBlocks(22) 1: 0%| | 0/1 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "fafd5850162b4384b30707d2eb2617ef",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"- limit=5 2: 0%| | 0/1 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "0e606e9a1ce24e34a82d6d6ee93a0edc",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"Running 0: 0%| | 0/1 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>timestamps</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>{'D': 2024-09-16, 's': 2024-01-01 00:04:19, 'm...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>{'D': 2025-07-31, 's': 2024-01-01 00:09:37, 'm...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>{'D': 2025-11-03, 's': 2024-01-01 00:11:12, 'm...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>{'D': 2026-05-26, 's': 2024-01-01 00:14:36, 'm...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>{'D': 2025-10-09, 's': 2024-01-01 00:10:47, 'm...</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" timestamps\n",
"0 {'D': 2024-09-16, 's': 2024-01-01 00:04:19, 'm...\n",
"1 {'D': 2025-07-31, 's': 2024-01-01 00:09:37, 'm...\n",
"2 {'D': 2025-11-03, 's': 2024-01-01 00:11:12, 'm...\n",
"3 {'D': 2026-05-26, 's': 2024-01-01 00:14:36, 'm...\n",
"4 {'D': 2025-10-09, 's': 2024-01-01 00:10:47, 'm..."
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Same nested file through Ray Data, materialized as a pandas batch.\n",
"ds_2 = ray.data.read_parquet(parquet_file_2)\n",
"ds_2_df = ds_2.take_batch(num_samples, batch_format=\"pandas\")\n",
"ds_2_df"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'D': datetime.date(2024, 9, 16),\n",
" 'ms': datetime.datetime(2024, 1, 1, 0, 0, 0, 259000),\n",
" 'ns': 1704067200000000259,\n",
" 's': datetime.datetime(2024, 1, 1, 0, 4, 19),\n",
" 'us': datetime.datetime(2024, 1, 1, 0, 0, 0, 259)}\n"
]
}
],
"source": [
"# In the recorded output the `ns` field is an integer while the other fields are date/datetime objects.\n",
"first_struct = ds_2_df[\"timestamps\"][0]\n",
"pprint(first_struct)"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-12-16 22:29:23,418\tINFO streaming_executor.py:115 -- Starting execution of Dataset. Full log is in /tmp/ray/session_2024-12-16_22-29-19_508831_9390/logs/ray-data.log\n",
"2024-12-16 22:29:23,419\tINFO streaming_executor.py:116 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=5]\n",
"\n"
]
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "3a2091adf475407eae763298340db577",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"- ReadParquet->SplitBlocks(22) 1: 0%| | 0/1 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "e128af6ab8ea42c28882a31632541618",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"- limit=5 2: 0%| | 0/1 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "1a2cc9a9ca0f4fe5adcdd45cf9c3195b",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"Running 0: 0%| | 0/1 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'D': datetime.date(2024, 9, 16),\n",
" 'ms': datetime.datetime(2024, 1, 1, 0, 0, 0, 259000),\n",
" 'ns': 1704067200000000259,\n",
" 's': datetime.datetime(2024, 1, 1, 0, 4, 19),\n",
" 'us': datetime.datetime(2024, 1, 1, 0, 0, 0, 259)}\n"
]
}
],
"source": [
"# Default batch format (no batch_format argument); the recorded struct values match the pandas batch.\n",
"ds_2_batch = ds_2.take_batch(num_samples)\n",
"pprint(ds_2_batch[\"timestamps\"][0])"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'D': datetime.date(2024, 9, 16),\n",
" 'ms': datetime.datetime(2024, 1, 1, 0, 0, 0, 259000),\n",
" 'ns': 1704067200000000259,\n",
" 's': datetime.datetime(2024, 1, 1, 0, 4, 19),\n",
" 'us': datetime.datetime(2024, 1, 1, 0, 0, 0, 259)}\n"
]
}
],
"source": [
"# Plain pyarrow -> pandas conversion (no Ray involved) for comparison.\n",
"df_2 = table_2.to_pandas()\n",
"pprint(df_2[\"timestamps\"][0])"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.15"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment