Last active
September 1, 2023 16:34
-
-
Save crakjie/2d462eaf9e185f1e168c1192e171d886 to your computer and use it in GitHub Desktop.
arrow rust
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package]
name = "ckfunc"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
arrow = { version = "45.0.0", features = ["ipc_compression"] }
intmap = "2.0.0"
itertools = "0.11.0"
[profile.dev]
opt-level=3
debug=2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::collections::HashMap; | |
use std::io::{StdinLock, StdoutLock}; | |
use std::io; | |
use std::io::prelude::*; | |
use arrow::error::ArrowError; | |
use arrow::ipc::{self}; | |
use arrow::record_batch::RecordBatch; | |
use ipc::reader::StreamReader; | |
use ipc::writer::*; | |
use arrow::array::*; | |
use arrow::datatypes::*; | |
use alloc::sync::Arc; | |
use itertools::izip; | |
extern crate alloc; | |
fn main() { | |
read_arrow_init(); | |
} | |
fn read_arrow_init() { | |
let stdin = io::stdin(); | |
let stdin =&mut stdin.lock(); | |
let out = std::io::stdout(); | |
let stdout = &mut out.lock(); | |
//this should be able to process multiple input pipe thoghether | |
while stdin.fill_buf().unwrap().len() > 0 { | |
//file.write_all(format!("buffer contains {}\n",stdin.fill_buf().unwrap().len()).as_bytes()).unwrap(); | |
match read_arrow( stdin, stdout) { | |
Err(_e) => { | |
break; | |
}, | |
Ok(_) => (), | |
} | |
let _ = stdout.flush(); | |
} | |
//file.write_all(format!("end read arrow {}\n",stdin.fill_buf().unwrap().len()).as_bytes()).unwrap(); | |
} | |
fn read_arrow(inbuffer : &mut StdinLock<'static>, out : &mut StdoutLock<'static>) -> Result<(), ArrowError> { | |
//stdin is already buffered | |
let mut reader = StreamReader::try_new_unbuffered( inbuffer,None).unwrap(); | |
let out_schema: Schema = Schema::new(vec![ | |
reader.schema().field(3).clone().with_name("result") | |
]); | |
let mut writer = StreamWriter::try_new( | |
out , | |
&out_schema | |
)?; | |
while let Some(Ok(mut batch)) = reader.next() { | |
writer.write(&to_adjusta_hash( &mut batch))? | |
} | |
//writer.finish() seems to not works with clickhouse execution_pool, maybe because it use MetadataVersion::V3 or lower | |
Ok(()) | |
} | |
fn to_adjusta_hash( | |
batch : &mut RecordBatch | |
) -> RecordBatch { | |
let from_label_it = &mut batch.column_by_name("fromLabel").unwrap().as_any().downcast_ref::<ListArray>().unwrap().iter(); | |
let from_value_it = &mut batch.column_by_name("fromValue").unwrap().as_any().downcast_ref::<ListArray>().unwrap().iter(); | |
let to_label_it = &mut batch.column_by_name("toLabel").unwrap().as_any().downcast_ref::<ListArray>().unwrap().iter(); | |
let to_value_col = batch.column_by_name("toValue").unwrap().as_any().downcast_ref::<ListArray>().unwrap(); | |
let to_value_col_it = &mut to_value_col.iter(); | |
// let values_builder = Float64Builder::with_capacity(to_value_col.len()*250); | |
// let mut builder = ListBuilder::new(values_builder); | |
for (from_label_array, from_value_array, to_label_array,to_value_array ) in izip!(from_label_it, from_value_it, to_label_it, to_value_col_it) { | |
let from_label_array_col = from_label_array.unwrap(); | |
let from_value_array_col = from_value_array.unwrap(); | |
let to_label_array_col = to_label_array.unwrap(); | |
let to_value_array_col = to_value_array.unwrap(); | |
let truc = &mut to_value_array_col.as_primitive::<Float64Type>(); | |
let from_label = from_label_array_col.as_any().downcast_ref::<UInt64Array>().unwrap(); | |
let from_value = from_value_array_col.as_any().downcast_ref::<Float64Array>().unwrap().values(); | |
let to_label = to_label_array_col.as_any().downcast_ref::<UInt64Array>().unwrap(); | |
let to_value = truc; | |
let to_value_values = to_value.values(); | |
let index: HashMap<u64, usize> = from_label.iter().enumerate().map(|x: (usize, Option<u64>)|return (x.1.to_owned().unwrap(), x.0)).collect(); | |
// for (i, _) in to_label.iter().enumerate() { | |
// match index.get(&to_label.value(i)){ | |
// Some(search) => { | |
// builder.values().append_value(from_value[search.to_owned().to_owned()].to_owned()); | |
// } | |
// _ => { | |
// builder.values().append_value(to_value[i].to_owned()); | |
// } | |
// }; | |
// } | |
let mut builder = to_value.into_builder().unwrap(); | |
builder | |
.values_slice_mut() | |
.iter_mut().enumerate() | |
.for_each(|(i, v)| { | |
match index.get(&to_label.value(i)){ | |
Some(search) => { | |
*v = from_value[search.to_owned().to_owned()].to_owned(); | |
} | |
_ => { | |
*v = to_value_values[i].to_owned(); //should be useless | |
} | |
}; | |
}); | |
// builder.append(true) | |
} | |
let out_schema: Schema = Schema::new(vec![ | |
batch.schema().field(3).clone().with_name("result") | |
]); | |
let out_batch: RecordBatch = RecordBatch::try_new(Arc::new(out_schema), vec!(Arc::new(to_value_col))).unwrap(); | |
out_batch | |
} | |
fn noop(batch : RecordBatch | |
) -> RecordBatch { | |
// batch.schema().field_with_name("fromValue").unwrap().with_name("result"); | |
batch.project(&[1]).unwrap() | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment