@doc """
Accepts an incoming event for `source` and acknowledges it immediately.

The request headers are combined with the request path and remote IP (via
`add_path/2` and `add_ip/2`) and collected into a map. Together with the body
and query params they are validated through `Event.validate/5` and inserted
with `Repo.insert/1`.

On a successful insert the event is:

  * broadcast on the `"events:all"` topic as a `"new-event"` message, and
  * POSTed, JSON-encoded, to the `url` of every `Subscription` row.

All of that work happens in a background task, so the response is always
`200 "OK"`, regardless of whether the event was valid or could be stored.
"""
def create(
      %{
        req_headers: headers,
        remote_ip: remote_ip,
        request_path: path,
        body_params: payload,
        query_params: query_params
      } = conn,
      %{"source" => source}
    ) do
  require Logger

  # Fire-and-forget: `Task.start/1` rather than `Task.async/1`, because the
  # result is never awaited. An un-awaited async task stays linked to the
  # request process and leaves its reply message unread in that mailbox.
  Task.start(fn ->
    merged_headers =
      headers
      |> add_path(path)
      |> add_ip(remote_ip)
      |> Enum.into(%{})

    changeset = Event.validate(%Event{}, source, merged_headers, payload, query_params)

    case Repo.insert(changeset) do
      {:ok, stored} ->
        # Strip the struct tag and Ecto metadata so the event is a plain map
        # for broadcasting and JSON encoding.
        event = Map.drop(stored, [:__meta__, :__struct__])

        Steelbroker.Endpoint.broadcast!("events:all", "new-event", %{response: event})

        # Encode once and share the binary with every delivery task instead
        # of re-encoding the same event per subscription.
        body = Poison.encode!(event)

        Subscription
        |> Repo.all()
        |> Enum.each(fn sub ->
          # Best-effort delivery: each POST runs in its own unlinked task and
          # its result is intentionally ignored.
          Task.start(fn -> HTTPoison.post(sub.url, body) end)
        end)

      {:error, invalid} ->
        # The client has already been answered with 200, so the only thing
        # left to do with a rejected event is to make the failure visible.
        Logger.error(
          "event from source #{inspect(source)} was not stored: #{inspect(invalid.errors)}"
        )
    end
  end)

  send_resp(conn, 200, "OK")
end