Forked from alistaire47/2020-09-22_LA-RUG_Uncomfortable-data.R
Created
September 23, 2020 23:59
-
-
Save han-tun/266574630ee3fd296c1d2ebecdee4b21 to your computer and use it in GitHub Desktop.
2020-09-22 LA R Users Group talk: Uncomfortable data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#' Uncomfortable data | |
#' LA R Users Group | |
#' 2020-09-22 | |
#' Edward Visel | |
#' Setup | |
library(tidyverse) | |
path <- '/tmp/nycflights13/flights.csv' | |
dir.create(dirname(path)) | |
data.table::fwrite(nycflights13::flights, file = path) | |
##################################### | |
### Demo 1: Subsetting strategies ### | |
##################################### | |
#' 1. Read then subset | |
flights <- read_csv(path) | |
small_1 <- flights %>% select(carrier, origin, dest, year, month, day, dep_time) | |
#' 2. Subset on read | |
small_2d <- data.table::fread(path, select = c('carrier', 'origin', 'dest', 'year', 'month', 'day', 'dep_time')) | |
small_2t <- read_csv(path, col_types = cols_only( | |
carrier = col_guess(), | |
origin = col_guess(), | |
dest = col_guess(), | |
year = col_guess(), | |
month = col_guess(), | |
day = col_guess(), | |
dep_time = col_guess() | |
)) | |
#' 3. Batch, subset, and aggregate | |
## Unknown file size | |
# setup | |
batch_size = 10000L | |
n = batch_size | |
offset = 0L | |
col_names <- names(read.csv(path, nrows = 1)) | |
df_list <- list() | |
# iterate | |
while (n == 10000L) { | |
# read | |
message('Reading rows from ', offset, ' up to ', offset + batch_size) | |
df <- read.csv(path, skip = offset, nrows = batch_size, col.names = col_names, stringsAsFactors = FALSE) | |
# subset and append | |
df_list[[length(df_list) + 1L]] <- df[c('carrier', 'origin', 'dest', 'year', 'month', 'day', 'dep_time')] | |
# increment | |
n <- nrow(df) | |
offset <- offset + batch_size | |
} | |
# aggregate | |
small_3b <- do.call(rbind, df_list) | |
## Precalculating file size | |
batch_size <- 10000L | |
col_names <- names(read_csv(path, n_max = 0)) | |
n_lines <- scan( | |
text = system(paste('wc -l', path), intern = TRUE), | |
what = integer(1), | |
nmax = 1 | |
) | |
offsets <- seq(1L, n_lines, by = batch_size) | |
small_3t <- map_dfr(offsets, function(offset){ | |
message('Reading rows ', offset, ' to ', min(offset + batch_size, n_lines)) | |
suppressMessages( | |
read_csv(path, skip = offset, col_names = col_names, n_max = batch_size) | |
) %>% | |
select(carrier, origin, dest, year, month, day, dep_time) | |
}) | |
############################ | |
### Demo 2: Storing data ### | |
############################ | |
flights_dir <- '/tmp/flights' | |
data('flights', package = 'nycflights13') | |
flights %>% | |
group_by(carrier, year, month) %>% | |
nest() %>% | |
mutate( | |
filepath = file.path(flights_dir, carrier, year, sprintf('%02s.parquet', month)) | |
) %>% | |
pwalk(function(...) { | |
dots <- list(...) | |
if (!dir.exists(dirname(dots$filepath))) { | |
dir.create(dirname(dots$filepath), recursive = TRUE) | |
} | |
arrow::write_parquet( | |
dots$data, dots$filepath, | |
version = '1.0', chunk_size = 10000L, | |
compression = 'snappy' | |
) | |
}) | |
list.dirs(flights_dir) | |
list.files(file.path(flights_dir, "AA", "2013"), full.names = TRUE) | |
system('tree /tmp/flights') | |
############################## | |
### Demo 3: Arrow Datasets ### | |
############################## | |
#' Single-file CSV Arrow Dataset | |
list.files('/tmp/nycflights13/') | |
flights_ds_c <- arrow::open_dataset('/tmp/nycflights13/', format = 'csv') | |
flights_ds_c | |
flights_ds_c$files | |
flights_ds_c$format | |
flights_ds_c$num_cols | |
flights_ds_c$num_rows | |
flights_ds_c %>% | |
select(carrier, origin, dest, year, month, day, dep_time) | |
flights_ds_c %>% | |
select(carrier, origin, dest, year, month, day, dep_time) %>% | |
collect() | |
flights_ds_c %>% | |
select(carrier, origin, dest) %>% | |
filter(dest == 'LAX') %>% | |
collect() | |
#' Multi-file, multi-directory Parquet Arrow Dataset | |
flights_ds_p <- arrow::open_dataset(flights_dir, partitioning = c('carrier', 'year')) | |
flights_ds_p | |
flights_ds_p$files | |
flights_ds_p$format | |
flights_ds_p$num_cols | |
flights_ds_p$num_rows | |
flights_ds_p %>% | |
select(carrier, origin, dest, matches('time$')) %>% | |
collect() | |
flights_ds_p %>% | |
filter(carrier == 'UA', distance > 1000, !is.na(arr_delay)) %>% | |
collect() | |
#' Limitations | |
flights_ds_p %>% | |
group_by(carrier) %>% # works | |
filter(n() > 10000) # most expressions beyond inequality like `n()` are not supported | |
flights_ds_p %>% | |
# mutation (in `mutate`, `transmute`, or `group_by`) not supported | |
mutate(arr_time = hms::hms(hours = arr_time %/% 100L, minutes = arr_time %% 100L)) | |
flights_ds_p %>% | |
group_by(carrier) %>% | |
summarise(mean_arr_delay = mean(arr_delay)) # `summarise()` not supported | |
#' Demo 3 is in the accompanying shell script | |
######################## | |
### Demo 4: sergeant ### | |
######################## | |
library(sergeant) | |
#' DBI interface | |
dbi_con <- dbConnect(Drill()) | |
DBI::dbGetQuery(dbi_con, 'SELECT * FROM dfs.tmp.`flights` LIMIT 3') | |
#' REST interface | |
rest_con <- drill_connection() | |
drill_query(rest_con, 'SELECT * FROM dfs.tmp.`flights` LIMIT 3') | |
#' dbplyr interface | |
tbl(dbi_con, 'dfs.tmp.`flights`') | |
#' or | |
drill <- src_drill() | |
tbl(drill, 'dfs.tmp.`flights`') | |
#' SQL translation | |
?drill_custom_functions | |
sql_translate_env(dbi_con) | |
flights_drill <- tbl(drill, 'dfs.tmp.`flights`') %>% | |
mutate( | |
carrier = dir0, | |
year = dir1, | |
# as.integer is translated; LEFT is passed through to Drill | |
month = as.integer(LEFT(filename, 2L)) | |
) | |
flights_drill | |
flights_drill %>% show_query() | |
flights_drill %>% count() | |
flights_drill %>% count(carrier = dir0, sort = TRUE) | |
flights_drill %>% count(carrier = dir0, sort = TRUE) %>% collect() | |
flights_drill %>% | |
filter(dest %in% c('LAX', 'ONT', 'SNA', 'BUR', 'LGB')) %>% | |
group_by(carrier, origin, dest) %>% | |
summarise( | |
mean_arr_delay = mean(arr_delay), | |
sd_arr_delay = sd(arr_delay), | |
max_arr_delay = max(arr_delay), | |
n = n() | |
) %>% | |
arrange(mean_arr_delay) %>% | |
collect() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/sh | |
############################ | |
### Demo 4: Apache Drill ### | |
############################ | |
# Installation | |
brew info apache-drill | |
# brew install apache-drill | |
# Start Drill | |
drill-embedded | |
-- See Drill commands | |
!help | |
-- See workspaces | |
SHOW SCHEMAS; | |
SELECT * | |
-- built-in dataset | |
FROM cp.`employee.json` | |
LIMIT 3; | |
-- real CSV we wrote | |
SELECT * | |
FROM dfs.tmp.`nycflights13/flights.csv` | |
LIMIT 1; | |
-- subset columns and rows | |
SELECT carrier, flight, tailnum | |
FROM dfs.tmp.`nycflights13/flights.csv` | |
WHERE distance > 1000 | |
LIMIT 10; | |
-- grouped operations! aggregation! sorting! | |
SELECT carrier, count(*) as n | |
FROM dfs.tmp.`nycflights13/flights.csv` | |
GROUP BY carrier | |
ORDER BY n DESC; | |
SELECT carrier, count(*) as n | |
FROM dfs.tmp.`nycflights13/flights.csv` | |
WHERE distance < 100 | |
GROUP BY carrier | |
ORDER BY n DESC; | |
-- navigate | |
SHOW FILES | |
FROM dfs.tmp.`flights`; | |
SHOW FILES | |
FROM dfs.tmp.`flights/AA/2013/`; | |
-- query a whole directory of files with globbing | |
SELECT * | |
FROM dfs.tmp.`flights/AA/*/*.parquet` | |
LIMIT 3; | |
SELECT * | |
FROM dfs.tmp.`flights/*` | |
LIMIT 3; | |
-- implicit columns | |
SELECT DISTINCT | |
FQN, | |
FILEPATH, | |
FILENAME, | |
SUFFIX | |
-- if path refers to a directory, sucks up everything inside | |
FROM dfs.tmp.`flights` | |
LIMIT 10; | |
-- calculate useful summary stats and aggregations | |
SELECT | |
-- extract carrier from subdirectory | |
dir0 AS carrier, | |
-- use any ANSI SQL functions | |
ROUND(AVG(dep_delay), 2) AS mean_dep_delay, | |
ROUND(AVG(arr_delay), 2) AS mean_arr_delay, | |
count(*) AS n | |
FROM dfs.tmp.`flights` | |
GROUP BY carrier | |
ORDER BY n DESC; | |
SELECT | |
dir1 AS year, | |
-- extract month from filename | |
LEFT(FILENAME, 2) AS month, | |
ROUND(AVG(arr_delay), 2) AS mean_arr_delay, | |
count(*) AS n | |
FROM dfs.tmp.`flights/*` | |
WHERE dir0 = 'UA' | |
GROUP BY year, month | |
ORDER BY year, month; | |
-- how to quit Drill shell. !exit also works. | |
!quit |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment