Skip to content

Instantly share code, notes, and snippets.

View joan38's full-sized avatar

Joan Goyeau joan38

View GitHub Profile
@joan38
joan38 / CaseClassWrapper.scala
Last active August 29, 2015 14:08
Gremlin vertex to case class and case class saver
package gremlin.scala.caseclass
import com.tinkerpop.gremlin.scala._
import com.tinkerpop.gremlin.structure.Vertex
import shapeless.HList
import scala.reflect._
import scala.reflect.runtime.universe._
/**
 * Created by Joan on 24/10/2014.
 */
/**
 * A named parameter tagged with the type `T`.
 *
 * `T` is a phantom type: it does not occur in any field and only exists at the type level.
 * NOTE(review): presumably `T` selects the `ParamDecoder[T]` used to decode the value — confirm.
 *
 * @param name the name of the parameter
 * @tparam T the type associated with the parameter
 */
case class Param[T](
  name: String
)
/**
 * Type class that turns the raw string form of a parameter into a value of type `T`.
 *
 * @tparam T the type produced by this decoder
 */
trait ParamDecoder[T] {

  /**
   * Decodes `in` into a `T`.
   *
   * @param in the raw string to decode
   * @return the decoded value
   */
  def apply(in: String): T
}

/**
 * Default [[ParamDecoder]] instances.
 *
 * They live in the companion object so that they are found through implicit scope
 * without an import. Every implicit carries an explicit type annotation: without it the
 * inferred type is the anonymous subclass, which is fragile for implicit resolution in
 * Scala 2 and rejected by Scala 3.
 */
object ParamDecoder {

  /** Identity decoder: returns the input unchanged. */
  implicit val stringDecoder: ParamDecoder[String] = new ParamDecoder[String] {
    def apply(in: String): String = in
  }

  /**
   * Decodes a base-10 integer using `String#toInt`.
   *
   * Throws `NumberFormatException` when `in` is not a valid `Int`.
   */
  implicit val intDecoder: ParamDecoder[Int] = new ParamDecoder[Int] {
    def apply(in: String): Int = in.toInt
  }
}
/**
 * Identifier tagged with the type of the resource it refers to.
 *
 * `Resource` is a phantom type: it does not occur in any field, it only keeps identifiers
 * of different resources apart at compile time (an `Id[User]` is not an `Id[Post]`).
 * Declared as a value class (`extends AnyVal`).
 *
 * @param value the underlying identifier string
 * @tparam Resource the type of the identified resource
 */
case class Id[Resource](
  value: String
) extends AnyVal
import java.time.Instant
import java.net.URI
/**
 * A user, identified by a typed `Id[User]`.
 *
 * NOTE(review): the field meanings below are inferred from their names — confirm.
 *
 * @param id        identifier of this user
 * @param updatedOn presumably the instant of the latest change to this user
 * @param image     URI of an image associated with the user
 * @param nickname  presumably the display name of the user
 * @param verified  verification flag
 * @param deleted   deletion flag
 */
case class User(
  id: Id[User],
  updatedOn: Instant,
  image: URI,
  nickname: String,
  verified: Boolean,
  deleted: Boolean
)
/**
 * A post written by a user, identified by a typed `Id[Post]`.
 *
 * NOTE(review): field meanings not backed by code in this file are inferred from names — confirm.
 *
 * @param id        identifier of this post
 * @param updatedOn instant attached to this post; the `Record` instance in the companion
 *                  object converts it to epoch milliseconds for the record timestamp
 * @param author    identifier of the user who wrote the post
 * @param text      text of the post
 * @param image     URI of an image associated with the post
 * @param deleted   deletion flag
 */
case class Post(
  id: Id[Post],
  updatedOn: Instant,
  author: Id[User],
  text: String,
  image: URI,
  deleted: Boolean
)
/**
 * A like linking a user to a post.
 *
 * NOTE(review): field meanings are inferred from their names — confirm.
 *
 * @param userId    identifier of the user
 * @param postId    identifier of the post
 * @param updatedOn presumably the instant of the latest change to this like
 * @param unliked   presumably `true` once the like has been withdrawn
 */
case class Like(
  userId: Id[User],
  postId: Id[Post],
  updatedOn: Instant,
  unliked: Boolean
)
/**
 * A comment on a post, identified by a typed `Id[Comment]`.
 *
 * NOTE(review): field meanings are inferred from their names — confirm.
 *
 * @param id        identifier of this comment
 * @param postId    identifier of the post the comment belongs to
 * @param updatedOn presumably the instant of the latest change to this comment
 * @param author    identifier of the user who wrote the comment
 * @param text      text of the comment
 * @param deleted   deletion flag
 */
case class Comment(
  id: Id[Comment],
  postId: Id[Post],
  updatedOn: Instant,
  author: Id[User],
  text: String,
  deleted: Boolean
)
/**
 * A post bundled with its author and its interactions in a single value.
 *
 * @param post         the post itself
 * @param author       the full `User` value (as opposed to the `Id[User]` held by `Post`)
 * @param interactions the interactions attached to the post
 */
case class DenormalisedPost(
  post: Post,
  author: User,
  interactions: Interactions
)
/**
 * Interactions attached to a post.
 *
 * @param likes    the set of likes
 * @param comments an integer; presumably the number of comments — confirm
 */
case class Interactions(
  likes: Set[Like],
  comments: Int
)
import com.goyeau.kafka.streams.circe.CirceSerdes._
// Producer configuration: only the bootstrap servers are set here.
val config = new Properties()
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092")

// Producer whose keys are Id[Post] and whose values are Post, both serialized via CirceSerdes.
val postProducer = new KafkaProducer(
  config,
  CirceSerdes.serializer[Id[Post]],
  CirceSerdes.serializer[Post]
)

// Sample post to publish.
val post = Post(
  id = Id[Post]("post0"),
  updatedOn = Instant.now(),
  author = Id[User]("user0"),
  text = "Some text",
  image = URI.create("https://some-uri"),
  deleted = false
)

// Send through `postProducer` (the producer defined above) and block on the returned future.
// The partition is left null; the record timestamp must be epoch milliseconds, so the
// Instant is converted with `toEpochMilli` rather than passed as-is.
postProducer.send(new ProducerRecord("posts", null, post.updatedOn.toEpochMilli, post.id, post)).get()
/**
 * Type class describing how a value of type `V` maps onto a keyed, timestamped record.
 *
 * @tparam K the key type
 * @tparam V the value type
 */
trait Record[K, V] {

  /** Name of the topic the records belong to. */
  def topic: String

  /**
   * Extracts the record key from a value.
   *
   * @param value the value to derive the key from
   * @return the key for `value`
   */
  def key(value: V): K

  /**
   * Extracts the record timestamp from a value.
   *
   * The `Post` instance in this file supplies epoch milliseconds.
   *
   * @param value the value to derive the timestamp from
   * @return the timestamp for `value`
   */
  def timestamp(value: V): Long
}
object Producer {
def apply[V] = new ProducerBuilder[V]
class ProducerBuilder[V] {
def apply[K](config: Properties)(implicit record: Record[K, V],
/**
 * Companion of `Post` providing its implicit `Record` instance, so that it is found
 * through implicit scope wherever a `Record[Id[Post], Post]` is required.
 */
object Post {

  /**
   * Maps a `Post` onto the "posts" topic: the key is the post's id and the timestamp
   * is `updatedOn` expressed in epoch milliseconds.
   */
  implicit val record: Record[Id[Post], Post] =
    new Record[Id[Post], Post] {
      override val topic: String = "posts"

      override def key(value: Post): Id[Post] = value.id

      override def timestamp(value: Post): Long = value.updatedOn.toEpochMilli
    }
}
// Builder from which the stream below is declared.
val streamsBuilder = new StreamsBuilder()
// Stream read from "posts", typed with Id[Post] keys and Post values.
// NOTE(review): presumably relies on implicit serdes for both types being in scope — confirm.
val postsStream = streamsBuilder.stream[Id[Post], Post]("posts")