xds test
node:
  id: foo
  cluster: foo_envoy_test

admin:
  access_log_path: "/dev/stdout"
  address:
    socket_address:
      protocol: TCP
      address: 0.0.0.0
      port_value: 9901

dynamic_resources:
  cds_config:
    resource_api_version: V3
    ads: {}
  ads_config:
    api_type: GRPC
    transport_api_version: V3
    grpc_services:
    - envoy_grpc:
        cluster_name: discovery
    rate_limit_settings:
      max_tokens: 100
      fill_rate: 10

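# The dynamic (CDS-over-ADS) configuration above is served by the static
# "discovery" cluster defined under static_resources below; xDS requests are
# rate limited to a bucket of 100 tokens refilled at 10 tokens/s.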
static_resources:
  listeners:
  - name: "listener_8000"
    address:
      socket_address:
        protocol: TCP
        address: 0.0.0.0
        port_value: 8000
    per_connection_buffer_limit_bytes: 32768
    drain_type: DEFAULT
    listener_filters:
    traffic_direction: INBOUND
    reuse_port: true
    filter_chains:
    - filters:
      - name: envoy.filters.network.http_connection_manager
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
          stat_prefix: listener_8000
          codec_type: AUTO
          route_config:
            name: route_8000
            validate_clusters: false
            virtual_hosts:
            - name: backend
              domains:
              - "*"
              routes:
              - match:
                  prefix: "/"
                route:
                  cluster: foo
                  timeout: 60s
                  retry_policy:
                    retry_on: gateway-error,reset,connect-failure,refused-stream,cancelled,deadline-exceeded,resource-exhausted,unavailable
                    num_retries: 1
                    retry_host_predicate:
                    - name: envoy.retry_host_predicates.previous_hosts
                    host_selection_retry_max_attempts: 3
              include_request_attempt_count: true
          http_filters:
          - name: envoy.filters.http.router
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
          common_http_protocol_options:
            #max_connection_duration: 0
            idle_timeout: 300s
            max_headers_count: 150
            headers_with_underscores_action: DROP_HEADER
          http_protocol_options:
            accept_http_10: false
          http2_protocol_options:
            hpack_table_size: 4096
            max_concurrent_streams: 128
            initial_stream_window_size: 65536 # 64 KiB
            initial_connection_window_size: 1048576 # 1 MiB
            max_outbound_frames: 10000
            max_outbound_control_frames: 1000
            max_consecutive_inbound_frames_with_empty_payload: 1
            max_inbound_priority_frames_per_stream: 100
            max_inbound_window_update_frames_per_data_frame_sent: 10
            stream_error_on_invalid_http_messaging: false
            connection_keepalive:
              interval: 60s
              timeout: 30s
          max_request_headers_kb: 96
          request_timeout: 300s # 5 mins, must be disabled for long-lived and streaming requests
          stream_idle_timeout: 300s # 5 mins, must be disabled for long-lived and streaming requests
          drain_timeout: 10s
          delayed_close_timeout: 5s
          use_remote_address: true
          xff_num_trusted_hops: 0
          generate_request_id: true
          preserve_external_request_id: false
          always_set_request_id_in_response: true
          forward_client_cert_details: SANITIZE
          # TODO: Make it true?
          normalize_path: false
          # Merge contiguous slashes to one
          merge_slashes: true
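
  # The "discovery" cluster below is the xDS control plane itself. Its endpoint
  # is resolved through the custom DNS resolver at 127.0.0.1:5300 (served by the
  # Go program further down), so the control plane's IP can change at runtime
  # while Envoy keeps following it.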
  clusters:
  - name: discovery
    # per_connection_buffer_limit_bytes: 32768 # 32 KiB
    type: STRICT_DNS
    connect_timeout: 5s
    # TODO: More evaluation for policy
    lb_policy: LEAST_REQUEST
    load_assignment:
      # TODO: add "policy" configuration
      cluster_name: discovery
      # TODO: This must be plain text via NLB PrivateLink
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: discovery.edge.svc.cluster.local.
                port_value: 18000
    health_checks:
    - interval_jitter: 1s
      unhealthy_threshold: 6
      healthy_threshold: 1
      event_log_path: "/dev/stdout"
      always_log_health_check_failures: true
      timeout: 4s
      interval: 10s
      grpc_health_check:
        service_name: xds:ready
    typed_extension_protocol_options:
      envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
        "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
        upstream_http_protocol_options:
        common_http_protocol_options:
          idle_timeout: 3600s
          max_headers_count: 170
          headers_with_underscores_action: ALLOW
        explicit_http_config:
          http2_protocol_options:
            max_concurrent_streams: 1024
            # allow_connect: ???
    dns_refresh_rate: 5s
    dns_failure_refresh_rate:
      base_interval: 1s
      max_interval: 10s
    respect_dns_ttl: true
    dns_lookup_family: V4_ONLY
    dns_resolvers:
    - socket_address:
        address: "127.0.0.1"
        port_value: 5300
    common_lb_config:
      healthy_panic_threshold:
        value: 0.0
      ignore_new_hosts_until_first_hc: true
    upstream_connection_options:
      tcp_keepalive:
        keepalive_probes: 5
        keepalive_interval: 5
        keepalive_time: 300
    ignore_health_on_host_removal: true

package main

// A sample program to test Envoy behaviour when the xDS server's IP changes.
//
// - This starts an xDS server that switches between 127.0.0.7:18000 and 127.0.0.6:18000 on receiving SIGUSR1
// - It also runs a built-in DNS server at :5300 that switches the A record accordingly
// - Make sure Envoy is configured to use this resolver, not the system one
//
// How to use:
// - Copy this file, then run:
//   - go mod init main
//   - go mod tidy
//   - go build -o xds-test
//   - ./xds-test
//
// - Run Envoy pointing at this xDS server
//
// - Run kill -SIGUSR2 `pidof xds-test` to update the snapshot (i.e. send a cluster/endpoint update to Envoy)
//
// - Run kill -SIGUSR1 `pidof xds-test` to kill/start the xDS servers
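//
// Note: in the Envoy bootstrap above, the "discovery" cluster (port 18000) is
// resolved through dns_resolvers at 127.0.0.1:5300, i.e. the DNS server this
// program runs, and node.cluster is "foo_envoy_test", which matches the key
// passed to SetSnapshot below.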

import (
	"context"
	"fmt"
	"net"
	"os"
	"os/signal"
	"syscall"
	"time"

	envoy_cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
	envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
	envoy_endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
	discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
	"github.com/envoyproxy/go-control-plane/pkg/cache/types"
	cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
	serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
	"github.com/go-logr/logr"
	"github.com/go-logr/zapr"
	"github.com/golang/protobuf/ptypes"
	"github.com/miekg/dns"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	"google.golang.org/grpc"
	"google.golang.org/grpc/health"
	"google.golang.org/grpc/health/grpc_health_v1"
	"google.golang.org/grpc/reflection"
)

type XdsServer struct {
	server        serverv3.Server
	snapshotCache cachev3.SnapshotCache
}

var log logr.Logger
var initialReadyDelay = 1 * time.Second
var runCount int
var runCtx context.Context
var runCtxCancel context.CancelFunc
var rootCtx context.Context
var xds *XdsServer

func main() {
	config := zap.NewDevelopmentConfig()
	config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
	l, err := config.Build()
	if err != nil {
		panic(err)
	}
	log = zapr.NewLogger(l)
	defer func() { _ = l.Sync() }()

	c, ctx, cancel := setupSignalHandling()
	defer func() {
		signal.Stop(c)
		cancel()
	}()
	rootCtx = ctx

	// Start the xDS gRPC server (on one of the two loopback addresses) and the
	// local DNS server Envoy uses to resolve the "discovery" cluster.
	runServer()
	runDnsServer(":5300", rootCtx)
	<-ctx.Done()
}

// runServer (re)starts the xDS server, alternating between two loopback
// addresses so that Envoy sees the control plane's IP change.
func runServer() {
	log.Info("starting server")
	if runCtxCancel != nil {
		runCtxCancel()
	}
	runCtx, runCtxCancel = context.WithCancel(rootCtx)
	if runCount%2 == 0 {
		go runXdsServer(runCtx, "127.0.0.7:18000")
	} else {
		go runXdsServer(runCtx, "127.0.0.6:18000")
	}
}

func setupSignalHandling() (chan os.Signal, context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())

	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGUSR1, syscall.SIGUSR2)
	go func() {
		for {
			select {
			case s := <-c:
				if s == syscall.SIGUSR1 {
					// Restart the xDS server on the other address (the DNS
					// handler flips its answer based on the same counter).
					runCount++
					runServer()
				} else if s == syscall.SIGUSR2 {
					// Push a fresh cluster/endpoint snapshot to Envoy.
					xds.updateSnapshot()
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	return c, ctx, cancel
}

func runXdsServer(ctx context.Context, addr string) {
	e := &XdsServer{}
	xds = e

	// var ll go_control_plane_log.Logger
	ll := log.(zapr.Underlier).GetUnderlying().Sugar()
	e.snapshotCache = cachev3.NewSnapshotCache(true, nodeHash{}, ll)
	e.server = serverv3.NewServer(context.Background(), e.snapshotCache, e)

	log.Info("management server listening", "addr", addr)

	var grpcOptions []grpc.ServerOption
	grpcOptions = append(grpcOptions, grpc.MaxConcurrentStreams(10000))
	grpcServer := grpc.NewServer(grpcOptions...)
	discoverygrpc.RegisterAggregatedDiscoveryServiceServer(grpcServer, e.server)

	// Report NOT_SERVING on the "xds:ready" gRPC health service (which Envoy's
	// grpc_health_check polls) until the initial delay has elapsed.
	h := health.NewServer()
	defer h.Shutdown()
	h.SetServingStatus("xds:ready", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
	time.AfterFunc(initialReadyDelay, func() {
		select {
		case <-ctx.Done():
			return
		default:
			log.Info("enabling readiness")
			h.SetServingStatus("xds:ready", grpc_health_v1.HealthCheckResponse_SERVING)
		}
	})
	grpc_health_v1.RegisterHealthServer(grpcServer, h)
	reflection.Register(grpcServer)

	listener, err := net.Listen("tcp", addr)
	if err != nil {
		panic(fmt.Sprintf("cannot listen. err=%v addr=%v", err, addr))
	}
	go func() {
		if err := grpcServer.Serve(listener); err != nil {
			log.Error(err, "grpc server error")
		}
	}()

	<-ctx.Done()
	grpcServer.Stop()
}

func (e *XdsServer) updateSnapshot() {
	version := fmt.Sprint(time.Now().UnixNano())
	log.Info("setting snapshot", "version", version)

	// NewSnapshot takes version, endpoints, clusters, routes, listeners,
	// runtimes and secrets; only endpoints and clusters are populated here.
	clusters, eps := getResources()
	snap := cachev3.NewSnapshot(version, eps, clusters, nil, nil, nil, nil)
	if err := snap.Consistent(); err != nil {
		log.Error(err, "snapshot inconsistency")
		os.Exit(1)
	}

	// The snapshot key must match what nodeHash.ID returns for the Envoy node,
	// i.e. the node's cluster name ("foo_envoy_test" in the bootstrap above).
	if err := e.snapshotCache.SetSnapshot("foo_envoy_test", snap); err != nil {
		log.Error(err, "could not set snapshot")
		os.Exit(1)
	}
}

// The methods below implement the go-control-plane server callbacks and simply
// log xDS stream activity.

func (e *XdsServer) OnStreamOpen(ctx context.Context, streamID int64, typeURL string) error {
	log.Info("OnStreamOpen", "streamID", streamID, "typeURL", typeURL)
	return nil
}

func (e *XdsServer) OnStreamClosed(streamID int64) {
	log.Info("OnStreamClosed", "streamID", streamID)
}

func (e *XdsServer) OnStreamRequest(streamID int64, req *discoverygrpc.DiscoveryRequest) error {
	log.Info("OnStreamRequest", "streamID", streamID, "discovery.request", discoveryReqStr(req))
	if err := req.GetErrorDetail(); err != nil {
		// A non-nil error detail means Envoy NACKed the previous response.
		log.Error(fmt.Errorf("%s", err.GetMessage()),
			"envoy rejected configuration",
			"version-in-use", req.GetVersionInfo(),
		)
	}
	return nil
}

func (e *XdsServer) OnStreamResponse(streamID int64, req *discoverygrpc.DiscoveryRequest, resp *discoverygrpc.DiscoveryResponse) {
	resources := ""
	for _, res := range resp.Resources {
		resources += res.String()
	}
	log.Info("OnStreamResponse", "streamID", streamID,
		"discovery.request", discoveryReqStr(req),
		"discovery.response", discoveryRespStr(resp),
		"responseResources", resources,
	)
}

func (e *XdsServer) OnFetchRequest(ctx context.Context, req *discoverygrpc.DiscoveryRequest) error {
	log.Info("OnFetchRequest", "discovery.request", discoveryReqStr(req))
	return nil
}

func (e *XdsServer) OnFetchResponse(req *discoverygrpc.DiscoveryRequest, resp *discoverygrpc.DiscoveryResponse) {
	log.Info("OnFetchResponse",
		"discovery.request", discoveryReqStr(req),
		"discovery.response", discoveryRespStr(resp),
	)
}

// nodeHash maps an Envoy node to a snapshot cache key; here the node's cluster
// name is used as the key.
type nodeHash struct{}

func (nodeHash) ID(node *envoy_core.Node) string {
	return node.Cluster
}

// Compact representations of discovery requests/responses used for logging.

type discoveryNode struct {
	Id       string               `json:"id"`
	Cluster  string               `json:"cluster"`
	Locality *envoy_core.Locality `json:"locality"`
}

type discoveryReq struct {
	Node        discoveryNode `json:"node"`
	Version     string        `json:"version"`
	Typeurl     string        `json:"typeurl"`
	RespNonce   string        `json:"respNonce"`
	ErrorDetail string        `json:"errorDetail"`
}

type discoveryResp struct {
	Version string        `json:"version"`
	Node    discoveryNode `json:"node"`
	Typeurl string        `json:"typeurl"`
	Nonce   string        `json:"nonce"`
}

func discoveryReqStr(req *discoverygrpc.DiscoveryRequest) discoveryReq {
	var n discoveryNode
	if req.Node != nil {
		n.Id = req.Node.GetId()
		n.Cluster = req.Node.GetCluster()
		n.Locality = req.Node.GetLocality()
	}
	return discoveryReq{
		Version:     req.VersionInfo,
		Typeurl:     req.TypeUrl,
		RespNonce:   req.ResponseNonce,
		ErrorDetail: req.GetErrorDetail().String(),
		Node:        n,
	}
}

func discoveryRespStr(resp *discoverygrpc.DiscoveryResponse) discoveryResp {
	return discoveryResp{
		Version: resp.VersionInfo,
		Typeurl: resp.TypeUrl,
		Nonce:   resp.Nonce,
	}
}

// getResources builds the single EDS cluster ("foo", the cluster the listener's
// route_config forwards to) and its load assignment.
func getResources() ([]types.Resource, []types.Resource) {
	clusterName := "foo"

	ep := &envoy_endpoint.ClusterLoadAssignment{
		ClusterName: clusterName,
		Endpoints: []*envoy_endpoint.LocalityLbEndpoints{{
			LbEndpoints: []*envoy_endpoint.LbEndpoint{
				{
					HostIdentifier: &envoy_endpoint.LbEndpoint_Endpoint{
						Endpoint: &envoy_endpoint.Endpoint{
							Address: &envoy_core.Address{
								Address: &envoy_core.Address_SocketAddress{
									SocketAddress: &envoy_core.SocketAddress{
										Address:  "1.2.3.4",
										Protocol: envoy_core.SocketAddress_TCP,
										PortSpecifier: &envoy_core.SocketAddress_PortValue{
											PortValue: uint32(80),
										},
									},
								},
							},
						},
					},
				},
			},
		}},
	}

	ec := &envoy_cluster.Cluster{
		Name:                 clusterName,
		ConnectTimeout:       ptypes.DurationProto(10 * time.Second),
		ClusterDiscoveryType: &envoy_cluster.Cluster_Type{Type: envoy_cluster.Cluster_EDS},
		DnsLookupFamily:      envoy_cluster.Cluster_V4_ONLY,
		LbPolicy:             envoy_cluster.Cluster_ROUND_ROBIN,
		EdsClusterConfig: &envoy_cluster.Cluster_EdsClusterConfig{
			EdsConfig: &envoy_core.ConfigSource{
				ResourceApiVersion: envoy_core.ApiVersion_V3,
				ConfigSourceSpecifier: &envoy_core.ConfigSource_Ads{
					Ads: &envoy_core.AggregatedConfigSource{},
				},
			},
		},
	}

	return []types.Resource{ec}, []types.Resource{ep}
}
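
// The DNS handler below answers every A query with either 127.0.0.7 or
// 127.0.0.6, in lockstep with the address the xDS server is currently
// listening on, so Envoy's STRICT_DNS "discovery" cluster follows the switch.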

type dnsHandler struct{}

func (h *dnsHandler) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
	msg := dns.Msg{}
	msg.SetReply(r)

	switch r.Question[0].Qtype {
	case dns.TypeA:
		msg.Authoritative = true
		domain := msg.Question[0].Name

		var address string
		if runCount%2 == 0 {
			address = "127.0.0.7"
		} else {
			address = "127.0.0.6"
		}
		// Short TTL so Envoy re-resolves quickly after a switch.
		msg.Answer = append(msg.Answer, &dns.A{
			Hdr: dns.RR_Header{Name: domain, Rrtype: dns.TypeA, Class: dns.ClassINET, Ttl: 5},
			A:   net.ParseIP(address),
		})
	}
	_ = w.WriteMsg(&msg)
}

func runDnsServer(addr string, ctx context.Context) {
	srv := &dns.Server{Addr: addr, Net: "udp"}
	srv.Handler = &dnsHandler{}
	go func() {
		if err := srv.ListenAndServe(); err != nil {
			log.Error(err, "Failed to set udp listener")
			os.Exit(1)
		}
	}()

	// Block until shutdown is requested, then stop the DNS server.
	<-ctx.Done()
	_ = srv.Shutdown()
}