|
""" |
|
Minimal read-only Postgres MCP server. Runs locally, connects |
|
directly to your database. No third-party services involved. |
|
|
|
Setup: |
|
1. pip install mcp asyncpg |
|
2. Set DATABASE_URI env var to your postgres connection string |
|
3. Add to ~/.claude/settings.json (see README.md) |
|
""" |
|
|
|
import os |
|
import json |
|
import asyncpg |
|
from mcp.server.fastmcp import FastMCP |
|
|
|
DATABASE_URI = os.environ.get("DATABASE_URI") |
|
if not DATABASE_URI: |
|
raise ValueError("DATABASE_URI environment variable is required") |
|
|
|
MAX_ROWS = 500 |
|
|
|
mcp = FastMCP( |
|
"postgres-readonly", |
|
instructions="Read-only access to a Postgres database. Use this for running SQL queries, exploring schemas, and analyzing data.", |
|
) |
|
|
|
|
|
@mcp.tool() |
|
async def query(sql: str) -> str: |
|
"""Run a read-only SQL query. Returns results as JSON. |
|
|
|
The query is executed inside a READ ONLY transaction so writes |
|
are rejected at the database level regardless of DB user permissions. |
|
|
|
Results are limited to 500 rows. Add LIMIT to your query if you |
|
need fewer. |
|
""" |
|
conn = await asyncpg.connect(DATABASE_URI) |
|
try: |
|
async with conn.transaction(readonly=True): |
|
rows = await conn.fetch(sql) |
|
if len(rows) > MAX_ROWS: |
|
rows = rows[:MAX_ROWS] |
|
truncated = True |
|
else: |
|
truncated = False |
|
|
|
result = [dict(r) for r in rows] |
|
|
|
# asyncpg returns some types that aren't JSON serializable |
|
def serialize(obj): |
|
if isinstance(obj, (bytes, bytearray)): |
|
return obj.hex() |
|
if hasattr(obj, "isoformat"): |
|
return obj.isoformat() |
|
return str(obj) |
|
|
|
output = json.dumps(result, default=serialize, indent=2) |
|
if truncated: |
|
output += f"\n\n(Truncated to {MAX_ROWS} rows. Use LIMIT to control result size.)" |
|
return output |
|
finally: |
|
await conn.close() |
|
|
|
|
|
@mcp.tool() |
|
async def list_tables(schema: str = "public") -> str: |
|
"""List all tables in a schema with row counts.""" |
|
conn = await asyncpg.connect(DATABASE_URI) |
|
try: |
|
async with conn.transaction(readonly=True): |
|
rows = await conn.fetch( |
|
""" |
|
SELECT t.tablename AS table_name, |
|
COALESCE(s.n_live_tup, 0) AS approx_row_count |
|
FROM pg_tables t |
|
LEFT JOIN pg_stat_user_tables s |
|
ON s.schemaname = t.schemaname AND s.relname = t.tablename |
|
WHERE t.schemaname = $1 |
|
ORDER BY t.tablename |
|
""", |
|
schema, |
|
) |
|
return json.dumps([dict(r) for r in rows], indent=2) |
|
finally: |
|
await conn.close() |
|
|
|
|
|
@mcp.tool() |
|
async def describe_table(table_name: str, schema: str = "public") -> str: |
|
"""Show full schema for a table: columns, types, primary keys, foreign keys, indexes, and constraints. |
|
|
|
This gives enough context to write correct queries without needing |
|
to read application code or ORM definitions. |
|
""" |
|
conn = await asyncpg.connect(DATABASE_URI) |
|
try: |
|
async with conn.transaction(readonly=True): |
|
# Columns |
|
columns = await conn.fetch( |
|
""" |
|
SELECT column_name, data_type, is_nullable, column_default, |
|
character_maximum_length, numeric_precision |
|
FROM information_schema.columns |
|
WHERE table_schema = $1 AND table_name = $2 |
|
ORDER BY ordinal_position |
|
""", |
|
schema, |
|
table_name, |
|
) |
|
|
|
# Primary key |
|
pk = await conn.fetch( |
|
""" |
|
SELECT kcu.column_name |
|
FROM information_schema.table_constraints tc |
|
JOIN information_schema.key_column_usage kcu |
|
ON tc.constraint_name = kcu.constraint_name |
|
AND tc.table_schema = kcu.table_schema |
|
WHERE tc.table_schema = $1 AND tc.table_name = $2 |
|
AND tc.constraint_type = 'PRIMARY KEY' |
|
ORDER BY kcu.ordinal_position |
|
""", |
|
schema, |
|
table_name, |
|
) |
|
|
|
# Foreign keys |
|
fks = await conn.fetch( |
|
""" |
|
SELECT kcu.column_name, |
|
ccu.table_name AS referenced_table, |
|
ccu.column_name AS referenced_column |
|
FROM information_schema.table_constraints tc |
|
JOIN information_schema.key_column_usage kcu |
|
ON tc.constraint_name = kcu.constraint_name |
|
AND tc.table_schema = kcu.table_schema |
|
JOIN information_schema.constraint_column_usage ccu |
|
ON tc.constraint_name = ccu.constraint_name |
|
AND tc.table_schema = ccu.table_schema |
|
WHERE tc.table_schema = $1 AND tc.table_name = $2 |
|
AND tc.constraint_type = 'FOREIGN KEY' |
|
""", |
|
schema, |
|
table_name, |
|
) |
|
|
|
# Indexes |
|
indexes = await conn.fetch( |
|
""" |
|
SELECT indexname, indexdef |
|
FROM pg_indexes |
|
WHERE schemaname = $1 AND tablename = $2 |
|
ORDER BY indexname |
|
""", |
|
schema, |
|
table_name, |
|
) |
|
|
|
# Unique constraints |
|
uniques = await conn.fetch( |
|
""" |
|
SELECT tc.constraint_name, |
|
array_agg(kcu.column_name ORDER BY kcu.ordinal_position) AS columns |
|
FROM information_schema.table_constraints tc |
|
JOIN information_schema.key_column_usage kcu |
|
ON tc.constraint_name = kcu.constraint_name |
|
AND tc.table_schema = kcu.table_schema |
|
WHERE tc.table_schema = $1 AND tc.table_name = $2 |
|
AND tc.constraint_type = 'UNIQUE' |
|
GROUP BY tc.constraint_name |
|
""", |
|
schema, |
|
table_name, |
|
) |
|
|
|
def serialize(obj): |
|
if isinstance(obj, (bytes, bytearray)): |
|
return obj.hex() |
|
if hasattr(obj, "isoformat"): |
|
return obj.isoformat() |
|
return str(obj) |
|
|
|
result = { |
|
"table": f"{schema}.{table_name}", |
|
"columns": [dict(r) for r in columns], |
|
"primary_key": [r["column_name"] for r in pk], |
|
"foreign_keys": [dict(r) for r in fks], |
|
"indexes": [dict(r) for r in indexes], |
|
"unique_constraints": [ |
|
{"name": r["constraint_name"], "columns": list(r["columns"])} |
|
for r in uniques |
|
], |
|
} |
|
return json.dumps(result, default=serialize, indent=2) |
|
finally: |
|
await conn.close() |
|
|
|
|
|
if __name__ == "__main__": |
|
mcp.run() |