Skip to content

Instantly share code, notes, and snippets.

@SamPenrose
Created July 17, 2015 22:25
Show Gist options
  • Save SamPenrose/cb5b8a2161e7c5ef6bdb to your computer and use it in GitHub Desktop.
Save SamPenrose/cb5b8a2161e7c5ef6bdb to your computer and use it in GitHub Desktop.
Extracting v4 pings from a gzipped file on Spark
Display the source blob
Display the rendered blob
Raw
{"nbformat_minor": 0, "cells": [{"execution_count": 1, "cell_type": "code", "source": "import boto\nconn = boto.connect_s3()", "outputs": [], "metadata": {"collapsed": true, "trusted": true}}, {"execution_count": 2, "cell_type": "code", "source": "bucket = conn.get_bucket('net-mozaws-prod-us-west-2-pipeline-analysis')", "outputs": [], "metadata": {"collapsed": true, "trusted": true}}, {"execution_count": 3, "cell_type": "code", "source": "keyhead = 'mreid/bug1171265/merged_by_day2/'", "outputs": [], "metadata": {"collapsed": true, "trusted": true}}, {"execution_count": 4, "cell_type": "code", "source": "basenames = ['20150610.txt.gz'] # etc.", "outputs": [], "metadata": {"collapsed": true, "trusted": true}}, {"execution_count": 7, "cell_type": "code", "source": "handles = map(lambda bn: bucket.get_key(keyhead+bn), basenames)", "outputs": [], "metadata": {"collapsed": false, "trusted": true}}, {"execution_count": 8, "cell_type": "code", "source": "keys = handles", "outputs": [], "metadata": {"collapsed": true, "trusted": true}}, {"execution_count": 9, "cell_type": "code", "source": "import ujson as json\nimport zlib\ndef extract_v4_pings(key):\n    \"\"\"Yield the parsed JSON payload of every line in a gzipped key.\n\n    Each decompressed line is split on tabs and its second field is\n    parsed as JSON.\n    \"\"\"\n    def stream_gzip_decompress(stream):\n        # 32 + zlib.MAX_WBITS lets zlib auto-detect a gzip or zlib header.\n        dec = zlib.decompressobj(32 + zlib.MAX_WBITS)\n        for chunk in stream:\n            rv = dec.decompress(chunk)\n            if rv:\n                yield rv\n        # Emit anything the decompressor is still holding at end of stream.\n        rv = dec.flush()\n        if rv:\n            yield rv\n    def extract_lines():\n        buf = ''\n        # pre-compression, the file was line-oriented,\n        # but chunk does not correspond to a line.\n        for chunk in stream_gzip_decompress(key):\n            parts = chunk.split('\\n')\n            start = parts.pop(0)\n            buf += start\n            # If parts is now empty, buf is not a line yet -> next chunk.\n            # If parts is not empty, buf is a line from the source file.\n            while parts:\n                yield buf\n                # parts is now one of:\n                # [''], meaning it ended with '\\n' (unlikely)\n                # ['lineN', 'lineN+1', ... ''] (unlikely), or\n                # ['start of lineN'] (probably)\n                buf = parts.pop(0)\n        # A last line with no trailing '\\n' is still in buf; do not drop it.\n        if buf:\n            yield buf\n    for line in extract_lines():\n        yield json.loads(line.split('\\t')[1])", "outputs": [], "metadata": {"collapsed": true, "trusted": true}}, {"execution_count": 11, "cell_type": "code", "source": "results = []\nfor key in keys:\n    for ping in extract_v4_pings(key):\n        results.append(ping)", "outputs": [], "metadata": {"collapsed": true, "trusted": true}}, {"execution_count": 12, "cell_type": "code", "source": "len(results), results and results[0].keys()", "outputs": [{"execution_count": 12, "output_type": "execute_result", "data": {"text/plain": "(181609,\n [u'clientId',\n u'payload',\n u'environment',\n u'application',\n u'version',\n u'creationDate',\n u'type',\n u'id',\n u'appUpdateChannel'])"}, "metadata": {}}], "metadata": {"collapsed": false, "trusted": true}}], "nbformat": 4, "metadata": {"kernelspec": {"display_name": "Python 2", "name": "python2", "language": "python"}, "language_info": {"mimetype": "text/x-python", "nbconvert_exporter": "python", "version": "2.7.9", "name": "python", "file_extension": ".py", "pygments_lexer": "ipython2", "codemirror_mode": {"version": 2, "name": "ipython"}}}}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment