Last active
March 22, 2021 13:07
-
-
Save surki/0a848434596d8b8acc3efea1eec7b3b7 to your computer and use it in GitHub Desktop.
xds test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
node: | |
id: foo | |
cluster: foo_envoy_test | |
admin: | |
access_log_path: "/dev/stdout" | |
address: | |
socket_address: | |
protocol: TCP | |
address: 0.0.0.0 | |
port_value: 9901 | |
dynamic_resources: | |
cds_config: | |
resource_api_version: V3 | |
ads: {} | |
ads_config: | |
api_type: GRPC | |
transport_api_version: V3 | |
grpc_services: | |
- envoy_grpc: | |
cluster_name: discovery | |
rate_limit_settings: | |
max_tokens: 100 | |
fill_rate: 10 | |
static_resources: | |
listeners: | |
- name: "listener_8000" | |
address: | |
socket_address: | |
protocol: TCP | |
address: 0.0.0.0 | |
port_value: 8000 | |
per_connection_buffer_limit_bytes: 32768 | |
drain_type: DEFAULT | |
listener_filters: | |
traffic_direction: INBOUND | |
reuse_port: true | |
filter_chains: | |
- filters: | |
- name: envoy.filters.network.http_connection_manager | |
typed_config: | |
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager | |
stat_prefix: listener_8000 | |
codec_type: AUTO | |
route_config: | |
name: route_8000 | |
validate_clusters: false | |
virtual_hosts: | |
- name: backend | |
domains: | |
- "*" | |
routes: | |
- match: | |
prefix: "/" | |
route: | |
cluster: foo | |
timeout: 60s | |
retry_policy: | |
retry_on: gateway-error,reset,connect-failure,refused-stream,cancelled,deadline-exceeded,resource-exhausted,unavailable | |
num_retries: 1 | |
retry_host_predicate: | |
- name: envoy.retry_host_predicates.previous_hosts | |
host_selection_retry_max_attempts: 3 | |
include_request_attempt_count: true | |
http_filters: | |
- name: envoy.filters.http.router | |
typed_config: | |
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router | |
common_http_protocol_options: | |
#max_connection_duration: 0 | |
idle_timeout: 300s | |
max_headers_count: 150 | |
headers_with_underscores_action: DROP_HEADER | |
http_protocol_options: | |
accept_http_10: false | |
http2_protocol_options: | |
hpack_table_size: 4096 | |
max_concurrent_streams: 128 | |
initial_stream_window_size: 65536 # 64 KiB | |
initial_connection_window_size: 1048576 # 1 MiB | |
max_outbound_frames: 10000 | |
max_outbound_control_frames: 1000 | |
max_consecutive_inbound_frames_with_empty_payload: 1 | |
max_inbound_priority_frames_per_stream: 100 | |
max_inbound_window_update_frames_per_data_frame_sent: 10 | |
stream_error_on_invalid_http_messaging: false | |
connection_keepalive: | |
interval: 60s | |
timeout: 30s | |
max_request_headers_kb: 96 | |
request_timeout: 300s # 5 mins, must be disabled for long-lived and streaming requests | |
stream_idle_timeout: 300s # 5 mins, must be disabled for long-lived and streaming requests | |
drain_timeout: 10s | |
delayed_close_timeout: 5s | |
use_remote_address: true | |
xff_num_trusted_hops: 0 | |
generate_request_id: true | |
preserve_external_request_id: false | |
always_set_request_id_in_response: true | |
forward_client_cert_details: SANITIZE | |
# TODO: Make it true? | |
normalize_path: false | |
# Merge contiguous slashes to one | |
merge_slashes: true | |
clusters: | |
- name: discovery | |
# per_connection_buffer_limit_bytes: 32768 # 32 KiB | |
type: STRICT_DNS | |
connect_timeout: 5s | |
# TODO: More evaluation for policy | |
lb_policy: LEAST_REQUEST | |
load_assignment: | |
# TODO: add "policy" configuration | |
cluster_name: discovery | |
# TODO: This must be plain text via NLB PrivateLink | |
endpoints: | |
- lb_endpoints: | |
- endpoint: | |
address: | |
socket_address: | |
address: discovery.edge.svc.cluster.local. | |
port_value: 18000 | |
health_checks: | |
- interval_jitter: 1s | |
unhealthy_threshold: 6 | |
healthy_threshold: 1 | |
event_log_path: "/dev/stdout" | |
always_log_health_check_failures: true | |
timeout: 4s | |
interval: 10s | |
grpc_health_check: | |
service_name: xds:ready | |
typed_extension_protocol_options: | |
envoy.extensions.upstreams.http.v3.HttpProtocolOptions: | |
"@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions | |
upstream_http_protocol_options: | |
common_http_protocol_options: | |
idle_timeout: 3600s | |
max_headers_count: 170 | |
headers_with_underscores_action: ALLOW | |
explicit_http_config: | |
http2_protocol_options: | |
max_concurrent_streams: 1024 | |
# allow_connect: ??? | |
dns_refresh_rate: 5s | |
dns_failure_refresh_rate: | |
base_interval: 1s | |
max_interval: 10s | |
respect_dns_ttl: true | |
dns_lookup_family: V4_ONLY | |
dns_resolvers: | |
- socket_address: | |
address: "127.0.0.1" | |
port_value: 5300 | |
common_lb_config: | |
healthy_panic_threshold: | |
value: 0.0 | |
ignore_new_hosts_until_first_hc: true | |
upstream_connection_options: | |
tcp_keepalive: | |
keepalive_probes: 5 | |
keepalive_interval: 5 | |
keepalive_time: 300 | |
ignore_health_on_host_removal: true |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
// A sample program to test envoy when xds server IP changes. | |
// | |
// - This starts an xDS server that switches between 127.0.0.7:18000 and 127.0.0.6:18000 each time it receives a SIGUSR1 signal
// - This also has built-in DNS server at :5300 that will switch the A record appropriately | |
// - Make sure envoy is configured to use this resolver, not system one | |
// | |
// How to use: | |
// - Copy this file, run | |
// - go mod init main | |
// - go mod tidy | |
// - go build -o xds-test | |
// - ./xds-test | |
// | |
// - Run envoy pointing to this xds server | |
// | |
// - Run kill -SIGUSR2 `pidof xds-test` to update the snapshot (== send cluster/endpoint update to Envoy) | |
// | |
// - Run kill -SIGUSR1 `pidof xds-test` to kill/start xds servers | |
import ( | |
"context" | |
"fmt" | |
"net" | |
"os" | |
"os/signal" | |
"syscall" | |
"time" | |
envoy_cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" | |
envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" | |
envoy_endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" | |
discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" | |
"github.com/envoyproxy/go-control-plane/pkg/cache/types" | |
cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" | |
serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3" | |
"github.com/go-logr/logr" | |
"github.com/go-logr/zapr" | |
"github.com/golang/protobuf/ptypes" | |
"github.com/miekg/dns" | |
"go.uber.org/zap" | |
"go.uber.org/zap/zapcore" | |
"google.golang.org/grpc" | |
"google.golang.org/grpc/health" | |
"google.golang.org/grpc/health/grpc_health_v1" | |
"google.golang.org/grpc/reflection" | |
) | |
type XdsServer struct { | |
server serverv3.Server | |
snapshotCache cachev3.SnapshotCache | |
} | |
var log logr.Logger | |
var initialReadyDelay = 1 * time.Second | |
var runCount int | |
var runCtx context.Context | |
var runCtxCancel context.CancelFunc | |
var rootCtx context.Context | |
var xds *XdsServer | |
func main() { | |
config := zap.NewDevelopmentConfig() | |
config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder | |
l, err := config.Build() | |
if err != nil { | |
panic(err) | |
} | |
log = zapr.NewLogger(l) | |
defer func() { _ = l.Sync() }() | |
c, ctx, cancel := setupSignalHandling() | |
defer func() { | |
signal.Stop(c) | |
cancel() | |
}() | |
rootCtx = ctx | |
runServer() | |
runDnsServer(":5300", rootCtx) | |
<-ctx.Done() | |
} | |
func runServer() { | |
log.Info("starting server") | |
if runCtxCancel != nil { | |
runCtxCancel() | |
} | |
runCtx, runCtxCancel = context.WithCancel(rootCtx) | |
if runCount%2 == 0 { | |
go runXdsServer(runCtx, "127.0.0.7:18000") | |
} else { | |
go runXdsServer(runCtx, "127.0.0.6:18000") | |
} | |
} | |
func setupSignalHandling() (chan os.Signal, context.Context, context.CancelFunc) { | |
var cancel context.CancelFunc | |
ctx, cancel := context.WithCancel(context.Background()) | |
c := make(chan os.Signal, 1) | |
signal.Notify(c, syscall.SIGUSR1, syscall.SIGUSR2) | |
go func() { | |
for { | |
select { | |
case s := <-c: | |
if s == syscall.SIGUSR1 { | |
runCount++ | |
runServer() | |
} else if s == syscall.SIGUSR2 { | |
xds.updateSnapshot() | |
} | |
case <-ctx.Done(): | |
return | |
} | |
} | |
}() | |
return c, ctx, cancel | |
} | |
func runXdsServer(ctx context.Context, addr string) { | |
e := &XdsServer{} | |
xds = e | |
// var ll go_control_plane_log.Logger | |
ll := log.(zapr.Underlier).GetUnderlying().Sugar() | |
e.snapshotCache = cachev3.NewSnapshotCache(true, nodeHash{}, ll) | |
e.server = serverv3.NewServer(context.Background(), e.snapshotCache, e) | |
log.Info("management server listening", "addr", addr) | |
var grpcOptions []grpc.ServerOption | |
grpcOptions = append(grpcOptions, grpc.MaxConcurrentStreams(10000)) | |
grpcServer := grpc.NewServer(grpcOptions...) | |
discoverygrpc.RegisterAggregatedDiscoveryServiceServer(grpcServer, e.server) | |
h := health.NewServer() | |
defer h.Shutdown() | |
h.SetServingStatus("xds:ready", grpc_health_v1.HealthCheckResponse_NOT_SERVING) | |
time.AfterFunc(initialReadyDelay, | |
func() { | |
select { | |
case <-ctx.Done(): | |
return | |
default: | |
log.Info("enabling readiness") | |
h.SetServingStatus("xds:ready", grpc_health_v1.HealthCheckResponse_SERVING) | |
} | |
}) | |
grpc_health_v1.RegisterHealthServer(grpcServer, h) | |
reflection.Register(grpcServer) | |
listener, err := net.Listen("tcp", addr) | |
if err != nil { | |
panic(fmt.Sprintf("cannot listen. err=%v addr=%v", err, addr)) | |
} | |
go func() { | |
if err := grpcServer.Serve(listener); err != nil { | |
log.Error(err, "grpc server error") | |
} | |
}() | |
<-ctx.Done() | |
grpcServer.Stop() | |
} | |
func (e *XdsServer) updateSnapshot() { | |
version := fmt.Sprint(time.Now().UnixNano()) | |
log.Info("setting snapshot", "version", version) | |
clusters, eps := getResources() | |
snap := cachev3.NewSnapshot(version, eps, clusters, nil, nil, nil, nil) | |
if err := snap.Consistent(); err != nil { | |
log.Error(err, "snapshot inconsistency") | |
os.Exit(1) | |
} | |
err := e.snapshotCache.SetSnapshot("foo_envoy_test", snap) | |
if err != nil { | |
log.Error(err, "could not set snapshot") | |
os.Exit(1) | |
} | |
} | |
func (e *XdsServer) OnStreamOpen(ctx context.Context, streamID int64, typeURL string) error { | |
log.Info("OnStreamOpen", "streamID", streamID, "typeURL", typeURL) | |
return nil | |
} | |
func (e *XdsServer) OnStreamClosed(streamID int64) { | |
log.Info("OnStreamClosed", "streamID", streamID) | |
} | |
func (e *XdsServer) OnStreamRequest(streamID int64, req *discoverygrpc.DiscoveryRequest) error { | |
log.Info("OnStreamRequest", "streamID", streamID, "discovery.request", discoveryReqStr(req)) | |
err := req.GetErrorDetail() | |
if err != nil { | |
log.Error(fmt.Errorf("%s", err.GetMessage()), | |
"envoy rejected configuration", | |
"version-in-use", req.GetVersionInfo(), | |
) | |
} | |
return nil | |
} | |
func (e *XdsServer) OnStreamResponse(streamID int64, req *discoverygrpc.DiscoveryRequest, resp *discoverygrpc.DiscoveryResponse) { | |
resources := "" | |
for _, res := range resp.Resources { | |
resources += res.String() | |
} | |
log.Info("OnStreamResponse", "streamID", streamID, | |
"discovery.request", discoveryReqStr(req), | |
"discovery.response", discoveryRespStr(resp), | |
"responseResources", resources, | |
) | |
} | |
func (e *XdsServer) OnFetchRequest(ctx context.Context, req *discoverygrpc.DiscoveryRequest) error { | |
log.Info("OnFetchRequest", "discovery.request", discoveryReqStr(req)) | |
return nil | |
} | |
func (e *XdsServer) OnFetchResponse(req *discoverygrpc.DiscoveryRequest, resp *discoverygrpc.DiscoveryResponse) { | |
log.Info("OnFetchResponse", | |
"discovery.request", discoveryReqStr(req), | |
"discovery.response", discoveryRespStr(resp), | |
) | |
} | |
type nodeHash struct{} | |
func (nodeHash) ID(node *envoy_core.Node) string { | |
return node.Cluster | |
} | |
type discoveryNode struct { | |
Id string `json:"id"` | |
Cluster string `json:"cluster"` | |
Locality *envoy_core.Locality `json:"locality"` | |
} | |
type discoveryReq struct { | |
Node discoveryNode `json:"node"` | |
Version string `json:"version"` | |
Typeurl string `json:"typeurl"` | |
RespNonce string `json:"respNonce"` | |
ErrorDetail string `json:"errorDetail"` | |
} | |
type discoveryResp struct { | |
Version string `json:"version"` | |
Node discoveryNode `json:"node"` | |
Typeurl string `json:"typeurl"` | |
Nonce string `json:"nonce"` | |
} | |
func discoveryReqStr(req *discoverygrpc.DiscoveryRequest) discoveryReq { | |
var n discoveryNode | |
if req.Node != nil { | |
n.Id = req.Node.GetId() | |
n.Cluster = req.Node.GetCluster() | |
n.Locality = req.Node.GetLocality() | |
} | |
ro := discoveryReq{ | |
Version: req.VersionInfo, | |
Typeurl: req.TypeUrl, | |
RespNonce: req.ResponseNonce, | |
ErrorDetail: req.GetErrorDetail().String(), | |
Node: n, | |
} | |
return ro | |
} | |
func discoveryRespStr(resp *discoverygrpc.DiscoveryResponse) discoveryResp { | |
return discoveryResp{ | |
Version: resp.VersionInfo, | |
Typeurl: resp.TypeUrl, | |
Nonce: resp.Nonce, | |
} | |
} | |
func getResources() ([]types.Resource, []types.Resource) { | |
clusterName := "foo" | |
ep := &envoy_endpoint.ClusterLoadAssignment{ | |
ClusterName: clusterName, | |
Endpoints: []*envoy_endpoint.LocalityLbEndpoints{{ | |
LbEndpoints: []*envoy_endpoint.LbEndpoint{ | |
{ | |
HostIdentifier: &envoy_endpoint.LbEndpoint_Endpoint{ | |
Endpoint: &envoy_endpoint.Endpoint{ | |
Address: &envoy_core.Address{ | |
Address: &envoy_core.Address_SocketAddress{ | |
SocketAddress: &envoy_core.SocketAddress{ | |
Address: "1.2.3.4", | |
Protocol: envoy_core.SocketAddress_TCP, | |
PortSpecifier: &envoy_core.SocketAddress_PortValue{ | |
PortValue: uint32(80), | |
}, | |
}, | |
}}, | |
}}, | |
}, | |
}, | |
}}, | |
} | |
ec := &envoy_cluster.Cluster{ | |
Name: clusterName, | |
ConnectTimeout: ptypes.DurationProto(10 * time.Second), | |
ClusterDiscoveryType: &envoy_cluster.Cluster_Type{Type: envoy_cluster.Cluster_EDS}, | |
DnsLookupFamily: envoy_cluster.Cluster_V4_ONLY, | |
LbPolicy: envoy_cluster.Cluster_ROUND_ROBIN, | |
EdsClusterConfig: &envoy_cluster.Cluster_EdsClusterConfig{ | |
EdsConfig: &envoy_core.ConfigSource{ | |
ResourceApiVersion: envoy_core.ApiVersion_V3, | |
ConfigSourceSpecifier: &envoy_core.ConfigSource_Ads{ | |
Ads: &envoy_core.AggregatedConfigSource{}, | |
}, | |
}, | |
}, | |
} | |
return []types.Resource{ec}, []types.Resource{ep} | |
} | |
type dnsHandler struct{} | |
func (this *dnsHandler) ServeDNS(w dns.ResponseWriter, r *dns.Msg) { | |
msg := dns.Msg{} | |
msg.SetReply(r) | |
switch r.Question[0].Qtype { | |
case dns.TypeA: | |
msg.Authoritative = true | |
domain := msg.Question[0].Name | |
var address string | |
if runCount%2 == 0 { | |
address = "127.0.0.7" | |
} else { | |
address = "127.0.0.6" | |
} | |
msg.Answer = append(msg.Answer, &dns.A{ | |
Hdr: dns.RR_Header{Name: domain, Rrtype: dns.TypeA, Class: dns.ClassINET, Ttl: 5}, | |
A: net.ParseIP(address), | |
}) | |
} | |
w.WriteMsg(&msg) | |
} | |
func runDnsServer(addr string, ctx context.Context) { | |
srv := &dns.Server{Addr: addr, Net: "udp"} | |
srv.Handler = &dnsHandler{} | |
go func() { | |
if err := srv.ListenAndServe(); err != nil { | |
log.Error(err, "Failed to set udp listener") | |
os.Exit(1) | |
} | |
}() | |
<-ctx.Done() | |
srv.Shutdown() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment