@laysakura
Created June 21, 2023 00:56
{
"$schema": "https://aka.ms/codetour-schema",
"title": "UDF実行の仕組み",
"steps": [
{
"file": "tests/primitives_test.rs",
"description": "ここをエントリポイントとして読む",
"line": 74,
"contents": "/*\n * Licensed to the Apache Software Foundation (ASF) under one\n * or more contributor license agreements. See the NOTICE file\n * distributed with this work for additional information\n * regarding copyright ownership. The ASF licenses this file\n * to you under the Apache License, Version 2.0 (the\n * \"License\"); you may not use this file except in compliance\n * with the License. You may obtain a copy of the License at\n *\n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n */\n\n#[cfg(test)]\nmod tests {\n use apache_beam::elem_types::kv::KV;\n use apache_beam::internals::pvalue::PValue;\n use apache_beam::runners::direct_runner::DirectRunner;\n use apache_beam::runners::runner::RunnerI;\n use apache_beam::transforms::create::Create;\n use apache_beam::transforms::flatten::Flatten;\n use apache_beam::transforms::group_by_key::GroupByKey;\n use apache_beam::transforms::impulse::Impulse;\n use apache_beam::transforms::pardo::ParDo;\n use apache_beam::transforms::testing::AssertEqualUnordered;\n\n #[tokio::test]\n async fn run_direct_runner() {\n DirectRunner::new()\n .run(|root| root.apply(Impulse::new()))\n .await;\n }\n\n #[tokio::test]\n #[should_panic]\n // This tests that AssertEqualUnordered is actually doing its job.\n async fn ensure_assert_fails() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![1, 2, 3]))\n .apply(AssertEqualUnordered::new(&[1, 2, 4]))\n })\n .await;\n }\n\n #[tokio::test]\n #[should_panic]\n async fn ensure_assert_fails_on_empty() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![]))\n .apply(AssertEqualUnordered::new(&[1]))\n })\n .await;\n }\n\n #[tokio::test]\n async fn run_map() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![1, 2, 3]))\n .apply(ParDo::from_map(|x: &i32| -> i32 { x * x }))\n .apply(AssertEqualUnordered::new(&[1, 4, 9]))\n })\n .await;\n }\n\n #[tokio::test]\n async fn run_gbk() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![\n KV::new(\"a\".to_string(), 1),\n KV::new(\"a\".to_string(), 2),\n KV::new(\"b\".to_string(), 3),\n ]))\n .apply(GroupByKey::default())\n .apply(AssertEqualUnordered::new(&[\n KV::new(\"a\".to_string(), vec![1, 2]),\n KV::new(\"b\".to_string(), vec![3]),\n ]))\n })\n .await;\n }\n\n #[tokio::test]\n async fn run_flatten() {\n DirectRunner::new()\n .run(|root| {\n let first = root.apply(Create::new(vec![1, 2, 3]));\n let second = root.apply(Create::new(vec![100, 200]));\n PValue::new_array(&[first, second])\n .apply(Flatten::new())\n .apply(AssertEqualUnordered::new(&[1, 2, 3, 100, 200]))\n })\n .await;\n }\n}\n"
},
{
"file": "src/internals/pipeline.rs",
"description": "PTransformはFunctionSpecを持つ(それがFn APIでWorkerに渡る)はずだが、PCollection::apply()から呼ばれるここで、FunctionSpecはNoneに設定されている。\n現状のコードだとこの後FunctionSpecがSomeになることはないので、FunctionSpecからの関数deserはしていない。",
"line": 225,
"contents": "/*\n * Licensed to the Apache Software Foundation (ASF) under one\n * or more contributor license agreements. See the NOTICE file\n * distributed with this work for additional information\n * regarding copyright ownership. The ASF licenses this file\n * to you under the Apache License, Version 2.0 (the\n * \"License\"); you may not use this file except in compliance\n * with the License. You may obtain a copy of the License at\n *\n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n */\n\nuse std::collections::{HashMap, HashSet};\nuse std::sync::{Arc, Mutex};\n\nuse crate::coders::CoderUrnTree;\nuse crate::elem_types::ElemType;\nuse crate::proto::pipeline_v1;\n\nuse crate::internals::pvalue::{flatten_pvalue, PTransform, PValue};\n\nconst _CODER_ID_PREFIX: &str = \"coder_\";\n\n/// A part of a `Pipeline` to help construct / look up it.\nstruct PipelineContext {\n component_prefix: String,\n counter: Arc<Mutex<usize>>,\n}\n\nimpl PipelineContext {\n pub fn new(component_prefix: String) -> Self {\n Self {\n component_prefix,\n counter: Arc::new(Mutex::new(0)),\n }\n }\n\n pub fn create_unique_name(&self, prefix: String) -> String {\n format!(\n \"{}{}_{}\",\n self.component_prefix,\n prefix,\n self.get_and_increment_counter()\n )\n }\n\n fn get_and_increment_counter(&self) -> usize {\n let mut counter = self.counter.lock().unwrap();\n *counter += 1;\n\n *counter - 1\n }\n}\n\n/// Corresponds to the `Pipeline` in Runner API's proto.\n///\n/// The pipeline is instantiated on `Runner::run()`, and used by a (remote or direct) runner.\npub struct Pipeline {\n context: PipelineContext,\n default_environment: String,\n proto: Arc<Mutex<pipeline_v1::Pipeline>>,\n transform_stack: Arc<Mutex<Vec<String>>>,\n used_stage_names: Arc<Mutex<HashSet<String>>>,\n\n coder_proto_counter: Mutex<usize>,\n}\n\nimpl Pipeline {\n pub fn new(component_prefix: String) -> Self {\n let proto = pipeline_v1::Pipeline {\n components: Some(pipeline_v1::Components {\n transforms: HashMap::with_capacity(0),\n pcollections: HashMap::with_capacity(0),\n windowing_strategies: HashMap::with_capacity(0),\n coders: HashMap::with_capacity(0),\n environments: HashMap::with_capacity(0),\n }),\n root_transform_ids: Vec::with_capacity(0),\n display_data: Vec::with_capacity(0),\n requirements: Vec::with_capacity(0),\n };\n\n Pipeline {\n context: PipelineContext::new(component_prefix.clone()),\n default_environment: format!(\"{}rustEnvironment\", component_prefix),\n proto: Arc::new(Mutex::new(proto)),\n transform_stack: Arc::new(Mutex::new(Vec::with_capacity(0))),\n used_stage_names: Arc::new(Mutex::new(HashSet::with_capacity(0))),\n\n coder_proto_counter: Mutex::new(0),\n }\n }\n\n pub fn get_proto(&self) -> Arc<std::sync::Mutex<pipeline_v1::Pipeline>> {\n self.proto.clone()\n }\n\n /// Recursively construct a coder (and its component coders) in protocol buffer representation for the Runner API.\n /// A coder in protobuf format can be shared with other components such as Beam runners,\n /// SDK workers; and reconstructed into its runtime representation if necessary.\n fn coder_to_proto(&self, coder_urn_tree: &CoderUrnTree) -> pipeline_v1::Coder {\n fn helper(coder_urn: &str, component_coder_ids: 
Vec<String>) -> pipeline_v1::Coder {\n let spec = pipeline_v1::FunctionSpec {\n urn: coder_urn.to_string(),\n payload: vec![], // unused in Rust SDK\n };\n pipeline_v1::Coder {\n spec: Some(spec),\n component_coder_ids,\n }\n }\n\n let component_coder_ids = coder_urn_tree\n .component_coder_urns\n .iter()\n .map(|component_coder_urn| {\n let coder_proto = self.coder_to_proto(component_coder_urn);\n self.get_coder_id(coder_proto)\n })\n .collect();\n\n helper(&coder_urn_tree.coder_urn, component_coder_ids)\n }\n\n /// If the `coder_proto` is already registered, return its ID.\n /// Else, the `coder_proto` is registered and its newly-created ID is returned.\n fn get_coder_id(&self, coder_proto: pipeline_v1::Coder) -> String {\n let mut pipeline_proto = self.proto.lock().unwrap();\n\n let proto_coders = &mut pipeline_proto.components.as_mut().unwrap().coders;\n\n for (coder_id, coder) in proto_coders.iter() {\n if *coder == coder_proto {\n return coder_id.clone();\n }\n }\n\n let mut coder_counter = self.coder_proto_counter.lock().unwrap();\n *coder_counter += 1;\n let new_coder_id = format!(\"{}{}\", _CODER_ID_PREFIX, *coder_counter);\n proto_coders.insert(new_coder_id.clone(), coder_proto);\n\n new_coder_id\n }\n\n pub fn register_proto_transform(&self, transform: pipeline_v1::PTransform) {\n let mut pipeline_proto = self.proto.lock().unwrap();\n\n pipeline_proto\n .components\n .as_mut()\n .unwrap()\n .transforms\n .insert(transform.unique_name.clone(), transform);\n }\n\n pub fn pre_apply_transform<In, Out, F>(\n &self,\n _transform: &F,\n input: &PValue<In>,\n ) -> (String, pipeline_v1::PTransform)\n where\n In: ElemType,\n Out: ElemType + Clone,\n F: PTransform<In, Out> + Send,\n {\n let transform_id = self.context.create_unique_name(\"transform\".to_string());\n let mut parent: Option<&pipeline_v1::PTransform> = None;\n\n let mut pipeline_proto = self.proto.lock().unwrap();\n let transform_stack = self.transform_stack.lock().unwrap();\n\n if transform_stack.is_empty() {\n pipeline_proto.root_transform_ids.push(transform_id.clone());\n } else {\n let p = pipeline_proto\n .components\n .as_mut()\n .expect(\"No components on pipeline proto\")\n .transforms\n .get_mut(transform_stack.last().expect(\"Transform stack is empty\"))\n .expect(\"Transform ID not registered on pipeline proto\");\n\n let p_subtransforms: &mut Vec<String> = p.subtransforms.as_mut();\n p_subtransforms.push(transform_id.clone());\n\n parent = Some(p);\n }\n drop(transform_stack);\n\n let parent_name = match parent {\n Some(p) => {\n format!(\"{}/\", p.unique_name.clone())\n }\n None => \"\".to_string(),\n };\n\n // TODO: extract unique transform name properly\n let bad_transform_name = crate::internals::utils::get_bad_id();\n let unique_name = format!(\"{}{}\", parent_name, bad_transform_name);\n\n {\n let mut used_stage_names = self.used_stage_names.lock().unwrap();\n\n if used_stage_names.contains(&unique_name) {\n panic!(\"Duplicate stage name: {}\", unique_name)\n }\n used_stage_names.insert(unique_name.clone());\n }\n\n let flattened = flatten_pvalue(input, None);\n let mut inputs: HashMap<String, String> = HashMap::new();\n for (name, id) in flattened {\n inputs.insert(name.clone(), id);\n }\n\n let transform_proto = pipeline_v1::PTransform {\n unique_name,\n spec: None,\n subtransforms: Vec::with_capacity(0),\n inputs,\n outputs: HashMap::with_capacity(0),\n display_data: Vec::with_capacity(0),\n environment_id: self.default_environment.clone(),\n annotations: HashMap::with_capacity(0),\n };\n\n 
pipeline_proto\n .components\n .as_mut()\n .unwrap()\n .transforms\n .insert(transform_id.clone(), transform_proto.clone());\n\n (transform_id, transform_proto)\n }\n\n pub(crate) fn apply_transform<In, Out, F>(\n &self,\n transform: F,\n input: &PValue<In>,\n pipeline: Arc<Pipeline>,\n\n // Coder's URN that encode/decode `Out`.\n out_coder_urn: &CoderUrnTree,\n ) -> PValue<Out>\n where\n In: ElemType,\n Out: ElemType + Clone,\n F: PTransform<In, Out> + Send,\n {\n // TODO: Inline pre_apply and post_apply.\n // (They exist in typescript only to share code between the sync and\n // async variants).\n let (transform_id, mut transform_proto) = self.pre_apply_transform(&transform, input);\n\n {\n let mut transform_stack = self.transform_stack.lock().unwrap();\n transform_stack.push(transform_id.clone());\n drop(transform_stack);\n }\n\n let result = transform.expand(input, pipeline, out_coder_urn, &mut transform_proto);\n\n for (name, id) in flatten_pvalue(&result, None) {\n // Causes test to hang...\n transform_proto.outputs.insert(name.clone(), id);\n }\n\n // Re-insert the transform with its outputs and any mutation that\n // expand_internal performed.\n let mut pipeline_proto = self.proto.lock().unwrap();\n // This may have been mutated.\n // TODO: Perhaps only insert at the end?\n transform_proto.subtransforms = pipeline_proto\n .components\n .as_mut()\n .unwrap()\n .transforms\n .get(&transform_id)\n .unwrap()\n .subtransforms\n .clone();\n pipeline_proto\n .components\n .as_mut()\n .unwrap()\n .transforms\n .insert(transform_id, transform_proto.clone());\n drop(pipeline_proto);\n\n // TODO: ensure this happens even if an error takes place above\n {\n let mut transform_stack = self.transform_stack.lock().unwrap();\n transform_stack.pop();\n drop(transform_stack);\n }\n\n self.post_apply_transform(transform, transform_proto, result)\n }\n\n // TODO: deal with bounds and windows\n pub fn post_apply_transform<In, Out, F>(\n &self,\n _transform: F,\n _transform_proto: pipeline_v1::PTransform,\n result: PValue<Out>,\n ) -> PValue<Out>\n where\n In: ElemType,\n Out: ElemType + Clone,\n F: PTransform<In, Out> + Send,\n {\n result\n }\n\n pub(crate) fn create_pcollection_internal<Out>(\n &self,\n coder_urn_tree: &CoderUrnTree,\n pipeline: Arc<Pipeline>,\n ) -> PValue<Out>\n where\n Out: ElemType,\n {\n let coder_id = {\n let coder_proto = self.coder_to_proto(coder_urn_tree);\n self.get_coder_id(coder_proto)\n };\n\n PValue::new(\n crate::internals::pvalue::PType::PCollection,\n pipeline,\n self.create_pcollection_id_internal(coder_id),\n coder_urn_tree.coder_urn.to_string(),\n )\n }\n\n fn create_pcollection_id_internal(&self, coder_id: String) -> String {\n let pcoll_id = self.context.create_unique_name(\"pc\".to_string());\n let pcoll_proto: pipeline_v1::PCollection = pipeline_v1::PCollection {\n unique_name: pcoll_id.clone(),\n coder_id,\n ..Default::default()\n };\n\n let mut pipeline_proto = self.proto.lock().unwrap();\n pipeline_proto\n .components\n .as_mut()\n .unwrap()\n .pcollections\n .insert(pcoll_id.clone(), pcoll_proto);\n\n pcoll_id\n }\n}\n\nimpl Default for Pipeline {\n fn default() -> Self {\n Self::new(\"\".to_string())\n }\n}\n\n#[cfg(test)]\nmod tests {\n use crate::{\n coders::{\n required_coders::BytesCoder,\n urns::{BYTES_CODER_URN, ITERABLE_CODER_URN},\n },\n transforms::impulse::Impulse,\n };\n\n use super::*;\n\n fn coder_urn_from_pvalue<E: ElemType>(pvalue: &PValue<E>) -> String {\n let coder_urn_tree = CoderUrnTree::from(pvalue);\n coder_urn_tree.coder_urn\n }\n\n 
#[test]\n fn test_default_coder_in_proto() {\n let root = PValue::<()>::root();\n let pvalue = root.apply(Impulse::new());\n\n let coder_urn = coder_urn_from_pvalue(&pvalue);\n assert_eq!(coder_urn, ITERABLE_CODER_URN);\n }\n\n #[test]\n fn test_override_coder_in_proto() {\n let root = PValue::<()>::root();\n let pvalue = root.apply_with_coder::<BytesCoder, _, _>(Impulse::new());\n\n let coder_urn = coder_urn_from_pvalue(&pvalue);\n assert_eq!(coder_urn, BYTES_CODER_URN);\n }\n}\n"
},
{
"file": "src/internals/serialize.rs",
"description": "結局のところ、DoFn (ParDoのインスタンス的なもの、つまりUDF) は、ここでトレイトオブジェクト *のアドレス(!)* として格納されている。\nそれがキーから復元され呼び出されている。\n\nトレイトオブジェクトのアドレスはFn APIをまたぐと当然異なるので、この機構はFn APIとしては破綻している。",
"line": 17,
"contents": "use std::collections::HashMap;\n\nuse std::boxed::Box;\nuse std::fmt;\nuse std::sync::{Arc, Mutex};\n\nuse once_cell::sync::Lazy;\n\nuse crate::elem_types::kv::KV;\nuse crate::elem_types::ElemType;\nuse crate::transforms::group_by_key::KeyExtractor;\nuse crate::transforms::pardo::DoFn;\nuse crate::worker::operators::{DynamicGroupedValues, DynamicWindowedValue, WindowedValue};\nuse crate::worker::Receiver;\n\nstatic DO_FNS: Lazy<Mutex<HashMap<String, &'static dyn DynamicDoFn>>> =\n Lazy::new(|| Mutex::new(HashMap::new()));\n\nstatic KEY_EXTRACTORS: Lazy<Mutex<HashMap<String, &'static dyn DynamicKeyExtractor>>> =\n Lazy::new(|| Mutex::new(HashMap::new()));\n\npub fn store_do_fn(do_fn: impl DoFn + 'static) -> String {\n let mut do_fns = DO_FNS.lock().unwrap();\n let name = format!(\"object{}\", do_fns.len());\n do_fns.insert(name.to_string(), Box::leak(Box::new(do_fn)));\n name\n}\n\npub fn get_do_fn(name: &str) -> Option<&'static dyn DynamicDoFn> {\n let binding = DO_FNS.lock().unwrap();\n binding.get(name).copied()\n}\n\npub fn store_key_extractor(ke: impl DynamicKeyExtractor + 'static) -> String {\n let mut kes = KEY_EXTRACTORS.lock().unwrap();\n let name = format!(\"object{}\", kes.len());\n kes.insert(name.to_string(), Box::leak(Box::new(ke)));\n name\n}\n\npub fn get_extractor(name: &str) -> Option<&'static dyn DynamicKeyExtractor> {\n KEY_EXTRACTORS.lock().unwrap().get(name).copied()\n}\n\npub trait DynamicDoFn: Send + Sync {\n fn process_dyn(&self, elem: DynamicWindowedValue, receivers: &[Arc<Receiver>]);\n fn start_bundle_dyn(&self);\n fn finish_bundle_dyn(&self);\n}\n\nimpl<D: DoFn> DynamicDoFn for D {\n fn process_dyn(&self, elem: DynamicWindowedValue, receivers: &[Arc<Receiver>]) {\n let typed_elem = elem.downcast_ref::<D::In>();\n for value in self.process(&typed_elem.value) {\n let windowed_value = typed_elem.with_value(value);\n for receiver in receivers {\n receiver.receive(DynamicWindowedValue::new(&windowed_value))\n }\n }\n }\n\n fn start_bundle_dyn(&self) {\n self.start_bundle()\n }\n\n fn finish_bundle_dyn(&self) {\n self.finish_bundle()\n }\n}\n\npub trait DynamicKeyExtractor: Sync + Send {\n fn new_grouped_values(&self) -> DynamicGroupedValues;\n fn clear_grouped_values(&self, grouped_values: &mut DynamicGroupedValues);\n fn extract(&self, kv: DynamicWindowedValue, grouped_values: &mut DynamicGroupedValues);\n fn recombine(&self, grouped_values: &DynamicGroupedValues, receivers: &[Arc<Receiver>]);\n}\n\nimpl<V> DynamicKeyExtractor for KeyExtractor<V>\nwhere\n V: ElemType + Clone + fmt::Debug,\n{\n fn new_grouped_values(&self) -> DynamicGroupedValues {\n DynamicGroupedValues::new::<V>()\n }\n fn clear_grouped_values(&self, grouped_values: &mut DynamicGroupedValues) {\n grouped_values.downcast_mut::<V>().clear()\n }\n fn extract(&self, kv: DynamicWindowedValue, grouped_values: &mut DynamicGroupedValues) {\n let KV { k, v } = &kv.downcast_ref::<KV<String, V>>().value;\n let grouped_values = grouped_values.downcast_mut::<V>();\n\n if !grouped_values.contains_key(k) {\n grouped_values.insert(k.clone(), Vec::new());\n }\n grouped_values.get_mut(k).unwrap().push(v.clone());\n }\n\n fn recombine(&self, grouped_values: &DynamicGroupedValues, receivers: &[Arc<Receiver>]) {\n let typed_grouped_values = grouped_values.downcast_ref::<V>();\n for (key, values) in typed_grouped_values.iter() {\n // TODO: timestamp and pane info are wrong\n for receiver in receivers.iter() {\n // TODO: End-of-window timestamp, only firing pane.\n let mut typed_values: Vec<V> = 
Vec::new();\n for value in values.iter() {\n typed_values.push(value.clone());\n }\n let res = KV {\n k: key.to_string(),\n v: typed_values,\n };\n receiver.receive(DynamicWindowedValue::new(&WindowedValue::in_global_window(\n res,\n )));\n }\n }\n }\n}\n"
}
]
}
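
Two notes on the tour, kept outside the JSON so the .tour file above stays valid.

Step 2 (src/internals/pipeline.rs) observes that pre_apply_transform builds the transform proto with spec: None and that nothing later fills it in. For a UDF to actually cross the Fn API, that field would have to be a pipeline_v1::FunctionSpec whose payload carries the serialized function. Below is a minimal sketch of what the proto would need to look like; serialized_do_fn is hypothetical (the current SDK has no way to produce those bytes), the URN is Beam's standard ParDo URN, and in full Beam the payload would itself be a ParDoPayload proto wrapping the DoFn's spec, a detail the sketch collapses:

    use crate::proto::pipeline_v1;

    /// Hypothetical: what pre_apply_transform would have to attach instead of
    /// `spec: None` for a worker in another process to reconstruct the UDF.
    fn pardo_transform_proto(
        unique_name: String,
        serialized_do_fn: Vec<u8>, // does not exist today; this is the missing piece
    ) -> pipeline_v1::PTransform {
        pipeline_v1::PTransform {
            unique_name,
            spec: Some(pipeline_v1::FunctionSpec {
                urn: "beam:transform:pardo:v1".to_string(), // standard Beam ParDo URN
                payload: serialized_do_fn, // the worker would deserialize the DoFn from this
            }),
            ..Default::default() // inputs, outputs, etc. as built in pre_apply_transform
        }
    }

Note that even where the SDK does build a FunctionSpec (coder_to_proto), the payload is left empty ("unused in Rust SDK"), so nothing is ever deserialized from one.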
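
Step 3 (src/internals/serialize.rs) is where the gap shows concretely: store_do_fn "serializes" a DoFn by leaking it with Box::leak and remembering the resulting &'static trait-object reference in a process-global map under a key like "object0"; get_do_fn turns the key back into that reference. Below is a self-contained condensation of that pattern, with a toy Udf trait standing in for DoFn/DynamicDoFn:

    use once_cell::sync::Lazy;
    use std::collections::HashMap;
    use std::sync::Mutex;

    trait Udf: Send + Sync {
        fn call(&self, x: i32) -> i32;
    }

    // Process-global registry, as in DO_FNS / KEY_EXTRACTORS in serialize.rs.
    static UDFS: Lazy<Mutex<HashMap<String, &'static dyn Udf>>> =
        Lazy::new(|| Mutex::new(HashMap::new()));

    fn store_udf(udf: impl Udf + 'static) -> String {
        let mut udfs = UDFS.lock().unwrap();
        let key = format!("object{}", udfs.len());
        // Box::leak yields a &'static reference, i.e. an in-process address.
        udfs.insert(key.clone(), Box::leak(Box::new(udf)));
        key
    }

    fn get_udf(key: &str) -> Option<&'static dyn Udf> {
        UDFS.lock().unwrap().get(key).copied()
    }

    struct Square;
    impl Udf for Square {
        fn call(&self, x: i32) -> i32 {
            x * x
        }
    }

    fn main() {
        let key = store_udf(Square); // "object0" is what travels in the pipeline proto
        let udf = get_udf(&key).unwrap(); // resolvable only inside this same process
        assert_eq!(udf.call(3), 9);
    }

The key survives the trip through the pipeline proto, but it only names an entry in this process's map; a worker on the far side of the Fn API has neither the map entry nor the address behind it, which is exactly why the tour calls this mechanism broken as a Fn API implementation.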