Last active
December 8, 2017 16:03
-
-
Save cdesch/0d2bad01128421a236fdaa5e819aaca3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule AdLock.DataProcessTest do
  @moduledoc """
  Ad-hoc driver that processes several CSV files concurrently by sending
  `{:process_file, path}` calls to workers checked out of the `:data_worker`
  poolboy pool, then prints each worker's reply.
  """

  # Timeout (ms) used both for the poolboy checkout and for the GenServer call.
  @timeout 60_000

  # A task may wait up to @timeout for a free worker and then up to @timeout
  # for the call itself. Awaiting with only @timeout could therefore abandon a
  # task before its own (bounded) timeouts fire, so allow both plus a margin.
  @await_timeout 2 * @timeout + 5_000

  @default_files [
    "/Users/cj/Downloads/raw-1-small.csv.gz",
    "/Users/cj/Downloads/raw-2-small.csv.gz",
    "/Users/cj/Downloads/raw-3-small.csv.gz",
    "/Users/cj/Downloads/raw-0-small.csv.gz"
  ]

  @doc """
  Starts one task per path in `files`; each task runs the file through a pool
  worker. Then awaits the tasks in order, printing every reply with
  `IO.inspect/1`. Returns `:ok`.

  `files` defaults to the hard-coded sample list, so `start/0` behaves as before.
  """
  def start(files \\ @default_files) do
    files
    |> Enum.map(&async_call_process_file/1)
    |> Enum.each(&await_and_inspect/1)
  end

  # Spawns a task that checks out a `:data_worker` and asks it to process `path`.
  defp async_call_process_file(path) do
    Task.async(fn ->
      :poolboy.transaction(
        :data_worker,
        fn pid -> GenServer.call(pid, {:process_file, path}, @timeout) end,
        @timeout
      )
    end)
  end

  # Waits for a task's reply and prints it (IO.inspect returns its argument).
  defp await_and_inspect(task) do
    task
    |> Task.await(@await_timeout)
    |> IO.inspect()
  end
end
# Usage (from IEx): AdLock.DataProcessTest.start()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule AdLock.DataProcessWorker do
  @moduledoc """
  GenServer worker that imports CSV files of identifier records into the
  database through `AdLock.Population`. Intended to be run as a member of the
  `:data_worker` poolboy pool, which sends it `{:process_file, path}` calls.
  """

  use GenServer

  # Format of the `timestamp` CSV column, parsed with Timex's :strftime parser.
  @timestamp_format "%Y-%m-%d %T %Z"

  @doc "Starts an unnamed worker; the argument is ignored."
  def start_link(_) do
    GenServer.start_link(__MODULE__, nil, [])
  end

  @impl true
  def init(_), do: {:ok, nil}

  @doc """
  Handles `{:process_file, filename}`. Runs `process_file/1` under
  `Benchwarmer.benchmark/1` and replies with the benchmark result.
  The (nil) state is left unchanged.
  """
  @impl true
  def handle_call({:process_file, filename}, _from, state) do
    IO.puts("> start #{inspect(self())} processing file #{filename}")
    result = Benchwarmer.benchmark(fn -> process_file(filename) end)
    IO.puts("> completed #{inspect(result)} processing file #{filename}")
    {:reply, result, state}
  end

  @doc """
  Streams `filename` (opened with the `:compressed` option) and skips its
  first line. Decodes the rest as CSV with the columns
  `[:identifier, :identifier_type, :timestamp]` and passes every decoded row
  to `process_record/1`. Returns `:ok`.
  """
  def process_file(filename) do
    IO.puts("processing #{filename}")

    filename
    |> File.stream!([:read, :compressed])
    # The header line is skipped because explicit column names are given below.
    |> Stream.drop(1)
    |> CSV.decode(headers: [:identifier, :identifier_type, :timestamp], num_workers: 10)
    |> Enum.each(&process_record/1)
  end

  @doc """
  Stores one decoded CSV row.

  If a person with the row's identifier already exists, only a bump is created
  for them; otherwise `commit_record/1` creates the person and the bump.
  Decode errors (`{:error, message}`) are printed and the message is returned.
  """
  def process_record({:ok, record_data}) do
    case AdLock.Population.get_person_by_identifier(record_data.identifier) do
      nil -> commit_record(record_data)
      person -> create_bump_for(person, record_data)
    end
  end

  def process_record({:error, error_message}), do: IO.inspect(error_message)

  @doc """
  Like `process_record/1`, but skips the initial lookup: it always tries to
  create the person first via `commit_record/1`. Decode errors are printed and
  the message is returned.
  """
  def process_record_lite({:ok, record_data}), do: commit_record(record_data)
  def process_record_lite({:error, error_message}), do: IO.inspect(error_message)

  @doc """
  Creates the person described by `record_data` and then a bump for them.

  If creation fails, the person is looked up by identifier and the bump is
  attached to the existing record. If that lookup also finds nothing, the
  creation error is printed and `{:error, reason}` is returned. Previously this
  case crashed on `nil.id`.
  """
  def commit_record(record_data) do
    attrs = %{
      identifier: record_data.identifier,
      identifier_type:
        AdLock.Population.identifier_type_decoder(record_data.identifier_type)
    }

    case AdLock.Population.create_person(attrs) do
      {:ok, person} ->
        create_bump_for(person, record_data)

      {:error, reason} ->
        # NOTE(review): presumably creation fails mainly on a duplicate
        # identifier (e.g. inserted concurrently by another worker) — confirm
        # against AdLock.Population.create_person.
        case AdLock.Population.get_person_by_identifier(record_data.identifier) do
          nil ->
            IO.inspect(reason, label: "could not create person #{record_data.identifier}")
            {:error, reason}

          person ->
            create_bump_for(person, record_data)
        end
    end
  end

  @doc """
  Returns the float at the start of `value` (trailing text is ignored).
  Raises `MatchError` when `value` does not begin with a number.
  """
  def extract_float(value) do
    {float_value, _rest} = Float.parse(value)
    float_value
  end

  # Creates a bump for `person` stamped with the row's parsed timestamp.
  defp create_bump_for(person, record_data) do
    AdLock.Population.create_bump(%{
      person_id: person.id,
      timestamp: parse_timestamp(record_data.timestamp)
    })
  end

  # Parses a CSV timestamp string; raises if it does not match @timestamp_format.
  defp parse_timestamp(timestamp) do
    Timex.parse!(timestamp, @timestamp_format, :strftime)
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment