Last active
August 29, 2015 14:11
-
-
Save clayrat/7b6958ab9b6d1bd23a77 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scalaz.stream.nio.{file => nio} | |
import scodec.bits.ByteVector | |
... | |
/** Infinite stream of UUID strings.
  *
  * `Process.repeatEval` re-runs the task for every element requested, so each
  * emitted value is generated on demand.
  */
val uuidSource: Process[Task, String] =
  Process.repeatEval(
    Task.delay(
      // NOTE(review): the original snippet elided this expression ("uuid creation").
      // A random JDK UUID is assumed here; swap in the real generator if it differs.
      java.util.UUID.randomUUID().toString
    )
  )
/** Sink that publishes each incoming string to a Kafka topic.
  *
  * Every message is sent as a `KeyedMessage` whose key and payload are both the
  * incoming string.
  *
  * The producer is managed with `io.resource`: it is created when the sink
  * starts running and closed when the sink terminates. Previously it was
  * created as soon as `kafkaSink` was called and never closed, leaking its
  * connections.
  *
  * @param producerConfig configuration used to construct the producer
  * @param topic          topic every message is published to
  * @return a sink that sends each received string to `topic`
  */
def kafkaSink(producerConfig: ProducerConfig, topic: String): Sink[Task, String] =
  io.resource(Task.delay(new Producer[String, String](producerConfig)))(
    producer => Task.delay(producer.close())
  )(
    producer =>
      // The step yields the same send function on every pull; only the send is effectful.
      Task.now { (msg: String) =>
        Task.delay(producer.send(new KeyedMessage[String, String](topic, msg, msg)))
      }
  )
// Directory prefix prepended to every file name produced by the file sink.
val path = "D:/"
// Line separator written between entries: bytes 13 and 10 (carriage return, line feed).
val eol = ByteVector(Array[Byte](13, 10))
/** Sink that writes each incoming batch of lines to its own text file.
  *
  * The file is named after the first line of the batch (`path` + first line +
  * ".txt"); the lines are written separated by `eol`.
  *
  * Changes from the earlier version:
  *  - an empty batch is skipped instead of failing on `head`;
  *  - lines are encoded as UTF-8 rather than the platform default charset;
  *  - the write is returned as a `Task` instead of being run synchronously
  *    inside `Task.delay`, so the surrounding process drives it.
  */
val fileSink: Sink[Task, Vector[String]] = io.channel { lines =>
  lines.headOption.fold(Task.now(())) { first =>
    // Suspend so that nothing about the write is set up before the task is run.
    Task.suspend {
      val target = path + first + ".txt"
      val encoded =
        lines.map(line => ByteVector(line.getBytes(java.nio.charset.StandardCharsets.UTF_8)))
      Process.emitAll(encoded).toSource.intersperse(eol).to(nio.chunkW(target)).run
    }
  }
}
/** Entry point: every 500 ms takes one value from `uuidSource`, publishes it via
  * `kafkaSink`, and writes the values in chunks of 1000 through `fileSink`.
  *
  * Runs until the stream terminates or fails.
  *
  * @param args command-line arguments (not used in the visible code)
  */
def main(args: Array[String]): Unit = {
  // config
  // NOTE(review): `config` and `topic` used below are expected to be defined
  // here; the original snippet elided them.
  implicit val scheduler = DefaultScheduler
  val proc =
    Process.awakeEvery(500.milliseconds)
      // The tick only paces the stream; keep just the UUID.
      .zipWith(uuidSource)((_, uuid) => uuid)
      .observe(kafkaSink(config, topic))
      .chunk(1000)
      .to(fileSink)
  proc.run.run
}
... |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment