High Performance Streaming Analytics in Julia
# This code is part of a presentation on streaming analytics in Julia
# It was inspired by a number of individuals and makes use of some of their ideas
# 1. Zygmunt Zając's FastML.com, whose great Vowpal Wabbit posts got me
#    thinking about inline processing
# 2. John Langford and his fantastic Vowpal Wabbit library.
#    Check out his NYU video course to learn more (see below)
# 3. John Myles White's presentation on online SGD and his StreamStats.jl library
# Thank you all!
path = dirname(@__FILE__) # path to the script. Will fail if used from the console (set path manually)
# Before we begin we're going to need some data.
# Below, we create two files with 5k and 500k rows respectively.
# Feel free to try this out with larger files, but if you use the CSV format, be prepared to wait!
# csv data files
faux_data_5k = joinpath(path,"faux_data_5k.csv")
faux_data_500k = joinpath(path,"faux_data_500k.csv")
function fauxCSVData(path, n, p)
    xs = rand(n,p) # e.g. 32 x 32 images
    y = ones(n)
    data = hcat(y,xs)
    writecsv(path,data) # writecsv opens and closes the file itself
    println("Completed write out...")
end
# let's create some LARGE files with lots of features/columns
p = 32 * 32 # e.g. 1024 columns (32 x 32 images)
@time fauxCSVData(faux_data_5k, 5000, p) # <- 5k rows of 1024 columns (32 x 32) 93.6MB
# 1.68 seconds (0.25GB allocated, 9.77% gc time)
# WARNING: Don't run during presentation (over 6 min!)
@time fauxCSVData(faux_data_500k, 500_000, p) # <- 500k rows of 1024 columns (32 x 32) 9.36GB
# 339.7 seconds (25.8GB allocated, 4.15% gc time)
# now, let's load this data into memory and calculate the mean of the 1st feature/column
function CSVread(path::String)
    data = readcsv(path)
    mean(data[:,1])
end
@time CSVread(faux_data_5k)
# elapsed time: 4.038636026 seconds (689176136 bytes allocated, 3.34% gc time)
# WARNING: Don't run during presentation (over 13 min!!!)
@time CSVread(faux_data_500k) # <- needed about 1.9 GB of RAM at its peak
# elapsed time: 787.871814233 seconds (22229432176 bytes allocated, 1.72% gc time)
# Note
# ==============
# 1. This still works if your plan is to load it once and do some feature engineering
# 2. However, we're already taking a really long time and we're only at 500K rows
# 3. We can speed this up a lot by moving from
#    "Load all -> manipulate/calc all" to "Load 1 row -> manipulate/calc 1 row"
function inlineCSVRead(path::String)
    iostrm = open(path,"r") # open an IO stream to the file
    total = 0.
    n = 0
    while !eof(iostrm)
        l = readline(iostrm)         # reads in a line as a string
        v = split(l,",")             # split it into an array of strings
        n += 1
        total += parse(Float64,v[1]) # cast string to a value
    end
    close(iostrm)
    println("Completed read in... \ntotal = $total avg = $(total/n)")
end
@time inlineCSVRead(faux_data_5k)
# elapsed time: 0.594267434 seconds (412923464 bytes allocated, 43.24% gc time)
@time inlineCSVRead(faux_data_500k)
# elapsed time: 70.693168822 seconds (41283386680 bytes allocated, 41.23% gc time)
# Note
# ========
# 1. 5k went from 4.04 secs to 0.59 secs
# 2. 500k went from 787.87 secs to 70.69 secs
# 3. Garbage collection went through the roof at > 41%!!
# 4. There's an important shift from thinking of data as a big table
#    to thinking of data as a series of events.
# Really fast IO
# ==================
# To get really fast IO in Julia, two conditions need to be met:
# 1. Turn all non-numeric data into numeric data (one-hot encoding, word-vecs, etc)
#    (see the encoding sketch below)
# 2. Store it as binary data, not csv
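# A minimal sketch (my own illustration, not part of the timed runs in this gist)
# of the first condition: one-hot encoding a categorical value into a Float32
# vector that can be written straight to a binary stream. The category list
# below is made up for the example.
function onehot(value, categories)
    v = zeros(Float32, length(categories))
    for i in 1:length(categories)
        if categories[i] == value
            v[i] = 1.0f0 # set the matching slot to 1, leave the rest at 0
            break
        end
    end
    return v
end
# onehot("red", ["red","green","blue"]) -> Float32[1.0, 0.0, 0.0]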
# Let's create some binary data
faux_data_fast_5k = joinpath(path,"faux_data_fast_5k.dat")
faux_data_fast_500k = joinpath(path,"faux_data_fast_500k.dat")
faux_data_fast_5M = joinpath(path,"faux_data_fast_5M.dat")
function writeFast(path, n, p)
    iostrm = open(path,"w")
    x = zeros(Float32,p)
    for y in 1:n
        write(iostrm,y) # <- we're writing the Int64 row label directly to disk
        rand!(x)        # e.g. 32 x 32 images
        write(iostrm,x) # <- we're writing a bunch of Float32s directly to disk
    end
    close(iostrm)
    println("Completed write out...")
end
@time writeFast(faux_data_fast_5k, 5000, p)
# 5k rows of 1024 columns (32 x 32) 20.5MB and took 0.037 seconds (5128 bytes allocated)
@time writeFast(faux_data_fast_500k, 500_000, p)
# 500k rows of 1024 columns (32 x 32) 2.05GB and took 3.509 seconds (5128 bytes allocated)
# WARNING: Might not want to run. Prepare to talk during this part of the presentation (33 sec!!!)
@time writeFast(faux_data_fast_5M, 5_000_000, p)
# 5M rows of 1024 columns (32 x 32) 20GB takes about 33.15 secs
# Note
# ===========
# 1. 5k comparison: 1.68 seconds (csv) vs 0.037 seconds (binary)
# 2. 500k comparison: 339.7 seconds (csv) vs 3.509 seconds (binary)
# 3. 5M has no csv comparison, but it only took about 33 seconds for about 20GB (binary)
# 4. Writing data in binary is both faster and uses less disk (see the size check below)
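# Back-of-the-envelope check on those file sizes (my own arithmetic, based on the
# layout writeFast actually produces): each row is one Int64 label (8 bytes)
# plus p Float32 features (4 bytes each).
bytes_per_row = 8 + 4 * p                       # 8 + 4*1024 = 4104 bytes
println(5_000 * bytes_per_row / 1e6, " MB")     # ≈ 20.5 MB for 5k rows
println(500_000 * bytes_per_row / 1e9, " GB")   # ≈ 2.05 GB for 500k rows
println(5_000_000 * bytes_per_row / 1e9, " GB") # ≈ 20.5 GB for 5M rows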
# Next, let's try reading through it to calculate the mean of Y.
function fastRead(path::String, p)
    iostrm = open(path,"r")
    n = 0
    total = 0.
    while !eof(iostrm)
        y = read(iostrm,Int64,1)   # <-- lots of memory allocation happening here!
        x = read(iostrm,Float32,p) # <-- and here!
        n += 1
        total += y[1]
    end
    close(iostrm)
    println("Completed read in... \ntotal = $total avg = $(total/n)")
end
@time fastRead(faux_data_fast_5k, (32 * 32))
# elapsed time: 0.035616242 seconds (22122928 bytes allocated, 57.90% gc time)
@time fastRead(faux_data_fast_500k, (32 * 32))
# elapsed time: 2.705912819 seconds (2212002896 bytes allocated, 57.32% gc time)
@time fastRead(faux_data_fast_5M, (32 * 32))
# elapsed time: 42.250951046 seconds (22120002848 bytes allocated, 39.31% gc time)
# Note
# ===========
# 1. 5k read: 0.59 seconds (inline csv) vs 0.035 seconds (binary)
# 2. 500k read: 70.69 seconds (inline csv) vs 2.71 seconds (binary)
# 3. 5M read took 42.25 seconds to read through a 20GB file (binary)
# 4. As expected this is a lot faster, but we still have a lot of garbage collection happening
# 5. We can do better!
# let's get rid of the unnecessary GC
function fastRead_No_GC(path::String, p)
    iostrm = open(path,"r")
    n = 0
    total = 0.
    x = zeros(Float32,p) # <-- pre-allocated array of floats
    y = zeros(Int64,1)   # <-- pre-allocated array of ints (important: only 1 cell, but still an array!)
    while !eof(iostrm)
        read!(iostrm,y) # <-- re-using pre-allocated memory (very fast, no GC!)
        read!(iostrm,x) # <-- re-using pre-allocated memory (very fast, no GC!)
        n += 1
        total += y[1]
    end
    close(iostrm)
    println("Completed read in... \ntotal = $total avg = $(total/n)")
end
@time fastRead_No_GC(faux_data_fast_5k, (32 * 32))
# elapsed time: 0.006328654 seconds (686096 bytes allocated)
@time fastRead_No_GC(faux_data_fast_500k, (32 * 32))
# elapsed time: 0.567468112 seconds (68006256 bytes allocated, 5.51% gc time)
@time fastRead_No_GC(faux_data_fast_5M, (32 * 32))
# elapsed time: 25.674385541 seconds (680007392 bytes allocated, 2.10% gc time)
# Note
# ===========
# 1. 5k read: 0.035 seconds (binary w/GC) vs 0.0063 seconds (binary, low GC)
# 2. 500k read: 2.71 seconds (binary w/GC) vs 0.57 seconds (binary, low GC)
# 3. 5M read took 25.7 seconds to read through a 20GB file (binary)!
# 4. Now that we have fast IO, let's move on to inline analytics
# we're going to start by generating some data for an L2 logistic regression
# you will need the Distributions and StreamStats.jl packages
# Pkg.add("Distributions")
# Pkg.clone("https://github.com/johnmyleswhite/StreamStats.jl.git")
using StreamStats, Distributions
invlogit(z::Real) = 1 / (1 + exp(-z))
function fauxDataFast(path::String, n::Int64, β₀::Float64, β::Array{Float64,1}, addnoise::Bool)
    iostrm = open(path,"w")
    p = length(β) # number of features/columns
    x = zeros(Float64,p)
    pᵢ = 0.
    y = 0
    for i in 1:n
        x[1:p] = randn(p)
        pᵢ = invlogit(β₀ + dot(β, x))
        y = addnoise ? rand(Bernoulli(pᵢ)) : (pᵢ < 0.5 ? 0 : 1)
        write(iostrm,y) # write out Y
        write(iostrm,x) # write out the Xs
    end
    close(iostrm)
    println("Completed faux data write out...")
end
p_data = 100 # number of features in our faux data
β₀ = randn() # the intercept we're trying to learn
β = randn(p_data) # the beta coefficients we're trying to learn
faux_data_no_noise_5k = joinpath(path,"faux_data_no_noise_5k.dat")
faux_data_with_noise_5k = joinpath(path,"faux_data_with_noise_5k.dat")
@time fauxDataFast(faux_data_no_noise_5k, 5_000, β₀, β, false)
# elapsed time: 0.013943915 seconds (4481656 bytes allocated)
@time fauxDataFast(faux_data_with_noise_5k, 5_000, β₀, β, true)
# elapsed time: 0.011365431 seconds (4481656 bytes allocated)
faux_data_no_noise_500k = joinpath(path,"faux_data_no_noise_500k.dat")
faux_data_with_noise_500k = joinpath(path,"faux_data_with_noise_500k.dat")
@time fauxDataFast(faux_data_no_noise_500k, 500_000, β₀, β, false)
# elapsed time: 1.283577979 seconds (448002008 bytes allocated)
@time fauxDataFast(faux_data_with_noise_500k, 500_000, β₀, β, true)
# elapsed time: 1.357101841 seconds (448002216 bytes allocated)
faux_data_no_noise_5M = joinpath(path,"faux_data_no_noise_5M.dat")
faux_data_with_noise_5M = joinpath(path,"faux_data_with_noise_5M.dat")
@time fauxDataFast(faux_data_no_noise_5M, 5_000_000, β₀, β, false)
# elapsed time: 12.32556777 seconds
@time fauxDataFast(faux_data_with_noise_5M, 5_000_000, β₀, β, true)
# elapsed time: 12.91362036 seconds
function streamingLogistic(path::String, p, stat)
    iostrm = open(path,"r")
    x = zeros(Float64,p) # <-- pre-allocated array of floats
    y = zeros(Int64,1)   # <-- pre-allocated array of ints (important: only 1 cell, but still an array!)
    n = 0 # number of examples
    c = 0 # number of correct
    while !eof(iostrm)
        read!(iostrm,y) # <-- re-using pre-allocated memory (very fast, no GC!)
        read!(iostrm,x) # <-- re-using pre-allocated memory (very fast, no GC!)
        yhat = invlogit(stat.β₀ + dot(stat.β, x)) # first predict using the latest example
        StreamStats.update!(stat, x, y[1])        # <- now use that example to update the model
        c += ((yhat < 0.5 ? 0 : 1) == y[1]) ? 1 : 0 # count it as correct if the prediction matches
        n += 1
    end
    close(iostrm)
    pct_corr = round(c/n*100,2)
    println("$n examples processed. $(pct_corr)% correct")
    return pct_corr
end
λ = 0.00001 # L2 penalty
α = 0.01 # learning rate
# 5k examples
logReg = StreamStats.ApproxL2Logit(length(β),λ,α)
@time corr = streamingLogistic(faux_data_no_noise_5k, length(β), logReg) # 1st pass
# 88.72% correct, elapsed time: 0.006953184 seconds (2284 bytes allocated)
@time corr = streamingLogistic(faux_data_no_noise_5k, length(β), logReg) # 2nd pass
# 95.18% correct, elapsed time: 0.008152086 seconds (2508 bytes allocated)
empty!(logReg) # reset model
@time corr = streamingLogistic(faux_data_with_noise_5k, length(β), logReg) # 1st pass
# 86.92% correct, elapsed time: 0.007270419 seconds (2284 bytes allocated)
@time corr = streamingLogistic(faux_data_with_noise_5k, length(β), logReg) # 2nd pass
# 93.00% correct, elapsed time: 0.007605418 seconds (2284 bytes allocated)
# 500K examples
empty!(logReg) # reset model
@time corr = streamingLogistic(faux_data_no_noise_500k, length(β), logReg) # 1st pass
# 99.27% correct, elapsed time: 0.727339275 seconds (2496 bytes allocated)
@time corr = streamingLogistic(faux_data_no_noise_500k, length(β), logReg) # 2nd pass
# 99.75% correct, elapsed time: 0.712567852 seconds (2496 bytes allocated)
empty!(logReg) # reset model
@time corr = streamingLogistic(faux_data_with_noise_500k, length(β), logReg) # 1st pass
# 94.35% correct, elapsed time: 0.694565161 seconds (2496 bytes allocated)
@time corr = streamingLogistic(faux_data_with_noise_500k, length(β), logReg) # 2nd pass
# 94.48% correct, elapsed time: 0.731819073 seconds (2496 bytes allocated)
# 5M examples, 4.04 GB files
empty!(logReg) # reset model
@time corr = streamingLogistic(faux_data_no_noise_5M, length(β), logReg) # 1st pass
# 99.79% correct, elapsed time: 7.277882121 seconds (2496 bytes allocated)
@time corr = streamingLogistic(faux_data_no_noise_5M, length(β), logReg) # 2nd pass
# 99.91% correct, elapsed time: 7.264460519 seconds (2496 bytes allocated)
empty!(logReg) # reset model
@time corr = streamingLogistic(faux_data_with_noise_5M, length(β), logReg) # 1st pass
# 94.43% correct, elapsed time: 7.241979489 seconds (2496 bytes allocated)
@time corr = streamingLogistic(faux_data_with_noise_5M, length(β), logReg) # 2nd pass
# 94.44% correct, elapsed time: 7.265981804 seconds (2496 bytes allocated)
# ========================= Woohoo! =============================
# We just did a 5M row, 100 feature L2 logistic regression in a little over 7 seconds. Wow!
# ===============================================================
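# Rough throughput implied by the timings above (my own arithmetic, using the
# numbers printed in the comments): ~5M rows and ~4.04GB per pass in ~7.26s.
println(5_000_000 / 7.26) # ≈ 689,000 examples predicted on and learned from per second
println(4.04 / 7.26)      # ≈ 0.56 GB read per second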
# What's happening under the hood here?
# John Myles White's ApproxL2Logit update! from StreamStats.jl (annotated version)
function update!(stat::ApproxL2Logit, x::Vector{Float64}, y::Real)
    stat.n += 1
    y_pred = invlogit(stat.β₀ + dot(stat.β, x)) # <- current best prediction
    ε = y - y_pred                              # <- calculate the error (simple, eh?)
    # Intercept <- single-pass update of the intercept
    gr₀ = ε - stat.λ * stat.β₀
    stat.sum_gr₀_sq += gr₀^2
    α₀ = stat.α / sqrt(stat.sum_gr₀_sq)
    stat.β₀ += α₀ * gr₀
    # Non-constant predictors <- single-pass AdaGrad-style update of the coefficients
    for i in 1:length(x) # <- note the in-place update of each parameter in the vector (no GC)
        grᵢ = ε * x[i] - stat.λ * stat.β[i]
        stat.sum_gr_sq[i] += grᵢ^2            # <- in-place update
        αᵢ = stat.α / sqrt(stat.sum_gr_sq[i])
        stat.β[i] += αᵢ * grᵢ                 # <- in-place update
    end
    return
end
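# Toy illustration (mine, not from StreamStats.jl) of the key idea above: the
# per-coordinate step size α / sqrt(Σ g²) shrinks as squared gradients accumulate,
# so frequently-updated coordinates take ever smaller steps.
function adagrad_step_demo(α, grads)
    sum_g_sq = 0.0
    for g in grads
        sum_g_sq += g^2
        println("after |g| = $g, effective step = ", α / sqrt(sum_g_sq))
    end
end
adagrad_step_demo(0.01, [0.5, 0.4, 0.3, 0.2, 0.1]) # made-up gradient magnitudes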
# ======= End of Gist ============
# Slides go on to explain NNGraph.jl and RecurrentNN.jl and...
# 1. "Sorry, L2 logistic regression is great, but I need non-linear algos"
# 2. Turns out that the minimizer of any convex loss function can be approximated
#    using online stochastic gradient descent (SGD) - see the sketch below
# 3. How to cleanly deal with data schema using types
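# A minimal sketch (my own, following the same predict-then-update pattern as
# streamingLogistic above) of point 2: plain online SGD for squared loss, i.e.
# streaming linear regression. The function name and the constant learning
# rate η are illustrative, not from the slides.
function sgd_linreg_update!(w::Vector{Float64}, x::Vector{Float64}, y::Float64, η::Float64)
    ŷ = dot(w, x)          # predict with the current weights
    ε = ŷ - y              # error under squared loss L = 0.5 * ε^2
    for i in 1:length(w)   # ∂L/∂wᵢ = ε * xᵢ, so step against the gradient
        w[i] -= η * ε * x[i]
    end
    return ŷ
end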
# Some interesting resources
# =========================
# 1. Zygmunt Zając's FastML.com series on Vowpal Wabbit and Phraug
#    VW blogs: http://fastml.com/blog/categories/vw/
#    Phraug.py library (does a lot of in-line processing): https://github.com/zygmuntz/phraug
# 2. John Langford's talk on Vowpal Wabbit's online learning
#    Slides: http://cilvr.cs.nyu.edu/diglib/lsml/lecture01-online-linear.pdf
#    part 1: http://techtalks.tv/talks/online-linear-learning-part-1/57924/
#    part 2: http://techtalks.tv/talks/online-linear-learning-part-2/57925/
# 3. John Myles White's talk on online SGD
#    https://www.hakkalabs.co/articles/streaming-data-analysis-and-online-learning/
# 4. My own RecurrentNN.jl, which uses RMSProp (a variant of SGD) to learn deep recurrent nets
#    https://github.com/Andy-P/RecurrentNN.jl