Skip to content

Instantly share code, notes, and snippets.

@agoose77
Created May 20, 2021 08:47
Show Gist options
  • Save agoose77/9cda3b131df81b2f8691285dbcdd5e24 to your computer and use it in GitHub Desktop.
Save agoose77/9cda3b131df81b2f8691285dbcdd5e24 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "ed9c49be-382f-4c4d-8955-7c681369a056",
"metadata": {},
"outputs": [],
"source": [
"%load_ext line_profiler"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "bfc239fe-a880-4356-b79c-2ad4163a2489",
"metadata": {},
"outputs": [],
"source": [
"import mmap\n",
"import struct\n",
"import sys\n",
"import typing\n",
"\n",
"import awkward as ak\n",
"import numba as nb\n",
"import numpy as np"
]
},
{
"cell_type": "markdown",
"id": "b9e46cbe-871e-4635-9c67-6ce0f010d4bb",
"metadata": {},
"source": [
"---"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "80f02bfb-2c6e-4947-baef-4be6b9077f5a",
"metadata": {},
"outputs": [],
"source": [
"record_header_dtype = np.dtype(\n",
" [\n",
" (\"id\", \"S8\"),\n",
" (\"sequence\", \"u4\"),\n",
" (\"stream\", \"u2\"),\n",
" (\"tape\", \"u2\"),\n",
" (\"tape_endian\", \"u2\"),\n",
" (\"data_endian\", \"u2\"),\n",
" (\"data_len\", \"u4\"),\n",
" ]\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "e3b5e886-502b-4bfb-94fd-92be1dfecb77",
"metadata": {},
"outputs": [],
"source": [
"event_header_dtype = np.dtype(\n",
" [\n",
" (\"start_end_token\", \"u2\"),\n",
" (\"length\", \"u2\"),\n",
" ]\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 198,
"id": "8ffd751a-89e5-4647-87f2-e7b0257a8f8d",
"metadata": {},
"outputs": [],
"source": [
"event_data_dtype = np.dtype(\n",
" [\n",
" (\"adc\", \"u1\"),\n",
" (\"bin\", \"u1\"),\n",
" (\"value\", \"u2\"),\n",
" ]\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 199,
"id": "e7814fe0-c789-4e5c-be1e-42144fcb47d3",
"metadata": {},
"outputs": [],
"source": [
"def swap_byteorder(dtype, *fields):\n",
"    \"\"\"Return a copy of a structured dtype with the named fields byte-swapped.\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    dtype : numpy.dtype\n",
"        Structured dtype to copy.\n",
"    *fields : str\n",
"        Names of the fields whose byte order is flipped. Every other field\n",
"        is carried over unchanged.\n",
"\n",
"    Returns\n",
"    -------\n",
"    numpy.dtype\n",
"        Structured dtype with the same field names, offsets and itemsize.\n",
"    \"\"\"\n",
"    swapped = set(fields)\n",
"    names = list(dtype.names)\n",
"    # dtype.fields maps name -> (dtype, offset[, title]); index the tuple\n",
"    # instead of unpacking it so that titled fields are handled as well.\n",
"    formats = [dtype.fields[name][0] for name in names]\n",
"    offsets = [dtype.fields[name][1] for name in names]\n",
"    return np.dtype(\n",
"        {\n",
"            \"names\": names,\n",
"            \"formats\": [\n",
"                fmt.newbyteorder() if name in swapped else fmt\n",
"                for name, fmt in zip(names, formats)\n",
"            ],\n",
"            # The list-of-tuples form reads a third tuple element as a\n",
"            # sub-array shape, so offsets must go through the dict form.\n",
"            \"offsets\": offsets,\n",
"            \"itemsize\": dtype.itemsize,\n",
"        }\n",
"    )"
]
},
{
"cell_type": "code",
"execution_count": 200,
"id": "3358976d-1195-45e3-94a2-9ce8351d25a8",
"metadata": {},
"outputs": [],
"source": [
"@nb.njit\n",
"def handle_bytes(memory, dtype):\n",
" return np.frombuffer(memory, dtype)"
]
},
{
"cell_type": "code",
"execution_count": 201,
"id": "a7e6c4a2-3959-457f-bb10-a7d064dcdda6",
"metadata": {},
"outputs": [],
"source": [
"ev = np.empty(3, event_data_dtype)"
]
},
{
"cell_type": "code",
"execution_count": 202,
"id": "248a9dc4-bf28-4d9b-899e-963f569902fa",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"array([(16, 166, 21920), (53, 86, 0), ( 0, 0, 0)],\n",
" dtype=[('adc', 'u1'), ('bin', 'u1'), ('value', '<u2')])"
]
},
"execution_count": 202,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"handle_bytes(ev, ev.dtype)"
]
},
{
"cell_type": "code",
"execution_count": 203,
"id": "123ee13c-1a06-41a8-b68f-3da7c8fc8a2a",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<_struct.Struct at 0x7efc4098c6b0>"
]
},
"execution_count": 203,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"record_header_id_struct = struct.Struct(\"8s\")\n",
"record_header_id_struct"
]
},
{
"cell_type": "code",
"execution_count": 204,
"id": "9409e6f9-4dbc-4e56-a84b-3e6685861471",
"metadata": {
"scrolled": true,
"tags": []
},
"outputs": [],
"source": [
"def iterate(f, chunk_size=1024):\n",
"    \"\"\"Collect the event data of every ``EBYEDATA`` record in *f*.\n",
"\n",
"    The file is memory-mapped read-only and scanned from offset 0. A record\n",
"    starts with a ``record_header_dtype`` header whose ``id`` equals\n",
"    ``b\"EBYEDATA\"``. It is followed by events, each made of a 4-byte header\n",
"    (token, length) and ``length - 4`` bytes of ``event_data_dtype`` entries.\n",
"    An event length of 0 ends the record. While no header is found at the\n",
"    cursor, the cursor skips to the next 1024-byte boundary.\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    f : binary file object\n",
"        Open file exposing ``fileno()``.\n",
"    chunk_size : int, optional\n",
"        Unused; kept so that existing calls keep working.\n",
"\n",
"    Returns\n",
"    -------\n",
"    contents : numpy.ndarray\n",
"        Entries of all events, concatenated, with dtype ``event_data_dtype``.\n",
"    num : list of int\n",
"        Number of entries in each event, in file order.\n",
"    source_cursor : int\n",
"        Byte offset at which parsing stopped.\n",
"\n",
"    Notes\n",
"    -----\n",
"    Parsing stops, and everything read so far is returned, when the end of\n",
"    the file is reached or when an event is truncated or shorter than its\n",
"    own header.\n",
"    \"\"\"\n",
"    record_header_size = record_header_dtype.itemsize\n",
"    event_header_size = event_header_dtype.itemsize\n",
"    event_data_size = event_data_dtype.itemsize\n",
"\n",
"    # Event headers hold two u2 fields and are always decoded in the\n",
"    # non-native byte order.\n",
"    # NOTE(review): this ignores the record's data_endian flag - confirm\n",
"    # against the file format.\n",
"    event_header_struct = struct.Struct(\n",
"        {\"little\": \">\"}.get(sys.byteorder, \"<\") + \"HH\"\n",
"    )\n",
"\n",
"    contents = np.empty(1500000, dtype=event_data_dtype)\n",
"    num = []\n",
"    source_cursor = 0\n",
"    dest_cursor = 0\n",
"\n",
"    # access= requests the read-only mapping; ACCESS_READ is not a flags= value\n",
"    with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as memory:\n",
"        # The file is read front to back\n",
"        memory.madvise(mmap.MADV_SEQUENTIAL)\n",
"        file_size = len(memory)\n",
"\n",
"        while True:\n",
"            # Find record header\n",
"            while True:\n",
"                # Slicing the map copies the bytes, so no view of the map is\n",
"                # left alive when it is closed\n",
"                raw_header = memory[\n",
"                    source_cursor : source_cursor + record_header_size\n",
"                ]\n",
"                if len(raw_header) < record_header_size:\n",
"                    # End of file\n",
"                    return contents[:dest_cursor], num, source_cursor\n",
"\n",
"                (record_header,) = np.frombuffer(\n",
"                    raw_header, dtype=record_header_dtype\n",
"                )\n",
"                if record_header[\"id\"] == b\"EBYEDATA\":\n",
"                    break\n",
"\n",
"                # Move to next 1024 aligned region\n",
"                source_cursor = (source_cursor & ~0x3FF) + 0x400\n",
"\n",
"            # Commit the header read here\n",
"            source_cursor += record_header_size\n",
"\n",
"            # Only data_endian is used from the header, and a test against\n",
"            # zero does not depend on byte order, so the header is not swapped\n",
"            data_byteorder = \"=\" if record_header[\"data_endian\"] == 0 else \"S\"\n",
"            record_event_data_dtype = event_data_dtype.newbyteorder(data_byteorder)\n",
"\n",
"            while True:\n",
"                if source_cursor + event_header_size > file_size:\n",
"                    # Truncated event header\n",
"                    return contents[:dest_cursor], num, source_cursor\n",
"\n",
"                # Read the event header\n",
"                _token, event_length = event_header_struct.unpack_from(\n",
"                    memory, source_cursor\n",
"                )\n",
"                source_cursor += event_header_size\n",
"\n",
"                # If the length is 0, this is the final event\n",
"                if not event_length:\n",
"                    break\n",
"\n",
"                # Determine how many entries to read\n",
"                length = event_length - event_header_size\n",
"                n_data = length // event_data_size\n",
"                data_end = source_cursor + n_data * event_data_size\n",
"                if length < 0 or data_end > file_size:\n",
"                    # Malformed or truncated event\n",
"                    return contents[:dest_cursor], num, source_cursor\n",
"\n",
"                # Grow the output buffer when this event does not fit\n",
"                next_dest_cursor = dest_cursor + n_data\n",
"                if next_dest_cursor > len(contents):\n",
"                    grown = np.empty(\n",
"                        max(next_dest_cursor, 2 * len(contents)),\n",
"                        dtype=event_data_dtype,\n",
"                    )\n",
"                    grown[:dest_cursor] = contents[:dest_cursor]\n",
"                    contents = grown\n",
"\n",
"                # Store data; the temporary view is dropped right after the copy\n",
"                if n_data:\n",
"                    contents[dest_cursor:next_dest_cursor] = np.frombuffer(\n",
"                        memory,\n",
"                        dtype=record_event_data_dtype,\n",
"                        count=n_data,\n",
"                        offset=source_cursor,\n",
"                    )\n",
"                # Record the event only once its data is stored\n",
"                num.append(n_data)\n",
"\n",
"                # Advance cursors\n",
"                source_cursor += length\n",
"                dest_cursor = next_dest_cursor"
]
},
{
"cell_type": "code",
"execution_count": 213,
"id": "c7a23235-ebec-47e6-976e-bf1c04f97660",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2.63 s ± 64.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"# Close the file as soon as parsing is done; iterate() copies the event\n",
"# data into its own array, so the result does not depend on the open file.\n",
"with open(\"R366_0\", \"rb\") as f:\n",
"    it = iterate(f)"
]
},
{
"cell_type": "code",
"execution_count": 208,
"id": "68a1dbac-92be-4fd1-ba8b-95b5a531f535",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"8355840"
]
},
"execution_count": 208,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"cont, num, cursor = it\n",
"cursor"
]
},
{
"cell_type": "code",
"execution_count": 209,
"id": "6c834513-7bf4-4555-a52a-223479391251",
"metadata": {},
"outputs": [],
"source": [
"arr = ak.zip({k: cont[k] for k in cont.dtype.fields})"
]
},
{
"cell_type": "code",
"execution_count": 210,
"id": "5c476e5f-5eca-4435-a378-38c5ee48a8a7",
"metadata": {},
"outputs": [],
"source": [
"events = ak.unflatten(arr, num)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.0"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment