"""Sample pipeline for data cleaning: reads phone-sensor heartbeat rows from BigQuery,
cleans each field, and writes the result to a new BigQuery table."""
import apache_beam as beam
import logging
import json
from apache_beam.io import ReadFromText
from apache_beam.io import BigQuerySource
from apache_beam.io import BigQuerySink
from apache_beam.io import WriteToText
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
options = PipelineOptions()
logging.getLogger().setLevel(logging.INFO)
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = "pascalwhoop"
google_cloud_options.job_name = "phone-sensors-cleanup"
google_cloud_options.staging_location = "gs://pascalwhoop-private/staging"
google_cloud_options.temp_location = "gs://pascalwhoop-private/temp"
#options.view_as(StandardOptions).runner = "DirectRunner" # use this for debugging
options.view_as(StandardOptions).runner = "DataflowRunner"
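# Note (added): depending on your Beam/Dataflow version you may also need to set
# google_cloud_options.region. With the runner configured in code like this, the job
# is launched by simply running this script with python.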
# see here for bigquery docs https://beam.apache.org/documentation/io/built-in/google-bigquery/
source_table_spec = bigquery.TableReference(
    projectId="pascalwhoop", datasetId="phone_sensors", tableId="heartbeat"
)
sink_table_spec = bigquery.TableReference(
    projectId="pascalwhoop", datasetId="phone_sensors", tableId="heartbeat_cleaned"
)
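# The BigQuery connectors also accept plain string table specs such as
# "pascalwhoop:phone_sensors.heartbeat"; the TableReference form above is just more explicit.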
def make_sink_schema():
    mapping = {
        "altitude": "FLOAT",
        "battery_status": "INTEGER",
        "bluetooth_status": "STRING",
        "cell_id": "STRING",
        "cell_strength": "INTEGER",
        "gps_status": "STRING",
        "last_app": "STRING",
        "location_accuracy": "FLOAT",
        "location_gps": "STRING",
        "location_net": "STRING",
        "location_seconds": "STRING",
        "speed": "FLOAT",
        "timestamp": "INTEGER",
    }
    mapping_list = [{"mode": "NULLABLE", "name": k, "type": mapping[k]} for k in mapping.keys()]
    return json.JSONEncoder(sort_keys=True).encode({"fields": mapping_list})
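# make_sink_schema() emits BigQuery's JSON schema representation, roughly:
# {"fields": [{"mode": "NULLABLE", "name": "altitude", "type": "FLOAT"}, ...]}
# which parse_table_schema_from_json() below turns into a TableSchema object.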
table_schema = parse_table_schema_from_json(make_sink_schema())
#source = BigQuerySource(query="SELECT * FROM `pascalwhoop.phone_sensors.heartbeat` LIMIT 10", use_standard_sql=True) # you can also use SQL queries
source = BigQuerySource(source_table_spec)
target = BigQuerySink(sink_table_spec, schema=table_schema)
#target = beam.io.WriteToText("output.txt")
def run():
    with beam.Pipeline(options=options) as p:
        raw_values = (
            p
            | "ReadTable" >> beam.io.Read(source)
            | "cleanup" >> beam.ParDo(ElementCleanup())
            | "writeTable" >> beam.io.Write(target)
        )
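    # Note (added): the `with` block submits the pipeline when it exits and, in
    # current Beam versions, waits for the run to finish; `raw_values` merely holds
    # the assembled transform chain.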
# Cleanup step: a ParDo processes each element of the PCollection; for every field we
# define a target datatype and a chain of cleanup functions.
class ElementCleanup(beam.DoFn):
    """
    Tasker uses the %VAR_NAME syntax to construct JSON. Sometimes a variable isn't
    replaced; in that case the string still starts with "%", and we simply replace
    the value with None.
    """

    def __init__(self):
        self.transforms = self.make_transform_map()
    def make_transform_map(self):
        return {
            "battery_status": [self.trim, self.percent_cleaner, self.to_int],
            "bluetooth_status": [self.trim, self.percent_cleaner],
            "cell_id": [self.trim, self.percent_cleaner],
            "cell_strength": [self.trim, self.percent_cleaner, self.to_int],
            "gps_status": [self.trim, self.percent_cleaner],
            "last_app": [self.trim, self.percent_cleaner],
            "location_gps": [self.trim, self.percent_cleaner],  # keep "LAT,LON" encoding; Data Studio likes this
            "location_net": [self.trim, self.percent_cleaner],  # keep "LAT,LON" encoding; Data Studio likes this
            "location_accuracy": [self.trim, self.percent_cleaner, self.to_float],
            "altitude": [self.trim, self.percent_cleaner, self.to_float],
            "speed": [self.trim, self.percent_cleaner, self.to_float],
            "location_seconds": [self.trim, self.percent_cleaner],
            "timestamp": [self.trim, self.percent_cleaner, self.to_int],
        }
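    # Example of one field flowing through its chain (illustrative values):
    #   battery_status " %BATT" -> trim -> "%BATT" -> percent_cleaner -> None -> to_int -> None
    #   battery_status " 87 "   -> trim -> "87"    -> percent_cleaner -> "87" -> to_int -> 87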
    def process(self, row):
        # process receives the element and (must) return an iterable
        # (in case of breaking elements up into several)
        return [self.handle_row(row, self.transforms)]

    def handle_row(self, row, transforms):
        fixed = {}
        for key in row.keys():
            val = row[key]
            for func in transforms[key]:
                val = func(val)
            fixed[key] = val
        return fixed
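    # Note (added): handle_row assumes every column of an incoming row has an entry in
    # the transform map; an unexpected column would raise a KeyError. Swapping
    # transforms[key] for transforms.get(key, []) would pass such fields through untouched.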
    def percent_cleaner(self, value: str):
        if isinstance(value, str) and value.startswith("%"):
            return None
        else:
            return value

    def trim(self, val: str):
        # guard non-string values (e.g. NULLs coming back from BigQuery) instead of crashing
        return val.strip() if isinstance(val, str) else val

    def to_int(self, val: str):
        return int(val) if val is not None else None

    def to_float(self, val: str):
        return float(val) if val is not None else None
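
# A minimal, hypothetical local check (not part of the original gist): feed one
# in-memory row through the same DoFn on the DirectRunner to verify the per-field
# transforms without touching BigQuery. The sample values are made up.
def _local_smoke_test():
    with beam.Pipeline() as p:  # no options passed -> DirectRunner by default
        (
            p
            | beam.Create([{"battery_status": " 87 ", "last_app": "%LAPP", "speed": "1.5"}])
            | beam.ParDo(ElementCleanup())
            | beam.Map(print)  # expected: {'battery_status': 87, 'last_app': None, 'speed': 1.5}
        )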

if __name__ == "__main__":
    run()