Skip to content

Instantly share code, notes, and snippets.

@laysakura
Created June 21, 2023 00:09
Show Gist options
  • Select an option

  • Save laysakura/8eff115fa5426adce9200285e2789125 to your computer and use it in GitHub Desktop.

Select an option

Save laysakura/8eff115fa5426adce9200285e2789125 to your computer and use it in GitHub Desktop.
{
"$schema": "https://aka.ms/codetour-schema",
"title": "DirectRunnerの役割",
"steps": [
{
"file": "tests/primitives_test.rs",
"description": "run_gbk テストの DirectRunner::new().run(...) 呼び出しをエントリポイントとして読んでいく。",
"line": 74,
"contents": "/*\n * Licensed to the Apache Software Foundation (ASF) under one\n * or more contributor license agreements. See the NOTICE file\n * distributed with this work for additional information\n * regarding copyright ownership. The ASF licenses this file\n * to you under the Apache License, Version 2.0 (the\n * \"License\"); you may not use this file except in compliance\n * with the License. You may obtain a copy of the License at\n *\n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n */\n\n#[cfg(test)]\nmod tests {\n use apache_beam::elem_types::kv::KV;\n use apache_beam::internals::pvalue::PValue;\n use apache_beam::runners::direct_runner::DirectRunner;\n use apache_beam::runners::runner::RunnerI;\n use apache_beam::transforms::create::Create;\n use apache_beam::transforms::flatten::Flatten;\n use apache_beam::transforms::group_by_key::GroupByKey;\n use apache_beam::transforms::impulse::Impulse;\n use apache_beam::transforms::pardo::ParDo;\n use apache_beam::transforms::testing::AssertEqualUnordered;\n\n #[tokio::test]\n async fn run_direct_runner() {\n DirectRunner::new()\n .run(|root| root.apply(Impulse::new()))\n .await;\n }\n\n #[tokio::test]\n #[should_panic]\n // This tests that AssertEqualUnordered is actually doing its job.\n async fn ensure_assert_fails() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![1, 2, 3]))\n .apply(AssertEqualUnordered::new(&[1, 2, 4]))\n })\n .await;\n }\n\n #[tokio::test]\n #[should_panic]\n async fn ensure_assert_fails_on_empty() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![]))\n .apply(AssertEqualUnordered::new(&[1]))\n })\n .await;\n }\n\n 
#[tokio::test]\n async fn run_map() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![1, 2, 3]))\n .apply(ParDo::from_map(|x: &i32| -> i32 { x * x }))\n .apply(AssertEqualUnordered::new(&[1, 4, 9]))\n })\n .await;\n }\n\n #[tokio::test]\n async fn run_gbk() {\n DirectRunner::new()\n .run(|root| {\n root.apply(Create::new(vec![\n KV::new(\"a\".to_string(), 1),\n KV::new(\"a\".to_string(), 2),\n KV::new(\"b\".to_string(), 3),\n ]))\n .apply(GroupByKey::default())\n .apply(AssertEqualUnordered::new(&[\n KV::new(\"a\".to_string(), vec![1, 2]),\n KV::new(\"b\".to_string(), vec![3]),\n ]))\n })\n .await;\n }\n\n #[tokio::test]\n async fn run_flatten() {\n DirectRunner::new()\n .run(|root| {\n let first = root.apply(Create::new(vec![1, 2, 3]));\n let second = root.apply(Create::new(vec![100, 200]));\n PValue::new_array(&[first, second])\n .apply(Flatten::new())\n .apply(AssertEqualUnordered::new(&[1, 2, 3, 100, 200]))\n })\n .await;\n }\n}\n"
},
{
"file": "src/runners/runner.rs",
"description": "run() は run_async() を経由して、最終的に run_pipeline() を呼び出す。各Runnerは run_pipeline() を実装する。\nつまり現在の実装では、Runnerは必ずproto化されたパイプライン (pipeline_v1::Pipeline) を受け取ることになる。\nSpringQLでもひとまずはこれがきれいなインターフェイスの切り方だと思うが、シリアライズ・デシリアライズのコストが気になるなら、設計を見直す必要があるかもしれない。",
"line": 41,
"contents": "/*\n * Licensed to the Apache Software Foundation (ASF) under one\n * or more contributor license agreements. See the NOTICE file\n * distributed with this work for additional information\n * regarding copyright ownership. The ASF licenses this file\n * to you under the Apache License, Version 2.0 (the\n * \"License\"); you may not use this file except in compliance\n * with the License. You may obtain a copy of the License at\n *\n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n */\n\nuse futures::future::Future;\nuse std::{pin::Pin, sync::Arc};\n\nuse async_trait::async_trait;\n\nuse crate::elem_types::ElemType;\nuse crate::internals::pvalue::PValue;\nuse crate::proto::pipeline_v1;\n\npub type Task = Pin<Box<dyn Future<Output = ()> + Send>>;\n\n// TODO: implement PipelineResult\n\n/// A Runner is the object that takes a pipeline definition and actually\n/// executes, e.g. locally or on a distributed system.\n#[async_trait]\npub trait RunnerI {\n fn new() -> Self;\n\n /// Runs the transform.\n /// Resolves to an instance of PipelineResult when the pipeline completes.\n /// Use run_async() to execute the pipeline in the background.\n async fn run<Out, F>(&self, pipeline: F)\n where\n Out: ElemType,\n F: FnOnce(PValue<()>) -> PValue<Out> + Send, // TODO: Don't require a return value.\n {\n self.run_async(pipeline).await;\n }\n\n /// run_async() is the asynchronous version of run(), does not wait until\n /// pipeline finishes. 
Use the returned PipelineResult to query job\n /// status.\n async fn run_async<Out, F>(&self, pipeline: F)\n where\n Out: ElemType,\n F: FnOnce(PValue<()>) -> PValue<Out> + Send,\n {\n let root = PValue::<()>::root();\n let inner_pipeline = root.get_pipeline_arc();\n\n (pipeline)(root); // pipeline construction, affecting root's inner pipeline object\n\n self.run_pipeline(inner_pipeline.get_proto()).await;\n }\n\n async fn run_pipeline(&self, pipeline: Arc<std::sync::Mutex<pipeline_v1::Pipeline>>);\n}\n"
},
{
"file": "src/runners/direct_runner.rs",
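"description": "DirectRunnerは、パイプラインのcomponents(transforms・pcollections・windowing_strategies・coders・environments)をコピーして、Fn APIで定義されているProcessBundleDescriptorを作る(これってDirectRunner以外のRunnerにも共通する処理だよね?)。作ったdescriptorは BundleProcessor::new() に渡される。",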
"line": 80,
"contents": "/*\n * Licensed to the Apache Software Foundation (ASF) under one\n * or more contributor license agreements. See the NOTICE file\n * distributed with this work for additional information\n * regarding copyright ownership. The ASF licenses this file\n * to you under the Apache License, Version 2.0 (the\n * \"License\"); you may not use this file except in compliance\n * with the License. You may obtain a copy of the License at\n *\n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n */\n\nuse std::sync::Arc;\n\nuse async_trait::async_trait;\n\nuse crate::proto::{fn_execution_v1, pipeline_v1};\nuse crate::worker::sdk_worker::BundleProcessor;\n\nuse crate::runners::runner::RunnerI;\n\npub struct DirectRunner;\n\n#[async_trait]\nimpl RunnerI for DirectRunner {\n fn new() -> Self {\n Self\n }\n\n async fn run_pipeline(&self, pipeline: Arc<std::sync::Mutex<pipeline_v1::Pipeline>>) {\n let descriptor: fn_execution_v1::ProcessBundleDescriptor;\n {\n let p = pipeline.lock().unwrap();\n\n // TODO: use this to define the descriptor instead of p\n // let proto = rewrite_side_inputs(pipeline, state_cache_ref);\n\n // TODO: review cloning\n descriptor = fn_execution_v1::ProcessBundleDescriptor {\n id: \"\".to_string(),\n transforms: p\n .components\n .as_ref()\n .expect(\"Missing components\")\n .transforms\n .clone(),\n pcollections: p\n .components\n .as_ref()\n .expect(\"Missing PCollections\")\n .pcollections\n .clone(),\n windowing_strategies: p\n .components\n .as_ref()\n .expect(\"Missing windowing strategies\")\n .windowing_strategies\n .clone(),\n coders: p\n .components\n .as_ref()\n .expect(\"Missing coders\")\n .coders\n 
.clone(),\n environments: p\n .components\n .as_ref()\n .expect(\"Missing environments\")\n .environments\n .clone(),\n state_api_service_descriptor: None,\n timer_api_service_descriptor: None,\n };\n }\n\n let processor = BundleProcessor::new(Arc::new(descriptor), &[crate::runners::IMPULSE_URN]);\n\n processor.process(\"bundle_id\".to_string()).await;\n }\n}\n"
},
{
"file": "src/runners/direct_runner.rs",
"description": "Runner内で BundleProcessor::process() を直接呼んでいる。\n本来であれば、RunnerはFn API経由で(bundle descriptorとともに)Workerに処理を委譲し、結果を返してもらう。\nRunner自身がWorkerも兼ねているという点で「Direct」Runnerなのだと言えそう。",
"line": 84,
"contents": "/*\n * Licensed to the Apache Software Foundation (ASF) under one\n * or more contributor license agreements. See the NOTICE file\n * distributed with this work for additional information\n * regarding copyright ownership. The ASF licenses this file\n * to you under the Apache License, Version 2.0 (the\n * \"License\"); you may not use this file except in compliance\n * with the License. You may obtain a copy of the License at\n *\n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n */\n\nuse std::sync::Arc;\n\nuse async_trait::async_trait;\n\nuse crate::proto::{fn_execution_v1, pipeline_v1};\nuse crate::worker::sdk_worker::BundleProcessor;\n\nuse crate::runners::runner::RunnerI;\n\npub struct DirectRunner;\n\n#[async_trait]\nimpl RunnerI for DirectRunner {\n fn new() -> Self {\n Self\n }\n\n async fn run_pipeline(&self, pipeline: Arc<std::sync::Mutex<pipeline_v1::Pipeline>>) {\n let descriptor: fn_execution_v1::ProcessBundleDescriptor;\n {\n let p = pipeline.lock().unwrap();\n\n // TODO: use this to define the descriptor instead of p\n // let proto = rewrite_side_inputs(pipeline, state_cache_ref);\n\n // TODO: review cloning\n descriptor = fn_execution_v1::ProcessBundleDescriptor {\n id: \"\".to_string(),\n transforms: p\n .components\n .as_ref()\n .expect(\"Missing components\")\n .transforms\n .clone(),\n pcollections: p\n .components\n .as_ref()\n .expect(\"Missing PCollections\")\n .pcollections\n .clone(),\n windowing_strategies: p\n .components\n .as_ref()\n .expect(\"Missing windowing strategies\")\n .windowing_strategies\n .clone(),\n coders: p\n .components\n .as_ref()\n .expect(\"Missing coders\")\n .coders\n 
.clone(),\n environments: p\n .components\n .as_ref()\n .expect(\"Missing environments\")\n .environments\n .clone(),\n state_api_service_descriptor: None,\n timer_api_service_descriptor: None,\n };\n }\n\n let processor = BundleProcessor::new(Arc::new(descriptor), &[crate::runners::IMPULSE_URN]);\n\n processor.process(\"bundle_id\".to_string()).await;\n }\n}\n"
}
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment