Created
September 4, 2010 04:00
-
-
Save zentrope/564889 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package zentrope.hornet { | |
import scala.actors._ | |
import org.hornetq.api.core._ | |
import org.hornetq.api.core.client._ | |
// ---------------------------------------------------------------------- | |
private object Util { | |
def withWarning(body: => Unit) = { | |
// Wraps an exceptional condition we want to warn | |
// about, but let pass. | |
try { | |
body | |
} | |
catch { | |
case (th: Throwable) => { | |
println("WARN: " + th.toString()) | |
} | |
} | |
} | |
def withLoggedExceptions(prompt: String)(body: => Unit) = { | |
// Executes BODY, and logs exceptions, if there are any, but | |
// doesn't do anything special to handle them. | |
try { | |
body | |
} | |
catch { | |
case (th: Throwable) => | |
println("ERROR: (when '" + prompt + "')") ; | |
println("ERROR: " + th.toString) ; | |
// th.printStackTrace() | |
} | |
} | |
} | |
// ---------------------------------------------------------------------- | |
case class HornetMessage(properties: Map[String,String], body: String) | |
// ---------------------------------------------------------------------- | |
class Settings( | |
val discovery: String = "231.7.7.7", | |
val port: Int = 9876, | |
val user: String = "guest", | |
val pass: String = "guest") | |
{ | |
val xa: Boolean = false | |
val autoCommitSends: Boolean = true | |
val autoCommitAcks: Boolean = true | |
val preAcknowledge: Boolean = false | |
val ackBatchSize: Int = HornetQClient.DEFAULT_ACK_BATCH_SIZE | |
override | |
def toString(): String = { | |
format("<settings::d:%s,p:%d,u:%s,p:%s>", discovery, port, user, pass) | |
} | |
} | |
// ---------------------------------------------------------------------- | |
protected class Connection(factory: ClientSessionFactory, session: ClientSession) { | |
// A HornetQ connection, stuff common to both consumers and | |
// producers. | |
override def toString(): String = | |
String.format("<%s:@%x>", this.getClass().getSimpleName(), | |
hashCode.asInstanceOf[AnyRef]) | |
def start() { | |
session.start() | |
} | |
def stop() { | |
Util.withLoggedExceptions("stopping session") { | |
session.stop() | |
} | |
Util.withLoggedExceptions("closing session") { | |
session.close() | |
} | |
Util.withLoggedExceptions("closing factory") { | |
factory.close() | |
} | |
} | |
} | |
// ---------------------------------------------------------------------- | |
protected class ProducerConnection(factory: ClientSessionFactory, session: ClientSession, | |
producer: ClientProducer) extends Connection(factory, session) { | |
// A HornetQ connection, specialized for producers. | |
val NON_DURABLE: Boolean = false | |
val DURABLE: Boolean = true | |
def send(msg: HornetMessage) { | |
val clientMessage: ClientMessage = session.createMessage(NON_DURABLE) | |
clientMessage.getBodyBuffer().writeString(msg.body) | |
msg.properties foreach { case (key, value) => | |
clientMessage.putStringProperty(key, value) | |
} | |
producer.send(clientMessage) | |
} | |
override def stop() { | |
super.stop() | |
Util.withLoggedExceptions("closing producer " + this) { | |
producer.close() | |
} | |
} | |
} | |
// ---------------------------------------------------------------------- | |
protected class ConsumerConnection(factory: ClientSessionFactory, val session: ClientSession, | |
consumer: ClientConsumer) extends Connection(factory, session) { | |
// A HornetQ connection, specialized for consumers. | |
override def stop() { | |
super.stop() | |
Util.withLoggedExceptions("closing consumer " + this) { | |
consumer.close() | |
} | |
} | |
} | |
// ---------------------------------------------------------------------- | |
private object HornetQ { | |
// A convenience class to make it seem as if using HornetQ from | |
// Scala is really easy. ;) | |
import scala.collection.JavaConversions._ | |
private type Consumer = ClientConsumer | |
private type Factory = ClientSessionFactory | |
private type Session = ClientSession | |
private type Producer = ClientProducer | |
private def getFactory(settings: Settings): Factory = { | |
val factory: Factory = | |
HornetQClient.createClientSessionFactory(settings.discovery, | |
settings.port) | |
factory.setDiscoveryInitialWaitTimeout(20000) | |
factory.setBlockOnAcknowledge(true) | |
factory.setBlockOnNonDurableSend(true) | |
factory.setBlockOnDurableSend(true) | |
factory.setReconnectAttempts(-1) | |
factory.setRetryInterval(2000) | |
factory.setConnectionTTL(-1) // Don't die of no data | |
factory | |
} | |
private def getSession(settings: Settings, factory: Factory): Session = { | |
factory.createSession(settings.user, | |
settings.pass, settings.xa, settings.autoCommitSends, | |
settings.autoCommitAcks, settings.preAcknowledge, settings.ackBatchSize) | |
} | |
private def createTempQueue(session: Session, address: String, queue: String) { | |
Util.withWarning { | |
session.createQueue(address, queue, false) | |
} | |
} | |
private class Handler(session: Session, func: (HornetMessage) => Unit) extends MessageHandler { | |
// Needs a session if we want to rollback | |
def onMessage(msg: ClientMessage): Unit = { | |
try { | |
val msgBody = msg.getBodyBuffer().readString() | |
var properties = Map[String, String]() | |
msg.getPropertyNames() foreach { name => | |
val value = msg.getStringProperty(name) | |
var key = name.toString() | |
properties = properties + ((key, value)) | |
} | |
func(HornetMessage(properties, msgBody)) | |
msg.acknowledge() | |
} | |
catch { | |
case (th: Throwable) => { | |
println("ERROR: [msg.receive] " + th.toString) | |
try { | |
session.rollback() | |
} | |
catch { | |
case (th: Throwable) => { | |
println("ERROR: [msg.rollback] " + th.toString) | |
} | |
} | |
} | |
} | |
} | |
} | |
// Shortcut type for a function that takes a message and returns void | |
private type HandlerFunc = (HornetMessage) => Unit | |
private def createConsumer(session: Session, queue: String, handler: HandlerFunc): Consumer = { | |
val consumer = session.createConsumer(queue) | |
consumer.setMessageHandler(new Handler(session, handler)) | |
return consumer | |
} | |
private def createProducer(s: Session, a: String) = | |
s.createProducer(a) | |
private def swallow(function: => Unit) = { | |
try { | |
function | |
} | |
catch { | |
case (th: Throwable) => ; // ignore | |
} | |
} | |
private | |
def cleanFactory(f: Option[Factory]) = f match { | |
case Some(x) => swallow { x.close() } | |
case None => ; | |
} | |
private | |
def cleanSession(s: Option[Session]) = s match { | |
case Some(x) => swallow { x.close() } | |
case None => ; | |
} | |
private | |
def cleanProducer(p: Option[Producer]) = p match { | |
case Some(x) => swallow { x.close() } | |
case None => ; | |
} | |
private | |
def cleanConsumer(c: Option[Consumer]) = c match { | |
case Some(x) => swallow { x.close() } | |
case None => ; | |
} | |
override def toString(): String = "<HornetQ>" | |
def getConsumerConnection(settings: Settings, address: String, | |
queue: String, handler: HandlerFunc): ConsumerConnection = { | |
println(this + " getting consumer connection") | |
var factory: Option[Factory] = None | |
var session: Option[Session] = None | |
var consumer: Option[Consumer] = None | |
var conn: Option[ConsumerConnection] = None | |
try { | |
factory = Some(getFactory(settings)) | |
session = Some(getSession(settings, factory.get)) | |
// TODO: Have a way to indicate what kind of queue to create | |
createTempQueue(session.get, address, queue) | |
consumer = Some(createConsumer(session.get, queue, handler)) | |
conn = Some(new ConsumerConnection(factory.get, session.get, consumer.get)) | |
} | |
catch { | |
case (th: Throwable) => { | |
println("warn: Unable to connect: " + th.getMessage()) | |
// Gotta close the resources, or the underlying | |
// HornetQ objects will leak. | |
cleanFactory(factory) | |
cleanSession(session) | |
cleanConsumer(consumer) | |
throw th | |
} | |
} | |
return conn.get | |
} | |
def getProducerConnection(settings: Settings, address: String): ProducerConnection = { | |
println(this + " getting producer connection") | |
var conn: Option[ProducerConnection] = None | |
var factory: Option[Factory] = None | |
var session: Option[Session] = None | |
var producer: Option[Producer] = None | |
try { | |
factory = Some(getFactory(settings)) | |
session = Some(getSession(settings, factory.get)) | |
producer = Some(createProducer(session.get, address)) | |
val p = new ProducerConnection(factory.get, session.get, producer.get) | |
conn = Some(p) | |
} | |
catch { | |
case (th: Throwable) => { | |
println(this + " warn: Unable to producer.connect: " + th.getMessage()) | |
cleanFactory(factory) | |
cleanSession(session) | |
cleanProducer(producer) | |
throw th | |
} | |
} | |
return conn.get | |
} | |
} | |
// ---------------------------------------------------------------------- | |
class Consumer( | |
val settings: Settings, | |
val address: String, | |
val queue: String, | |
handler: (HornetMessage) => Unit) | |
{ | |
var conn: Option[ConsumerConnection] = None | |
override def toString(): String = { | |
return "<consumer:" + address + ">" | |
} | |
private def connect(): Unit = { | |
// Keep trying to get a good connection, no matter what. | |
while (conn == None) { | |
try { | |
conn = Some(HornetQ.getConsumerConnection(settings, address, queue, handler)) | |
conn.get.start() | |
println(this + " we got a connection") | |
} | |
catch { | |
case (th: Throwable) => { | |
println(this + " " + th.toString) | |
println(this + " waiting 2000 millis") | |
Thread.sleep(2000) | |
if (conn != None) { | |
conn.get.stop() // Will clean up resources even if not running. | |
conn = None | |
} | |
} | |
} | |
} | |
} | |
def startUp(): Unit = { | |
println ("starting " + this) | |
connect() | |
} | |
def shutDown(): Unit = { | |
println ("stopping " + this) | |
conn match { | |
case None => ; | |
case Some(c) => c.stop() | |
} | |
} | |
} | |
// ---------------------------------------------------------------------- | |
class Producer(val settings: Settings, val address: String) { | |
var conn: Option[ProducerConnection] = None | |
override def toString(): String = { | |
return "<producer:" + address + ">" | |
} | |
private def connect(): Unit = { | |
// Keep trying to get a good connection, no matter what. | |
while (conn == None) { | |
println (this + " attempting to connect") | |
try { | |
conn = Some(HornetQ.getProducerConnection(settings, address)) | |
conn.get.start() | |
println(this + " we got a connection") | |
} | |
catch { | |
case (th: Throwable) => { | |
println(this + " " + th.toString) | |
if (conn != None) | |
conn.get.stop() // Will clean up resources even if not running. | |
println(this + " waiting 2000 millis") | |
Thread.sleep(2000) | |
} | |
} | |
} | |
} | |
def send(message: String): Unit = { | |
send(Map.empty, message) | |
} | |
def send(properties: Map[String, String], message: String): Unit = { | |
var retry = true | |
while (retry) { | |
try { | |
conn match { | |
case None => | |
throw new Exception("Producer not started or connection down.") | |
case Some(c) => | |
c.send(HornetMessage(properties, message)) ; | |
// println("message successfully sent") ; | |
retry = false | |
} | |
} | |
catch { | |
case (th: Throwable) => { | |
println(this + " " + th.toString) | |
if (conn != None) | |
conn.get.stop() // Will clean up resources even if not running. | |
println(this + " trying to reconnect after a failed send") | |
conn = None | |
connect() | |
} | |
} | |
} | |
} | |
def startUp(): Unit = { | |
println("starting " + this) | |
connect() | |
} | |
def shutDown(): Unit = { | |
conn match { | |
case None => ; | |
case Some(c) => c.stop() | |
} | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package zentrope.testhornet { | |
// This is a small app to see what it's like to work with | |
// HornetQ messaging in a Scala context. The main things I'm | |
// interested in are making working with HornetQ itself as a | |
// resource easier, and making convenience methods, classes, data | |
// structures easier for anyone using this code. | |
import scala.actors._ | |
import zentrope.hornet._ | |
object Util { | |
def addShutdownHook(body: => Unit) = | |
Runtime.getRuntime.addShutdownHook(new Thread { | |
override def run { body } | |
}) | |
} | |
private case object GetOffStage | |
class PresenceReceiver extends DaemonActor { | |
// An actor for receiving messages from the "scala.presence" address. | |
def startUp() = { | |
start | |
} | |
def shutDown() = | |
this ! GetOffStage | |
def act() { | |
val settings = new Settings(user="guest", pass="guest") | |
val address = "scala.presence" | |
val tempQueue = "scala.test" | |
val handler = (msg: HornetMessage) => this ! msg | |
val inbox = new Consumer(settings, address, tempQueue, handler) | |
inbox.startUp | |
loop { react { | |
case HornetMessage(properties, body) => { | |
println("--------------------------------") | |
println("RECEIVE:") | |
properties foreach { case (key, value) => | |
println(format(" + property: %s -> %s", key, value)) | |
} | |
println(" + body: [" + body + "]") | |
println("--------------------------------") | |
} | |
case GetOffStage => { | |
println(" -- shutting down " + this) | |
inbox.shutDown | |
exit | |
} | |
case unknown => | |
println(this + " got unknown message: " + unknown) | |
}} | |
} | |
} | |
class PresenceSender extends DaemonActor { | |
// An actor for periodically sending "presence" messages to the | |
// HornetQ address: "scala.presence" | |
private var counter: Int = 1 | |
def startUp() = { | |
start() | |
} | |
def shutDown() = | |
this ! GetOffStage | |
private def timeout(outbox: Producer): Unit = { | |
// Message body | |
val msg = " msg: " + this + " (" + counter + ") " | |
// Message props | |
val props = Map[String, String] ( | |
"message-id" -> System.currentTimeMillis().toString(), | |
"counter" -> counter.toString() | |
) | |
outbox.send(props, msg) | |
counter += 1 | |
println("SENT: [" + msg + "]") | |
} | |
def act() { | |
def fiveSeconds = 5000 | |
// Named parameters! | |
val settings = new Settings(user="guest", pass="guest") | |
val outbox: Producer = new Producer(settings, "scala.presence") | |
outbox.startUp | |
loop { reactWithin(fiveSeconds) { | |
case TIMEOUT => | |
timeout(outbox) | |
case GetOffStage => { | |
outbox.shutDown | |
exit() | |
} | |
}} | |
} | |
} | |
class Lock { | |
// Super simple class to implement a lock on execution so this | |
// app will stay up until the user hits Control-C. | |
var o: AnyRef = new Object() | |
def acquire() : Unit = { | |
o.synchronized { | |
o.wait() | |
} | |
} | |
def release() : Unit = { | |
o.synchronized { | |
o.notifyAll() | |
} | |
} | |
} | |
object Main { | |
val lock: Lock = new Lock() | |
def main(args: Array[String]) = { | |
println("hello") | |
// Start a consumer | |
val consumer = new PresenceReceiver() | |
consumer.startUp | |
// Start a producer | |
val producer = new PresenceSender() | |
producer.startUp | |
// Make sure the consumer and producers get shut down | |
// when the user hits Control-C. | |
Util.addShutdownHook { | |
producer.shutDown() | |
consumer.shutDown() | |
Thread.sleep(4000) | |
lock.release | |
} | |
println("^C to shut down") | |
lock.acquire | |
System.exit(0) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
HornetQ wrappers.
The second file is the example, which also shows the use of actors, one for sending out pings, another for receiving them. Fun!
The first file is a package with public objects to make it easy to interact with HornetQ: Publisher, Consumer, HornetMessage and Settings. These aren't really meant to be a "scala wrapper" for HornetQ, but just an example of wrapper HornetQ to make it simpler in a specific app. It's only useful if, for the most part, you're using sensible defaults and all you really want to do is consume, publish, and maybe change a few config options. Otherwise, this falls down.
Also, I've tried to build in some fault tolerance for the producer side of the equation. HornetQ consumers will wait forever for an instance to come back up, but producers won't. I'm not sure why. Perhaps I misunderstood some documentation somewhere?
The big learning for me was the use of the Option class, which helps, in a type safe way, do the sorts of things you might do with null. In other words, deal with uninitialized data types.
Unfortunately, figuring out a way to wrap Java libs is not really the best part about using Scala, but this is a nice little exercise anyway.
I think there's also room for improvement to try and exploit the OO features a bit more (which I generally don't do in Java because it makes things too spaghetti-like).