def read_and_process_dir(directory):
    """Read a directory of CSV files and yield processed Arrow batches."""
    for f in os.listdir(directory):
        if f.endswith(".csv"):
            filename = os.path.join(directory, f)
            for batch in read_and_process(filename):
                yield batch
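The generator above plugs into tensorflow-io the same way the single-file iterator does in make_local_dataset further down. A minimal sketch of that wiring, assuming the same label/x0/x1 column types used throughout this post and a made-up directory path:

import tensorflow as tf
import tensorflow_io.arrow as arrow_io

# Stream record batches from every CSV in a directory (path is illustrative)
batch_iter = read_and_process_dir('/path/to/csv_dir')
ds = arrow_io.ArrowStreamDataset.from_record_batches(
    batch_iter,
    output_types=(tf.int64, tf.float64, tf.float64))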
ds = make_local_dataset(filename)
model = model_fit(ds)
print("Fit model with weights: {}".format(model.get_weights()))
# Fit model with weights:
# [array([[0.7793554 ], [0.61216295]], dtype=float32),
#  array([0.03328196], dtype=float32)]
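Once the weights are learned, the model can be used like any other Keras model. The check below is purely illustrative; the input points are made up and would need the same scaling as the training features:

import numpy as np

# Score two hand-picked (x0, x1) points with the fitted sigmoid model
preds = model.predict(np.array([[0.0, 0.0], [5.0, 5.0]], dtype=np.float32))
print(preds)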
def make_local_dataset(filename):
    """Make a TensorFlow Arrow Dataset that reads from a local CSV file."""
    # Read the local file and get a record batch iterator
    batch_iter = read_and_process(filename)

    # Create the Arrow Dataset as a stream from a local iterator of record batches
    ds = arrow_io.ArrowStreamDataset.from_record_batches(
        batch_iter,
        output_types=(tf.int64, tf.float64, tf.float64))
    return ds
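Because model_fit below builds a Dense(1, input_shape=(2,)) model, the three columns coming out of this dataset (label, x0, x1) still need to be combined into a single feature tensor and batched before training. A minimal sketch of that step, assuming per-row scalar elements and an arbitrary batch size:

# Combine the two feature columns into one tensor of shape (2,), put the
# label second so Keras sees (features, label) pairs, then batch the stream
ds = make_local_dataset(filename)
ds = ds.map(lambda label, x0, x1: (tf.stack([x0, x1]), label))
ds = ds.batch(4)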
def read_and_process(filename):
    """Read the given CSV file and yield processed Arrow batches."""
    # Read a CSV file into an Arrow Table with threading enabled and
    # set block_size in bytes to break the file into chunks for granularity,
    # which determines the number of batches in the resulting pyarrow.Table
    opts = pyarrow.csv.ReadOptions(use_threads=True, block_size=4096)
    table = pyarrow.csv.read_csv(filename, opts)

    # Fit the feature transform
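The comment above points at a feature transform. Given the StandardScaler import in the setup block at the bottom of the post, a plausible continuation fits the scaler on the full table and then yields one re-scaled Arrow record batch per chunk. The helper below, including its name, is a sketch under that assumption rather than the post's exact code:

import pandas as pd
import pyarrow as pa
from sklearn.preprocessing import StandardScaler

def scale_and_yield(table):
    """Fit StandardScaler on the whole table, then yield scaled record batches."""
    df_all = table.to_pandas()
    scaler = StandardScaler().fit(df_all[['x0', 'x1']])
    for batch in table.to_batches():
        df = batch.to_pandas()
        scaled = scaler.transform(df[['x0', 'x1']])
        df_scaled = pd.DataFrame({'label': df['label'],
                                  'x0': scaled[:, 0],
                                  'x1': scaled[:, 1]})
        yield pa.RecordBatch.from_pandas(df_scaled, preserve_index=False)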
def model_fit(ds):
    """Create and fit a Keras logistic regression model."""
    # Build the Keras model
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.Dense(1, input_shape=(2,),
                                    activation='sigmoid'))
    model.compile(optimizer='sgd', loss='mean_squared_error',
                  metrics=['accuracy'])
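    # Sketch of the remaining step (assumed, not the original snippet's code):
    # train on the Arrow dataset and return the fitted model; the epoch count
    # here is arbitrary
    model.fit(ds, epochs=10)
    return model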
import tensorflow_io.arrow as arrow_io

ds = arrow_io.ArrowStreamDataset.from_pandas(
    df,
    batch_size=2,
    preserve_index=False)
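A quick, illustrative way to confirm what the stream yields, assuming the label/x0/x1 DataFrame built in the sample-data snippet further down: each element is a tuple of three tensors, one per column, each holding batch_size rows.

# Pull the first batch of two rows and print the raw column tensors
for label, x0, x1 in ds.take(1):
    print(label.numpy(), x0.numpy(), x1.numpy())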
import tensorflow as tf
import tensorflow_io.arrow as arrow_io
from pyarrow.feather import write_feather

# Write the Pandas DataFrame to a Feather file
write_feather(df, '/path/to/df.feather')

# Create the dataset with one or more filenames
ds = arrow_io.ArrowFeatherDataset(
    ['/path/to/df.feather'],
    columns=(0, 1, 2),
    # output_types assumed to match the label/x0/x1 schema used above
    output_types=(tf.int64, tf.float64, tf.float64))
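As a sanity check, the Feather-backed dataset can be iterated directly; without a batch_size it yields one scalar per column per row. This usage is illustrative rather than from the original post:

# Print each (label, x0, x1) row read back from the Feather file
for label, x0, x1 in ds:
    print(label.numpy(), x0.numpy(), x1.numpy())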
import tensorflow_io.arrow as arrow_io

ds = arrow_io.ArrowDataset.from_pandas(
    df,
    batch_size=2,
    preserve_index=False)

# Make an iterator to the dataset
ds_iter = iter(ds)
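Pulling one element from the iterator is an easy way to see the batched column tensors; this usage example is illustrative rather than from the original post:

# Print the first batch of two rows as a tuple of column tensors
print(next(ds_iter))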
import numpy as np
import pandas as pd

data = {'label': np.random.binomial(1, 0.5, 10)}
data['x0'] = np.random.randn(10) + 5 * data['label']
data['x1'] = np.random.randn(10) + 5 * data['label']

df = pd.DataFrame(data)
print(df.head())
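So the CSV-reading helpers above have something to read, the same DataFrame can be written out with pandas; the path here is made up for illustration:

# Persist the sample data as a CSV file for the pyarrow.csv readers above
df.to_csv('/path/to/data.csv', index=False)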
from functools import partial
import multiprocessing
import os
import socket
import sys

import numpy as np
import pandas as pd
import pyarrow.csv  # used by read_and_process above
from sklearn.preprocessing import StandardScaler
import tensorflow as tf  # used by the Keras model and dataset output_types
import tensorflow_io.arrow as arrow_io  # Arrow dataset classes used throughout