# List every topic registered in ZooKeeper (legacy --zookeeper flag;
# newer Kafka releases use --bootstrap-server instead — TODO confirm broker version).
bin/kafka-topics.sh --zookeeper localhost:2181 --list
# Show partition/replica/config details for one topic.
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic mytopic
# Temporarily shrink retention to 1 second so existing segments become
# eligible for deletion (remember to restore the original value afterwards).
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
... wait a minute or so for the log cleaner to delete segments older than retention.ms ...
# List the unique origins (PPA names) of all installed packages that came
# from a PPA. "%200O" is required to print the full name of the "origin",
# i.e. the PPA name (a 200-column field so long names are not truncated).
aptitude search '?narrow(?installed, ~Oppa)' -F "%200O" | sort -u
#!/usr/bin/env bash
# Report the on-disk size (in GB) of every Kafka topic on the cluster,
# sorted ascending by size. Requires kafka-topics, kafka-log-dirs and jq.
set -euo pipefail

# Print "<topic> = <size>" for one topic: sum the partition sizes reported
# by kafka-log-dirs for broker 0's first log dir, converted to GB.
topic-size() {
  kafka-log-dirs --command-config /opt/kafka/ssl/client.txt \
      --bootstrap-server server:9093 --topic-list "$1" --describe \
    | tail -n1 \
    | jq '.brokers[0].logDirs[0].partitions | map(.size/1000000000) | add' \
    | xargs echo "$1" =
}

# List every topic name known to the cluster, one per line.
list-topics() {
  kafka-topics --command-config /opt/kafka/ssl/client.txt \
      --bootstrap-server server:9093 --list
}

# xargs spawns a fresh bash per topic, so the function must be exported.
export -f topic-size

TEMP_FILE=$(mktemp)
trap 'rm -f -- "$TEMP_FILE"' EXIT   # clean up even on early failure

# Pass the topic name as a positional argument instead of interpolating it
# into the command string, so odd characters in topic names cannot break
# (or inject into) the child shell.
list-topics | xargs -I{} bash -c 'topic-size "$1"' _ {} > "$TEMP_FILE"

# Field 3 is the size; -g = general numeric sort (handles jq's float output).
sort -g -k3 "$TEMP_FILE"
Homebrew build logs for cuetools on Debian GNU/Linux 9.9 (stretch)
Build date: 2019-06-25 17:45:47
// Convert a Spark DataFrame's schema into the equivalent Avro schema.
import org.apache.spark.sql.avro.SchemaConverters

SchemaConverters.toAvroType(df.schema) // add .toString if you need JSON here
scala> println("This answer actually got some points on SO https://stackoverflow.com/a/53981675/918211")
This answer actually got some points on SO https://stackoverflow.com/a/53981675/918211
scala> println(spark.version)
2.4.0
scala> val sq = spark.readStream.format("rate").load
sq: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]
scala> :type sq
#!/usr/bin/env bash
# RUN THIS ON EACH CASSANDRA NODE!
# Snapshot/restore helper configuration. Every setting below can be
# overridden from the environment, e.g. 'DEBUG=false backup_restore_cassandra.sh'.

DEBUG=${DEBUG:-true}            # change to false (or override via env) in prod
CQLSH=${CQLSH:-cqlsh}           # override to pass required connection parameters
KEYSPACE_NAME=${KEYSPACE_NAME:-profile}
TABLE_NAME=${TABLE_NAME:-device}
# Snapshot tag defaults to "<table>_<timestamp>"; $(...) replaces legacy backticks.
SNAPSHOT_TAG=${SNAPSHOT_TAG:-${TABLE_NAME}_$(date +%Y%m%d_%H%M%S)}
# Glob over the DC/OS data volumes that hold this keyspace — change appropriately!
KEYSPACE_DIRS="/dcos/volume*/${KEYSPACE_NAME}"
#!/usr/bin/env python
# pip install kafka-python
# Configuration for inspecting a consumer group's committed offsets.
from kafka import SimpleConsumer, KafkaClient

# Comma-separated bootstrap broker list (adjacent string literals concatenate;
# the original was missing the closing quote on the second literal).
servers = ('broker-01:9092,'
           'broker-02:9092')
topic_name = "test.topic1"
# Partition id (as a string) -> committed offset for that partition.
offsets = {"17":553593369,"8":553142567,"11":562669633,"20":561215743,"2":2661087706,"5":2663616824,"14":561171342,"13":567403099,"4":2653875446,"16":554258518,"7":545144724,"1":2692486549,"10":557397175,"19":534819310,"18":548724039,"9":559537595,"3":2720217023,"12":548273786,"15":547916993,"6":2693124039,"0":2687886815}
group_id = "issue_finder"
| """{"acp_prod.devices": {""" + df.select($"partition", $"offset").groupBy($"partition").agg(max($"offset")).as[(Int, Long)].collect.map{case (p, o) => s""""$p": $o"""}.mkString(",") + "}}" |
# List every topic registered in ZooKeeper (legacy --zookeeper flag;
# newer Kafka releases use --bootstrap-server instead — TODO confirm broker version).
bin/kafka-topics.sh --zookeeper localhost:2181 --list
# Show partition/replica/config details for one topic.
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic mytopic
# Temporarily shrink retention to 1 second so existing segments become
# eligible for deletion (remember to restore the original value afterwards).
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
... wait a minute or so for the log cleaner to delete segments older than retention.ms ...