Created
April 4, 2016 19:48
-
-
Save jsanda/c77e6c730251f4cb0df4a2051c8c9c1c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2014-2016 Red Hat, Inc. and/or its affiliates | |
* and other contributors as indicated by the @author tags. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package org.hawkular.metrics.api.jaxrs; | |
import static java.util.concurrent.TimeUnit.MILLISECONDS; | |
import static java.util.concurrent.TimeUnit.MINUTES; | |
import static java.util.concurrent.TimeUnit.NANOSECONDS; | |
import static java.util.concurrent.TimeUnit.SECONDS; | |
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.CASSANDRA_CQL_PORT; | |
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.CASSANDRA_KEYSPACE; | |
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.CASSANDRA_NODES; | |
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.CASSANDRA_RESETDB; | |
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.CASSANDRA_USESSL; | |
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.DATA_GC_GRACE_SECONDS; | |
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.DEFAULT_TTL; | |
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.USE_VIRTUAL_CLOCK; | |
import static org.hawkular.metrics.api.jaxrs.config.ConfigurationKey.WAIT_FOR_SERVICE; | |
import java.security.NoSuchAlgorithmException; | |
import java.util.Arrays; | |
import java.util.HashMap; | |
import java.util.Locale; | |
import java.util.Map; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.Future; | |
import java.util.concurrent.ScheduledExecutorService; | |
import java.util.concurrent.ThreadFactory; | |
import javax.annotation.PostConstruct; | |
import javax.annotation.PreDestroy; | |
import javax.enterprise.context.ApplicationScoped; | |
import javax.enterprise.inject.Produces; | |
import javax.inject.Inject; | |
import javax.net.ssl.SSLContext; | |
import org.hawkular.metrics.api.jaxrs.config.Configurable; | |
import org.hawkular.metrics.api.jaxrs.config.ConfigurationProperty; | |
import org.hawkular.metrics.api.jaxrs.log.RestLogger; | |
import org.hawkular.metrics.api.jaxrs.log.RestLogging; | |
import org.hawkular.metrics.api.jaxrs.util.Eager; | |
import org.hawkular.metrics.api.jaxrs.util.TestClock; | |
import org.hawkular.metrics.api.jaxrs.util.VirtualClock; | |
import org.hawkular.metrics.core.api.MetricsService; | |
import org.hawkular.metrics.core.impl.DataAccess; | |
import org.hawkular.metrics.core.impl.DataAccessImpl; | |
import org.hawkular.metrics.core.impl.DateTimeService; | |
import org.hawkular.metrics.core.impl.MetricsServiceImpl; | |
import org.hawkular.metrics.schema.SchemaManager; | |
import org.hawkular.metrics.tasks.api.AbstractTrigger; | |
import org.hawkular.metrics.tasks.api.Task2; | |
import org.hawkular.metrics.tasks.api.TaskScheduler; | |
import org.hawkular.metrics.tasks.impl.Queries; | |
import org.hawkular.metrics.tasks.impl.TaskSchedulerImpl; | |
import org.hawkular.rx.cassandra.driver.RxSessionImpl; | |
import org.joda.time.DateTime; | |
import com.codahale.metrics.MetricRegistry; | |
import com.datastax.driver.core.Cluster; | |
import com.datastax.driver.core.ResultSet; | |
import com.datastax.driver.core.SSLOptions; | |
import com.datastax.driver.core.Session; | |
import com.google.common.base.Throwables; | |
import com.google.common.util.concurrent.Futures; | |
import com.google.common.util.concurrent.Uninterruptibles; | |
import rx.Subscription; | |
import rx.functions.Action1; | |
import rx.schedulers.Schedulers; | |
import rx.schedulers.TestScheduler; | |
/** | |
* Bean created on startup to manage the lifecycle of the {@link MetricsService} instance shared in application scope. | |
* | |
* @author John Sanda | |
* @author Thomas Segismont | |
*/ | |
@ApplicationScoped | |
@Eager | |
public class MetricsServiceLifecycle { | |
private static final RestLogger log = RestLogging.getRestLogger(MetricsServiceLifecycle.class); | |
/** | |
* @see #getState() | |
*/ | |
public enum State { | |
STARTING, STARTED, STOPPING, STOPPED, FAILED | |
} | |
private MetricsServiceImpl metricsService; | |
private TaskScheduler taskScheduler; | |
private final ScheduledExecutorService lifecycleExecutor; | |
private VirtualClock virtualClock; | |
@Inject | |
@Configurable | |
@ConfigurationProperty(CASSANDRA_CQL_PORT) | |
private String cqlPort; | |
@Inject | |
@Configurable | |
@ConfigurationProperty(CASSANDRA_NODES) | |
private String nodes; | |
@Inject | |
@Configurable | |
@ConfigurationProperty(CASSANDRA_KEYSPACE) | |
private String keyspace; | |
@Inject | |
@Configurable | |
@ConfigurationProperty(CASSANDRA_RESETDB) | |
private String resetDb; | |
@Inject | |
@Configurable | |
@ConfigurationProperty(WAIT_FOR_SERVICE) | |
private String waitForService; | |
@Inject | |
@Configurable | |
@ConfigurationProperty(USE_VIRTUAL_CLOCK) | |
private String useVirtualClock; | |
@Inject | |
@Configurable | |
@ConfigurationProperty(CASSANDRA_USESSL) | |
private String cassandraUseSSL; | |
@Inject | |
@Configurable | |
@ConfigurationProperty(DEFAULT_TTL) | |
private String defaultTTL; | |
@Inject | |
@Configurable | |
@ConfigurationProperty(DATA_GC_GRACE_SECONDS) | |
private String dataGcGraceSeconds; | |
private volatile State state; | |
private int connectionAttempts; | |
private Session session; | |
private DataAccess dataAcces; | |
private Map<? super Action1<Task2>, Subscription> jobs = new HashMap<>(); | |
MetricsServiceLifecycle() { | |
ThreadFactory threadFactory = r -> { | |
Thread thread = Executors.defaultThreadFactory().newThread(r); | |
thread.setName(MetricsService.class.getSimpleName().toLowerCase(Locale.ROOT) + "-lifecycle-thread"); | |
return thread; | |
}; | |
// All lifecycle operations will be executed on a single thread to avoid synchronization issues | |
lifecycleExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory); | |
state = State.STARTING; | |
} | |
/** | |
* Returns the lifecycle state of the {@link MetricsService} shared in application scope. | |
* | |
* @return lifecycle state of the shared {@link MetricsService} | |
*/ | |
public State getState() { | |
return state; | |
} | |
@PostConstruct | |
void init() { | |
lifecycleExecutor.submit(this::startMetricsService); | |
if (Boolean.parseBoolean(waitForService) | |
// "hawkular-metrics.backend" is not a real Metrics configuration parameter (there's a single | |
// MetricsService implementation, which is backed by Cassandra). | |
// But it's been used historically to wait for the service to be available before completing the deployment. | |
// Therefore, we still use it here for backward compatibililty. | |
// TODO remove when Hawkular build has been updated to use the eager startup flag | |
|| "embedded_cassandra".equals(System.getProperty("hawkular.backend"))) { | |
long start = System.nanoTime(); | |
while (state == State.STARTING | |
// Give up after a minute. The deployment won't be failed and we'll continue to try to start the | |
// service in the background. | |
&& NANOSECONDS.convert(1, MINUTES) > System.nanoTime() - start) { | |
Uninterruptibles.sleepUninterruptibly(1, SECONDS); | |
} | |
} | |
} | |
private void startMetricsService() { | |
if (state != State.STARTING) { | |
return; | |
} | |
log.infoInitializing(); | |
connectionAttempts++; | |
try { | |
session = createSession(); | |
} catch (Exception t) { | |
Throwable rootCause = Throwables.getRootCause(t); | |
log.warnCouldNotConnectToCassandra(rootCause.getLocalizedMessage()); | |
// cycle between original and more wait time - avoid waiting huge amounts of time | |
long delay = 1L + ((connectionAttempts - 1L) % 4L); | |
log.warnRetryingConnectingToCassandra(connectionAttempts, delay); | |
lifecycleExecutor.schedule(this::startMetricsService, delay, SECONDS); | |
return; | |
} | |
try { | |
// When this class was first introduced, I said that the schema management | |
// should stay in MetricsServiceImpl, and now I am putting it back here. OK, so | |
// I deserve some criticism; however, I still think it should be done that way. | |
// I made this change temporarily because the schema for metrics and for the | |
// task scheduling service are declared and created in the same place. That | |
// will change at some point though because the task scheduling service will | |
// probably move to the hawkular-commons repo. | |
initSchema(); | |
log.info("Updating gc_grace_seconds to " + dataGcGraceSeconds); | |
session.execute("ALTER TABLE " + keyspace + ".data WITH gc_grace_seconds = " + dataGcGraceSeconds); | |
// updateGcGraceSecondsIfNecessary(); | |
dataAcces = new DataAccessImpl(session); | |
initTaskScheduler(); | |
metricsService = new MetricsServiceImpl(); | |
metricsService.setDataAccess(dataAcces); | |
metricsService.setTaskScheduler(taskScheduler); | |
metricsService.setDateTimeService(createDateTimeService()); | |
metricsService.setDefaultTTL(getDefaultTTL()); | |
// TODO Set up a managed metric registry | |
// We want a managed registry that can be shared by the JAX-RS endpoint and the core. Then we can expose | |
// the registered metrics in various ways such as new REST endpoints, JMX, or via different | |
// com.codahale.metrics.Reporter instances. | |
metricsService.startUp(session, keyspace, false, false, new MetricRegistry()); | |
log.infoServiceStarted(); | |
initJobs(); | |
state = State.STARTED; | |
} catch (Exception e) { | |
log.fatalCannotConnectToCassandra(e); | |
state = State.FAILED; | |
} finally { | |
if (state != State.STARTED) { | |
try { | |
metricsService.shutdown(); | |
} catch (Exception e) { | |
log.errorCouldNotCloseServiceInstance(e); | |
} | |
} | |
} | |
} | |
private Session createSession() { | |
Cluster.Builder clusterBuilder = new Cluster.Builder(); | |
int port; | |
try { | |
port = Integer.parseInt(cqlPort); | |
} catch (NumberFormatException nfe) { | |
String defaultPort = CASSANDRA_CQL_PORT.defaultValue(); | |
log.warnInvalidCqlPort(cqlPort, defaultPort); | |
port = Integer.parseInt(defaultPort); | |
} | |
clusterBuilder.withPort(port); | |
Arrays.stream(nodes.split(",")).forEach(clusterBuilder::addContactPoint); | |
if (Boolean.parseBoolean(cassandraUseSSL)) { | |
SSLOptions sslOptions = null; | |
try { | |
sslOptions = new SSLOptions(SSLContext.getDefault(), SSLOptions | |
.DEFAULT_SSL_CIPHER_SUITES); | |
clusterBuilder.withSSL(sslOptions); | |
} catch (NoSuchAlgorithmException e) { | |
throw new RuntimeException("SSL support is required but is not available in the JVM.", e); | |
} | |
} | |
Cluster cluster = clusterBuilder.build(); | |
cluster.init(); | |
Session createdSession = null; | |
try { | |
createdSession = cluster.connect("system"); | |
return createdSession; | |
} finally { | |
if (createdSession == null) { | |
cluster.close(); | |
} | |
} | |
} | |
private void initSchema() { | |
SchemaManager schemaManager = new SchemaManager(session); | |
if (Boolean.parseBoolean(resetDb)) { | |
schemaManager.dropKeyspace(keyspace); | |
} | |
schemaManager.createSchema(keyspace); | |
session.execute("USE " + keyspace); | |
} | |
private void updateGcGraceSecondsIfNecessary() { | |
try { | |
ResultSet resultSet = session.execute("SELECT gc_grace_seconds FROM system.schema_columnfamilies " + | |
"WHERE keyspace_name = '" + keyspace + "' AND columnfamily_name = 'data'"); | |
Integer gcGraceSeconds = null; | |
int newGcGraceSeconds = Integer.parseInt(dataGcGraceSeconds); | |
if (resultSet.isExhausted()) { | |
log.warn("Could not determine gc_grace_seconds for data table"); | |
} else { | |
gcGraceSeconds = resultSet.all().get(0).getInt(0); | |
} | |
if (gcGraceSeconds == null || newGcGraceSeconds != gcGraceSeconds) { | |
log.info("Updating gc_grace_seconds of data table from " + gcGraceSeconds + " to " + newGcGraceSeconds); | |
session.execute("ALTER TABLE " + keyspace + ".data WITH gc_grace_seconds = " + gcGraceSeconds); | |
} | |
} catch (NumberFormatException e) { | |
log.warn(dataGcGraceSeconds + " is not a valid valid for " + DATA_GC_GRACE_SECONDS + ". Skipping update " + | |
"of gc_grace_seconds for data table."); | |
} | |
} | |
private void initTaskScheduler() { | |
taskScheduler = new TaskSchedulerImpl(new RxSessionImpl(session), new Queries(session)); | |
if (Boolean.valueOf(useVirtualClock.toLowerCase())) { | |
TestScheduler scheduler = Schedulers.test(); | |
scheduler.advanceTimeTo(System.currentTimeMillis(), MILLISECONDS); | |
virtualClock = new VirtualClock(scheduler); | |
AbstractTrigger.now = scheduler::now; | |
((TaskSchedulerImpl) taskScheduler).setTickScheduler(scheduler); | |
} | |
taskScheduler.start(); | |
} | |
private int getDefaultTTL() { | |
try { | |
return Integer.parseInt(defaultTTL); | |
} catch (NumberFormatException e) { | |
log.warnInvalidDefaultTTL(defaultTTL, DEFAULT_TTL.defaultValue()); | |
return Integer.parseInt(DEFAULT_TTL.defaultValue()); | |
} | |
} | |
private void initJobs() { | |
// GenerateRate generateRates = new GenerateRate(metricsService); | |
// CreateTenants createTenants = new CreateTenants(metricsService, dataAcces); | |
// | |
// jobs.put(generateRates, taskScheduler.getTasks().filter(task -> task.getName().equals(GenerateRate.TASK_NAME)) | |
// .subscribe(generateRates)); | |
// jobs.put(createTenants, taskScheduler.getTasks().filter(task -> task.getName().equals(CreateTenants.TASK_NAME)) | |
// .subscribe(createTenants)); | |
} | |
private DateTimeService createDateTimeService() { | |
DateTimeService dateTimeService = new DateTimeService(); | |
if (Boolean.valueOf(useVirtualClock.toLowerCase())) { | |
dateTimeService.now = () -> new DateTime(virtualClock.now()); | |
} | |
return dateTimeService; | |
} | |
/** | |
* @return a {@link MetricsService} instance to share in application scope | |
*/ | |
@Produces | |
@ApplicationScoped | |
public MetricsService getMetricsService() { | |
return metricsService; | |
} | |
@Produces | |
@ApplicationScoped | |
@TestClock | |
public VirtualClock getVirtualClock() { | |
return virtualClock; | |
} | |
@Produces | |
@ApplicationScoped | |
public TaskScheduler getTaskScheduler() { | |
return taskScheduler; | |
} | |
@PreDestroy | |
void destroy() { | |
Future stopFuture = lifecycleExecutor.submit(this::stopMetricsService); | |
try { | |
Futures.get(stopFuture, 1, MINUTES, Exception.class); | |
} catch (Exception e) { | |
log.errorShutdownProblem(e); | |
} | |
lifecycleExecutor.shutdown(); | |
} | |
private void stopMetricsService() { | |
state = State.STOPPING; | |
try { | |
if (metricsService != null) { | |
metricsService.shutdown(); | |
} | |
if (taskScheduler != null) { | |
taskScheduler.shutdown(); | |
} | |
jobs.values().forEach(Subscription::unsubscribe); | |
if (session != null) { | |
session.close(); | |
session.getCluster().close(); | |
} | |
} finally { | |
state = State.STOPPED; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment