Created June 21, 2023 02:30
Save laysakura/07987c01552a5b5e01a006e98e1925f6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "$schema": "https://aka.ms/codetour-schema", | |
| "title": "DirectRunnerによるGBK実行方法", | |
| "steps": [ | |
| { | |
| "file": "tests/primitives_test.rs", | |
| "description": "この run_gbk テストをエントリポイントにする", | |
| "line": 74, | |
| "contents": "/*\n * Licensed to the Apache Software Foundation (ASF) under one\n * or more contributor license agreements. See the NOTICE file\n * distributed with this work for additional information\n * regarding copyright ownership. The ASF licenses this file\n * to you under the Apache License, Version 2.0 (the\n * \"License\"); you may not use this file except in compliance\n * with the License. You may obtain a copy of the License at\n *\n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n */\n\n#[cfg(test)]\nmod tests {\n use apache_beam::elem_types::kv::KV;\n use apache_beam::internals::pvalue::PValue;\n use apache_beam::runners::direct_runner::DirectRunner;\n use apache_beam::runners::runner::RunnerI;\n use apache_beam::transforms::create::Create;\n use apache_beam::transforms::flatten::Flatten;\n use apache_beam::transforms::group_by_key::GroupByKey;\n use apache_beam::transforms::impulse::Impulse;\n use apache_beam::transforms::pardo::ParDo;\n use apache_beam::transforms::testing::AssertEqualUnordered;\n\n #[tokio::test]\n async fn run_direct_runner() {\n DirectRunner::new()\n .run(|root| root.apply(Impulse::new()))\n .await;\n }\n\n #[tokio::test]\n #[should_panic]\n // This tests that AssertEqualUnordered is actually doing its job.\n async fn ensure_assert_fails() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![1, 2, 3]))\n .apply(AssertEqualUnordered::new(&[1, 2, 4]))\n })\n .await;\n }\n\n #[tokio::test]\n #[should_panic]\n async fn ensure_assert_fails_on_empty() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![]))\n .apply(AssertEqualUnordered::new(&[1]))\n })\n .await;\n }\n\n 
#[tokio::test]\n async fn run_map() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![1, 2, 3]))\n .apply(ParDo::from_map(|x: &i32| -> i32 { x * x }))\n .apply(AssertEqualUnordered::new(&[1, 4, 9]))\n })\n .await;\n }\n\n #[tokio::test]\n async fn run_gbk() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![\n KV::new(\"a\".to_string(), 1),\n KV::new(\"a\".to_string(), 2),\n KV::new(\"b\".to_string(), 3),\n ]))\n .apply(GroupByKey::default())\n .apply(AssertEqualUnordered::new(&[\n KV::new(\"a\".to_string(), vec![1, 2]),\n KV::new(\"b\".to_string(), vec![3]),\n ]))\n })\n .await;\n }\n\n #[tokio::test]\n async fn run_flatten() {\n DirectRunner::new()\n .run(|root| {\n let first = root.apply(Create::new(vec![1, 2, 3]));\n let second = root.apply(Create::new(vec![100, 200]));\n PValue::new_array(&[first, second])\n .apply(Flatten::new())\n .apply(AssertEqualUnordered::new(&[1, 2, 3, 100, 200]))\n })\n .await;\n }\n}\n" | |
| }, | |
| { | |
| "file": "tests/primitives_test.rs", | |
| "description": "これでパイプライングラフにGBKのオペレーターが追加されるのは当然なのだが、protoになったGBKオペレーターを、DirectRunnerはどのように実行しているのか。\n嫌な仮説としては、GBK実行をWorker側の責務にしていること。", | |
| "line": 82, | |
| "contents": "/*\n * Licensed to the Apache Software Foundation (ASF) under one\n * or more contributor license agreements. See the NOTICE file\n * distributed with this work for additional information\n * regarding copyright ownership. The ASF licenses this file\n * to you under the Apache License, Version 2.0 (the\n * \"License\"); you may not use this file except in compliance\n * with the License. You may obtain a copy of the License at\n *\n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n */\n\n#[cfg(test)]\nmod tests {\n use apache_beam::elem_types::kv::KV;\n use apache_beam::internals::pvalue::PValue;\n use apache_beam::runners::direct_runner::DirectRunner;\n use apache_beam::runners::runner::RunnerI;\n use apache_beam::transforms::create::Create;\n use apache_beam::transforms::flatten::Flatten;\n use apache_beam::transforms::group_by_key::GroupByKey;\n use apache_beam::transforms::impulse::Impulse;\n use apache_beam::transforms::pardo::ParDo;\n use apache_beam::transforms::testing::AssertEqualUnordered;\n\n #[tokio::test]\n async fn run_direct_runner() {\n DirectRunner::new()\n .run(|root| root.apply(Impulse::new()))\n .await;\n }\n\n #[tokio::test]\n #[should_panic]\n // This tests that AssertEqualUnordered is actually doing its job.\n async fn ensure_assert_fails() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![1, 2, 3]))\n .apply(AssertEqualUnordered::new(&[1, 2, 4]))\n })\n .await;\n }\n\n #[tokio::test]\n #[should_panic]\n async fn ensure_assert_fails_on_empty() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![]))\n .apply(AssertEqualUnordered::new(&[1]))\n })\n .await;\n }\n\n 
#[tokio::test]\n async fn run_map() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![1, 2, 3]))\n .apply(ParDo::from_map(|x: &i32| -> i32 { x * x }))\n .apply(AssertEqualUnordered::new(&[1, 4, 9]))\n })\n .await;\n }\n\n #[tokio::test]\n async fn run_gbk() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![\n KV::new(\"a\".to_string(), 1),\n KV::new(\"a\".to_string(), 2),\n KV::new(\"b\".to_string(), 3),\n ]))\n .apply(GroupByKey::default())\n .apply(AssertEqualUnordered::new(&[\n KV::new(\"a\".to_string(), vec![1, 2]),\n KV::new(\"b\".to_string(), vec![3]),\n ]))\n })\n .await;\n }\n\n #[tokio::test]\n async fn run_flatten() {\n DirectRunner::new()\n .run(|root| {\n let first = root.apply(Create::new(vec![1, 2, 3]));\n let second = root.apply(Create::new(vec![100, 200]));\n PValue::new_array(&[first, second])\n .apply(Flatten::new())\n .apply(AssertEqualUnordered::new(&[1, 2, 3, 100, 200]))\n })\n .await;\n }\n}\n" | |
| }, | |
| { | |
| "file": "src/runners/direct_runner.rs", | |
| "description": "GBKを含むどのオペレーターも、Runner側で実行すべきものかを区別せずに、worker側のBundleProcessorへ送り込んでいるように見える", | |
| "line": 82, | |
| "contents": "/*\n * Licensed to the Apache Software Foundation (ASF) under one\n * or more contributor license agreements. See the NOTICE file\n * distributed with this work for additional information\n * regarding copyright ownership. The ASF licenses this file\n * to you under the Apache License, Version 2.0 (the\n * \"License\"); you may not use this file except in compliance\n * with the License. You may obtain a copy of the License at\n *\n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n */\n\nuse std::sync::Arc;\n\nuse async_trait::async_trait;\n\nuse crate::proto::{fn_execution_v1, pipeline_v1};\nuse crate::worker::sdk_worker::BundleProcessor;\n\nuse crate::runners::runner::RunnerI;\n\npub struct DirectRunner;\n\n#[async_trait]\nimpl RunnerI for DirectRunner {\n fn new() -> Self {\n Self\n }\n\n async fn run_pipeline(&self, pipeline: Arc<std::sync::Mutex<pipeline_v1::Pipeline>>) {\n let descriptor: fn_execution_v1::ProcessBundleDescriptor;\n {\n let p = pipeline.lock().unwrap();\n\n // TODO: use this to define the descriptor instead of p\n // let proto = rewrite_side_inputs(pipeline, state_cache_ref);\n\n // TODO: review cloning\n descriptor = fn_execution_v1::ProcessBundleDescriptor {\n id: \"\".to_string(),\n transforms: p\n .components\n .as_ref()\n .expect(\"Missing components\")\n .transforms\n .clone(),\n pcollections: p\n .components\n .as_ref()\n .expect(\"Missing PCollections\")\n .pcollections\n .clone(),\n windowing_strategies: p\n .components\n .as_ref()\n .expect(\"Missing windowing strategies\")\n .windowing_strategies\n .clone(),\n coders: p\n .components\n .as_ref()\n .expect(\"Missing coders\")\n .coders\n 
.clone(),\n environments: p\n .components\n .as_ref()\n .expect(\"Missing environments\")\n .environments\n .clone(),\n state_api_service_descriptor: None,\n timer_api_service_descriptor: None,\n };\n }\n\n let processor = BundleProcessor::new(Arc::new(descriptor), &[crate::runners::IMPULSE_URN]);\n\n processor.process(\"bundle_id\".to_string()).await;\n }\n}\n" | |
| }, | |
| { | |
| "file": "src/worker/operators.rs", | |
| "description": "mod worker 配下の GroupByKeyWithinBundleOperator が GBK を処理しているように見える", | |
| "line": 489, | |
| "contents": "/*\n * Licensed to the Apache Software Foundation (ASF) under one\n * or more contributor license agreements. See the NOTICE file\n * distributed with this work for additional information\n * regarding copyright ownership. The ASF licenses this file\n * to you under the Apache License, Version 2.0 (the\n * \"License\"); you may not use this file except in compliance\n * with the License. You may obtain a copy of the License at\n *\n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n */\n\nuse std::any::Any;\nuse std::collections::HashMap;\nuse std::fmt;\nuse std::rc::Rc;\nuse std::str::from_utf8;\nuse std::sync::{Arc, Mutex};\n\n//use std::borrow::{Borrow, BorrowMut};\n\nuse once_cell::sync::Lazy;\nuse serde_json;\nuse strum::EnumDiscriminants;\n\nuse crate::elem_types::ElemType;\nuse crate::internals::serialize;\nuse crate::internals::urns;\nuse crate::proto::{fn_execution_v1, pipeline_v1};\n\nuse crate::worker::sdk_worker::BundleProcessor;\nuse crate::worker::test_utils::RECORDING_OPERATOR_LOGS;\n\ntype OperatorMap = HashMap<&'static str, OperatorDiscriminants>;\n\nstatic OPERATORS_BY_URN: Lazy<Mutex<OperatorMap>> = Lazy::new(|| {\n // TODO: these will have to be parameterized depending on things such as the runner used\n let m: OperatorMap = HashMap::from([\n // Test operators\n (urns::CREATE_URN, OperatorDiscriminants::Create),\n (urns::RECORDING_URN, OperatorDiscriminants::Recording),\n (urns::PARTITION_URN, OperatorDiscriminants::_Partitioning),\n (urns::IMPULSE_URN, OperatorDiscriminants::Impulse),\n (urns::GROUP_BY_KEY_URN, OperatorDiscriminants::GroupByKey),\n // Production operators\n (urns::DATA_INPUT_URN, 
OperatorDiscriminants::_DataSource),\n (urns::PAR_DO_URN, OperatorDiscriminants::ParDo),\n (urns::FLATTEN_URN, OperatorDiscriminants::Flatten),\n ]);\n\n Mutex::new(m)\n});\n\npub(crate) trait OperatorI {\n fn new(\n transform_id: Arc<String>,\n transform: Arc<pipeline_v1::PTransform>,\n context: Arc<OperatorContext>,\n operator_discriminant: OperatorDiscriminants,\n ) -> Self\n where\n Self: Sized;\n\n fn start_bundle(&self);\n\n fn process(&self, value: DynamicWindowedValue);\n\n fn finish_bundle(&self) {\n todo!()\n }\n}\n\n#[derive(Clone, fmt::Debug, EnumDiscriminants)]\npub(crate) enum Operator {\n // Test operators\n Create(CreateOperator),\n Recording(RecordingOperator),\n _Partitioning,\n GroupByKey(GroupByKeyWithinBundleOperator),\n Impulse(ImpulsePerBundleOperator),\n\n // Production operators\n _DataSource,\n ParDo(ParDoOperator),\n Flatten(FlattenOperator),\n}\n\nimpl OperatorI for Operator {\n fn new(\n transform_id: Arc<String>,\n transform: Arc<pipeline_v1::PTransform>,\n context: Arc<OperatorContext>,\n operator_discriminant: OperatorDiscriminants,\n ) -> Self {\n match operator_discriminant {\n OperatorDiscriminants::Create => Operator::Create(CreateOperator::new(\n transform_id,\n transform,\n context,\n operator_discriminant,\n )),\n _ => todo!(),\n }\n }\n\n fn start_bundle(&self) {\n match self {\n Operator::Create(create_op) => create_op.start_bundle(),\n Operator::Recording(recording_op) => recording_op.start_bundle(),\n Operator::Impulse(impulse_op) => impulse_op.start_bundle(),\n Operator::GroupByKey(gbk_op) => gbk_op.start_bundle(),\n Operator::ParDo(pardo_op) => pardo_op.start_bundle(),\n Operator::Flatten(flatten_op) => flatten_op.start_bundle(),\n _ => todo!(),\n };\n }\n\n fn process(&self, value: DynamicWindowedValue) {\n match self {\n Operator::Create(create_op) => {\n create_op.process(value);\n }\n Operator::Recording(recording_op) => {\n recording_op.process(value);\n }\n Operator::Impulse(impulse_op) => {\n 
impulse_op.process(value);\n }\n Operator::GroupByKey(gbk_op) => {\n gbk_op.process(value);\n }\n Operator::ParDo(pardo_op) => {\n pardo_op.process(value);\n }\n Operator::Flatten(flatten_op) => {\n flatten_op.process(value);\n }\n _ => todo!(),\n };\n }\n\n fn finish_bundle(&self) {\n match self {\n Operator::Create(create_op) => create_op.finish_bundle(),\n Operator::Recording(recording_op) => recording_op.finish_bundle(),\n Operator::Impulse(impulse_op) => impulse_op.finish_bundle(),\n Operator::GroupByKey(gbk_op) => gbk_op.finish_bundle(),\n Operator::ParDo(pardo_op) => pardo_op.finish_bundle(),\n Operator::Flatten(flatten_op) => flatten_op.finish_bundle(),\n _ => todo!(),\n };\n }\n}\n\npub(crate) fn create_operator(transform_id: &str, context: Arc<OperatorContext>) -> Operator {\n let descriptor: &fn_execution_v1::ProcessBundleDescriptor = context.descriptor.as_ref();\n\n let transform = descriptor\n .transforms\n .get(transform_id)\n .expect(\"Transform ID not found\");\n\n for pcoll_id in transform.outputs.values() {\n (context.get_receiver)(context.bundle_processor.clone(), pcoll_id.clone());\n }\n\n let operators_by_urn = OPERATORS_BY_URN.lock().unwrap();\n\n let spec = transform\n .spec\n .as_ref()\n .unwrap_or_else(|| panic!(\"Transform {} has no spec\", transform_id));\n\n let op_discriminant = operators_by_urn\n .get(spec.urn.as_str())\n .unwrap_or_else(|| panic!(\"Unknown transform type: {}\", spec.urn));\n\n match op_discriminant {\n OperatorDiscriminants::Create => Operator::Create(CreateOperator::new(\n Arc::new(transform_id.to_string()),\n Arc::new(transform.clone()),\n context.clone(),\n OperatorDiscriminants::Create,\n )),\n OperatorDiscriminants::Recording => Operator::Recording(RecordingOperator::new(\n Arc::new(transform_id.to_string()),\n Arc::new(transform.clone()),\n context.clone(),\n OperatorDiscriminants::Recording,\n )),\n OperatorDiscriminants::Impulse => Operator::Impulse(ImpulsePerBundleOperator::new(\n 
Arc::new(transform_id.to_string()),\n Arc::new(transform.clone()),\n context.clone(),\n OperatorDiscriminants::Impulse,\n )),\n OperatorDiscriminants::GroupByKey => {\n Operator::GroupByKey(GroupByKeyWithinBundleOperator::new(\n Arc::new(transform_id.to_string()),\n Arc::new(transform.clone()),\n context.clone(),\n OperatorDiscriminants::GroupByKey,\n ))\n }\n OperatorDiscriminants::ParDo => Operator::ParDo(ParDoOperator::new(\n Arc::new(transform_id.to_string()),\n Arc::new(transform.clone()),\n context.clone(),\n OperatorDiscriminants::ParDo,\n )),\n OperatorDiscriminants::Flatten => Operator::Flatten(FlattenOperator::new(\n Arc::new(transform_id.to_string()),\n Arc::new(transform.clone()),\n context.clone(),\n OperatorDiscriminants::ParDo,\n )),\n _ => todo!(),\n }\n}\n\n#[derive(Debug)]\npub struct Receiver {\n operators: Vec<Arc<Operator>>,\n}\n\nimpl Receiver {\n pub(crate) fn new(operators: Vec<Arc<Operator>>) -> Self {\n Receiver { operators }\n }\n\n pub fn receive(&self, value: DynamicWindowedValue) {\n for op in &self.operators {\n op.process(value);\n }\n }\n}\n\npub struct OperatorContext {\n pub descriptor: Arc<fn_execution_v1::ProcessBundleDescriptor>,\n pub get_receiver: Box<dyn Fn(Arc<BundleProcessor>, String) -> Arc<Receiver> + Send + Sync>,\n // get_data_channel: fn(&str) -> MultiplexingDataChannel,\n // get_bundle_id: String,\n pub bundle_processor: Arc<BundleProcessor>,\n}\n\nimpl fmt::Debug for OperatorContext {\n fn fmt(&self, o: &mut fmt::Formatter<'_>) -> std::fmt::Result {\n o.debug_struct(\"OperatorContext\")\n .field(\"descriptor\", &self.descriptor)\n .field(\"bundle_processor\", &self.bundle_processor)\n .finish()\n }\n}\n\n// ******* Windowed Element Primitives *******\n\npub trait Window: core::fmt::Debug + Send {}\n\n#[derive(Clone, Debug)]\npub struct GlobalWindow;\n\nimpl Window for GlobalWindow {}\n\n#[derive(Clone, Copy, Debug)]\npub struct DynamicWindowedValue<'a>(&'a dyn Any);\n\nimpl<'a> DynamicWindowedValue<'a> {\n pub fn 
new<In: ElemType>(value: &'a WindowedValue<In>) -> DynamicWindowedValue<'a> {\n DynamicWindowedValue(value)\n }\n pub fn downcast_ref<In: ElemType>(self) -> &'a WindowedValue<In> {\n self.0.downcast_ref::<WindowedValue<In>>().unwrap()\n }\n}\n\n#[derive(Debug)]\npub struct WindowedValue<In: ElemType> {\n windows: Rc<Vec<Box<dyn Window>>>,\n timestamp: std::time::Instant,\n pane_info: Box<[u8]>,\n pub value: In,\n}\n\nimpl<In: ElemType> WindowedValue<In> {\n pub fn in_global_window(value: In) -> Self {\n Self {\n windows: Rc::new(vec![Box::new(GlobalWindow {})]),\n timestamp: std::time::Instant::now(), // TODO: MinTimestamp\n pane_info: Box::new([]),\n value,\n }\n }\n\n pub fn with_value<Out: ElemType>(&self, value: Out) -> WindowedValue<Out> {\n WindowedValue::<Out> {\n windows: self.windows.clone(),\n timestamp: self.timestamp,\n pane_info: self.pane_info.clone(),\n value,\n }\n }\n}\n\n// ******* Test Operator definitions *******\n\n#[derive(Clone, Debug)]\npub struct CreateOperator {\n _transform_id: Arc<String>,\n _transform: Arc<pipeline_v1::PTransform>,\n _context: Arc<OperatorContext>,\n _operator_discriminant: OperatorDiscriminants,\n\n receivers: Vec<Arc<Receiver>>,\n data: Vec<String>,\n}\n\nimpl OperatorI for CreateOperator {\n fn new(\n transform_id: Arc<String>,\n transform: Arc<pipeline_v1::PTransform>,\n context: Arc<OperatorContext>,\n operator_discriminant: OperatorDiscriminants,\n ) -> Self {\n let payload = transform\n .as_ref()\n .spec\n .as_ref()\n .expect(\"No spec found for transform\")\n .payload\n .clone();\n\n let data: Vec<String> = serde_json::from_slice(&payload).unwrap();\n\n let receivers = transform\n .outputs\n .values()\n .map(|pcollection_id: &String| {\n let bp = context.bundle_processor.clone();\n (context.get_receiver)(bp, pcollection_id.clone())\n })\n .collect();\n\n Self {\n _transform_id: transform_id,\n _transform: transform,\n _context: context,\n _operator_discriminant: operator_discriminant,\n receivers,\n data,\n }\n 
}\n\n fn start_bundle(&self) {\n for datum in &self.data {\n let wv = WindowedValue::in_global_window(datum.clone());\n for rec in self.receivers.iter() {\n rec.receive(DynamicWindowedValue(&wv));\n }\n }\n }\n\n fn process(&self, _value: DynamicWindowedValue) {}\n\n fn finish_bundle(&self) {}\n}\n\n#[derive(Clone, Debug)]\npub struct RecordingOperator {\n transform_id: Arc<String>,\n _transform: Arc<pipeline_v1::PTransform>,\n _context: Arc<OperatorContext>,\n _operator_discriminant: OperatorDiscriminants,\n\n receivers: Vec<Arc<Receiver>>,\n}\n\nimpl OperatorI for RecordingOperator {\n fn new(\n transform_id: Arc<String>,\n transform: Arc<pipeline_v1::PTransform>,\n context: Arc<OperatorContext>,\n operator_discriminant: OperatorDiscriminants,\n ) -> Self {\n let receivers = transform\n .outputs\n .values()\n .map(|pcollection_id: &String| {\n let bp = context.bundle_processor.clone();\n (context.get_receiver)(bp, pcollection_id.clone())\n })\n .collect();\n\n Self {\n transform_id,\n _transform: transform,\n _context: context,\n _operator_discriminant: operator_discriminant,\n receivers,\n }\n }\n\n fn start_bundle(&self) {\n let mut log = RECORDING_OPERATOR_LOGS.lock().unwrap();\n log.push(format!(\"{}.start_bundle()\", self.transform_id));\n }\n\n fn process(&self, value: DynamicWindowedValue) {\n {\n let mut log = RECORDING_OPERATOR_LOGS.lock().unwrap();\n log.push(format!(\n \"{}.process({:?})\",\n self.transform_id,\n value.downcast_ref::<String>().value\n ));\n }\n\n for rec in self.receivers.iter() {\n rec.receive(value);\n }\n }\n\n fn finish_bundle(&self) {\n let mut log = RECORDING_OPERATOR_LOGS.lock().unwrap();\n log.push(format!(\"{}.finish_bundle()\", self.transform_id));\n }\n}\n\n#[derive(Clone, Debug)]\npub struct ImpulsePerBundleOperator {\n receivers: Vec<Arc<Receiver>>,\n}\n\nimpl OperatorI for ImpulsePerBundleOperator {\n fn new(\n _transform_id: Arc<String>,\n transform: Arc<pipeline_v1::PTransform>,\n context: Arc<OperatorContext>,\n 
_operator_discriminant: OperatorDiscriminants,\n ) -> Self {\n let receivers = transform\n .outputs\n .values()\n .map(|pcollection_id: &String| {\n let bp = context.bundle_processor.clone();\n (context.get_receiver)(bp, pcollection_id.clone())\n })\n .collect();\n\n Self { receivers }\n }\n\n fn start_bundle(&self) {\n let wv = WindowedValue::in_global_window(Vec::<u8>::new());\n for rec in self.receivers.iter() {\n rec.receive(DynamicWindowedValue::new(&wv));\n }\n }\n\n fn process(&self, _value: DynamicWindowedValue) {}\n\n fn finish_bundle(&self) {}\n}\n\n// We could use trait DynamicGroupedValues: Any + Send + Sync once\n// trait upclassing stabilizes https://github.com/rust-lang/rust/issues/65991\npub struct DynamicGroupedValues(Box<dyn Any + Send + Sync>);\n\nimpl DynamicGroupedValues {\n pub fn new<V: ElemType>() -> Self {\n DynamicGroupedValues(Box::<HashMap<String, Vec<V>>>::default())\n }\n\n pub fn downcast_mut<V: ElemType>(&mut self) -> &mut HashMap<String, Vec<V>> {\n self.0.downcast_mut::<HashMap<String, Vec<V>>>().unwrap()\n }\n\n pub fn downcast_ref<V: ElemType>(&self) -> &HashMap<String, Vec<V>> {\n self.0.downcast_ref::<HashMap<String, Vec<V>>>().unwrap()\n }\n}\n\n#[derive(Clone)]\npub(crate) struct GroupByKeyWithinBundleOperator {\n receivers: Vec<Arc<Receiver>>,\n key_extractor: &'static dyn serialize::DynamicKeyExtractor,\n // TODO: Operator requiring locking for structures only ever manipulated in\n // a single thread seems inefficient and overkill.\n grouped_values: Arc<Mutex<DynamicGroupedValues>>,\n}\n\nimpl OperatorI for GroupByKeyWithinBundleOperator {\n fn new(\n _transform_id: Arc<String>,\n transform_proto: Arc<pipeline_v1::PTransform>,\n context: Arc<OperatorContext>,\n _operator_discriminant: OperatorDiscriminants,\n ) -> Self {\n // TODO: Shared by all operators, move up?\n let receivers = transform_proto\n .outputs\n .values()\n .map(|pcollection_id: &String| {\n let bp = context.bundle_processor.clone();\n 
(context.get_receiver)(bp, pcollection_id.clone())\n })\n .collect();\n\n let key_extractor = serialize::get_extractor(\n &String::from_utf8(transform_proto.spec.as_ref().unwrap().payload.clone()).unwrap(),\n )\n .unwrap();\n\n Self {\n receivers,\n key_extractor,\n grouped_values: Arc::new(Mutex::new(key_extractor.new_grouped_values())),\n }\n }\n\n fn start_bundle(&self) {\n self.key_extractor\n .clear_grouped_values(&mut self.grouped_values.lock().unwrap());\n }\n\n fn process(&self, element: DynamicWindowedValue) {\n // TODO: assumes global window\n self.key_extractor\n .extract(element, &mut self.grouped_values.lock().unwrap());\n }\n\n fn finish_bundle(&self) {\n self.key_extractor\n .recombine(&self.grouped_values.lock().unwrap(), &self.receivers)\n }\n}\n\nimpl std::fmt::Debug for GroupByKeyWithinBundleOperator {\n fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {\n write!(f, \"GroupByKeyWithinBundleOperator\")\n }\n}\n\n// ******* Production Operator definitions *******\n#[derive(Clone)]\npub struct ParDoOperator {\n _transform_id: Arc<String>,\n _transform: Arc<pipeline_v1::PTransform>,\n _context: Arc<OperatorContext>,\n _operator_discriminant: OperatorDiscriminants,\n\n receivers: Vec<Arc<Receiver>>,\n dofn: &'static dyn serialize::DynamicDoFn,\n}\n\nimpl OperatorI for ParDoOperator {\n fn new(\n transform_id: Arc<String>,\n transform_proto: Arc<pipeline_v1::PTransform>,\n context: Arc<OperatorContext>,\n operator_discriminant: OperatorDiscriminants,\n ) -> Self {\n // TODO: Shared by all operators, move up?\n let receivers = transform_proto\n .outputs\n .values()\n .map(|pcollection_id: &String| {\n let bp = context.bundle_processor.clone();\n (context.get_receiver)(bp, pcollection_id.clone())\n })\n .collect();\n\n let dofn = serialize::get_do_fn(\n from_utf8(&transform_proto.spec.as_ref().unwrap().payload.clone()).unwrap(),\n )\n .unwrap();\n\n Self {\n _transform_id: transform_id,\n _transform: transform_proto,\n 
_context: context,\n _operator_discriminant: operator_discriminant,\n receivers,\n dofn,\n }\n }\n\n fn start_bundle(&self) {\n self.dofn.start_bundle_dyn()\n }\n\n fn process(&self, windowed_element: DynamicWindowedValue) {\n self.dofn.process_dyn(windowed_element, &self.receivers)\n }\n\n fn finish_bundle(&self) {\n self.dofn.finish_bundle_dyn()\n }\n}\n\nimpl std::fmt::Debug for ParDoOperator {\n fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {\n write!(f, \"ParDoOperator\")\n }\n}\n\n#[derive(Clone, Debug)]\npub struct FlattenOperator {\n receivers: Vec<Arc<Receiver>>,\n}\n\nimpl OperatorI for FlattenOperator {\n fn new(\n _transform_id: Arc<String>,\n transform: Arc<pipeline_v1::PTransform>,\n context: Arc<OperatorContext>,\n _operator_discriminant: OperatorDiscriminants,\n ) -> Self {\n let receivers = transform\n .outputs\n .values()\n .map(|pcollection_id: &String| {\n let bp = context.bundle_processor.clone();\n (context.get_receiver)(bp, pcollection_id.clone())\n })\n .collect();\n\n Self { receivers }\n }\n\n fn start_bundle(&self) {}\n\n fn process(&self, value: DynamicWindowedValue) {\n for rec in self.receivers.iter() {\n rec.receive(value);\n }\n }\n\n fn finish_bundle(&self) {}\n}\n" | |
| }, | |
| { | |
| "file": "src/internals/serialize.rs", | |
| "description": "確認できた。ここで同一のキー k に対応する Vec に v を push している", | |
| "line": 95, | |
| "contents": "use std::collections::HashMap;\n\nuse std::boxed::Box;\nuse std::fmt;\nuse std::sync::{Arc, Mutex};\n\nuse once_cell::sync::Lazy;\n\nuse crate::elem_types::kv::KV;\nuse crate::elem_types::ElemType;\nuse crate::transforms::group_by_key::KeyExtractor;\nuse crate::transforms::pardo::DoFn;\nuse crate::worker::operators::{DynamicGroupedValues, DynamicWindowedValue, WindowedValue};\nuse crate::worker::Receiver;\n\nstatic DO_FNS: Lazy<Mutex<HashMap<String, &'static dyn DynamicDoFn>>> =\n Lazy::new(|| Mutex::new(HashMap::new()));\n\nstatic KEY_EXTRACTORS: Lazy<Mutex<HashMap<String, &'static dyn DynamicKeyExtractor>>> =\n Lazy::new(|| Mutex::new(HashMap::new()));\n\npub fn store_do_fn(do_fn: impl DoFn + 'static) -> String {\n let mut do_fns = DO_FNS.lock().unwrap();\n let name = format!(\"object{}\", do_fns.len());\n do_fns.insert(name.to_string(), Box::leak(Box::new(do_fn)));\n name\n}\n\npub fn get_do_fn(name: &str) -> Option<&'static dyn DynamicDoFn> {\n let binding = DO_FNS.lock().unwrap();\n binding.get(name).copied()\n}\n\npub fn store_key_extractor(ke: impl DynamicKeyExtractor + 'static) -> String {\n let mut kes = KEY_EXTRACTORS.lock().unwrap();\n let name = format!(\"object{}\", kes.len());\n kes.insert(name.to_string(), Box::leak(Box::new(ke)));\n name\n}\n\npub fn get_extractor(name: &str) -> Option<&'static dyn DynamicKeyExtractor> {\n KEY_EXTRACTORS.lock().unwrap().get(name).copied()\n}\n\npub trait DynamicDoFn: Send + Sync {\n fn process_dyn(&self, elem: DynamicWindowedValue, receivers: &[Arc<Receiver>]);\n fn start_bundle_dyn(&self);\n fn finish_bundle_dyn(&self);\n}\n\nimpl<D: DoFn> DynamicDoFn for D {\n fn process_dyn(&self, elem: DynamicWindowedValue, receivers: &[Arc<Receiver>]) {\n let typed_elem = elem.downcast_ref::<D::In>();\n for value in self.process(&typed_elem.value) {\n let windowed_value = typed_elem.with_value(value);\n for receiver in receivers {\n receiver.receive(DynamicWindowedValue::new(&windowed_value))\n }\n }\n }\n\n fn 
start_bundle_dyn(&self) {\n self.start_bundle()\n }\n\n fn finish_bundle_dyn(&self) {\n self.finish_bundle()\n }\n}\n\npub trait DynamicKeyExtractor: Sync + Send {\n fn new_grouped_values(&self) -> DynamicGroupedValues;\n fn clear_grouped_values(&self, grouped_values: &mut DynamicGroupedValues);\n fn extract(&self, kv: DynamicWindowedValue, grouped_values: &mut DynamicGroupedValues);\n fn recombine(&self, grouped_values: &DynamicGroupedValues, receivers: &[Arc<Receiver>]);\n}\n\nimpl<V> DynamicKeyExtractor for KeyExtractor<V>\nwhere\n V: ElemType + Clone + fmt::Debug,\n{\n fn new_grouped_values(&self) -> DynamicGroupedValues {\n DynamicGroupedValues::new::<V>()\n }\n fn clear_grouped_values(&self, grouped_values: &mut DynamicGroupedValues) {\n grouped_values.downcast_mut::<V>().clear()\n }\n fn extract(&self, kv: DynamicWindowedValue, grouped_values: &mut DynamicGroupedValues) {\n let KV { k, v } = &kv.downcast_ref::<KV<String, V>>().value;\n let grouped_values = grouped_values.downcast_mut::<V>();\n\n if !grouped_values.contains_key(k) {\n grouped_values.insert(k.clone(), Vec::new());\n }\n grouped_values.get_mut(k).unwrap().push(v.clone());\n }\n\n fn recombine(&self, grouped_values: &DynamicGroupedValues, receivers: &[Arc<Receiver>]) {\n let typed_grouped_values = grouped_values.downcast_ref::<V>();\n for (key, values) in typed_grouped_values.iter() {\n // TODO: timestamp and pane info are wrong\n for receiver in receivers.iter() {\n // TODO: End-of-window timestamp, only firing pane.\n let mut typed_values: Vec<V> = Vec::new();\n for value in values.iter() {\n typed_values.push(value.clone());\n }\n let res = KV {\n k: key.to_string(),\n v: typed_values,\n };\n receiver.receive(DynamicWindowedValue::new(&WindowedValue::in_global_window(\n res,\n )));\n }\n }\n }\n}\n" | |
| } | |
| ] | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment