Created
July 17, 2015 22:25
-
-
Save SamPenrose/cb5b8a2161e7c5ef6bdb to your computer and use it in GitHub Desktop.
Extracting v4 pings from a gzipped file on Spark
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
 "nbformat_minor": 0,
 "cells": [
  {
   "execution_count": 1,
   "cell_type": "code",
   "source": [
    "import boto\n",
    "conn = boto.connect_s3()"
   ],
   "outputs": [],
   "metadata": {
    "collapsed": true,
    "trusted": true
   }
  },
  {
   "execution_count": 2,
   "cell_type": "code",
   "source": [
    "bucket = conn.get_bucket('net-mozaws-prod-us-west-2-pipeline-analysis')"
   ],
   "outputs": [],
   "metadata": {
    "collapsed": true,
    "trusted": true
   }
  },
  {
   "execution_count": 3,
   "cell_type": "code",
   "source": [
    "keyhead = 'mreid/bug1171265/merged_by_day2/'"
   ],
   "outputs": [],
   "metadata": {
    "collapsed": true,
    "trusted": true
   }
  },
  {
   "execution_count": 4,
   "cell_type": "code",
   "source": [
    "basenames = ['20150610.txt.gz'] # etc."
   ],
   "outputs": [],
   "metadata": {
    "collapsed": true,
    "trusted": true
   }
  },
  {
   "execution_count": 7,
   "cell_type": "code",
   "source": [
    "handles = map(lambda bn: bucket.get_key(keyhead+bn), basenames)"
   ],
   "outputs": [],
   "metadata": {
    "collapsed": false,
    "trusted": true
   }
  },
  {
   "execution_count": 8,
   "cell_type": "code",
   "source": [
    "keys = handles"
   ],
   "outputs": [],
   "metadata": {
    "collapsed": true,
    "trusted": true
   }
  },
  {
   "execution_count": 9,
   "cell_type": "code",
   "source": [
    "import ujson as json\n",
    "import zlib\n",
    "\n",
    "\n",
    "def extract_v4_pings(key):\n",
    "    \"\"\"Yield the decoded JSON ping from each line of a gzipped S3 key.\n",
    "\n",
    "    `key` is iterated to obtain chunks of gzip-compressed bytes; each\n",
    "    decompressed line is split on tabs and its second field is parsed\n",
    "    as JSON.\n",
    "    \"\"\"\n",
    "    def stream_gzip_decompress(stream):\n",
    "        \"\"\"Yield decompressed pieces of a gzip byte stream.\"\"\"\n",
    "        dec = zlib.decompressobj(32 + zlib.MAX_WBITS)  # offset 32 to skip the header\n",
    "        for chunk in stream:\n",
    "            rv = dec.decompress(chunk)\n",
    "            if rv:\n",
    "                yield rv\n",
    "        # Drain whatever the decompressor is still buffering.\n",
    "        # NOTE(review): only the first gzip member is decoded; bytes of any\n",
    "        # further concatenated member are left in dec.unused_data.\n",
    "        rv = dec.flush()\n",
    "        if rv:\n",
    "            yield rv\n",
    "\n",
    "    def extract_lines():\n",
    "        \"\"\"Re-assemble complete lines from arbitrarily split chunks.\"\"\"\n",
    "        buf = ''\n",
    "        # Pre-compression the file was line-oriented, but chunk\n",
    "        # boundaries do not correspond to line boundaries.\n",
    "        for chunk in stream_gzip_decompress(key):\n",
    "            parts = chunk.split('\\n')\n",
    "            buf += parts[0]\n",
    "            # Each further part starts a new line, so buf is now complete;\n",
    "            # the last part is the (possibly empty) start of the next line.\n",
    "            for part in parts[1:]:\n",
    "                yield buf\n",
    "                buf = part\n",
    "        # The file need not end with '\\n'; don't drop its final line.\n",
    "        if buf:\n",
    "            yield buf\n",
    "\n",
    "    for line in extract_lines():\n",
    "        if not line:\n",
    "            continue  # blank line: nothing to parse\n",
    "        yield json.loads(line.split('\\t')[1])"
   ],
   "outputs": [],
   "metadata": {
    "collapsed": true,
    "trusted": true
   }
  },
  {
   "execution_count": 11,
   "cell_type": "code",
   "source": [
    "results = []\n",
    "for key in keys:\n",
    "    for ping in extract_v4_pings(key):\n",
    "        results.append(ping)"
   ],
   "outputs": [],
   "metadata": {
    "collapsed": true,
    "trusted": true
   }
  },
  {
   "execution_count": 12,
   "cell_type": "code",
   "source": [
    "len(results), results and results[0].keys()"
   ],
   "outputs": [
    {
     "execution_count": 12,
     "output_type": "execute_result",
     "data": {
      "text/plain": "(181609,\n [u'clientId',\n u'payload',\n u'environment',\n u'application',\n u'version',\n u'creationDate',\n u'type',\n u'id',\n u'appUpdateChannel'])"
     },
     "metadata": {}
    }
   ],
   "metadata": {
    "collapsed": false,
    "trusted": true
   }
  }
 ],
 "nbformat": 4,
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "name": "python2",
   "language": "python"
  },
  "language_info": {
   "mimetype": "text/x-python",
   "nbconvert_exporter": "python",
   "version": "2.7.9",
   "name": "python",
   "file_extension": ".py",
   "pygments_lexer": "ipython2",
   "codemirror_mode": {
    "version": 2,
    "name": "ipython"
   }
  }
 }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment