Created
August 29, 2018 11:59
-
-
Save Jannis/4b27ccfac18d51c63871ee820c7229e9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/store/postgres/src/entity_changes.rs b/store/postgres/src/entity_changes.rs | |
| index 9876b34..8a96c9d 100644 | |
| --- a/store/postgres/src/entity_changes.rs | |
| +++ b/store/postgres/src/entity_changes.rs | |
| @@ -10,25 +10,27 @@ use graph::serde_json; | |
| pub struct EntityChangeListener { | |
| output: Option<Receiver<EntityChange>>, | |
| - thread_handle: Option<thread::JoinHandle<()>>, | |
| - terminate_thread: Arc<RwLock<bool>>, | |
| - thread_barrier: Arc<Barrier>, | |
| + worker_handle: Option<thread::JoinHandle<()>>, | |
| + terminate_worker: Arc<RwLock<bool>>, | |
| + worker_barrier: Arc<Barrier>, | |
| } | |
| impl EntityChangeListener { | |
| pub fn new(url: String) -> Self { | |
| - let (receiver, thread_handle, terminate_thread, thread_barrier) = Self::listen(url); | |
| + // Listen to Postgres notifications in a worker thread | |
| + let (receiver, worker_handle, terminate_worker, worker_barrier) = Self::listen(url); | |
| EntityChangeListener { | |
| output: Some(receiver), | |
| - thread_handle: Some(thread_handle), | |
| - terminate_thread, | |
| - thread_barrier, | |
| + worker_handle: Some(worker_handle), | |
| + terminate_worker, | |
| + worker_barrier, | |
| } | |
| } | |
| + /// Begins processing notifications coming in from Postgres. | |
| pub fn start(&self) { | |
| - self.thread_barrier.wait(); | |
| + self.worker_barrier.wait(); | |
| } | |
| fn listen( | |
| @@ -39,49 +41,44 @@ impl EntityChangeListener { | |
| Arc<RwLock<bool>>, | |
| Arc<Barrier>, | |
| ) { | |
| + // Create two ends of a boolean variable for signalling when the worker | |
| + // thread should be terminated | |
| let terminate = Arc::new(RwLock::new(false)); | |
| - let terminate_thread = terminate.clone(); | |
| + let terminate_worker = terminate.clone(); | |
| let barrier = Arc::new(Barrier::new(2)); | |
| - let thread_barrier = barrier.clone(); | |
| + let worker_barrier = barrier.clone(); | |
| + // Create a channel for entity changes | |
| let (sender, receiver) = channel(100); | |
| - let thread_handle = thread::spawn(move || { | |
| - println!("LISTENER: Listen to entity changes"); | |
| - | |
| + let worker_handle = thread::spawn(move || { | |
| + // Connect to Postgres | |
| let conn = Connection::connect(url, TlsMode::None) | |
| .expect("failed to connect entity change listener to Postgres"); | |
| + // Obtain a notifications iterator from Postgres | |
| let notifications = conn.notifications(); | |
| let iter = notifications.timeout_iter(Duration::from_millis(500)); | |
| + // Subscribe to the "entity_changes" notification channel in Postgres | |
| conn.execute("LISTEN entity_changes", &[]) | |
| .expect("failed to listen to entity changes in Postgres"); | |
| + // Wait until the listener has been started | |
| barrier.wait(); | |
| - println!("LISTENER: barrier passed, start reading notifications now"); | |
| - | |
| let mut notifications = iter.iterator(); | |
| // Read notifications as long as the Postgres connection is alive | |
| // or the thread is to be terminated | |
| loop { | |
| - println!("LISTENER: loop"); | |
| - | |
| + // Terminate the thread if desired | |
| let terminate_now = terminate.read().unwrap(); | |
| - println!("LISTENER: terminate? {:?}", *terminate_now); | |
| if *terminate_now { | |
| - println!("LISTENER: terminate"); | |
| return; | |
| } | |
| - println!("LISTENER: read next notification"); | |
| - | |
| - let result = notifications.next(); | |
| - println!("LISTENER: next notification: {:?}", result); | |
| - | |
| - if let Some(Ok(notification)) = result { | |
| + if let Some(Ok(notification)) = notifications.next() { | |
| // Only handle notifications from the "entity_changes" channel. | |
| if notification.channel != String::from("entity_changes") { | |
| continue; | |
| @@ -98,8 +95,6 @@ impl EntityChangeListener { | |
| ).as_str(), | |
| ); | |
| - println!("LISTENER: entity change: {:?}", change); | |
| - | |
| // We'll assume here that if sending fails, this means that the | |
| // entity change listener has already been dropped, the receiving | |
| // is gone and we should terminate the listener loop | |
| @@ -110,21 +105,21 @@ impl EntityChangeListener { | |
| } | |
| }); | |
| - (receiver, thread_handle, terminate_thread, thread_barrier) | |
| + (receiver, worker_handle, terminate_worker, worker_barrier) | |
| } | |
| } | |
| impl Drop for EntityChangeListener { | |
| fn drop(&mut self) { | |
| - println!("LISTENER: drop"); | |
| - | |
| - if let Some(thread_handle) = self.thread_handle.take() { | |
| + // When dropping the change listener, also make sure we signal termination | |
| + // to the worker and wait for it to shut down | |
| + if let Some(worker_handle) = self.worker_handle.take() { | |
| *self | |
| - .terminate_thread | |
| + .terminate_worker | |
| .write() | |
| .expect("failed to signal termination to EntityChangeListener thread") = true; | |
| - thread_handle | |
| + worker_handle | |
| .join() | |
| .expect("failed to terminate EntityChangeListener thread"); | |
| } | |
| diff --git a/store/postgres/src/store.rs b/store/postgres/src/store.rs | |
| index f554763..782222a 100644 | |
| --- a/store/postgres/src/store.rs | |
| +++ b/store/postgres/src/store.rs | |
| @@ -106,59 +106,42 @@ impl Store { | |
| let logger = self.logger.clone(); | |
| let subscriptions = self.subscriptions.clone(); | |
| - tokio::spawn( | |
| - entity_changes | |
| - .inspect(|change| println!("STORE: entity change: {:?}", change)) | |
| - .for_each(move |change| { | |
| - debug!(logger, "Entity change"; | |
| + tokio::spawn(entity_changes.for_each(move |change| { | |
| + debug!(logger, "Entity change"; | |
| "subgraph" => &change.subgraph, | |
| "entity" => &change.entity, | |
| "id" => &change.id); | |
| - // Obtain IDs and senders of subscriptions matching the entity change | |
| - let matches = subscriptions | |
| - .read() | |
| - .unwrap() | |
| - .iter() | |
| - .filter(|(_, subscription)| { | |
| - subscription.subgraph == change.subgraph | |
| - && subscription.entities.contains(&change.entity) | |
| - }) | |
| - .map(|(id, subscription)| (id.clone(), subscription.sender.clone())) | |
| - .collect::<Vec<_>>(); | |
| - | |
| - println!("STORE: {} matching subscriptions", matches.len()); | |
| - | |
| - let subscriptions = subscriptions.clone(); | |
| - | |
| - // Write change to all matching subscription streams; remove subscriptions | |
| - // whose receiving end has been dropped | |
| - stream::iter_ok::<_, ()>(matches).for_each(move |(id, sender)| { | |
| - println!("STORE: send change to subscription {}: {:?}", id, change); | |
| - let subscriptions = subscriptions.clone(); | |
| - let err_id = id.clone(); | |
| - let err_change = change.clone(); | |
| - let sent_id = id.clone(); | |
| - let sent_change = change.clone(); | |
| - sender | |
| - .send(change.clone()) | |
| - .map_err(move |_| { | |
| - println!( | |
| - "STORE: failed to send change to subscription {}: {:?}", | |
| - err_id, err_change | |
| - ); | |
| - subscriptions.write().unwrap().remove(&id); | |
| - }) | |
| - .map(move |_| { | |
| - println!( | |
| - "STORE: successfully sent change to subscription {}: {:?}", | |
| - sent_id, sent_change | |
| - ) | |
| - }) | |
| - .and_then(|_| Ok(())) | |
| + // Obtain IDs and senders of subscriptions matching the entity change | |
| + let matches = subscriptions | |
| + .read() | |
| + .unwrap() | |
| + .iter() | |
| + .filter(|(_, subscription)| { | |
| + subscription.subgraph == change.subgraph | |
| + && subscription.entities.contains(&change.entity) | |
| + }) | |
| + .map(|(id, subscription)| (id.clone(), subscription.sender.clone())) | |
| + .collect::<Vec<_>>(); | |
| + | |
| + let subscriptions = subscriptions.clone(); | |
| + | |
| + // Write change to all matching subscription streams; remove subscriptions | |
| + // whose receiving end has been dropped | |
| + stream::iter_ok::<_, ()>(matches).for_each(move |(id, sender)| { | |
| + let subscriptions = subscriptions.clone(); | |
| + let err_id = id.clone(); | |
| + let err_change = change.clone(); | |
| + let sent_id = id.clone(); | |
| + let sent_change = change.clone(); | |
| + sender | |
| + .send(change.clone()) | |
| + .map_err(move |_| { | |
| + subscriptions.write().unwrap().remove(&id); | |
| }) | |
| - }), | |
| - ); | |
| + .and_then(|_| Ok(())) | |
| + }) | |
| + })); | |
| } | |
| fn periodically_clean_up_stale_subscriptions(&mut self) { | |
| @@ -379,7 +362,6 @@ impl StoreTrait for Store { | |
| // Add the new subscription | |
| let mut subscriptions = subscriptions.write().unwrap(); | |
| - println!("STORE: subscribe {}", id); | |
| subscriptions.insert(id, subscription); | |
| // Return the subscription ID and entity change stream | |
| diff --git a/store/postgres/tests/store.rs b/store/postgres/tests/store.rs | |
| index ed47501..a80a324 100644 | |
| --- a/store/postgres/tests/store.rs | |
| +++ b/store/postgres/tests/store.rs | |
| @@ -37,8 +37,6 @@ where | |
| { | |
| let mut runtime = tokio::runtime::Runtime::new().unwrap(); | |
| - println!("Create test data"); | |
| - | |
| runtime | |
| .block_on(future::lazy(|| { | |
| insert_test_data(); | |
| @@ -46,14 +44,10 @@ where | |
| })) | |
| .expect("Failed to insert test data"); | |
| - println!("Test data created, run test"); | |
| - | |
| let result = panic::catch_unwind(panic::AssertUnwindSafe(|| { | |
| runtime.block_on(future::lazy(|| test())) | |
| })); | |
| - println!("Test ran, remove test data"); | |
| - | |
| runtime | |
| .block_on(future::lazy(|| { | |
| remove_test_data(); | |
| @@ -61,8 +55,6 @@ where | |
| })) | |
| .expect("Failed to remove test data"); | |
| - println!("Removed test data"); | |
| - | |
| result.expect("Failed to run test").expect("Test failed"); | |
| } | |
| @@ -1655,8 +1647,6 @@ fn entity_changes_are_fired_and_forwarded_to_subscriptions() { | |
| // Create a store subscription | |
| let subscription = store.subscribe(String::from("subgraph-id"), vec![String::from("User")]); | |
| - println!("TEST: subscribed"); | |
| - | |
| // Add two entities to the store | |
| let added_entities = vec![ | |
| ( | |
| @@ -1683,13 +1673,11 @@ fn entity_changes_are_fired_and_forwarded_to_subscriptions() { | |
| id: id.clone(), | |
| }, | |
| entity.clone(), | |
| - EventSource::EthereumBlock(H256::zero()), | |
| + EventSource::EthereumBlock(H256::random()), | |
| ) | |
| .expect("failed to add entity to the store"); | |
| } | |
| - println!("TEST: entities added"); | |
| - | |
| // Update an entity in the store | |
| let updated_entity = Entity::from(vec![ | |
| ("id", Value::from("1")), | |
| @@ -1703,12 +1691,10 @@ fn entity_changes_are_fired_and_forwarded_to_subscriptions() { | |
| id: String::from("1"), | |
| }, | |
| updated_entity.clone(), | |
| - EventSource::EthereumBlock(H256::zero()), | |
| + EventSource::EthereumBlock(H256::random()), | |
| ) | |
| .expect("failed to update entity in the store"); | |
| - println!("TEST: entity updated"); | |
| - | |
| // Delete an entity in the store | |
| store | |
| .delete( | |
| @@ -1717,15 +1703,12 @@ fn entity_changes_are_fired_and_forwarded_to_subscriptions() { | |
| entity: String::from("User"), | |
| id: String::from("2"), | |
| }, | |
| - EventSource::EthereumBlock(H256::zero()), | |
| + EventSource::EthereumBlock(H256::random()), | |
| ) | |
| .expect("failed to delete entity from the store"); | |
| - println!("TEST: entity deleted, now reading stream"); | |
| - | |
| - //// We're expecting three events to be written to the subscription stream | |
| + // We're expecting four events to be written to the subscription stream | |
| subscription | |
| - .inspect(|change| println!("STREAM: entity change: {:?}", change)) | |
| .take(4) | |
| .collect() | |
| .and_then(move |changes| { | |
| @@ -1734,36 +1717,35 @@ fn entity_changes_are_fired_and_forwarded_to_subscriptions() { | |
| // be terminated as well | |
| let _store = store; | |
| - println!("STREAM: changes: {:#?}", changes); | |
| - // assert_eq!( | |
| - // changes, | |
| - // vec![ | |
| - // EntityChange { | |
| - // subgraph: String::from("subgraph-id"), | |
| - // entity: String::from("User"), | |
| - // id: added_entities[0].clone().0, | |
| - // operation: EntityChangeOperation::Added, | |
| - // }, | |
| - // EntityChange { | |
| - // subgraph: String::from("subgraph-id"), | |
| - // entity: String::from("User"), | |
| - // id: added_entities[1].clone().0, | |
| - // operation: EntityChangeOperation::Added, | |
| - // }, | |
| - // EntityChange { | |
| - // subgraph: String::from("subgraph-id"), | |
| - // entity: String::from("User"), | |
| - // id: String::from("1"), | |
| - // operation: EntityChangeOperation::Updated, | |
| - // }, | |
| - // EntityChange { | |
| - // subgraph: String::from("subgraph-id"), | |
| - // entity: String::from("User"), | |
| - // id: added_entities[1].clone().0, | |
| - // operation: EntityChangeOperation::Removed, | |
| - // }, | |
| - // ] | |
| - // ); | |
| + assert_eq!( | |
| + changes, | |
| + vec![ | |
| + EntityChange { | |
| + subgraph: String::from("subgraph-id"), | |
| + entity: String::from("User"), | |
| + id: added_entities[0].clone().0, | |
| + operation: EntityChangeOperation::Added, | |
| + }, | |
| + EntityChange { | |
| + subgraph: String::from("subgraph-id"), | |
| + entity: String::from("User"), | |
| + id: added_entities[1].clone().0, | |
| + operation: EntityChangeOperation::Added, | |
| + }, | |
| + EntityChange { | |
| + subgraph: String::from("subgraph-id"), | |
| + entity: String::from("User"), | |
| + id: String::from("1"), | |
| + operation: EntityChangeOperation::Updated, | |
| + }, | |
| + EntityChange { | |
| + subgraph: String::from("subgraph-id"), | |
| + entity: String::from("User"), | |
| + id: added_entities[1].clone().0, | |
| + operation: EntityChangeOperation::Removed, | |
| + }, | |
| + ] | |
| + ); | |
| Ok(()) | |
| }) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment