Created
March 27, 2022 15:38
-
-
Save MiSawa/4e4086bd649f4639f390b93ca214f7f0 to your computer and use it in GitHub Desktop.
Trying something for `--stream`
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::sync::mpsc::{sync_channel, SyncSender}; | |
use anyhow::{anyhow, Result}; | |
#[derive(Clone, Debug)] | |
enum Index { | |
Array(usize), | |
Map(String), | |
} | |
type Path = Vec<Index>; | |
#[derive(Debug)] | |
enum PrimitiveValue { | |
Null, | |
Boolean(bool), | |
Number(f64), | |
String(String), | |
} | |
struct Value { | |
path: Path, | |
value: Option<PrimitiveValue>, | |
} | |
impl Value { | |
fn print(&self) { | |
print!("["); | |
print!("["); | |
for (i, v) in self.path.iter().enumerate() { | |
if i != 0 { | |
print!(","); | |
} | |
match v { | |
Index::Array(i) => print!("{i}"), | |
Index::Map(s) => print!("{s:?}"), | |
} | |
} | |
print!("]"); | |
if let Some(value) = &self.value { | |
print!(","); | |
match value { | |
PrimitiveValue::Null => print!("null"), | |
PrimitiveValue::Boolean(v) => print!("{v}"), | |
PrimitiveValue::Number(v) => print!("{v}"), | |
PrimitiveValue::String(v) => print!("{v:?}"), | |
} | |
} | |
print!("]"); | |
println!(""); | |
} | |
} | |
struct StreamState<'a> { | |
sender: SyncSender<Value>, | |
path: &'a mut Path, | |
} | |
impl<'a> StreamState<'a> { | |
fn emit_value(&mut self, value: PrimitiveValue) { | |
self.sender | |
.send(Value { | |
path: self.path.clone(), | |
value: Some(value), | |
}) | |
.ok(); // Discarding err since this indicates that the recever has already been dropped so they should already know what to do. | |
} | |
fn emit_close(&self) { | |
self.sender | |
.send(Value { | |
path: self.path.clone(), | |
value: None, | |
}) | |
.ok(); // Discarding err since this indicates that the recever has already been dropped so they should already know what to do. | |
} | |
} | |
impl<'de, 'a> serde::de::Visitor<'de> for &mut StreamState<'a> { | |
type Value = (); | |
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { | |
write!( | |
formatter, | |
"null, boolean, number, string, array, or map keyed with string" | |
) | |
} | |
fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E> | |
where | |
E: serde::de::Error, | |
{ | |
self.emit_value(PrimitiveValue::Boolean(v)); | |
Ok(()) | |
} | |
fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E> | |
where | |
E: serde::de::Error, | |
{ | |
self.emit_value(PrimitiveValue::Number(v as f64)); | |
Ok(()) | |
} | |
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E> | |
where | |
E: serde::de::Error, | |
{ | |
self.emit_value(PrimitiveValue::Number(v as f64)); | |
Ok(()) | |
} | |
fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E> | |
where | |
E: serde::de::Error, | |
{ | |
self.emit_value(PrimitiveValue::Number(v as f64)); | |
Ok(()) | |
} | |
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E> | |
where | |
E: serde::de::Error, | |
{ | |
self.visit_string(v.into()) | |
} | |
fn visit_string<E>(self, v: String) -> Result<Self::Value, E> | |
where | |
E: serde::de::Error, | |
{ | |
self.emit_value(PrimitiveValue::String(v)); | |
Ok(()) | |
} | |
fn visit_none<E>(self) -> Result<Self::Value, E> | |
where | |
E: serde::de::Error, | |
{ | |
self.emit_value(PrimitiveValue::Null); | |
Ok(()) | |
} | |
fn visit_unit<E>(self) -> Result<Self::Value, E> | |
where | |
E: serde::de::Error, | |
{ | |
self.visit_none() | |
} | |
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error> | |
where | |
A: serde::de::SeqAccess<'de>, | |
{ | |
let mut i = 0; | |
self.path.push(Index::Array(i)); | |
while let Some(_) = seq.next_element_seed(&mut *self)? { | |
self.path.pop(); | |
i += 1; | |
self.path.push(Index::Array(i)); | |
} | |
self.path.pop(); | |
i -= 1; | |
self.path.push(Index::Array(i)); | |
self.emit_close(); | |
self.path.pop(); | |
Ok(()) | |
} | |
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error> | |
where | |
A: serde::de::MapAccess<'de>, | |
{ | |
struct Str; | |
impl<'de> serde::de::DeserializeSeed<'de> for Str { | |
type Value = String; | |
fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error> | |
where | |
D: serde::Deserializer<'de>, | |
{ | |
struct V; | |
impl<'de> serde::de::Visitor<'de> for V { | |
type Value = String; | |
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { | |
write!(formatter, "string as the key of a map") | |
} | |
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E> | |
where | |
E: serde::de::Error, | |
{ | |
Ok(v.into()) | |
} | |
fn visit_string<E>(self, v: String) -> Result<Self::Value, E> | |
where | |
E: serde::de::Error, | |
{ | |
Ok(v) | |
} | |
} | |
deserializer.deserialize_any(V) | |
} | |
} | |
self.path.push(Index::Map("".into())); | |
while let Some(key) = map.next_key_seed(Str)? { | |
self.path.pop(); | |
self.path.push(Index::Map(key)); | |
map.next_value_seed(&mut *self)?; | |
} | |
self.emit_close(); | |
self.path.pop(); | |
Ok(()) | |
} | |
} | |
impl<'de, 'a> serde::de::DeserializeSeed<'de> for &mut StreamState<'a> { | |
type Value = (); | |
fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error> | |
where | |
D: serde::Deserializer<'de>, | |
{ | |
deserializer.deserialize_any(self)?; | |
Ok(()) | |
} | |
} | |
struct Stream; | |
impl<'de> serde::Deserialize<'de> for Stream { | |
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> | |
where | |
D: serde::Deserializer<'de>, | |
{ | |
let mut path = vec![]; | |
// Actually this sync_channel(1) call should be done only once, sender should be | |
// cloned, and recv has to be passed to the main thread. | |
let (sender, _recv) = sync_channel(8); | |
let mut visitor = StreamState { | |
sender, | |
path: &mut path, | |
}; | |
deserializer.deserialize_any(&mut visitor)?; | |
Ok(Self) | |
} | |
} | |
fn main() -> Result<()> { | |
let (sender, recv) = sync_channel(1); | |
let handler = std::thread::spawn(move || { | |
for de in serde_yaml::Deserializer::from_reader(std::io::stdin()).into_iter() { | |
use serde::de::DeserializeSeed; | |
let mut path = vec![]; | |
let mut state = StreamState { | |
sender: sender.clone(), | |
path: &mut path, | |
}; | |
state.deserialize(de).expect("Deserialization failed"); | |
} | |
}); | |
while let Ok(v) = recv.recv() { | |
v.print() | |
} | |
handler | |
.join() | |
.map_err(|e| anyhow!("Failed to join {e:?}"))?; | |
// for v in serde_json::de::Deserializer::from_reader(stdin).into_iter::<Stream>() { | |
// if let Err(e) = v { | |
// eprintln!("{:?}", e); | |
// } | |
// } | |
Ok(()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment