You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
defmodule BenchCSV do
  @moduledoc """
  Generates a synthetic, quote-free CSV file for parser benchmarks.

  Original input specification: 35_000 rows x 2_000 cols. Each data row is
  then 8_000 bytes (2_000 three-byte fields, 1_999 commas and a newline), so
  the file is roughly 280_000_000 bytes, i.e. about 267 MiB.
  """

  @default_rows 35_000
  @default_cols 2_000
  @default_chunk_rows 1_000
  @default_filename "nimble_bench.csv"

  @doc """
  Writes the benchmark CSV (one header row plus `:rows` identical data rows)
  and returns its path.

  Options:

    * `:rows` - data rows after the header, integer >= 0 (default `#{@default_rows}`)
    * `:cols` - columns per row, integer >= 0 (default `#{@default_cols}`)
    * `:chunk_rows` - rows per `IO.binwrite/2` call, integer >= 1
      (default `#{@default_chunk_rows}`)
    * `:dir` - target directory (default `System.tmp_dir!/0`)
    * `:filename` - file name inside `:dir` (default `"#{@default_filename}"`)

  The file is opened with `:write`, so an existing file at the path is
  truncated. The printed size is in units of 1_048_576 bytes (MiB), even
  though the message says "MB".

  Raises `ArgumentError` for out-of-range `:rows`, `:cols` or `:chunk_rows`,
  and `File.Error` when the file cannot be opened or stat'ed.
  """
  def generate!(opts \\ []) do
    rows = Keyword.get(opts, :rows, @default_rows)
    cols = Keyword.get(opts, :cols, @default_cols)
    chunk_rows = Keyword.get(opts, :chunk_rows, @default_chunk_rows)
    dir = Keyword.get(opts, :dir, System.tmp_dir!())
    filename = Keyword.get(opts, :filename, @default_filename)

    # Fail fast with a clear message. Otherwise chunk_rows == 0 would surface
    # as an ArithmeticError from div/2 inside write!/4.
    check_int!(:rows, rows, 0)
    check_int!(:cols, cols, 0)
    check_int!(:chunk_rows, chunk_rows, 1)

    path = Path.join(dir, filename)
    write!(path, rows, cols, chunk_rows)

    mb = Float.round(File.stat!(path).size / 1_048_576, 1)
    IO.puts("#{mb} MB written to #{path}")
    path
  end

  # Raises ArgumentError unless `value` is an integer no smaller than `min`.
  defp check_int!(_name, value, min) when is_integer(value) and value >= min, do: :ok

  defp check_int!(name, value, min) do
    raise ArgumentError,
          "expected #{inspect(name)} to be an integer >= #{min}, got: #{inspect(value)}"
  end

  # Writes the header, then all data rows in writes of `chunk_rows` rows each.
  defp write!(path, rows, cols, chunk_rows) do
    full_chunks = div(rows, chunk_rows)
    remainder = rem(rows, chunk_rows)
    data_row = build_data_row(cols)

    File.open!(path, [:write, :raw, :binary], fn file ->
      IO.binwrite(file, build_header(cols) <> "\n")

      # Every full chunk is byte-identical, so it is built once and reused.
      if full_chunks > 0 do
        chunk = build_chunk(data_row, chunk_rows)
        Enum.each(1..full_chunks, fn _ -> IO.binwrite(file, chunk) end)
      end

      if remainder > 0, do: IO.binwrite(file, build_chunk(data_row, remainder))
    end)
  end

  # "col1,col2,...". The explicit //1 step keeps cols == 0 empty; a bare 1..0
  # is a decreasing range and would yield "col1,col0".
  defp build_header(cols), do: Enum.map_join(1..cols//1, ",", &"col#{&1}")

  # Column i holds 3 bytes: a letter chosen by i mod 26, then the ones digit
  # and the tens digit of i. Values therefore cycle with period
  # lcm(26, 100) = 1300 and are distinct only for up to 1300 columns.
  # All data rows in the file are copies of this one row.
  defp build_data_row(cols) do
    Enum.map_join(1..cols//1, ",", fn i ->
      <<97 + rem(i, 26), 48 + rem(i, 10), 48 + rem(div(i, 10), 10)>>
    end)
  end

  # `n` newline-terminated copies of `data_row` as one binary.
  defp build_chunk(data_row, n), do: String.duplicate(data_row <> "\n", n)
end

BenchCSV.generate!()
267.0 MB written to /tmp/nimble_bench.csv
"/tmp/nimble_bench.csv"
2. Parsing strategies
defmodule Bench do
  @moduledoc """
  CSV parsing strategies to benchmark against the file at `@csv_path`
  (`nimble_bench.csv` in the system temp dir, where `BenchCSV.generate!/0`
  writes by default).

  Every function takes an optional `path`, defaulting to `@csv_path`, so other
  files can be benchmarked without recompiling.
  """

  # Both attributes are evaluated when the module is compiled, not per call.
  @csv_path System.tmp_dir!() |> Path.join("nimble_bench.csv")
  @schedulers System.schedulers_online()

  @doc """
  Reads the whole file into one contiguous binary and parses it in a single
  pass (fields are presumably sub-binaries of the input; confirm with the
  NimbleCSV docs).
  """
  def parse_string(path \\ @csv_path) do
    path
    |> File.read!()
    |> NimbleCSV.RFC4180.parse_string(skip_headers: true)
    |> Enum.each(&length/1)
  end

  @doc """
  Lazy, low-memory, single-process parse of a line stream. `read_ahead: 1_000_000`
  asks the file driver to buffer reads in ~1 MB blocks, so lines are not each
  fetched with a separate read (helps with wide rows).
  """
  def parse_stream(path \\ @csv_path) do
    path
    |> File.stream!(read_ahead: 1_000_000)
    |> NimbleCSV.RFC4180.parse_stream(skip_headers: true)
    |> Stream.each(&length/1)
    |> Stream.run()
  end

  @doc """
  Splits the file into line-aligned chunks and parses them concurrently, with
  at most `System.schedulers_online/0` (at compile time) tasks. `ordered: false`
  lets results be consumed in completion order, so nothing is buffered to
  restore input order.
  """
  def parallel_parse_string(path \\ @csv_path) do
    path
    |> File.read!()
    |> chunk_binary(@schedulers)
    |> Task.async_stream(&parse_chunk/1,
      max_concurrency: @schedulers,
      ordered: false,
      timeout: :infinity
    )
    |> Stream.run()
  end

  # Drops the header line and cuts the remaining bytes into at most `n`
  # line-aligned chunks of roughly equal size. Chunks are binary_part/3
  # sub-binaries of the input, so no data is copied. This avoids splitting
  # into one element per line and re-joining, which copied the whole file.
  # Assumes no quoted field contains "\n" (true for BenchCSV output). A cut
  # inside a quoted field would corrupt both neighbouring chunks.
  defp chunk_binary(binary, n) do
    case :binary.match(binary, "\n") do
      :nomatch ->
        # Header only (or empty input): nothing to parse.
        []

      {pos, 1} ->
        start = pos + 1
        body = binary_part(binary, start, byte_size(binary) - start)
        parts = max(n, 1)
        # Ceiling division: every chunk but the last is >= target bytes,
        # so there are at most `parts` chunks.
        target = max(1, div(byte_size(body) + parts - 1, parts))
        split_at_lines(body, target, [])
    end
  end

  # Cuts `body` into pieces of at least `target` bytes, each ending just after
  # a "\n". The last piece keeps whatever remains.
  defp split_at_lines(<<>>, _target, acc), do: Enum.reverse(acc)

  defp split_at_lines(body, target, acc) when byte_size(body) <= target,
    do: Enum.reverse([body | acc])

  defp split_at_lines(body, target, acc) do
    size = byte_size(body)

    # Look for the first newline at or after byte offset target - 1.
    case :binary.match(body, "\n", scope: {target - 1, size - target + 1}) do
      :nomatch ->
        Enum.reverse([body | acc])

      {pos, 1} ->
        cut = pos + 1

        split_at_lines(
          binary_part(body, cut, size - cut),
          target,
          [binary_part(body, 0, cut) | acc]
        )
    end
  end

  # Parses one header-less chunk and walks every row.
  defp parse_chunk(chunk) do
    chunk
    |> NimbleCSV.RFC4180.parse_string(skip_headers: false)
    |> Enum.each(&length/1)
  end

  @doc """
  Eagerly loads the file with Explorer and returns its row count.
  `infer_schema_length: 0` presumably makes every column a string (matching
  NimbleCSV's binaries) rather than sampling rows for types; confirm against
  the Explorer docs.
  """
  def explorer_eager(path \\ @csv_path) do
    path
    |> Explorer.DataFrame.from_csv!(infer_schema_length: 0)
    |> Explorer.DataFrame.n_rows()
  end

  @doc """
  Same as `explorer_eager/1`, but builds a lazy frame and collects it before
  counting rows.
  """
  def explorer_lazy(path \\ @csv_path) do
    path
    |> Explorer.DataFrame.from_csv!(lazy: true, infer_schema_length: 0)
    |> Explorer.DataFrame.collect()
    |> Explorer.DataFrame.n_rows()
  end
end