Theodoros Ntakouris (ntakouris)

import tensorflow as tf
# beam imports
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam
# orchestration
from tfx.orchestration import pipeline, metadata
import tensorflow_data_validation as tfdv
schema = tfdv.load_schema_text(schema_path)
print(schema)  # text output, like doing `cat` on the schema file
# Fields of the loaded Schema proto can be edited in place,
# e.g. via tfdv.get_feature (see the sketch after the proto below).
feature {
  name: "payment_type"
  value_count {
    min: 1
    max: 1
  }
  type: BYTES
  domain: "payment_type"
  presence {
    min_fraction: 1.0
  }
}
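A minimal round-trip sketch for editing a schema like the one above, assuming tfdv's schema utilities; the 'schema.pbtxt' path and the 0.9 fraction are placeholder values:

schema = tfdv.load_schema_text('schema.pbtxt')
# Relax the presence constraint of the feature shown above.
tfdv.get_feature(schema, 'payment_type').presence.min_fraction = 0.9
tfdv.write_schema_text(schema, 'schema.pbtxt')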
DATAFLOW_BEAM_PIPELINE_ARGS = [
    '--project=' + '<your project id>',
    '--runner=DataflowRunner',
    '--temp_location=' + 'gs://<bucket name>/tmp',
    '--staging_location=' + 'gs://<bucket name>/staging',
    '--region=' + 'us-central1',  # the region where our buckets live
    # '--disk_size_gb=50',  # no fine-tuning needed
    # If you are blocked by the IP address quota, a bigger machine_type
    # reduces the number of IPs needed.
    # '--machine_type=n1-standard-8',
]
from tfx.components import CsvExampleGen, StatisticsGen, SchemaGen
from tfx.proto import example_gen_pb2
from tfx.utils.dsl_utils import external_input

def create_pipeline():
    # Treat the entire input as a single train split (no eval split).
    no_eval_config = example_gen_pb2.Input(splits=[
        example_gen_pb2.Input.Split(name='train', pattern='taxi_dataset.csv'),
    ])
    example_gen = CsvExampleGen(input=external_input(
        'gs://<bucket name>/'), input_config=no_eval_config)
    statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
    schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'])
    return pipeline.Pipeline(
        pipeline_name='<pipeline name>',
        pipeline_root='gs://<bucket name>/pipeline_root',
        components=[example_gen, statistics_gen, schema_gen],
        beam_pipeline_args=DATAFLOW_BEAM_PIPELINE_ARGS)
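A minimal sketch of actually executing the pipeline with TFX's Beam-based runner; the DATAFLOW_BEAM_PIPELINE_ARGS above are forwarded through beam_pipeline_args, so the Beam steps run on Dataflow:

from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner

BeamDagRunner().run(create_pipeline())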
import tensorflow_model_analysis as tfma

metrics_spec = tfma.MetricsSpec(
    # Any metrics added into the saved model (for example via
    # model.compile(..., metrics=[...]), etc.) will be computed
    # automatically.
    metrics=[
        tfma.MetricConfig(class_name='ExampleCount')
    ],
    # To add validation thresholds for metrics saved with the model,
    # add them keyed by metric name to the thresholds map.
    thresholds={})
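A hedged sketch of where this spec slots in: a full tfma.EvalConfig; the '<label key>' placeholder and the single overall slice are assumptions here:

eval_config = tfma.EvalConfig(
    model_specs=[tfma.ModelSpec(label_key='<label key>')],
    metrics_specs=[metrics_spec],
    slicing_specs=[tfma.SlicingSpec()])  # empty SlicingSpec = overall slice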
raw_dataset = (raw_data, RAW_DATA_METADATA)
transformed_dataset, transform_fn = (
    raw_dataset | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
transformed_data, transformed_metadata = transformed_dataset
transformed_data_coder = tft.coders.ExampleProtoCoder(
    transformed_metadata.schema)
_ = (
    transformed_data
    | 'EncodeTrainData' >> beam.Map(transformed_data_coder.encode)
    | 'WriteTrainData' >> beam.io.WriteToTFRecord('<output path>'))
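Note that AnalyzeAndTransformDataset needs a temporary location for the analyzers' intermediate results; a minimal sketch wrapping the transform in a tft_beam.Context:

import tempfile

with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
    transformed_dataset, transform_fn = (
        raw_dataset | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))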
def preprocessing_fn(inputs):
    """Preprocess input columns into transformed columns."""
    # Since we are modifying some features and leaving others unchanged, we
    # start by setting `outputs` to a copy of `inputs`.
    outputs = inputs.copy()
    # Scale numeric columns to have range [0, 1].
    for key in NUMERIC_FEATURE_KEYS:
        outputs[key] = tft.scale_to_0_1(outputs[key])
    return outputs
from tensorflow_transform.tf_metadata import dataset_metadata, dataset_schema

raw_data = [
    {'x': 1, 'y': 1, 's': 'hello'},
    {'x': 2, 'y': 2, 's': 'world'},
    {'x': 3, 'y': 3, 's': 'hello'}
]
raw_data_metadata = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec({
        'y': tf.io.FixedLenFeature([], tf.float32),
        'x': tf.io.FixedLenFeature([], tf.float32),
        's': tf.io.FixedLenFeature([], tf.string),
    }))
def preprocessing_fn(inputs):
    """Preprocess input columns into transformed columns."""
    x = inputs['x']
    y = inputs['y']
    s = inputs['s']
    x_centered = x - tft.mean(x)
    y_normalized = tft.scale_to_0_1(y)
    s_integerized = tft.compute_and_apply_vocabulary(s)
    x_centered_times_y_normalized = (x_centered * y_normalized)
    return {
        'x_centered': x_centered,
        'y_normalized': y_normalized,
        's_integerized': s_integerized,
        'x_centered_times_y_normalized': x_centered_times_y_normalized,
    }
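Tying the snippets together, a local end-to-end sketch using the raw_data and raw_data_metadata defined above; the expected values in the comments follow from tft.mean(x) == 2, the [0, 1] scaling of y, and the frequency-ordered vocabulary over s:

import tempfile

with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
    (transformed_data, transformed_metadata), transform_fn = (
        (raw_data, raw_data_metadata)
        | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))

print(transformed_data)
# x_centered: [-1.0, 0.0, 1.0]; y_normalized: [0.0, 0.5, 1.0]
# s_integerized: 'hello' -> 0, 'world' -> 1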