This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Observable<Lease> leases = queryLeases(timeSlice) | |
.flatMap(Observable::from) | |
.map(row -> new Lease(timeSlice.getTime(), row.getInt(1), row.getString(2), row.getBool(3))) | |
.filter(lease -> !lease.isFinished()) | |
.toList() | |
.repeat() | |
.takeUntil(List::isEmpty) | |
.flatMap(Observable::from); | |
// At this point I want to fetch the tasks for the lease, | |
// execute each task, then delete the tasks partition, |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
CREATE TABLE metrics_idx ( | |
tenant_id text, | |
shard int | |
type int, | |
metric text, | |
tags map<text, text>, | |
data_retention int, | |
PRIMARY KEY ((tenant_id, shard), type, metric) | |
) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Func2<Observable<DataPoint<Double>>, Func1<Observable<Double>, Observable<Double>>, Observable<Double>> apply = | |
(dataPoints, func) -> func.call(dataPoints.map(DataPoint::getValue)); | |
Observable<DataPoint<Double>> dataPoints = null; // fetch the data... | |
Observable<Double> avgerage = apply.call(dataPoints, MathObservable::averageDouble); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class AlertsTest { | |
@Test | |
public void createAlerts() throws Exception { | |
initSession(); | |
PreparedStatement createAlert = session.prepare( | |
"INSERT INTO alerts (tenantId, alertId, payload) VALUES (?, ?, ?)"); | |
String tenantId = "alerts-tenant"; | |
int numAlerts = 100000000; | |
CountDownLatch latch = new CountDownLatch(numAlerts); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ ccm node1 status | |
Datacenter: datacenter1 | |
======================= | |
Status=Up/Down | |
|/ State=Normal/Leaving/Joining/Moving | |
-- Address Load Tokens Owns Host ID Rack | |
UN 127.0.0.1 195.48 MB 256 ? 8c14221d-0870-467c-894a-76ead663e9f1 rack1 | |
UN 127.0.0.2 279.71 KB 256 ? 1f9cf256-fc13-41ec-b5c5-2a0274b18963 rack1 | |
UN 127.0.0.3 278.63 KB 256 ? e26d6450-91e1-4d6b-92dd-566b023c8686 rack1 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public <T> T applyFunction(String tenantId, MetricId id, long start, long end, | |
Func1<Observable<DataPoint<Double>>, T> function) { | |
Observable<DataPoint<Double>> dataPoints = metricsService.findGaugeData(tenantId, id, start, end); | |
return function.call(dataPoints); | |
} | |
class Aggregate { | |
public final Double min; | |
public final Double max; | |
public final Double avg; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public <T> T applyFunction(String tenantId, MetricId id, long start, long end, | |
Func1<Observable<DataPoint<Double>>, T> function) { | |
Observable<DataPoint<Double>> dataPoints = metricsService.findGaugeData(tenantId, id, start, end); | |
return function.call(dataPoints); | |
} | |
@Test | |
public void getAverage() { | |
DateTime start = now().minusMinutes(30); | |
DateTime end = start.plusMinutes(20); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
static class MinMax { | |
public DataPoint<Double> min; | |
public DataPoint<Double> max; | |
} | |
@Test | |
public void findRange() throws Exception { | |
DateTime start = now().minusMinutes(30); | |
DateTime end = start.plusMinutes(20); | |
String tenantId = "t1"; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Test | |
public void findAvg() throws Exception { | |
DateTime start = now().minusMinutes(30); | |
DateTime end = start.plusMinutes(20); | |
String tenantId = "t1"; | |
metricsService.createTenant(new Tenant(tenantId)).toBlocking().lastOrDefault(null); | |
Metric<Double> m1 = new Metric<>(tenantId, GAUGE, new MetricId("m1"), asList( | |
new DataPoint<>(start.getMillis(), 1.1), |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class MetricsServiceProducer { | |
private MetricsServiceImpl metricsService; | |
@Produces | |
public MetricsService getMetricsService() { | |
if (metricsService == null) { | |
metricsService = new MetricsServiceImpl(); | |
Session session = // create session object... | |
String keyspace = // keyspace to use for metrics... |