Skip to content

Instantly share code, notes, and snippets.

@MillerMedia
Last active March 17, 2026 00:06
Show Gist options
  • Select an option

  • Save MillerMedia/e46377d1438eef857fc7a6aaf35d79b7 to your computer and use it in GitHub Desktop.

Select an option

Save MillerMedia/e46377d1438eef857fc7a6aaf35d79b7 to your computer and use it in GitHub Desktop.
Postgres Read-Only MCP Server - minimal local MCP for querying any Postgres database from Claude Code

Postgres Read-Only MCP Server

Minimal MCP server for running read-only SQL queries against any Postgres database. Runs locally, connects directly to the database. No third-party services involved.

Setup

1. Install dependencies

pip install mcp asyncpg

2. Add to Claude Code config

Add to ~/.claude/settings.json under mcpServers:

{
  "mcpServers": {
    "postgres-readonly": {
      "command": "python",
      "args": ["/path/to/server.py"],
      "env": {
        "DATABASE_URI": "postgresql://user:password@host:port/dbname"
      }
    }
  }
}

Replace /path/to/server.py with wherever you saved server.py, and fill in your DB credentials.

3. Use it

In Claude Code, you can now ask things like:

  • "Show me the top 20 rows from the users table"
  • "What tables are available in the public schema?"
  • "Describe the orders table"

Tools

| Tool | Description |
| --- | --- |
| `query` | Run any read-only SQL. Returns JSON. Max 500 rows. |
| `list_tables` | List tables in a schema with approximate row counts. |
| `describe_table` | Full schema: columns, types, primary keys, foreign keys, indexes, unique constraints. |

Safety

  • All queries run inside a READ ONLY transaction at the database level
  • Results capped at 500 rows (add LIMIT to your SQL for fewer)
  • Writes to regular tables are rejected even if the DB user has write permissions (note: Postgres read-only transactions still allow writes to temporary tables)
  • Credentials stay local in your env config, never sent to any third party
"""
Minimal read-only Postgres MCP server. Runs locally, connects
directly to your database. No third-party services involved.
Setup:
1. pip install mcp asyncpg
2. Set DATABASE_URI env var to your postgres connection string
3. Add to ~/.claude/settings.json (see README.md)
"""
import os
import json
import asyncpg
from mcp.server.fastmcp import FastMCP
DATABASE_URI = os.environ.get("DATABASE_URI")
if not DATABASE_URI:
raise ValueError("DATABASE_URI environment variable is required")
MAX_ROWS = 500
mcp = FastMCP(
"postgres-readonly",
instructions="Read-only access to a Postgres database. Use this for running SQL queries, exploring schemas, and analyzing data.",
)
@mcp.tool()
async def query(sql: str) -> str:
    """Run a read-only SQL query and return the results as JSON.

    The query is executed inside a READ ONLY transaction, so writes are
    rejected at the database level regardless of the DB user's
    permissions. Results are limited to MAX_ROWS (500) rows; add LIMIT
    to your query if you need fewer.

    Args:
        sql: SQL text to execute verbatim against the database.

    Returns:
        A JSON array of row objects (pretty-printed), followed by a
        truncation notice when more than MAX_ROWS rows were available.
    """

    def serialize(obj):
        # asyncpg returns some types that aren't JSON serializable
        # (bytea, dates/times, UUIDs, Decimals, ...).
        if isinstance(obj, (bytes, bytearray)):
            return obj.hex()
        if hasattr(obj, "isoformat"):  # date / time / datetime values
            return obj.isoformat()
        return str(obj)

    conn = await asyncpg.connect(DATABASE_URI)
    try:
        async with conn.transaction(readonly=True):
            # Use a server-side cursor so at most MAX_ROWS + 1 rows are
            # pulled into memory, instead of fetching the entire result
            # set and slicing it afterwards. The extra row tells us
            # whether truncation occurred. The cursor must be consumed
            # inside the transaction.
            cursor = await conn.cursor(sql)
            rows = await cursor.fetch(MAX_ROWS + 1)
        truncated = len(rows) > MAX_ROWS
        rows = rows[:MAX_ROWS]
        output = json.dumps(
            [dict(r) for r in rows], default=serialize, indent=2
        )
        if truncated:
            output += f"\n\n(Truncated to {MAX_ROWS} rows. Use LIMIT to control result size.)"
        return output
    finally:
        await conn.close()
@mcp.tool()
async def list_tables(schema: str = "public") -> str:
    """List all tables in a schema with approximate row counts.

    Row counts come from pg_stat_user_tables (n_live_tup), so they are
    statistics-collector estimates rather than exact counts; tables
    with no stats report 0.
    """
    conn = await asyncpg.connect(DATABASE_URI)
    try:
        async with conn.transaction(readonly=True):
            records = await conn.fetch(
                """
                SELECT t.tablename AS table_name,
                       COALESCE(s.n_live_tup, 0) AS approx_row_count
                FROM pg_tables t
                LEFT JOIN pg_stat_user_tables s
                  ON s.schemaname = t.schemaname AND s.relname = t.tablename
                WHERE t.schemaname = $1
                ORDER BY t.tablename
                """,
                schema,
            )
        tables = [dict(record) for record in records]
        return json.dumps(tables, indent=2)
    finally:
        await conn.close()
@mcp.tool()
async def describe_table(table_name: str, schema: str = "public") -> str:
    """Show full schema for a table: columns, types, primary keys,
    foreign keys, indexes, and unique constraints.

    This gives enough context to write correct queries without needing
    to read application code or ORM definitions.

    Args:
        table_name: Name of the table to describe.
        schema: Schema the table lives in (default "public").

    Returns:
        A JSON object with keys: table, columns, primary_key,
        foreign_keys, indexes, unique_constraints.
    """
    conn = await asyncpg.connect(DATABASE_URI)
    try:
        async with conn.transaction(readonly=True):
            # Columns
            columns = await conn.fetch(
                """
                SELECT column_name, data_type, is_nullable, column_default,
                       character_maximum_length, numeric_precision
                FROM information_schema.columns
                WHERE table_schema = $1 AND table_name = $2
                ORDER BY ordinal_position
                """,
                schema,
                table_name,
            )
            # Primary key
            pk = await conn.fetch(
                """
                SELECT kcu.column_name
                FROM information_schema.table_constraints tc
                JOIN information_schema.key_column_usage kcu
                  ON tc.constraint_name = kcu.constraint_name
                 AND tc.table_schema = kcu.table_schema
                WHERE tc.table_schema = $1 AND tc.table_name = $2
                  AND tc.constraint_type = 'PRIMARY KEY'
                ORDER BY kcu.ordinal_position
                """,
                schema,
                table_name,
            )
            # Foreign keys. Uses pg_constraint rather than joining
            # information_schema.key_column_usage to constraint_column_usage:
            # that join has no positional match, so composite (multi-column)
            # foreign keys produce a cross-product of wrong column pairs.
            # unnest(conkey, confkey) WITH ORDINALITY pairs local and
            # referenced columns by their position in the constraint.
            fks = await conn.fetch(
                """
                SELECT src.attname AS column_name,
                       ref_rel.relname AS referenced_table,
                       ref.attname AS referenced_column
                FROM pg_constraint con
                JOIN pg_class rel ON rel.oid = con.conrelid
                JOIN pg_namespace nsp ON nsp.oid = rel.relnamespace
                JOIN pg_class ref_rel ON ref_rel.oid = con.confrelid
                CROSS JOIN LATERAL unnest(con.conkey, con.confkey)
                     WITH ORDINALITY AS cols(src_attnum, ref_attnum, ord)
                JOIN pg_attribute src
                  ON src.attrelid = con.conrelid AND src.attnum = cols.src_attnum
                JOIN pg_attribute ref
                  ON ref.attrelid = con.confrelid AND ref.attnum = cols.ref_attnum
                WHERE con.contype = 'f'
                  AND nsp.nspname = $1
                  AND rel.relname = $2
                ORDER BY con.conname, cols.ord
                """,
                schema,
                table_name,
            )
            # Indexes
            indexes = await conn.fetch(
                """
                SELECT indexname, indexdef
                FROM pg_indexes
                WHERE schemaname = $1 AND tablename = $2
                ORDER BY indexname
                """,
                schema,
                table_name,
            )
            # Unique constraints
            uniques = await conn.fetch(
                """
                SELECT tc.constraint_name,
                       array_agg(kcu.column_name ORDER BY kcu.ordinal_position) AS columns
                FROM information_schema.table_constraints tc
                JOIN information_schema.key_column_usage kcu
                  ON tc.constraint_name = kcu.constraint_name
                 AND tc.table_schema = kcu.table_schema
                WHERE tc.table_schema = $1 AND tc.table_name = $2
                  AND tc.constraint_type = 'UNIQUE'
                GROUP BY tc.constraint_name
                """,
                schema,
                table_name,
            )

        def serialize(obj):
            # Fallback encoder for types json can't handle natively.
            if isinstance(obj, (bytes, bytearray)):
                return obj.hex()
            if hasattr(obj, "isoformat"):
                return obj.isoformat()
            return str(obj)

        result = {
            "table": f"{schema}.{table_name}",
            "columns": [dict(r) for r in columns],
            "primary_key": [r["column_name"] for r in pk],
            "foreign_keys": [dict(r) for r in fks],
            "indexes": [dict(r) for r in indexes],
            "unique_constraints": [
                {"name": r["constraint_name"], "columns": list(r["columns"])}
                for r in uniques
            ],
        }
        return json.dumps(result, default=serialize, indent=2)
    finally:
        await conn.close()
if __name__ == "__main__":
    # Start the MCP server; blocks until the client disconnects.
    mcp.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment