Skip to content

Instantly share code, notes, and snippets.

@codedmart
Created March 26, 2015 14:49
Show Gist options
  • Select an option

  • Save codedmart/3cb95f5463be1189fc8c to your computer and use it in GitHub Desktop.

Select an option

Save codedmart/3cb95f5463be1189fc8c to your computer and use it in GitHub Desktop.
/**
 * Query helpers for anything that can hand out a [[Connection]].
 *
 * Implementors only provide `connect`; the query methods build the JSON payload and
 * run it on whatever `connect` returns.
 */
trait Rql {
  // val DEFAULT_HOST = "localhost"
  // val DEFAULT_PORT_DRIVER = 28015

  /** Supplies the connection that queries built by this trait are run on. */
  def connect: Connection

  /**
   * Builds the JSON query `[1, [15, [table]], {"db": [14, [db]]}]` and runs it.
   *
   * NOTE(review): the numeric literals are presumably RethinkDB protocol codes
   * (1 = START query, 15 = TABLE term, 14 = DB term) -- confirm against ql2.proto.
   *
   * @param db    database name, placed in the query's "db" option
   * @param table table name, placed in the main term
   * @return whatever `Connection.run` returns for the query
   */
  def get(db: String, table: String): String = {
    // Main term: [15, [table]]
    val tableTerm = Json.array(jNumber(15), Json.array(jString(table)))
    // Options object: {"db": [14, [db]]}
    val dbTerm = Json.array(jNumber(14), Json.array(jString(db)))
    val options = Json.obj("db" -> dbTerm)

    // The query is fully assembled before `connect` is evaluated.
    val query = Json.array(jNumber(1), tableTerm, options)
    connect.run(query)
  }
}
/**
 * Entry point describing which server to talk to.
 *
 * @param host server host name or address
 * @param port server port
 * @param db   default database name; `null` (the default) means "not specified"
 * @param auth authentication key; `null` (the default) means "not specified"
 */
case class R(host: String, port: Int, db: String = null, auth: String = null) extends Rql {

  /** Socket address built from `host` and `port`. */
  val address: InetSocketAddress = new InetSocketAddress(host, port)

  /** `db` passed through `getOption`. */
  val dbName = getOption[String](db)

  /** `auth` passed through `getOption`. */
  val authKey = getOption[String](auth)

  /** Creates a new [[Connection]] on every call; nothing is cached here. */
  def connect: Connection = Connection(address, dbName, authKey)
}
/**
 * A blocking, socket-backed connection to the server at `address`.
 *
 * The socket is opened and the driver handshake is performed during construction
 * (via `reconnect()`), so creating an instance throws `RethinkDbConnectionError`
 * when the server cannot be reached or the handshake does not succeed.
 *
 * @param address server address to connect to
 * @param dbName  optional default database name (stored; not read by this class)
 * @param authKey optional authentication key sent during the handshake
 */
case class Connection(address: InetSocketAddress, dbName: Option[String], authKey: Option[String]) {
  private var socket: Socket = _
  private var in: InputStream = _
  private var out: OutputStream = _
  private val token: AtomicInteger = new AtomicInteger()

  /** Returns the next query token; every call increments the counter. */
  def nextToken: Int = token.incrementAndGet

  // Connect eagerly. This call must stay below the field definitions above so that
  // they are initialised before `reconnect()` assigns to them.
  reconnect()

  /**
   * Closes the current socket (if open), opens a new one and performs the handshake.
   *
   * @return this connection
   * @throws RethinkDbConnectionError if the socket cannot be opened or the handshake fails
   */
  def reconnect(): this.type = {
    if (isOpen) close()
    Try(new Socket(address.getAddress, address.getPort)) match {
      case Success(sock) =>
        // Reuse the socket that was just opened; opening a second one here would
        // leak the first.
        socket = sock
        try {
          socket.setKeepAlive(true)
          socket.setTcpNoDelay(true)
          in = socket.getInputStream()
          out = socket.getOutputStream()
          handshake()
        } catch {
          case scala.util.control.NonFatal(e) =>
            // Do not leave a half-initialised socket open when setup fails.
            Try(sock.close())
            throw e
        }
      case _ =>
        val message = "Could not connect to %s.".format(address)
        throw new RethinkDbConnectionError(message)
    }
    this
  }

  /**
   * Sends version, auth-key length, auth key and protocol, then expects the
   * null-terminated reply "SUCCESS".
   */
  private def handshake(): Unit = {
    val version = Wire.to(VersionDummy.V0_3)
    val protocol = Wire.to(VersionDummy.JSON)
    // Encode once with an explicit charset so the length prefix always matches the
    // number of bytes actually written (String.length counts chars, not bytes).
    val authBytes = authKey.fold(Array.empty[Byte])(_.getBytes("UTF-8"))
    val message = pack(version) ++ pack(authBytes.length) ++ authBytes ++ pack(protocol)
    out.write(message)
    out.flush()
    val response = recvNullTerminatedString()
    if ("SUCCESS" != new String(response, "US-ASCII")) {
      throw new RethinkDbConnectionError("Error establishing driver handshake")
    }
  }

  //def use(name: String): Connection = {
  //this.defaultDb = name
  //this
  //}

  /**
   * Closes the streams and the socket. When the connection is not open this only
   * prints a notice instead of dereferencing streams that may never have been set.
   */
  def close(): Unit = {
    if (!isOpen) println("Connection is closed.")
    else {
      // Nested finally blocks: a failure closing one resource must not skip the rest.
      try in.close()
      finally {
        try out.close()
        finally socket.close()
      }
    }
  }

  // TODO: This is not how this should actually be
  /**
   * Sends `query` framed as token (8 bytes), payload length, payload, then reads one
   * response framed as token (8 bytes), length (4 bytes), payload.
   *
   * @param query the JSON query to send
   * @return the response payload decoded as UTF-8 text
   * @throws RethinkDbConnectionError if the stream ends before a full response is read
   */
  def run(query: argonaut.Json): String = {
    val queryToken = nextToken
    // The length prefix must count encoded bytes, not characters.
    val payload = query.toString.getBytes("UTF-8")
    val message = pack(queryToken, 8) ++ pack(payload.length) ++ payload
    out.write(message)
    out.flush()
    // Response token: consumed to stay in sync with the stream, currently not verified.
    recvAll(8)
    val resLength = recvAll(4)
    val resMessage = recvAll(unpack(resLength))
    new String(resMessage, "UTF-8")
  }

  /** True when a socket exists, has been connected and has not been closed. */
  def isOpen: Boolean = {
    socket != null && !socket.isClosed && socket.isConnected
  }

  /**
   * Reads exactly `length` bytes. A non-positive `length` yields an empty array
   * without touching the stream.
   *
   * @throws RethinkDbConnectionError if the stream ends before `length` bytes arrive
   */
  private def recvAll(length: Int): Array[Byte] = {
    if (length <= 0) Array.empty[Byte]
    else {
      val buffer = new Array[Byte](length)
      var filled = 0
      while (filled < length) {
        val count = in.read(buffer, filled, length - filled)
        // read returns -1 at end of stream; treating that as data would corrupt the frame.
        if (count < 0) {
          throw new RethinkDbConnectionError(
            "Connection closed after %d of %d expected bytes.".format(filled, length))
        }
        filled += count
      }
      buffer
    }
  }

  /**
   * Reads bytes up to (and consuming, but not returning) the next 0 byte.
   *
   * @throws RethinkDbConnectionError if the stream ends before a 0 byte is seen
   */
  private def recvNullTerminatedString(): Array[Byte] = {
    Iterator.continually(in.read())
      .takeWhile(_ != 0)
      .map { b =>
        // -1 means end of stream; without this check the loop would never terminate.
        if (b < 0) {
          throw new RethinkDbConnectionError("Connection closed before the end of the server reply.")
        }
        b.toByte
      }
      .toArray
  }
}
import com.scalaRethinkdb.{R => r}
// `connect` is an instance member of the case class R, not of its companion object, so
// `r.connect("localhost", 28015)` fails with:
//   value connect is not a member of object com.scalaRethinkdb.R
// Build an instance through the companion's synthesized `apply` first, then connect.
r("localhost", 28015).connect
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment