{
  "$schema": "https://aka.ms/codetour-schema",
  "title": "How UDF execution works",
  "steps": [
    {
      "file": "tests/primitives_test.rs",
      "description": "Read this as the entry point.",
      "line": 74,
| "contents": "/*\n * Licensed to the Apache Software Foundation (ASF) under one\n * or more contributor license agreements. See the NOTICE file\n * distributed with this work for additional information\n * regarding copyright ownership. The ASF licenses this file\n * to you under the Apache License, Version 2.0 (the\n * \"License\"); you may not use this file except in compliance\n * with the License. You may obtain a copy of the License at\n *\n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n */\n\n#[cfg(test)]\nmod tests {\n use apache_beam::elem_types::kv::KV;\n use apache_beam::internals::pvalue::PValue;\n use apache_beam::runners::direct_runner::DirectRunner;\n use apache_beam::runners::runner::RunnerI;\n use apache_beam::transforms::create::Create;\n use apache_beam::transforms::flatten::Flatten;\n use apache_beam::transforms::group_by_key::GroupByKey;\n use apache_beam::transforms::impulse::Impulse;\n use apache_beam::transforms::pardo::ParDo;\n use apache_beam::transforms::testing::AssertEqualUnordered;\n\n #[tokio::test]\n async fn run_direct_runner() {\n DirectRunner::new()\n .run(|root| root.apply(Impulse::new()))\n .await;\n }\n\n #[tokio::test]\n #[should_panic]\n // This tests that AssertEqualUnordered is actually doing its job.\n async fn ensure_assert_fails() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![1, 2, 3]))\n .apply(AssertEqualUnordered::new(&[1, 2, 4]))\n })\n .await;\n }\n\n #[tokio::test]\n #[should_panic]\n async fn ensure_assert_fails_on_empty() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![]))\n .apply(AssertEqualUnordered::new(&[1]))\n })\n .await;\n }\n\n #[tokio::test]\n async fn run_map() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![1, 2, 3]))\n .apply(ParDo::from_map(|x: &i32| -> i32 { x * x }))\n .apply(AssertEqualUnordered::new(&[1, 4, 9]))\n })\n .await;\n }\n\n #[tokio::test]\n async fn run_gbk() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![\n KV::new(\"a\".to_string(), 1),\n KV::new(\"a\".to_string(), 2),\n KV::new(\"b\".to_string(), 3),\n ]))\n .apply(GroupByKey::default())\n .apply(AssertEqualUnordered::new(&[\n KV::new(\"a\".to_string(), vec![1, 2]),\n KV::new(\"b\".to_string(), vec![3]),\n ]))\n })\n .await;\n }\n\n #[tokio::test]\n async fn run_flatten() {\n DirectRunner::new()\n .run(|root| {\n let first = root.apply(Create::new(vec![1, 2, 3]));\n let second = root.apply(Create::new(vec![100, 200]));\n PValue::new_array(&[first, second])\n .apply(Flatten::new())\n .apply(AssertEqualUnordered::new(&[1, 2, 3, 100, 200]))\n })\n .await;\n }\n}\n" | |
    },
    {
      "file": "src/internals/pipeline.rs",
      "description": "A PTransform is supposed to carry a FunctionSpec (which is what gets passed to the worker over the Fn API), but here, when called from PCollection::apply(), the FunctionSpec is set to None.\nIn the current code the FunctionSpec never becomes Some after this point, so no function is ever deserialized from a FunctionSpec.",
      "line": 225,
| "contents": "/*\n * Licensed to the Apache Software Foundation (ASF) under one\n * or more contributor license agreements. See the NOTICE file\n * distributed with this work for additional information\n * regarding copyright ownership. The ASF licenses this file\n * to you under the Apache License, Version 2.0 (the\n * \"License\"); you may not use this file except in compliance\n * with the License. You may obtain a copy of the License at\n *\n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n */\n\nuse std::collections::{HashMap, HashSet};\nuse std::sync::{Arc, Mutex};\n\nuse crate::coders::CoderUrnTree;\nuse crate::elem_types::ElemType;\nuse crate::proto::pipeline_v1;\n\nuse crate::internals::pvalue::{flatten_pvalue, PTransform, PValue};\n\nconst _CODER_ID_PREFIX: &str = \"coder_\";\n\n/// A part of a `Pipeline` to help construct / look up it.\nstruct PipelineContext {\n component_prefix: String,\n counter: Arc<Mutex<usize>>,\n}\n\nimpl PipelineContext {\n pub fn new(component_prefix: String) -> Self {\n Self {\n component_prefix,\n counter: Arc::new(Mutex::new(0)),\n }\n }\n\n pub fn create_unique_name(&self, prefix: String) -> String {\n format!(\n \"{}{}_{}\",\n self.component_prefix,\n prefix,\n self.get_and_increment_counter()\n )\n }\n\n fn get_and_increment_counter(&self) -> usize {\n let mut counter = self.counter.lock().unwrap();\n *counter += 1;\n\n *counter - 1\n }\n}\n\n/// Corresponds to the `Pipeline` in Runner API's proto.\n///\n/// The pipeline is instantiated on `Runner::run()`, and used by a (remote or direct) runner.\npub struct Pipeline {\n context: PipelineContext,\n default_environment: String,\n proto: Arc<Mutex<pipeline_v1::Pipeline>>,\n transform_stack: Arc<Mutex<Vec<String>>>,\n used_stage_names: Arc<Mutex<HashSet<String>>>,\n\n coder_proto_counter: Mutex<usize>,\n}\n\nimpl Pipeline {\n pub fn new(component_prefix: String) -> Self {\n let proto = pipeline_v1::Pipeline {\n components: Some(pipeline_v1::Components {\n transforms: HashMap::with_capacity(0),\n pcollections: HashMap::with_capacity(0),\n windowing_strategies: HashMap::with_capacity(0),\n coders: HashMap::with_capacity(0),\n environments: HashMap::with_capacity(0),\n }),\n root_transform_ids: Vec::with_capacity(0),\n display_data: Vec::with_capacity(0),\n requirements: Vec::with_capacity(0),\n };\n\n Pipeline {\n context: PipelineContext::new(component_prefix.clone()),\n default_environment: format!(\"{}rustEnvironment\", component_prefix),\n proto: Arc::new(Mutex::new(proto)),\n transform_stack: Arc::new(Mutex::new(Vec::with_capacity(0))),\n used_stage_names: Arc::new(Mutex::new(HashSet::with_capacity(0))),\n\n coder_proto_counter: Mutex::new(0),\n }\n }\n\n pub fn get_proto(&self) -> Arc<std::sync::Mutex<pipeline_v1::Pipeline>> {\n self.proto.clone()\n }\n\n /// Recursively construct a coder (and its component coders) in protocol buffer representation for the Runner API.\n /// A coder in protobuf format can be shared with other components such as Beam runners,\n /// SDK workers; and reconstructed into its runtime representation if necessary.\n fn coder_to_proto(&self, coder_urn_tree: &CoderUrnTree) -> pipeline_v1::Coder {\n fn helper(coder_urn: &str, component_coder_ids: 
Vec<String>) -> pipeline_v1::Coder {\n let spec = pipeline_v1::FunctionSpec {\n urn: coder_urn.to_string(),\n payload: vec![], // unused in Rust SDK\n };\n pipeline_v1::Coder {\n spec: Some(spec),\n component_coder_ids,\n }\n }\n\n let component_coder_ids = coder_urn_tree\n .component_coder_urns\n .iter()\n .map(|component_coder_urn| {\n let coder_proto = self.coder_to_proto(component_coder_urn);\n self.get_coder_id(coder_proto)\n })\n .collect();\n\n helper(&coder_urn_tree.coder_urn, component_coder_ids)\n }\n\n /// If the `coder_proto` is already registered, return its ID.\n /// Else, the `coder_proto` is registered and its newly-created ID is returned.\n fn get_coder_id(&self, coder_proto: pipeline_v1::Coder) -> String {\n let mut pipeline_proto = self.proto.lock().unwrap();\n\n let proto_coders = &mut pipeline_proto.components.as_mut().unwrap().coders;\n\n for (coder_id, coder) in proto_coders.iter() {\n if *coder == coder_proto {\n return coder_id.clone();\n }\n }\n\n let mut coder_counter = self.coder_proto_counter.lock().unwrap();\n *coder_counter += 1;\n let new_coder_id = format!(\"{}{}\", _CODER_ID_PREFIX, *coder_counter);\n proto_coders.insert(new_coder_id.clone(), coder_proto);\n\n new_coder_id\n }\n\n pub fn register_proto_transform(&self, transform: pipeline_v1::PTransform) {\n let mut pipeline_proto = self.proto.lock().unwrap();\n\n pipeline_proto\n .components\n .as_mut()\n .unwrap()\n .transforms\n .insert(transform.unique_name.clone(), transform);\n }\n\n pub fn pre_apply_transform<In, Out, F>(\n &self,\n _transform: &F,\n input: &PValue<In>,\n ) -> (String, pipeline_v1::PTransform)\n where\n In: ElemType,\n Out: ElemType + Clone,\n F: PTransform<In, Out> + Send,\n {\n let transform_id = self.context.create_unique_name(\"transform\".to_string());\n let mut parent: Option<&pipeline_v1::PTransform> = None;\n\n let mut pipeline_proto = self.proto.lock().unwrap();\n let transform_stack = self.transform_stack.lock().unwrap();\n\n if transform_stack.is_empty() {\n pipeline_proto.root_transform_ids.push(transform_id.clone());\n } else {\n let p = pipeline_proto\n .components\n .as_mut()\n .expect(\"No components on pipeline proto\")\n .transforms\n .get_mut(transform_stack.last().expect(\"Transform stack is empty\"))\n .expect(\"Transform ID not registered on pipeline proto\");\n\n let p_subtransforms: &mut Vec<String> = p.subtransforms.as_mut();\n p_subtransforms.push(transform_id.clone());\n\n parent = Some(p);\n }\n drop(transform_stack);\n\n let parent_name = match parent {\n Some(p) => {\n format!(\"{}/\", p.unique_name.clone())\n }\n None => \"\".to_string(),\n };\n\n // TODO: extract unique transform name properly\n let bad_transform_name = crate::internals::utils::get_bad_id();\n let unique_name = format!(\"{}{}\", parent_name, bad_transform_name);\n\n {\n let mut used_stage_names = self.used_stage_names.lock().unwrap();\n\n if used_stage_names.contains(&unique_name) {\n panic!(\"Duplicate stage name: {}\", unique_name)\n }\n used_stage_names.insert(unique_name.clone());\n }\n\n let flattened = flatten_pvalue(input, None);\n let mut inputs: HashMap<String, String> = HashMap::new();\n for (name, id) in flattened {\n inputs.insert(name.clone(), id);\n }\n\n let transform_proto = pipeline_v1::PTransform {\n unique_name,\n spec: None,\n subtransforms: Vec::with_capacity(0),\n inputs,\n outputs: HashMap::with_capacity(0),\n display_data: Vec::with_capacity(0),\n environment_id: self.default_environment.clone(),\n annotations: HashMap::with_capacity(0),\n };\n\n 
pipeline_proto\n .components\n .as_mut()\n .unwrap()\n .transforms\n .insert(transform_id.clone(), transform_proto.clone());\n\n (transform_id, transform_proto)\n }\n\n pub(crate) fn apply_transform<In, Out, F>(\n &self,\n transform: F,\n input: &PValue<In>,\n pipeline: Arc<Pipeline>,\n\n // Coder's URN that encode/decode `Out`.\n out_coder_urn: &CoderUrnTree,\n ) -> PValue<Out>\n where\n In: ElemType,\n Out: ElemType + Clone,\n F: PTransform<In, Out> + Send,\n {\n // TODO: Inline pre_apply and post_apply.\n // (They exist in typescript only to share code between the sync and\n // async variants).\n let (transform_id, mut transform_proto) = self.pre_apply_transform(&transform, input);\n\n {\n let mut transform_stack = self.transform_stack.lock().unwrap();\n transform_stack.push(transform_id.clone());\n drop(transform_stack);\n }\n\n let result = transform.expand(input, pipeline, out_coder_urn, &mut transform_proto);\n\n for (name, id) in flatten_pvalue(&result, None) {\n // Causes test to hang...\n transform_proto.outputs.insert(name.clone(), id);\n }\n\n // Re-insert the transform with its outputs and any mutation that\n // expand_internal performed.\n let mut pipeline_proto = self.proto.lock().unwrap();\n // This may have been mutated.\n // TODO: Perhaps only insert at the end?\n transform_proto.subtransforms = pipeline_proto\n .components\n .as_mut()\n .unwrap()\n .transforms\n .get(&transform_id)\n .unwrap()\n .subtransforms\n .clone();\n pipeline_proto\n .components\n .as_mut()\n .unwrap()\n .transforms\n .insert(transform_id, transform_proto.clone());\n drop(pipeline_proto);\n\n // TODO: ensure this happens even if an error takes place above\n {\n let mut transform_stack = self.transform_stack.lock().unwrap();\n transform_stack.pop();\n drop(transform_stack);\n }\n\n self.post_apply_transform(transform, transform_proto, result)\n }\n\n // TODO: deal with bounds and windows\n pub fn post_apply_transform<In, Out, F>(\n &self,\n _transform: F,\n _transform_proto: pipeline_v1::PTransform,\n result: PValue<Out>,\n ) -> PValue<Out>\n where\n In: ElemType,\n Out: ElemType + Clone,\n F: PTransform<In, Out> + Send,\n {\n result\n }\n\n pub(crate) fn create_pcollection_internal<Out>(\n &self,\n coder_urn_tree: &CoderUrnTree,\n pipeline: Arc<Pipeline>,\n ) -> PValue<Out>\n where\n Out: ElemType,\n {\n let coder_id = {\n let coder_proto = self.coder_to_proto(coder_urn_tree);\n self.get_coder_id(coder_proto)\n };\n\n PValue::new(\n crate::internals::pvalue::PType::PCollection,\n pipeline,\n self.create_pcollection_id_internal(coder_id),\n coder_urn_tree.coder_urn.to_string(),\n )\n }\n\n fn create_pcollection_id_internal(&self, coder_id: String) -> String {\n let pcoll_id = self.context.create_unique_name(\"pc\".to_string());\n let pcoll_proto: pipeline_v1::PCollection = pipeline_v1::PCollection {\n unique_name: pcoll_id.clone(),\n coder_id,\n ..Default::default()\n };\n\n let mut pipeline_proto = self.proto.lock().unwrap();\n pipeline_proto\n .components\n .as_mut()\n .unwrap()\n .pcollections\n .insert(pcoll_id.clone(), pcoll_proto);\n\n pcoll_id\n }\n}\n\nimpl Default for Pipeline {\n fn default() -> Self {\n Self::new(\"\".to_string())\n }\n}\n\n#[cfg(test)]\nmod tests {\n use crate::{\n coders::{\n required_coders::BytesCoder,\n urns::{BYTES_CODER_URN, ITERABLE_CODER_URN},\n },\n transforms::impulse::Impulse,\n };\n\n use super::*;\n\n fn coder_urn_from_pvalue<E: ElemType>(pvalue: &PValue<E>) -> String {\n let coder_urn_tree = CoderUrnTree::from(pvalue);\n coder_urn_tree.coder_urn\n }\n\n 
#[test]\n fn test_default_coder_in_proto() {\n let root = PValue::<()>::root();\n let pvalue = root.apply(Impulse::new());\n\n let coder_urn = coder_urn_from_pvalue(&pvalue);\n assert_eq!(coder_urn, ITERABLE_CODER_URN);\n }\n\n #[test]\n fn test_override_coder_in_proto() {\n let root = PValue::<()>::root();\n let pvalue = root.apply_with_coder::<BytesCoder, _, _>(Impulse::new());\n\n let coder_urn = coder_urn_from_pvalue(&pvalue);\n assert_eq!(coder_urn, BYTES_CODER_URN);\n }\n}\n" | |
    },
    {
      "file": "src/internals/serialize.rs",
      "description": "Ultimately, a DoFn (roughly an instance of a ParDo, i.e. the UDF) is stored here as *the address(!)* of a trait object.\nIt is later looked up by its key and invoked.\n\nA trait object's address is of course different on the other side of the Fn API boundary, so this mechanism is broken as an Fn API implementation.",
      "line": 17,
| "contents": "use std::collections::HashMap;\n\nuse std::boxed::Box;\nuse std::fmt;\nuse std::sync::{Arc, Mutex};\n\nuse once_cell::sync::Lazy;\n\nuse crate::elem_types::kv::KV;\nuse crate::elem_types::ElemType;\nuse crate::transforms::group_by_key::KeyExtractor;\nuse crate::transforms::pardo::DoFn;\nuse crate::worker::operators::{DynamicGroupedValues, DynamicWindowedValue, WindowedValue};\nuse crate::worker::Receiver;\n\nstatic DO_FNS: Lazy<Mutex<HashMap<String, &'static dyn DynamicDoFn>>> =\n Lazy::new(|| Mutex::new(HashMap::new()));\n\nstatic KEY_EXTRACTORS: Lazy<Mutex<HashMap<String, &'static dyn DynamicKeyExtractor>>> =\n Lazy::new(|| Mutex::new(HashMap::new()));\n\npub fn store_do_fn(do_fn: impl DoFn + 'static) -> String {\n let mut do_fns = DO_FNS.lock().unwrap();\n let name = format!(\"object{}\", do_fns.len());\n do_fns.insert(name.to_string(), Box::leak(Box::new(do_fn)));\n name\n}\n\npub fn get_do_fn(name: &str) -> Option<&'static dyn DynamicDoFn> {\n let binding = DO_FNS.lock().unwrap();\n binding.get(name).copied()\n}\n\npub fn store_key_extractor(ke: impl DynamicKeyExtractor + 'static) -> String {\n let mut kes = KEY_EXTRACTORS.lock().unwrap();\n let name = format!(\"object{}\", kes.len());\n kes.insert(name.to_string(), Box::leak(Box::new(ke)));\n name\n}\n\npub fn get_extractor(name: &str) -> Option<&'static dyn DynamicKeyExtractor> {\n KEY_EXTRACTORS.lock().unwrap().get(name).copied()\n}\n\npub trait DynamicDoFn: Send + Sync {\n fn process_dyn(&self, elem: DynamicWindowedValue, receivers: &[Arc<Receiver>]);\n fn start_bundle_dyn(&self);\n fn finish_bundle_dyn(&self);\n}\n\nimpl<D: DoFn> DynamicDoFn for D {\n fn process_dyn(&self, elem: DynamicWindowedValue, receivers: &[Arc<Receiver>]) {\n let typed_elem = elem.downcast_ref::<D::In>();\n for value in self.process(&typed_elem.value) {\n let windowed_value = typed_elem.with_value(value);\n for receiver in receivers {\n receiver.receive(DynamicWindowedValue::new(&windowed_value))\n }\n }\n }\n\n fn start_bundle_dyn(&self) {\n self.start_bundle()\n }\n\n fn finish_bundle_dyn(&self) {\n self.finish_bundle()\n }\n}\n\npub trait DynamicKeyExtractor: Sync + Send {\n fn new_grouped_values(&self) -> DynamicGroupedValues;\n fn clear_grouped_values(&self, grouped_values: &mut DynamicGroupedValues);\n fn extract(&self, kv: DynamicWindowedValue, grouped_values: &mut DynamicGroupedValues);\n fn recombine(&self, grouped_values: &DynamicGroupedValues, receivers: &[Arc<Receiver>]);\n}\n\nimpl<V> DynamicKeyExtractor for KeyExtractor<V>\nwhere\n V: ElemType + Clone + fmt::Debug,\n{\n fn new_grouped_values(&self) -> DynamicGroupedValues {\n DynamicGroupedValues::new::<V>()\n }\n fn clear_grouped_values(&self, grouped_values: &mut DynamicGroupedValues) {\n grouped_values.downcast_mut::<V>().clear()\n }\n fn extract(&self, kv: DynamicWindowedValue, grouped_values: &mut DynamicGroupedValues) {\n let KV { k, v } = &kv.downcast_ref::<KV<String, V>>().value;\n let grouped_values = grouped_values.downcast_mut::<V>();\n\n if !grouped_values.contains_key(k) {\n grouped_values.insert(k.clone(), Vec::new());\n }\n grouped_values.get_mut(k).unwrap().push(v.clone());\n }\n\n fn recombine(&self, grouped_values: &DynamicGroupedValues, receivers: &[Arc<Receiver>]) {\n let typed_grouped_values = grouped_values.downcast_ref::<V>();\n for (key, values) in typed_grouped_values.iter() {\n // TODO: timestamp and pane info are wrong\n for receiver in receivers.iter() {\n // TODO: End-of-window timestamp, only firing pane.\n let mut typed_values: Vec<V> = 
Vec::new();\n for value in values.iter() {\n typed_values.push(value.clone());\n }\n let res = KV {\n k: key.to_string(),\n v: typed_values,\n };\n receiver.receive(DynamicWindowedValue::new(&WindowedValue::in_global_window(\n res,\n )));\n }\n }\n }\n}\n" | |
    }
  ]
}