Skip to content

Instantly share code, notes, and snippets.

@Uberi
Last active March 30, 2016 18:03
Show Gist options
  • Save Uberi/a3a92bb011c7f3b0e8dc677c91471a10 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Debugging [Bug 1248845](https://bugzilla.mozilla.org/show_bug.cgi?id=1248845)\n",
"---"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"First, confirm that the issue exists and is reproducible."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/home/hadoop/anaconda2/lib/python2.7/site-packages/matplotlib/font_manager.py:273: UserWarning: Matplotlib is building the font cache using fc-list. This may take a moment.\n",
" warnings.warn('Matplotlib is building the font cache using fc-list. This may take a moment.')\n",
"WARNING: "
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Unable to parse whitelist (/home/hadoop/anaconda2/lib/python2.7/site-packages/moztelemetry/bucket-whitelist.json). Assuming all histograms are acceptable.\n",
"Populating the interactive namespace from numpy and matplotlib\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"pylab import has clobbered these variables: ['Annotation', 'Figure']\n",
"`%matplotlib` prevents importing * from pylab and numpy\n"
]
}
],
"source": [
"import ujson as json\n",
"import matplotlib.pyplot as plt\n",
"import pandas as pd\n",
"import numpy as np\n",
"import plotly.plotly as py\n",
"from plotly.graph_objs import *\n",
"\n",
"from moztelemetry import get_pings, get_pings_properties, get_one_ping_per_client, get_clients_history, get_records\n",
"\n",
"# additional stuff used in debugging\n",
"from moztelemetry.spark import _filter_to_schema, _list_s3_filenames\n",
"from moztelemetry.spark import parse_heka_message\n",
"from telemetry.util.heka_message import unpack, BacktrackableFile, read_one_record, read_until_next, UnpackedRecord\n",
"import struct\n",
"import telemetry.util.message_pb2 as message_pb2\n",
"import snappy\n",
"\n",
"%pylab inline"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"records_protobuf_single = get_records(sc, \"telemetry-webrtc\", submissionDate=\"20160101-protobuf-single\")"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"records_snappy_single = get_records(sc, \"telemetry-webrtc\", submissionDate=\"20160101-snappy-single\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Both counts should be the same, since both versions of the data contain the same pings."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"(34329, 3575)"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"records_protobuf_single.count(), records_snappy_single.count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We're going to do all the setup stuff that `get_records` does, so later we can play around with the actual records."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import boto\n",
"conn = boto.connect_s3(host=\"s3-us-west-2.amazonaws.com\")"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"meta = conn.get_bucket(\"net-mozaws-prod-us-west-2-pipeline-metadata\", validate=False)\n",
"_sources = json.loads(meta.get_key(\"sources.json\").get_contents_as_string())"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"(u'net-mozaws-prod-us-west-2-pipeline-data', u'telemetry-webrtc/')"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"bucket_name = _sources[\"telemetry-webrtc\"][\"bucket\"]\n",
"bucket_prefix = _sources[\"telemetry-webrtc\"][\"prefix\"] + \"/\"\n",
"bucket_name, bucket_prefix"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{u'version': 1, u'dimensions': [{u'field_name': u'submissionDate', u'allowed_values': u'*'}]}\n"
]
}
],
"source": [
"schema = json.loads(meta.get_key(\"telemetry-webrtc/schema.json\").get_contents_as_string())\n",
"print(schema)"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"v4 = conn.get_bucket(\"net-mozaws-prod-us-west-2-pipeline-data\", validate=False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Okay, now we can retrieve the protobuf version and see if the number of retrieved records actually matches up."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[u'telemetry-webrtc/20160101-protobuf-single/output.log']"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"filter_schema = _filter_to_schema(schema, {\"submissionDate\": \"20160101-protobuf-single\"})\n",
"files = _list_s3_filenames(v4, \"telemetry-webrtc/\", filter_schema)\n",
"files"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"key = v4.get_key(\"telemetry-webrtc/20160101-protobuf-single/output.log\")\n",
"key.open_read()\n",
"messages_protobuf = parse_heka_message(key)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"34327\n"
]
}
],
"source": [
"c = 0\n",
"for m in messages_protobuf: c += 1\n",
"print(c)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The protobuf version seems close enough - there's a slight difference somewhere, but it's not important. Let's try the same thing with the Snappy version."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[u'telemetry-webrtc/20160101-snappy-single/output_snappy.log']"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"filter_schema = _filter_to_schema(schema, {\"submissionDate\": \"20160101-snappy-single\"})\n",
"files = _list_s3_filenames(v4, \"telemetry-webrtc/\", filter_schema)\n",
"files"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"key = v4.get_key(\"telemetry-webrtc/20160101-snappy-single/output_snappy.log\")\n",
"key.open_read()\n",
"messages_snappy = parse_heka_message(key)"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"34327\n"
]
}
],
"source": [
"c = 0\n",
"for m in messages_snappy: c += 1\n",
"print(c)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Since we can read the Snappy data just fine when we do it all in one go, the chunking is likely the problem. We're going to debug the chunking by doing that part manually."
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"19"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"n_chunks = key.size / (100*(2**20))\n",
"n_chunks"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1838, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1664, 0, 0, 0, 0, 0]\n"
]
}
],
"source": [
"c = []\n",
"for chunk in range(n_chunks):\n",
" start = 100*(2**20)*chunk\n",
" key = v4.get_key(\"telemetry-webrtc/20160101-snappy-single/output_snappy.log\")\n",
" key.open_read(headers={'Range': \"bytes={}-\".format(start)})\n",
" c.append(0)\n",
" for m in parse_heka_message(key, boundary_bytes=100*(2**20)):\n",
" c[chunk] += 1\n",
"print c"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"So there's clearly something up with `parse_heka_message`. We can use the `verbose` flag to help figure out what's going on."
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {
"collapsed": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Skipped 315 bytes to find a valid separator\n",
"Tag had invalid wire type.\n",
"Tag had invalid wire type.\n",
"Skipped 293 bytes to find a valid separator\n",
"Truncated message.\n",
"Truncated message.\n",
"Tag had invalid wire type.\n",
"Truncated message.\n",
"Skipped 11 bytes to find a valid separator\n",
"Tag had invalid wire type.\n",
"Tag had invalid wire type.\n",
"Truncated string.\n",
"Skipped 27 bytes to find a valid separator\n",
"Tag had invalid wire type.\n",
"Tag had invalid wire type.\n",
"Tag had invalid wire type.\n",
"Truncated string.\n",
"Skipped 111 bytes to find a valid separator\n",
"Truncated message.\n",
"Truncated message.\n",
"Skipped 3 bytes to find a valid separator\n",
"Unexpected end-group tag.\n",
"Unexpected end-group tag.\n",
"Skipped 10 bytes to find a valid separator\n",
"Truncated message.\n",
"Truncated message.\n",
"Tag had invalid wire type.\n",
"Skipped 127 bytes to find a valid separator\n",
"Truncated message.\n",
"Truncated message.\n",
"Skipped 432 bytes to find a valid separator\n",
"Truncated message.\n",
"Truncated message.\n",
"Skipped 50 bytes to find a valid separator\n",
"Tag had invalid wire type.\n",
"Tag had invalid wire type.\n",
"Skipped 459 bytes to find a valid separator\n",
"Unexpected end-group tag.\n",
"Unexpected end-group tag.\n",
"Truncated message.\n",
"Skipped 550 bytes to find a valid separator\n",
"Tag had invalid wire type.\n",
"Tag had invalid wire type.\n",
"Skipped 147 bytes to find a valid separator\n",
"Unexpected end-group tag.\n",
"Unexpected end-group tag.\n",
"Tag had invalid wire type.\n",
"Tag had invalid wire type.\n",
"Skipped 8 bytes to find a valid separator\n",
"Truncated message.\n",
"Truncated message.\n",
"Skipped 28 bytes to find a valid separator\n",
"Tag had invalid wire type.\n",
"Tag had invalid wire type.\n",
"Skipped 21 bytes to find a valid separator\n",
"Unexpected end-group tag.\n",
"Unexpected end-group tag.\n",
"Skipped 196 bytes to find a valid separator\n",
"Tag had invalid wire type.\n",
"Tag had invalid wire type.\n",
"Skipped 141 bytes to find a valid separator\n",
"Unexpected end-group tag.\n",
"Unexpected end-group tag.\n",
"Skipped 112 bytes to find a valid separator\n",
"Unexpected end-group tag.\n",
"Unexpected end-group tag.\n",
"Skipped 82 bytes to find a valid separator\n",
"Truncated message.\n",
"Truncated message.\n",
"Tag had invalid wire type.\n",
"Skipped 52 bytes to find a valid separator\n",
"Tag had invalid wire type.\n",
"Tag had invalid wire type.\n",
"Skipped 167 bytes to find a valid separator\n",
"Truncated message.\n",
"Truncated message.\n",
"Skipped 315 bytes to find a valid separator\n",
"Tag had invalid wire type.\n",
"Tag had invalid wire type.\n",
"Skipped 372 bytes to find a valid separator\n",
"Tag had invalid wire type.\n",
"Tag had invalid wire type.\n",
"Skipped 42 bytes to find a valid separator\n",
"Tag had invalid wire type.\n",
"Tag had invalid wire type.\n",
"Skipped 120 bytes to find a valid separator\n",
"Truncated message.\n",
"Truncated message.\n",
"Skipped 76 bytes to find a valid separator\n",
"Unexpected end-group tag.\n",
"Unexpected end-group tag.\n",
"Truncated message.\n",
"Tag had invalid wire type.\n",
"Truncated message.\n",
"Skipped 174 bytes to find a valid separator\n",
"Tag had invalid wire type.\n",
"Tag had invalid wire type.\n",
"Skipped 323 bytes to find a valid separator\n",
"Tag had invalid wire type.\n",
"Tag had invalid wire type.\n",
"Skipped 60 bytes to find a valid separator\n",
"Truncated message.\n",
"Truncated message.\n",
"Tag had invalid wire type.\n",
"Skipped 62 bytes to find a valid separator\n",
"Unexpected end-group tag.\n",
"Unexpected end-group tag.\n",
"Tag had invalid wire type.\n",
"Skipped 416 bytes to find a valid separator\n",
"Tag had invalid wire type.\n",
"Tag had invalid wire type.\n",
"Skipped 81 bytes to find a valid separator\n",
"Tag had invalid wire type.\n",
"Tag had invalid wire type.\n",
"Tag had invalid wire type.\n",
"Truncated message.\n",
"Tag had invalid wire type.\n",
"Skipped 76 bytes to find a valid separator\n",
"Tag had invalid wire type.\n",
"Tag had invalid wire type.\n",
"Skipped 231 bytes to find a valid separator\n",
"Unexpected end-group tag.\n",
"Unexpected end-group tag.\n",
"Tag had invalid wire type.\n",
"Tag had invalid wire type.\n",
"Tag had invalid wire type.\n",
"Skipped 70 bytes to find a valid separator\n",
"Unexpected end-group tag.\n",
"Unexpected end-group tag.\n",
"Skipped 67 bytes to find a valid separator\n",
"Tag had invalid wire type.\n",
"Tag had invalid wire type.\n",
"Skipped 76 bytes to find a valid separator\n",
"Tag had invalid wire type.\n",
"Tag had invalid wire type.\n",
"Skipped 72 bytes to find a valid separator\n",
"Truncated message.\n",
"Truncated message.\n",
"Skipped 22 bytes to find a valid separator\n",
"Processed 0 records\n",
"0\n"
]
}
],
"source": [
"key = v4.get_key(\"telemetry-webrtc/20160101-snappy-single/output_snappy.log\")\n",
"key.open_read(headers={'Range': \"bytes={}-\".format(100*(2**20))})\n",
"message = BacktrackableFile(key)\n",
"\n",
"c = 0\n",
"for record, total_bytes in unpack(message, backtrack=True, verbose=True):\n",
" #yield _parse_heka_record(record)\n",
" c += 1\n",
"\n",
" if total_bytes >= 100*(2**20):\n",
" break\n",
"\n",
"print c\n",
"key.close()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Seems like the issue is with `unpack`."
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {
"collapsed": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Skipped 628 bytes to find a valid separator\n"
]
},
{
"ename": "DecodeError",
"evalue": "Tag had invalid wire type.",
"output_type": "error",
"traceback": [
"\u001b[1;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[1;31mDecodeError\u001b[0m Traceback (most recent call last)",
"\u001b[1;32m<ipython-input-19-ee817c49699c>\u001b[0m in \u001b[0;36m<module>\u001b[1;34m()\u001b[0m\n\u001b[0;32m 3\u001b[0m \u001b[0mmessage\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mBacktrackableFile\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mkey\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 4\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m----> 5\u001b[1;33m \u001b[0mread_one_record\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mmessage\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mraw\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mFalse\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mverbose\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mTrue\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mstrict\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mFalse\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mtry_snappy\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mTrue\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[1;32m/home/hadoop/anaconda2/lib/python2.7/site-packages/telemetry/util/heka_message.pyc\u001b[0m in \u001b[0;36mread_one_record\u001b[1;34m(input_stream, raw, verbose, strict, try_snappy)\u001b[0m\n\u001b[0;32m 119\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 120\u001b[0m \u001b[0mheader\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mmessage_pb2\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mHeader\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 121\u001b[1;33m \u001b[0mheader\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mParseFromString\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mheader_raw\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 122\u001b[0m \u001b[0munit_separator\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0minput_stream\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mread\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;36m1\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 123\u001b[0m \u001b[0mtotal_bytes\u001b[0m \u001b[1;33m+=\u001b[0m \u001b[1;36m1\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
"\u001b[1;32m/home/hadoop/anaconda2/lib/python2.7/site-packages/google/protobuf/message.pyc\u001b[0m in \u001b[0;36mParseFromString\u001b[1;34m(self, serialized)\u001b[0m\n\u001b[0;32m 184\u001b[0m \"\"\"\n\u001b[0;32m 185\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mClear\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 186\u001b[1;33m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mMergeFromString\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mserialized\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 187\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 188\u001b[0m \u001b[1;32mdef\u001b[0m \u001b[0mSerializeToString\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
"\u001b[1;32m/home/hadoop/anaconda2/lib/python2.7/site-packages/google/protobuf/internal/python_message.pyc\u001b[0m in \u001b[0;36mMergeFromString\u001b[1;34m(self, serialized)\u001b[0m\n\u001b[0;32m 839\u001b[0m \u001b[0mlength\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mlen\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mserialized\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 840\u001b[0m \u001b[1;32mtry\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 841\u001b[1;33m \u001b[1;32mif\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_InternalParse\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mserialized\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;36m0\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mlength\u001b[0m\u001b[1;33m)\u001b[0m \u001b[1;33m!=\u001b[0m \u001b[0mlength\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 842\u001b[0m \u001b[1;31m# The only reason _InternalParse would return early is if it\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 843\u001b[0m \u001b[1;31m# encountered an end-group tag.\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
"\u001b[1;32m/home/hadoop/anaconda2/lib/python2.7/site-packages/google/protobuf/internal/python_message.pyc\u001b[0m in \u001b[0;36mInternalParse\u001b[1;34m(self, buffer, pos, end)\u001b[0m\n\u001b[0;32m 864\u001b[0m \u001b[1;32mif\u001b[0m \u001b[0mfield_decoder\u001b[0m \u001b[1;32mis\u001b[0m \u001b[0mNone\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 865\u001b[0m \u001b[0mvalue_start_pos\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mnew_pos\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 866\u001b[1;33m \u001b[0mnew_pos\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mlocal_SkipField\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mbuffer\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mnew_pos\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mend\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mtag_bytes\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 867\u001b[0m \u001b[1;32mif\u001b[0m \u001b[0mnew_pos\u001b[0m \u001b[1;33m==\u001b[0m \u001b[1;33m-\u001b[0m\u001b[1;36m1\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 868\u001b[0m \u001b[1;32mreturn\u001b[0m \u001b[0mpos\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
"\u001b[1;32m/home/hadoop/anaconda2/lib/python2.7/site-packages/google/protobuf/internal/decoder.pyc\u001b[0m in \u001b[0;36mSkipField\u001b[1;34m(buffer, pos, end, tag_bytes)\u001b[0m\n\u001b[0;32m 825\u001b[0m \u001b[1;31m# The wire type is always in the first byte since varints are little-endian.\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 826\u001b[0m \u001b[0mwire_type\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mord\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mtag_bytes\u001b[0m\u001b[1;33m[\u001b[0m\u001b[1;36m0\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;36m1\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m)\u001b[0m \u001b[1;33m&\u001b[0m \u001b[0mwiretype_mask\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 827\u001b[1;33m \u001b[1;32mreturn\u001b[0m \u001b[0mWIRETYPE_TO_SKIPPER\u001b[0m\u001b[1;33m[\u001b[0m\u001b[0mwire_type\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mbuffer\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mpos\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mend\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 828\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 829\u001b[0m \u001b[1;32mreturn\u001b[0m \u001b[0mSkipField\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
"\u001b[1;32m/home/hadoop/anaconda2/lib/python2.7/site-packages/google/protobuf/internal/decoder.pyc\u001b[0m in \u001b[0;36m_RaiseInvalidWireType\u001b[1;34m(buffer, pos, end)\u001b[0m\n\u001b[0;32m 795\u001b[0m \u001b[1;34m\"\"\"Skip function for unknown wire types. Raises an exception.\"\"\"\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 796\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 797\u001b[1;33m \u001b[1;32mraise\u001b[0m \u001b[0m_DecodeError\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;34m'Tag had invalid wire type.'\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 798\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 799\u001b[0m \u001b[1;32mdef\u001b[0m \u001b[0m_FieldSkipper\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
"\u001b[1;31mDecodeError\u001b[0m: Tag had invalid wire type."
]
}
],
"source": [
"key = v4.get_key(\"telemetry-webrtc/20160101-snappy-single/output_snappy.log\")\n",
"key.open_read(headers={'Range': \"bytes=1-\"})\n",
"message = BacktrackableFile(key)\n",
"\n",
"read_one_record(message, raw=False, verbose=True, strict=False, try_snappy=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Huh, that's weird, shouldn't it be trying Snappy before giving up? Time to debug `read_one_record`."
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Skipped 0 bytes to find a valid separator\n",
"Header length 4\n",
"Message length 52459\n"
]
}
],
"source": [
"key = v4.get_key(\"telemetry-webrtc/20160101-snappy-single/output_snappy.log\")\n",
"key.open_read(headers={'Range': \"bytes=0-\"})\n",
"message = BacktrackableFile(key)\n",
"\n",
"#read_one_record(message, raw=False, verbose=True, strict=False, try_snappy=True)\n",
"total_bytes = 0\n",
"skipped, eof = read_until_next(message, 0x1e)\n",
"total_bytes += skipped\n",
"\n",
"# we've read one separator (plus anything we skipped)\n",
"total_bytes += 1\n",
"\n",
"print \"Skipped\", skipped, \"bytes to find a valid separator\"\n",
"\n",
"raw_record = struct.pack(\"<B\", 0x1e)\n",
"\n",
"header_length_raw = message.read(1)\n",
"total_bytes += 1\n",
"raw_record += header_length_raw\n",
"\n",
"(header_length,) = struct.unpack('<B', header_length_raw)\n",
"print \"Header length\", header_length\n",
"\n",
"header_raw = message.read(header_length)\n",
"total_bytes += header_length\n",
"raw_record += header_raw\n",
"\n",
"header = message_pb2.Header()\n",
"header.ParseFromString(header_raw)\n",
"\n",
"print \"Message length\", header.message_length"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from cStringIO import StringIO\n",
"_record_separator = 0x1e\n",
"class BacktrackableFile:\n",
" def __init__(self, stream):\n",
" self._stream = stream\n",
" self._buffer = StringIO()\n",
" self._position = 0\n",
"\n",
" def read(self, size):\n",
" self._position += size\n",
" buffer_data = self._buffer.read(size)\n",
" to_read = size - len(buffer_data)\n",
"\n",
" assert to_read >= 0, \"Read data must be equal or smaller to requested data\"\n",
" if to_read == 0:\n",
" return buffer_data\n",
"\n",
" stream_data = self._stream.read(to_read)\n",
" self._buffer.write(stream_data)\n",
"\n",
" return buffer_data + stream_data\n",
"\n",
" def close(self):\n",
" self._buffer.close()\n",
" if type(self._stream) == boto.s3.key.Key:\n",
" if self._stream.resp: # Hack! Connections are kept around otherwise!\n",
" self._stream.resp.close()\n",
"\n",
" self._stream.close(True)\n",
" else:\n",
" self._stream.close()\n",
"\n",
" def backtrack(self):\n",
" buffer = self._buffer.getvalue()\n",
" index = buffer.find(chr(_record_separator), 1)\n",
" self._position += index - len(buffer) + 1\n",
"\n",
" self._buffer = StringIO()\n",
" if index >= 0:\n",
" self._buffer.write(buffer[index + 1:])\n",
" self._buffer.seek(0)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you look carefully at the code for `BacktrackableFile`, there's an off-by-one error in `backtrack`! The code above has it fixed."
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {
"collapsed": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Skip 647 to 648\n",
"Skip 747 to 1396\n",
"Skip 442 to 1839\n",
"Skip 443 to 2283\n",
"Skip 165 to 2449\n",
"Skip 302 to 2752\n",
"Skip 130 to 2883\n",
"Skip 1332 to 4216\n",
"Skip 264 to 4481\n",
"Skip 22 to 4504\n",
"Skip 166 to 4671\n",
"Skip 190 to 4862\n",
"Skip 94 to 4957\n",
"Skip 780 to 5738\n",
"Skip 736 to 6475\n",
"Skip 76 to 6552\n",
"Skip 14 to 6567\n",
"Skip 737 to 7146\n",
"Skip 1 to 7148\n",
"Skip 8 to 6986\n",
"Skip 98 to 6891\n",
"Skip 88 to 6980\n",
"Skip 12 to 6911\n",
"Skip 61 to 6905\n",
"Skip 7 to 6913\n",
"Skip 124 to 7038\n",
"Skip 102 to 7117\n",
"Skip 44 to 7162\n",
"Skip 263 to 7426\n",
"Skip 79 to 7506\n",
"Skip 281 to 7788\n",
"Skip 87 to 7876\n",
"Skip 125 to 7907\n",
"Skip 18 to 7926\n",
"Skip 403 to 8330\n",
"Skip 15 to 8346\n",
"Skip 299 to 8646\n",
"Skip 2 to 8649\n",
"Skip 460 to 9024\n",
"Skip 265 to 9290\n",
"Skip 24 to 9315\n",
"Skip 77 to 9393\n",
"Skip 86 to 9371\n",
"Skip 216 to 9567\n",
"Skip 135 to 9703\n",
"Skip 15 to 9685\n",
"Skip 17 to 9703\n",
"Skip 33 to 9622\n",
"Skip 105 to 9647\n",
"Skip 57 to 9705\n",
"Skip 35 to 9712\n",
"Skip 213 to 9881\n",
"Skip 40 to 9922\n",
"Skip 429 to 10352\n",
"Skip 403 to 10756\n",
"Skip 10 to 10767\n",
"Skip 44 to 10812\n",
"Skip 1 to 10814\n",
"Skip 299 to 11104\n",
"Skip 165 to 11270\n",
"Skip 115 to 11386\n",
"Skip 21 to 11408\n",
"Skip 7 to 11416\n",
"Skip 169 to 11586\n",
"Skip 42 to 11629\n",
"Skip 824 to 12344\n",
"Skip 217 to 12562\n",
"Skip 38 to 12601\n",
"Skip 126 to 12707\n",
"Skip 116 to 12824\n",
"Skip 10 to 12835\n",
"Skip 86 to 12722\n",
"Skip 114 to 12728\n",
"Skip 19 to 12748\n",
"Skip 135 to 12884\n",
"Skip 52 to 12937\n",
"Skip 249 to 13187\n",
"Skip 35 to 13223\n",
"Skip 80 to 13304\n",
"Skip 87 to 13392\n",
"Skip 257 to 13650\n",
"Skip 153 to 13804\n",
"Skip 157 to 13962\n",
"Skip 90 to 14053\n",
"Skip 183 to 14210\n",
"Skip 503 to 14714\n",
"Skip 206 to 14921\n",
"Skip 58 to 14980\n",
"Skip 465 to 15446\n",
"Skip 36 to 15483\n",
"Skip 4 to 15461\n",
"Skip 157 to 15601\n",
"Skip 225 to 15827\n",
"Skip 278 to 16106\n",
"Skip 158 to 16265\n",
"Skip 19 to 16285\n",
"Skip 392 to 16676\n",
"Skip 27 to 16704\n",
"Skip 164 to 16869\n",
"Skip 12 to 16882\n",
"Skip 66 to 16828\n",
"Skip 80 to 16784\n",
"Skip 220 to 16949\n",
"Skip 265 to 17215\n",
"Skip 10 to 17226\n",
"Skip 25 to 17252\n",
"Skip 0 to 17253\n",
"Skip 320 to 17618\n",
"Skip 57 to 17676\n",
"Skip 37 to 17714\n",
"Skip 324 to 18039\n",
"Skip 38 to 18078\n",
"Skip 130 to 18209\n",
"Skip 7 to 18135\n",
"Skip 192 to 18328\n",
"Skip 236 to 18565\n",
"Skip 116 to 18682\n",
"Skip 146 to 18829\n",
"Skip 35 to 18865\n",
"Skip 198 to 19007\n",
"Skip 629 to 19637\n",
"Skip 128 to 19766\n",
"Skip 8 to 19775\n",
"Skip 455 to 20231\n",
"Skip 360 to 20592\n",
"Skip 339 to 20932\n",
"Skip 86 to 21019\n",
"Skip 158 to 21178\n",
"Skip 294 to 21473\n",
"Skip 5 to 21479\n",
"Skip 23 to 21503\n",
"Skip 36 to 21527\n",
"Skip 284 to 21812\n",
"Skip 77 to 21890\n",
"Skip 178 to 22069\n",
"Skip 48 to 22118\n",
"Skip 37 to 22124\n",
"Skip 119 to 22149\n",
"Skip 66 to 22203\n",
"Skip 186 to 22390\n",
"Skip 114 to 22505\n",
"Skip 554 to 23060\n",
"Skip 71 to 23132\n",
"Skip 59 to 23185\n",
"Skip 171 to 23357\n",
"Skip 50 to 23408\n",
"Skip 111 to 23520\n",
"Skip 247 to 23768\n",
"Skip 15 to 23784\n",
"Skip 670 to 24239\n",
"Skip 459 to 24699\n",
"Skip 808 to 25508\n",
"Skip 334 to 25843\n",
"Skip 10 to 25854\n",
"Skip 22 to 25839\n",
"Skip 20 to 25726\n",
"Skip 155 to 25882\n",
"Skip 418 to 26301\n",
"Skip 205 to 26507\n",
"Skip 347 to 26855\n",
"Skip 219 to 27075\n",
"Skip 1614 to 28690\n",
"Skip 6 to 28697\n",
"Skip 346 to 28901\n",
"Skip 110 to 29012\n",
"Skip 547 to 29560\n",
"Skip 257 to 29818\n",
"Skip 82 to 29901\n",
"Skip 84 to 29986\n",
"Skip 43 to 30030\n",
"Skip 307 to 30231\n",
"Skip 108 to 30340\n",
"Skip 109 to 30450\n",
"Skip 172 to 30623\n",
"Skip 190 to 30814\n",
"Skip 46 to 30861\n",
"Skip 77 to 30939\n",
"Skip 110 to 31050\n",
"Skip 78 to 31129\n",
"Skip 16 to 31146\n",
"Skip 71 to 31218\n",
"Skip 18 to 31237\n",
"Skip 97 to 31285\n",
"Skip 202 to 31401\n",
"Skip 1 to 31403\n",
"Skip 291 to 31483\n",
"Skip 445 to 31929\n",
"Skip 9 to 31939\n",
"Skip 262 to 32195\n",
"Skip 195 to 32391\n",
"Skip 17 to 32409\n",
"Skip 8 to 32418\n",
"Skip 684 to 33103\n",
"Skip 75 to 33179\n",
"Skip 91 to 33271\n",
"Skip 39 to 33296\n",
"Skip 70 to 33367\n",
"Skip 105 to 33473\n",
"Skip 33 to 33507\n",
"Skip 145 to 33586\n",
"Skip 15 to 33602\n",
"Skip 145 to 33748\n",
"Skip 105 to 33780\n",
"Skip 401 to 34182\n",
"Skip 321 to 34504\n",
"Skip 424 to 34929\n",
"Skip 29 to 34959\n",
"Skip 427 to 35387\n",
"Skip 45 to 35433\n",
"Skip 15 to 35449\n",
"Skip 20 to 35470\n",
"Skip 36 to 35507\n",
"Skip 233 to 35741\n",
"Skip 77 to 35819\n",
"Skip 240 to 36060\n",
"Skip 291 to 36352\n",
"Skip 315 to 36668\n",
"Skip 77 to 36746\n",
"Skip 50 to 36797\n",
"Skip 48 to 36846\n",
"Skip 2 to 36849\n",
"Skip 2 to 36711\n",
"Skip 110 to 36695\n",
"Skip 3 to 36576\n",
"Skip 24 to 36601\n",
"Skip 184 to 36786\n",
"Skip 252 to 37039\n",
"Skip 46 to 37086\n",
"Skip 224 to 37311\n",
"Skip 99 to 37411\n",
"Skip 132 to 37544\n",
"Skip 68 to 37613\n",
"Skip 264 to 37878\n",
"Skip 213 to 38092\n",
"Skip 83 to 38176\n",
"Skip 67 to 38244\n",
"Skip 40 to 38285\n",
"Skip 75 to 38361\n",
"Skip 323 to 38685\n",
"Skip 755 to 39441\n",
"Skip 121 to 39563\n",
"Skip 200 to 39764\n",
"Skip 3 to 39768\n",
"Skip 122 to 39891\n",
"Skip 23 to 39915\n",
"Skip 139 to 39950\n",
"Skip 39 to 39990\n",
"Skip 231 to 40122\n",
"Skip 173 to 40296\n",
"Skip 132 to 40429\n",
"Skip 252 to 40682\n",
"Skip 208 to 40891\n",
"Skip 23 to 40915\n",
"Skip 128 to 41014\n",
"Skip 59 to 41074\n",
"Skip 67 to 41052\n",
"Skip 34 to 41052\n",
"Skip 19 to 41072\n",
"Skip 168 to 41241\n",
"Skip 50 to 41292\n",
"Skip 275 to 41534\n",
"Skip 133 to 41668\n",
"Skip 9 to 41678\n",
"Skip 52 to 41731\n",
"Skip 59 to 41669\n",
"Skip 242 to 41848\n",
"Skip 33 to 41882\n",
"Skip 8 to 41891\n",
"Skip 4 to 41896\n",
"Skip 124 to 41956\n",
"Skip 158 to 42115\n",
"Skip 47 to 42163\n",
"Skip 277 to 42441\n",
"Skip 164 to 42606\n",
"Skip 294 to 42901\n",
"Skip 137 to 43039\n",
"Skip 394 to 43434\n",
"Skip 38 to 43473\n",
"Skip 318 to 43667\n",
"Skip 58 to 43726\n",
"Skip 28 to 43738\n",
"Skip 78 to 43817\n",
"Skip 109 to 43925\n",
"Skip 507 to 44433\n",
"Skip 94 to 44528\n",
"Skip 93 to 44622\n",
"Skip 52 to 44675\n",
"Skip 17 to 44693\n",
"Skip 230 to 44924\n",
"Skip 191 to 45116\n",
"Skip 224 to 45298\n",
"Skip 9 to 45308\n",
"Skip 249 to 45558\n",
"Skip 202 to 45761\n",
"Skip 2 to 45764\n",
"Skip 16 to 45781\n",
"Skip 570 to 46352\n",
"Skip 26 to 46379\n",
"Skip 196 to 46576\n",
"Skip 6 to 46583\n",
"Skip 72 to 46656\n",
"Skip 186 to 46843\n",
"Skip 16 to 46860\n",
"Skip 211 to 46892\n",
"Skip 127 to 47020\n",
"Skip 213 to 47131\n",
"Skip 205 to 47337\n",
"Skip 9 to 47347\n",
"Skip 104 to 47382\n",
"Skip 2 to 47331\n",
"Skip 9 to 47341\n",
"Skip 9 to 47241\n",
"Skip 4 to 47186\n",
"Skip 251 to 47394\n",
"Skip 161 to 47556\n",
"Skip 251 to 47808\n",
"Skip 45 to 47854\n",
"Skip 62 to 47917\n",
"Skip 32 to 47950\n",
"Skip 312 to 48263\n",
"Skip 10 to 48274\n",
"Skip 231 to 48434\n",
"Skip 676 to 49111\n",
"Skip 532 to 49644\n",
"Skip 444 to 50089\n",
"Skip 350 to 50440\n",
"Skip 142 to 50583\n",
"Skip 44 to 50546\n",
"Skip 93 to 50640\n",
"Skip 45 to 50597\n",
"Skip 0 to 50520\n",
"Skip 159 to 50580\n",
"Skip 148 to 50729\n",
"Skip 1073 to 51791\n",
"Skip 40 to 51832\n",
"Skip 23 to 51856\n",
"Skip 36 to 51893\n",
"Skip 81 to 51975\n",
"Skip 617 to 52593\n",
"Skip 38 to 52632\n",
"Skip 767 to 53384\n",
"Skip 143 to 53528\n",
"Skip 204 to 53733\n",
"Skip 505 to 54239\n",
"Skip 9 to 54249\n",
"Skip 224 to 54474\n",
"Skip 18 to 54493\n",
"Skip 57 to 54551\n",
"Skip 0 to 54437\n",
"Skip 73 to 54431\n",
"Skip 1 to 54433\n",
"Skip 29 to 54463\n",
"Skip 2 to 54295\n",
"Skip 51 to 54179\n",
"Skip 51 to 54231\n",
"Skip 427 to 54582\n",
"Skip 14 to 54597\n",
"Skip 27 to 54625\n",
"Skip 46 to 54660\n",
"Skip 178 to 54839\n",
"Skip 12 to 54852\n",
"Skip 205 to 55058\n",
"Skip 47 to 55106\n",
"Skip 34 to 55141\n",
"Skip 143 to 55285\n",
"Skip 4 to 55290\n",
"Skip 93 to 55384\n",
"Skip 83 to 55410\n",
"Skip 104 to 55515\n",
"Skip 23 to 55448\n",
"Skip 67 to 55516\n",
"Skip 56 to 55438\n",
"Skip 372 to 55696\n",
"Skip 89 to 55786\n",
"Skip 140 to 55927\n",
"Skip 33 to 55961\n",
"Skip 179 to 56141\n",
"Skip 400 to 56531\n",
"Skip 23 to 56555\n",
"Skip 66 to 56536\n",
"Skip 82 to 56619\n",
"Skip 0 to 56479\n",
"Skip 16 to 56456\n",
"Skip 37 to 56494\n",
"Skip 41 to 56490\n",
"Skip 2 to 56471\n",
"Skip 170 to 56642\n",
"Skip 61 to 56687\n",
"Skip 64 to 56752\n",
"Skip 132 to 56885\n",
"Skip 227 to 57113\n",
"Skip 20 to 57134\n",
"Skip 525 to 57660\n",
"Skip 239 to 57900\n",
"Skip 221 to 58122\n",
"Skip 565 to 58688\n",
"Skip 593 to 59282\n",
"Skip 239 to 59522\n",
"==============MESSAGE OF SIZE 54309\n",
"Skip 0 to 113838\n",
"==============MESSAGE OF SIZE 64102\n",
"Skip 0 to 177947\n",
"==============MESSAGE OF SIZE 54440\n",
"Skip 0 to 232394\n",
"==============MESSAGE OF SIZE 48243\n",
"Skip 0 to 280644\n",
"==============MESSAGE OF SIZE 27954\n",
"Skip 0 to 308605\n",
"==============MESSAGE OF SIZE 61612\n",
"Skip 0 to 370224\n",
"==============MESSAGE OF SIZE 47316\n",
"Skip 0 to 417547\n",
"==============MESSAGE OF SIZE 54494\n",
"Skip 0 to 472048\n",
"==============MESSAGE OF SIZE 63450\n",
"Skip 0 to 535505\n",
"==============MESSAGE OF SIZE 76108\n",
"Skip 0 to 611620\n",
"==============MESSAGE OF SIZE 53986\n",
"Skip 0 to 665613\n",
"==============MESSAGE OF SIZE 40890\n",
"Skip 0 to 706510\n",
"==============MESSAGE OF SIZE 43557\n",
"Skip 0 to 750074\n",
"==============MESSAGE OF SIZE 50527\n",
"Skip 0 to 800608\n",
"==============MESSAGE OF SIZE 70798\n",
"Skip 0 to 871413\n",
"==============MESSAGE OF SIZE 44865\n",
"Skip 0 to 916285\n",
"==============MESSAGE OF SIZE 86664\n",
"Skip 0 to 1002956\n",
"==============MESSAGE OF SIZE 60093\n",
"Skip 0 to 1063056\n",
"==============MESSAGE OF SIZE 71373\n",
"Skip 0 to 1134436\n",
"==============MESSAGE OF SIZE 56364\n",
"Skip 0 to 1190807\n",
"==============MESSAGE OF SIZE 48416\n",
"Skip 0 to 1239230\n",
"==============MESSAGE OF SIZE 50397\n",
"Skip 0 to 1289634\n",
"==============MESSAGE OF SIZE 50188\n",
"Skip 0 to 1339829\n",
"==============MESSAGE OF SIZE 68152\n",
"Skip 0 to 1407988\n",
"==============MESSAGE OF SIZE 77064\n",
"Skip 0 to 1485059\n",
"==============MESSAGE OF SIZE 50714\n",
"Skip 0 to 1535780\n",
"==============MESSAGE OF SIZE 47336\n",
"Skip 0 to 1583123\n",
"==============MESSAGE OF SIZE 58330\n",
"Skip 0 to 1641460\n",
"==============MESSAGE OF SIZE 56718\n",
"Skip 0 to 1698185\n",
"==============MESSAGE OF SIZE 68893\n",
"Skip 0 to 1767085\n",
"==============MESSAGE OF SIZE 40888\n",
"Skip 0 to 1807980\n",
"==============MESSAGE OF SIZE 50066\n",
"Skip 0 to 1858053\n",
"==============MESSAGE OF SIZE 68674\n",
"Skip 0 to 1926734\n",
"==============MESSAGE OF SIZE 69180\n",
"Skip 0 to 1995921\n",
"==============MESSAGE OF SIZE 54307\n",
"Skip 0 to 2050235\n",
"==============MESSAGE OF SIZE 66950\n",
"Skip 0 to 2117192\n",
"==============MESSAGE OF SIZE 53857\n",
"Skip 0 to 2171056\n",
"==============MESSAGE OF SIZE 67171\n",
"Skip 0 to 2238234\n",
"==============MESSAGE OF SIZE 46439\n",
"Skip 0 to 2284680\n",
"==============MESSAGE OF SIZE 51657\n",
"Skip 0 to 2336344\n",
"==============MESSAGE OF SIZE 50209\n",
"Skip 0 to 2386560\n",
"==============MESSAGE OF SIZE 37861\n",
"Skip 0 to 2424428\n",
"==============MESSAGE OF SIZE 49929\n",
"Skip 0 to 2474364\n",
"==============MESSAGE OF SIZE 53489\n",
"Skip 0 to 2527860\n",
"==============MESSAGE OF SIZE 79678\n",
"Skip 0 to 2607545\n",
"==============MESSAGE OF SIZE 52086\n",
"Skip 0 to 2659638\n",
"==============MESSAGE OF SIZE 45370\n",
"Skip 0 to 2705015\n",
"==============MESSAGE OF SIZE 62318\n",
"Skip 0 to 2767340\n",
"==============MESSAGE OF SIZE 56604\n",
"Skip 0 to 2823951\n",
"==============MESSAGE OF SIZE 53199\n",
"Skip 0 to 2877157\n",
"==============MESSAGE OF SIZE 49539\n",
"Skip 0 to 2926703\n",
"==============MESSAGE OF SIZE 45957\n",
"Skip 0 to 2972667\n",
"==============MESSAGE OF SIZE 49616\n",
"Skip 0 to 3022290\n",
"==============MESSAGE OF SIZE 57192\n",
"Skip 0 to 3079489\n",
"==============MESSAGE OF SIZE 57763\n",
"Skip 0 to 3137259\n",
"==============MESSAGE OF SIZE 50242\n",
"Skip 0 to 3187508\n",
"==============MESSAGE OF SIZE 40932\n",
"Skip 0 to 3228447\n",
"==============MESSAGE OF SIZE 63640\n",
"Skip 0 to 3292094\n",
"==============MESSAGE OF SIZE 54431\n",
"Skip 0 to 3346532\n",
"==============MESSAGE OF SIZE 40979\n",
"Skip 0 to 3387518\n",
"==============MESSAGE OF SIZE 71354\n",
"Skip 0 to 3458879\n",
"==============MESSAGE OF SIZE 66797\n",
"Skip 0 to 3525683\n",
"==============MESSAGE OF SIZE 37428\n",
"Skip 0 to 3563118\n",
"==============MESSAGE OF SIZE 56354\n",
"Skip 0 to 3619479\n",
"==============MESSAGE OF SIZE 61753\n",
"Skip 0 to 3681239\n",
"==============MESSAGE OF SIZE 43090\n",
"Skip 0 to 3724336\n",
"==============MESSAGE OF SIZE 44765\n",
"Skip 0 to 3769108\n",
"==============MESSAGE OF SIZE 71734\n",
"Skip 0 to 3840849\n",
"==============MESSAGE OF SIZE 43634\n",
"Skip 0 to 3884490\n",
"==============MESSAGE OF SIZE 81110\n",
"Skip 0 to 3965607\n",
"==============MESSAGE OF SIZE 40026\n",
"Skip 0 to 4005640\n",
"==============MESSAGE OF SIZE 98108\n",
"Skip 0 to 4103755\n",
"==============MESSAGE OF SIZE 65669\n",
"Skip 0 to 4169431\n",
"==============MESSAGE OF SIZE 61587\n",
"Skip 0 to 4231025\n",
"==============MESSAGE OF SIZE 52154\n",
"Skip 0 to 4283186\n",
"==============MESSAGE OF SIZE 45517\n",
"Skip 0 to 4328710\n",
"==============MESSAGE OF SIZE 35337\n",
"Skip 0 to 4364054\n",
"==============MESSAGE OF SIZE 49094\n",
"Skip 0 to 4413155\n",
"==============MESSAGE OF SIZE 41352\n",
"Skip 0 to 4454514\n",
"==============MESSAGE OF SIZE 47509\n",
"Skip 0 to 4502030\n",
"==============MESSAGE OF SIZE 54179\n",
"Skip 0 to 4556216\n",
"==============MESSAGE OF SIZE 63397\n",
"Skip 0 to 4619620\n",
"==============MESSAGE OF SIZE 55707\n",
"Skip 0 to 4675334\n",
"==============MESSAGE OF SIZE 59883\n",
"Skip 0 to 4735224\n",
"==============MESSAGE OF SIZE 41220\n",
"Skip 0 to 4776451\n",
"==============MESSAGE OF SIZE 41213\n",
"Skip 0 to 4817671\n",
"==============MESSAGE OF SIZE 41831\n",
"Skip 0 to 4859509\n",
"==============MESSAGE OF SIZE 71327\n",
"Skip 0 to 4930843\n",
"==============MESSAGE OF SIZE 52144\n",
"Skip 0 to 4982994\n",
"==============MESSAGE OF SIZE 41250\n",
"Skip 0 to 5024251\n",
"==============MESSAGE OF SIZE 55569\n",
"Skip 0 to 5079827\n",
"==============MESSAGE OF SIZE 42294\n",
"Skip 0 to 5122128\n",
"==============MESSAGE OF SIZE 72845\n",
"Skip 0 to 5194980\n",
"==============MESSAGE OF SIZE 51996\n",
"Skip 0 to 5246983\n",
"==============MESSAGE OF SIZE 73491\n",
"Skip 0 to 5320481\n",
"==============MESSAGE OF SIZE 59614\n",
"Skip 0 to 5380102\n",
"==============MESSAGE OF SIZE 48015\n",
"Skip 0 to 5428124\n",
"==============MESSAGE OF SIZE 51335\n",
"Skip 0 to 5479466\n",
"==============MESSAGE OF SIZE 50030\n",
"Skip 0 to 5529503\n",
"==============MESSAGE OF SIZE 51900\n",
"Skip 0 to 5581410\n",
"==============MESSAGE OF SIZE 48540\n",
"Skip 0 to 5629957\n",
"==============MESSAGE OF SIZE 60769\n",
"Skip 0 to 5690733\n",
"==============MESSAGE OF SIZE 79078\n"
]
}
],
"source": [
"key = v4.get_key(\"telemetry-webrtc/20160101-snappy-single/output_snappy.log\")\n",
"#key.open_read(headers={'Range': \"bytes=286388020-\"})\n",
"key.open_read(headers={'Range': \"bytes=286388021-\"})\n",
"message = BacktrackableFile(key)\n",
"\n",
"# 52467\n",
"\n",
"for i in range(500):\n",
" try:\n",
" skipped, eof = read_until_next(message, 0x1e)\n",
"\n",
" print \"Skip\", skipped, \"to\", message._position\n",
"\n",
" header_length_raw = message.read(1)\n",
" (header_length,) = struct.unpack('<B', header_length_raw)\n",
"\n",
" header_raw = message.read(header_length)\n",
" header = message_pb2.Header()\n",
" header.ParseFromString(header_raw)\n",
"\n",
" unit_separator = message.read(1)\n",
" if ord(unit_separator[0]) != 0x1f:\n",
" raise ValueError(\"Unit separator shouldn't be here\")\n",
" message_raw = message.read(header.message_length)\n",
"\n",
" print \"==============MESSAGE OF SIZE\", header.message_length\n",
" except Exception, e:\n",
" #print e\n",
" message.backtrack()\n",
" #print \"Backtrack\", message._position"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"So after fixing the off-by-one in `BacktrackableFile` and allowing more iterations in the error-recovery code, we've managed to make reading work!"
]
},
{
"cell_type": "code",
"execution_count": 82,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"#!/usr/bin/env python\n",
"# encoding: utf-8\n",
"\n",
"# This Source Code Form is subject to the terms of the Mozilla Public\n",
"# License, v. 2.0. If a copy of the MPL was not distributed with this\n",
"# file, You can obtain one at http://mozilla.org/MPL/2.0/.\n",
"\n",
"import boto\n",
"import snappy\n",
"import struct\n",
"import gzip\n",
"\n",
"from cStringIO import StringIO\n",
"from google.protobuf.message import DecodeError\n",
"\n",
"\n",
"RECORD_SEPARATOR = 0x1e\n",
"\n",
"\n",
"class BacktrackableFile:\n",
" \"\"\"\n",
" Wrapper for file-like objects that exposes a file-like object interface,\n",
" but also allows backtracking to the first byte in the file-like object\n",
" equal to `RECORD_SEPARATOR` that we haven't backtracked to yet.\n",
"\n",
" This is useful for parsing Heka records, since backtracking will always move us\n",
" back to the start of a possible Heka record.\n",
"\n",
" See http://hekad.readthedocs.org/en/latest/message/ for Heka message details.\n",
" \"\"\"\n",
" def __init__(self, stream):\n",
" self._stream = stream\n",
" self._buffer = StringIO()\n",
" self._position = 0\n",
"\n",
" def tell(self):\n",
" \"\"\"\n",
" Returns the virtual position within this file-like object;\n",
" the effective offset from the beginning of the wrapped file-like object.\n",
" \"\"\"\n",
" return self._position\n",
"\n",
" def read(self, size):\n",
" \"\"\"\n",
" Read and return `size` bytes. Might not actually read the wrapped\n",
" file-like object if there are enough bytes buffered.\n",
" \"\"\"\n",
" self._position += size\n",
" buffer_data = self._buffer.read(size)\n",
" to_read = size - len(buffer_data)\n",
"\n",
" assert to_read >= 0, \"Read data must be equal or smaller to requested data\"\n",
" if to_read == 0:\n",
" return buffer_data\n",
"\n",
" stream_data = self._stream.read(to_read)\n",
" self._buffer.write(stream_data)\n",
"\n",
" return buffer_data + stream_data\n",
"\n",
" def close(self):\n",
" \"\"\"Close the file-like object, as well as its wrapped file-like object.\"\"\"\n",
" self._buffer.close()\n",
" if type(self._stream) == boto.s3.key.Key:\n",
" if self._stream.resp: # Hack! Connections are kept around otherwise!\n",
" self._stream.resp.close()\n",
"\n",
" self._stream.close(True)\n",
" else:\n",
" self._stream.close()\n",
"\n",
" def backtrack(self):\n",
" \"\"\"\n",
" Move the file cursor back to just after the first `RECORD_SEPARATOR` byte\n",
" in the stream that we haven't already backtracked to.\n",
" \"\"\"\n",
" buffer = self._buffer.getvalue()\n",
"\n",
" # start searching after the first byte, since the first byte would often be a record separator,\n",
" # and we don't want to backtrack to the same place every time\n",
" index = buffer.find(chr(RECORD_SEPARATOR), 1)\n",
"\n",
" # update the position to account for moving backward in the stream\n",
" self._position += index + 1 - len(buffer)\n",
"\n",
" # reset the buffer, since we'll never want to backtrack before this point ever again\n",
" # basically we're going to discard everything before this backtracking operation\n",
" self._buffer = StringIO()\n",
" if index >= 0:\n",
" # we add 1 because we want to have the same behaviour as `read_until_next`,\n",
" # which will set the cursor to just after the record separator\n",
" self._buffer.write(buffer[index + 1:])\n",
" self._buffer.seek(0)\n",
"\n",
"\n",
"class UnpackedRecord():\n",
" \"\"\"Represents a single Heka message. See http://hekad.readthedocs.org/en/latest/message/ for details.\"\"\"\n",
" def __init__(self, raw, header, message=None, error=None):\n",
" self.raw = raw\n",
" self.header = header\n",
" self.message = message\n",
" self.error = error\n",
"\n",
"\n",
"# Returns (bytes_skipped=int, eof_reached=bool)\n",
"def read_until_next(fin, separator=RECORD_SEPARATOR):\n",
" \"\"\"\n",
" Read bytes in a file-like object until `separator` is found.\n",
"\n",
" Returns the number of bytes skipped, and whether the search failed to find `separator`. If `separator` is found,\n",
" the number of bytes skipped is one less than the actual number of bytes successfully read. Otherwise, they are the same.\n",
"\n",
" Note that when this completes, the file cursor will be immediately after the `separator` byte,\n",
" so the next byte read will be the one after it.\n",
" \"\"\"\n",
" bytes_skipped = 0\n",
" while True:\n",
" c = fin.read(1)\n",
" if c == '':\n",
" return (bytes_skipped, True)\n",
" elif ord(c) != separator:\n",
" bytes_skipped += 1\n",
" else:\n",
" break\n",
" return (bytes_skipped, False)\n",
"\n",
"\n",
"# Stream Framing:\n",
"# https://hekad.readthedocs.org/en/latest/message/index.html\n",
"def read_one_record(input_stream, raw=False, verbose=False, strict=False, try_snappy=True):\n",
" \"\"\"\n",
" Attempt to read one Heka record from the file-like object `input_stream`, returning an `UnpackedRecord` instance.\n",
"\n",
" Returns the record (or `None` if the stream ended while reading), and the total number of bytes read.\n",
"\n",
" If and only if `raw` is set, messages won't be parsed (the `UnpackedRecord` instance still contains the raw record, however).\n",
"\n",
" If and only if `verbose` is set, useful debugging information will be printed while parsing.\n",
"\n",
" If and only if `strict` is set, the stream is validated more thoroughly.\n",
"\n",
" If and only if `try_snappy` is set, the function will also attempt to decompress the message body with Snappy.\n",
" \"\"\"\n",
" # Read 1 byte record separator (and keep reading until we get one)\n",
" total_bytes = 0\n",
" skipped, eof = read_until_next(input_stream, RECORD_SEPARATOR)\n",
" total_bytes += skipped\n",
" if eof:\n",
" return None, total_bytes\n",
" else:\n",
" # we've read one separator (plus anything we skipped)\n",
" total_bytes += 1\n",
"\n",
" if skipped > 0:\n",
" if strict:\n",
" raise ValueError(\"Unexpected character(s) at the start of record\")\n",
" if verbose:\n",
" print \"Skipped\", skipped, \"bytes to find a valid separator\"\n",
"\n",
" #print \"position\", input_stream.tell()\n",
" raw_record = struct.pack(\"<B\", 0x1e)\n",
"\n",
" # Read the header length\n",
" header_length_raw = input_stream.read(1)\n",
" if header_length_raw == '': # no more data to read\n",
" return None, total_bytes\n",
"\n",
" total_bytes += 1\n",
" raw_record += header_length_raw\n",
"\n",
" # The \"<\" is to force it to read as Little-endian to match the way it's\n",
" # written. This is the \"native\" way in linux too, but might as well make\n",
" # sure we read it back the same way.\n",
" (header_length,) = struct.unpack('<B', header_length_raw)\n",
"\n",
" header_raw = input_stream.read(header_length)\n",
" if header_length > 0 and header_raw == '': # no more data to read\n",
" return None, total_bytes\n",
" total_bytes += header_length\n",
" raw_record += header_raw\n",
"\n",
" header = message_pb2.Header()\n",
" header.ParseFromString(header_raw)\n",
" unit_separator = input_stream.read(1)\n",
" total_bytes += 1\n",
" if ord(unit_separator[0]) != 0x1f:\n",
" raise DecodeError(\"Unexpected unit separator character in record at offset {}: {}\".format(total_bytes, ord(unit_separator[0])))\n",
" raw_record += unit_separator\n",
"\n",
" #print \"message length:\", header.message_length\n",
" message_raw = input_stream.read(header.message_length)\n",
"\n",
" total_bytes += header.message_length\n",
" raw_record += message_raw\n",
"\n",
" message = None\n",
" if not raw:\n",
" message = message_pb2.Message()\n",
" parsed_ok = False\n",
" if try_snappy:\n",
" try:\n",
" message.ParseFromString(snappy.decompress(message_raw))\n",
" parsed_ok = True\n",
" except:\n",
" # Wasn't snappy-compressed\n",
" pass\n",
" if not parsed_ok:\n",
" # Either we didn't want to attempt snappy, or the\n",
" # data was not snappy-encoded (or it was just bad).\n",
" message.ParseFromString(message_raw)\n",
"\n",
" return UnpackedRecord(raw_record, header, message), total_bytes\n",
"\n",
"\n",
"def unpack_file(filename, **kwargs):\n",
" fin = None\n",
" if filename.endswith(\".gz\"):\n",
" fin = gzip.open(filename, \"rb\")\n",
" else:\n",
" fin = open(filename, \"rb\")\n",
" return unpack(fin, **kwargs)\n",
"\n",
"\n",
"def unpack_string(string, **kwargs):\n",
" return unpack(StringIO(string), **kwargs)\n",
"\n",
"\n",
"def unpack(fin, raw=False, verbose=False, strict=False, backtrack=False, try_snappy=True):\n",
" \"\"\"\n",
" Attempt to parse a sequence of records in a file-like object.\n",
"\n",
" Returns an iterator, which yields tuples of `UnpackedRecord` and the total number of bytes read so far.\n",
"\n",
" The flags are the same as those for `read_one_record`.\n",
" \"\"\"\n",
" record_count = 0\n",
" bad_records = 0\n",
" total_bytes = 0\n",
"\n",
" while True:\n",
" r = None\n",
" try:\n",
" r, bytes = read_one_record(fin, raw, verbose, strict, try_snappy)\n",
" except Exception as e:\n",
" if strict:\n",
" fin.close()\n",
" raise e\n",
" elif verbose:\n",
" print e\n",
"\n",
" # if we can backtrack and the message wasn't well formed,\n",
" # backtrack and try to parse the message starting from there\n",
" if backtrack and type(e) in {DecodeError, UnicodeDecodeError}: # these are the exceptions that protobuf will throw\n",
" fin.backtrack()\n",
" continue\n",
"\n",
" if r is None:\n",
" break\n",
"\n",
" if verbose and r.error is not None:\n",
" print r.error\n",
"\n",
" record_count += 1\n",
" total_bytes += bytes\n",
"\n",
" yield r, total_bytes\n",
"\n",
" if verbose:\n",
" print \"Processed\", record_count, \"records\"\n",
"\n",
" fin.close()\n"
]
},
{
"cell_type": "code",
"execution_count": 83,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"(<__main__.UnpackedRecord instance at 0x7faa41d25050>, 52466)\n",
"(<__main__.UnpackedRecord instance at 0x7faa41d25488>, 68034)\n",
"(<telemetry.util.message_pb2.Message object at 0x7faa419fc1b8>, 68192)\n"
]
}
],
"source": [
"key = v4.get_key(\"telemetry-webrtc/20160101-snappy-single/output_snappy.log\")\n",
"key.open_read(headers={'Range': \"bytes=0-\"})\n",
"message = BacktrackableFile(key)\n",
"\n",
"print str(read_one_record(message, raw=False, verbose=False, strict=False, try_snappy=True))\n",
"print str(read_one_record(message, raw=False, verbose=False, strict=False, try_snappy=True))\n",
"\n",
"key = v4.get_key(\"telemetry-webrtc/20160101-snappy-single/output_snappy.log\")\n",
"key.open_read(headers={'Range': \"bytes=1-\"})\n",
"message = BacktrackableFile(key)\n",
"\n",
"while True:\n",
" r = None\n",
" try:\n",
" r, bytes = read_one_record(message, raw=False, verbose=False, strict=False, try_snappy=True)\n",
" except Exception as e:\n",
" if type(e) in {DecodeError, UnicodeDecodeError}:\n",
" message.backtrack()\n",
" continue\n",
" import traceback\n",
" print traceback.format_exc()\n",
"\n",
" if r is not None and r.error is not None:\n",
" print r.error\n",
"\n",
" print str((r.message, bytes))\n",
" break"
]
},
{
"cell_type": "code",
"execution_count": 84,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1838\n"
]
}
],
"source": [
"key = v4.get_key(\"telemetry-webrtc/20160101-snappy-single/output_snappy.log\")\n",
"key.open_read(headers={'Range': \"bytes=0-\"})\n",
"message = BacktrackableFile(key)\n",
"\n",
"c = 0\n",
"for record, total_bytes in unpack(message, backtrack=True, verbose=False, strict=False):\n",
" c += 1\n",
" if total_bytes >= 100*(2**20):\n",
" break\n",
"\n",
"print c\n",
"#key.close()"
]
},
{
"cell_type": "code",
"execution_count": 85,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1837\n"
]
}
],
"source": [
"key = v4.get_key(\"telemetry-webrtc/20160101-snappy-single/output_snappy.log\")\n",
"key.open_read(headers={'Range': \"bytes=1-\"})\n",
"message = BacktrackableFile(key)\n",
"\n",
"c = 0\n",
"for record, total_bytes in unpack(message, backtrack=True, verbose=False, strict=False):\n",
" #yield _parse_heka_record(record)\n",
" c += 1\n",
" if total_bytes >= 100*(2**20):\n",
" break\n",
"\n",
"print c\n",
"#key.close()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Perfect - the first, malformed record was skipped, and all the other records parsed fine. This appears to fix the issue. To be sure, we should still run the full test suite."
]
},
{
"cell_type": "code",
"execution_count": 90,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"34339\n"
]
}
],
"source": [
"def get_records(sc, source_name, **kwargs):\n",
" \"\"\" Returns a RDD of records for a given data source and filtering criteria.\n",
" All data sources support\n",
" :param fraction: the fraction of files read from the data source, set to 1.0\n",
" by default.\n",
" Depending on the data source, different filtering criteria will be available.\n",
" Filtering criteria should be specified using the TelemetrySchema approach:\n",
" Range filter: submissionDate={\"min\": \"20150901\", \"max\": \"20150908\"}\n",
" or submissionDate=(\"20150901\", \"20150908\")\n",
" List filter: appUpdateChannel=[\"nightly\", \"aurora\", \"beta\"]\n",
" Value filter: docType=\"main\"\n",
" \"\"\"\n",
" schema = _get_source_schema(source_name)\n",
" if schema is None:\n",
" raise ValueError(\"Error getting schema for {}\".format(source_name))\n",
"\n",
" bucket_name = _sources[source_name][\"bucket\"]\n",
" bucket_prefix = _sources[source_name][\"prefix\"]\n",
" if bucket_prefix[-1] != \"/\":\n",
" bucket_prefix = bucket_prefix + \"/\"\n",
"\n",
" fraction = kwargs.pop(\"fraction\", 1.0)\n",
" if fraction < 0 or fraction > 1:\n",
" raise ValueError(\"Invalid fraction argument\")\n",
"\n",
" filter_args = {}\n",
" field_names = [f[\"field_name\"] for f in schema[\"dimensions\"]]\n",
" for field_name in field_names:\n",
" field_filter = kwargs.pop(field_name, None)\n",
" if field_filter is None:\n",
" continue\n",
" # Special case for compatibility with get_pings:\n",
" # If we get a filter parameter that is a tuple of length 2, treat it\n",
" # as a min/max filter instead of a list of allowed values.\n",
" if isinstance(field_filter, tuple) and len(field_filter) == 2:\n",
" field_filter = {\"min\": field_filter[0], \"max\": field_filter[1]}\n",
" filter_args[field_name] = field_filter\n",
"\n",
" if kwargs:\n",
" raise TypeError(\"Unexpected **kwargs {}\".format(repr(kwargs)))\n",
"\n",
" # We should eventually support data sources in other buckets, but for now,\n",
" # assume that everything lives in the v4 data bucket.\n",
" assert(bucket_name == _bucket_v4.name)\n",
" # TODO: cache the buckets, or at least recognize the ones we already have\n",
" # handles to (_bucket_* vars)\n",
" bucket = _bucket_v4 # TODO: bucket = _conn.get_bucket(bucket_name, validate=False)\n",
" filter_schema = _filter_to_schema(schema, filter_args)\n",
" files = _list_s3_filenames(bucket, bucket_prefix, filter_schema)\n",
"\n",
" if files and fraction != 1.0:\n",
" sample = random.choice(files, size=len(files)*fraction, replace=False)\n",
" else:\n",
" sample = files\n",
"\n",
" # TODO: Make sure that \"bucket_name\" matches the v4 bucket name, otherwise\n",
" # introduce a \"bucket\" parameter to _read_v4_ranges\n",
" parallelism = max(len(sample), sc.defaultParallelism)\n",
" ranges = sc.parallelize(sample, parallelism).flatMap(_read_v4_ranges).collect()\n",
"\n",
" if len(ranges) == 0:\n",
" return sc.parallelize([])\n",
" else:\n",
" return sc.parallelize(ranges, len(ranges)).flatMap(_read_v4_range)\n",
"\n",
"from moztelemetry.spark import _get_source_schema, _sources, _bucket_v4, _filter_to_schema, _list_s3_filenames, _read_v4_ranges\n",
"\n",
"def _read_v4_range(filename_chunk):\n",
" try:\n",
" filename, chunk = filename_chunk\n",
" start = 100*(2**20)*chunk\n",
" key = v4.get_key(filename)\n",
" key.open_read(headers={'Range': \"bytes={}-\".format(start)})\n",
" return parse_heka_message(key, boundary_bytes=100*(2**20))\n",
" except ssl.SSLError:\n",
" return []\n",
"\n",
"print get_records(sc, \"telemetry-webrtc\", submissionDate=\"20160101-snappy-single\").count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Looks like it works!"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.11"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment