Skip to content

Instantly share code, notes, and snippets.

@mpenick
Last active November 27, 2018 21:00
Show Gist options
  • Save mpenick/dbfc37a9005b3edfaa3a2df9726f8300 to your computer and use it in GitHub Desktop.
Save mpenick/dbfc37a9005b3edfaa3a2df9726f8300 to your computer and use it in GitHub Desktop.
/**
 * A one-shot future that carries a single host event: the kind of event and
 * the address of the host it concerns.
 *
 * Built on cass::Future; the mutex and the set/wait primitives used below
 * (mutex_, is_set(), internal_set(), internal_wait_for()) come from that base
 * class.
 */
class HostEventFuture : public cass::Future {
public:
  typedef SharedRefPtr<HostEventFuture> Ptr;

  /** Kind of host event. INVALID is the value reported when a wait fails. */
  enum Type {
    INVALID,
    START_NODE,
    STOP_NODE,
    ADD_NODE,
    REMOVE_NODE
  };

  /** (event type, host address) pair. */
  typedef std::pair<Type, Address> Event;

  HostEventFuture()
    : cass::Future(cass::Future::FUTURE_TYPE_GENERIC) { }

  /**
   * Store the event and mark the future as set.
   *
   * Only the first call has any effect; once the future is set, later calls
   * leave the stored event untouched.
   *
   * @param type Kind of event to record.
   * @param host Address of the host the event refers to.
   */
  void set_event(Type type, const Address& host) {
    cass::ScopedMutex lock(&mutex_);
    if (is_set()) {
      return; // Already holds an event; keep the first one.
    }
    event_ = Event(type, host);
    internal_set(lock);
  }

  /**
   * Wait for the event to be set.
   *
   * @param timeout_us Value forwarded to internal_wait_for(); presumably a
   *                   timeout in microseconds (confirm against cass::Future).
   * @return The stored event if internal_wait_for() reports success,
   *         otherwise an Event of type INVALID with a default-constructed
   *         Address.
   */
  Event wait_for_event(uint64_t timeout_us) {
    cass::ScopedMutex lock(&mutex_);
    if (!internal_wait_for(lock, timeout_us)) {
      return Event(INVALID, Address());
    }
    return event_;
  }

private:
  Event event_; // Written by set_event() while holding mutex_.
};
/**
 * Host listener for tests that records host up/down/add/remove callbacks in
 * arrival order so a test can wait for them one at a time.
 *
 * Events are kept in a deque of HostEventFuture objects. Invariant (guarded by
 * mutex_): the deque is never empty, its last element is an unset placeholder
 * future, and every element before it has been set. A new event sets the
 * current placeholder and appends a fresh one.
 */
class TestHostListener : public cass::DefaultHostListener {
public:
  typedef cass::SharedRefPtr<TestHostListener> Ptr;

  TestHostListener() {
    uv_mutex_init(&mutex_);
    // Seed the queue with the initial unset placeholder.
    events_.push_back(
        HostEventFuture::Ptr(
            Memory::allocate<HostEventFuture>()));
  }

  ~TestHostListener() {
    uv_mutex_destroy(&mutex_);
  }

  virtual void on_up(const cass::Host::Ptr& host) {
    push_back(HostEventFuture::START_NODE, host);
  }

  virtual void on_down(const cass::Host::Ptr& host) {
    push_back(HostEventFuture::STOP_NODE, host);
  }

  virtual void on_add(const cass::Host::Ptr& host) {
    push_back(HostEventFuture::ADD_NODE, host);
  }

  virtual void on_remove(const cass::Host::Ptr& host) {
    push_back(HostEventFuture::REMOVE_NODE, host);
  }

  /**
   * Wait for the oldest unconsumed event and remove it from the queue.
   *
   * The front future is dequeued only when an event was actually delivered.
   * On a failed wait the front future is still the unset placeholder, and
   * removing it would empty the deque (making the next back()/front() call
   * undefined) or drop an event that arrives just after the wait gave up.
   *
   * The front lookup and the pop take mutex_ separately, so two concurrent
   * callers could observe the same front future.
   *
   * @param timeout_us Value forwarded to HostEventFuture::wait_for_event().
   * @return The received event, or an event of type INVALID if the wait did
   *         not succeed (in which case the queue is left unchanged).
   */
  HostEventFuture::Event wait_for_event(uint64_t timeout_us) {
    HostEventFuture::Event event(front()->wait_for_event(timeout_us));
    if (event.first != HostEventFuture::INVALID) {
      pop_front();
    }
    return event;
  }

  /**
   * Number of events that have been recorded but not yet consumed.
   *
   * The trailing element is always the unset placeholder, so it is excluded
   * from the count.
   */
  size_t event_count() {
    cass::ScopedMutex lock(&mutex_);
    return events_.empty() ? 0 : events_.size() - 1;
  }

private:
  /** Return the future at the head of the queue (copy of the shared pointer). */
  HostEventFuture::Ptr front() {
    cass::ScopedMutex lock(&mutex_);
    return events_.front();
  }

  /** Remove the future at the head of the queue. */
  void pop_front() {
    cass::ScopedMutex lock(&mutex_);
    events_.pop_front();
  }

  /**
   * Record an event: set the current placeholder with the event data, then
   * append a new unset placeholder for the next event.
   */
  void push_back(HostEventFuture::Type type, const cass::Host::Ptr& host) {
    cass::ScopedMutex lock(&mutex_);
    events_.back()->set_event(type, host->address());
    events_.push_back(
        HostEventFuture::Ptr(
            Memory::allocate<HostEventFuture>()));
  }

private:
  uv_mutex_t mutex_;                          // Guards events_.
  cass::Deque<HostEventFuture::Ptr> events_;  // Set futures followed by one unset placeholder.
};
// Exercises the host listener callbacks: removes and re-adds node 1, then
// stops and restarts node 2, checking the event reported after each step.
TEST_F(SessionUnitTest, HostListener) {
  mockssandra::SimpleCluster cluster(simple(), 2);
  ASSERT_EQ(cluster.start_all(), 0);

  TestHostListener::Ptr listener(cass::Memory::allocate<TestHostListener>());

  cass::Config config;
  config.set_reconnect_wait_time(100); // Keep the reconnect wait short
  config.contact_points().push_back("127.0.0.2");
  config.set_host_listener(listener);

  cass::Session session;
  connect(config, &session);

  // Nothing should have been recorded before the topology changes below.
  ASSERT_EQ(listener->event_count(), 0u);

  { // Node 1 removed
    cluster.remove(1);
    const HostEventFuture::Event received(listener->wait_for_event(WAIT_FOR_TIME));
    EXPECT_EQ(received,
              HostEventFuture::Event(HostEventFuture::REMOVE_NODE, Address("127.0.0.1", 9042)));
  }

  { // Node 1 added back
    cluster.add(1);
    const HostEventFuture::Event received(listener->wait_for_event(WAIT_FOR_TIME));
    EXPECT_EQ(received,
              HostEventFuture::Event(HostEventFuture::ADD_NODE, Address("127.0.0.1", 9042)));
  }

  { // Node 2 stopped
    cluster.stop(2);
    const HostEventFuture::Event received(listener->wait_for_event(WAIT_FOR_TIME));
    EXPECT_EQ(received,
              HostEventFuture::Event(HostEventFuture::STOP_NODE, Address("127.0.0.2", 9042)));
  }

  { // Node 2 started again
    cluster.start(2);
    const HostEventFuture::Event received(listener->wait_for_event(WAIT_FOR_TIME));
    EXPECT_EQ(received,
              HostEventFuture::Event(HostEventFuture::START_NODE, Address("127.0.0.2", 9042)));
  }

  close(&session);

  // Known issue: the check below currently fails; the count should be 0 here.
  //ASSERT_EQ(listener->event_count(), 0u);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment