Skip to content

Instantly share code, notes, and snippets.

View natewave's full-sized avatar

Nizar S. natewave

View GitHub Profile
class KafkaCustomDecoder[T](implicit keyRule: RuleLike[AvroValue, T]) extends kafka.serializer.Decoder[VA[T]] with Logging {
import kafka.message.Message
import java.util.Properties
/** Wraps the given record with `Avro.wrap` and hands the wrapped value to `Avro.fromAvro[T]`. */
def fromAvro(value: GenericRecord) = {
  val wrapped = Avro.wrap(value)
  Avro.fromAvro[T](wrapped)
}
def fromBytes(bytes: Array[Byte]): VA[T] = {
val props: Properties = new Properties()
props.put("schema.registry.url", "http://localhost:8081")
def read[A, B](consumerConfig: ConsumerConfig, streamingContext: StreamingContext, maxRate: Int = DEFAULT_MAX_RATE)
(keyRule:GenericRecord => VA[A], valueRule: GenericRecord => VA[B]): DStream[(A, B)] = {
val sparkConf = streamingContext.sparkContext.getConf
val appName = sparkConf.get("spark.app.name")
val offsetsCoordinator = OffsetsCoordinator.get(
channel = OffsetsCoordinator.newChannel(consumerConfig.host, consumerConfig.port),
clientId = consumerConfig.clientId,
groupId = consumerConfig.groupId)
// Resolve the implicit Avro rules for String and DefectResult once, so they can be passed explicitly below.
val sRule = implicitly[RuleLike[AvroValue, String]]
val drRule = implicitly[RuleLike[AvroValue, DefectResult]]
// Per-record converters: each delegates to RecordTransformations.convert with the matching rule.
// kTransform is passed first and vTransform second to KafkaCommons.read (presumably key and value converters — confirm against its signature).
def kTransform(g: GenericRecord) = RecordTransformations.convert(g, sRule)
def vTransform(g: GenericRecord) = RecordTransformations.convert(g, drRule)
// Stream of (String, DefectResult) pairs produced by KafkaCommons.read with the two converters above.
val defectResults: DStream[(String, DefectResult)] = KafkaCommons.read[String, DefectResult](consumerConfig, ssc)(kTransform, vTransform)
class KafkaListener(saveProgress: Array[OffsetRange] => Boolean) extends SparkListener with Logging {
private val runningJobs = collection.mutable.Set.empty[SparkListenerJobStart]
private val kafkaRDDs = collection.mutable.Map.empty[Int, Array[OffsetRange]]
/**
 * Adds the started job to `runningJobs`.
 *
 * Declared with an explicit `: Unit =` instead of procedure syntax, which is
 * deprecated in Scala 2.13 and not supported in Scala 3.
 *
 * @param jobStart the job-start event delivered to this listener
 */
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
  runningJobs += jobStart
}
override def onJobEnd(jobEnd: SparkListenerJobEnd) {
runningJobs.find(_.jobId == jobEnd.jobId).map { job =>
import scala.concurrent.{ Future, ExecutionContext }
object GroupedAsync {
def sequencePar[K, V, R](fs: ((K, V)) => Future[(K, R)])(input: Seq[(K, V)])(implicit ec: ExecutionContext): Future[Seq[(K, R)]] = {
val result: Future[Seq[(K, R)]] = {
val grouped: Map[K, Seq[(K, V)]] = input.groupBy(_._1)
val futurePerKeyMap: Map[K, Future[Seq[(K, R)]]] = grouped.mapValues { elements =>
val l: Future[Seq[(K, R)]] = elements.foldLeft(Future(Seq.empty[(K, R)])) {
(previousFuture, next) =>
/**
 * Root of the node-state marker types.
 *
 * The three role traits (`Follower`, `Candidate`, `Leader`) carry no members;
 * they exist only to be combined into the `Not*` classes below, each of which
 * mixes in every role except the one it is named after.
 */
sealed trait NodeState

/** Marker for the follower role. */
trait Follower extends NodeState

/** Marker for the candidate role. */
trait Candidate extends NodeState

/** Marker for the leader role. */
trait Leader extends NodeState

/** Every role except leader: follower or candidate. */
final class NotLeader extends NodeState with Follower with Candidate

/** Every role except follower: candidate or leader. */
final class NotFollower extends NodeState with Candidate with Leader

/**
 * Every role except candidate: follower or leader.
 *
 * Previously declared as `Follower with Candidate`, which duplicated
 * `NotLeader` and included the very role this type is meant to exclude.
 */
final class NotCandidate extends NodeState with Follower with Leader
class Node[State <: NodeState] private () {
def getCommittedOffsetsForConsumer(consumerConfig: KafkaConsumerConfig, topicPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] = {
val result = withCustomStringConsumer(consumer => {
consumer.assign(topicPartitions.toList.asJava)
val tps = topicPartitions.map( tp => {
val offsetAndMetadata = consumer.committed(tp)
val startOffset: Long = KafkaCluster.getLogBeginningOffset(consumerConfig.topic).values.head
Option(offsetAndMetadata) match {
case None => {
// log "TopicPartition $tp doesn't have committed offsets"
def upsert[Pk, A](dbContext: DbContext[Pk, A], x: A): Query[Unit, Connection] =
queryBuilder.write { implicit connection => implicit ec =>
val params = dbContext.params(x)
val keys = params.map(_.name)
val cols = keys.mkString("(", ",", ")")
val placeholders = keys.mkString("({", "},{", "})")
val values = keys.flatMap { key =>
params.find(_.name == key).map(DbContext.anormNamedParameter)
}
package natewave
import akka.Done
import scala.concurrent.Future
import scala.util.{Failure, Success}
object GraphTest extends App {
import akka.actor.ActorSystem
import akka.stream._