Skip to content

Instantly share code, notes, and snippets.

@yifan-gu
Last active August 29, 2015 14:01
Show Gist options
  • Save yifan-gu/195d6b807f0f33dd0584 to your computer and use it in GitHub Desktop.
Save yifan-gu/195d6b807f0f33dd0584 to your computer and use it in GitHub Desktop.
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 85ca5c4..c197da9 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -35,6 +35,7 @@
#include <process/pid.hpp>
#include <process/subprocess.hpp>
+#include <stout/duration.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/try.hpp>
@@ -585,3 +586,178 @@ TEST_F(SlaveTest, MetricsInStatsEndpoint)
Shutdown();
}
+
+
+// This test ensures that when a previously unregistered slave is shutting
+// down, it will not keep trying to re-register with the master.
+TEST_F(SlaveTest, TerminatingSlaveDoesNotReregister)
+{
+ // Start a master and a slave.
+ Try<PID<Master> > master = this->StartMaster();
+ ASSERT_SOME(master);
+
+ // Create a MockExecutor to enable us to catch ShutdownExecutorMessage later.
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+ TestContainerizer containerizer(&exec);
+
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get(), _);
+
+ slave::Flags slaveFlags = this->CreateSlaveFlags();
+ // Set recovery flags
+ //slaveFlags.checkpoint = true;
+ //slaveFlags.recover = "reconnect";
+ //slaveFlags.strict = true;
+ slaveFlags.executor_shutdown_grace_period = Duration::max();
+
+ StandaloneMasterDetector detector(master.get());
+ Try<PID<Slave> > slave = this->StartSlave(
+ &containerizer,
+ &detector,
+ slaveFlags);
+ ASSERT_SOME(slave);
+
+ AWAIT_READY(slaveRegisteredMessage);
+
+ // Create a task on the slave.
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .Times(1);
+
+ Future<vector<Offer> >offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+ task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+ task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .Times(1);
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ // Wait for TASK_RUNNING update.
+ AWAIT_READY(status);
+ EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+ // Pause the clock here so when the master comes back,
+ // the slave will not send multiple re-register messages
+ // before we change its state to TERMINATING.
+ Clock::pause();
+
+ // Let the slave to do re-registeration.
+ detector.appoint(master.get());
+
+ Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+ DROP_PROTOBUF(SlaveReregisteredMessage(), master.get(), _);
+
+
+
+ LOG(WARNING) << "kick start";
+ // Make sure the slave has called doReliableRegistration()
+ // before we change the slave's state
+ AWAIT_READY(slaveReregisteredMessage);
+ LOG(WARNING) << "reregistered message captured";
+
+ DROP_PROTOBUFS(ShutdownExecutorMessage(), _, _);
+ //Future<ShutdownExecutorMessage> shut =
+ // DROP_PROTOBUF(ShutdownExecutorMessage(), _, _);
+ //Future<ShutdownExecutorMessage> shut1 =
+ // DROP_PROTOBUF(ShutdownExecutorMessage(), _, _);
+ //Future<process::Message> shuut =
+ // FUTURE_MESSAGE(Eq(ShutdownExecutorMessage().GetTypeName()), _, _);
+ //Future<Nothing> shuut =
+ // FUTURE_DISPATCH(_, &Slave::shutdownExecutor);
+
+ process::post(slave.get(), ShutdownMessage());
+
+ LOG(WARNING) << "kick stop";
+
+ Clock::settle();
+ Clock::resume();
+
+ //AWAIT_READY(shut);
+ //AWAIT_READY(shut1);
+ //AWAIT_READY(shuut);
+
+ LOG(WARNING) << "receive shut";
+
+ for (int i = 0; i < 10; i++) {
+ LOG(WARNING) << "some reregister " << i;
+ Future<ReregisterSlaveMessage> reregisterSlaveMessage =
+ FUTURE_PROTOBUF(ReregisterSlaveMessage(), master.get(), _);
+ AWAIT_READY(reregisterSlaveMessage);
+ }
+
+
+ //Future<Nothing> executorTerminated =
+ //FUTURE_DISPATCH(_, &Slave::executorTerminated);
+
+ driver.killTask(task.task_id());
+
+ LOG(WARNING) << "after killTask return";
+
+ //AWAIT_READY(executorTerminated);
+
+ LOG(WARNING) << "get executorTerminated";
+
+ //process::post(libprocessPid, ShutdownExecutorMessage());
+
+ driver.stop();
+ driver.join();
+
+ this->Stop(slave.get());
+ this->Stop(master.get());
+
+ //Future<process::http::Response> response =
+ // process::http::get(slave.get(), "stats.json");
+ //
+ //AWAIT_READY(response);
+ //
+ //EXPECT_SOME_EQ(
+ // "application/json",
+ // response.get().headers.get("Content-Type"));
+ //
+ //Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+ //
+ //ASSERT_SOME(parse);
+ //
+ //JSON::Object state = parse.get();
+ ////EXPECT_EQ(0u, state.values["registered"]);
+ //
+ //for (auto &p : state.values) {
+ // std::cout << p.first << ": " << p.second << std::endl;
+ //}
+
+ //Future<Nothing> executorShutdown =
+ // FUTURE_DISPATCH(_, &Slave::shutdownExecutor);
+ //
+ //this->Stop(slave.get());
+ //EXPECT_CALL(exec, shutdown(_))
+ // .Times(AtMost(1));
+ //
+ //AWAIT_READY(executorShutdown);
+}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment