Skip to content

Instantly share code, notes, and snippets.

View jsanda's full-sized avatar

John Sanda jsanda

View GitHub Profile
@BeforeClass
public void initClass() {
dataAccess = new DataAccessImpl(session);
ConfigurationService configurationService = new ConfigurationService() ;
configurationService.init(rxSession);
metricsService = new MetricsServiceImpl();
metricsService.setDataAccess(dataAccess);
metricsService.setConfigurationService(configurationService);
cqlsh:hawkulartest> select distinct tenant_id, type, metric, dpart from data limit 1;
tenant_id | type | metric | dpart
-------------+------+--------+-------
test-tenant | 0 | m1 | 0
(1 rows)
Tracing session: 11b597d0-6a6a-11e6-ab52-51eaf4bb8e56
cqlsh:hawkulartest> select distinct tenant_id, metric from data where tenant_id = 'T2' and type = 0 and metric = 'G488' and dpart = 0;
tenant_id | metric
-----------+--------
(0 rows)
Tracing session: 3fe7c3d0-6a6a-11e6-ab52-51eaf4bb8e56
activity | timestamp | source | source_elapsed | client
// Resets keyspace by truncating all tables in keyspace.
//
// session is an Rx wrapper around com.datastax.driver.core.Session
// and uses the async methods in Session. The execute method returns an
// Observable<ResultSet>.
public void truncateTables(String keyspace) {
session.execute("select table_name from system_schema.tables where keyspace_name = '" + keyspace + "'")
.flatMap(Observable::from)
.flatMap(row -> session.execute("truncate " + row.getString(0)))
.toCompletable()
private static Logger logger = Logger.getLogger(RxTest.class);
private rx.Scheduler tickScheduler;
private ExecutorService queueExecutor;
private rx.Scheduler queueScheduler;
@BeforeClass
public void initClass() {
/*
* Copyright 2014-2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
public class QueryListener extends RPCBasicMessageListener<InventoryQueryRequestMessage> {
private final Inventory inventory;
@Inject
private Bus bus;
@Resource(mappedName = "java:/jms/queue/INVENTORY_QUEUE_NAME")
Queue queue;
@Test
public void addAndFetchGaugeDataAggregates() throws Exception {
DateTime start = now().minusMinutes(30);
DateTime end = start.plusMinutes(20);
String tenantId = "t1";
metricsService.createTenant(new Tenant(tenantId)).toBlocking().lastOrDefault(null);
Metric<Double> m1 = new Metric<>(new MetricId<>(tenantId, GAUGE, "m1"), asList(
new DataPoint<>(start.getMillis(), 10.0),
// Schema for metrics_idx is
//
// CREATE TABLE metrics_idx (
// tenant_id text,
// type int,
// interval text,
// metric text,
// PRIMARY KEY ((tenant_id, type), interval, metric)
// );
String indexQuery = "INSERT INTO metrics_idx (tenant_id, type, interval, metric) VALUES (?, ?, ?, ?)";
i = 0, j = 1
i = 0, j = 2
i = 0, j = 3
i = 0, j = 4
i = 0, j = 5
i = 0, j = 6
i = 5, j = 6
i = 0, j = 7
i = 5, j = 7
i = 0, j = 8