Skip to content

Instantly share code, notes, and snippets.

View jsanda's full-sized avatar

John Sanda jsanda

View GitHub Profile
Observable<Lease> leases = queryLeases(timeSlice)
.flatMap(Observable::from)
.map(row -> new Lease(timeSlice.getTime(), row.getInt(1), row.getString(2), row.getBool(3)))
.filter(lease -> !lease.isFinished())
.toList()
.repeat()
.takeUntil(List::isEmpty)
.flatMap(Observable::from);
// At this point I want to fetch the tasks for the lease,
// execute each task, then delete the tasks partition,
CREATE TABLE metrics_idx (
tenant_id text,
shard int
type int,
metric text,
tags map<text, text>,
data_retention int,
PRIMARY KEY ((tenant_id, shard), type, metric)
)
Func2<Observable<DataPoint<Double>>, Func1<Observable<Double>, Observable<Double>>, Observable<Double>> apply =
(dataPoints, func) -> func.call(dataPoints.map(DataPoint::getValue));
Observable<DataPoint<Double>> dataPoints = null; // fetch the data...
Observable<Double> avgerage = apply.call(dataPoints, MathObservable::averageDouble);
public class AlertsTest {
@Test
public void createAlerts() throws Exception {
initSession();
PreparedStatement createAlert = session.prepare(
"INSERT INTO alerts (tenantId, alertId, payload) VALUES (?, ?, ?)");
String tenantId = "alerts-tenant";
int numAlerts = 100000000;
CountDownLatch latch = new CountDownLatch(numAlerts);
$ ccm node1 status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- Address Load Tokens Owns Host ID Rack
UN 127.0.0.1 195.48 MB 256 ? 8c14221d-0870-467c-894a-76ead663e9f1 rack1
UN 127.0.0.2 279.71 KB 256 ? 1f9cf256-fc13-41ec-b5c5-2a0274b18963 rack1
UN 127.0.0.3 278.63 KB 256 ? e26d6450-91e1-4d6b-92dd-566b023c8686 rack1
public <T> T applyFunction(String tenantId, MetricId id, long start, long end,
Func1<Observable<DataPoint<Double>>, T> function) {
Observable<DataPoint<Double>> dataPoints = metricsService.findGaugeData(tenantId, id, start, end);
return function.call(dataPoints);
}
class Aggregate {
public final Double min;
public final Double max;
public final Double avg;
public <T> T applyFunction(String tenantId, MetricId id, long start, long end,
Func1<Observable<DataPoint<Double>>, T> function) {
Observable<DataPoint<Double>> dataPoints = metricsService.findGaugeData(tenantId, id, start, end);
return function.call(dataPoints);
}
@Test
public void getAverage() {
DateTime start = now().minusMinutes(30);
DateTime end = start.plusMinutes(20);
static class MinMax {
public DataPoint<Double> min;
public DataPoint<Double> max;
}
@Test
public void findRange() throws Exception {
DateTime start = now().minusMinutes(30);
DateTime end = start.plusMinutes(20);
String tenantId = "t1";
@Test
public void findAvg() throws Exception {
DateTime start = now().minusMinutes(30);
DateTime end = start.plusMinutes(20);
String tenantId = "t1";
metricsService.createTenant(new Tenant(tenantId)).toBlocking().lastOrDefault(null);
Metric<Double> m1 = new Metric<>(tenantId, GAUGE, new MetricId("m1"), asList(
new DataPoint<>(start.getMillis(), 1.1),
public class MetricsServiceProducer {
private MetricsServiceImpl metricsService;
@Produces
public MetricsService getMetricsService() {
if (metricsService == null) {
metricsService = new MetricsServiceImpl();
Session session = // create session object...
String keyspace = // keyspace to use for metrics...