Created
June 25, 2025 12:32
-
-
Save kyrtapz/9bd2c5126f5b43e82a77296f2610dad9 to your computer and use it in GitHub Desktop.
Trivial MCP server interacting with OVN in OVN-Kubernetes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bytes" | |
"context" | |
"encoding/json" | |
"errors" | |
"fmt" | |
"log" | |
"net/http" | |
"os" | |
"path/filepath" | |
"strings" | |
"time" | |
mcp "github.com/metoro-io/mcp-golang" | |
"github.com/metoro-io/mcp-golang/transport/stdio" | |
v1 "k8s.io/api/core/v1" | |
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | |
"k8s.io/client-go/kubernetes" | |
"k8s.io/client-go/kubernetes/scheme" | |
"k8s.io/client-go/rest" | |
"k8s.io/client-go/tools/clientcmd" | |
"k8s.io/client-go/tools/remotecommand" | |
) | |
// --- OVN Tool Definitions --- | |
// ovnNodePodLabel is the label selector used when listing the pods that
// carry the OVN command-line tools.
const ovnNodePodLabel = "app=ovnkube-node"

// ovnNodeContainerName names the container, inside a matching pod, in which
// the OVN commands are executed.
const ovnNodeContainerName = "nb-ovsdb"
// OvnToolArguments is the argument payload accepted by every inspection tool.
//
// NodeName identifies the Kubernetes node whose OVN pod should be queried; the
// jsonschema tag marks it as required and supplies its description.
type OvnToolArguments struct {
	NodeName string `json:"nodeName" jsonschema:"required,description=The name of the Kubernetes node to inspect."`
}
// ovnTools is the registry of MCP tools, keyed by tool name. Each entry
// records which OVN database the tool queries, which table it lists, and the
// human-readable description given to the MCP server at registration.
var ovnTools = map[string]struct {
	db          string // "nb" for Northbound, "sb" for Southbound
	table       string // OVN table passed to the "list" command
	description string // text shown to MCP clients
}{
	// Northbound database tables.
	"ovn/inspect_logical_switch":        {db: "nb", table: "Logical_Switch", description: "Fetches all OVN Logical Switches from a specific node."},
	"ovn/inspect_logical_switch_port":   {db: "nb", table: "Logical_Switch_Port", description: "Fetches all OVN Logical Switch Ports from a specific node."},
	"ovn/inspect_acl":                   {db: "nb", table: "ACL", description: "Fetches all OVN ACLs from a specific node."},
	"ovn/inspect_nat":                   {db: "nb", table: "NAT", description: "Fetches all OVN NAT rules from a specific node."},
	"ovn/inspect_logical_router":        {db: "nb", table: "Logical_Router", description: "Fetches all OVN Logical Routers from a specific node."},
	"ovn/inspect_logical_router_policy": {db: "nb", table: "Logical_Router_Policy", description: "Fetches all OVN Logical Router Policies from a specific node."},

	// Southbound database tables.
	"ovn/inspect_chassis": {db: "sb", table: "Chassis", description: "Fetches all OVN Chassis from the Southbound DB on a specific node."},
}
// --- Kubernetes Interaction --- | |
// KubeClient is a wrapper for Kubernetes client-go interactions. | |
type KubeClient struct { | |
clientset *kubernetes.Clientset | |
config *rest.Config | |
} | |
// NewKubeClient creates a new Kubernetes client. | |
func NewKubeClient() (*KubeClient, error) { | |
config, err := rest.InClusterConfig() | |
if err != nil { | |
log.Println("Could not create in-cluster config, falling back to kubeconfig...") | |
home, exists := os.LookupEnv("HOME") | |
if !exists { | |
home = "~" | |
} | |
kubeconfig := filepath.Join(home, ".kube", "config") | |
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig) | |
if err != nil { | |
return nil, fmt.Errorf("could not create kubeconfig: %w", err) | |
} | |
} | |
clientset, err := kubernetes.NewForConfig(config) | |
if err != nil { | |
return nil, fmt.Errorf("could not create clientset: %w", err) | |
} | |
return &KubeClient{clientset: clientset, config: config}, nil | |
} | |
// FindOvnNodePodOnNode finds the ovnkube-node pod on a specific Kubernetes node. | |
func (c *KubeClient) FindOvnNodePodOnNode(ctx context.Context, nodeName string) (*v1.Pod, error) { | |
pods, err := c.clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{ | |
LabelSelector: ovnNodePodLabel, | |
FieldSelector: "spec.nodeName=" + nodeName, | |
}) | |
if err != nil { | |
return nil, fmt.Errorf("failed to list pods on node %s with label %s: %w", nodeName, ovnNodePodLabel, err) | |
} | |
if len(pods.Items) == 0 { | |
return nil, fmt.Errorf("no pod with label '%s' found on node '%s'", ovnNodePodLabel, nodeName) | |
} | |
if len(pods.Items) > 1 { | |
log.Printf("WARN: Found %d pods with label %s on node %s, using the first one: %s", len(pods.Items), ovnNodePodLabel, nodeName, pods.Items[0].Name) | |
} | |
return &pods.Items[0], nil | |
} | |
// ExecInPod executes a command in a specified pod and container. | |
func (c *KubeClient) ExecInPod(ctx context.Context, namespace, podName, containerName string, command []string) (string, string, error) { | |
req := c.clientset.CoreV1().RESTClient().Post(). | |
Resource("pods"). | |
Name(podName). | |
Namespace(namespace). | |
SubResource("exec"). | |
VersionedParams(&v1.PodExecOptions{ | |
Command: command, | |
Container: containerName, | |
Stdin: false, | |
Stdout: true, | |
Stderr: true, | |
TTY: false, | |
}, scheme.ParameterCodec) | |
executor, err := remotecommand.NewSPDYExecutor(c.config, http.MethodPost, req.URL()) | |
if err != nil { | |
return "", "", fmt.Errorf("failed to create SPDY executor: %w", err) | |
} | |
var stdout, stderr bytes.Buffer | |
err = executor.StreamWithContext(ctx, remotecommand.StreamOptions{ | |
Stdout: &stdout, | |
Stderr: &stderr, | |
Tty: false, | |
}) | |
if err != nil { | |
return stdout.String(), stderr.String(), fmt.Errorf("failed to stream exec: %w", err) | |
} | |
return stdout.String(), stderr.String(), nil | |
} | |
// --- OVN Data Inspection --- | |
// OvnInspector is responsible for fetching OVN data from pods. | |
type OvnInspector struct { | |
kubeClient *KubeClient | |
} | |
// NewOvnInspector creates a new OVN inspector. | |
func NewOvnInspector(client *KubeClient) *OvnInspector { | |
return &OvnInspector{kubeClient: client} | |
} | |
// OvnRecord is one row of an OVN database table, keyed by column name, with
// each value held as decoded from JSON.
type OvnRecord map[string]interface{}
// OvnJSONOutput mirrors the JSON document decoded from the OVN command output:
// a list of column headings plus one value slice per row.
type OvnJSONOutput struct {
	// Data holds the rows; each row is expected to have one value per heading.
	Data [][]interface{} `json:"data"`
	// Headings holds the column names, in the same order as each row's values.
	Headings []string `json:"headings"`
}
// FetchOvnDataForNode finds the ovnkube-node pod on a specific node and runs the command. | |
func (i *OvnInspector) FetchOvnDataForNode(ctx context.Context, nodeName, db, table string) ([]OvnRecord, error) { | |
pod, err := i.kubeClient.FindOvnNodePodOnNode(ctx, nodeName) | |
if err != nil { | |
return nil, fmt.Errorf("failed to find ovnkube-node pod: %w", err) | |
} | |
command := []string{ | |
fmt.Sprintf("ovn-%sctl", db), | |
"--format=json", | |
"list", | |
table, | |
} | |
log.Printf("Executing command on pod %s (node %s): %s", pod.Name, nodeName, strings.Join(command, " ")) | |
stdout, stderr, err := i.kubeClient.ExecInPod(ctx, pod.Namespace, pod.Name, ovnNodeContainerName, command) | |
if err != nil { | |
log.Printf("ERROR: exec failed for pod %s on node %s for table %s. Stderr: %s. Error: %v", pod.Name, pod.Spec.NodeName, table, stderr, err) | |
return nil, fmt.Errorf("exec command failed on node %s: %w. Stderr: %s", nodeName, err, stderr) | |
} | |
records, err := parseOvnJSON(stdout) | |
if err != nil { | |
return nil, fmt.Errorf("failed to parse OVN JSON from node %s: %w", nodeName, err) | |
} | |
// Add node metadata to each record | |
for i := range records { | |
records[i]["_node_name"] = pod.Spec.NodeName | |
records[i]["_pod_name"] = pod.Name | |
} | |
return records, nil | |
} | |
// parseOvnJSON converts the JSON output from ovn-ctl into a slice of structured records. | |
func parseOvnJSON(output string) ([]OvnRecord, error) { | |
if strings.TrimSpace(output) == "" { | |
return []OvnRecord{}, nil | |
} | |
var parsed OvnJSONOutput | |
if err := json.Unmarshal([]byte(output), &parsed); err != nil { | |
return nil, fmt.Errorf("failed to unmarshal OVN JSON output: %w", err) | |
} | |
records := make([]OvnRecord, len(parsed.Data)) | |
for i, rowData := range parsed.Data { | |
if len(rowData) != len(parsed.Headings) { | |
return nil, errors.New("mismatch between heading and data column count") | |
} | |
record := make(OvnRecord) | |
for j, heading := range parsed.Headings { | |
key := strings.ReplaceAll(heading, "_", "-") | |
record[key] = rowData[j] | |
} | |
records[i] = record | |
} | |
return records, nil | |
} | |
// --- Main Application --- | |
func main() { | |
log.Println("Starting OVN MCP Inspector Server...") | |
done := make(chan struct{}) | |
// 1. Initialize Kubernetes Client and OVN Inspector | |
kubeClient, err := NewKubeClient() | |
if err != nil { | |
log.Fatalf("Failed to create Kubernetes client: %v", err) | |
} | |
log.Println("Successfully connected to Kubernetes cluster.") | |
inspector := NewOvnInspector(kubeClient) | |
// 2. Setup MCP Server using stdio transport | |
server := mcp.NewServer(stdio.NewStdioServerTransport()) | |
// 3. Register a tool for each OVN data type | |
for name, details := range ovnTools { | |
// Capture loop variables for the closure | |
toolName := name | |
toolDetails := details | |
err := server.RegisterTool(toolName, toolDetails.description, func(arguments OvnToolArguments) (*mcp.ToolResponse, error) { | |
log.Printf("Received request for tool: %s with NodeName: %s", toolName, arguments.NodeName) | |
if arguments.NodeName == "" { | |
return nil, errors.New("'nodeName' argument is required") | |
} | |
// Use a context with timeout for the fetch operation | |
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) | |
defer cancel() | |
// Fetch data from the specified node for this specific table | |
records, err := inspector.FetchOvnDataForNode(ctx, arguments.NodeName, toolDetails.db, toolDetails.table) | |
if err != nil { | |
log.Printf("ERROR fetching data for %s on node %s: %v", toolName, arguments.NodeName, err) | |
// Return a descriptive error to the MCP client | |
return nil, fmt.Errorf("failed to fetch OVN data for %s on node %s: %w", toolName, arguments.NodeName, err) | |
} | |
// Marshal the records into a single JSON response | |
jsonData, err := json.MarshalIndent(records, "", " ") | |
if err != nil { | |
log.Printf("ERROR marshalling JSON for %s: %v", toolName, err) | |
return nil, fmt.Errorf("failed to marshal response for %s: %w", toolName, err) | |
} | |
log.Printf("Successfully fetched %d records for tool %s on node %s", len(records), toolName, arguments.NodeName) | |
return mcp.NewToolResponse(mcp.NewTextContent(string(jsonData))), nil | |
}) | |
if err != nil { | |
log.Fatalf("Failed to register tool %s: %v", toolName, err) | |
} | |
log.Printf("Registered tool '%s'", toolName) | |
} | |
// 4. Start the server | |
log.Println("MCP server is ready and waiting for tool requests...") | |
err = server.Serve() | |
if err != nil { | |
log.Fatalf("Server failed to start: %v", err) | |
} | |
<-done // Keep the application running | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment