Created
          July 10, 2025 05:15 
        
      - 
      
 - 
        
Save llk23r/5ee15677ac7e177f33dfbee8e7a96045 to your computer and use it in GitHub Desktop.  
    Ampcode integration in Elixir Livebook
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | # ~/.iex.exs | |
| # Idiomatic Amp CLI integration for IEx | |
| defmodule AmpIEx do | |
| @moduledoc """ | |
| Elixir integration for Amp CLI with proper error handling and functional design. | |
| """ | |
# Compile-time constants and shared types for the functions below.

# Name of the CLI binary; it is looked up with System.find_executable/1 and
# used as the command prefix.
@amp_executable "amp"

# Captures a thread id: "T-" followed by lowercase hex digits and dashes.
@thread_id_regex ~r/(T-[a-f0-9-]+)/

# Captures the thread id from a "Thread: https://ampcode.com/threads/<id>" line.
@thread_url_regex ~r/Thread: https:\/\/ampcode\.com\/threads\/(T-[a-f0-9-]+)/

# Visibility used by q_share/2 when the caller does not pass one.
@default_visibility "public"

# Values accepted by the guards on visibility and output format.
@valid_visibilities ["public", "private", "team"]
@valid_formats ["ui", "jsonl"]

@typedoc "Thread identifier as a string."
@type thread_id :: String.t()

@typedoc "Visibility level as a string."
@type visibility :: String.t()

@typedoc "Keyword options that are translated into CLI flags."
@type amp_opts :: keyword()

@typedoc "Tagged result of running a command."
@type command_result :: {:ok, String.t()} | {:error, atom()}
| # Public API | |
@doc """
Reads a multi-line prompt interactively and sends it to Amp.

With a binary `thread_id`, that id is stored as the current thread first and
the query is run against it. With `nil` (the default) the currently stored
thread id, if any, is used instead.

Returns `:ok` when the query step returns an `{:ok, _}` tuple (this includes a
cancelled prompt); any other result is returned unchanged.
"""
@spec q(thread_id() | nil, amp_opts()) :: :ok | {:error, term()}
def q(thread_id \\ nil, opts \\ [])

def q(nil, opts) do
  with {:ok, _response} <- execute_query(get_current_thread(), opts), do: :ok
end

def q(thread_id, opts) when is_binary(thread_id) do
  set_current_thread(thread_id)

  with {:ok, _response} <- execute_query(thread_id, opts), do: :ok
end
@doc """
Creates a new thread and stores it as the current thread.

The stored thread id is cleared before `amp threads new` runs, so no current
thread remains if creation fails.

Returns `{:ok, thread_id}` on success. On failure the error is printed and
`{:error, reason}` is returned; `reason` is not always an atom, for example
`{:command_failed, exit_code}`.
"""
@spec q_new(amp_opts()) :: {:ok, thread_id()} | {:error, term()}
def q_new(opts \\ []) do
  clear_current_thread()

  with {:ok, cmd} <- build_command(opts, ["threads", "new"]),
       {:ok, output} <- execute_command_silent(cmd),
       {:ok, thread_id} <- extract_thread_id_from_output(output) do
    set_current_thread(thread_id)
    display_success("New thread created: #{thread_id}")
    {:ok, thread_id}
  else
    error -> handle_error(error, "Failed to create new thread")
  end
end
@doc """
Makes a thread the current one so later queries continue it.

With a binary `thread_id`, that id is stored and `{:ok, thread_id}` is
returned. With `nil` (the default, so `q_continue()` works like `q_fork()` and
`q_share()`), the already stored thread is re-selected, or
`{:error, :no_current_thread}` is returned when none is stored.
"""
@spec q_continue(thread_id() | nil) :: {:ok, thread_id()} | {:error, :no_current_thread}
def q_continue(thread_id \\ nil)

def q_continue(nil), do: handle_current_thread(&continue_with_thread/1)

def q_continue(thread_id) when is_binary(thread_id), do: continue_with_thread(thread_id)
@doc """
Forks an existing thread and makes the fork the current thread.

With `nil` as `source_id` (the default) the currently stored thread is forked;
`{:error, :no_current_thread}` is returned when none is stored.

Returns `{:ok, new_thread_id}` on success, otherwise `{:error, reason}`.
`reason` is not always an atom, for example `{:command_failed, exit_code}`.
"""
@spec q_fork(thread_id() | nil, amp_opts()) :: {:ok, thread_id()} | {:error, term()}
def q_fork(source_id \\ nil, opts \\ [])

def q_fork(nil, opts), do: handle_current_thread(&fork_thread(&1, opts))

def q_fork(source_id, opts) when is_binary(source_id), do: fork_thread(source_id, opts)
@doc """
Runs `amp threads list`, prints its output and returns it.

Returns `{:ok, output}` on success. On failure the error is printed and
`{:error, reason}` is returned; `reason` is `:amp_not_found` when the
executable is missing or `{:command_failed, exit_code}` when the command exits
non-zero.
"""
@spec q_list(amp_opts()) :: {:ok, String.t()} | {:error, term()}
def q_list(opts \\ []) do
  with {:ok, cmd} <- build_command(opts, ["threads", "list"]),
       {:ok, output} <- execute_command_silent(cmd) do
    display_output(output)
    {:ok, output}
  else
    error -> handle_error(error, "Failed to list threads")
  end
end
@doc """
Shares a thread with the given visibility.

`visibility` must be one of the values in `@valid_visibilities`; this is
checked for every call, including the one that falls back to the current
thread. With `nil` as `thread_id` (the default) the currently stored thread is
shared.

Returns `{:ok, output}` on success, or one of:

  * `{:error, {:invalid_visibility, visibility}}` - unsupported visibility
  * `{:error, :invalid_thread_id}` - `thread_id` is neither `nil` nor a binary
  * `{:error, :no_current_thread}` - no id was given and none is stored
  * `{:error, reason}` - the share command itself failed
"""
@spec q_share(thread_id() | nil, visibility()) :: {:ok, String.t()} | {:error, term()}
def q_share(thread_id \\ nil, visibility \\ @default_visibility)

def q_share(nil, visibility) when visibility in @valid_visibilities,
  do: handle_current_thread(&share_thread(&1, visibility))

def q_share(thread_id, visibility)
    when is_binary(thread_id) and visibility in @valid_visibilities,
    do: share_thread(thread_id, visibility)

# The visibility is fine here, so the thread id must be the bad argument;
# reporting :invalid_visibility would point at the wrong one.
def q_share(_thread_id, visibility) when visibility in @valid_visibilities,
  do: {:error, :invalid_thread_id}

def q_share(_thread_id, visibility), do: {:error, {:invalid_visibility, visibility}}
@doc """
Compacts a thread to reduce token usage.

With `nil` as `thread_id` (the default, so `q_compact()` works like `q_fork()`
and `q_share()`), the currently stored thread is compacted;
`{:error, :no_current_thread}` is returned when none is stored.

Returns `{:ok, output}` on success, otherwise `{:error, reason}`. `reason` is
not always an atom, for example `{:command_failed, exit_code}`.
"""
@spec q_compact(thread_id() | nil) :: {:ok, String.t()} | {:error, term()}
def q_compact(thread_id \\ nil)

def q_compact(nil), do: handle_current_thread(&compact_thread/1)

def q_compact(thread_id) when is_binary(thread_id), do: compact_thread(thread_id)
@doc """
Runs `amp tools show`, prints its output and returns it.

Returns `{:ok, output}` on success. On failure the error is printed and
`{:error, reason}` is returned; `reason` is `:amp_not_found` when the
executable is missing or `{:command_failed, exit_code}` when the command exits
non-zero.
"""
@spec q_tools(amp_opts()) :: {:ok, String.t()} | {:error, term()}
def q_tools(opts \\ []) do
  with {:ok, cmd} <- build_command(opts, ["tools", "show"]),
       {:ok, output} <- execute_command_silent(cmd) do
    display_output(output)
    {:ok, output}
  else
    error -> handle_error(error, "Failed to show tools")
  end
end
@doc """
Prints which thread, if any, is currently active.

Always returns `{:ok, thread_id}`, where `thread_id` is `nil` when no thread
is stored.
"""
@spec q_status() :: {:ok, thread_id() | nil}
def q_status do
  active = get_current_thread()

  if is_nil(active) do
    display_info("No active thread")
  else
    display_info("Active thread: #{active}")
  end

  {:ok, active}
end
@doc """
Livebook-friendly query that takes the prompt directly (no interactive input).

When `thread_id` is `nil` the command is built without a `threads continue`
subcommand; otherwise the given thread is continued. `opts` are translated
into CLI flags.

Returns `:ok` on success (the response is printed, not returned), otherwise
`{:error, reason}`.
"""
@spec q_livebook(String.t(), thread_id() | nil, amp_opts()) :: :ok | {:error, term()}
def q_livebook(prompt, thread_id \\ nil, opts \\ []) do
  case execute_query_direct(prompt, thread_id, opts) do
    {:ok, _response} -> :ok
    error -> error
  end
end
| # Quick access variants | |
@doc "Interactive query with the `format: \"jsonl\"` option."
def q_json(thread_id \\ nil) do
  q(thread_id, [{:format, "jsonl"}])
end

@doc "Interactive query with the `dangerously_allow_all: true` option."
def q_auto(thread_id \\ nil) do
  q(thread_id, [{:dangerously_allow_all, true}])
end

@doc "Interactive query with the `visibility: \"private\"` option."
def q_private(thread_id \\ nil) do
  q(thread_id, [{:visibility, "private"}])
end

@doc "Interactive query with the `visibility: \"team\"` option."
def q_team(thread_id \\ nil) do
  q(thread_id, [{:visibility, "team"}])
end
| # Livebook-friendly variants | |
@doc "Direct-prompt query with the `format: \"jsonl\"` option."
def q_livebook_json(prompt, thread_id \\ nil) do
  q_livebook(prompt, thread_id, [{:format, "jsonl"}])
end

@doc "Direct-prompt query with the `dangerously_allow_all: true` option."
def q_livebook_auto(prompt, thread_id \\ nil) do
  q_livebook(prompt, thread_id, [{:dangerously_allow_all, true}])
end

@doc "Direct-prompt query with the `visibility: \"private\"` option."
def q_livebook_private(prompt, thread_id \\ nil) do
  q_livebook(prompt, thread_id, [{:visibility, "private"}])
end

@doc "Direct-prompt query with the `visibility: \"team\"` option."
def q_livebook_team(prompt, thread_id \\ nil) do
  q_livebook(prompt, thread_id, [{:visibility, "team"}])
end
| # Private implementation functions | |
| # State management abstraction | |
# The current thread id lives in the calling process's dictionary under
# :amp_thread_id, so it is only visible to the process that stored it.

# Returns the stored thread id, or nil when none is stored.
defp get_current_thread do
  Process.get(:amp_thread_id)
end

# Stores `thread_id` as the current thread; returns the previously stored value.
defp set_current_thread(thread_id) do
  Process.put(:amp_thread_id, thread_id)
end

# Removes the stored thread id; returns the value that was removed.
defp clear_current_thread do
  Process.delete(:amp_thread_id)
end
| # Core operations | |
# Interactive query: reads a prompt from the terminal, builds the command, runs
# it and post-processes the response. A cancelled prompt is reported as
# {:ok, :cancelled} rather than as an error.
defp execute_query(thread_id, opts) do
  case read_multiline_prompt() do
    :cancelled ->
      {:ok, :cancelled}

    {:ok, prompt} ->
      with {:ok, cmd} <- build_query_command(thread_id, prompt, opts),
           {:ok, output} <- execute_command(cmd) do
        handle_query_response(output)
      else
        failure -> handle_error(failure, "Query failed")
      end

    unexpected ->
      handle_error(unexpected, "Query failed")
  end
end
# Non-interactive counterpart of execute_query/2: the prompt is passed in by
# the caller instead of being read from the terminal.
defp execute_query_direct(prompt, thread_id, opts) do
  display_info("📤 Processing prompt...")

  run_result =
    case build_query_command(thread_id, prompt, opts) do
      {:ok, cmd} -> execute_command(cmd)
      not_built -> not_built
    end

  case run_result do
    {:ok, output} -> handle_query_response(output)
    failure -> handle_error(failure, "Query failed")
  end
end
# Selects `thread_id` as the current thread, announces it and returns it tagged
# with :ok. No external command is run here.
defp continue_with_thread(thread_id) do
  _previous = set_current_thread(thread_id)
  announcement = "Continuing thread: #{thread_id}"
  display_info(announcement)

  {:ok, thread_id}
end
# Runs `amp threads fork <source_id>`, reads the new thread id from the output
# and stores it as the current thread. Failures are printed and returned as
# {:error, reason}.
defp fork_thread(source_id, opts) do
  outcome =
    with {:ok, cmd} <- build_command(opts, ["threads", "fork", source_id]),
         {:ok, output} <- execute_command_silent(cmd) do
      extract_thread_id_from_output(output)
    end

  case outcome do
    {:ok, forked_id} ->
      set_current_thread(forked_id)
      display_success("Thread forked: #{forked_id}")
      {:ok, forked_id}

    failure ->
      handle_error(failure, "Failed to fork thread")
  end
end
# Runs the share command for `thread_id` and reports the outcome. On success
# the raw command output is returned as {:ok, output}.
defp share_thread(thread_id, visibility) do
  run_result =
    thread_id
    |> build_share_command(visibility)
    |> execute_command_silent()

  with {:ok, output} <- run_result do
    display_success("Thread shared: #{thread_id} (#{visibility})")
    {:ok, output}
  else
    failure -> handle_error(failure, "Failed to share thread")
  end
end
# Runs `amp threads compact <thread_id>` and reports the outcome.
#
# The thread id is single-quoted before it goes into the `sh -c` string, so
# shell metacharacters in it are passed to amp literally instead of being
# interpreted. A missing executable is reported as :amp_not_found, matching
# build_command/2, rather than as a bare non-zero exit code.
#
# Returns {:ok, output} on success, otherwise {:error, reason} after printing
# the error.
defp compact_thread(thread_id) do
  run_result =
    case System.find_executable(@amp_executable) do
      nil ->
        {:error, :amp_not_found}

      _path ->
        quoted_id = "'" <> escape_shell_input(to_string(thread_id)) <> "'"
        execute_command_silent("#{@amp_executable} threads compact #{quoted_id}")
    end

  case run_result do
    {:ok, output} ->
      display_success("Thread compacted: #{thread_id}")
      {:ok, output}

    error ->
      handle_error(error, "Failed to compact thread")
  end
end
| # Command building | |
# Builds the shell command string `amp <option flags> <subcommands>`.
#
# Returns {:error, :amp_not_found} when the executable cannot be located,
# otherwise {:ok, command}. The result is later run through `sh -c`, so every
# argument after the executable is single-quoted: values that originate from
# callers (a thread id to fork, a log level) cannot inject extra shell syntax.
# Arguments are converted with to_string/1 first so atom values still work.
defp build_command(opts, subcommands) when is_list(subcommands) do
  case System.find_executable(@amp_executable) do
    nil ->
      {:error, :amp_not_found}

    _path ->
      quoted_args =
        Enum.map(build_options(opts) ++ subcommands, fn arg ->
          "'" <> escape_shell_input(to_string(arg)) <> "'"
        end)

      {:ok, Enum.join([@amp_executable | quoted_args], " ")}
  end
end
# Builds the shell pipeline that feeds `prompt` to amp on stdin.
#
# With a nil `thread_id` the base command is used as is; otherwise
# `threads continue <id>` is appended. Returns {:ok, command}, or the error
# from build_command/2 unchanged.
defp build_query_command(thread_id, prompt, opts) do
  with {:ok, base_cmd} <- build_command(opts, []) do
    # printf '%s\n' is used instead of echo: some sh implementations let echo
    # interpret backslash sequences or treat a leading "-n" as an option, which
    # would alter prompts that contain code.
    feed_prompt = "printf '%s\\n' '#{escape_shell_input(prompt)}'"

    cmd =
      case thread_id do
        nil ->
          "#{feed_prompt} | #{base_cmd}"

        id ->
          # Quote the id so it cannot add shell syntax to the pipeline.
          quoted_id = "'" <> escape_shell_input(to_string(id)) <> "'"
          "#{feed_prompt} | #{base_cmd} threads continue #{quoted_id}"
      end

    {:ok, cmd}
  end
end
# Builds the shell command `amp threads share <thread_id> --visibility <v>`.
#
# Both values are single-quoted because the string is later run through
# `sh -c`; without quoting, shell metacharacters in either value would be
# interpreted by the shell. Returns the command as a binary.
defp build_share_command(thread_id, visibility) do
  quote_arg = fn value -> "'" <> escape_shell_input(to_string(value)) <> "'" end

  IO.iodata_to_binary([
    @amp_executable,
    " threads share ",
    quote_arg.(thread_id),
    " --visibility ",
    quote_arg.(visibility)
  ])
end
# Translates the option list into a flat list of CLI tokens, in order.
# Each option contributes whatever format_option/1 returns for it; nil tokens
# are dropped.
defp build_options(opts) do
  for option <- opts,
      token <- format_option(option),
      token != nil,
      do: token
end
| # Pattern match option formatting | |
# Maps one `{key, value}` option to its CLI tokens. Options that are unknown,
# or whose value is not acceptable, produce no tokens.
defp format_option({:format, value}) when value in @valid_formats, do: ["--format", value]

defp format_option({:visibility, value}) when value in @valid_visibilities,
  do: ["--visibility", value]

defp format_option({:dangerously_allow_all, true}), do: ["--dangerously-allow-all"]
defp format_option({:no_color, true}), do: ["--no-color"]
defp format_option({:no_notifications, true}), do: ["--no-notifications"]
defp format_option({:interactive, true}), do: ["--interactive"]
defp format_option({:jetbrains, true}), do: ["--jetbrains"]

# Only a non-empty string or a real atom is accepted as a level. Without this
# guard `log_level: nil` produced ["--log-level", nil]; the nil was filtered
# out later, leaving a dangling "--log-level" that swallowed the next argument.
defp format_option({:log_level, level})
     when (is_binary(level) and level != "") or
            (is_atom(level) and level not in [nil, true, false]),
     do: ["--log-level", level]

defp format_option(_unsupported), do: []
| # Command execution | |
# Runs `cmd` through `sh -c` with stderr discarded, prints the cleaned response
# and records any thread id found in the raw output.
#
# Returns {:ok, cleaned_output} when the command exits with 0. On any other
# exit status it returns {:error, {:command_failed, exit_code}}. This is the
# same shape execute_command_silent/1 uses, and the one
# format_error/1 has a dedicated message for.
defp execute_command(cmd) do
  case System.cmd("sh", ["-c", cmd <> " 2>/dev/null"], stderr_to_stdout: false) do
    {output, 0} ->
      clean_output = clean_ansi_output(output)
      display_response(clean_output)
      handle_thread_id_extraction(output)
      # Blank line separating the response from whatever IEx prints next.
      IO.puts("")
      {:ok, clean_output}

    {_output, code} ->
      {:error, {:command_failed, code}}
  end
end
# Runs `cmd` through `sh -c` with stderr discarded and prints nothing.
# Returns {:ok, raw_output} on exit status 0, otherwise
# {:error, {:command_failed, exit_status}}.
defp execute_command_silent(cmd) do
  shell_line = cmd <> " 2>/dev/null"
  {output, status} = System.cmd("sh", ["-c", shell_line], stderr_to_stdout: false)

  if status == 0 do
    {:ok, output}
  else
    {:error, {:command_failed, status}}
  end
end
| # Input handling | |
# Asks the user for a multi-line prompt. Returns {:ok, prompt}, or :cancelled
# when the collected text is empty.
defp read_multiline_prompt do
  display_info("❓ (press Enter twice to submit)")

  entered = read_multiline([])

  if entered == "" do
    display_info("❌ Cancelled")
    :cancelled
  else
    display_info("📤 Submitted! Processing...")
    {:ok, entered}
  end
end
# Reads lines from standard input until an empty line follows at least one
# collected line, then returns the collected lines joined with "\n".
#
# `lines` holds the lines collected so far, in input order. End of input or a
# read error also ends collection and returns what was gathered, instead of
# crashing when IO.gets/1 returns :eof or {:error, reason}.
defp read_multiline(lines), do: read_multiline_reversed(Enum.reverse(lines))

# Worker for read_multiline/1. `acc` is kept newest-first so each new line is
# prepended in constant time; it is reversed once when the text is assembled.
defp read_multiline_reversed(acc) do
  case IO.gets("") do
    line when is_binary(line) ->
      case String.trim_trailing(line, "\n") do
        "" when acc != [] -> acc |> Enum.reverse() |> Enum.join("\n")
        trimmed -> read_multiline_reversed([trimmed | acc])
      end

    _eof_or_error ->
      acc |> Enum.reverse() |> Enum.join("\n")
  end
end
| # String processing | |
# Prepares `input` for use inside a single-quoted shell string: every single
# quote becomes '\'' (close the quote, emit an escaped quote, reopen).
# The surrounding quotes are added by the caller.
defp escape_shell_input(input) when is_binary(input) do
  input
  |> String.split("'")
  |> Enum.join("'\\''")
end
# Removes terminal escape sequences from `output`, collapses runs of blank
# lines to a single blank line and trims surrounding whitespace.
defp clean_ansi_output(output) when is_binary(output) do
  # Colour/style sequences such as "\e[31m" or "\e[0m".
  without_colors = String.replace(output, ~r/\e\[[0-9;]*m/, "")

  # Remaining single-parameter sequences that end in a letter.
  without_escapes = String.replace(without_colors, ~r/\e\[[0-9]*[A-Za-z]/, "")

  # Two further replacements are currently disabled: removing the
  # "Continue this thread with:" hint and removing the "Thread: https://..."
  # line. Both stay in the cleaned text.
  collapsed = String.replace(without_escapes, ~r/\n\s*\n\s*\n/, "\n\n")

  String.trim(collapsed)
end
| # Thread ID extraction | |
# Finds the first thread id in `output` using @thread_id_regex.
# Returns {:ok, thread_id}, or {:error, :no_thread_id_found} when nothing
# matches.
defp extract_thread_id_from_output(output) when is_binary(output) do
  trimmed = String.trim(output)

  case Regex.run(@thread_id_regex, trimmed, capture: :all_but_first) do
    [found_id] -> {:ok, found_id}
    nil -> {:error, :no_thread_id_found}
  end
end
# Looks for a "Thread: https://ampcode.com/threads/<id>" line in `output`.
# When the id differs from the stored one, it becomes the current thread and
# is announced. Output without such a line leaves the state untouched.
defp handle_thread_id_extraction(output) do
  case Regex.run(@thread_url_regex, output, capture: :all_but_first) do
    nil ->
      :ok

    [found_id] ->
      if get_current_thread() != found_id do
        set_current_thread(found_id)
        display_thread_info(found_id)
      end
  end
end
| # Utility functions | |
# Applies `fun` to the currently stored thread id.
#
# When no thread is stored, the problem is printed through handle_error/2 and
# {:error, :no_current_thread} is returned. Previously the tuple was returned
# silently, so the message defined in format_error/1 for :no_current_thread
# was never shown.
defp handle_current_thread(fun) do
  case get_current_thread() do
    nil -> handle_error({:error, :no_current_thread}, "No active thread")
    thread_id -> fun.(thread_id)
  end
end
# Final step of a query: scans the response for a thread URL, which may update
# the current thread, then returns the response tagged with :ok.
defp handle_query_response(output) do
  _ignored = handle_thread_id_extraction(output)

  {:ok, output}
end
| # Error handling | |
# Prints "<context>: <formatted reason>" and returns {:error, reason}.
# `error` may be an {:error, reason} tuple or a bare reason; a bare value is
# treated as the reason itself.
defp handle_error(error, context) do
  reason =
    case error do
      {:error, inner} -> inner
      bare -> bare
    end

  display_error("#{context}: #{format_error(reason)}")

  {:error, reason}
end
# Turns an error reason into a human-readable message. Reasons without a
# dedicated message are rendered with inspect/1.
defp format_error(reason) do
  case reason do
    :amp_not_found ->
      "Amp CLI not found. Install: npm install -g @sourcegraph/amp"

    :no_thread_id_found ->
      "No thread ID found in response"

    :no_current_thread ->
      "No thread ID provided or stored"

    {:invalid_visibility, vis} ->
      "Invalid visibility: #{vis}. Use: #{Enum.join(@valid_visibilities, ", ")}"

    {:command_failed, code} ->
      "Command failed with exit code #{code}"

    other ->
      inspect(other)
  end
end
| # Display functions | |
# Console output helpers. Each writes one line to standard output; the prefixed
# variants put an emoji marker in front of the message.

defp display_info(message) do
  IO.puts("ℹ️ " <> to_string(message))
end

defp display_success(message) do
  IO.puts("✅ " <> to_string(message))
end

defp display_error(message) do
  IO.puts("❌ " <> to_string(message))
end

# Prints a query response after converting it to a string.
defp display_response(message) do
  IO.puts(to_string(message))
end

# Prints command output as given.
defp display_output(message) do
  IO.puts(message)
end

defp display_thread_info(thread_id) do
  IO.puts("🧵 Thread: " <> to_string(thread_id))
end
| end | |
| # Import all functions for easy access | |
| import AmpIEx | |
# Startup banner printed when this file is loaded; it lists the helpers
# imported above. The text has no escapes or interpolation, so a raw
# heredoc yields the same bytes.
IO.puts(~S"""
🚀 Amp CLI loaded - José Valim edition
Basic usage:
q() - Ask a question (clean output)
q("thread_id") - Ask in specific thread
Livebook-friendly (no interactive input):
q_livebook(prompt) - Ask with prompt directly
q_livebook(prompt, id) - Ask in specific thread
q_livebook_json(prompt) - Ask with JSON output
q_livebook_auto(prompt) - Ask with auto-execution
q_livebook_private(prompt) - Ask in private thread
q_livebook_team(prompt) - Ask in team thread
Thread management:
q_new() - Create new thread
q_continue(id) - Continue specific thread
q_fork(id) - Fork existing thread
q_list() - List all threads
q_share(id, vis) - Share thread (public/private/team)
q_compact(id) - Compact thread
q_status() - Show active thread
Tools & Utilities:
q_tools() - Show available tools
Quick variants:
q_json() - Ask with JSON output
q_auto() - Ask with auto-execution
q_private() - Ask in private thread
q_team() - Ask in team thread
Management functions return {:ok, result} | {:error, reason}.
Main q() function returns :ok for clean output.
""")
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment