Skip to content

Instantly share code, notes, and snippets.

@cdesch
Last active December 8, 2017 16:03
Show Gist options
  • Save cdesch/0d2bad01128421a236fdaa5e819aaca3 to your computer and use it in GitHub Desktop.
Save cdesch/0d2bad01128421a236fdaa5e819aaca3 to your computer and use it in GitHub Desktop.
defmodule AdLock.DataProcessTest do
  @moduledoc """
  Manual driver that processes several CSV files concurrently through the
  `:data_worker` poolboy pool, one `Task` per file.

  Run from IEx with `AdLock.DataProcessTest.start()` to use the default
  sample files, or `AdLock.DataProcessTest.start(paths)` for explicit paths.
  """

  # Used both as the poolboy checkout timeout and as the GenServer.call timeout.
  @timeout 60000

  # Other sample sets used during development:
  #   ["/Volumes/Backup/samples/raw-000000000000.csv.gz",
  #    "/Volumes/Backup/samples/raw-000000000001.csv.gz"]
  #   ["/Volumes/Backup/samples/raw-000000000000.csv"]
  #   ["/Users/cj/Downloads/raw-0-small.csv"]
  @default_files [
    "/Users/cj/Downloads/raw-1-small.csv.gz",
    "/Users/cj/Downloads/raw-2-small.csv.gz",
    "/Users/cj/Downloads/raw-3-small.csv.gz",
    "/Users/cj/Downloads/raw-0-small.csv.gz"
  ]

  @doc """
  Starts one task per path in `files`; each task checks out a `:data_worker`
  from the pool and asks it to process that file. Every task's result is then
  awaited in order and printed with `IO.inspect/1`. Returns `:ok`.

  `files` defaults to the hard-coded sample list above.
  """
  def start(files \\ @default_files) do
    files
    |> Enum.map(&async_call_process_file/1)
    |> Enum.each(&await_and_inspect/1)
  end

  # Runs the pooled `{:process_file, path}` call inside a Task so files are
  # processed concurrently (up to the pool's size).
  defp async_call_process_file(path) do
    Task.async(fn ->
      :poolboy.transaction(
        :data_worker,
        fn pid -> GenServer.call(pid, {:process_file, path}, @timeout) end,
        @timeout
      )
    end)
  end

  # The task's work is already bounded: the checkout waits at most @timeout
  # and the call at most @timeout. Awaiting with :infinity lets those inner
  # timeouts decide, instead of Task.await exiting the caller after @timeout
  # while a file is still queued behind busy workers or mid-processing.
  defp await_and_inspect(task), do: task |> Task.await(:infinity) |> IO.inspect()
end
# Usage (from IEx): AdLock.DataProcessTest.start()
defmodule AdLock.DataProcessWorker do
  @moduledoc """
  Pooled worker that processes a CSV file into the database via
  `AdLock.Population`.

  Each decoded row (`identifier`, `identifier_type`, `timestamp`) results in a
  "bump" for the person with that identifier. The person is created first when
  no one with that identifier exists yet.
  """
  use GenServer

  # strftime-style format of the `timestamp` column, parsed with Timex.
  @timestamp_format "%Y-%m-%d %T %Z"

  def start_link(_) do
    GenServer.start_link(__MODULE__, nil, [])
  end

  # The worker keeps no state; all work happens per call.
  def init(_) do
    {:ok, nil}
  end

  # Processes one file synchronously and replies with Benchwarmer's result.
  def handle_call({:process_file, filename}, _from, state) do
    IO.puts "> start #{inspect self()} processing file #{filename}"
    result = Benchwarmer.benchmark(fn -> process_file(filename) end)
    IO.puts "> completed #{inspect result} processing file #{filename}"
    {:reply, result, state}
  end

  @doc """
  Streams `filename`, drops its first (header) line, decodes the remaining rows
  with the column names `identifier`, `identifier_type` and `timestamp`, and
  passes each decoded row to `process_record/1`. Returns `:ok`.

  The file is opened with the `:compressed` option so gzip input is
  decompressed while streaming.
  """
  def process_file(filename) do
    IO.puts "processing #{filename}"

    filename
    |> File.stream!([:read, :compressed])
    # CSV is given the header names explicitly, so it does not treat the first
    # line as a header; the file's own header line is skipped here instead.
    |> Stream.drop(1)
    |> CSV.decode(headers: [:identifier, :identifier_type, :timestamp], num_workers: 10)
    |> Enum.each(&process_record/1)
  end

  @doc """
  Processes one decoded CSV row.

  For `{:ok, row}`, the person is looked up by `row.identifier`. If one exists,
  a bump is created for them. Otherwise the person is created first, via
  `commit_record/1`.

  Any other two-element tuple (a decode failure) has its second element printed
  with `IO.inspect/1`, and that element is returned.
  """
  def process_record({:ok, record_data}) do
    case AdLock.Population.get_person_by_identifier(record_data.identifier) do
      nil -> commit_record(record_data)
      person -> bump_person(person, record_data)
    end
  end

  def process_record({_status, error_message}), do: IO.inspect(error_message)

  @doc """
  Like `process_record/1`, but skips the initial lookup. It always attempts to
  create the person and falls back to the existing one (see `commit_record/1`).
  """
  def process_record_lite({:ok, record_data}), do: commit_record(record_data)
  def process_record_lite({_status, error_message}), do: IO.inspect(error_message)

  @doc """
  Creates the person described by `record_data` and records a bump for them.

  If creation fails but a person with the same identifier can be found, the
  bump is recorded for that existing person. If no such person exists, the
  `{:error, _}` returned by `create_person` is returned unchanged instead of
  crashing the worker.
  """
  def commit_record(record_data) do
    attrs = %{
      identifier: record_data.identifier,
      identifier_type: AdLock.Population.identifier_type_decoder(record_data.identifier_type)
    }

    case AdLock.Population.create_person(attrs) do
      {:ok, person} ->
        bump_person(person, record_data)

      {:error, _reason} = error ->
        # Presumably creation failed because a concurrent worker already
        # inserted this identifier (uniqueness constraint; confirm in
        # AdLock.Population). If the lookup finds nobody, the failure had
        # another cause, so report it rather than calling `nil.id`.
        case AdLock.Population.get_person_by_identifier(record_data.identifier) do
          nil -> error
          person -> bump_person(person, record_data)
        end
    end
  end

  @doc """
  Returns the float at the start of `value`, ignoring any trailing text.
  Raises `MatchError` when `value` does not start with a number.
  """
  def extract_float(value) do
    {float_value, _rest} = Float.parse(value)
    float_value
  end

  # Records a bump for `person` at the row's timestamp. Timex.parse!/3 raises
  # when the timestamp does not match @timestamp_format.
  defp bump_person(person, record_data) do
    timestamp = Timex.parse!(record_data.timestamp, @timestamp_format, :strftime)
    AdLock.Population.create_bump(%{person_id: person.id, timestamp: timestamp})
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment