Skip to content

Instantly share code, notes, and snippets.

@Jannis
Created August 29, 2018 11:59
Show Gist options
  • Select an option

  • Save Jannis/4b27ccfac18d51c63871ee820c7229e9 to your computer and use it in GitHub Desktop.

Select an option

Save Jannis/4b27ccfac18d51c63871ee820c7229e9 to your computer and use it in GitHub Desktop.
diff --git a/store/postgres/src/entity_changes.rs b/store/postgres/src/entity_changes.rs
index 9876b34..8a96c9d 100644
--- a/store/postgres/src/entity_changes.rs
+++ b/store/postgres/src/entity_changes.rs
@@ -10,25 +10,27 @@ use graph::serde_json;
pub struct EntityChangeListener {
output: Option<Receiver<EntityChange>>,
- thread_handle: Option<thread::JoinHandle<()>>,
- terminate_thread: Arc<RwLock<bool>>,
- thread_barrier: Arc<Barrier>,
+ worker_handle: Option<thread::JoinHandle<()>>,
+ terminate_worker: Arc<RwLock<bool>>,
+ worker_barrier: Arc<Barrier>,
}
impl EntityChangeListener {
pub fn new(url: String) -> Self {
- let (receiver, thread_handle, terminate_thread, thread_barrier) = Self::listen(url);
+ // Listen to Postgres notifications in a worker thread
+ let (receiver, worker_handle, terminate_worker, worker_barrier) = Self::listen(url);
EntityChangeListener {
output: Some(receiver),
- thread_handle: Some(thread_handle),
- terminate_thread,
- thread_barrier,
+ worker_handle: Some(worker_handle),
+ terminate_worker,
+ worker_barrier,
}
}
+ /// Begins processing notifications coming in from Postgres.
pub fn start(&self) {
- self.thread_barrier.wait();
+ self.worker_barrier.wait();
}
fn listen(
@@ -39,49 +41,44 @@ impl EntityChangeListener {
Arc<RwLock<bool>>,
Arc<Barrier>,
) {
+ // Create two ends of a boolean variable for signalling when the worker
+ // thread should be terminated
let terminate = Arc::new(RwLock::new(false));
- let terminate_thread = terminate.clone();
+ let terminate_worker = terminate.clone();
let barrier = Arc::new(Barrier::new(2));
- let thread_barrier = barrier.clone();
+ let worker_barrier = barrier.clone();
+ // Create a channel for entity changes
let (sender, receiver) = channel(100);
- let thread_handle = thread::spawn(move || {
- println!("LISTENER: Listen to entity changes");
-
+ let worker_handle = thread::spawn(move || {
+ // Connect to Postgres
let conn = Connection::connect(url, TlsMode::None)
.expect("failed to connect entity change listener to Postgres");
+ // Obtain a notifications iterator from Postgres
let notifications = conn.notifications();
let iter = notifications.timeout_iter(Duration::from_millis(500));
+ // Subscribe to the "entity_changes" notification channel in Postgres
conn.execute("LISTEN entity_changes", &[])
.expect("failed to listen to entity changes in Postgres");
+ // Wait until the listener has been started
barrier.wait();
- println!("LISTENER: barrier passed, start reading notifications now");
-
let mut notifications = iter.iterator();
// Read notifications as long as the Postgres connection is alive
// and the thread has not been asked to terminate
loop {
- println!("LISTENER: loop");
-
+ // Terminate the thread if desired
let terminate_now = terminate.read().unwrap();
- println!("LISTENER: terminate? {:?}", *terminate_now);
if *terminate_now {
- println!("LISTENER: terminate");
return;
}
- println!("LISTENER: read next notification");
-
- let result = notifications.next();
- println!("LISTENER: next notification: {:?}", result);
-
- if let Some(Ok(notification)) = result {
+ if let Some(Ok(notification)) = notifications.next() {
// Only handle notifications from the "entity_changes" channel.
if notification.channel != String::from("entity_changes") {
continue;
@@ -98,8 +95,6 @@ impl EntityChangeListener {
).as_str(),
);
- println!("LISTENER: entity change: {:?}", change);
-
// We'll assume here that if sending fails, this means that the
// entity change listener has already been dropped, the receiving
// end is gone and we should terminate the listener loop
@@ -110,21 +105,21 @@ impl EntityChangeListener {
}
});
- (receiver, thread_handle, terminate_thread, thread_barrier)
+ (receiver, worker_handle, terminate_worker, worker_barrier)
}
}
impl Drop for EntityChangeListener {
fn drop(&mut self) {
- println!("LISTENER: drop");
-
- if let Some(thread_handle) = self.thread_handle.take() {
+ // When dropping the change listener, also make sure we signal termination
+ // to the worker and wait for it to shut down
+ if let Some(worker_handle) = self.worker_handle.take() {
*self
- .terminate_thread
+ .terminate_worker
.write()
.expect("failed to signal termination to EntityChangeListener thread") = true;
- thread_handle
+ worker_handle
.join()
.expect("failed to terminate EntityChangeListener thread");
}
diff --git a/store/postgres/src/store.rs b/store/postgres/src/store.rs
index f554763..782222a 100644
--- a/store/postgres/src/store.rs
+++ b/store/postgres/src/store.rs
@@ -106,59 +106,42 @@ impl Store {
let logger = self.logger.clone();
let subscriptions = self.subscriptions.clone();
- tokio::spawn(
- entity_changes
- .inspect(|change| println!("STORE: entity change: {:?}", change))
- .for_each(move |change| {
- debug!(logger, "Entity change";
+ tokio::spawn(entity_changes.for_each(move |change| {
+ debug!(logger, "Entity change";
"subgraph" => &change.subgraph,
"entity" => &change.entity,
"id" => &change.id);
- // Obtain IDs and senders of subscriptions matching the entity change
- let matches = subscriptions
- .read()
- .unwrap()
- .iter()
- .filter(|(_, subscription)| {
- subscription.subgraph == change.subgraph
- && subscription.entities.contains(&change.entity)
- })
- .map(|(id, subscription)| (id.clone(), subscription.sender.clone()))
- .collect::<Vec<_>>();
-
- println!("STORE: {} matching subscriptions", matches.len());
-
- let subscriptions = subscriptions.clone();
-
- // Write change to all matching subscription streams; remove subscriptions
- // whose receiving end has been dropped
- stream::iter_ok::<_, ()>(matches).for_each(move |(id, sender)| {
- println!("STORE: send change to subscription {}: {:?}", id, change);
- let subscriptions = subscriptions.clone();
- let err_id = id.clone();
- let err_change = change.clone();
- let sent_id = id.clone();
- let sent_change = change.clone();
- sender
- .send(change.clone())
- .map_err(move |_| {
- println!(
- "STORE: failed to send change to subscription {}: {:?}",
- err_id, err_change
- );
- subscriptions.write().unwrap().remove(&id);
- })
- .map(move |_| {
- println!(
- "STORE: successfully sent change to subscription {}: {:?}",
- sent_id, sent_change
- )
- })
- .and_then(|_| Ok(()))
+ // Obtain IDs and senders of subscriptions matching the entity change
+ let matches = subscriptions
+ .read()
+ .unwrap()
+ .iter()
+ .filter(|(_, subscription)| {
+ subscription.subgraph == change.subgraph
+ && subscription.entities.contains(&change.entity)
+ })
+ .map(|(id, subscription)| (id.clone(), subscription.sender.clone()))
+ .collect::<Vec<_>>();
+
+ let subscriptions = subscriptions.clone();
+
+ // Write change to all matching subscription streams; remove subscriptions
+ // whose receiving end has been dropped
+ stream::iter_ok::<_, ()>(matches).for_each(move |(id, sender)| {
+ let subscriptions = subscriptions.clone();
+ let err_id = id.clone();
+ let err_change = change.clone();
+ let sent_id = id.clone();
+ let sent_change = change.clone();
+ sender
+ .send(change.clone())
+ .map_err(move |_| {
+ subscriptions.write().unwrap().remove(&id);
})
- }),
- );
+ .and_then(|_| Ok(()))
+ })
+ }));
}
fn periodically_clean_up_stale_subscriptions(&mut self) {
@@ -379,7 +362,6 @@ impl StoreTrait for Store {
// Add the new subscription
let mut subscriptions = subscriptions.write().unwrap();
- println!("STORE: subscribe {}", id);
subscriptions.insert(id, subscription);
// Return the subscription ID and entity change stream
diff --git a/store/postgres/tests/store.rs b/store/postgres/tests/store.rs
index ed47501..a80a324 100644
--- a/store/postgres/tests/store.rs
+++ b/store/postgres/tests/store.rs
@@ -37,8 +37,6 @@ where
{
let mut runtime = tokio::runtime::Runtime::new().unwrap();
- println!("Create test data");
-
runtime
.block_on(future::lazy(|| {
insert_test_data();
@@ -46,14 +44,10 @@ where
}))
.expect("Failed to insert test data");
- println!("Test data created, run test");
-
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
runtime.block_on(future::lazy(|| test()))
}));
- println!("Test ran, remove test data");
-
runtime
.block_on(future::lazy(|| {
remove_test_data();
@@ -61,8 +55,6 @@ where
}))
.expect("Failed to remove test data");
- println!("Removed test data");
-
result.expect("Failed to run test").expect("Test failed");
}
@@ -1655,8 +1647,6 @@ fn entity_changes_are_fired_and_forwarded_to_subscriptions() {
// Create a store subscription
let subscription = store.subscribe(String::from("subgraph-id"), vec![String::from("User")]);
- println!("TEST: subscribed");
-
// Add two entities to the store
let added_entities = vec![
(
@@ -1683,13 +1673,11 @@ fn entity_changes_are_fired_and_forwarded_to_subscriptions() {
id: id.clone(),
},
entity.clone(),
- EventSource::EthereumBlock(H256::zero()),
+ EventSource::EthereumBlock(H256::random()),
)
.expect("failed to add entity to the store");
}
- println!("TEST: entities added");
-
// Update an entity in the store
let updated_entity = Entity::from(vec![
("id", Value::from("1")),
@@ -1703,12 +1691,10 @@ fn entity_changes_are_fired_and_forwarded_to_subscriptions() {
id: String::from("1"),
},
updated_entity.clone(),
- EventSource::EthereumBlock(H256::zero()),
+ EventSource::EthereumBlock(H256::random()),
)
.expect("failed to update entity in the store");
- println!("TEST: entity updated");
-
// Delete an entity in the store
store
.delete(
@@ -1717,15 +1703,12 @@ fn entity_changes_are_fired_and_forwarded_to_subscriptions() {
entity: String::from("User"),
id: String::from("2"),
},
- EventSource::EthereumBlock(H256::zero()),
+ EventSource::EthereumBlock(H256::random()),
)
.expect("failed to delete entity from the store");
- println!("TEST: entity deleted, now reading stream");
-
- //// We're expecting three events to be written to the subscription stream
+ // We're expecting four events to be written to the subscription stream
subscription
- .inspect(|change| println!("STREAM: entity change: {:?}", change))
.take(4)
.collect()
.and_then(move |changes| {
@@ -1734,36 +1717,35 @@ fn entity_changes_are_fired_and_forwarded_to_subscriptions() {
// be terminated as well
let _store = store;
- println!("STREAM: changes: {:#?}", changes);
- // assert_eq!(
- // changes,
- // vec![
- // EntityChange {
- // subgraph: String::from("subgraph-id"),
- // entity: String::from("User"),
- // id: added_entities[0].clone().0,
- // operation: EntityChangeOperation::Added,
- // },
- // EntityChange {
- // subgraph: String::from("subgraph-id"),
- // entity: String::from("User"),
- // id: added_entities[1].clone().0,
- // operation: EntityChangeOperation::Added,
- // },
- // EntityChange {
- // subgraph: String::from("subgraph-id"),
- // entity: String::from("User"),
- // id: String::from("1"),
- // operation: EntityChangeOperation::Updated,
- // },
- // EntityChange {
- // subgraph: String::from("subgraph-id"),
- // entity: String::from("User"),
- // id: added_entities[1].clone().0,
- // operation: EntityChangeOperation::Removed,
- // },
- // ]
- // );
+ assert_eq!(
+ changes,
+ vec![
+ EntityChange {
+ subgraph: String::from("subgraph-id"),
+ entity: String::from("User"),
+ id: added_entities[0].clone().0,
+ operation: EntityChangeOperation::Added,
+ },
+ EntityChange {
+ subgraph: String::from("subgraph-id"),
+ entity: String::from("User"),
+ id: added_entities[1].clone().0,
+ operation: EntityChangeOperation::Added,
+ },
+ EntityChange {
+ subgraph: String::from("subgraph-id"),
+ entity: String::from("User"),
+ id: String::from("1"),
+ operation: EntityChangeOperation::Updated,
+ },
+ EntityChange {
+ subgraph: String::from("subgraph-id"),
+ entity: String::from("User"),
+ id: added_entities[1].clone().0,
+ operation: EntityChangeOperation::Removed,
+ },
+ ]
+ );
Ok(())
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment