Skip to content

Instantly share code, notes, and snippets.

@kyrtapz
Created June 25, 2025 12:32
Show Gist options
  • Save kyrtapz/9bd2c5126f5b43e82a77296f2610dad9 to your computer and use it in GitHub Desktop.
Save kyrtapz/9bd2c5126f5b43e82a77296f2610dad9 to your computer and use it in GitHub Desktop.
Trivial MCP server interacting with OVN in OVN-Kubernetes
package main
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"os"
"path/filepath"
"strings"
"time"
mcp "github.com/metoro-io/mcp-golang"
"github.com/metoro-io/mcp-golang/transport/stdio"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"
)
// --- OVN Tool Definitions ---

const (
	// ovnNodePodLabel is the label selector used to find the pods in which
	// the OVN command-line tools are executed.
	ovnNodePodLabel = "app=ovnkube-node"
	// ovnNodeContainerName is the container inside the selected pod in which
	// the commands are run. The same container is used for both the
	// Northbound ("nb") and Southbound ("sb") queries.
	ovnNodeContainerName = "nb-ovsdb"
)
// OvnToolArguments is the argument payload accepted by every OVN inspection
// tool registered with the MCP server.
type OvnToolArguments struct {
	// NodeName selects the Kubernetes node whose ovnkube-node pod is queried.
	// The tool handler registered in main rejects an empty value.
	NodeName string `json:"nodeName" jsonschema:"required,description=The name of the Kubernetes node to inspect."`
}
// ovnToolSpec describes a single MCP tool: the OVN database it reads, the
// table it lists, and the description advertised to MCP clients.
type ovnToolSpec struct {
	db          string // "nb" for Northbound, "sb" for Southbound
	table       string
	description string
}

// ovnTools maps each MCP tool name to the OVN table that tool exposes.
var ovnTools = map[string]ovnToolSpec{
	// Northbound database tables.
	"ovn/inspect_logical_switch": {
		db:          "nb",
		table:       "Logical_Switch",
		description: "Fetches all OVN Logical Switches from a specific node.",
	},
	"ovn/inspect_logical_switch_port": {
		db:          "nb",
		table:       "Logical_Switch_Port",
		description: "Fetches all OVN Logical Switch Ports from a specific node.",
	},
	"ovn/inspect_acl": {
		db:          "nb",
		table:       "ACL",
		description: "Fetches all OVN ACLs from a specific node.",
	},
	"ovn/inspect_nat": {
		db:          "nb",
		table:       "NAT",
		description: "Fetches all OVN NAT rules from a specific node.",
	},
	"ovn/inspect_logical_router": {
		db:          "nb",
		table:       "Logical_Router",
		description: "Fetches all OVN Logical Routers from a specific node.",
	},
	"ovn/inspect_logical_router_policy": {
		db:          "nb",
		table:       "Logical_Router_Policy",
		description: "Fetches all OVN Logical Router Policies from a specific node.",
	},

	// Southbound database tables.
	"ovn/inspect_chassis": {
		db:          "sb",
		table:       "Chassis",
		description: "Fetches all OVN Chassis from the Southbound DB on a specific node.",
	},
}
// --- Kubernetes Interaction ---

// KubeClient bundles a Kubernetes clientset with the REST configuration it
// was built from.
type KubeClient struct {
	// clientset is used for API calls such as listing pods and building the
	// exec request.
	clientset *kubernetes.Clientset
	// config is kept so that ExecInPod can create its SPDY executor with the
	// same connection settings.
	config *rest.Config
}
// NewKubeClient creates a new Kubernetes client.
//
// In-cluster configuration is tried first. When that is unavailable the
// kubeconfig file located by kubeconfigPath is loaded instead. An error is
// returned when no usable configuration can be found or when the clientset
// cannot be constructed from it.
func NewKubeClient() (*KubeClient, error) {
	config, err := rest.InClusterConfig()
	if err != nil {
		log.Println("Could not create in-cluster config, falling back to kubeconfig...")
		kubeconfig, pathErr := kubeconfigPath()
		if pathErr != nil {
			return nil, fmt.Errorf("could not locate kubeconfig: %w", pathErr)
		}
		config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
		if err != nil {
			return nil, fmt.Errorf("could not create kubeconfig: %w", err)
		}
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("could not create clientset: %w", err)
	}
	return &KubeClient{clientset: clientset, config: config}, nil
}

// kubeconfigPath returns the kubeconfig file to use outside of a cluster.
//
// The first non-empty entry of the KUBECONFIG environment variable wins;
// otherwise the path is $HOME/.kube/config, with the home directory resolved
// by os.UserHomeDir. A literal "~" is deliberately never used as a fallback:
// Go does not expand it, so it would silently turn into a path relative to
// the current working directory.
func kubeconfigPath() (string, error) {
	for _, candidate := range filepath.SplitList(os.Getenv("KUBECONFIG")) {
		if candidate != "" {
			return candidate, nil
		}
	}

	home, err := os.UserHomeDir()
	if err != nil {
		return "", fmt.Errorf("resolving home directory: %w", err)
	}
	return filepath.Join(home, ".kube", "config"), nil
}
// FindOvnNodePodOnNode looks up, across all namespaces, the pod carrying the
// ovnNodePodLabel label that is scheduled on nodeName.
//
// It returns an error when the list call fails or when no pod matches. When
// several pods match, a warning is logged and the first one is returned.
func (c *KubeClient) FindOvnNodePodOnNode(ctx context.Context, nodeName string) (*v1.Pod, error) {
	listOpts := metav1.ListOptions{
		LabelSelector: ovnNodePodLabel,
		FieldSelector: "spec.nodeName=" + nodeName,
	}

	podList, err := c.clientset.CoreV1().Pods(metav1.NamespaceAll).List(ctx, listOpts)
	if err != nil {
		return nil, fmt.Errorf("failed to list pods on node %s with label %s: %w", nodeName, ovnNodePodLabel, err)
	}

	matches := podList.Items
	switch count := len(matches); {
	case count == 0:
		return nil, fmt.Errorf("no pod with label '%s' found on node '%s'", ovnNodePodLabel, nodeName)
	case count > 1:
		log.Printf("WARN: Found %d pods with label %s on node %s, using the first one: %s", count, ovnNodePodLabel, nodeName, matches[0].Name)
	}
	return &matches[0], nil
}
// ExecInPod runs command inside containerName of the pod namespace/podName
// and returns whatever the command wrote to stdout and stderr.
//
// No stdin is attached and no TTY is allocated. When streaming fails, the
// output captured up to that point is still returned alongside the error.
func (c *KubeClient) ExecInPod(ctx context.Context, namespace, podName, containerName string, command []string) (string, string, error) {
	execOpts := &v1.PodExecOptions{
		Command:   command,
		Container: containerName,
		Stdin:     false,
		Stdout:    true,
		Stderr:    true,
		TTY:       false,
	}
	request := c.clientset.CoreV1().RESTClient().Post().
		Resource("pods").
		Name(podName).
		Namespace(namespace).
		SubResource("exec").
		VersionedParams(execOpts, scheme.ParameterCodec)

	spdyExec, err := remotecommand.NewSPDYExecutor(c.config, http.MethodPost, request.URL())
	if err != nil {
		return "", "", fmt.Errorf("failed to create SPDY executor: %w", err)
	}

	outBuf, errBuf := new(bytes.Buffer), new(bytes.Buffer)
	streamErr := spdyExec.StreamWithContext(ctx, remotecommand.StreamOptions{
		Stdout: outBuf,
		Stderr: errBuf,
		Tty:    false,
	})
	if streamErr != nil {
		streamErr = fmt.Errorf("failed to stream exec: %w", streamErr)
	}
	return outBuf.String(), errBuf.String(), streamErr
}
// --- OVN Data Inspection ---

// OvnInspector fetches OVN database tables by executing the OVN command-line
// tools inside pods.
type OvnInspector struct {
	// kubeClient is used to locate the target pod and to exec into it.
	kubeClient *KubeClient
}
// NewOvnInspector returns an OvnInspector that performs all of its Kubernetes
// operations through client.
func NewOvnInspector(client *KubeClient) *OvnInspector {
	inspector := new(OvnInspector)
	inspector.kubeClient = client
	return inspector
}
// OvnRecord represents a single row from an OVN database table. Keys are the
// column keys produced by parseOvnJSON; values are whatever encoding/json
// decoded for the corresponding cell.
type OvnRecord map[string]interface{}
// OvnJSONOutput is the structure of the JSON document printed by the
// ovn-nbctl / ovn-sbctl "--format=json list <table>" commands.
type OvnJSONOutput struct {
	// Data holds one entry per row; each row has one cell per heading.
	Data [][]interface{} `json:"data"`
	// Headings holds the column names, in the same order as the cells of
	// every row in Data.
	Headings []string `json:"headings"`
}
// FetchOvnDataForNode lists one OVN table from the ovnkube-node pod running
// on nodeName.
//
// db selects the control utility ("nb" runs ovn-nbctl, "sb" runs ovn-sbctl)
// and table is the table passed to its "list" command. Every returned record
// is annotated with the "_node_name" and "_pod_name" of the pod that served
// it. An error is returned when the pod cannot be found, the exec fails, or
// the output cannot be parsed.
func (i *OvnInspector) FetchOvnDataForNode(ctx context.Context, nodeName, db, table string) ([]OvnRecord, error) {
	pod, err := i.kubeClient.FindOvnNodePodOnNode(ctx, nodeName)
	if err != nil {
		return nil, fmt.Errorf("failed to find ovnkube-node pod: %w", err)
	}

	argv := []string{"ovn-" + db + "ctl", "--format=json", "list", table}
	log.Printf("Executing command on pod %s (node %s): %s", pod.Name, nodeName, strings.Join(argv, " "))

	stdout, stderr, err := i.kubeClient.ExecInPod(ctx, pod.Namespace, pod.Name, ovnNodeContainerName, argv)
	if err != nil {
		log.Printf("ERROR: exec failed for pod %s on node %s for table %s. Stderr: %s. Error: %v", pod.Name, pod.Spec.NodeName, table, stderr, err)
		return nil, fmt.Errorf("exec command failed on node %s: %w. Stderr: %s", nodeName, err, stderr)
	}

	records, err := parseOvnJSON(stdout)
	if err != nil {
		return nil, fmt.Errorf("failed to parse OVN JSON from node %s: %w", nodeName, err)
	}

	// Tag every row with where it came from. Records are maps, so writing
	// through the range copy updates the stored record.
	for _, rec := range records {
		rec["_node_name"] = pod.Spec.NodeName
		rec["_pod_name"] = pod.Name
	}
	return records, nil
}
// parseOvnJSON turns the "--format=json" output of an OVN list command into
// one OvnRecord per row.
//
// Record keys are the table headings with every underscore replaced by a
// hyphen. Blank output yields an empty, non-nil slice. An error is returned
// when the output is not valid JSON or when a row does not have exactly one
// cell per heading.
func parseOvnJSON(output string) ([]OvnRecord, error) {
	if len(strings.TrimSpace(output)) == 0 {
		return []OvnRecord{}, nil
	}

	var table OvnJSONOutput
	if err := json.Unmarshal([]byte(output), &table); err != nil {
		return nil, fmt.Errorf("failed to unmarshal OVN JSON output: %w", err)
	}

	// The record keys depend only on the headings, so derive them once.
	keys := make([]string, len(table.Headings))
	for col, heading := range table.Headings {
		keys[col] = strings.ReplaceAll(heading, "_", "-")
	}

	rows := make([]OvnRecord, 0, len(table.Data))
	for _, cells := range table.Data {
		if len(cells) != len(keys) {
			return nil, errors.New("mismatch between heading and data column count")
		}
		row := OvnRecord{}
		for col, key := range keys {
			row[key] = cells[col]
		}
		rows = append(rows, row)
	}
	return rows, nil
}
// --- Main Application ---
func main() {
log.Println("Starting OVN MCP Inspector Server...")
done := make(chan struct{})
// 1. Initialize Kubernetes Client and OVN Inspector
kubeClient, err := NewKubeClient()
if err != nil {
log.Fatalf("Failed to create Kubernetes client: %v", err)
}
log.Println("Successfully connected to Kubernetes cluster.")
inspector := NewOvnInspector(kubeClient)
// 2. Setup MCP Server using stdio transport
server := mcp.NewServer(stdio.NewStdioServerTransport())
// 3. Register a tool for each OVN data type
for name, details := range ovnTools {
// Capture loop variables for the closure
toolName := name
toolDetails := details
err := server.RegisterTool(toolName, toolDetails.description, func(arguments OvnToolArguments) (*mcp.ToolResponse, error) {
log.Printf("Received request for tool: %s with NodeName: %s", toolName, arguments.NodeName)
if arguments.NodeName == "" {
return nil, errors.New("'nodeName' argument is required")
}
// Use a context with timeout for the fetch operation
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
// Fetch data from the specified node for this specific table
records, err := inspector.FetchOvnDataForNode(ctx, arguments.NodeName, toolDetails.db, toolDetails.table)
if err != nil {
log.Printf("ERROR fetching data for %s on node %s: %v", toolName, arguments.NodeName, err)
// Return a descriptive error to the MCP client
return nil, fmt.Errorf("failed to fetch OVN data for %s on node %s: %w", toolName, arguments.NodeName, err)
}
// Marshal the records into a single JSON response
jsonData, err := json.MarshalIndent(records, "", " ")
if err != nil {
log.Printf("ERROR marshalling JSON for %s: %v", toolName, err)
return nil, fmt.Errorf("failed to marshal response for %s: %w", toolName, err)
}
log.Printf("Successfully fetched %d records for tool %s on node %s", len(records), toolName, arguments.NodeName)
return mcp.NewToolResponse(mcp.NewTextContent(string(jsonData))), nil
})
if err != nil {
log.Fatalf("Failed to register tool %s: %v", toolName, err)
}
log.Printf("Registered tool '%s'", toolName)
}
// 4. Start the server
log.Println("MCP server is ready and waiting for tool requests...")
err = server.Serve()
if err != nil {
log.Fatalf("Server failed to start: %v", err)
}
<-done // Keep the application running
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment