Skip to content

Instantly share code, notes, and snippets.

@cdesch
Created December 8, 2017 16:13
Show Gist options
  • Save cdesch/32a5c617bd7095f01f57055272493a82 to your computer and use it in GitHub Desktop.
Save cdesch/32a5c617bd7095f01f57055272493a82 to your computer and use it in GitHub Desktop.
# Splits `files` into groups sized for roughly `num_process` workers and hands
# each group to load_async/2 (in sub-chunks of 3 files), one group after another.
#
# files       - list of file paths to process
# num_process - number of groups to aim for; must be non-zero
#
# Returns :ok.
def try_async(files, num_process) do
  # Clamp to 1: the rounded quotient is 0 for an empty list, or when there are
  # fewer than half as many files as processes, and Enum.chunk_every/2 only
  # accepts a positive chunk size.
  chunk_size = max(1, round(length(files) / num_process))

  files
  |> Enum.chunk_every(chunk_size)
  |> Enum.each(fn group -> load_async(group, 3) end)
end
# Break that chunk into a few tasks to run in parallel.
#
# files    - list of file paths
# chunk_by - number of files handed to each task
#
# Starts one task per sub-chunk running AdLock.Processor.process_job/1, then
# waits for all of them. Returns the list of task results, in task order.
def load_async(files, chunk_by) do
  files
  |> Enum.chunk_every(chunk_by)
  |> Enum.map(&Task.async(AdLock.Processor, :process_job, [&1]))
  # Wait without a deadline: Task.await/1 defaults to 5 seconds and would exit
  # the caller while a task is still working through its files.
  |> Enum.map(&Task.await(&1, :infinity))
end
# Runs process_file/1 on every path in `files`, in list order. Returns :ok.
def process_job(files) do
  Enum.each(files, fn path -> process_file(path) end)
end
# Process the file into the database.
#
# filename - path of the file to read; it is opened with the :compressed mode
#
# Prints a progress line, streams the file, skips its first line and decodes the
# remainder as CSV with the column names :identifier, :identifier_type and
# :timestamp. Every decoded element is passed to process_record/1.
# NOTE(review): depending on the CSV library version, each element may be an
# {:ok, row} / {:error, reason} tuple rather than a bare row — confirm that
# process_record/1 expects that shape.
def process_file(filename) do
  IO.puts "processing #{filename}"

  File.stream!(filename, [:read, :compressed])
  # The first line is dropped because the column names are supplied explicitly below.
  |> Stream.drop(1)
  |> CSV.decode(headers: [:identifier, :identifier_type, :timestamp])
  |> Enum.each(&process_record/1)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment