Skip to content

Instantly share code, notes, and snippets.

@pjmagee
Created November 18, 2025 00:05
Show Gist options
  • Select an option

  • Save pjmagee/e917889e77d0111067cda050b33a49b0 to your computer and use it in GitHub Desktop.

Select an option

Save pjmagee/e917889e77d0111067cda050b33a49b0 to your computer and use it in GitHub Desktop.
MCP server for querying and manipulating CSV files
"""MCP CSV Server - Tools for querying and manipulating CSV files."""
__version__ = "0.1.0"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "mcp-csv"
version = "0.1.0"
description = "MCP server for querying and manipulating CSV files"
readme = "README.md"
requires-python = ">=3.10"
dependencies = ["mcp>=1.0.0", "pandas>=2.0.0"]
[project.scripts]
mcp-csv = "mcp_csv.server:main"
[tool.hatch.build.targets.wheel]
packages = ["src/mcp_csv"]
"""MCP CSV Server - Provides tools for querying CSV files."""
from pathlib import Path
from typing import Any
import pandas as pd
from mcp.server import Server
from mcp.types import Tool, TextContent
import mcp.server.stdio
# Base directory for CSV files
CSV_BASE_DIR = Path(__file__).parent.parent.parent.parent / "wiki" / "csv"
app = Server("mcp-csv")
@app.list_tools()
async def list_tools() -> list[Tool]:
"""List available CSV query tools."""
return [
Tool(
name="csv_list_files",
description="List all available CSV files in the wiki/csv directory",
inputSchema={"type": "object", "properties": {}, "required": []},
),
Tool(
name="csv_get_columns",
description="Get column names and data types from a CSV file",
inputSchema={
"type": "object",
"properties": {
"filename": {
"type": "string",
"description": "Name of the CSV file (e.g., 'country-codes.csv')",
}
},
"required": ["filename"],
},
),
Tool(
name="csv_get_rows",
description="Get rows from a CSV file with optional filtering and pagination",
inputSchema={
"type": "object",
"properties": {
"filename": {
"type": "string",
"description": "Name of the CSV file",
},
"limit": {
"type": "integer",
"description": "Maximum number of rows to return (default: 100)",
"default": 100,
},
"offset": {
"type": "integer",
"description": "Number of rows to skip (default: 0)",
"default": 0,
},
"columns": {
"type": "array",
"items": {"type": "string"},
"description": "Specific columns to return (default: all columns)",
},
},
"required": ["filename"],
},
),
Tool(
name="csv_filter_rows",
description="Filter rows in a CSV file based on column values",
inputSchema={
"type": "object",
"properties": {
"filename": {
"type": "string",
"description": "Name of the CSV file",
},
"column": {
"type": "string",
"description": "Column name to filter on",
},
"value": {
"type": "string",
"description": "Value to match (case-insensitive substring match)",
},
"limit": {
"type": "integer",
"description": "Maximum number of rows to return (default: 100)",
"default": 100,
},
},
"required": ["filename", "column", "value"],
},
),
Tool(
name="csv_search",
description="Search for a value across all columns in a CSV file",
inputSchema={
"type": "object",
"properties": {
"filename": {
"type": "string",
"description": "Name of the CSV file",
},
"query": {
"type": "string",
"description": "Search term (case-insensitive)",
},
"limit": {
"type": "integer",
"description": "Maximum number of rows to return (default: 100)",
"default": 100,
},
},
"required": ["filename", "query"],
},
),
Tool(
name="csv_get_stats",
description="Get statistics about a CSV file (row count, column count, memory usage)",
inputSchema={
"type": "object",
"properties": {
"filename": {
"type": "string",
"description": "Name of the CSV file",
}
},
"required": ["filename"],
},
),
]
@app.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
"""Handle tool calls for CSV operations."""
if name == "csv_list_files":
files = [f.name for f in CSV_BASE_DIR.glob("*.csv")]
return [
TextContent(
type="text",
text="Available CSV files:\n"
+ "\n".join(f"- {f}" for f in sorted(files)),
)
]
elif name == "csv_get_columns":
filename = arguments["filename"]
filepath = CSV_BASE_DIR / filename
if not filepath.exists():
return [
TextContent(type="text", text=f"Error: File '{filename}' not found")
]
df = pd.read_csv(filepath, nrows=0)
dtypes = pd.read_csv(filepath, nrows=1000).dtypes
columns_info = []
for col in df.columns:
columns_info.append(f"- {col} ({dtypes[col]})")
return [
TextContent(
type="text", text=f"Columns in {filename}:\n" + "\n".join(columns_info)
)
]
elif name == "csv_get_rows":
filename = arguments["filename"]
limit = arguments.get("limit", 100)
offset = arguments.get("offset", 0)
columns = arguments.get("columns")
filepath = CSV_BASE_DIR / filename
if not filepath.exists():
return [
TextContent(type="text", text=f"Error: File '{filename}' not found")
]
df = pd.read_csv(filepath)
if columns:
df = df[columns]
result_df = df.iloc[offset : offset + limit]
return [
TextContent(
type="text",
text=f"Rows {offset} to {offset + len(result_df)} of {len(df)} from {filename}:\n\n{result_df.to_string()}",
)
]
elif name == "csv_filter_rows":
filename = arguments["filename"]
column = arguments["column"]
value = arguments["value"]
limit = arguments.get("limit", 100)
filepath = CSV_BASE_DIR / filename
if not filepath.exists():
return [
TextContent(type="text", text=f"Error: File '{filename}' not found")
]
df = pd.read_csv(filepath)
if column not in df.columns:
return [
TextContent(
type="text",
text=f"Error: Column '{column}' not found in {filename}",
)
]
# Case-insensitive substring match
mask = df[column].astype(str).str.contains(value, case=False, na=False)
filtered_df = df[mask].head(limit)
return [
TextContent(
type="text",
text=f"Found {mask.sum()} rows matching '{value}' in column '{column}' (showing {len(filtered_df)}):\n\n{filtered_df.to_string()}",
)
]
elif name == "csv_search":
filename = arguments["filename"]
query = arguments["query"]
limit = arguments.get("limit", 100)
filepath = CSV_BASE_DIR / filename
if not filepath.exists():
return [
TextContent(type="text", text=f"Error: File '{filename}' not found")
]
df = pd.read_csv(filepath)
# Search across all columns
mask = (
df.astype(str)
.apply(lambda x: x.str.contains(query, case=False, na=False))
.any(axis=1)
)
result_df = df[mask].head(limit)
return [
TextContent(
type="text",
text=f"Found {mask.sum()} rows containing '{query}' (showing {len(result_df)}):\n\n{result_df.to_string()}",
)
]
elif name == "csv_get_stats":
filename = arguments["filename"]
filepath = CSV_BASE_DIR / filename
if not filepath.exists():
return [
TextContent(type="text", text=f"Error: File '{filename}' not found")
]
df = pd.read_csv(filepath)
stats = f"""Statistics for {filename}:
- Total rows: {len(df)}
- Total columns: {len(df.columns)}
- Memory usage: {df.memory_usage(deep=True).sum() / 1024 / 1024:.2f} MB
- Column names: {", ".join(df.columns)}
"""
return [TextContent(type="text", text=stats)]
else:
return [TextContent(type="text", text=f"Unknown tool: {name}")]
async def main():
"""Run the MCP CSV server."""
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await app.run(read_stream, write_stream, app.create_initialization_options())
if __name__ == "__main__":
import asyncio
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment