Created
July 16, 2022 16:31
-
-
Save Akanoa/b53f4f042d24607f00f1c36b1ec01585 to your computer and use it in GitHub Desktop.
Playground article télémétrie
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use opentelemetry::global::shutdown_tracer_provider; | |
use std::future::Future; | |
use std::pin::Pin; | |
use opentelemetry::runtime::Tokio; | |
use rand::Rng; | |
use tokio::io::{AsyncReadExt, AsyncWriteExt}; | |
use tokio::{io, join, select, spawn}; | |
use tokio::net::{TcpListener, TcpStream}; | |
use tokio::sync::mpsc::{channel, Receiver, Sender}; | |
use tracing::{debug, error, info, info_span, span, trace_span, warn, Instrument, Span}; | |
use tracing_subscriber::layer::SubscriberExt; | |
use tracing_subscriber::util::SubscriberInitExt; | |
use futures::future; | |
#[tokio::main] | |
#[tracing::instrument] | |
async fn main() { | |
// Create a Jaeger Tracer | |
let tracer_jaeger = opentelemetry_jaeger::new_pipeline() | |
.with_service_name("plus ou moins5") | |
.install_batch(Tokio) | |
.unwrap(); | |
// Create a tracing layer with the configured tracer | |
let opentelemetry_layer = tracing_opentelemetry::layer() | |
.with_tracer(tracer_jaeger) | |
.with_threads(true); | |
// Register Layers | |
tracing_subscriber::registry() | |
.with(opentelemetry_layer) | |
.init(); | |
let (_tx, rx) = channel::<()>(1); | |
let (_tx1, rx1) = channel::<()>(1); | |
let future_randomizer = spawn(randomizer(rx)); | |
let future_game_server = spawn(game_server(rx1)); | |
let (_randomizer_result, _game_server_result) = join!(future_randomizer, future_game_server); | |
// Force all Span to be sent | |
shutdown_tracer_provider(); | |
} | |
#[tracing::instrument] | |
async fn compute_random_number() -> u8 { | |
debug!("Start asking number"); | |
//sleep(Duration::from_millis(100)); | |
debug!("End asking number"); | |
rand::thread_rng().gen_range(0..100) | |
} | |
#[tracing::instrument] | |
async fn handle_randomizer_request(mut stream: TcpStream) -> io::Result<()> { | |
let number = compute_random_number().await; | |
info!(number = number, "Number chosen"); | |
stream.write_u8(number).await.map_err(|err| { | |
warn!(error=%err, "Unable to write to stream"); | |
err | |
})?; | |
Ok(()) | |
} | |
/// Message delivered to a player handler through its `Receiver<Response>`.
///
/// NOTE(review): the variant meanings below are inferred from their names
/// only (nothing in this file constructs or matches them yet) — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Response {
    /// Presumably: the number to find is greater than the guess.
    More,
    /// Presumably: the number to find is lower than the guess.
    Less,
    /// Presumably: the guess was right; the meaning of the `u8` payload is
    /// not established by this file.
    Win(u8),
}
fn handle_player( | |
tx: Sender<u8>, | |
mut rx: Receiver<Response>, | |
) -> Pin<Box<dyn Fn(TcpStream) -> dyn Future<Output = io::Result<()>>>> { | |
let inner2 = |mut stream: TcpStream| async move { | |
rx.recv().await; | |
stream.write_u8(1).await; | |
Ok::<(), io::Error>(()) | |
}; | |
async fn inner(mut stream: TcpStream) -> io::Result<()> { | |
let a = rx.recv().await; | |
loop { | |
let mut data = [0_u8; 32]; | |
let size = stream.read(&mut data).await?; | |
println!("data receive"); | |
if size == 0 { | |
info!("Peer has left"); | |
break; | |
} | |
stream.write_all(b"PONG").await.map_err(|err| { | |
warn!(error=%err, "Unable to write to stream"); | |
err | |
})?; | |
info!("PONG sent") | |
} | |
Ok(()) | |
} | |
Box::pin(inner2) | |
} | |
#[tracing::instrument(skip(closure))] | |
async fn tcp_server<F, O>( | |
port: u16, | |
closure: F, | |
mut signal: Receiver<()>, | |
name: &str, | |
) -> io::Result<()> | |
where | |
O: Future<Output = io::Result<()>>, | |
F: Fn(TcpStream) -> O, | |
{ | |
println!("start tcp server {}", name); | |
let listener = TcpListener::bind(format!("127.0.0.1:{}", port)) | |
.await | |
.map_err(|error| { | |
error!(port, %error, "Unable to start {} service", name); | |
error | |
})?; | |
loop { | |
select! { | |
_ = signal.recv() => break, | |
result_accept = listener.accept() => { | |
match result_accept { | |
Ok((stream, peer)) => { | |
info!(peer=%peer, "New peer connected"); | |
let handle_response = closure(stream).await; | |
if let Err(error) = handle_response { | |
error!(%error, "An error occurred while handling {} request", name); | |
continue; | |
} | |
} | |
Err(error) => { | |
warn!(error=%error, "Unable to accept connection"); | |
continue; | |
} | |
}; | |
} | |
}; | |
} | |
info!("End of {} service", name); | |
Ok(()) | |
} | |
#[tracing::instrument] | |
async fn randomizer(signal: Receiver<()>) -> io::Result<()> { | |
println!("Start randomizer"); | |
let port: u16 = 3004; | |
tcp_server(port, handle_randomizer_request, signal, "randomizer").await | |
} | |
#[tracing::instrument] | |
async fn call_number() -> io::Result<u8> { | |
let port_randomizer: u16 = 3004; | |
let mut connection_randomizer = TcpStream::connect(format!("127.0.0.1:{}", port_randomizer)) | |
.await | |
.map_err(|err| { | |
error!(%err, "Unable to connect to randomizer service"); | |
err | |
})?; | |
connection_randomizer.write_all(b"").await.map_err(|err| { | |
warn!(%err, "Unable to write to randomizer stream"); | |
err | |
})?; | |
let chosen_number = connection_randomizer.read_u8().await.map_err(|err| { | |
warn!(%err, "Unable to read number choser"); | |
err | |
})?; | |
info!(number = chosen_number, "New number chosen"); | |
Ok(chosen_number) | |
} | |
#[tracing::instrument] | |
async fn game_server(signal: Receiver<()>) -> io::Result<()> { | |
info_span!("Init game server").in_scope(|| info!("Game server is starting")); | |
let port: u16 = 3003; | |
let _number = trace_span!("asking for new random number") | |
.in_scope(|| { | |
info!("test"); | |
//sleep(Duration::from_millis(666)); | |
call_number.in_current_span().inner()() | |
}) | |
.await?; | |
let (tx, rx) = channel(1); | |
let (tx1, rx1) = channel(1); | |
tcp_server(port, handle_player(tx, rx1), signal, "game_server").await | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment