Created
July 15, 2016 20:49
-
-
Save rweeks/bedb96e20fcb8d2d2f27c555494181c5 to your computer and use it in GitHub Desktop.
Delete all key-value pairs for a single locality group in Accumulo.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// this code is in the public domain | |
package com.newbrightidea.accumulo | |
import java.io.IOException | |
import java.util | |
import org.apache.accumulo.core.data.{ByteSequence, Key, Range, Value} | |
import org.apache.accumulo.core.iterators.{IteratorEnvironment, SortedKeyValueIterator, WrappingIterator} | |
import scala.collection.JavaConverters._ | |
import java.nio.charset.StandardCharsets.UTF_8 | |
import org.apache.accumulo.core.client.security.tokens.KerberosToken | |
import org.apache.accumulo.core.client.{ClientConfiguration, IteratorSetting, ZooKeeperInstance} | |
object LocalityGroupDeleter { | |
val OptLgColFams = "lgColFams" | |
def main(args: Array[String]): Integer = { | |
val Array(tableName, lgCols) = args | |
val krbToken = new KerberosToken() | |
val accConn = new ZooKeeperInstance(ClientConfiguration.loadDefault()) | |
.getConnector(krbToken.getPrincipal, krbToken) | |
accConn.tableOperations().compact( tableName, null, null, | |
List( | |
new IteratorSetting(50, "LocalityGroupDeleter", classOf[LocalityGroupDeleter], Map(OptLgColFams -> lgCols).asJava) | |
).asJava, true, true) | |
0 | |
} | |
} | |
class LocalityGroupDeleter extends WrappingIterator { | |
import LocalityGroupDeleter._ | |
var lgColFams = Set[String]() | |
var seekCfMatchesLg = false | |
override def init(source: SortedKeyValueIterator[Key, Value], options: util.Map[String, String], | |
env: IteratorEnvironment): Unit = { | |
super.init(source, options, env) | |
if (!options.containsKey(OptLgColFams)) { | |
throw new IOException("Specify LG column families as comma-separated list using " + OptLgColFams) | |
} | |
lgColFams = Set(options.get(OptLgColFams).split(","):_*) | |
} | |
override def seek(range: Range, columnFamilies: util.Collection[ByteSequence], inclusive: Boolean): Unit = { | |
super.seek(range, columnFamilies, inclusive) | |
val seekColFams = columnFamilies.asScala.map(x => new String(x.toArray, UTF_8)).toSet | |
seekCfMatchesLg = seekColFams == lgColFams | |
} | |
override def hasTop: Boolean = if (seekCfMatchesLg) false else super.hasTop | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment