Last active
April 6, 2021 14:30
-
-
Save keepsimple1/d3ff6b511b95a967026d316c21ea8458 to your computer and use it in GitHub Desktop.
keepsimple1 quiche branch v0.6.0 tag v0.6.0-p10 diffs with baseline quiche 0.6.0
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/examples/http3-client.rs b/examples/http3-client.rs | |
index a93d67e..329c243 100644 | |
--- a/examples/http3-client.rs | |
+++ b/examples/http3-client.rs | |
@@ -264,6 +264,16 @@ fn main() { | |
info!("GOAWAY id={}", goaway_id); | |
}, | |
+ Ok((stream_id, quiche::h3::Event::StopSending { error_code })) => { | |
+ info!("StopSending received for stream {}, error_code {}", | |
+ stream_id, error_code); | |
+ }, | |
+ | |
+ Ok((stream_id, quiche::h3::Event::ResetStream { error_code, final_size })) => { | |
+ info!("ResetStream received for stream {}, error_code {}, final_size {}", | |
+ stream_id, error_code, final_size); | |
+ }, | |
+ | |
Err(quiche::h3::Error::Done) => { | |
break; | |
}, | |
diff --git a/examples/http3-server.rs b/examples/http3-server.rs | |
index 4c41cbb..ba80c6d 100644 | |
--- a/examples/http3-server.rs | |
+++ b/examples/http3-server.rs | |
@@ -361,6 +361,16 @@ fn main() { | |
Ok((_goaway_id, quiche::h3::Event::GoAway)) => (), | |
+ Ok((stream_id, quiche::h3::Event::StopSending { error_code })) => { | |
+ info!("StopSending received for stream {}, error_code {}", | |
+ stream_id, error_code); | |
+ }, | |
+ | |
+ Ok((stream_id, quiche::h3::Event::ResetStream { error_code, final_size })) => { | |
+ info!("ResetStream received for stream {}, error_code {}, final_size {}", | |
+ stream_id, error_code, final_size); | |
+ }, | |
+ | |
Err(quiche::h3::Error::Done) => { | |
break; | |
}, | |
diff --git a/src/h3/ffi.rs b/src/h3/ffi.rs | |
index 2d278c7..3783413 100644 | |
--- a/src/h3/ffi.rs | |
+++ b/src/h3/ffi.rs | |
@@ -115,6 +115,10 @@ pub extern fn quiche_h3_event_type(ev: &h3::Event) -> u32 { | |
h3::Event::Datagram { .. } => 3, | |
h3::Event::GoAway { .. } => 4, | |
+ | |
+ h3::Event::StopSending { .. } => 5, | |
+ | |
+ h3::Event::ResetStream { .. } => 6, | |
} | |
} | |
diff --git a/src/h3/mod.rs b/src/h3/mod.rs | |
index 6248688..bc74fe4 100644 | |
--- a/src/h3/mod.rs | |
+++ b/src/h3/mod.rs | |
@@ -166,6 +166,14 @@ | |
//! // Peer signalled it is going away, handle it. | |
//! }, | |
//! | |
+//! Ok((stream_id, quiche::h3::Event::StopSending {error_code})) => { | |
+//! // Peer sent STOP_SENDING, handle it. | |
+//! }, | |
+//! | |
+//! Ok((stream_id, quiche::h3::Event::ResetStream {error_code, final_size})) => { | |
+//! // Peer sent RESET_STREAM, handle it. | |
+//! }, | |
+//! | |
//! Err(quiche::h3::Error::Done) => { | |
//! // Done reading. | |
//! break; | |
@@ -174,7 +182,7 @@ | |
//! Err(e) => { | |
//! // An error occurred, handle it. | |
//! break; | |
-//! }, | |
+//! } | |
//! } | |
//! } | |
//! # Ok::<(), quiche::h3::Error>(()) | |
@@ -219,6 +227,14 @@ | |
//! // Peer signalled it is going away, handle it. | |
//! }, | |
//! | |
+//! Ok((stream_id, quiche::h3::Event::StopSending {error_code})) => { | |
+//! // Peer sent STOP_SENDING, handle it. | |
+//! }, | |
+//! | |
+//! Ok((stream_id, quiche::h3::Event::ResetStream {error_code, final_size})) => { | |
+//! // Peer sent RESET_STREAM, handle it. | |
+//! }, | |
+//! | |
//! Err(quiche::h3::Error::Done) => { | |
//! // Done reading. | |
//! break; | |
@@ -266,6 +282,7 @@ use std::collections::HashMap; | |
use std::collections::VecDeque; | |
use crate::octets; | |
+use crate::h3::stream::Type; | |
/// List of ALPN tokens of supported HTTP/3 versions. | |
/// | |
@@ -525,6 +542,20 @@ pub enum Event { | |
/// GOAWAY was received. | |
GoAway, | |
+ | |
+ /// STOP_SENDING was received. | |
+ StopSending { | |
+ /// Application Protocol Error Code | |
+ error_code: u64 | |
+ }, | |
+ | |
+ /// RESET_STREAM was received. | |
+ ResetStream { | |
+ /// Application Protocol Error Code | |
+ error_code: u64, | |
+ /// The final offset of data in this stream | |
+ final_size: u64, | |
+ } | |
} | |
struct ConnectionSettings { | |
@@ -1118,6 +1149,29 @@ impl Connection { | |
} | |
} | |
+ // Process STOP_SENDING, and trigger an event for streams of `Request` type | |
+ while let Some((stream_id, error_code)) = conn.poll_stop_sending() { | |
+ match self.streams.get(&stream_id) { | |
+ Some(stream) => match stream.ty() { | |
+ Some(Type::Request) => return Ok((stream_id, | |
+ Event::StopSending { error_code })), | |
+ _ => trace!("StopSending: stream {} is not Request type", stream_id), | |
+ } | |
+ None => trace!("StopSending: stream {} does not exists", stream_id), | |
+ } | |
+ } | |
+ | |
+ // Process RESET_STREAM, and trigger an event for streams of `Request` type | |
+ while let Some((stream_id, error_code, final_size)) = conn.poll_reset_stream() { | |
+ match self.streams.get(&stream_id) { | |
+ Some(stream) => match stream.ty() { | |
+ Some(Type::Request) => return Ok((stream_id, Event::ResetStream { error_code, final_size })), | |
+ _ => trace!("ResetStream: stream {} is not Request type", stream_id), | |
+ } | |
+ None => trace!("ResetStream: stream {} does not exists", stream_id), | |
+ } | |
+ } | |
+ | |
Err(Error::Done) | |
} | |
@@ -1281,7 +1335,7 @@ impl Connection { | |
fn open_grease_stream(&mut self, conn: &mut super::Connection) -> Result<()> { | |
match self.open_uni_stream(conn, grease_value()) { | |
Ok(stream_id) => { | |
- trace!("{} open GREASE stream {}", conn.trace_id(), stream_id); | |
+ info!("{} open GREASE stream {}", conn.trace_id(), stream_id); | |
conn.stream_send(stream_id, b"GREASE is the word", false)?; | |
}, | |
@@ -2050,6 +2104,8 @@ mod tests { | |
use super::testing::*; | |
+ use crate::Shutdown; | |
+ | |
#[test] | |
/// Make sure that random GREASE values is within the specified limit. | |
fn grease_value_in_varint_limit() { | |
@@ -3159,6 +3215,53 @@ mod tests { | |
// Once the server gives flow control credits back, we can send the body. | |
assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0)); | |
} | |
+ | |
+ #[test] | |
+ /// Tests that `stream_shutdown` `read` will trigger StopSending event on the server, | |
+ /// and the server will send back RESET_STREAM, trigger ResetStream event on the client. | |
+ fn stop_sending_and_reset_stream() { | |
+ let mut s = Session::default().unwrap(); | |
+ s.handshake().unwrap(); | |
+ | |
+ // Send the request | |
+ let (stream_id, req) = s.send_request(true).unwrap(); | |
+ let ev_headers = Event::Headers { | |
+ list: req, | |
+ has_body: false, | |
+ }; | |
+ assert_eq!(s.poll_server(), Ok((stream_id, ev_headers))); | |
+ assert_eq!(s.poll_server(), Ok((stream_id, Event::Finished))); | |
+ | |
+ // Start the response | |
+ let resp = s.send_response(stream_id, false).unwrap(); | |
+ let body = s.send_body_server(stream_id, false).unwrap(); | |
+ | |
+ let mut recv_buf = vec![0; body.len()]; | |
+ let ev_headers = Event::Headers { | |
+ list: resp, | |
+ has_body: true, | |
+ }; | |
+ | |
+ assert_eq!(s.poll_client(), Ok((stream_id, ev_headers))); | |
+ assert_eq!(s.poll_client(), Ok((stream_id, Event::Data))); | |
+ assert_eq!(s.recv_body_client(stream_id, &mut recv_buf), Ok(body.len())); | |
+ | |
+ // The client shutdown will send STOP_SENDING to the server | |
+ let error_code = 12345; | |
+ s.pipe.client.stream_shutdown(stream_id, Shutdown::Read, error_code).unwrap(); | |
+ s.advance().ok(); | |
+ | |
+ // Verify StopSending event received on the server | |
+ let stop_sending = Event::StopSending { error_code }; | |
+ assert_eq!(s.poll_server(), Ok((stream_id, stop_sending))); | |
+ | |
+ // Server should respond with a RESET_STREAM | |
+ // Verify ResetStream event received on the client | |
+ let client_stream = s.pipe.client.streams.get(stream_id).unwrap(); | |
+ let final_size = client_stream.recv.max_off(); | |
+ let reset_stream = Event::ResetStream { error_code, final_size}; | |
+ assert_eq!(s.poll_client(), Ok((stream_id, reset_stream))); | |
+ } | |
} | |
mod ffi; | |
diff --git a/src/h3/stream.rs b/src/h3/stream.rs | |
index f2f8f0c..091cf66 100644 | |
--- a/src/h3/stream.rs | |
+++ b/src/h3/stream.rs | |
@@ -121,7 +121,7 @@ pub struct Stream { | |
/// The write offset in the state buffer, that is, how many bytes have | |
/// already been read from the transport for the current state. When | |
- /// it reaches `stream_len` the state can be completed. | |
+ /// it reaches `state_len` the state can be completed. | |
state_off: usize, | |
/// The type of the frame currently being parsed. | |
@@ -178,6 +178,10 @@ impl Stream { | |
self.state | |
} | |
+ /// Returns the type recorded for this stream, or `None` if no type is set. | |
+ pub fn ty(&self) -> Option<Type> { | |
+ self.ty | |
+ } | |
+ | |
/// Sets the stream's type and transitions to the next state. | |
pub fn set_ty(&mut self, ty: Type) -> Result<()> { | |
assert_eq!(self.state, State::StreamType); | |
diff --git a/src/lib.rs b/src/lib.rs | |
index 79c942b..474c44a 100644 | |
--- a/src/lib.rs | |
+++ b/src/lib.rs | |
@@ -413,6 +413,7 @@ impl std::convert::From<octets::BufferTooShortError> for Error { | |
/// This should be used when calling [`stream_shutdown()`]. | |
/// | |
/// [`stream_shutdown()`]: struct.Connection.html#method.stream_shutdown | |
+#[derive(Debug, PartialEq)] | |
#[repr(C)] | |
pub enum Shutdown { | |
/// Stop receiving stream data. | |
@@ -2247,6 +2248,49 @@ impl Connection { | |
in_flight = true; | |
} | |
} | |
+ | |
+ // Create STOP_SENDING frames as needed. | |
+ for (stream_id, error_code) in self | |
+ .streams | |
+ .recv_aborted() | |
+ .map(|(&k, &v)| (k, v)) | |
+ .collect::<Vec<(u64, u64)>>() | |
+ { | |
+ let frame = frame::Frame::StopSending { stream_id, error_code }; | |
+ | |
+ if push_frame_to_pkt!(frames, frame, payload_len, left) { | |
+ self.streams.mark_recv_aborted(stream_id, false, error_code); | |
+ | |
+ ack_eliciting = true; | |
+ in_flight = true; | |
+ } | |
+ } | |
+ | |
+ // Create RESET_STREAM frames as needed | |
+ for (stream_id, error_code) in self | |
+ .streams | |
+ .will_reset() | |
+ .map(|(&k, &v)| (k, v)) | |
+ .collect::<Vec<(u64, u64)>>() | |
+ { | |
+ let stream = match self.streams.get(stream_id) { | |
+ Some(s) => s, | |
+ None => { | |
+ self.streams.mark_will_reset(stream_id, false, error_code); | |
+ continue; | |
+ }, | |
+ }; | |
+ | |
+ let final_size = stream.send.off_front(); | |
+ let frame = frame::Frame::ResetStream { stream_id, error_code, final_size }; | |
+ | |
+ if push_frame_to_pkt!(frames, frame, payload_len, left) { | |
+ self.streams.mark_will_reset(stream_id, false, error_code); | |
+ | |
+ ack_eliciting = true; | |
+ in_flight = true; | |
+ } | |
+ } | |
} | |
// Create CONNECTION_CLOSE frame. | |
@@ -2880,26 +2924,32 @@ impl Connection { | |
/// [`stream_recv()`]: struct.Connection.html#method.stream_recv | |
/// [`stream_send()`]: struct.Connection.html#method.stream_send | |
pub fn stream_shutdown( | |
- &mut self, stream_id: u64, direction: Shutdown, _err: u64, | |
+ &mut self, stream_id: u64, direction: Shutdown, err: u64, | |
) -> Result<()> { | |
+ trace!("stream_shutdown: stream {} direction {:?} err {}", stream_id, direction, err); | |
+ | |
// Get existing stream. | |
let stream = self.streams.get_mut(stream_id).ok_or(Error::Done)?; | |
match direction { | |
- // TODO: send STOP_SENDING | |
Shutdown::Read => { | |
stream.recv.shutdown()?; | |
// Once shutdown, the stream is guaranteed to be non-readable. | |
self.streams.mark_readable(stream_id, false); | |
+ | |
+ // Mark for sending STOP_SENDING | |
+ self.streams.mark_recv_aborted(stream_id, true, err); | |
}, | |
- // TODO: send RESET_STREAM | |
Shutdown::Write => { | |
stream.send.shutdown()?; | |
// Once shutdown, the stream is guaranteed to be non-writable. | |
self.streams.mark_writable(stream_id, false); | |
+ | |
+ // Mark for sending RESET_STREAM | |
+ self.streams.mark_will_reset(stream_id, true, err); | |
}, | |
} | |
@@ -2934,6 +2984,17 @@ impl Connection { | |
stream.recv.is_fin() | |
} | |
+ /// Returns true if the stream's send side is shut down, or if the stream is not found. | |
+ pub fn stream_is_shutdown(&self, stream_id: u64) -> bool { | |
+ let stream = match self.streams.get(stream_id) { | |
+ Some(v) => v, | |
+ // A stream that is not in the stream map is reported as shut down. | |
+ None => return true, | |
+ }; | |
+ | |
+ stream.send.is_shutdown() | |
+ } | |
+ | |
/// Initializes the stream's application data. | |
/// | |
/// This can be used by applications to store per-stream information without | |
@@ -3230,6 +3291,16 @@ impl Connection { | |
.is_none() | |
} | |
+ /// Removes and returns the oldest queued STOP_SENDING as (stream_id, error_code), or `None` if none is queued. | |
+ pub fn poll_stop_sending(&mut self) -> Option<(u64, u64)> { | |
+ self.streams.poll_stop_sending() | |
+ } | |
+ | |
+ /// Removes and returns the oldest queued RESET_STREAM as (stream_id, error_code, final_size), or `None` if none is queued. | |
+ pub fn poll_reset_stream(&mut self) -> Option<(u64, u64, u64)> { | |
+ self.streams.poll_reset_stream() | |
+ } | |
+ | |
/// Returns the amount of time until the next timeout event. | |
/// | |
/// Once the given duration has elapsed, the [`on_timeout()`] method should | |
@@ -3523,6 +3594,7 @@ impl Connection { | |
self.streams.should_update_max_streams_uni() || | |
self.streams.has_flushable() || | |
self.streams.has_almost_full() || | |
+ self.streams.has_stop_sending() || | |
self.streams.has_blocked()) | |
{ | |
return Ok(packet::EPOCH_APPLICATION); | |
@@ -3592,8 +3664,8 @@ impl Connection { | |
frame::Frame::ResetStream { | |
stream_id, | |
+ error_code, | |
final_size, | |
- .. | |
} => { | |
// Peer can't send on our unidirectional streams. | |
if !stream::is_bidi(stream_id) && | |
@@ -3602,6 +3674,8 @@ impl Connection { | |
return Err(Error::InvalidStreamState); | |
} | |
+ info!("{} stream {} received RESET_STREAM error_code {} final_size {}", self.trace_id, stream_id, error_code, final_size); | |
+ | |
// Get existing stream or create a new one, but if the stream | |
// has already been closed and collected, ignore the frame. | |
// | |
@@ -3615,7 +3689,10 @@ impl Connection { | |
let stream = match self.get_or_create_stream(stream_id, false) { | |
Ok(v) => v, | |
- Err(Error::Done) => return Ok(()), | |
+ Err(Error::Done) => { | |
+ info!("{} stream {} closed or collected, ignore RESET_STREAM", self.trace_id, stream_id); | |
+ return Ok(()); | |
+ }, | |
Err(e) => return Err(e), | |
}; | |
@@ -3625,15 +3702,44 @@ impl Connection { | |
if self.rx_data > self.max_rx_data { | |
return Err(Error::FlowControl); | |
} | |
+ | |
+ self.streams.mark_reset_stream(stream_id, error_code, final_size); | |
}, | |
- frame::Frame::StopSending { stream_id, .. } => { | |
+ frame::Frame::StopSending { stream_id, error_code } => { | |
// STOP_SENDING on a receive-only stream is a fatal error. | |
if !stream::is_local(stream_id, self.is_server) && | |
!stream::is_bidi(stream_id) | |
{ | |
+ error!("STOP_SENDING on a receive-only stream is a fatal error"); | |
return Err(Error::InvalidStreamState); | |
} | |
+ | |
+ // STOP_SENDING on a locally-initiated stream that | |
+ // has not yet been created is a fatal error | |
+ if stream::is_local(stream_id, self.is_server) && | |
+ self.streams.get(stream_id).is_none() | |
+ { | |
+ error!("STOP_SENDING on a non-existing locally-initiated stream"); | |
+ return Err(Error::InvalidStreamState); | |
+ } | |
+ | |
+ info!("{} stream {} received STOP_SENDING error_code {}", self.trace_id, stream_id, error_code); | |
+ | |
+ // what happens if this stream is in "Data sent" state? | |
+ match self.get_or_create_stream(stream_id, false) { | |
+ Ok(_) => {}, | |
+ | |
+ Err(Error::Done) => { | |
+ info!("{} stream {} closed or collected, ignore STOP_SENDING", self.trace_id, stream_id); | |
+ return Ok(()); | |
+ }, | |
+ | |
+ Err(e) => return Err(e), | |
+ } | |
+ | |
+ self.streams.mark_stop_sending(stream_id, error_code); | |
+ self.stream_shutdown(stream_id, Shutdown::Write, error_code)?; | |
}, | |
frame::Frame::Crypto { data } => { | |
@@ -7746,6 +7852,201 @@ mod tests { | |
let result2 = pipe.server.dgram_recv(&mut buf); | |
assert_eq!(result2, Err(Error::Done)); | |
} | |
+ | |
+ #[test] | |
+ /// Tests stream_shutdown read marks recv_aborted and sends out STOP_SENDING, | |
+ /// the server will shutdown its write. | |
+ fn stream_recv_aborted() { | |
+ let mut buf = [0; 65535]; | |
+ | |
+ let mut pipe = testing::Pipe::default().unwrap(); | |
+ | |
+ assert_eq!(pipe.handshake(&mut buf), Ok(())); | |
+ | |
+ // No streams recv_aborted. | |
+ let mut recv_aborted = pipe.client.streams.recv_aborted(); | |
+ assert_eq!(recv_aborted.next(), None); | |
+ | |
+ // Client sends some request | |
+ assert_eq!(pipe.client.stream_send(4, b"aaaaa", false), Ok(5)); | |
+ assert_eq!(pipe.advance(&mut buf), Ok(())); | |
+ | |
+ // Server sends some response | |
+ assert_eq!( | |
+ pipe.server.stream_send(4, b"aaaaaaaaaaaaaaa", false), | |
+ Ok(15) | |
+ ); | |
+ assert_eq!(pipe.advance(&mut buf), Ok(())); | |
+ | |
+ // Client stream is readable | |
+ let mut readable = pipe.client.readable(); | |
+ assert_eq!(readable.next(), Some(4)); | |
+ | |
+ // Client drains stream. | |
+ let mut b = [0; 15]; | |
+ pipe.client.stream_recv(4, &mut b).unwrap(); | |
+ assert_eq!(pipe.advance(&mut buf), Ok(())); | |
+ | |
+ // Server is writable | |
+ let mut writable = pipe.server.writable(); | |
+ assert_eq!(writable.next(), Some(4)); | |
+ | |
+ // Client shuts down Read, hence sends STOP_SENDING | |
+ let error_code: u64 = 12345; | |
+ pipe.client.stream_shutdown(4, Shutdown::Read, error_code).unwrap(); | |
+ let mut recv_aborted = pipe.client.streams.recv_aborted(); | |
+ assert_eq!(recv_aborted.next(), Some((&4, &error_code))); | |
+ assert_eq!(pipe.advance(&mut buf), Ok(())); | |
+ | |
+ // Client stream is no longer readable | |
+ let mut readable = pipe.client.readable(); | |
+ assert_eq!(readable.next(), None); | |
+ | |
+ // Server receives STOP_SENDING, shuts down Write, no longer writable | |
+ let mut writable = pipe.server.writable(); | |
+ assert_eq!(writable.next(), None); | |
+ } | |
+ | |
+ #[test] | |
+ /// Tests stream_shutdown write marks will_reset and sends out RESET_STREAM, | |
+ /// the client will shutdown its read. | |
+ fn stream_will_reset() { | |
+ let mut buf = [0; 65535]; | |
+ | |
+ let mut pipe = testing::Pipe::default().unwrap(); | |
+ | |
+ assert_eq!(pipe.handshake(&mut buf), Ok(())); | |
+ | |
+ // No streams recv_aborted. | |
+ let mut recv_aborted = pipe.client.streams.recv_aborted(); | |
+ assert_eq!(recv_aborted.next(), None); | |
+ | |
+ // Client sends some request | |
+ assert_eq!(pipe.client.stream_send(4, b"aaaaa", false), Ok(5)); | |
+ assert_eq!(pipe.advance(&mut buf), Ok(())); | |
+ | |
+ // Server sends some response | |
+ assert_eq!( | |
+ pipe.server.stream_send(4, b"aaaaaaaaaaaaaaa", false), | |
+ Ok(15) | |
+ ); | |
+ assert_eq!(pipe.advance(&mut buf), Ok(())); | |
+ | |
+ // Client stream is readable | |
+ let mut readable = pipe.client.readable(); | |
+ assert_eq!(readable.next(), Some(4)); | |
+ | |
+ // Client drains stream. | |
+ let mut b = [0; 15]; | |
+ pipe.client.stream_recv(4, &mut b).unwrap(); | |
+ assert_eq!(pipe.advance(&mut buf), Ok(())); | |
+ | |
+ // Server is writable | |
+ let mut writable = pipe.server.writable(); | |
+ assert_eq!(writable.next(), Some(4)); | |
+ | |
+ // Server shuts down Write, hence sends RESET_STREAM | |
+ let error_code: u64 = 12345; | |
+ pipe.server.stream_shutdown(4, Shutdown::Write, error_code).unwrap(); | |
+ let mut will_reset = pipe.server.streams.will_reset(); | |
+ assert_eq!(will_reset.next(), Some((&4, &error_code))); | |
+ assert_eq!(pipe.advance(&mut buf), Ok(())); | |
+ | |
+ // Server no longer writable | |
+ let mut writable = pipe.server.writable(); | |
+ assert_eq!(writable.next(), None); | |
+ | |
+ // Client receives RESET_STREAM, is no longer readable | |
+ let mut readable = pipe.client.readable(); | |
+ assert_eq!(readable.next(), None); | |
+ } | |
+ | |
+ #[test] | |
+ /// Tests it's okay to send STOP_SENDING to create client-initiated bidirectional stream | |
+ fn stop_sending_frame_new_stream() { | |
+ let mut buf = [0; 65535]; | |
+ | |
+ let mut pipe = testing::Pipe::default().unwrap(); | |
+ | |
+ assert_eq!(pipe.handshake(&mut buf), Ok(())); | |
+ | |
+ let stream_id = 4; // a new stream | |
+ | |
+ let frames = [frame::Frame::StopSending { | |
+ stream_id, | |
+ error_code: 12345, | |
+ }]; | |
+ | |
+ let pkt_type = packet::Type::Short; | |
+ | |
+ // STOP_SENDING is okay | |
+ assert_eq!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf), Ok(44)); | |
+ | |
+ // still able to send data to the server | |
+ assert_eq!(pipe.client.stream_send(stream_id, b"hello", true), Ok(5)); | |
+ assert_eq!(pipe.advance(&mut buf), Ok(())); | |
+ | |
+ // server can read from the stream | |
+ let mut readable = pipe.server.readable(); | |
+ assert_eq!(readable.next(), Some(stream_id)); | |
+ | |
+ // server cannot write to the stream | |
+ let mut writable = pipe.server.writable(); | |
+ assert_eq!(writable.next(), None); | |
+ } | |
+ | |
+ #[test] | |
+ /// Tests STOP_SENDING is invalid for receive-only, non-locally-initiated stream | |
+ fn invalid_stop_sending_frame_recv_only() { | |
+ let mut buf = [0; 65535]; | |
+ | |
+ let mut pipe = testing::Pipe::default().unwrap(); | |
+ | |
+ assert_eq!(pipe.handshake(&mut buf), Ok(())); | |
+ | |
+ // Use stream_id 2 for client-initiated uni-directional (receive-only for server) | |
+ let stream_id = 2; | |
+ assert_eq!(pipe.client.stream_send(stream_id, b"hello", false), Ok(5)); | |
+ assert_eq!(pipe.advance(&mut buf), Ok(())); | |
+ | |
+ let frames = [frame::Frame::StopSending { | |
+ stream_id, | |
+ error_code: 12345, | |
+ }]; | |
+ | |
+ let pkt_type = packet::Type::Short; | |
+ | |
+ // invalid: STOP_SENDING on a receive-only stream is a fatal error | |
+ assert_eq!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf), | |
+ Err(Error::InvalidStreamState)); | |
+ | |
+ // The stream still works | |
+ assert_eq!(pipe.client.stream_send(stream_id, b"world", true), Ok(5)); | |
+ assert_eq!(pipe.advance(&mut buf), Ok(())); | |
+ } | |
+ | |
+ #[test] | |
+ /// Tests STOP_SENDING is invalid for non-existing, locally-initiated stream | |
+ fn invalid_stop_sending_frame_non_existing_stream() { | |
+ let mut buf = [0; 65535]; | |
+ | |
+ let mut pipe = testing::Pipe::default().unwrap(); | |
+ | |
+ assert_eq!(pipe.handshake(&mut buf), Ok(())); | |
+ | |
+ // Use stream_id 3 for server-initiated stream, non-existing yet | |
+ let stream_id = 3; | |
+ let frames = [frame::Frame::StopSending { | |
+ stream_id, | |
+ error_code: 12345, | |
+ }]; | |
+ | |
+ let pkt_type = packet::Type::Short; | |
+ | |
+ // invalid: STOP_SENDING on a non-existing locally-initiated stream is a fatal error | |
+ assert_eq!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf), | |
+ Err(Error::InvalidStreamState)); | |
+ } | |
} | |
pub use crate::packet::Header; | |
diff --git a/src/stream.rs b/src/stream.rs | |
index c2fbf36..10a9e9b 100644 | |
--- a/src/stream.rs | |
+++ b/src/stream.rs | |
@@ -113,6 +113,22 @@ pub struct StreamMap { | |
/// of the map elements represents the offset of the stream at which the | |
/// blocking occurred. | |
blocked: HashMap<u64, u64>, | |
+ | |
+ /// Set of stream IDs corresponding to streams that have shut down recv and need | |
+ /// to send STOP_SENDING. The value of the map elements represents Application | |
+ /// Error Code. | |
+ recv_aborted: HashMap<u64, u64>, | |
+ | |
+ /// Set of stream IDs corresponding to streams that have shut down send or | |
+ /// received STOP_SENDING. In both cases, we want to send RESET_STREAM to the peer. | |
+ /// The value of the map elements represents the error code. | |
+ will_reset: HashMap<u64, u64>, | |
+ | |
+ /// Queue of (stream_id, error_code) corresponding to STOP_SENDING received. | |
+ stop_sending: VecDeque<(u64, u64)>, | |
+ | |
+ /// Queue of (stream_id, error_code, final_size) corresponding to RESET_STREAM received. | |
+ reset_stream: VecDeque<(u64, u64, u64)>, | |
} | |
impl StreamMap { | |
@@ -229,6 +245,7 @@ impl StreamMap { | |
}, | |
}; | |
+ info!("get_or_create: creating stream {}", id); | |
let s = Stream::new(max_rx_data, max_tx_data, is_bidi(id), local); | |
v.insert(s) | |
}, | |
@@ -355,6 +372,38 @@ impl StreamMap { | |
} | |
} | |
+ /// Adds the stream to, or removes it from, the set of streams that need to | |
+ /// send STOP_SENDING. | |
+ /// | |
+ /// When `aborted` is true the stream is recorded together with `error_code` | |
+ /// (replacing any earlier entry); when false the entry is removed and | |
+ /// `error_code` is ignored. | |
+ pub fn mark_recv_aborted(&mut self, stream_id: u64, aborted: bool, error_code: u64) { | |
+ if aborted { | |
+ self.recv_aborted.insert(stream_id, error_code); | |
+ } else { | |
+ self.recv_aborted.remove(&stream_id); | |
+ } | |
+ } | |
+ | |
+ /// Adds the stream to, or removes it from, the set of streams that need to | |
+ /// send RESET_STREAM. | |
+ /// | |
+ /// When `reset` is true the stream is recorded together with `error_code` | |
+ /// (replacing any earlier entry); when false the entry is removed and | |
+ /// `error_code` is ignored. | |
+ pub fn mark_will_reset(&mut self, stream_id: u64, reset: bool, error_code: u64) { | |
+ if reset { | |
+ self.will_reset.insert(stream_id, error_code); | |
+ } else { | |
+ self.will_reset.remove(&stream_id); | |
+ } | |
+ } | |
+ | |
+ /// Appends a received STOP_SENDING, as (stream_id, error_code), to the back | |
+ /// of the queue drained by `poll_stop_sending()`. | |
+ pub fn mark_stop_sending(&mut self, stream_id: u64, error_code: u64) { | |
+ self.stop_sending.push_back((stream_id, error_code)); | |
+ } | |
+ | |
+ /// Removes and returns the oldest queued STOP_SENDING entry as | |
+ /// (stream_id, error_code), or `None` when the queue is empty. | |
+ pub fn poll_stop_sending(&mut self) -> Option<(u64, u64)> { | |
+ self.stop_sending.pop_front() | |
+ } | |
+ | |
+ /// Appends a received RESET_STREAM, as (stream_id, error_code, final_size), | |
+ /// to the back of the queue drained by `poll_reset_stream()`. | |
+ pub fn mark_reset_stream(&mut self, stream_id: u64, error_code: u64, final_size: u64) { | |
+ self.reset_stream.push_back((stream_id, error_code, final_size)); | |
+ } | |
+ | |
+ /// Removes and returns the oldest queued RESET_STREAM entry as | |
+ /// (stream_id, error_code, final_size), or `None` when the queue is empty. | |
+ pub fn poll_reset_stream(&mut self) -> Option<(u64, u64, u64)> { | |
+ self.reset_stream.pop_front() | |
+ } | |
+ | |
/// Updates the peer's maximum bidirectional stream count limit. | |
pub fn update_peer_max_streams_bidi(&mut self, v: u64) { | |
self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v); | |
@@ -421,6 +470,14 @@ impl StreamMap { | |
StreamIter::from(&self.almost_full) | |
} | |
+ /// Creates an iterator over streams that need to send STOP_SENDING, | |
+ /// yielding (stream ID, error code) pairs. | |
+ pub fn recv_aborted(&self) -> hash_map::Iter<u64, u64> { | |
+ self.recv_aborted.iter() | |
+ } | |
+ | |
+ /// Creates an iterator over streams that need to send RESET_STREAM, | |
+ /// yielding (stream ID, error code) pairs. | |
+ pub fn will_reset(&self) -> hash_map::Iter<u64, u64> { | |
+ self.will_reset.iter() | |
+ } | |
+ | |
/// Creates an iterator over streams that need to send STREAM_DATA_BLOCKED. | |
pub fn blocked(&self) -> hash_map::Iter<u64, u64> { | |
self.blocked.iter() | |
@@ -442,6 +499,10 @@ impl StreamMap { | |
!self.blocked.is_empty() | |
} | |
+ /// Returns true if there are streams that still need to send STOP_SENDING | |
+ /// or RESET_STREAM. | |
+ /// | |
+ /// Pending RESET_STREAM entries (`will_reset`) are included as well as | |
+ /// pending STOP_SENDING entries (`recv_aborted`), so that a queued reset is | |
+ /// enough on its own for the connection to decide it has application data | |
+ /// to send, instead of waiting for some other frame to trigger a packet. | |
+ pub fn has_stop_sending(&self) -> bool { | |
+ !self.recv_aborted.is_empty() || !self.will_reset.is_empty() | |
+ } | |
+ | |
/// Returns true if the max bidirectional streams count needs to be updated | |
/// by sending a MAX_STREAMS frame to the peer. | |
pub fn should_update_max_streams_bidi(&self) -> bool { | |
@@ -922,6 +983,7 @@ impl SendBuf { | |
if self.shutdown { | |
// Since we won't write any more data anyway, pretend that we sent | |
// all data that was passed in. | |
+ error!("Pretending sending {} bytes even the stream is shutdown.", data.len()); | |
return Ok(data.len()); | |
} | |
@@ -1100,6 +1162,11 @@ impl SendBuf { | |
false | |
} | |
+ /// Returns true if this send buffer has been shut down. | |
+ pub fn is_shutdown(&self) -> bool { | |
+ self.shutdown | |
+ } | |
+ | |
/// Returns true if the send-side of the stream is complete. | |
/// | |
/// This happens when the stream's send final size is known, and the peer | |
diff --git a/tools/apps/src/lib.rs b/tools/apps/src/lib.rs | |
index 3f72fc9..52fd10e 100644 | |
--- a/tools/apps/src/lib.rs | |
+++ b/tools/apps/src/lib.rs | |
@@ -1255,6 +1255,23 @@ impl HttpConn for Http3Conn { | |
); | |
}, | |
+ Ok((stream_id, quiche::h3::Event::StopSending {error_code})) => { | |
+ info!( | |
+ "Received STOP_SENDING stream_id={} error_code={}", | |
+ stream_id, | |
+ error_code | |
+ ); | |
+ }, | |
+ | |
+ Ok((stream_id, quiche::h3::Event::ResetStream {error_code, final_size})) => { | |
+ info!( | |
+ "Received RESET_STREAM stream_id={} error_code={} final_size={}", | |
+ stream_id, | |
+ error_code, | |
+ final_size | |
+ ); | |
+ }, | |
+ | |
Err(quiche::h3::Error::Done) => { | |
break; | |
}, | |
@@ -1405,6 +1422,23 @@ impl HttpConn for Http3Conn { | |
.send_goaway(conn, self.largest_processed_request)?; | |
}, | |
+ Ok((stream_id, quiche::h3::Event::StopSending {error_code})) => { | |
+ info!( | |
+ "Received STOP_SENDING stream_id={} error_code={}", | |
+ stream_id, | |
+ error_code | |
+ ); | |
+ }, | |
+ | |
+ Ok((stream_id, quiche::h3::Event::ResetStream {error_code, final_size})) => { | |
+ info!( | |
+ "Received RESET_STREAM stream_id={} error_code={} final_size={}", | |
+ stream_id, | |
+ error_code, | |
+ final_size | |
+ ); | |
+ }, | |
+ | |
Err(quiche::h3::Error::Done) => { | |
break; | |
}, | |
diff --git a/tools/http3_test/src/runner.rs b/tools/http3_test/src/runner.rs | |
index 7befde1..6a5de8a 100644 | |
--- a/tools/http3_test/src/runner.rs | |
+++ b/tools/http3_test/src/runner.rs | |
@@ -277,6 +277,20 @@ pub fn run( | |
Ok((_goaway_id, quiche::h3::Event::GoAway)) => (), | |
+ Ok((stream_id, quiche::h3::Event::StopSending {error_code})) => { | |
+ info!( | |
+ "StopSending received for stream {} error_code {}", | |
+ stream_id, error_code | |
+ ); | |
+ }, | |
+ | |
+ Ok((stream_id, quiche::h3::Event::ResetStream {error_code, final_size})) => { | |
+ info!( | |
+ "ResetStream received from stream {} error_code {} final_size {}", | |
+ stream_id, error_code, final_size | |
+ ); | |
+ }, | |
+ | |
Err(quiche::h3::Error::Done) => { | |
break; | |
}, |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment