Skip to content

Instantly share code, notes, and snippets.

View therako's full-sized avatar

Arun Kumar (AK) therako

  • Intuit
  • New York
View GitHub Profile
[
{
"id": 4072,
"uid": "c7204a53-77be-4ba6-b446-8f92921060a0",
"password": "uDQXExV6pr",
"first_name": "Lorenzo",
"last_name": "Fay",
"username": "lorenzo.fay",
"email": "[email protected]",
"avatar": "https://robohash.org/autconsecteturlabore.png?size=300x300&set=set1",
We can't make this file beautiful and searchable because it's too large.
playerID,birthYear,birthMonth,birthDay,birthCountry,birthState,birthCity,deathYear,deathMonth,deathDay,deathCountry,deathState,deathCity,nameFirst,nameLast,nameGiven,weight,height,bats,throws,debut,finalGame,retroID,bbrefID
aardsda01,1981,12,27,USA,CO,Denver,,,,,,,David,Aardsma,David Allan,215,75,R,R,2004-04-06,2015-08-23,aardd001,aardsda01
aaronha01,1934,2,5,USA,AL,Mobile,,,,,,,Hank,Aaron,Henry Louis,180,72,R,R,1954-04-13,1976-10-03,aaroh101,aaronha01
aaronto01,1939,8,5,USA,AL,Mobile,1984,8,16,USA,GA,Atlanta,Tommie,Aaron,Tommie Lee,190,75,R,R,1962-04-10,1971-09-26,aarot101,aaronto01
aasedo01,1954,9,8,USA,CA,Orange,,,,,,,Don,Aase,Donald William,190,75,R,R,1977-07-26,1990-10-03,aased001,aasedo01
abadan01,1972,8,25,USA,FL,Palm Beach,,,,,,,Andy,Abad,Fausto Andres,184,73,L,L,2001-09-10,2006-04-13,abada001,abadan01
abadfe01,1985,12,17,D.R.,La Romana,La Romana,,,,,,,Fernando,Abad,Fernando Antonio,220,73,L,L,2010-07-28,2017-10-01,abadf001,abadfe01
abadijo01,1850,11,4,USA,PA,Philadelphia,1905,5,17,USA,NJ,Pemberton,Jo
@therako
therako / iterm_startup_script
Last active November 16, 2018 08:24
My mac setup
# List existing tmux sessions, prompt for a session name, and attach iTerm2
# control mode (-CC) to it; fall back to a fresh -CC session when none exist.
# Fix: the original read $tmux_session but never used it — attach ignored the
# chosen session. Pass it via `attach -t`.
tmux ls && read tmux_session && tmux -CC attach -t "$tmux_session" || tmux -CC
@therako
therako / Long_git_scripts.sh
Created July 26, 2018 04:14
Useful long git script
# Print every ref (branches/tags) as "<committer date> <author> <refname>",
# then sort chronologically on the date fields of git's default date format
# ("Thu Jul 26 04:14:00 2018 +0000"): year (field 5, numeric), month name
# (field 2, -M), day (field 3), time (field 4).
git for-each-ref --format='%(committerdate) %09 %(authorname) %09 %(refname)' | sort -k5n -k2M -k3n -k4n
Long batchSize = 10L;
pipeline
.apply("ReadItems", BigQueryIO.readTableRows().fromQuery(query))
.apply("ParseTableRow", ParDo.of(new ParseTableRow()))
.apply("ConcurrentQueueAggregate", ParDo.of(new DoFn<String, Iterable<String>>() {
private Queue<String> queue = new ConcurrentLinkedQueue<>();
@ProcessElement
void processElement(ProcessContext c) {
DataflowPipelineOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args)
.as(DataflowPipelineOptions.class);
final Pipeline pipeline = org.apache.beam.sdk.Pipeline.create(pipelineOptions);
final String query = "SELECT id, score from [bigquery-public-data:hacker_news.stories]";
Long batchSize = 10L;
pipeline
.apply("ReadItems", BigQueryIO.readTableRows().fromQuery(query))
.apply("ParseTableRow", ParDo.of(new ParseTableRow()))
.apply("Window", Window.into(FixedWindows.of(Duration.standardSeconds(10))))
// Job parameters for bulk-loading a parquet dataset into a Cassandra table
// via locally-written SSTables.
val keyspace = "keyspace"
val tableName = "tableName"
val inputPathStr = "inputPathStr"
// NOTE(review): 9092 is Kafka's default port; Cassandra normally listens on
// 9042 (CQL) / 7000 (storage) — confirm these host:port pairs are intentional.
val cassHosts = "host1:9092,host2:9092"
// Approximate total input size in MB; drives the repartition count below.
val sizeInMB = 61440
// Columns that make up the Cassandra primary key, in order.
val primaryKeys = Array("key1", "key2", "...")
// Aim for roughly one write partition per 256 MB of input, minimum one.
val writePartitions = Math.max(sizeInMB / 256, 1)
// `session` is a SparkSession created outside this snippet.
val df = session.read.parquet(inputPathStr)
val createTableStatement = getCreateTableStatement(keyspace, tableName, df, primaryKeys)
/** Streams the SSTable files found under `dir` into Cassandra.
  *
  * A single host is picked at random from the comma-separated `cassHosts`
  * list and set as the thrift output address, then the load is started and
  * this method blocks until the streaming future completes.
  *
  * @param cassHosts comma-separated list of "host:port" endpoints
  * @param dir       local directory containing the SSTable files to stream
  */
def flushSSTableToCassandra(cassHosts: String, dir: String): Unit = {
  val hadoopConf = new Configuration()
  // Shuffle the host list and take the first entry so load is spread
  // randomly across the cluster over repeated invocations.
  val chosenHost = Random.shuffle(cassHosts.split(",").toList).head
  hadoopConf.set("cassandra.output.thrift.address", chosenHost)
  val loader = new SSTableLoader(new File(dir), new ExternalClient(hadoopConf), new OutputHandler.LogOutput)
  loader.stream().get()
}
/** Builds a `CREATE TABLE IF NOT EXISTS` CQL statement for
  * `keyspace.tableName` whose columns mirror the DataFrame's schema.
  *
  * @param keyspace  target Cassandra keyspace
  * @param tableName target table name
  * @param df        DataFrame whose schema supplies column names and types
  * @param keys      column names composing the primary key, in order
  * @return the CQL DDL string
  */
private def getCreateTableStatement(keyspace: String, tableName: String, df: DataFrame, keys: Array[String]) = {
  val primaryKey = keys.mkString(",")
  // Render each schema field as "<name> <type>"; simpleString is Spark's
  // compact type name (assumed CQL-compatible here, as in the original).
  val fieldsDesc = df.schema
    .map(field => s"${field.name} ${field.dataType.simpleString.trim()}")
    .mkString(", ")
  s"CREATE TABLE if not exists $keyspace.$tableName ($fieldsDesc, PRIMARY KEY ($primaryKey))"
}
private def getInsertStatement(keyspace: String, tableName: String, df: DataFrame) = {
val fieldsDesc = df.schema.fields.map(f => f.name).mkString(",")
val valuesHolder = df.schema.fields.map(_ => "?").mkString(",")
def SSTableWriter(it: Iterator[Row], createTableStatement: String, insertStatement: String, cassHosts: String): Unit = {
val uuid = UUID.randomUUID().toString
val dir = "/tmp/" + uuid
new File(dir).mkdirs()
val writer = CQLSSTableWriter.builder().inDirectory(dir).forTable(createTableStatement)
.using(insertStatement).withPartitioner(new Murmur3Partitioner()).build()
while (it.hasNext) {
val row = it.next()
val rowValues = new java.util.ArrayList[AnyRef](row.length)