Last active
June 16, 2020 11:57
-
-
Save krainboltgreene/dd2ed2677217c505f9a96872b9b1db2c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule LiveDataImport.ESI do | |
require Logger | |
@default_host URI.parse("https://esi.evetech.net/") | |
@default_fetch_query %{datasource: "tranquility"} | |
@type allowed_verbs :: :get | :patch | :post | :put | |
@spec read(String.t(), map) :: Flow.t() | |
def read(resource, query \\ %{datasource: "tranquility"}) | |
when is_binary(resource) and is_map(query) do | |
{:get, resource, query} | |
|> fetch() | |
|> paginate() | |
|> Flow.map(&parse/1) | |
|> Flow.map(&cleanup/1) | |
|> Flow.reject(&is_nil/1) | |
end | |
@spec fetch({verb, resource, query}) :: | |
{verb, resource, query, any} | |
when verb: allowed_verbs, resource: String.t(), query: map | |
def fetch({verb, resource, query}) | |
when is_atom(verb) and is_binary(resource) and is_map(query) do | |
encoded_query = URI.encode_query(Map.merge(@default_fetch_query, query)) | |
uri = URI.merge(URI.merge(@default_host, resource), encoded_query) |> URI.to_string() | |
etag_header = Cachex.get( | |
:live_data_import, | |
"esi/#{verb} #{uri}/etag" | |
) |> case do | |
{:ok, nil} -> | |
Logger.info("Not using etag") | |
[] | |
{:ok, value} -> | |
Logger.info("Using etag", etag: value) | |
[{"If-None-Match", value}] | |
end | |
headers = [ | |
{"Accept", "application/json"}, | |
{"User-Agent", "Affinity MATRiX Service Cluster v1 ([email protected])"} | |
] ++ etag_header | |
Logger.info("Initiating request", uri: uri, verb: verb, query: encoded_query) | |
{ | |
verb, | |
resource, | |
query, | |
Mojito.request( | |
verb, | |
uri, | |
headers | |
) | |
|> case do | |
{:ok, %{headers: headers} = response} -> | |
last_modified = Utilities.find_header(headers, "last-modified") | |
pages = | |
Utilities.find_header(headers, "x-pages", {"x-pages", "1"}) |> String.to_integer() | |
etag = Utilities.find_header(headers, "etag") | |
Logger.info("Merging important headers into response", | |
uri: uri, | |
last_modified: last_modified, | |
pages: pages, | |
etag: etag | |
) | |
{:ok, | |
Map.merge(response, %{uri: uri, last_modified: last_modified, pages: pages, etag: etag})} | |
{:error, _} = rejection -> | |
rejection | |
end | |
|> case do | |
{:ok, %{status_code: 200, etag: etag}} = response -> | |
Logger.info("Server: OK", status_code: 200) | |
Logger.info("Storing response etag into cache", | |
key: "esi/#{verb} #{uri}/etag", | |
etag: etag | |
) | |
{:ok, true} = | |
Cachex.put( | |
:live_data_import, | |
"esi/#{verb} #{uri}/etag", | |
etag | |
) | |
Logger.info("Storing response", key: "esi/#{verb} #{uri}/#{etag}") | |
{:ok, true} = | |
Cachex.put( | |
:live_data_import, | |
"esi/#{verb} #{uri}/#{etag}", | |
response | |
) | |
response | |
{:ok, %{status_code: 304, etag: etag}} -> | |
Logger.info("Server: Not Modified", status_code: 304) | |
Logger.info("Using stored response", key: "esi/#{verb} #{uri}/#{etag}", etag: etag) | |
Cachex.get!( | |
:live_data_import, | |
"esi/#{verb} #{uri}/#{etag}" | |
) | |
{:ok, %{status_code: 401}} = response -> | |
Logger.warn("Server: Unauthorized", status_code: 401) | |
{:error, response} | |
{:ok, %{status_code: 404}} = response -> | |
Logger.warn("Server: Not Found", status_code: 404) | |
{:error, response} | |
{:ok, %{status_code: 500}} = response -> | |
Logger.warn("Server: Internal Server Exception", status_code: 500) | |
{:error, response} | |
{:ok, %{status_code: 501}} = response -> | |
Logger.warn("Server: Not implemented", status_code: 501) | |
{:error, response} | |
{:ok, %{status_code: 502}} = response -> | |
Logger.warn("Server: Bad Gateway", status_code: 502) | |
{:retry, response} | |
{:ok, %{status_code: 503}} = response -> | |
Logger.warn("Server: Service Unavailable", status_code: 503) | |
{:retry, response} | |
{:ok, %{status_code: 504}} = response -> | |
Logger.warn("Server: Gateway Timeout", status_code: 504) | |
{:retry, response} | |
{:error, message} = rejection -> | |
Logger.warn("Client: Failure", message: message) | |
rejection | |
end | |
|> case do | |
{:retry, _} -> | |
Logger.info("Retrying request") | |
# NOTE: This is definitely going to break spec | |
fetch({verb, resource, query}) | |
# NOTE: What about failure here? | |
resolution -> | |
resolution | |
end | |
} | |
end | |
# TODO: Write a paginate that allows for failure | |
@spec paginate({verb, resource, query, {:ok, map}}) :: Flow.t() | |
when verb: allowed_verbs, resource: String.t(), query: map | |
def paginate({verb, resource, query, {:ok, %{pages: pages} = response}}) | |
when is_atom(verb) and is_binary(resource) and is_map(query) and is_map(response) and | |
is_integer(pages) do | |
Logger.info("Spinning up paginated requests", pages: pages) | |
1..pages | |
|> Flow.from_enumerable() | |
|> Flow.map(fn | |
1 -> | |
{verb, resource, query, {:ok, response}} | |
page -> | |
Logger.info("Spawning page request", page: page) | |
fetch({verb, resource, Map.merge(query, %{page: page})}) | |
end) | |
end | |
# TODO: Write a parse that allows for failure | |
@spec parse({verb, resource, query, {:ok, map}}) :: | |
{:ok, any} | |
| {:error, atom} | |
| {:error, Jason.DecodeError.t()} | |
when verb: allowed_verbs, resource: String.t(), query: map | |
def parse({verb, resource, query, {:ok, %{body: body, uri: uri, etag: etag} = response}}) | |
when is_atom(verb) and is_binary(resource) and is_map(query) and is_map(response) and | |
is_binary(etag) and is_binary(uri) do | |
Logger.info("Fetching JSON body from cache", key: "esi/#{verb} #{uri}/#{etag}/parsed") | |
Cachex.fetch( | |
:live_data_import, | |
"esi/#{verb} #{uri}/#{etag}/parsed", | |
fn _key -> | |
Logger.info("Decoding JSON") | |
Jason.decode(body) | |
|> case do | |
{:ok, data} -> | |
Logger.info("Committing JSON to cache") | |
{:commit, data} | |
{:error, _} = failure -> | |
Logger.info("Failed to decode JSON") | |
{:ignore, failure} | |
end | |
end | |
) | |
|> case do | |
{:ok, _} = success -> success | |
{:commit, data} -> {:ok, data} | |
{:ignore, data} -> {:ok, data} | |
rejection -> rejection | |
end | |
end | |
@spec cleanup({:error, any} | {:ok, response}) :: response | nil when response: any | |
def cleanup({:ok, response}) do | |
response | |
end | |
def cleanup({:error, message}) do | |
Logger.error(Kernel.inspect(message)) | |
nil | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment