@geoHeil
geoHeil / Foo.scala
Last active July 23, 2020 08:44
failing Flink Kafka/Avro dependency setup
// setup in bash
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.11/1.11.1/flink-connector-kafka_2.11-1.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka-base_2.11/1.11.1/flink-connector-kafka-base_2.11-1.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.1/kafka-clients-0.10.2.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.11.1/flink-avro-confluent-registry-1.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.11.1/flink-avro-1.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/force-shading/1.11.1/force-shading-1.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar -P lib/
wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.11.1/jackson-core-2.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-d
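For context, a minimal consumer sketch of the kind of job this classpath is meant for — not the gist's actual code; the topic, bootstrap servers, schema-registry URL, and the generated Tweet class from the next gist are all assumptions:

// hedged sketch, not the gist's job: consume Confluent-framed Avro records from Kafka
// with Flink 1.11; all endpoints and the Tweet class are assumptions
import java.util.Properties

import com.github.geoheil.streamingreference.Tweet
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object TweetConsumerSketch extends App {
  val props = new Properties()
  props.setProperty("bootstrap.servers", "localhost:9092")
  props.setProperty("group.id", "tweet-consumer")

  // deserialize records via the Confluent schema registry into the generated Tweet class
  val valueSchema = ConfluentRegistryAvroDeserializationSchema.forSpecific(
    classOf[Tweet], "http://localhost:8081")

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env
    .addSource(new FlinkKafkaConsumer[Tweet]("tweets", valueSchema, props))
    .print()
  env.execute("tweet-consumer-sketch")
}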
@geoHeil
geoHeil / Tweet.scala
Created June 29, 2020 12:16
new Avro SpecificRecord case class
/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
package com.github.geoheil.streamingreference

import scala.annotation.switch

final case class Tweet(var tweet_id: Option[String], var text: Option[String], var source: Option[String], var geo: Option[String], var place: Option[String], var lang: Option[String], var created_at: Option[String], var timestamp_ms: Option[String], var coordinates: Option[String], var user_id: Option[Long], var user_name: Option[String], var screen_name: Option[String], var user_created_at: Option[String], var followers_count: Option[Long], var friends_count: Option[Long], var user_lang: Option[String], var user_location: Option[String], var hashtags: Option[Seq[String]]) extends org.apache.avro.specific.SpecificRecordBase {
  def this() = this(None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None)
  def get(field$: Int): AnyRef = {
    (field$: @switch) match {
      case 0 => {
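As a usage illustration only (all field values below are made up), the generated record round-trips through Avro's specific writer and reader:

// illustration only, not part of the gist: binary round trip of a generated Tweet
import java.io.ByteArrayOutputStream

import com.github.geoheil.streamingreference.Tweet
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter}

object TweetRoundTripSketch extends App {
  val tweet = Tweet(Some("1"), Some("hello avro"), None, None, None, None, None, None,
    None, Some(42L), Some("someone"), Some("handle"), None, Some(0L), Some(0L), None, None,
    Some(Seq("streaming")))

  // write the record as binary Avro
  val out = new ByteArrayOutputStream()
  val encoder = EncoderFactory.get().binaryEncoder(out, null)
  new SpecificDatumWriter[Tweet](tweet.getSchema).write(tweet, encoder)
  encoder.flush()

  // read it back into the same specific class
  val decoder = DecoderFactory.get().binaryDecoder(out.toByteArray, null)
  val back = new SpecificDatumReader[Tweet](tweet.getSchema).read(null, decoder)
  println(back)
}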
version: "3"
services:
nifi:
image: apache/nifi:1.11.4
ports:
- 8080:8080 # Unsecured HTTP Web Port
environment:
- NIFI_WEB_HTTP_PORT=8080
- NIFI_CLUSTER_IS_NODE=true
- NIFI_CLUSTER_NODE_PROTOCOL_PORT=8082
@geoHeil
geoHeil / docker-compose.yml
Last active June 8, 2020 20:13
NiFi in Docker cannot connect / fails to put a record with a timeout exception
version: "3"
services:
zookeeper: # the configuration manager
hostname: zookeeper
container_name: zookeeper
image: 'bitnami/zookeeper:3.6.1'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
nifi:
image: apache/nifi:1.11.4
@geoHeil
geoHeil / docker-compose.yml
Created June 8, 2020 16:42
NiFi Docker volume persistence
version: "3"
services:
zookeeper: # the configuration manager
hostname: zookeeper
container_name: zookeeper
image: 'bitnami/zookeeper:3.6.1'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
nifi:
image: apache/nifi:1.11.4
@geoHeil
geoHeil / docker-compose.yml
Created June 7, 2020 17:47
NiFi & Registry in Docker with (not working) persistence
version: "3"
services:
nifi:
hostname: demo.nifi
image: apache/nifi:1.11.4
container_name: nifi
ports:
- 8080:8080
- 443:8443
links:
%pylab inline
import pandas as pd
import geopandas as gp
import seaborn as sns; sns.set()
import numpy as np
from h3 import h3
import monix.eval.Task
import monix.execution.Cancelable
import monix.execution.cancelables.CompositeCancelable
import scala.util.{Failure, Success, Try}
object Foo extends App {
  val things = Range(1, 10)
  val c = CompositeCancelable()
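The preview stops right after the CompositeCancelable is created; purely as an illustration (Monix 3.x assumed), the usual pattern is to collect each task's cancel handle into it:

// illustration only, assuming Monix 3.x: run one Task per element and keep the
// cancel handles in the CompositeCancelable so everything can be stopped at once
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.execution.cancelables.CompositeCancelable

object FooSketch extends App {
  val things = Range(1, 10)
  val c = CompositeCancelable()

  things.foreach { i =>
    c += Task(println(s"processing $i")).runToFuture
  }

  // cancelling the composite cancels every task that is still running
  c.cancel()
}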
@geoHeil
geoHeil / foo.scala
Last active September 20, 2019 14:23
Spark custom file stream source
package org.apache.spark.sql.execution.streaming

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

// placed in Spark's own package so it can reuse the internal FileStreamOptions
class StatefulFileStreamOptions(parameters: CaseInsensitiveMap[String])
    extends FileStreamOptions(parameters) {

  def this(parameters: Map[String, String]) =
    this(CaseInsensitiveMap(parameters))
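The preview ends before any new option appears; as an illustration only (the option names below are made up and Spark 2.4-era internals are assumed), extra parameters would typically be exposed like this:

// hypothetical sketch, not part of the gist: where additional options would live.
// It sits in Spark's package, like the gist, so the internal FileStreamOptions resolves.
package org.apache.spark.sql.execution.streaming

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

class StatefulFileStreamOptionsSketch(parameters: CaseInsensitiveMap[String])
    extends FileStreamOptions(parameters) {

  // "stateDir" and "keepProcessedFiles" are made-up option names, used only for illustration
  val stateDir: Option[String] = parameters.get("stateDir")

  val keepProcessedFiles: Boolean =
    parameters.get("keepProcessedFiles").exists(_.toBoolean)
}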
@geoHeil
geoHeil / foo.scala
Last active August 24, 2019 12:11
Azure Event Hub captured Avro file parsing in Spark
// using from_json
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.StringType

// infer the JSON schema of the Body payload, then replace Body with a parsed struct column
val schema = spark.read.json(df.select("Body").as[String]).schema
val otherColumns = df.drop("Body").columns.map(col)
val combined = otherColumns :+ from_json(col("Body").cast(StringType), schema).alias("Body_parsed")
val result = df.select(combined: _*)
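The snippet assumes df already holds the captured records; one hedged way to produce it (Spark 2.4+ with the external spark-avro module on the classpath, and a made-up capture path):

// hedged sketch, not from the gist: load Event Hubs Capture output so that `df`
// exposes the binary Body column parsed above; the path is a placeholder
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("eventhub-capture-parsing").getOrCreate()
import spark.implicits._ // also needed by .as[String] in the snippet above

val df = spark.read.format("avro").load("/data/eventhub-capture/*/*.avro")
df.printSchema()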