Skip to content

Instantly share code, notes, and snippets.

@llk23r
Created July 10, 2025 05:15
Show Gist options
  • Save llk23r/5ee15677ac7e177f33dfbee8e7a96045 to your computer and use it in GitHub Desktop.
Save llk23r/5ee15677ac7e177f33dfbee8e7a96045 to your computer and use it in GitHub Desktop.
Ampcode integration in Elixir Livebook
# ~/.iex.exs
# Idiomatic Amp CLI integration for IEx
defmodule AmpIEx do
@moduledoc """
Elixir integration for Amp CLI with proper error handling and functional design.
"""
# Module attributes for constants
@amp_executable "amp"
@thread_id_regex ~r/(T-[a-f0-9-]+)/
@thread_url_regex ~r/Thread: https:\/\/ampcode\.com\/threads\/(T-[a-f0-9-]+)/
@default_visibility "public"
@valid_visibilities ~w(public private team)
@valid_formats ~w(ui jsonl)
@type thread_id :: String.t()
@type visibility :: String.t()
@type amp_opts :: keyword()
@type command_result :: {:ok, String.t()} | {:error, atom()}
# Public API
def q(thread_id \\ nil, opts \\ [])
def q(nil, opts) do
case execute_query(get_current_thread(), opts) do
{:ok, _} -> :ok
error -> error
end
end
def q(thread_id, opts) when is_binary(thread_id) do
set_current_thread(thread_id)
case execute_query(thread_id, opts) do
{:ok, _} -> :ok
error -> error
end
end
@doc "Create new thread and set as current"
@spec q_new(amp_opts()) :: {:ok, thread_id()} | {:error, atom()}
def q_new(opts \\ []) do
clear_current_thread()
with {:ok, cmd} <- build_command(opts, ["threads", "new"]),
{:ok, output} <- execute_command_silent(cmd),
{:ok, thread_id} <- extract_thread_id_from_output(output) do
set_current_thread(thread_id)
display_success("New thread created: #{thread_id}")
{:ok, thread_id}
else
error -> handle_error(error, "Failed to create new thread")
end
end
@doc "Continue with specific thread"
@spec q_continue(thread_id() | nil) :: {:ok, thread_id()} | {:error, atom()}
def q_continue(nil), do: handle_current_thread(&continue_with_thread/1)
def q_continue(thread_id) when is_binary(thread_id), do: continue_with_thread(thread_id)
@doc "Fork existing thread"
@spec q_fork(thread_id() | nil, amp_opts()) :: {:ok, thread_id()} | {:error, atom()}
def q_fork(source_id \\ nil, opts \\ [])
def q_fork(nil, opts), do: handle_current_thread(&fork_thread(&1, opts))
def q_fork(source_id, opts) when is_binary(source_id), do: fork_thread(source_id, opts)
@doc "List all threads"
@spec q_list(amp_opts()) :: {:ok, String.t()} | {:error, atom()}
def q_list(opts \\ []) do
with {:ok, cmd} <- build_command(opts, ["threads", "list"]),
{:ok, output} <- execute_command_silent(cmd) do
display_output(output)
{:ok, output}
else
error -> handle_error(error, "Failed to list threads")
end
end
@doc "Share thread with specified visibility"
@spec q_share(thread_id() | nil, visibility()) :: {:ok, String.t()} | {:error, atom()}
def q_share(thread_id \\ nil, visibility \\ @default_visibility)
def q_share(nil, visibility), do: handle_current_thread(&share_thread(&1, visibility))
def q_share(thread_id, visibility) when is_binary(thread_id) and visibility in @valid_visibilities do
share_thread(thread_id, visibility)
end
def q_share(_, visibility), do: {:error, {:invalid_visibility, visibility}}
@doc "Compact thread to reduce token usage"
@spec q_compact(thread_id() | nil) :: {:ok, String.t()} | {:error, atom()}
def q_compact(nil), do: handle_current_thread(&compact_thread/1)
def q_compact(thread_id) when is_binary(thread_id), do: compact_thread(thread_id)
@doc "Show available tools"
@spec q_tools(amp_opts()) :: {:ok, String.t()} | {:error, atom()}
def q_tools(opts \\ []) do
with {:ok, cmd} <- build_command(opts, ["tools", "show"]),
{:ok, output} <- execute_command_silent(cmd) do
display_output(output)
{:ok, output}
else
error -> handle_error(error, "Failed to show tools")
end
end
@doc "Show current thread status"
@spec q_status() :: {:ok, thread_id() | nil}
def q_status do
case get_current_thread() do
nil ->
display_info("No active thread")
{:ok, nil}
thread_id ->
display_info("Active thread: #{thread_id}")
{:ok, thread_id}
end
end
@doc "Livebook-friendly query that takes prompt directly (no interactive input)"
@spec q_livebook(String.t(), thread_id() | nil, amp_opts()) :: {:ok, String.t()} | {:error, atom()}
def q_livebook(prompt, thread_id \\ nil, opts \\ []) do
case execute_query_direct(prompt, thread_id, opts) do
{:ok, _} -> :ok
error -> error
end
end
# Quick access variants
def q_json(thread_id \\ nil), do: q(thread_id, format: "jsonl")
def q_auto(thread_id \\ nil), do: q(thread_id, dangerously_allow_all: true)
def q_private(thread_id \\ nil), do: q(thread_id, visibility: "private")
def q_team(thread_id \\ nil), do: q(thread_id, visibility: "team")
# Livebook-friendly variants
def q_livebook_json(prompt, thread_id \\ nil), do: q_livebook(prompt, thread_id, format: "jsonl")
def q_livebook_auto(prompt, thread_id \\ nil), do: q_livebook(prompt, thread_id, dangerously_allow_all: true)
def q_livebook_private(prompt, thread_id \\ nil), do: q_livebook(prompt, thread_id, visibility: "private")
def q_livebook_team(prompt, thread_id \\ nil), do: q_livebook(prompt, thread_id, visibility: "team")
# Private implementation functions
# State management abstraction
defp get_current_thread, do: Process.get(:amp_thread_id)
defp set_current_thread(thread_id), do: Process.put(:amp_thread_id, thread_id)
defp clear_current_thread, do: Process.delete(:amp_thread_id)
# Core operations
defp execute_query(thread_id, opts) do
with {:ok, prompt} <- read_multiline_prompt(),
{:ok, cmd} <- build_query_command(thread_id, prompt, opts),
{:ok, output} <- execute_command(cmd) do
handle_query_response(output)
else
:cancelled -> {:ok, :cancelled}
error -> handle_error(error, "Query failed")
end
end
# Livebook-friendly version that takes prompt directly
defp execute_query_direct(prompt, thread_id, opts) do
display_info("📤 Processing prompt...")
with {:ok, cmd} <- build_query_command(thread_id, prompt, opts),
{:ok, output} <- execute_command(cmd) do
handle_query_response(output)
else
error -> handle_error(error, "Query failed")
end
end
defp continue_with_thread(thread_id) do
set_current_thread(thread_id)
display_info("Continuing thread: #{thread_id}")
{:ok, thread_id}
end
defp fork_thread(source_id, opts) do
with {:ok, cmd} <- build_command(opts, ["threads", "fork", source_id]),
{:ok, output} <- execute_command_silent(cmd),
{:ok, new_thread_id} <- extract_thread_id_from_output(output) do
set_current_thread(new_thread_id)
display_success("Thread forked: #{new_thread_id}")
{:ok, new_thread_id}
else
error -> handle_error(error, "Failed to fork thread")
end
end
defp share_thread(thread_id, visibility) do
cmd = build_share_command(thread_id, visibility)
case execute_command_silent(cmd) do
{:ok, output} ->
display_success("Thread shared: #{thread_id} (#{visibility})")
{:ok, output}
error -> handle_error(error, "Failed to share thread")
end
end
defp compact_thread(thread_id) do
cmd = [@amp_executable, " threads compact ", thread_id] |> IO.iodata_to_binary()
case execute_command_silent(cmd) do
{:ok, output} ->
display_success("Thread compacted: #{thread_id}")
{:ok, output}
error -> handle_error(error, "Failed to compact thread")
end
end
# Command building
defp build_command(opts, subcommands) when is_list(subcommands) do
case System.find_executable(@amp_executable) do
nil -> {:error, :amp_not_found}
_path ->
cmd = [@amp_executable | build_options(opts) ++ subcommands]
|> Enum.join(" ")
{:ok, cmd}
end
end
defp build_query_command(thread_id, prompt, opts) do
with {:ok, base_cmd} <- build_command(opts, []),
escaped_prompt <- escape_shell_input(prompt) do
cmd = case thread_id do
nil -> "echo '#{escaped_prompt}' | #{base_cmd}"
id -> "echo '#{escaped_prompt}' | #{base_cmd} threads continue #{id}"
end
{:ok, cmd}
end
end
defp build_share_command(thread_id, visibility) do
[@amp_executable, " threads share ", thread_id, " --visibility ", visibility]
|> IO.iodata_to_binary()
end
defp build_options(opts) do
opts
|> Enum.flat_map(&format_option/1)
|> Enum.filter(&(&1 != nil))
end
# Pattern match option formatting
defp format_option({:format, value}) when value in @valid_formats, do: ["--format", value]
defp format_option({:visibility, value}) when value in @valid_visibilities, do: ["--visibility", value]
defp format_option({:dangerously_allow_all, true}), do: ["--dangerously-allow-all"]
defp format_option({:no_color, true}), do: ["--no-color"]
defp format_option({:no_notifications, true}), do: ["--no-notifications"]
defp format_option({:interactive, true}), do: ["--interactive"]
defp format_option({:jetbrains, true}), do: ["--jetbrains"]
defp format_option({:log_level, level}), do: ["--log-level", level]
defp format_option(_), do: []
# Command execution
defp execute_command(cmd) do
case System.cmd("sh", ["-c", cmd <> " 2>/dev/null"], stderr_to_stdout: false) do
{output, 0} ->
clean_output = clean_ansi_output(output)
display_response(clean_output)
handle_thread_id_extraction(output)
# Return clean output but don't show the tuple
IO.puts("")
{:ok, clean_output}
{_error, _code} -> {:error, :command_failed}
end
end
defp execute_command_silent(cmd) do
case System.cmd("sh", ["-c", cmd <> " 2>/dev/null"], stderr_to_stdout: false) do
{output, 0} -> {:ok, output}
{_error, code} -> {:error, {:command_failed, code}}
end
end
# Input handling
defp read_multiline_prompt do
display_info("❓ (press Enter twice to submit)")
case read_multiline([]) do
"" ->
display_info("❌ Cancelled")
:cancelled
prompt ->
display_info("📤 Submitted! Processing...")
{:ok, prompt}
end
end
defp read_multiline(lines) do
case IO.gets("") |> String.trim_trailing("\n") do
"" when lines != [] -> Enum.join(lines, "\n")
line -> read_multiline(lines ++ [line])
end
end
# String processing
defp escape_shell_input(input) when is_binary(input) do
String.replace(input, "'", "'\\''")
end
defp clean_ansi_output(output) when is_binary(output) do
output
|> String.replace(~r/\e\[[0-9;]*m/, "") # Remove ANSI color codes
|> String.replace(~r/\e\[[0-9]*[A-Za-z]/, "") # Remove other ANSI sequences
# |> String.replace(~r/Continue this thread with:.*/, "") # Remove continue instruction
# |> String.replace(~r/Thread: https:\/\/ampcode\.com\/threads\/.*/, "") # Remove thread URL
|> String.replace(~r/\n\s*\n\s*\n/, "\n\n") # Clean up excessive newlines
|> String.trim()
end
# Thread ID extraction
defp extract_thread_id_from_output(output) when is_binary(output) do
case Regex.run(@thread_id_regex, String.trim(output)) do
[_, thread_id] -> {:ok, thread_id}
nil -> {:error, :no_thread_id_found}
end
end
defp handle_thread_id_extraction(output) do
case Regex.run(@thread_url_regex, output) do
[_, thread_id] ->
current = get_current_thread()
if current != thread_id do
set_current_thread(thread_id)
display_thread_info(thread_id)
end
nil -> :ok
end
end
# Utility functions
defp handle_current_thread(fun) do
case get_current_thread() do
nil -> {:error, :no_current_thread}
thread_id -> fun.(thread_id)
end
end
defp handle_query_response(output) do
handle_thread_id_extraction(output)
{:ok, output}
end
# Error handling
defp handle_error({:error, reason}, context) do
display_error("#{context}: #{format_error(reason)}")
{:error, reason}
end
defp handle_error(error, context), do: handle_error({:error, error}, context)
defp format_error(:amp_not_found), do: "Amp CLI not found. Install: npm install -g @sourcegraph/amp"
defp format_error(:no_thread_id_found), do: "No thread ID found in response"
defp format_error(:no_current_thread), do: "No thread ID provided or stored"
defp format_error({:invalid_visibility, vis}), do: "Invalid visibility: #{vis}. Use: #{Enum.join(@valid_visibilities, ", ")}"
defp format_error({:command_failed, code}), do: "Command failed with exit code #{code}"
defp format_error(reason), do: inspect(reason)
# Display functions
defp display_info(message), do: IO.puts("ℹ️ #{message}")
defp display_success(message), do: IO.puts("✅ #{message}")
defp display_error(message), do: IO.puts("❌ #{message}")
defp display_response(message), do: IO.puts("#{message}")
defp display_output(message), do: IO.puts(message)
defp display_thread_info(thread_id), do: IO.puts("🧵 Thread: #{thread_id}")
end
# Import all functions for easy access
import AmpIEx
IO.puts("""
🚀 Amp CLI loaded - José Valim edition
Basic usage:
q() - Ask a question (clean output)
q("thread_id") - Ask in specific thread
Livebook-friendly (no interactive input):
q_livebook(prompt) - Ask with prompt directly
q_livebook(prompt, id) - Ask in specific thread
q_livebook_json(prompt) - Ask with JSON output
q_livebook_auto(prompt) - Ask with auto-execution
q_livebook_private(prompt) - Ask in private thread
q_livebook_team(prompt) - Ask in team thread
Thread management:
q_new() - Create new thread
q_continue(id) - Continue specific thread
q_fork(id) - Fork existing thread
q_list() - List all threads
q_share(id, vis) - Share thread (public/private/team)
q_compact(id) - Compact thread
q_status() - Show active thread
Tools & Utilities:
q_tools() - Show available tools
Quick variants:
q_json() - Ask with JSON output
q_auto() - Ask with auto-execution
q_private() - Ask in private thread
q_team() - Ask in team thread
Management functions return {:ok, result} | {:error, reason}.
Main q() function returns :ok for clean output.
""")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment