Created
July 5, 2019 16:18
-
-
Save ConnorRigby/d2145c561eadf811d2c24080962cde45 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule FarmbotExt.API do | |
alias FarmbotExt.{API, JWT} | |
alias FarmbotCore.JSON | |
alias FarmbotCore.Asset.{ | |
StorageAuth, | |
FbosConfig, | |
FirmwareConfig | |
} | |
alias FarmbotCore.{BotState, BotState.JobProgress.Percent} | |
require FarmbotCore.Logger | |
import FarmbotCore.Config, only: [get_config_value: 3] | |
use Tesla | |
alias Tesla.Multipart | |
# adapter(Tesla.Adapter.Hackney) | |
plug(Tesla.Middleware.JSON, decode: &JSON.decode/1, encode: &JSON.encode/1) | |
plug(Tesla.Middleware.FollowRedirects) | |
plug(Tesla.Middleware.Logger) | |
def client do | |
binary_token = get_config_value(:string, "authorization", "token") | |
server = get_config_value(:string, "authorization", "server") | |
{:ok, _tkn} = JWT.decode(binary_token) | |
uri = URI.parse(server) | |
url = (uri.scheme || "https") <> "://" <> uri.host <> ":" <> to_string(uri.port) | |
Tesla.client([ | |
{Tesla.Middleware.BaseUrl, url}, | |
{Tesla.Middleware.Headers, | |
[ | |
{"content-type", "application/json"}, | |
{"authorization", "Bearer: " <> binary_token}, | |
{"user-agent", "farmbot-os"} | |
]} | |
]) | |
end | |
def storage_client(%StorageAuth{url: url}) do | |
Tesla.client( | |
[ | |
{Tesla.Middleware.BaseUrl, "https:" <> url}, | |
{Tesla.Middleware.Headers, | |
[ | |
{"user-agent", "farmbot-os"} | |
]} | |
], | |
[ | |
{Tesla.Middleware.FormUrlencoded, []} | |
] | |
) | |
end | |
@file_chunk 4096 | |
@progress_steps 50 | |
def upload_image(image_filename, meta \\ %{}) do | |
{:ok, changeset} = API.get_changeset(StorageAuth) | |
storage_auth = %StorageAuth{form_data: form_data} = Ecto.Changeset.apply_changes(changeset) | |
content_length = :filelib.file_size(image_filename) | |
{:ok, pid} = Agent.start_link(fn -> 0 end) | |
prog = %Percent{ | |
status: "working", | |
percent: 0, | |
file_type: Path.extname(image_filename), | |
type: :image, | |
time: DateTime.utc_now() | |
} | |
stream = | |
image_filename | |
|> File.stream!([], @file_chunk) | |
|> Stream.each(fn chunk -> | |
Agent.update(pid, fn sent -> | |
size = sent + byte_size(chunk) | |
prog = put_progress(prog, size, content_length) | |
BotState.set_job_progress(image_filename, prog) | |
size | |
end) | |
end) | |
mp = | |
Multipart.new() | |
|> Multipart.add_field("key", form_data.key) | |
|> Multipart.add_field("acl", form_data.acl) | |
|> Multipart.add_field("policy", form_data.policy) | |
|> Multipart.add_field("signature", form_data.signature) | |
|> Multipart.add_field("Content-Type", form_data."Content-Type") | |
|> Multipart.add_field("GoogleAccessId", form_data."GoogleAccessId") | |
|> Multipart.add_file_content(stream, Path.basename(image_filename), name: "file") | |
storage_resp = | |
storage_auth | |
|> API.storage_client() | |
|> API.request( | |
method: String.to_existing_atom(String.downcase(storage_auth.verb)), | |
body: mp | |
) | |
with {:ok, %{url: url, status: s}} when s > 199 and s < 300 <- storage_resp, | |
attachment_url <- Path.join(url, form_data.key), | |
client <- API.client(), | |
body <- %{attachment_url: attachment_url, meta: meta}, | |
{:ok, %{status: s}} = r when s > 199 and s < 300 <- API.post(client, "/api/images", body) do | |
BotState.set_job_progress(image_filename, %{prog | status: "complete", percent: 100}) | |
r | |
else | |
er -> | |
FarmbotCore.Logger.error(1, "Failed to upload image") | |
BotState.set_job_progress(image_filename, %{prog | percent: -1, status: "error"}) | |
er | |
end | |
end | |
def put_progress(prog, size, max) do | |
fraction = size / max | |
completed = trunc(fraction * @progress_steps) | |
percent = trunc(fraction * 100) | |
unfilled = @progress_steps - completed | |
IO.write( | |
:stderr, | |
"\r|#{String.duplicate("=", completed)}#{String.duplicate(" ", unfilled)}| #{percent}% (#{ | |
bytes_to_mb(size) | |
} / #{bytes_to_mb(max)}) MB" | |
) | |
status = if percent == 100, do: "complete", else: "working" | |
%Percent{ | |
prog | |
| status: status, | |
percent: percent | |
} | |
end | |
defp bytes_to_mb(bytes) do | |
trunc(bytes / 1024 / 1024) | |
end | |
@doc "helper for `GET`ing a path." | |
def get_body!(path) do | |
case API.get(API.client(), path) do | |
{:ok, %{body: body, status: 200}} -> | |
{:ok, body} | |
{:ok, %{body: error, status: status}} when is_binary(error) -> | |
get_body_error(path, status, error) | |
{:ok, %{body: %{"error" => error}, status: status}} when is_binary(error) -> | |
get_body_error(path, status, error) | |
{:ok, %{body: error, status: status}} when is_binary(error) -> | |
get_body_error(path, status, inspect(error)) | |
{:error, reason} -> | |
{:error, reason} | |
end | |
end | |
defp get_body_error(path, status, error) when is_binary(error) do | |
msg = """ | |
HTTP Error getting: #{path} | |
Status Code = #{status} | |
#{error} | |
""" | |
{:error, msg} | |
end | |
@callback get_changeset(module) :: {:ok, %Ecto.Changeset{}} | {:error, term()} | |
@callback get_changeset(data :: module | map(), Path.t()) :: | |
{:ok, %Ecto.Changeset{}} | {:error, term()} | |
@doc "helper for `GET`ing api resources." | |
def get_changeset(module) when is_atom(module) do | |
get_changeset(struct(module)) | |
end | |
def get_changeset(%module{} = data) do | |
get_body!(module.path()) | |
|> case do | |
{:ok, %{} = single} -> | |
{:ok, module.changeset(data, single)} | |
{:ok, many} when is_list(many) -> | |
{:ok, Enum.map(many, &module.changeset(data, &1))} | |
{:error, reason} -> | |
{:error, reason} | |
end | |
end | |
@doc "helper for `GET`ing api resources." | |
def get_changeset(asset, path) | |
# Hacks for dealing with these resources not having #show | |
def get_changeset(FbosConfig, _), | |
do: get_changeset(FbosConfig) | |
def get_changeset(FirmwareConfig, _), | |
do: get_changeset(FirmwareConfig) | |
def get_changeset(%FbosConfig{} = data, _), | |
do: get_changeset(data) | |
def get_changeset(%FirmwareConfig{} = data, _), | |
do: get_changeset(data) | |
def get_changeset(module, path) when is_atom(module) do | |
get_changeset(struct(module), path) | |
end | |
def get_changeset(%module{} = data, path) do | |
get_body!(Path.join(module.path(), to_string(path))) | |
|> case do | |
{:ok, %{} = single} -> | |
{:ok, module.changeset(data, single)} | |
{:ok, many} when is_list(many) -> | |
{:ok, Enum.map(many, &module.changeset(data, &1))} | |
{:error, reason} -> | |
{:error, reason} | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment