Last active
April 1, 2024 00:13
-
-
Save allada/23f6d9f11854c830d48828a1f00ae8a5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/Cargo.lock b/Cargo.lock | |
index e563095..c0e1fee 100644 | |
--- a/Cargo.lock | |
+++ b/Cargo.lock | |
@@ -1915,6 +1915,7 @@ dependencies = [ | |
"serde_json5", | |
"shlex", | |
"tokio", | |
+ "tokio-fork", | |
"tokio-stream", | |
"tonic 0.11.0", | |
"tracing", | |
@@ -2964,6 +2965,16 @@ dependencies = [ | |
"windows-sys 0.48.0", | |
] | |
+[[package]] | |
+name = "tokio-fork" | |
+version = "0.2.1" | |
+source = "registry+https://github.com/rust-lang/crates.io-index" | |
+checksum = "008f916174b9d488089bbf4309aa1b07fe7a87686318bd6d77a692b937de2bfb" | |
+dependencies = [ | |
+ "libc", | |
+ "tokio", | |
+] | |
+ | |
[[package]] | |
name = "tokio-io-timeout" | |
version = "1.2.0" | |
diff --git a/nativelink-worker/BUILD.bazel b/nativelink-worker/BUILD.bazel | |
index 9abaf1e..33286a5 100644 | |
--- a/nativelink-worker/BUILD.bazel | |
+++ b/nativelink-worker/BUILD.bazel | |
@@ -78,6 +78,7 @@ rust_test_suite( | |
"@crates//:rand", | |
"@crates//:tokio", | |
"@crates//:tonic", | |
+ "@crates//:tokio-fork", | |
], | |
) | |
diff --git a/nativelink-worker/Cargo.toml b/nativelink-worker/Cargo.toml | |
index 0635ef9..210dad8 100644 | |
--- a/nativelink-worker/Cargo.toml | |
+++ b/nativelink-worker/Cargo.toml | |
@@ -40,3 +40,4 @@ once_cell = "1.19.0" | |
pretty_assertions = "1.4.0" | |
prost-types = "0.12.3" | |
rand = "0.8.5" | |
+tokio-fork = "0.2.1" | |
diff --git a/nativelink-worker/tests/running_actions_manager_test.rs b/nativelink-worker/tests/running_actions_manager_test.rs | |
index 6c54afd..841e8b4 100644 | |
--- a/nativelink-worker/tests/running_actions_manager_test.rs | |
+++ b/nativelink-worker/tests/running_actions_manager_test.rs | |
@@ -2933,191 +2933,224 @@ exit 1 | |
// We've experienced deadlocks when uploading, so make only a single permit available and | |
// check it's able to handle uploading some directories with some files in. | |
- // Be default this test is ignored because it *must* be run single threaded... to run this | |
- // test execute: | |
- // cargo test -p nativelink-worker --test running_actions_manager_test -- --test-threads=1 --ignored | |
- #[tokio::test] | |
- #[ignore] | |
- async fn upload_with_single_permit() -> Result<(), Box<dyn std::error::Error>> { | |
+ #[test] | |
+ fn upload_with_single_permit() -> Result<(), Box<dyn std::error::Error>> { | |
+ unsafe { | |
+ let fork = tokio_fork::fork()?; | |
+ match fork { | |
+ tokio_fork::Fork::Child => { | |
+ let panics = std::panic::catch_unwind(|| { | |
+ tokio::runtime::Builder::new_current_thread() | |
+ .enable_all() | |
+ .build() | |
+ .unwrap() | |
+ .block_on(inner_test()) | |
+ }); | |
+ // Do not trust Rust's test wrapper exit code, as it may be incorrect. | |
+ // Instead, if we are the child process, we exit manually with the | |
+ // appropriate exit code once the test body has run. | |
+ std::process::exit(match panics { | |
+ Ok(Ok(())) => 0, | |
+ Ok(Err(e)) => { | |
+ eprintln!("Error: {e:?}"); | |
+ 1 | |
+ } | |
+ Err(e) => { | |
+ eprintln!("Error: {e:?}"); | |
+ 1 | |
+ }, | |
+ }); | |
+ } | |
+ tokio_fork::Fork::Parent(mut child) => { | |
+ let exit_status = child.block().expect("Failed to wait for child process"); | |
+ assert!( | |
+ exit_status.success(), | |
+ "Child process failed. This probably means test failed" | |
+ ); | |
+ return Ok(()); | |
+ } | |
+ } | |
+ } | |
const WORKER_ID: &str = "foo_worker_id"; | |
fn test_monotonic_clock() -> SystemTime { | |
static CLOCK: AtomicU64 = AtomicU64::new(0); | |
monotonic_clock(&CLOCK) | |
} | |
+ async fn inner_test() -> Result<(), Box<dyn std::error::Error>> { | |
+ let (_, slow_store, cas_store, ac_store) = setup_stores().await?; | |
+ let root_work_directory = make_temp_path("root_work_directory"); | |
+ fs::create_dir_all(&root_work_directory).await?; | |
+ | |
+ // Take all but one FD permit away. | |
+ let _permits = futures::stream::iter(1..fs::OPEN_FILE_SEMAPHORE.available_permits()) | |
+ .then(|_| fs::OPEN_FILE_SEMAPHORE.acquire()) | |
+ .try_collect::<Vec<_>>() | |
+ .await?; | |
+ assert_eq!(1, fs::OPEN_FILE_SEMAPHORE.available_permits()); | |
- let (_, slow_store, cas_store, ac_store) = setup_stores().await?; | |
- let root_work_directory = make_temp_path("root_work_directory"); | |
- fs::create_dir_all(&root_work_directory).await?; | |
- | |
- // Take all but one FD permit away. | |
- let _permits = futures::stream::iter(1..fs::OPEN_FILE_SEMAPHORE.available_permits()) | |
- .then(|_| fs::OPEN_FILE_SEMAPHORE.acquire()) | |
- .try_collect::<Vec<_>>() | |
- .await?; | |
- assert_eq!(1, fs::OPEN_FILE_SEMAPHORE.available_permits()); | |
- | |
- let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_callbacks( | |
- RunningActionsManagerArgs { | |
- root_work_directory, | |
- execution_configuration: ExecutionConfiguration::default(), | |
- cas_store: Pin::into_inner(cas_store.clone()), | |
- ac_store: Some(Pin::into_inner(ac_store.clone())), | |
- historical_store: Pin::into_inner(cas_store.clone()), | |
- upload_action_result_config: | |
- &nativelink_config::cas_server::UploadActionResultConfig { | |
- upload_ac_results_strategy: | |
- nativelink_config::cas_server::UploadCacheResultsStrategy::never, | |
+ let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_callbacks( | |
+ RunningActionsManagerArgs { | |
+ root_work_directory, | |
+ execution_configuration: ExecutionConfiguration::default(), | |
+ cas_store: Pin::into_inner(cas_store.clone()), | |
+ ac_store: Some(Pin::into_inner(ac_store.clone())), | |
+ historical_store: Pin::into_inner(cas_store.clone()), | |
+ upload_action_result_config: | |
+ &nativelink_config::cas_server::UploadActionResultConfig { | |
+ upload_ac_results_strategy: | |
+ nativelink_config::cas_server::UploadCacheResultsStrategy::never, | |
+ ..Default::default() | |
+ }, | |
+ max_action_timeout: Duration::MAX, | |
+ timeout_handled_externally: false, | |
+ }, | |
+ Callbacks { | |
+ now_fn: test_monotonic_clock, | |
+ sleep_fn: |_duration| Box::pin(futures::future::pending()), | |
+ }, | |
+ )?); | |
+ let action_result = { | |
+ const SALT: u64 = 55; | |
+ #[cfg(target_family = "unix")] | |
+ let arguments = vec![ | |
+ "sh".to_string(), | |
+ "-c".to_string(), | |
+ "printf '123 ' > ./test.txt; mkdir ./tst; printf '456 ' > ./tst/tst.txt; printf 'foo-stdout '; >&2 printf 'bar-stderr '" | |
+ .to_string(), | |
+ ]; | |
+ #[cfg(target_family = "windows")] | |
+ let arguments = vec![ | |
+ "cmd".to_string(), | |
+ "/C".to_string(), | |
+ // Note: Windows adds two spaces after 'set /p=XXX'. | |
+ "echo | set /p=123> ./test.txt & mkdir ./tst & echo | set /p=456> ./tst/tst.txt & echo | set /p=foo-stdout & echo | set /p=bar-stderr 1>&2 & exit 0" | |
+ .to_string(), | |
+ ]; | |
+ let working_directory = "some_cwd"; | |
+ let command = Command { | |
+ arguments, | |
+ output_paths: vec!["test.txt".to_string(), "tst".to_string()], | |
+ working_directory: working_directory.to_string(), | |
+ ..Default::default() | |
+ }; | |
+ let command_digest = serialize_and_upload_message( | |
+ &command, | |
+ cas_store.as_ref(), | |
+ &mut DigestHasherFunc::Sha256.hasher(), | |
+ ) | |
+ .await?; | |
+ let input_root_digest = serialize_and_upload_message( | |
+ &Directory { | |
+ directories: vec![DirectoryNode { | |
+ name: working_directory.to_string(), | |
+ digest: Some( | |
+ serialize_and_upload_message( | |
+ &Directory::default(), | |
+ cas_store.as_ref(), | |
+ &mut DigestHasherFunc::Sha256.hasher(), | |
+ ) | |
+ .await? | |
+ .into(), | |
+ ), | |
+ }], | |
..Default::default() | |
}, | |
- max_action_timeout: Duration::MAX, | |
- timeout_handled_externally: false, | |
- }, | |
- Callbacks { | |
- now_fn: test_monotonic_clock, | |
- sleep_fn: |_duration| Box::pin(futures::future::pending()), | |
- }, | |
- )?); | |
- let action_result = { | |
- const SALT: u64 = 55; | |
- #[cfg(target_family = "unix")] | |
- let arguments = vec![ | |
- "sh".to_string(), | |
- "-c".to_string(), | |
- "printf '123 ' > ./test.txt; mkdir ./tst; printf '456 ' > ./tst/tst.txt; printf 'foo-stdout '; >&2 printf 'bar-stderr '" | |
- .to_string(), | |
- ]; | |
- #[cfg(target_family = "windows")] | |
- let arguments = vec![ | |
- "cmd".to_string(), | |
- "/C".to_string(), | |
- // Note: Windows adds two spaces after 'set /p=XXX'. | |
- "echo | set /p=123> ./test.txt & mkdir ./tst & echo | set /p=456> ./tst/tst.txt & echo | set /p=foo-stdout & echo | set /p=bar-stderr 1>&2 & exit 0" | |
- .to_string(), | |
- ]; | |
- let working_directory = "some_cwd"; | |
- let command = Command { | |
- arguments, | |
- output_paths: vec!["test.txt".to_string(), "tst".to_string()], | |
- working_directory: working_directory.to_string(), | |
- ..Default::default() | |
- }; | |
- let command_digest = serialize_and_upload_message( | |
- &command, | |
- cas_store.as_ref(), | |
- &mut DigestHasherFunc::Sha256.hasher(), | |
- ) | |
- .await?; | |
- let input_root_digest = serialize_and_upload_message( | |
- &Directory { | |
- directories: vec![DirectoryNode { | |
- name: working_directory.to_string(), | |
- digest: Some( | |
- serialize_and_upload_message( | |
- &Directory::default(), | |
- cas_store.as_ref(), | |
- &mut DigestHasherFunc::Sha256.hasher(), | |
- ) | |
- .await? | |
- .into(), | |
- ), | |
- }], | |
+ cas_store.as_ref(), | |
+ &mut DigestHasherFunc::Sha256.hasher(), | |
+ ) | |
+ .await?; | |
+ let action = Action { | |
+ command_digest: Some(command_digest.into()), | |
+ input_root_digest: Some(input_root_digest.into()), | |
..Default::default() | |
- }, | |
- cas_store.as_ref(), | |
- &mut DigestHasherFunc::Sha256.hasher(), | |
- ) | |
- .await?; | |
- let action = Action { | |
- command_digest: Some(command_digest.into()), | |
- input_root_digest: Some(input_root_digest.into()), | |
- ..Default::default() | |
- }; | |
- let action_digest = serialize_and_upload_message( | |
- &action, | |
- cas_store.as_ref(), | |
- &mut DigestHasherFunc::Sha256.hasher(), | |
- ) | |
- .await?; | |
- | |
- let running_action_impl = running_actions_manager | |
- .create_and_add_action( | |
- WORKER_ID.to_string(), | |
- StartExecute { | |
- execute_request: Some(ExecuteRequest { | |
- action_digest: Some(action_digest.into()), | |
- ..Default::default() | |
- }), | |
- salt: SALT, | |
- queued_timestamp: None, | |
- }, | |
+ }; | |
+ let action_digest = serialize_and_upload_message( | |
+ &action, | |
+ cas_store.as_ref(), | |
+ &mut DigestHasherFunc::Sha256.hasher(), | |
) | |
.await?; | |
- run_action(running_action_impl.clone()).await? | |
- }; | |
- let file_content = slow_store | |
- .as_ref() | |
- .get_part_unchunked(action_result.output_files[0].digest, 0, None, None) | |
- .await?; | |
- assert_eq!(from_utf8(&file_content)?, "123 "); | |
- let stdout_content = slow_store | |
- .as_ref() | |
- .get_part_unchunked(action_result.stdout_digest, 0, None, None) | |
- .await?; | |
- assert_eq!(from_utf8(&stdout_content)?, "foo-stdout "); | |
- let stderr_content = slow_store | |
- .as_ref() | |
- .get_part_unchunked(action_result.stderr_digest, 0, None, None) | |
- .await?; | |
- assert_eq!(from_utf8(&stderr_content)?, "bar-stderr "); | |
- let mut clock_time = make_system_time(0); | |
- assert_eq!( | |
- action_result, | |
- ActionResult { | |
- output_files: vec![FileInfo { | |
- name_or_path: NameOrPath::Path("test.txt".to_string()), | |
- digest: DigestInfo::try_new( | |
- "c69e10a5f54f4e28e33897fbd4f8701595443fa8c3004aeaa20dd4d9a463483b", | |
- 4 | |
+ let running_action_impl = running_actions_manager | |
+ .create_and_add_action( | |
+ WORKER_ID.to_string(), | |
+ StartExecute { | |
+ execute_request: Some(ExecuteRequest { | |
+ action_digest: Some(action_digest.into()), | |
+ ..Default::default() | |
+ }), | |
+ salt: SALT, | |
+ queued_timestamp: None, | |
+ }, | |
+ ) | |
+ .await?; | |
+ | |
+ run_action(running_action_impl.clone()).await? | |
+ }; | |
+ let file_content = slow_store | |
+ .as_ref() | |
+ .get_part_unchunked(action_result.output_files[0].digest, 0, None, None) | |
+ .await?; | |
+ assert_eq!(from_utf8(&file_content)?, "123 "); | |
+ let stdout_content = slow_store | |
+ .as_ref() | |
+ .get_part_unchunked(action_result.stdout_digest, 0, None, None) | |
+ .await?; | |
+ assert_eq!(from_utf8(&stdout_content)?, "foo-stdout "); | |
+ let stderr_content = slow_store | |
+ .as_ref() | |
+ .get_part_unchunked(action_result.stderr_digest, 0, None, None) | |
+ .await?; | |
+ assert_eq!(from_utf8(&stderr_content)?, "bar-stderr "); | |
+ let mut clock_time = make_system_time(0); | |
+ assert_eq!( | |
+ action_result, | |
+ ActionResult { | |
+ output_files: vec![FileInfo { | |
+ name_or_path: NameOrPath::Path("test.txt".to_string()), | |
+ digest: DigestInfo::try_new( | |
+ "c69e10a5f54f4e28e33897fbd4f8701595443fa8c3004aeaa20dd4d9a463483b", | |
+ 4 | |
+ )?, | |
+ is_executable: false, | |
+ }], | |
+ stdout_digest: DigestInfo::try_new( | |
+ "15019a676f057d97d1ad3af86f3cc1e623cb33b18ff28422bbe3248d2471cc94", | |
+ 11 | |
)?, | |
- is_executable: false, | |
- }], | |
- stdout_digest: DigestInfo::try_new( | |
- "15019a676f057d97d1ad3af86f3cc1e623cb33b18ff28422bbe3248d2471cc94", | |
- 11 | |
- )?, | |
- stderr_digest: DigestInfo::try_new( | |
- "2375ab8a01ca11e1ea7606dfb58756c153d49733cde1dbfb5a1e00f39afacf06", | |
- 12 | |
- )?, | |
- exit_code: 0, | |
- output_folders: vec![DirectoryInfo { | |
- path: "tst".to_string(), | |
- tree_digest: DigestInfo::try_new( | |
- "95711c1905d4898a70209dd6e98241dcafb479c00241a1ea4ed8415710d706f3", | |
- 166, | |
+ stderr_digest: DigestInfo::try_new( | |
+ "2375ab8a01ca11e1ea7606dfb58756c153d49733cde1dbfb5a1e00f39afacf06", | |
+ 12 | |
)?, | |
- },], | |
- output_file_symlinks: vec![], | |
- output_directory_symlinks: vec![], | |
- server_logs: HashMap::new(), | |
- execution_metadata: ExecutionMetadata { | |
- worker: WORKER_ID.to_string(), | |
- queued_timestamp: SystemTime::UNIX_EPOCH, | |
- worker_start_timestamp: increment_clock(&mut clock_time), | |
- input_fetch_start_timestamp: increment_clock(&mut clock_time), | |
- input_fetch_completed_timestamp: increment_clock(&mut clock_time), | |
- execution_start_timestamp: increment_clock(&mut clock_time), | |
- execution_completed_timestamp: increment_clock(&mut clock_time), | |
- output_upload_start_timestamp: increment_clock(&mut clock_time), | |
- output_upload_completed_timestamp: increment_clock(&mut clock_time), | |
- worker_completed_timestamp: increment_clock(&mut clock_time), | |
- }, | |
- error: None, | |
- message: String::new(), | |
- } | |
- ); | |
- Ok(()) | |
+ exit_code: 0, | |
+ output_folders: vec![DirectoryInfo { | |
+ path: "tst".to_string(), | |
+ tree_digest: DigestInfo::try_new( | |
+ "95711c1905d4898a70209dd6e98241dcafb479c00241a1ea4ed8415710d706f3", | |
+ 166, | |
+ )?, | |
+ },], | |
+ output_file_symlinks: vec![], | |
+ output_directory_symlinks: vec![], | |
+ server_logs: HashMap::new(), | |
+ execution_metadata: ExecutionMetadata { | |
+ worker: WORKER_ID.to_string(), | |
+ queued_timestamp: SystemTime::UNIX_EPOCH, | |
+ worker_start_timestamp: increment_clock(&mut clock_time), | |
+ input_fetch_start_timestamp: increment_clock(&mut clock_time), | |
+ input_fetch_completed_timestamp: increment_clock(&mut clock_time), | |
+ execution_start_timestamp: increment_clock(&mut clock_time), | |
+ execution_completed_timestamp: increment_clock(&mut clock_time), | |
+ output_upload_start_timestamp: increment_clock(&mut clock_time), | |
+ output_upload_completed_timestamp: increment_clock(&mut clock_time), | |
+ worker_completed_timestamp: increment_clock(&mut clock_time), | |
+ }, | |
+ error: None, | |
+ message: String::new(), | |
+ } | |
+ ); | |
+ Ok(()) | |
+ } | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment