Created
August 27, 2010 20:40
-
-
Save zentrope/554162 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package zentrope.hornet { | |
// This is a small app to see what it's like to work with
// HornetQ messaging in a Scala context. The main things I'm
// interested in are making HornetQ itself easier to work with as a
// resource, and providing convenience methods, classes, and data
// structures for anyone using this code.
import scala.actors._ | |
import scala.concurrent.ops.spawn | |
/** Small helpers for JVM shutdown hooks and best-effort error reporting. */
object Util {

  /**
   * Registers `body` to be run on a fresh thread when the JVM shuts down.
   *
   * @param body code to execute from the shutdown hook
   */
  def addShutdownHook(body: => Unit): Unit =
    Runtime.getRuntime.addShutdownHook(new Thread {
      override def run(): Unit = body
    })

  /**
   * Runs `body`; if it throws an exception, prints a one-line warning and
   * carries on. Use for conditions we want to know about but let pass.
   *
   * Only `Exception`s are swallowed. Throwables that are not exceptions
   * (JVM `Error`s such as `OutOfMemoryError`, control-flow throwables)
   * propagate to the caller instead of being silently turned into a warning.
   *
   * @param body code to execute
   */
  def withWarning(body: => Unit): Unit =
    try body
    catch {
      case ex: Exception =>
        println("WARN: " + ex.toString)
    }

  /**
   * Runs `body`; if it throws an exception, prints `prompt`, a warning line
   * and the stack trace, and then carries on without any further handling.
   *
   * As with `withWarning`, only `Exception`s are caught; anything else
   * propagates to the caller.
   *
   * @param prompt short description of what was being attempted
   * @param body   code to execute
   */
  def withLoggedExceptions(prompt: String)(body: => Unit): Unit =
    try body
    catch {
      case ex: Exception =>
        println(" -- " + prompt)
        println("WARN: " + ex.toString)
        ex.printStackTrace()
    }
}
/**
 * Connection properties shared by the HornetQ clients in this file.
 * Everything is hard coded for now.
 */
object Conn {
  import org.hornetq.api.core.client.HornetQClient

  // Address and port handed to HornetQClient.createClientSessionFactory.
  val discovery: String = "231.7.7.7"
  val port: Int = 9876

  // Credentials used when creating a session.
  val user: String = "guest"
  val pass: String = "guest"

  // Remaining createSession arguments, in the order they are passed.
  val xa: Boolean = false
  val autoCommitSends: Boolean = true
  val autoCommitAcks: Boolean = true
  val preAcknowledge: Boolean = false
  val ackBatchSize: Int = HornetQClient.DEFAULT_ACK_BATCH_SIZE
}
// Message payload passed between the actors in this file.
// Only the body is carried for now. Later, add a map of properties.
case class HornetMessage(body: String)
/**
 * Actor that consumes messages from a temporary HornetQ queue and forwards
 * each one, wrapped as a `HornetMessage`, to every registered observer actor.
 *
 * The HornetQ factory, session and consumer are created inside `act()`, so
 * nothing is connected until the actor has been started.
 *
 * @param address HornetQ address the temporary queue is bound to
 * @param queue   name of the temporary queue to create and consume from
 */
class Consumer(address: String, queue: String) extends DaemonActor {
  // For now, nothing's durable.

  import org.hornetq.api.core._
  import org.hornetq.api.core.client._

  // ---- Public API ----

  /** Starts the actor. */
  def startUp() =
    start

  /** Asks the actor to close its HornetQ resources and exit. */
  def shutDown(): Unit =
    this ! ShutDownPlease

  // TODO: Throw an exception if the actor isn't yet started.
  // TODO: Do we really need more than one receiver of these
  //       messages? Instead, just make a new consumer with a
  //       different queue.
  /**
   * Registers `observer` to receive every `HornetMessage` this consumer gets.
   *
   * @param observer actor to forward messages to
   */
  def add(observer: Actor): Unit =
    this ! AddObserver(observer)

  // ---- Internal ----

  private case object ShutDownPlease
  private case class AddObserver(actor: Actor)

  /** Hands a received message to the actor loop, which fans it out. */
  private def delegateToObservers(msg: HornetMessage): Unit =
    this ! msg

  /** HornetQ callback: reads the message body and posts it to this actor. */
  private class InboxHandler extends MessageHandler {
    def onMessage(msg: ClientMessage): Unit =
      Util.withLoggedExceptions("receiving message") {
        val body = msg.getBodyBuffer().readString()
        delegateToObservers(HornetMessage(body))
      }
  }

  /** Creates the session factory from the shared `Conn` settings. */
  private def initFactory(): ClientSessionFactory = {
    val factory: ClientSessionFactory =
      HornetQClient.createClientSessionFactory(Conn.discovery, Conn.port)
    factory.setDiscoveryInitialWaitTimeout(60000)
    factory.setBlockOnAcknowledge(true)
    factory.setBlockOnNonDurableSend(true)
    factory.setBlockOnDurableSend(true)
    factory
  }

  /** Creates a session and, best effort, the temporary queue. */
  private def initSession(factory: ClientSessionFactory): ClientSession = {
    val session: ClientSession = factory.createSession(
      Conn.user, Conn.pass, Conn.xa, Conn.autoCommitSends,
      Conn.autoCommitAcks, Conn.preAcknowledge, Conn.ackBatchSize)
    Util.withWarning {
      // Throws exception if queue already created.
      session.createTemporaryQueue(address, queue)
    }
    session
  }

  /** Creates the queue consumer and installs the message callback. */
  private def initConsumer(session: ClientSession): ClientConsumer = {
    val consumer: ClientConsumer = session.createConsumer(queue)
    consumer.setMessageHandler(new InboxHandler)
    consumer
  }

  /**
   * Closes the HornetQ resources, children before parents: consumer, then
   * session, then factory. Each step is attempted even if an earlier one
   * fails; failures are logged. `null` arguments are skipped.
   */
  private def close(factory: ClientSessionFactory,
                    session: ClientSession,
                    consumer: ClientConsumer): Unit = {
    println("stopping " + this)
    Util.withLoggedExceptions("closing consumer " + this) {
      if (consumer != null)
        consumer.close()
    }
    Util.withLoggedExceptions("closing session " + this) {
      if (session != null)
        session.close()
    }
    Util.withLoggedExceptions("closing factory " + this) {
      if (factory != null)
        factory.close()
    }
  }

  /**
   * Actor body: connects to HornetQ, then handles observer registration,
   * incoming messages and the shutdown request until told to stop.
   */
  def act(): Unit = {
    val factory = initFactory()
    val session = initSession(factory)
    val consumer = initConsumer(session)
    session.start()

    // Newest observer first; only touched from inside the actor loop.
    var observers: List[Actor] = Nil

    loop {
      react {
        case AddObserver(observer) =>
          println(this + " adding observer " + observer)
          observers = observer :: observers

        case msg: HornetMessage =>
          observers foreach (_ ! msg)

        case ShutDownPlease =>
          println(this + " is shutting down")
          close(factory, session, consumer)
          exit()

        case unknown =>
          println(this + " recvd unknown: " + unknown)
      }
    }
  }
}
/**
 * Actor that sends string payloads to a HornetQ address.
 *
 * The HornetQ factory, session and producer are created inside `act()`;
 * callers queue payloads with `send` and stop the actor with `shutDown`.
 *
 * @param address HornetQ address messages are sent to
 */
class Producer(val address: String) extends DaemonActor {
  // For now, nothing's durable.
  // And no fault tolerance.
  //
  // For later: Have a "supervisor" actor which monitors for when a
  // "producer" crashes. When that happens, the supervisor restarts it.
  // Separate the concern for dealing with errors from the thing that
  // has the errors.

  import org.hornetq.api.core._
  import org.hornetq.api.core.client._

  private case object StopActor
  private case class SendMessage(body: String)

  /** Logs and starts the actor. */
  def startUp(): Unit = {
    println("connecting " + this)
    start()
  }

  /** Logs and asks the actor to close its HornetQ resources and exit. */
  def shutDown(): Unit = {
    println("disconnecting " + this)
    this ! StopActor
  }

  override def toString(): String =
    "<producer:a=" + address + ">"

  /**
   * Queues `body` to be sent as a message. Payload only: no properties.
   *
   * @param body text written to the message body
   */
  def send(body: String): Unit =
    this ! SendMessage(body)

  /**
   * Closes the HornetQ resources, children before parents: producer, then
   * session, then factory. Each step is attempted even if an earlier one
   * fails; failures are logged. `null` arguments are skipped.
   */
  private def close(factory: ClientSessionFactory,
                    session: ClientSession,
                    producer: ClientProducer): Unit = {
    println("stopping " + this)
    Util.withLoggedExceptions("closing producer " + this) {
      if (producer != null)
        producer.close()
    }
    Util.withLoggedExceptions("closing session " + this) {
      if (session != null)
        session.close()
    }
    Util.withLoggedExceptions("closing factory " + this) {
      if (factory != null)
        factory.close()
    }
  }

  /**
   * Actor body: connects to HornetQ, then sends each queued payload as a
   * non-durable message until asked to stop.
   */
  def act(): Unit = {
    val factory: ClientSessionFactory =
      HornetQClient.createClientSessionFactory(Conn.discovery, Conn.port)
    factory.setDiscoveryInitialWaitTimeout(60000)
    factory.setBlockOnNonDurableSend(true)

    val session: ClientSession = factory.createSession(
      Conn.user, Conn.pass, Conn.xa, Conn.autoCommitSends,
      Conn.autoCommitAcks, Conn.preAcknowledge, Conn.ackBatchSize)
    val producer: ClientProducer = session.createProducer(address)
    session.start()

    loop {
      react {
        case SendMessage(body) =>
          // A failed send is logged and the actor keeps running.
          Util.withLoggedExceptions("sending message to " + address) {
            val msg: ClientMessage = session.createMessage(false)
            msg.getBodyBuffer().writeString(body)
            producer.send(msg)
          }

        case StopActor =>
          close(factory, session, producer)
          exit()
      }
    }
  }
}
// Actor that prints the body of every HornetMessage it is sent.
// Need a "HornetActor" type.
class PresenceSubscriber extends DaemonActor {

  private case object GiveItUp

  def startUp() = start

  def shutDown() = this ! GiveItUp

  // Reaction to a single mailbox message.
  private def onMessage: PartialFunction[Any, Unit] = {
    case HornetMessage(text) =>
      println("GOT MESSAGE: " + text)
    case GiveItUp =>
      println(" -- shutting down " + this)
      exit
    case other =>
      println(this + " got unknown message: " + other)
  }

  def act() {
    loop {
      react(onMessage)
    }
  }
}
// Actor that sends a ping through PRESENCE_TOPIC each time it goes
// five seconds without receiving a message, until it is told to stop.
class PresenceNotifier extends DaemonActor {

  private case object StopActor
  private case object StartActor // currently unused

  def startUp() =
    start()

  def shutDown() =
    this ! StopActor

  // Builds the ping text, echoes it to stdout and sends it.
  private def sendPing(): Unit = {
    val ping = " ---> ping: " + this + " <---- "
    println("sending [" + ping + "]")
    PRESENCE_TOPIC.send(ping)
  }

  def act() {
    // Sends go through the shared PRESENCE_TOPIC producer rather than
    // a producer created here.
    val pingIntervalMillis = 5000
    loop {
      reactWithin(pingIntervalMillis) {
        case StopActor => exit()
        case TIMEOUT   => sendPing()
      }
    }
  }
}
/**
 * One-shot gate: `acquire()` blocks the calling thread until some thread
 * has called `release()`.
 *
 * The released state is remembered, so a `release()` that happens before
 * `acquire()` is not lost, and `acquire()` is not fooled by spurious
 * wakeups. Once released, every later `acquire()` returns immediately.
 */
class Lock {
  // Monitor used for wait/notify. Kept public and mutable for
  // backward compatibility.
  var o: AnyRef = new Object()

  // Set by release(); read and written only while holding o's monitor.
  private var released = false

  /**
   * Blocks until `release()` has been called.
   * An `InterruptedException` from `wait()` propagates to the caller.
   */
  def acquire(): Unit = {
    o.synchronized {
      // Loop so a wakeup only counts when release() has really happened.
      while (!released) {
        o.wait()
      }
    }
  }

  /** Opens the gate and wakes every thread blocked in `acquire()`. */
  def release(): Unit = {
    o.synchronized {
      released = true
      o.notifyAll()
    }
  }
}
// Shared producer for the "scala.presence" address. Declared as an object
// so that actors wanting to send presence information can reference it
// "statically".
object PRESENCE_TOPIC extends Producer("scala.presence")

// Shared consumer reading the "scala.queue" queue on the "scala.presence"
// address.
object PRESENCE_CLIENT extends Consumer("scala.presence", "scala.queue")
// Entry point: wires the presence producer, consumer, subscriber and
// notifier together, then parks the main thread until the JVM shuts down.
object Main {

  val lock: Lock = new Lock()

  // Runs from the shutdown hook: asks every actor to stop, waits a little
  // for them to finish, then lets the main thread go.
  private def stopEverything(subscriber: PresenceSubscriber,
                             notifier: PresenceNotifier): Unit = {
    println("\nInvoking shutdown handler.")

    println("PRESENCE_TOPIC")
    PRESENCE_TOPIC.shutDown()

    println("PRESENCE_CLIENT")
    PRESENCE_CLIENT.shutDown()

    println("PRODUCER")
    notifier.shutDown()

    println("CONSUMER")
    subscriber.shutDown()

    println("Waiting 4 secs to give time for proper shutdowns.")
    Thread.sleep(4000)
    lock.release()
  }

  def main(args: Array[String]): Unit = {
    println("hello")

    // Because the notifier and the "producer" are both actors,
    // you can't start one inside the other.
    PRESENCE_TOPIC.startUp()
    PRESENCE_CLIENT.startUp()

    val subscriber = new PresenceSubscriber()
    PRESENCE_CLIENT.add(subscriber)

    val notifier = new PresenceNotifier()

    Util.addShutdownHook {
      stopEverything(subscriber, notifier)
    }

    subscriber.startUp()
    notifier.startUp()

    println("^C to shut down")
    // Block here until the shutdown hook releases the lock.
    lock.acquire()
  }
}
} |
New version of this here:
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Some really, really messy, fragile code with hints about how to integrate with a Java MQ system like HornetQ.
Two issues I learned:
Okay, notes to myself. But there you go.