Skip to content

Instantly share code, notes, and snippets.

@thaylongs
Created December 19, 2017 01:17
Show Gist options
  • Save thaylongs/488caa8499da23d1e604faf4abcc3665 to your computer and use it in GitHub Desktop.
Save thaylongs/488caa8499da23d1e604faf4abcc3665 to your computer and use it in GitHub Desktop.
package br.uff.spark.utils
import java.util.concurrent.TimeUnit
import util.control.Breaks._
import br.uff.spark.DataSource
import com.datastax.driver.core.{BatchStatement, Statement}
class SimpleThreadPool(name: String) extends Thread(name) {
val queue = new java.util.concurrent.ConcurrentLinkedQueue[() => Statement]()
var isRunning = true
var done = false
val con = DataSource.getConnection
val BATCH_SIZE = 20
override def run(): Unit = {
while ((isRunning || !queue.isEmpty) && !isInterrupted) {
val batch = new BatchStatement
var i = 0
breakable {
while (i < BATCH_SIZE) {
val next = queue.poll()
if (next != null) {
i += 1
batch.add(next.apply())
} else {
break
}
}
}
if (i > 0) {
con.executeAsync(batch)
} else {
Thread.sleep(50)
}
}
done = true
}
def shutdown(): Unit = synchronized {
isRunning = false
}
def awaitTermination(i: Int, timeUnit: TimeUnit): Unit = {
val startTime = System.currentTimeMillis()
while (!done && (System.currentTimeMillis() - startTime < timeUnit.toMillis(i))) {
Thread.sleep(100)
}
interrupt()
}
def execute(newData: () => Statement): Unit = {
queue.add(newData)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment