Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save laysakura/07987c01552a5b5e01a006e98e1925f6 to your computer and use it in GitHub Desktop.

Select an option

Save laysakura/07987c01552a5b5e01a006e98e1925f6 to your computer and use it in GitHub Desktop.
{
"$schema": "https://aka.ms/codetour-schema",
"title": "DirectRunnerによるGBK実行方法",
"steps": [
{
"file": "tests/primitives_test.rs",
"description": "この run_gbk テストをエントリポイントにして、DirectRunner による GBK の実行経路を追う。",
"line": 74,
"contents": "/*\n * Licensed to the Apache Software Foundation (ASF) under one\n * or more contributor license agreements. See the NOTICE file\n * distributed with this work for additional information\n * regarding copyright ownership. The ASF licenses this file\n * to you under the Apache License, Version 2.0 (the\n * \"License\"); you may not use this file except in compliance\n * with the License. You may obtain a copy of the License at\n *\n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n */\n\n#[cfg(test)]\nmod tests {\n use apache_beam::elem_types::kv::KV;\n use apache_beam::internals::pvalue::PValue;\n use apache_beam::runners::direct_runner::DirectRunner;\n use apache_beam::runners::runner::RunnerI;\n use apache_beam::transforms::create::Create;\n use apache_beam::transforms::flatten::Flatten;\n use apache_beam::transforms::group_by_key::GroupByKey;\n use apache_beam::transforms::impulse::Impulse;\n use apache_beam::transforms::pardo::ParDo;\n use apache_beam::transforms::testing::AssertEqualUnordered;\n\n #[tokio::test]\n async fn run_direct_runner() {\n DirectRunner::new()\n .run(|root| root.apply(Impulse::new()))\n .await;\n }\n\n #[tokio::test]\n #[should_panic]\n // This tests that AssertEqualUnordered is actually doing its job.\n async fn ensure_assert_fails() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![1, 2, 3]))\n .apply(AssertEqualUnordered::new(&[1, 2, 4]))\n })\n .await;\n }\n\n #[tokio::test]\n #[should_panic]\n async fn ensure_assert_fails_on_empty() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![]))\n .apply(AssertEqualUnordered::new(&[1]))\n })\n .await;\n }\n\n 
#[tokio::test]\n async fn run_map() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![1, 2, 3]))\n .apply(ParDo::from_map(|x: &i32| -> i32 { x * x }))\n .apply(AssertEqualUnordered::new(&[1, 4, 9]))\n })\n .await;\n }\n\n #[tokio::test]\n async fn run_gbk() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![\n KV::new(\"a\".to_string(), 1),\n KV::new(\"a\".to_string(), 2),\n KV::new(\"b\".to_string(), 3),\n ]))\n .apply(GroupByKey::default())\n .apply(AssertEqualUnordered::new(&[\n KV::new(\"a\".to_string(), vec![1, 2]),\n KV::new(\"b\".to_string(), vec![3]),\n ]))\n })\n .await;\n }\n\n #[tokio::test]\n async fn run_flatten() {\n DirectRunner::new()\n .run(|root| {\n let first = root.apply(Create::new(vec![1, 2, 3]));\n let second = root.apply(Create::new(vec![100, 200]));\n PValue::new_array(&[first, second])\n .apply(Flatten::new())\n .apply(AssertEqualUnordered::new(&[1, 2, 3, 100, 200]))\n })\n .await;\n }\n}\n"
},
{
"file": "tests/primitives_test.rs",
"description": "この .apply(GroupByKey::default()) によってパイプライングラフにGBKのオペレーターが追加されるのは当然だが、protoに変換されたGBKオペレーターを、DirectRunnerはどのように実行しているのか。\n嫌な仮説は、GBKの実行をWorker側の責務にしているというもの。",
"line": 82,
"contents": "/*\n * Licensed to the Apache Software Foundation (ASF) under one\n * or more contributor license agreements. See the NOTICE file\n * distributed with this work for additional information\n * regarding copyright ownership. The ASF licenses this file\n * to you under the Apache License, Version 2.0 (the\n * \"License\"); you may not use this file except in compliance\n * with the License. You may obtain a copy of the License at\n *\n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n */\n\n#[cfg(test)]\nmod tests {\n use apache_beam::elem_types::kv::KV;\n use apache_beam::internals::pvalue::PValue;\n use apache_beam::runners::direct_runner::DirectRunner;\n use apache_beam::runners::runner::RunnerI;\n use apache_beam::transforms::create::Create;\n use apache_beam::transforms::flatten::Flatten;\n use apache_beam::transforms::group_by_key::GroupByKey;\n use apache_beam::transforms::impulse::Impulse;\n use apache_beam::transforms::pardo::ParDo;\n use apache_beam::transforms::testing::AssertEqualUnordered;\n\n #[tokio::test]\n async fn run_direct_runner() {\n DirectRunner::new()\n .run(|root| root.apply(Impulse::new()))\n .await;\n }\n\n #[tokio::test]\n #[should_panic]\n // This tests that AssertEqualUnordered is actually doing its job.\n async fn ensure_assert_fails() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![1, 2, 3]))\n .apply(AssertEqualUnordered::new(&[1, 2, 4]))\n })\n .await;\n }\n\n #[tokio::test]\n #[should_panic]\n async fn ensure_assert_fails_on_empty() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![]))\n .apply(AssertEqualUnordered::new(&[1]))\n })\n .await;\n }\n\n 
#[tokio::test]\n async fn run_map() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![1, 2, 3]))\n .apply(ParDo::from_map(|x: &i32| -> i32 { x * x }))\n .apply(AssertEqualUnordered::new(&[1, 4, 9]))\n })\n .await;\n }\n\n #[tokio::test]\n async fn run_gbk() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![\n KV::new(\"a\".to_string(), 1),\n KV::new(\"a\".to_string(), 2),\n KV::new(\"b\".to_string(), 3),\n ]))\n .apply(GroupByKey::default())\n .apply(AssertEqualUnordered::new(&[\n KV::new(\"a\".to_string(), vec![1, 2]),\n KV::new(\"b\".to_string(), vec![3]),\n ]))\n })\n .await;\n }\n\n #[tokio::test]\n async fn run_flatten() {\n DirectRunner::new()\n .run(|root| {\n let first = root.apply(Create::new(vec![1, 2, 3]));\n let second = root.apply(Create::new(vec![100, 200]));\n PValue::new_array(&[first, second])\n .apply(Flatten::new())\n .apply(AssertEqualUnordered::new(&[1, 2, 3, 100, 200]))\n })\n .await;\n }\n}\n"
},
{
"file": "src/runners/direct_runner.rs",
"description": "GBKを含むいずれのオペレーターについても、Runner側で実行すべきかどうかを区別せず、すべてworker側のBundleProcessorに渡しているように見える。",
"line": 82,
"contents": "/*\n * Licensed to the Apache Software Foundation (ASF) under one\n * or more contributor license agreements. See the NOTICE file\n * distributed with this work for additional information\n * regarding copyright ownership. The ASF licenses this file\n * to you under the Apache License, Version 2.0 (the\n * \"License\"); you may not use this file except in compliance\n * with the License. You may obtain a copy of the License at\n *\n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n */\n\nuse std::sync::Arc;\n\nuse async_trait::async_trait;\n\nuse crate::proto::{fn_execution_v1, pipeline_v1};\nuse crate::worker::sdk_worker::BundleProcessor;\n\nuse crate::runners::runner::RunnerI;\n\npub struct DirectRunner;\n\n#[async_trait]\nimpl RunnerI for DirectRunner {\n fn new() -> Self {\n Self\n }\n\n async fn run_pipeline(&self, pipeline: Arc<std::sync::Mutex<pipeline_v1::Pipeline>>) {\n let descriptor: fn_execution_v1::ProcessBundleDescriptor;\n {\n let p = pipeline.lock().unwrap();\n\n // TODO: use this to define the descriptor instead of p\n // let proto = rewrite_side_inputs(pipeline, state_cache_ref);\n\n // TODO: review cloning\n descriptor = fn_execution_v1::ProcessBundleDescriptor {\n id: \"\".to_string(),\n transforms: p\n .components\n .as_ref()\n .expect(\"Missing components\")\n .transforms\n .clone(),\n pcollections: p\n .components\n .as_ref()\n .expect(\"Missing PCollections\")\n .pcollections\n .clone(),\n windowing_strategies: p\n .components\n .as_ref()\n .expect(\"Missing windowing strategies\")\n .windowing_strategies\n .clone(),\n coders: p\n .components\n .as_ref()\n .expect(\"Missing coders\")\n .coders\n 
.clone(),\n environments: p\n .components\n .as_ref()\n .expect(\"Missing environments\")\n .environments\n .clone(),\n state_api_service_descriptor: None,\n timer_api_service_descriptor: None,\n };\n }\n\n let processor = BundleProcessor::new(Arc::new(descriptor), &[crate::runners::IMPULSE_URN]);\n\n processor.process(\"bundle_id\".to_string()).await;\n }\n}\n"
},
{
"file": "src/worker/operators.rs",
"description": "worker モジュール配下の GroupByKeyWithinBundleOperator が GBK の処理を担っているように見える。",
"line": 489,
"contents": "/*\n * Licensed to the Apache Software Foundation (ASF) under one\n * or more contributor license agreements. See the NOTICE file\n * distributed with this work for additional information\n * regarding copyright ownership. The ASF licenses this file\n * to you under the Apache License, Version 2.0 (the\n * \"License\"); you may not use this file except in compliance\n * with the License. You may obtain a copy of the License at\n *\n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n */\n\nuse std::any::Any;\nuse std::collections::HashMap;\nuse std::fmt;\nuse std::rc::Rc;\nuse std::str::from_utf8;\nuse std::sync::{Arc, Mutex};\n\n//use std::borrow::{Borrow, BorrowMut};\n\nuse once_cell::sync::Lazy;\nuse serde_json;\nuse strum::EnumDiscriminants;\n\nuse crate::elem_types::ElemType;\nuse crate::internals::serialize;\nuse crate::internals::urns;\nuse crate::proto::{fn_execution_v1, pipeline_v1};\n\nuse crate::worker::sdk_worker::BundleProcessor;\nuse crate::worker::test_utils::RECORDING_OPERATOR_LOGS;\n\ntype OperatorMap = HashMap<&'static str, OperatorDiscriminants>;\n\nstatic OPERATORS_BY_URN: Lazy<Mutex<OperatorMap>> = Lazy::new(|| {\n // TODO: these will have to be parameterized depending on things such as the runner used\n let m: OperatorMap = HashMap::from([\n // Test operators\n (urns::CREATE_URN, OperatorDiscriminants::Create),\n (urns::RECORDING_URN, OperatorDiscriminants::Recording),\n (urns::PARTITION_URN, OperatorDiscriminants::_Partitioning),\n (urns::IMPULSE_URN, OperatorDiscriminants::Impulse),\n (urns::GROUP_BY_KEY_URN, OperatorDiscriminants::GroupByKey),\n // Production operators\n (urns::DATA_INPUT_URN, 
OperatorDiscriminants::_DataSource),\n (urns::PAR_DO_URN, OperatorDiscriminants::ParDo),\n (urns::FLATTEN_URN, OperatorDiscriminants::Flatten),\n ]);\n\n Mutex::new(m)\n});\n\npub(crate) trait OperatorI {\n fn new(\n transform_id: Arc<String>,\n transform: Arc<pipeline_v1::PTransform>,\n context: Arc<OperatorContext>,\n operator_discriminant: OperatorDiscriminants,\n ) -> Self\n where\n Self: Sized;\n\n fn start_bundle(&self);\n\n fn process(&self, value: DynamicWindowedValue);\n\n fn finish_bundle(&self) {\n todo!()\n }\n}\n\n#[derive(Clone, fmt::Debug, EnumDiscriminants)]\npub(crate) enum Operator {\n // Test operators\n Create(CreateOperator),\n Recording(RecordingOperator),\n _Partitioning,\n GroupByKey(GroupByKeyWithinBundleOperator),\n Impulse(ImpulsePerBundleOperator),\n\n // Production operators\n _DataSource,\n ParDo(ParDoOperator),\n Flatten(FlattenOperator),\n}\n\nimpl OperatorI for Operator {\n fn new(\n transform_id: Arc<String>,\n transform: Arc<pipeline_v1::PTransform>,\n context: Arc<OperatorContext>,\n operator_discriminant: OperatorDiscriminants,\n ) -> Self {\n match operator_discriminant {\n OperatorDiscriminants::Create => Operator::Create(CreateOperator::new(\n transform_id,\n transform,\n context,\n operator_discriminant,\n )),\n _ => todo!(),\n }\n }\n\n fn start_bundle(&self) {\n match self {\n Operator::Create(create_op) => create_op.start_bundle(),\n Operator::Recording(recording_op) => recording_op.start_bundle(),\n Operator::Impulse(impulse_op) => impulse_op.start_bundle(),\n Operator::GroupByKey(gbk_op) => gbk_op.start_bundle(),\n Operator::ParDo(pardo_op) => pardo_op.start_bundle(),\n Operator::Flatten(flatten_op) => flatten_op.start_bundle(),\n _ => todo!(),\n };\n }\n\n fn process(&self, value: DynamicWindowedValue) {\n match self {\n Operator::Create(create_op) => {\n create_op.process(value);\n }\n Operator::Recording(recording_op) => {\n recording_op.process(value);\n }\n Operator::Impulse(impulse_op) => {\n 
impulse_op.process(value);\n }\n Operator::GroupByKey(gbk_op) => {\n gbk_op.process(value);\n }\n Operator::ParDo(pardo_op) => {\n pardo_op.process(value);\n }\n Operator::Flatten(flatten_op) => {\n flatten_op.process(value);\n }\n _ => todo!(),\n };\n }\n\n fn finish_bundle(&self) {\n match self {\n Operator::Create(create_op) => create_op.finish_bundle(),\n Operator::Recording(recording_op) => recording_op.finish_bundle(),\n Operator::Impulse(impulse_op) => impulse_op.finish_bundle(),\n Operator::GroupByKey(gbk_op) => gbk_op.finish_bundle(),\n Operator::ParDo(pardo_op) => pardo_op.finish_bundle(),\n Operator::Flatten(flatten_op) => flatten_op.finish_bundle(),\n _ => todo!(),\n };\n }\n}\n\npub(crate) fn create_operator(transform_id: &str, context: Arc<OperatorContext>) -> Operator {\n let descriptor: &fn_execution_v1::ProcessBundleDescriptor = context.descriptor.as_ref();\n\n let transform = descriptor\n .transforms\n .get(transform_id)\n .expect(\"Transform ID not found\");\n\n for pcoll_id in transform.outputs.values() {\n (context.get_receiver)(context.bundle_processor.clone(), pcoll_id.clone());\n }\n\n let operators_by_urn = OPERATORS_BY_URN.lock().unwrap();\n\n let spec = transform\n .spec\n .as_ref()\n .unwrap_or_else(|| panic!(\"Transform {} has no spec\", transform_id));\n\n let op_discriminant = operators_by_urn\n .get(spec.urn.as_str())\n .unwrap_or_else(|| panic!(\"Unknown transform type: {}\", spec.urn));\n\n match op_discriminant {\n OperatorDiscriminants::Create => Operator::Create(CreateOperator::new(\n Arc::new(transform_id.to_string()),\n Arc::new(transform.clone()),\n context.clone(),\n OperatorDiscriminants::Create,\n )),\n OperatorDiscriminants::Recording => Operator::Recording(RecordingOperator::new(\n Arc::new(transform_id.to_string()),\n Arc::new(transform.clone()),\n context.clone(),\n OperatorDiscriminants::Recording,\n )),\n OperatorDiscriminants::Impulse => Operator::Impulse(ImpulsePerBundleOperator::new(\n 
Arc::new(transform_id.to_string()),\n Arc::new(transform.clone()),\n context.clone(),\n OperatorDiscriminants::Impulse,\n )),\n OperatorDiscriminants::GroupByKey => {\n Operator::GroupByKey(GroupByKeyWithinBundleOperator::new(\n Arc::new(transform_id.to_string()),\n Arc::new(transform.clone()),\n context.clone(),\n OperatorDiscriminants::GroupByKey,\n ))\n }\n OperatorDiscriminants::ParDo => Operator::ParDo(ParDoOperator::new(\n Arc::new(transform_id.to_string()),\n Arc::new(transform.clone()),\n context.clone(),\n OperatorDiscriminants::ParDo,\n )),\n OperatorDiscriminants::Flatten => Operator::Flatten(FlattenOperator::new(\n Arc::new(transform_id.to_string()),\n Arc::new(transform.clone()),\n context.clone(),\n OperatorDiscriminants::ParDo,\n )),\n _ => todo!(),\n }\n}\n\n#[derive(Debug)]\npub struct Receiver {\n operators: Vec<Arc<Operator>>,\n}\n\nimpl Receiver {\n pub(crate) fn new(operators: Vec<Arc<Operator>>) -> Self {\n Receiver { operators }\n }\n\n pub fn receive(&self, value: DynamicWindowedValue) {\n for op in &self.operators {\n op.process(value);\n }\n }\n}\n\npub struct OperatorContext {\n pub descriptor: Arc<fn_execution_v1::ProcessBundleDescriptor>,\n pub get_receiver: Box<dyn Fn(Arc<BundleProcessor>, String) -> Arc<Receiver> + Send + Sync>,\n // get_data_channel: fn(&str) -> MultiplexingDataChannel,\n // get_bundle_id: String,\n pub bundle_processor: Arc<BundleProcessor>,\n}\n\nimpl fmt::Debug for OperatorContext {\n fn fmt(&self, o: &mut fmt::Formatter<'_>) -> std::fmt::Result {\n o.debug_struct(\"OperatorContext\")\n .field(\"descriptor\", &self.descriptor)\n .field(\"bundle_processor\", &self.bundle_processor)\n .finish()\n }\n}\n\n// ******* Windowed Element Primitives *******\n\npub trait Window: core::fmt::Debug + Send {}\n\n#[derive(Clone, Debug)]\npub struct GlobalWindow;\n\nimpl Window for GlobalWindow {}\n\n#[derive(Clone, Copy, Debug)]\npub struct DynamicWindowedValue<'a>(&'a dyn Any);\n\nimpl<'a> DynamicWindowedValue<'a> {\n pub fn 
new<In: ElemType>(value: &'a WindowedValue<In>) -> DynamicWindowedValue<'a> {\n DynamicWindowedValue(value)\n }\n pub fn downcast_ref<In: ElemType>(self) -> &'a WindowedValue<In> {\n self.0.downcast_ref::<WindowedValue<In>>().unwrap()\n }\n}\n\n#[derive(Debug)]\npub struct WindowedValue<In: ElemType> {\n windows: Rc<Vec<Box<dyn Window>>>,\n timestamp: std::time::Instant,\n pane_info: Box<[u8]>,\n pub value: In,\n}\n\nimpl<In: ElemType> WindowedValue<In> {\n pub fn in_global_window(value: In) -> Self {\n Self {\n windows: Rc::new(vec![Box::new(GlobalWindow {})]),\n timestamp: std::time::Instant::now(), // TODO: MinTimestamp\n pane_info: Box::new([]),\n value,\n }\n }\n\n pub fn with_value<Out: ElemType>(&self, value: Out) -> WindowedValue<Out> {\n WindowedValue::<Out> {\n windows: self.windows.clone(),\n timestamp: self.timestamp,\n pane_info: self.pane_info.clone(),\n value,\n }\n }\n}\n\n// ******* Test Operator definitions *******\n\n#[derive(Clone, Debug)]\npub struct CreateOperator {\n _transform_id: Arc<String>,\n _transform: Arc<pipeline_v1::PTransform>,\n _context: Arc<OperatorContext>,\n _operator_discriminant: OperatorDiscriminants,\n\n receivers: Vec<Arc<Receiver>>,\n data: Vec<String>,\n}\n\nimpl OperatorI for CreateOperator {\n fn new(\n transform_id: Arc<String>,\n transform: Arc<pipeline_v1::PTransform>,\n context: Arc<OperatorContext>,\n operator_discriminant: OperatorDiscriminants,\n ) -> Self {\n let payload = transform\n .as_ref()\n .spec\n .as_ref()\n .expect(\"No spec found for transform\")\n .payload\n .clone();\n\n let data: Vec<String> = serde_json::from_slice(&payload).unwrap();\n\n let receivers = transform\n .outputs\n .values()\n .map(|pcollection_id: &String| {\n let bp = context.bundle_processor.clone();\n (context.get_receiver)(bp, pcollection_id.clone())\n })\n .collect();\n\n Self {\n _transform_id: transform_id,\n _transform: transform,\n _context: context,\n _operator_discriminant: operator_discriminant,\n receivers,\n data,\n }\n 
}\n\n fn start_bundle(&self) {\n for datum in &self.data {\n let wv = WindowedValue::in_global_window(datum.clone());\n for rec in self.receivers.iter() {\n rec.receive(DynamicWindowedValue(&wv));\n }\n }\n }\n\n fn process(&self, _value: DynamicWindowedValue) {}\n\n fn finish_bundle(&self) {}\n}\n\n#[derive(Clone, Debug)]\npub struct RecordingOperator {\n transform_id: Arc<String>,\n _transform: Arc<pipeline_v1::PTransform>,\n _context: Arc<OperatorContext>,\n _operator_discriminant: OperatorDiscriminants,\n\n receivers: Vec<Arc<Receiver>>,\n}\n\nimpl OperatorI for RecordingOperator {\n fn new(\n transform_id: Arc<String>,\n transform: Arc<pipeline_v1::PTransform>,\n context: Arc<OperatorContext>,\n operator_discriminant: OperatorDiscriminants,\n ) -> Self {\n let receivers = transform\n .outputs\n .values()\n .map(|pcollection_id: &String| {\n let bp = context.bundle_processor.clone();\n (context.get_receiver)(bp, pcollection_id.clone())\n })\n .collect();\n\n Self {\n transform_id,\n _transform: transform,\n _context: context,\n _operator_discriminant: operator_discriminant,\n receivers,\n }\n }\n\n fn start_bundle(&self) {\n let mut log = RECORDING_OPERATOR_LOGS.lock().unwrap();\n log.push(format!(\"{}.start_bundle()\", self.transform_id));\n }\n\n fn process(&self, value: DynamicWindowedValue) {\n {\n let mut log = RECORDING_OPERATOR_LOGS.lock().unwrap();\n log.push(format!(\n \"{}.process({:?})\",\n self.transform_id,\n value.downcast_ref::<String>().value\n ));\n }\n\n for rec in self.receivers.iter() {\n rec.receive(value);\n }\n }\n\n fn finish_bundle(&self) {\n let mut log = RECORDING_OPERATOR_LOGS.lock().unwrap();\n log.push(format!(\"{}.finish_bundle()\", self.transform_id));\n }\n}\n\n#[derive(Clone, Debug)]\npub struct ImpulsePerBundleOperator {\n receivers: Vec<Arc<Receiver>>,\n}\n\nimpl OperatorI for ImpulsePerBundleOperator {\n fn new(\n _transform_id: Arc<String>,\n transform: Arc<pipeline_v1::PTransform>,\n context: Arc<OperatorContext>,\n 
_operator_discriminant: OperatorDiscriminants,\n ) -> Self {\n let receivers = transform\n .outputs\n .values()\n .map(|pcollection_id: &String| {\n let bp = context.bundle_processor.clone();\n (context.get_receiver)(bp, pcollection_id.clone())\n })\n .collect();\n\n Self { receivers }\n }\n\n fn start_bundle(&self) {\n let wv = WindowedValue::in_global_window(Vec::<u8>::new());\n for rec in self.receivers.iter() {\n rec.receive(DynamicWindowedValue::new(&wv));\n }\n }\n\n fn process(&self, _value: DynamicWindowedValue) {}\n\n fn finish_bundle(&self) {}\n}\n\n// We could use trait DynamicGroupedValues: Any + Send + Sync once\n// trait upclassing stabilizes https://github.com/rust-lang/rust/issues/65991\npub struct DynamicGroupedValues(Box<dyn Any + Send + Sync>);\n\nimpl DynamicGroupedValues {\n pub fn new<V: ElemType>() -> Self {\n DynamicGroupedValues(Box::<HashMap<String, Vec<V>>>::default())\n }\n\n pub fn downcast_mut<V: ElemType>(&mut self) -> &mut HashMap<String, Vec<V>> {\n self.0.downcast_mut::<HashMap<String, Vec<V>>>().unwrap()\n }\n\n pub fn downcast_ref<V: ElemType>(&self) -> &HashMap<String, Vec<V>> {\n self.0.downcast_ref::<HashMap<String, Vec<V>>>().unwrap()\n }\n}\n\n#[derive(Clone)]\npub(crate) struct GroupByKeyWithinBundleOperator {\n receivers: Vec<Arc<Receiver>>,\n key_extractor: &'static dyn serialize::DynamicKeyExtractor,\n // TODO: Operator requiring locking for structures only ever manipulated in\n // a single thread seems inefficient and overkill.\n grouped_values: Arc<Mutex<DynamicGroupedValues>>,\n}\n\nimpl OperatorI for GroupByKeyWithinBundleOperator {\n fn new(\n _transform_id: Arc<String>,\n transform_proto: Arc<pipeline_v1::PTransform>,\n context: Arc<OperatorContext>,\n _operator_discriminant: OperatorDiscriminants,\n ) -> Self {\n // TODO: Shared by all operators, move up?\n let receivers = transform_proto\n .outputs\n .values()\n .map(|pcollection_id: &String| {\n let bp = context.bundle_processor.clone();\n 
(context.get_receiver)(bp, pcollection_id.clone())\n })\n .collect();\n\n let key_extractor = serialize::get_extractor(\n &String::from_utf8(transform_proto.spec.as_ref().unwrap().payload.clone()).unwrap(),\n )\n .unwrap();\n\n Self {\n receivers,\n key_extractor,\n grouped_values: Arc::new(Mutex::new(key_extractor.new_grouped_values())),\n }\n }\n\n fn start_bundle(&self) {\n self.key_extractor\n .clear_grouped_values(&mut self.grouped_values.lock().unwrap());\n }\n\n fn process(&self, element: DynamicWindowedValue) {\n // TODO: assumes global window\n self.key_extractor\n .extract(element, &mut self.grouped_values.lock().unwrap());\n }\n\n fn finish_bundle(&self) {\n self.key_extractor\n .recombine(&self.grouped_values.lock().unwrap(), &self.receivers)\n }\n}\n\nimpl std::fmt::Debug for GroupByKeyWithinBundleOperator {\n fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {\n write!(f, \"GroupByKeyWithinBundleOperator\")\n }\n}\n\n// ******* Production Operator definitions *******\n#[derive(Clone)]\npub struct ParDoOperator {\n _transform_id: Arc<String>,\n _transform: Arc<pipeline_v1::PTransform>,\n _context: Arc<OperatorContext>,\n _operator_discriminant: OperatorDiscriminants,\n\n receivers: Vec<Arc<Receiver>>,\n dofn: &'static dyn serialize::DynamicDoFn,\n}\n\nimpl OperatorI for ParDoOperator {\n fn new(\n transform_id: Arc<String>,\n transform_proto: Arc<pipeline_v1::PTransform>,\n context: Arc<OperatorContext>,\n operator_discriminant: OperatorDiscriminants,\n ) -> Self {\n // TODO: Shared by all operators, move up?\n let receivers = transform_proto\n .outputs\n .values()\n .map(|pcollection_id: &String| {\n let bp = context.bundle_processor.clone();\n (context.get_receiver)(bp, pcollection_id.clone())\n })\n .collect();\n\n let dofn = serialize::get_do_fn(\n from_utf8(&transform_proto.spec.as_ref().unwrap().payload.clone()).unwrap(),\n )\n .unwrap();\n\n Self {\n _transform_id: transform_id,\n _transform: transform_proto,\n 
_context: context,\n _operator_discriminant: operator_discriminant,\n receivers,\n dofn,\n }\n }\n\n fn start_bundle(&self) {\n self.dofn.start_bundle_dyn()\n }\n\n fn process(&self, windowed_element: DynamicWindowedValue) {\n self.dofn.process_dyn(windowed_element, &self.receivers)\n }\n\n fn finish_bundle(&self) {\n self.dofn.finish_bundle_dyn()\n }\n}\n\nimpl std::fmt::Debug for ParDoOperator {\n fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {\n write!(f, \"ParDoOperator\")\n }\n}\n\n#[derive(Clone, Debug)]\npub struct FlattenOperator {\n receivers: Vec<Arc<Receiver>>,\n}\n\nimpl OperatorI for FlattenOperator {\n fn new(\n _transform_id: Arc<String>,\n transform: Arc<pipeline_v1::PTransform>,\n context: Arc<OperatorContext>,\n _operator_discriminant: OperatorDiscriminants,\n ) -> Self {\n let receivers = transform\n .outputs\n .values()\n .map(|pcollection_id: &String| {\n let bp = context.bundle_processor.clone();\n (context.get_receiver)(bp, pcollection_id.clone())\n })\n .collect();\n\n Self { receivers }\n }\n\n fn start_bundle(&self) {}\n\n fn process(&self, value: DynamicWindowedValue) {\n for rec in self.receivers.iter() {\n rec.receive(value);\n }\n }\n\n fn finish_bundle(&self) {}\n}\n"
},
{
"file": "src/internals/serialize.rs",
"description": "やはり、ここ(extract)で同一のキー k に対応する Vec に対して v を push している。",
"line": 95,
"contents": "use std::collections::HashMap;\n\nuse std::boxed::Box;\nuse std::fmt;\nuse std::sync::{Arc, Mutex};\n\nuse once_cell::sync::Lazy;\n\nuse crate::elem_types::kv::KV;\nuse crate::elem_types::ElemType;\nuse crate::transforms::group_by_key::KeyExtractor;\nuse crate::transforms::pardo::DoFn;\nuse crate::worker::operators::{DynamicGroupedValues, DynamicWindowedValue, WindowedValue};\nuse crate::worker::Receiver;\n\nstatic DO_FNS: Lazy<Mutex<HashMap<String, &'static dyn DynamicDoFn>>> =\n Lazy::new(|| Mutex::new(HashMap::new()));\n\nstatic KEY_EXTRACTORS: Lazy<Mutex<HashMap<String, &'static dyn DynamicKeyExtractor>>> =\n Lazy::new(|| Mutex::new(HashMap::new()));\n\npub fn store_do_fn(do_fn: impl DoFn + 'static) -> String {\n let mut do_fns = DO_FNS.lock().unwrap();\n let name = format!(\"object{}\", do_fns.len());\n do_fns.insert(name.to_string(), Box::leak(Box::new(do_fn)));\n name\n}\n\npub fn get_do_fn(name: &str) -> Option<&'static dyn DynamicDoFn> {\n let binding = DO_FNS.lock().unwrap();\n binding.get(name).copied()\n}\n\npub fn store_key_extractor(ke: impl DynamicKeyExtractor + 'static) -> String {\n let mut kes = KEY_EXTRACTORS.lock().unwrap();\n let name = format!(\"object{}\", kes.len());\n kes.insert(name.to_string(), Box::leak(Box::new(ke)));\n name\n}\n\npub fn get_extractor(name: &str) -> Option<&'static dyn DynamicKeyExtractor> {\n KEY_EXTRACTORS.lock().unwrap().get(name).copied()\n}\n\npub trait DynamicDoFn: Send + Sync {\n fn process_dyn(&self, elem: DynamicWindowedValue, receivers: &[Arc<Receiver>]);\n fn start_bundle_dyn(&self);\n fn finish_bundle_dyn(&self);\n}\n\nimpl<D: DoFn> DynamicDoFn for D {\n fn process_dyn(&self, elem: DynamicWindowedValue, receivers: &[Arc<Receiver>]) {\n let typed_elem = elem.downcast_ref::<D::In>();\n for value in self.process(&typed_elem.value) {\n let windowed_value = typed_elem.with_value(value);\n for receiver in receivers {\n receiver.receive(DynamicWindowedValue::new(&windowed_value))\n }\n }\n }\n\n fn 
start_bundle_dyn(&self) {\n self.start_bundle()\n }\n\n fn finish_bundle_dyn(&self) {\n self.finish_bundle()\n }\n}\n\npub trait DynamicKeyExtractor: Sync + Send {\n fn new_grouped_values(&self) -> DynamicGroupedValues;\n fn clear_grouped_values(&self, grouped_values: &mut DynamicGroupedValues);\n fn extract(&self, kv: DynamicWindowedValue, grouped_values: &mut DynamicGroupedValues);\n fn recombine(&self, grouped_values: &DynamicGroupedValues, receivers: &[Arc<Receiver>]);\n}\n\nimpl<V> DynamicKeyExtractor for KeyExtractor<V>\nwhere\n V: ElemType + Clone + fmt::Debug,\n{\n fn new_grouped_values(&self) -> DynamicGroupedValues {\n DynamicGroupedValues::new::<V>()\n }\n fn clear_grouped_values(&self, grouped_values: &mut DynamicGroupedValues) {\n grouped_values.downcast_mut::<V>().clear()\n }\n fn extract(&self, kv: DynamicWindowedValue, grouped_values: &mut DynamicGroupedValues) {\n let KV { k, v } = &kv.downcast_ref::<KV<String, V>>().value;\n let grouped_values = grouped_values.downcast_mut::<V>();\n\n if !grouped_values.contains_key(k) {\n grouped_values.insert(k.clone(), Vec::new());\n }\n grouped_values.get_mut(k).unwrap().push(v.clone());\n }\n\n fn recombine(&self, grouped_values: &DynamicGroupedValues, receivers: &[Arc<Receiver>]) {\n let typed_grouped_values = grouped_values.downcast_ref::<V>();\n for (key, values) in typed_grouped_values.iter() {\n // TODO: timestamp and pane info are wrong\n for receiver in receivers.iter() {\n // TODO: End-of-window timestamp, only firing pane.\n let mut typed_values: Vec<V> = Vec::new();\n for value in values.iter() {\n typed_values.push(value.clone());\n }\n let res = KV {\n k: key.to_string(),\n v: typed_values,\n };\n receiver.receive(DynamicWindowedValue::new(&WindowedValue::in_global_window(\n res,\n )));\n }\n }\n }\n}\n"
}
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment