-
-
Save renato2099/5835106 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import me.prettyprint.cassandra.serializers.ObjectSerializer; | |
import me.prettyprint.cassandra.service.ThriftCfDef; | |
import me.prettyprint.hector.api.*; | |
import me.prettyprint.hector.api.beans.HColumn; | |
import me.prettyprint.hector.api.ddl.*; | |
import me.prettyprint.hector.api.factory.HFactory; | |
import me.prettyprint.hector.api.mutation.Mutator; | |
import me.prettyprint.hector.api.query.QueryResult; | |
import org.slf4j.*; | |
import static me.prettyprint.hector.api.factory.HFactory.*; | |
/** | |
* Service that allows interaction with a single column family in a key-space on a cluster | |
*/ | |
public class CassandraService { | |
private static final ObjectSerializer SERIALIZER = ObjectSerializer.get(); | |
private static final Logger LOGGER = LoggerFactory.getLogger(CassandraService.class); | |
private Cluster cluster; | |
private Keyspace keyspace; | |
private String columnFamilyName; | |
private KeyspaceDefinition keyspaceDefinition; | |
private void init(Cluster cluster, String keySpaceName, String columnFamilyName) { | |
this.cluster = cluster; | |
this.columnFamilyName = columnFamilyName; | |
this.keyspaceDefinition = getOrCreateKeyspaceDefinition(keySpaceName); | |
LOGGER.info("We are going to create KeySpace {}", keySpaceName); | |
this.keyspace = createKeyspace(keySpaceName, cluster, new GmiPublisherConsistencyLevelPolicy()); | |
} | |
public CassandraService(Cluster cluster, String keySpaceName, String columnFamilyName) { | |
init(cluster, keySpaceName, columnFamilyName); | |
LOGGER.info("KeyspaceDefinition {} for keyspace {}", keyspaceDefinition, keySpaceName); | |
if (!columnFamilyExists(columnFamilyName, keyspaceDefinition)) { | |
ColumnFamilyDefinition columnFamilyDefinition = createColumnFamilyDefinition(keySpaceName, columnFamilyName); | |
cluster.addColumnFamily(columnFamilyDefinition); | |
LOGGER.info("Added column family {} / {}", keySpaceName, columnFamilyName); | |
} | |
} | |
public CassandraService(Cluster cluster, String keySpaceName, String columnFamilyName, int gcGraceSeconds) { | |
init(cluster, keySpaceName, columnFamilyName); | |
LOGGER.info("KeyspaceDefinition {} for keyspace {}", keyspaceDefinition, keySpaceName); | |
if (!columnFamilyExists(columnFamilyName, keyspaceDefinition)) { | |
addColumnFamilyWithGCGraceSeconds(keySpaceName, columnFamilyName, gcGraceSeconds); | |
LOGGER.info("Added column family {} / {} with gcGraceSeconds period {}", | |
new Object[] { keySpaceName, columnFamilyName, gcGraceSeconds }); | |
} | |
} | |
private KeyspaceDefinition getOrCreateKeyspaceDefinition(String keySpaceName) { | |
KeyspaceDefinition keyspaceDef = cluster.describeKeyspace(keySpaceName); | |
if (keyspaceDef == null) { | |
keyspaceDef = createKeyspaceDefinition(keySpaceName); | |
} | |
return keyspaceDef; | |
} | |
private KeyspaceDefinition createKeyspaceDefinition(String keySpaceName) { | |
KeyspaceDefinition keyspaceDef; | |
cluster.addKeyspace(HFactory.createKeyspaceDefinition(keySpaceName)); | |
LOGGER.info("Added keyspace {}", keySpaceName); | |
keyspaceDef = cluster.describeKeyspace(keySpaceName); | |
return keyspaceDef; | |
} | |
private void addColumnFamilyWithGCGraceSeconds(String keySpaceName, String columnFamilyName, int gcGraceSeconds) { | |
ColumnFamilyDefinition columnFamilyDefinition = createColumnFamilyDefinition(keySpaceName, columnFamilyName); | |
ThriftCfDef columnFamilyWithGCGraceSeconds = new ThriftCfDef(columnFamilyDefinition); | |
columnFamilyWithGCGraceSeconds.setGcGraceSeconds(gcGraceSeconds); | |
cluster.addColumnFamily(columnFamilyWithGCGraceSeconds); | |
} | |
private boolean columnFamilyExists(String columnFamily, KeyspaceDefinition keyspaceDefinition) { | |
if (keyspaceDefinition != null) { | |
for (ColumnFamilyDefinition columnFamilyDefinition : keyspaceDefinition.getCfDefs()) { | |
if (columnFamily.equals(columnFamilyDefinition.getName())) { | |
return true; | |
} | |
} | |
} | |
return false; | |
} | |
public QueryResult<HColumn<Object, Object>> queryObject(Object rowKey, Object columnName) { | |
return createColumnQuery(keyspace, SERIALIZER, SERIALIZER, SERIALIZER) | |
.setColumnFamily(this.columnFamilyName) | |
.setKey(rowKey) | |
.setName(columnName) | |
.execute(); | |
} | |
public void persistObject(Object rowKey, Object columnName, Object obj) { | |
LOGGER.debug("Persist {} / {}", rowKey, columnName); | |
HColumn<Object, Object> column = createColumn(columnName, obj, SERIALIZER, SERIALIZER); | |
executeInsertion(rowKey, column); | |
} | |
public void persistObjectWithTtl(Object rowKey, Object columnName, Object obj, int ttl) { | |
LOGGER.debug("Persist {} / {}", rowKey, columnName); | |
HColumn<Object, Object> column = createColumn(columnName, obj, SERIALIZER, SERIALIZER); | |
column.setTtl(ttl); | |
executeInsertion(rowKey, column); | |
} | |
private void executeInsertion(Object rowKey, HColumn<Object, Object> column) { | |
Mutator<Object> mutator = createMutator(keyspace, SERIALIZER); | |
mutator.addInsertion(rowKey, this.columnFamilyName, column); | |
mutator.execute(); | |
} | |
/** | |
* API doesn't currently support completely removing row, only deletes contents, use truncate to get rid of empty | |
* rows | |
*/ | |
public void remove(Object rowKey, Object columnName) { | |
createMutator(this.keyspace, SERIALIZER) | |
.addDeletion(rowKey, this.columnFamilyName, columnName, SERIALIZER) | |
.execute(); | |
} | |
/** | |
* Only returns true if removed entries are truncated, as remove leaves empty row | |
*/ | |
public boolean isColumnFamilyEmpty() { | |
return HFactory.createRangeSlicesQuery(this.keyspace, SERIALIZER, SERIALIZER, SERIALIZER) | |
.setColumnFamily(this.columnFamilyName) | |
.setRange("", "", false, 1) | |
.setRowCount(1) | |
.execute().get().getList().isEmpty(); | |
} | |
public void truncateColumnFamily() { | |
this.cluster.truncate(this.keyspace.getKeyspaceName(), this.columnFamilyName); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Iterator; | |
import java.util.concurrent.*; | |
import me.prettyprint.cassandra.model.QuorumAllConsistencyLevelPolicy; | |
import me.prettyprint.cassandra.serializers.ObjectSerializer; | |
import me.prettyprint.hector.api.*; | |
import me.prettyprint.hector.api.beans.*; | |
import me.prettyprint.hector.api.factory.HFactory; | |
import me.prettyprint.hector.api.query.*; | |
import org.apache.cassandra.service.StorageService; | |
import org.apache.commons.lang3.RandomStringUtils; | |
import org.junit.*; | |
import org.slf4j.LoggerFactory; | |
import com.jayway.awaitility.Duration; | |
import static com.jayway.awaitility.Awaitility.waitAtMost; | |
import static me.prettyprint.hector.api.factory.HFactory.createKeyspace; | |
import static org.junit.Assert.*; | |
/** | |
* Tests that cassandra actually removes empty rows after compaction. | |
* | |
* @author Grigorev Alexey | |
*/ | |
public class CassandraTtlIntergrationTest { | |
private static final Logger logger = LoggerFactory.getLogger(CassandraTtlIntergrationTest.class); | |
private static final String KEYSPACE = "KEYSPACE"; | |
private static final String COLUMN_FAMILY = "COLUMN_FAMILY"; | |
private static final String COLUMN_NAME = "columnName"; | |
private static final int GC_CRACE_SECONDS = 20; | |
// sut | |
private CassandraService cassandraService; | |
// dependencies | |
private Cluster cluster = HFactory.getOrCreateCluster("tstCltr", "localhost:9160"); | |
private Keyspace keyspace; | |
@BeforeClass | |
public static void setupBeforeClass() { | |
EmbeddedCassandraDaemon daemon = EmbeddedCassandraDaemon.getEmbeddedCassandraDaemon(); | |
StorageService.instance.registerDaemon(daemon.getCassandraDaemon()); | |
} | |
@Before | |
public void setUp() throws Exception { | |
keyspace = createKeyspace(KEYSPACE, cluster, new QuorumAllConsistencyLevelPolicy()); | |
cassandraService = new CassandraService(cluster, KEYSPACE, COLUMN_FAMILY, GC_CRACE_SECONDS); | |
} | |
@Test | |
public void objectRemovedAfterTtlPasses() throws Exception { | |
String rowKey = RandomStringUtils.randomAlphanumeric(128); | |
Object obj = RandomStringUtils.randomAlphanumeric(1000); | |
cassandraService.persistObjectWithTtl(rowKey, COLUMN_NAME, obj, 20); | |
logger.info("after persisting rows count is {}", countRows()); | |
Object value = retrieve(rowKey, COLUMN_NAME); | |
assertNotNull(value); | |
logger.info("before TTL passes rows count is {}", countRows()); | |
logger.info("sleeping 25 seconds..."); | |
TimeUnit.SECONDS.sleep(25); | |
Object nullValue = retrieve(rowKey, COLUMN_NAME); | |
assertNull(nullValue); | |
} | |
@Test | |
public void rowGetsRemovedAfterGCGraceSeconds() throws Exception { | |
final int expectedAmount = 50000; | |
logger.info("before persisting rows count is {}", countRows()); | |
for (int i = 0; i < expectedAmount; i++) { | |
String rowKey = RandomStringUtils.randomAlphanumeric(128); | |
Object obj = RandomStringUtils.randomAlphanumeric(1000); | |
cassandraService.persistObjectWithTtl(rowKey, COLUMN_NAME, obj, 20); | |
if (i % 100 == 0) { | |
StorageService.instance.forceTableFlush(KEYSPACE, COLUMN_FAMILY); | |
} | |
} | |
logger.info("causing major compaction..."); | |
causeCompaction(); | |
logger.info("after major compaction rows count is {}", countRows()); | |
waitAtMost(Duration.TWO_MINUTES) | |
.pollDelay(Duration.TWO_SECONDS) | |
.pollInterval(Duration.ONE_HUNDRED_MILLISECONDS) | |
.until(new Callable<Boolean>() { | |
@Override | |
public Boolean call() throws Exception { | |
int countRows = countRows(); | |
logger.info("the rows count is {}", countRows); | |
return countRows < expectedAmount; | |
} | |
}); | |
} | |
public void causeCompaction() throws Exception { | |
StorageService.instance.forceTableCompaction(KEYSPACE, COLUMN_FAMILY); | |
} | |
public int countRows() { | |
// http://stackoverflow.com/questions/8418448 | |
int rowCount = 100; | |
ObjectSerializer serializer = ObjectSerializer.get(); | |
RangeSlicesQuery<Object, Object, Object> rangeSlicesQuery = | |
HFactory.createRangeSlicesQuery(keyspace, serializer, serializer, serializer) | |
.setColumnFamily(COLUMN_FAMILY) | |
.setRange(null, null, false, 10).setRowCount(rowCount); | |
Object lastKey = null; | |
int i = 0; | |
while (true) { | |
rangeSlicesQuery.setKeys(lastKey, null); | |
QueryResult<OrderedRows<Object, Object, Object>> result = rangeSlicesQuery.execute(); | |
OrderedRows<Object, Object, Object> rows = result.get(); | |
Iterator<Row<Object, Object, Object>> rowsIterator = rows.iterator(); | |
// we'll skip this first one, since it is the same as the last one from previous time we executed | |
if (lastKey != null && rowsIterator != null) { | |
rowsIterator.next(); | |
} | |
while (rowsIterator.hasNext()) { | |
Row<Object, Object, Object> row = rowsIterator.next(); | |
lastKey = row.getKey(); | |
i++; | |
if (row.getColumnSlice().getColumns().isEmpty()) { | |
continue; | |
} | |
} | |
if (rows.getCount() < rowCount) { | |
break; | |
} | |
} | |
return i; | |
} | |
private Object retrieve(String rowKey, String columnName) { | |
HColumn<Object, Object> hColumn = cassandraService.queryObject(rowKey, columnName).get(); | |
if (hColumn == null) { | |
return null; | |
} | |
return hColumn.getValue(); | |
} | |
@After | |
public void tearDown() { | |
cassandraService.truncateColumnFamily(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment