Created
October 6, 2016 16:50
-
-
Save pabloem/1755254018999c64900d99e29259c8aa to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## -*- coding: utf-8 -*- | |
import json | |
import sys | |
import time | |
import apache_beam as beam | |
from apache_beam.io.fileio import CompressionTypes as ct | |
from apache_beam.utils.options import PipelineOptions | |
from beam_utils.sources import CsvFileSource | |
class InegiPipelineOptions(PipelineOptions):
    """Pipeline options that add this script's own command-line arguments.

    Two arguments are registered:

    * ``--input``  -> stored as ``inputFile``  (default ``data/2015/*csv``)
    * ``--output`` -> stored as ``outputFile`` (default ``outputFile_inegi``)
    """

    @classmethod
    def _add_argparse_args(cls, parser):
        """Register the ``--input`` and ``--output`` arguments on *parser*."""
        # (flag, keyword arguments) pairs, registered in this order.
        own_arguments = (
            ('--input', dict(dest='inputFile',
                             default='data/2015/*csv',
                             help="Input file to process.")),
            ('--output', dict(dest='outputFile',
                              default='outputFile_inegi',
                              help='Prefix for the output text files')),
        )
        for flag, settings in own_arguments:
            parser.add_argument(flag, **settings)
        # NOTE: no '--runner' argument is registered here; a variant with
        # default 'DirectPipelineRunner' was left disabled in this class.
# Parse the command line and create the pipeline that every stage hangs off.
opts = InegiPipelineOptions(flags=sys.argv)
p = beam.Pipeline(options=opts)

# Read the input files as uncompressed CSV.
csv_source = CsvFileSource(opts.inputFile, compression_type=ct.UNCOMPRESSED)

pairSA = (
    p
    | 'read_files' >> beam.io.Read(csv_source)
    # Drop rows that lack either of the two columns used below.
    | 'filter_rows' >> beam.Filter(
        lambda row: all(column in row for column in (
            'Entidad federativa',
            'Nombre de clase de la actividad')))
    # Keep only those two columns, stripped, as a (state, activity) pair.
    | 'filter_columns' >> beam.Map(
        lambda row: (row['Entidad federativa'].strip(),
                     row['Nombre de clase de la actividad'].strip()))
    # Count how many times each (state, activity) pair occurs.
    | 'count_pairs' >> beam.combiners.Count.PerElement()
)
# Re-key every ((state, business), count) element by business, producing
# (business, (state, count)) so that the business is the KEY of the pair.
keyed_by_business = (
    pairSA
    | 'business as key' >> beam.Map(
        lambda counted: (counted[0][1], (counted[0][0], counted[1])))
)

# Group by KEY: (business, [(state, count), (state, count), ...]).
grouped_by_business = keyed_by_business | 'group by key' >> beam.GroupByKey()

# Within each group, order the entries by count (ascending) and flatten them
# back out as (business, state, count) records, ready to be written.
sortByBiz = (
    grouped_by_business
    | 'sort then join' >> beam.FlatMap(
        lambda group: [
            (group[0], entry[0], entry[1])
            for entry in sorted(group[1], key=lambda entry: entry[1])
        ])
)
# Serialize every record as a JSON string ...
json_records = sortByBiz | "jsonize" >> beam.Map(
    lambda record: json.dumps(record))

# ... and write the strings to text files using the --output prefix.
json_records | "write out" >> beam.Write(
    beam.io.TextFileSink(opts.outputFile))
# Run the pipeline and report the wall-clock time spent in p.run().
st_time = time.time()
p.run()
elapsed_seconds = time.time() - st_time
print("---- Runtime: {} seconds ----".format(elapsed_seconds))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Set up a working directory, download the denue-2015 archive, create a
# virtualenv, install the Beam Python SDK from source, and install the
# remaining packages.
#
# Steps that need a different directory run inside a subshell and are chained
# with `&&`: a failed `cd` stops that step instead of letting the following
# command run in the wrong directory, and the outer shell's working directory
# never drifts.

# Working directory; -p keeps the step re-runnable.
mkdir -p bigdata-inegi && cd bigdata-inegi

# Download and unpack the data under ./data.
# NOTE(review): the URL is plain http and --no-check-certificate disables TLS
# verification if the server redirects to https; consider an https URL
# without that flag.
mkdir -p data
(
  cd data &&
  wget --no-check-certificate http://storage.googleapis.com/noogler-projects.appspot.com/denue-2015.tar.gz &&
  tar -xvzf denue-2015.tar.gz
)

# Isolated Python environment (must be sourced in the current shell).
virtualenv venv
source venv/bin/activate

# Install the Beam Python SDK from the python-sdk branch.
git clone https://github.com/apache/incubator-beam.git
(
  cd incubator-beam &&
  git checkout python-sdk &&
  cd sdks/python &&
  python setup.py install --root /
)

# Copy the analysis script in and install the remaining packages.
cp ../analisis.py .
pip install google-cloud-dataflow
pip install beam_utils
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment