Last active: January 3, 2021 19:50
Save tkrs/8068f42b1a1c6d98183cfac680edbca9 to your computer and use it in GitHub Desktop.
Bigtable CLI
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import $ivy.`com.github.tkrs::orcus-bigtable:0.25.2` | |
import $ivy.`com.github.tkrs::orcus-cats-effect:0.25.2` | |
import $ivy.`com.github.tkrs::mess-core:0.2.4` | |
import $ivy.`com.github.scopt::scopt:4.0.0` | |
import java.time.Instant | |
import cats.effect.IO | |
import cats.instances.map._ | |
import cats.syntax.all._ | |
import cats.syntax.group | |
import com.google.cloud.bigtable.data.v2.BigtableDataClient | |
import com.google.cloud.bigtable.data.v2.BigtableDataSettings | |
import com.google.cloud.bigtable.data.v2.models.Filters.FILTERS | |
import com.google.cloud.bigtable.data.v2.models.Query | |
import com.google.cloud.bigtable.data.v2.models.RowCell | |
import com.google.protobuf.ByteString | |
import mainargs.TokensReader | |
import mess._ | |
import mess.codec._ | |
import orcus.async.instances.catsEffect.effect._ | |
import orcus.bigtable.DataClient | |
import orcus.bigtable.async.implicits._ | |
import org.msgpack.core.MessagePack | |
import scopt.Read | |
/** Reads one row from a Bigtable table and prints it as an indented, YAML-like listing.
  *
  * NOTE(review): the query is built with `prefix(key)`, so the row returned may be one whose
  * key merely starts with `key`; the printed row key shows which row was actually read.
  *
  * @param project GCP project id, passed to `buildClient`
  * @param instance Bigtable instance id, passed to `buildClient`
  * @param table table id to query
  * @param profile app profile id, passed to `buildClient` (empty by default)
  * @param key row-key prefix to look up
  * @param cellsPerColumn when set, adds a cells-per-column limit filter
  * @param format family -> qualifier -> [[Format]] used to render values; "*" acts as a
  *               wildcard family/qualifier. Unless the "*" family is present, a filter that
  *               matches the keys of `format` as family names is added to the query.
  */
@main
def lookup(
  project: String,
  instance: String,
  table: String,
  profile: String = "",
  key: ByteString,
  @arg(name = "cells-per-column")
  cellsPerColumn: Option[Int] = None,
  @arg(doc = "family:qualifier:format,...")
  format: Map[String, Map[String, Format]] = Map.empty
): Unit = {
  val client = buildClient(project, instance, profile)

  val filters = FILTERS.chain()
  for (limit <- cellsPerColumn)
    filters.filter(FILTERS.limit().cellsPerColumn(limit))

  // Build the interleave completely before adding it to the chain.
  if (!format.contains("*")) {
    val familyFilter = FILTERS.interleave()
    for (familyName <- format.keys)
      familyFilter.filter(FILTERS.family().exactMatch(familyName))
    filters.filter(familyFilter)
  }

  val query = Query.create(table).prefix(key).filter(filters)

  try {
    client.readRowAsync(query).unsafeRunSync() match {
      case None =>
        println("not found")

      case Some(row) =>
        // Sum of family-name, qualifier and value lengths over every returned cell.
        val totalSize = row.families.iterator
          .flatMap(_._2)
          .map { cell =>
            cell.getFamily().length() + cell.getQualifier().size() + cell.getValue().size()
          }
          .sum

        println(s""""${row.rowKey}":""")
        println(s""" size: $totalSize""")
        println(s""" families:""")

        row.families.foreach { case (family, familyCells) =>
          // Qualifier formats for this family, else the "*" family, else render as strings.
          val qualifierFormats =
            format.getOrElse(family, format.getOrElse("*", Map("*" -> Format.String)))
          println(s""" "${family}":""")

          familyCells
            .groupBy(_.getQualifier().toStringUtf8())
            .toList
            .sortBy(_._1)
            .foreach { case (qualifier, columnCells) =>
              println(s""" "$qualifier":""")
              for (cell <- columnCells) {
                // RowCell timestamps are in microseconds; Instant wants epoch milliseconds.
                val timestamp = Instant.ofEpochMilli(cell.getTimestamp() / 1000L)
                val rendered = formatValue(qualifier, cell, qualifierFormats)
                println(s" - value: ${rendered}")
                println(s" size: ${cell.getValue().size()}")
                println(s" timestamp: ${timestamp}")
              }
            }
        }
    }
  } finally client.close()
}
/** mainargs reader for `ByteString` arguments (used for the row key).
  *
  * Takes the first command-line token and UTF-8 encodes it. Any failure is reported through
  * `TokensReader.tryEither`.
  */
implicit object readByteString
    extends TokensReader[ByteString](
      "key",
      tokens =>
        TokensReader.tryEither {
          val raw = tokens.head
          ByteString.copyFromUtf8(raw)
        }
    )
/** mainargs reader for `--format`: parses `family:qualifier:format,...`.
  *
  * Each comma-separated entry must have exactly three `:`-separated parts. Entries naming the
  * same family are merged into one qualifier map, and later entries win on duplicate qualifiers.
  * A malformed entry is reported as a descriptive `IllegalArgumentException` rather than an
  * opaque `MatchError`; an unknown format name fails inside `Format.apply`. Both kinds of
  * failure are turned into a parse error by `TokensReader.tryEither`.
  */
implicit object ReadFormat
    extends TokensReader[Map[String, Map[String, Format]]](
      "format",
      tokens =>
        TokensReader.tryEither {
          tokens.head
            .split(',')
            .toList
            .map { entry =>
              entry.split(':') match {
                case Array(family, qualifier, name) =>
                  family -> Map(qualifier -> Format(name))
                case _ =>
                  throw new IllegalArgumentException(
                    s"invalid format entry '$entry', expected family:qualifier:format"
                  )
              }
            }
            .foldLeft(Map.empty[String, Map[String, Format]]) {
              case (acc, (family, qualifiers)) =>
                acc.updated(family, acc.get(family).fold(qualifiers)(_ ++ qualifiers))
            }
        }
    )
/** How a cell value is decoded for display; `value` is the name accepted on the command line. */
sealed abstract class Format(val value: String)
object Format {

  /** Parses a format name (`string`, `int`, `long` or `msgpack`).
    *
    * @param v the command-line format name
    * @return the matching [[Format]]
    * @throws IllegalArgumentException if `v` is not a known format name; the message lists
    *                                  the accepted names
    */
  def apply(v: String): Format =
    all.find(_.value == v).getOrElse {
      throw new IllegalArgumentException(
        s"unknown format '$v', expected one of: ${all.map(_.value).mkString(", ")}"
      )
    }

  case object String extends Format("string")
  case object Int extends Format("int")
  case object Long extends Format("long")
  case object Msgpack extends Format("msgpack")

  // Single source of truth for parsing and for the error message. Inside this object the
  // bare names refer to the case objects above, not to scala.Int/Long.
  private val all: List[Format] = List(String, Int, Long, Msgpack)
}
/** Creates an orcus `DataClient[IO]` backed by a new `BigtableDataClient`.
  *
  * @param project GCP project id
  * @param instance Bigtable instance id
  * @param profile app profile id; when empty, no app profile is set on the settings
  * @return the wrapped client; the caller is responsible for closing it
  */
def buildClient(
  project: String,
  instance: String,
  profile: String
): DataClient[IO] = {
  val base = BigtableDataSettings
    .newBuilder()
    .setProjectId(project)
    .setInstanceId(instance)
  val builder = if (profile.isEmpty) base else base.setAppProfileId(profile)
  DataClient(BigtableDataClient.create(builder.build()))
}
/** Renders a cell value as text using the format configured for its qualifier.
  *
  * The format is looked up under `q` in `f`, then under "*", and defaults to
  * `Format.String`. An empty value renders as "". Decoding failures may throw: `Int` and
  * `Long` call `.toTry.get`, and `Msgpack` unpacks the raw bytes.
  *
  * @param q qualifier name, used as the key into `f`
  * @param cell the cell whose value is rendered
  * @param f qualifier -> [[Format]] mapping, where "*" is the fallback qualifier
  */
def formatValue(q: String, cell: RowCell, f: Map[String, Format]): String = {
  val bytes = cell.getValue()
  if (bytes.isEmpty()) ""
  else {
    val chosen = f.getOrElse(q, f.getOrElse("*", Format.String))
    chosen match {
      case Format.String =>
        s""""${bytes.toStringUtf8()}""""
      case Format.Int =>
        orcus.bigtable.codec.PrimitiveDecoder[Int].apply(bytes).toTry.get.toString()
      case Format.Long =>
        orcus.bigtable.codec.PrimitiveDecoder[Long].apply(bytes).toTry.get.toString()
      case Format.Msgpack =>
        val unpacked = Fmt.unpack(bytes.toByteArray(), MessagePack.DEFAULT_UNPACKER_CONFIG)
        formatMsgpack(unpacked)
    }
  }
}
/** Renders a decoded MessagePack value as a compact, JSON-like string.
  *
  * Strings are double-quoted without escaping, binary payloads are shown as one `0x` hex
  * string, and maps and arrays are rendered recursively. Extension values show their type
  * byte as exactly two lowercase hex digits (the previous `%x02` pattern printed the digits
  * followed by a literal "02").
  *
  * @param a the decoded MessagePack value
  */
def formatMsgpack(a: Fmt): String = a match {
  case Fmt.MNil         => "nil"
  case Fmt.MUnit        => ""
  case b: Fmt.MBool     => b.value.toString()
  case Fmt.MByte(n)     => n.toString()
  case Fmt.MShort(n)    => n.toString()
  case Fmt.MInt(n)      => n.toString()
  case Fmt.MLong(n)     => n.toString()
  case Fmt.MBigInt(n)   => n.toString()
  case Fmt.MFloat(n)    => n.toString()
  case Fmt.MDouble(n)   => n.toString()
  case Fmt.MString(s)   => s""""${s}""""
  case Fmt.MBin(bytes)  => binString(bytes)
  case Fmt.MExtension(t, l, bytes) =>
    // `& 0xff` keeps negative (signed) extension types as unsigned two-digit hex.
    s"""{ type: 0x${"%02x".format(t & 0xff)}, size: $l, value: ${binString(bytes)} }"""
  case m: Fmt.MMap =>
    m.iterator
      .map { case (k, x) => s"${formatMsgpack(k)}: ${formatMsgpack(x)}" }
      .mkString("{ ", ", ", " }")
  case arr: Fmt.MArray =>
    arr.iterator.map(formatMsgpack).mkString("[ ", ", ", " ]")
}
/** Formats bytes as a single `0x`-prefixed lowercase hex string, two digits per byte.
  *
  * For example, bytes `0x0a, 0xff` become `0x0aff`, and an empty array becomes `0x`.
  * `& 0xff` maps negative (signed) bytes to 0..255 so each byte is always two digits.
  */
def binString(xs: Array[Byte]): String =
  xs.iterator.map(b => "%02x".format(b & 0xff)).mkString("0x", "", "")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.