Created
October 8, 2015 02:46
-
-
Save BenWhitehead/8267c72e4e894c2e2b29 to your computer and use it in GitHub Desktop.
Cassandra-Mesos Dynamic reservation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
final Optional<TaskResources> reserveResourcesForHost = cassandraCluster.shouldCreateReservation(marker, offer); | |
if (reserveResourcesForHost.isPresent()) { | |
final TaskResources res = reserveResourcesForHost.get(); | |
final List<Resource> resources = newArrayList( | |
reserveCpu(res.getCpuCores(), mesosRole, principal), | |
reserveMem(res.getMemMb(), mesosRole, principal), | |
reserveDisk(res.getDiskMb(), mesosRole, principal), | |
reservePorts(res.getPortsList(), mesosRole, principal) | |
); | |
final Offer.Operation reservation = Offer.Operation.newBuilder() | |
.setType(Offer.Operation.Type.RESERVE) | |
.setReserve( | |
Offer.Operation.Reserve.newBuilder() | |
.addAllResources(resources) | |
.build() | |
) | |
.build(); | |
driver.acceptOffers(Collections.singletonList(offer.getId()), Collections.singletonList(reservation), Filters.getDefaultInstance()); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@NotNull | |
public Optional<TaskResources> shouldCreateReservation(@NotNull final Marker marker, @NotNull final Protos.Offer offer) { | |
if (!configuration.isReserveRequired()) { | |
LOGGER.info(marker, "Resources Reservation is not enabled"); | |
return Optional.absent(); | |
} else if (cassandraNodeForHostname(offer.getHostname()).isPresent()) { | |
LOGGER.info(marker, "Cassandra node already allocated for host."); | |
return Optional.absent(); | |
} else { | |
final NodeCounts nodeCounts = clusterState.nodeCounts(); | |
final long now = clock.now().getMillis(); | |
final long nextPossibleServerLaunchTimestamp = nextPossibleServerLaunchTimestamp(); | |
final boolean serverLaunchTimeoutActive = !canLaunchServerTask(now, nextPossibleServerLaunchTimestamp); | |
if (nodeCounts.getNodeCount() >= configuration.targetNumberOfNodes()) { | |
LOGGER.debug(marker, "Number of desired Cassandra Nodes Acquired, no need to create Resource Reservation."); | |
return Optional.absent(); | |
} else if (serverLaunchTimeoutActive) { | |
final long nextPossibleServerLaunchSeconds = secondsUntilNextPossibleServerLaunch(now, nextPossibleServerLaunchTimestamp); | |
LOGGER.info(marker, "Preventing creation of new node because server launch timeout active. Next server launch possible in {}s", nextPossibleServerLaunchSeconds); | |
return Optional.absent(); | |
} else { | |
final CassandraConfigRole configRole = configuration.getDefaultConfigRole(); | |
final CassandraFrameworkConfiguration config = configuration.get(); | |
final TaskResources allResources = add( | |
add(EXECUTOR_RESOURCES, METADATA_TASK_RESOURCES), | |
configRole.getResources() | |
); | |
final List<String> executorSizeErrors = hasResources( | |
offer, | |
allResources, | |
portMappings(config), | |
configRole.getMesosRole() | |
); | |
if (executorSizeErrors.isEmpty()) { | |
LOGGER.info(marker, "Attempting to create Resource Reservation"); | |
return Optional.of(allResources); | |
} else { | |
LOGGER.info( | |
marker, | |
"Insufficient resources in offer for executor, not attempting to launch new node. Details for offer {}: ['{}']", | |
offer.getId().getValue(), JOINER.join(executorSizeErrors) | |
); | |
return Optional.absent(); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment