Skip to content

Instantly share code, notes, and snippets.

@tiagolobocastro
Last active August 17, 2022 19:09
Show Gist options
  • Save tiagolobocastro/4890040c9fc1f5bb526e5de5a6d0cdea to your computer and use it in GitHub Desktop.
Save tiagolobocastro/4890040c9fc1f5bb526e5de5a6d0cdea to your computer and use it in GitHub Desktop.
half-closed connection
use anyhow::Context;
// Example to listen on port 8080 locally, forwarding to the etcd client port (2379) of the `etcd-0` pod.
// Similar to `kubectl port-forward pod/etcd-0 8080:2379`.
use futures::{StreamExt, TryStreamExt};
use std::net::SocketAddr;
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpListener,
};
use tokio_stream::wrappers::TcpListenerStream;
use tracing::*;
use k8s_openapi::api::core::v1::Pod;
use kube::{
api::{Api, DeleteParams, PostParams},
runtime::wait::{await_condition, conditions::is_pod_running},
Client, ResourceExt,
};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;
let p: Pod = serde_json::from_value(serde_json::json!({
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"labels": {
"app": "etcd"
},
"name": "etcd-0"
},
"spec": {
"containers": [
{
"command": [
"/usr/local/bin/etcd",
"--listen-client-urls",
"http://0.0.0.0:2379",
"--advertise-client-urls",
"http://0.0.0.0:2379"
],
"image": "quay.io/coreos/etcd:latest",
"name": "etcd-0",
"ports": [
{
"containerPort": 2379,
"name": "client",
"protocol": "TCP"
},
{
"containerPort": 2380,
"name": "peer",
"protocol": "TCP"
}
]
}
]
}
}))?;
let pods: Api<Pod> = Api::default_namespaced(client.clone());
let _ = pods
.delete("etcd-0", &DeleteParams::default().grace_period(0))
.await
.ok();
// Stop on error including a pod already exists or is still being deleted.
info!("creating etcd-0 pod");
pods.create(&PostParams::default(), &p).await?;
// Wait until the pod is running, otherwise we get 500 error.
info!("waiting for etcd-0 pod to start");
let running = await_condition(pods.clone(), "etcd-0", is_pod_running());
let _ = tokio::time::timeout(std::time::Duration::from_secs(30), running).await?;
let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
let pod_port = 80;
info!(local_addr = %addr, pod_port, "forwarding traffic to the pod");
info!("use Ctrl-C to stop the server and delete the pod");
let server = TcpListenerStream::new(TcpListener::bind(addr).await.unwrap())
.take_until(tokio::signal::ctrl_c())
.try_for_each(move |client_conn| {
let client = client.clone();
async {
if let Ok(peer_addr) = client_conn.peer_addr() {
info!(%peer_addr, "new connection");
}
let pods: Api<Pod> = Api::namespaced(client, "default");
tokio::spawn(async move {
if let Err(e) = forward_connection(&pods, "etcd-0", 2379, client_conn).await {
error!(
error = e.as_ref() as &dyn std::error::Error,
"failed to forward connection"
);
}
});
// keep the server running
Ok(())
}
});
warn!("Now run ETCDCTL_API=3 etcdctl --endpoints http://localhost:8080 watch --prefix \"\"");
info!("Then kill it, and you'll notice you won't see any connection closed or similar");
info!("The socket should be in a zombie state:");
warn!("lsof -i | grep \"pod_portf\"");
error!("pod_portf 464687 tiago 10u IPv4 3645183 0t0 TCP localhost:http-alt->localhost:56662 (CLOSE_WAIT)");
if let Err(e) = server.await {
error!(error = &e as &dyn std::error::Error, "server error");
}
info!("deleting the pod");
pods.delete("etcd-0", &DeleteParams::default().grace_period(0))
.await?
.map_left(|pdel| {
assert_eq!(pdel.name_any(), "etcd-0");
});
Ok(())
}
/// Pipes bytes in both directions between `client_conn` and `port` of the pod `pod_name`.
///
/// A port-forward to the pod is opened through `pods`, its stream for `port` is taken, and
/// data is copied both ways until `copy_bidirectional` returns. The pod-side stream is then
/// released before the port-forwarder itself is joined, and `"connection closed"` is logged.
///
/// # Errors
/// Fails if the port-forward cannot be established, if the forwarder has no stream for `port`,
/// if copying between the two sides fails, or if joining the forwarder fails.
async fn forward_connection(
    pods: &Api<Pod>,
    pod_name: &str,
    port: u16,
    mut client_conn: impl AsyncRead + AsyncWrite + Unpin,
) -> anyhow::Result<()> {
    let mut port_forwarder = pods.portforward(pod_name, &[port]).await?;

    {
        // The pod-side stream lives only inside this scope, so it is dropped before the
        // forwarder is joined below.
        let pod_stream = port_forwarder.take_stream(port);
        let mut pod_stream = pod_stream.context("port not found in forwarder")?;
        tokio::io::copy_bidirectional(&mut client_conn, &mut pod_stream).await?;
    }

    port_forwarder.join().await?;
    info!("connection closed");
    Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment