This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import tensorflow_io.arrow as arrow_io | |
ds = arrow_io.ArrowStreamDataset.from_pandas( | |
df, | |
batch_size=2, | |
preserve_index=False) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def model_fit(ds): | |
"""Create and fit a Keras logistic regression model.""" | |
# Build the Keras model | |
model = tf.keras.Sequential() | |
model.add(tf.keras.layers.Dense(1, input_shape=(2,), | |
activation='sigmoid')) | |
model.compile(optimizer='sgd', loss='mean_squared_error', | |
metrics=['accuracy']) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def read_and_process(filename): | |
"""Read the given CSV file and yield processed Arrow batches.""" | |
# Read a CSV file into an Arrow Table with threading enabled and | |
# set block_size in bytes to break the file into chunks for granularity, | |
# which determines the number of batches in the resulting pyarrow.Table | |
opts = pyarrow.csv.ReadOptions(use_threads=True, block_size=4096) | |
table = pyarrow.csv.read_csv(filename, opts) | |
# Fit the feature transform |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def make_local_dataset(filename): | |
"""Make a TensorFlow Arrow Dataset that reads from a local CSV file.""" | |
# Read the local file and get a record batch iterator | |
batch_iter = read_and_process(filename) | |
# Create the Arrow Dataset as a stream from local iterator of record batches | |
ds = arrow_io.ArrowStreamDataset.from_record_batches( | |
batch_iter, | |
output_types=(tf.int64, tf.float64, tf.float64), |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ds = make_local_dataset(filename) | |
model = model_fit(ds) | |
print("Fit model with weights: {}".format(model.get_weights())) | |
# Fit model with weights: | |
# [array([[0.7793554 ], [0.61216295]], dtype=float32), | |
# array([0.03328196], dtype=float32)] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def read_and_process_dir(directory): | |
"""Read a directory of CSV files and yield processed Arrow batches.""" | |
for f in os.listdir(directory): | |
if f.endswith(".csv"): | |
filename = os.path.join(directory, f) | |
for batch in read_and_process(filename): | |
yield batch |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def serve_csv_data(ip_addr, port_num, directory): | |
""" | |
Create a socket and serve Arrow record batches as a stream read from the | |
given directory containing CVS files. | |
""" | |
# Create the socket | |
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
sock.bind((ip_addr, port_num)) | |
sock.listen(1) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def make_remote_dataset(endpoint): | |
"""Make a TensorFlow Arrow Dataset that reads from a remote Arrow stream.""" | |
# Create the Arrow Dataset from a remote host serving a stream | |
ds = arrow_io.ArrowStreamDataset( | |
[endpoint], | |
columns=(0, 1, 2), | |
output_types=(tf.int64, tf.float64, tf.float64), | |
output_shapes=(tf.TensorShape([]), tf.TensorShape([]), tf.TensorShape([])), | |
batch_mode='auto') |
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.