```scala
package net.atos.sparti.pub

import java.io.PrintStream
import java.net.Socket

import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}
import org.apache.commons.pool2.{ObjectPool, PooledObject, BasePooledObjectFactory}
import org.apache.spark.streaming.dstream.DStream

class PooledSocketStreamPublisher[T](host: String, port: Int)
  extends Serializable {

  /**
   * Publish the stream to a socket.
   */
  def publish(stream: DStream[T], callback: (T) => String) =
    stream foreachRDD ( rdd =>
      rdd foreachPartition { partition =>
        val pool = PrintStreamPool(host, port)
        partition foreach { event =>
          val s = pool.printStream
          s println callback(event)
        }
        pool.release()
      }
    )

}

class ManagedPrintStream(private val pool: ObjectPool[PrintStream], val printStream: PrintStream) {
  def release() = pool.returnObject(printStream)
}

object PrintStreamPool {

  var hostPortPool: Map[(String, Int), ObjectPool[PrintStream]] = Map()

  sys.addShutdownHook {
    hostPortPool.values.foreach { pool => pool.close() }
  }

  // factory method
  def apply(host: String, port: Int): ManagedPrintStream = {
    val pool = hostPortPool.getOrElse((host, port), {
      val p = new GenericObjectPool[PrintStream](new SocketStreamFactory(host, port))
      hostPortPool += (host, port) -> p
      p
    })
    new ManagedPrintStream(pool, pool.borrowObject())
  }
}

class SocketStreamFactory(host: String, port: Int) extends BasePooledObjectFactory[PrintStream] {
  override def create() = new PrintStream(new Socket(host, port).getOutputStream)
  override def wrap(stream: PrintStream) = new DefaultPooledObject[PrintStream](stream)
  override def validateObject(po: PooledObject[PrintStream]) = ! po.getObject.checkError()
  override def destroyObject(po: PooledObject[PrintStream]) = po.getObject.close()
  override def passivateObject(po: PooledObject[PrintStream]) = po.getObject.flush()
}
```
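A minimal usage sketch is shown below. The hosts, ports, application name, and the identity callback are placeholders, not part of the gist; it simply reads lines from one socket and republishes each line to another socket through the pooled `PrintStream`.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PublishExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("socket-publisher").setMaster("local[2]")
    val ssc  = new StreamingContext(conf, Seconds(5))

    // Source stream: lines arriving on localhost:9999 (placeholder).
    val lines = ssc.socketTextStream("localhost", 9999)

    // Republish every line to localhost:9090 (placeholder) via the pooled sockets.
    val publisher = new PooledSocketStreamPublisher[String]("localhost", 9090)
    publisher.publish(lines, line => line)

    ssc.start()
    ssc.awaitTermination()
  }
}
```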
```
16/06/24 13:40:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/24 13:42:17 ERROR Executor: Exception in task 16.0 in stage 1.0 (TID 17)
redis.clients.jedis.exceptions.JedisException: Could not return the resource to the pool
    at redis.clients.util.Pool.returnResourceObject(Pool.java:65)
    at redis.clients.jedis.JedisPool.returnResource(JedisPool.java:113)
    at util.RedisDB$.returnRedis(RedisDB.scala:58)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.IllegalStateException: Returned object not currently part of this pool
    at org.apache.commons.pool2.impl.GenericObjectPool.returnObject(GenericObjectPool.java:537)
    at redis.clients.util.Pool.returnResourceObject(Pool.java:63)
    ... 18 more
```
Is this approach not suitable for Jedis?
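For what it's worth, commons-pool2 throws `IllegalStateException: Returned object not currently part of this pool` when `returnObject` is handed an instance that was not borrowed from that same pool, which in a Spark job typically happens when the pool gets re-created (for example once per task or per partition) while connections from an earlier instance are still being returned. Below is a sketch of how the gist's pattern could be carried over to Jedis, under those assumptions; the `JedisPools` and `PooledJedisStreamPublisher` names, the `channel` parameter, and the callback are hypothetical, not taken from the gist or from `util.RedisDB`.

```scala
import org.apache.spark.streaming.dstream.DStream
import redis.clients.jedis.JedisPool

// One JedisPool per (host, port) per executor JVM, so a connection is always
// returned to the same pool instance it was borrowed from.
object JedisPools {

  private var pools: Map[(String, Int), JedisPool] = Map()

  def apply(host: String, port: Int): JedisPool = synchronized {
    pools.getOrElse((host, port), {
      val p = new JedisPool(host, port)
      pools += (host, port) -> p
      p
    })
  }

  sys.addShutdownHook {
    pools.values.foreach(_.destroy())
  }
}

class PooledJedisStreamPublisher[T](host: String, port: Int, channel: String)
  extends Serializable {

  def publish(stream: DStream[T], callback: T => String): Unit =
    stream foreachRDD { rdd =>
      rdd foreachPartition { partition =>
        val pool  = JedisPools(host, port)   // same singleton pool on this executor
        val jedis = pool.getResource         // borrow
        try {
          partition.foreach(event => jedis.publish(channel, callback(event)))
        } finally {
          jedis.close()                      // returns the connection to the pool it came from
        }
      }
    }
}
```

The key point is the same as in the gist: the pool lives in a singleton object keyed by `(host, port)`, and both the borrow and the return go through that one instance, rather than through a pool that was serialized into the closure or rebuilt inside `foreachPartition`.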