Skip to content

Instantly share code, notes, and snippets.

View deeperunderstanding's full-sized avatar
Tinkering

Danny J. deeperunderstanding

Tinkering
  • Berlin
View GitHub Profile
@Component
class CoinbaseWebsocket @Autowired constructor(
@Value("\${cbpro.websocket.baseurl}") val websocketUrl: String
) {
private final val mapper = ObjectMapper()
private final val logger = LoggerFactory.getLogger(javaClass)
private final val client = ReactorNettyWebSocketClient()
init { client.maxFramePayloadLength = Int.MAX_VALUE }
@Component
class TickRecordingService
@Autowired constructor(val socket: CoinbaseWebsocket,
val tickRepository: TickRepository) {
private val logger = LoggerFactory.getLogger(javaClass)
fun recordProducts(products: Array<String>): Flux<List<Message>> {
logger.info("Start Recording Tick Stream for Products: $products")
@Component
class TickRepository(
@Value("\${db.influx.url}") val dburl : String,
@Value("\${db.influx.user}") val user : String,
@Value("\${db.influx.pw}") val pw : String,
@Value("\${db.influx.dbname}") val dbname : String
) {
private final val logger = LoggerFactory.getLogger(javaClass)
val influxDB = lazy { InfluxDBFactory.connect(dburl, user, pw).setDatabase(dbname) }
object WordCount extends MapReduce[String, String, Int, Int] {
override def mapper(line: String): Seq[KeyValue[String, Int]] =
"""[\w']+""".r.findAllIn(line).map { word => KeyValue(word, 1) }.toVector
override def reducer(key: String, values: Seq[Int]): KeyValue[String, Int] =
KeyValue(key, values.sum)
}
-- show running queries (pre 9.2)
SELECT procpid, age(clock_timestamp(), query_start), usename, current_query
FROM pg_stat_activity
WHERE current_query != '<IDLE>' AND current_query NOT ILIKE '%pg_stat_activity%'
ORDER BY query_start desc;
-- show running queries (9.2)
SELECT pid, age(clock_timestamp(), query_start), usename, query
FROM pg_stat_activity
WHERE query != '<IDLE>' AND query NOT ILIKE '%pg_stat_activity%'