Last active
August 17, 2022 19:09
-
-
Save tiagolobocastro/4890040c9fc1f5bb526e5de5a6d0cdea to your computer and use it in GitHub Desktop.
half-closed connection
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use anyhow::Context; | |
// Example to listen on port 8080 locally, forwarding to port 2379 (the etcd client port) in the etcd-0 pod.
// Similar to `kubectl port-forward pod/etcd-0 8080:2379`.
use futures::{StreamExt, TryStreamExt}; | |
use std::net::SocketAddr; | |
use tokio::{ | |
io::{AsyncRead, AsyncWrite}, | |
net::TcpListener, | |
}; | |
use tokio_stream::wrappers::TcpListenerStream; | |
use tracing::*; | |
use k8s_openapi::api::core::v1::Pod; | |
use kube::{ | |
api::{Api, DeleteParams, PostParams}, | |
runtime::wait::{await_condition, conditions::is_pod_running}, | |
Client, ResourceExt, | |
}; | |
#[tokio::main] | |
async fn main() -> anyhow::Result<()> { | |
tracing_subscriber::fmt::init(); | |
let client = Client::try_default().await?; | |
let p: Pod = serde_json::from_value(serde_json::json!({ | |
"apiVersion": "v1", | |
"kind": "Pod", | |
"metadata": { | |
"labels": { | |
"app": "etcd" | |
}, | |
"name": "etcd-0" | |
}, | |
"spec": { | |
"containers": [ | |
{ | |
"command": [ | |
"/usr/local/bin/etcd", | |
"--listen-client-urls", | |
"http://0.0.0.0:2379", | |
"--advertise-client-urls", | |
"http://0.0.0.0:2379" | |
], | |
"image": "quay.io/coreos/etcd:latest", | |
"name": "etcd-0", | |
"ports": [ | |
{ | |
"containerPort": 2379, | |
"name": "client", | |
"protocol": "TCP" | |
}, | |
{ | |
"containerPort": 2380, | |
"name": "peer", | |
"protocol": "TCP" | |
} | |
] | |
} | |
] | |
} | |
}))?; | |
let pods: Api<Pod> = Api::default_namespaced(client.clone()); | |
let _ = pods | |
.delete("etcd-0", &DeleteParams::default().grace_period(0)) | |
.await | |
.ok(); | |
// Stop on error including a pod already exists or is still being deleted. | |
info!("creating etcd-0 pod"); | |
pods.create(&PostParams::default(), &p).await?; | |
// Wait until the pod is running, otherwise we get 500 error. | |
info!("waiting for etcd-0 pod to start"); | |
let running = await_condition(pods.clone(), "etcd-0", is_pod_running()); | |
let _ = tokio::time::timeout(std::time::Duration::from_secs(30), running).await?; | |
let addr = SocketAddr::from(([127, 0, 0, 1], 8080)); | |
let pod_port = 80; | |
info!(local_addr = %addr, pod_port, "forwarding traffic to the pod"); | |
info!("use Ctrl-C to stop the server and delete the pod"); | |
let server = TcpListenerStream::new(TcpListener::bind(addr).await.unwrap()) | |
.take_until(tokio::signal::ctrl_c()) | |
.try_for_each(move |client_conn| { | |
let client = client.clone(); | |
async { | |
if let Ok(peer_addr) = client_conn.peer_addr() { | |
info!(%peer_addr, "new connection"); | |
} | |
let pods: Api<Pod> = Api::namespaced(client, "default"); | |
tokio::spawn(async move { | |
if let Err(e) = forward_connection(&pods, "etcd-0", 2379, client_conn).await { | |
error!( | |
error = e.as_ref() as &dyn std::error::Error, | |
"failed to forward connection" | |
); | |
} | |
}); | |
// keep the server running | |
Ok(()) | |
} | |
}); | |
warn!("Now run ETCDCTL_API=3 etcdctl --endpoints http://localhost:8080 watch --prefix \"\""); | |
info!("Then kill it, and you'll notice you won't see any connection closed or similar"); | |
info!("The socket should be in a zombie state:"); | |
warn!("lsof -i | grep \"pod_portf\""); | |
error!("pod_portf 464687 tiago 10u IPv4 3645183 0t0 TCP localhost:http-alt->localhost:56662 (CLOSE_WAIT)"); | |
if let Err(e) = server.await { | |
error!(error = &e as &dyn std::error::Error, "server error"); | |
} | |
info!("deleting the pod"); | |
pods.delete("etcd-0", &DeleteParams::default().grace_period(0)) | |
.await? | |
.map_left(|pdel| { | |
assert_eq!(pdel.name_any(), "etcd-0"); | |
}); | |
Ok(()) | |
} | |
async fn forward_connection( | |
pods: &Api<Pod>, | |
pod_name: &str, | |
port: u16, | |
mut client_conn: impl AsyncRead + AsyncWrite + Unpin, | |
) -> anyhow::Result<()> { | |
let mut forwarder = pods.portforward(pod_name, &[port]).await?; | |
let mut upstream_conn = forwarder | |
.take_stream(port) | |
.context("port not found in forwarder")?; | |
tokio::io::copy_bidirectional(&mut client_conn, &mut upstream_conn).await?; | |
drop(upstream_conn); | |
forwarder.join().await?; | |
info!("connection closed"); | |
Ok(()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment