Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package zentrope.hornet {
- import scala.actors._
- import org.hornetq.api.core._
- import org.hornetq.api.core.client._
- // ----------------------------------------------------------------------
/** Helpers that run a block of code and report failures on stdout instead of propagating them. */
private object Util {

  /**
   * Runs `body`, turning an exception into a one-line warning on stdout.
   *
   * Only `Exception`s are absorbed. Any other `Throwable` (for instance a
   * JVM `Error`) is left to propagate: printing a warning and carrying on
   * is not a sensible reaction to those.
   *
   * @param body the code to run
   */
  def withWarning(body: => Unit): Unit = {
    try {
      body
    }
    catch {
      case ex: Exception =>
        println("WARN: " + ex.toString)
    }
  }

  /**
   * Runs `body`; if it throws an exception, prints two ERROR lines (the
   * `prompt` describing what was being attempted, then the exception) and
   * returns normally. Nothing else is done to handle the failure.
   *
   * As with [[withWarning]], only `Exception`s are absorbed; other
   * `Throwable`s propagate to the caller.
   *
   * @param prompt short description of the operation, echoed in the log line
   * @param body   the code to run
   */
  def withLoggedExceptions(prompt: String)(body: => Unit): Unit = {
    try {
      body
    }
    catch {
      case ex: Exception =>
        println("ERROR: (when '" + prompt + "')")
        println("ERROR: " + ex.toString)
    }
  }
}
- // ----------------------------------------------------------------------
/**
 * The message shape used by this package: a set of string-valued
 * properties plus a string body.
 *
 * @param properties property name to property value
 * @param body       the message text
 */
case class HornetMessage(
  properties: Map[String, String],
  body: String
)
- // ----------------------------------------------------------------------
/**
 * Connection settings for a HornetQ client.
 *
 * `discovery` and `port` are the two arguments given to
 * `HornetQClient.createClientSessionFactory`; `user`, `pass` and the
 * fixed flags below are the arguments given to `createSession`.
 *
 * @param discovery discovery address (defaults to "231.7.7.7")
 * @param port      discovery port (defaults to 9876)
 * @param user      user name used when creating a session
 * @param pass      password used when creating a session
 */
class Settings(
    val discovery: String = "231.7.7.7",
    val port: Int = 9876,
    val user: String = "guest",
    val pass: String = "guest")
{
  // Session flags; not configurable through the constructor.
  val xa: Boolean = false
  val autoCommitSends: Boolean = true
  val autoCommitAcks: Boolean = true
  val preAcknowledge: Boolean = false
  val ackBatchSize: Int = HornetQClient.DEFAULT_ACK_BATCH_SIZE

  /**
   * Short human-readable summary. The password is masked so that logging
   * a `Settings` instance never writes the credential in clear text.
   */
  override
  def toString(): String = {
    "<settings::d:%s,p:%d,u:%s,p:%s>".format(discovery, port, user, "****")
  }
}
- // ----------------------------------------------------------------------
/**
 * A HornetQ connection: the parts shared by consumer and producer
 * connections, namely the session factory and the session created from it.
 */
protected class Connection(factory: ClientSessionFactory, session: ClientSession) {

  /** Renders as `<SimpleClassName:@hexHashCode>`. */
  override def toString(): String =
    "<%s:@%x>".format(getClass.getSimpleName, hashCode)

  /** Starts the underlying session. */
  def start(): Unit = {
    session.start()
  }

  /**
   * Stops the session, closes it, then closes the factory. Each step runs
   * under `Util.withLoggedExceptions`, so a failure that helper logs in one
   * step does not prevent the following steps from running.
   */
  def stop(): Unit = {
    val steps: List[(String, () => Unit)] = List(
      ("stopping session", () => session.stop()),
      ("closing session", () => session.close()),
      ("closing factory", () => factory.close())
    )
    for ((prompt, step) <- steps) {
      Util.withLoggedExceptions(prompt) {
        step()
      }
    }
  }
}
- // ----------------------------------------------------------------------
/**
 * A HornetQ connection specialised for producers: a [[Connection]] plus
 * the `ClientProducer` used to send messages.
 */
protected class ProducerConnection(
    factory: ClientSessionFactory,
    session: ClientSession,
    producer: ClientProducer)
  extends Connection(factory, session) {

  // Named values for the boolean argument of session.createMessage.
  val NON_DURABLE: Boolean = false
  val DURABLE: Boolean = true

  /**
   * Builds a client message from `msg` (created with NON_DURABLE, body
   * written as a string, every property set as a string property) and
   * sends it through the producer.
   *
   * @param msg the message to send
   */
  def send(msg: HornetMessage): Unit = {
    val outgoing: ClientMessage = session.createMessage(NON_DURABLE)
    outgoing.getBodyBuffer().writeString(msg.body)
    for ((name, text) <- msg.properties) {
      outgoing.putStringProperty(name, text)
    }
    producer.send(outgoing)
  }

  /** Performs the base-class stop, then closes the producer (failures are logged). */
  override def stop(): Unit = {
    super.stop()
    Util.withLoggedExceptions("closing producer " + this) {
      producer.close()
    }
  }
}
- // ----------------------------------------------------------------------
/**
 * A HornetQ connection specialised for consumers: a [[Connection]] plus
 * the `ClientConsumer` receiving messages. The session is exposed as a
 * public `val`.
 */
protected class ConsumerConnection(
    factory: ClientSessionFactory,
    val session: ClientSession,
    consumer: ClientConsumer)
  extends Connection(factory, session) {

  /** Performs the base-class stop, then closes the consumer (failures are logged). */
  override def stop(): Unit = {
    super.stop()
    Util.withLoggedExceptions("closing consumer " + this) {
      consumer.close()
    }
  }
}
- // ----------------------------------------------------------------------
/**
 * Convenience wrapper around the HornetQ client API: builds the factory,
 * session and consumer/producer objects and bundles them into
 * [[ConsumerConnection]] / [[ProducerConnection]] instances.
 */
private object HornetQ {

  // Lets the java.util collections returned by HornetQ be used as Scala collections.
  import scala.collection.JavaConversions._

  private type Consumer = ClientConsumer
  private type Factory = ClientSessionFactory
  private type Session = ClientSession
  private type Producer = ClientProducer

  // Shortcut type for a function that takes a message and returns nothing.
  private type HandlerFunc = (HornetMessage) => Unit

  /** Creates a session factory from the discovery address/port and applies the fixed client options. */
  private def getFactory(settings: Settings): Factory = {
    val created: Factory =
      HornetQClient.createClientSessionFactory(settings.discovery, settings.port)
    created.setDiscoveryInitialWaitTimeout(20000)
    created.setBlockOnAcknowledge(true)
    created.setBlockOnNonDurableSend(true)
    created.setBlockOnDurableSend(true)
    created.setReconnectAttempts(-1)
    created.setRetryInterval(2000)
    created.setConnectionTTL(-1) // Don't die of no data
    created
  }

  /** Creates a session on `factory` using the credentials and flags held by `settings`. */
  private def getSession(settings: Settings, factory: Factory): Session =
    factory.createSession(
      settings.user,
      settings.pass,
      settings.xa,
      settings.autoCommitSends,
      settings.autoCommitAcks,
      settings.preAcknowledge,
      settings.ackBatchSize)

  /**
   * Asks the session to create `queue` on `address`. A failure is only
   * reported as a warning (via `Util.withWarning`) and otherwise ignored.
   * NOTE(review): the trailing `false` is presumably the "durable" flag —
   * confirm against ClientSession.createQueue.
   */
  private def createTempQueue(session: Session, address: String, queue: String): Unit = {
    Util.withWarning {
      session.createQueue(address, queue, false)
    }
  }

  /**
   * Adapts a `HornetMessage => Unit` function to HornetQ's `MessageHandler`.
   * Holds the session so that it can be rolled back when handling fails.
   */
  private class Handler(session: Session, func: (HornetMessage) => Unit) extends MessageHandler {

    /**
     * Converts the incoming message (string body plus all properties read
     * as strings), passes it to `func`, then acknowledges it. Any failure
     * is printed and followed by a session rollback attempt; a failing
     * rollback is printed as well.
     */
    def onMessage(msg: ClientMessage): Unit = {
      try {
        val text = msg.getBodyBuffer().readString()
        val collected =
          msg.getPropertyNames().foldLeft(Map[String, String]()) { (acc, name) =>
            val value = msg.getStringProperty(name)
            acc + ((name.toString(), value))
          }
        func(HornetMessage(collected, text))
        msg.acknowledge()
      }
      catch {
        case (failure: Throwable) =>
          println("ERROR: [msg.receive] " + failure.toString)
          try {
            session.rollback()
          }
          catch {
            case (rollbackFailure: Throwable) =>
              println("ERROR: [msg.rollback] " + rollbackFailure.toString)
          }
      }
    }
  }

  /** Creates a consumer on `queue` and installs a [[Handler]] that wraps `handler`. */
  private def createConsumer(session: Session, queue: String, handler: HandlerFunc): Consumer = {
    val created = session.createConsumer(queue)
    created.setMessageHandler(new Handler(session, handler))
    created
  }

  /** Creates a producer for address `a` on session `s`. */
  private def createProducer(s: Session, a: String) =
    s.createProducer(a)

  /** Runs `function` and silently discards anything it throws. */
  private def swallow(function: => Unit) = {
    try {
      function
    }
    catch {
      case (th: Throwable) => ; // ignore
    }
  }

  // Best-effort close helpers: do nothing for None, swallow close() failures.

  private def cleanFactory(f: Option[Factory]) =
    f foreach { x => swallow { x.close() } }

  private def cleanSession(s: Option[Session]) =
    s foreach { x => swallow { x.close() } }

  private def cleanProducer(p: Option[Producer]) =
    p foreach { x => swallow { x.close() } }

  private def cleanConsumer(c: Option[Consumer]) =
    c foreach { x => swallow { x.close() } }

  override def toString(): String = "<HornetQ>"

  /**
   * Builds a consumer connection: factory, session, queue, then a consumer
   * with `handler` installed. The connection is returned un-started.
   *
   * If any step throws, whatever was created so far is closed (best
   * effort) and the original throwable is rethrown.
   *
   * @param settings connection settings
   * @param address  address the queue is created on
   * @param queue    queue to create and consume from
   * @param handler  called for every received message
   */
  def getConsumerConnection(settings: Settings, address: String,
      queue: String, handler: HandlerFunc): ConsumerConnection = {
    println(this + " getting consumer connection")
    // Track what has been created so far so the catch block can release it.
    var factory: Option[Factory] = None
    var session: Option[Session] = None
    var consumer: Option[Consumer] = None
    try {
      val f = getFactory(settings)
      factory = Some(f)
      val s = getSession(settings, f)
      session = Some(s)
      // TODO: Have a way to indicate what kind of queue to create
      createTempQueue(s, address, queue)
      val c = createConsumer(s, queue, handler)
      consumer = Some(c)
      new ConsumerConnection(f, s, c)
    }
    catch {
      case (th: Throwable) =>
        println("warn: Unable to connect: " + th.getMessage())
        // Gotta close the resources, or the underlying
        // HornetQ objects will leak.
        cleanFactory(factory)
        cleanSession(session)
        cleanConsumer(consumer)
        throw th
    }
  }

  /**
   * Builds a producer connection: factory, session, then a producer for
   * `address`. The connection is returned un-started.
   *
   * If any step throws, whatever was created so far is closed (best
   * effort) and the original throwable is rethrown.
   *
   * @param settings connection settings
   * @param address  address the producer sends to
   */
  def getProducerConnection(settings: Settings, address: String): ProducerConnection = {
    println(this + " getting producer connection")
    // Track what has been created so far so the catch block can release it.
    var factory: Option[Factory] = None
    var session: Option[Session] = None
    var producer: Option[Producer] = None
    try {
      val f = getFactory(settings)
      factory = Some(f)
      val s = getSession(settings, f)
      session = Some(s)
      val p = createProducer(s, address)
      producer = Some(p)
      new ProducerConnection(f, s, p)
    }
    catch {
      case (th: Throwable) =>
        println(this + " warn: Unable to producer.connect: " + th.getMessage())
        cleanFactory(factory)
        cleanSession(session)
        cleanProducer(producer)
        throw th
    }
  }
}
- // ----------------------------------------------------------------------
/**
 * A message consumer that keeps retrying until it has a started
 * connection, and hands every received message to `handler`.
 *
 * @param settings connection settings
 * @param address  address the queue is created on
 * @param queue    queue to consume from
 * @param handler  called for every received message
 */
class Consumer(
    val settings: Settings,
    val address: String,
    val queue: String,
    handler: (HornetMessage) => Unit)
{
  // Current connection; None while not connected (before startUp, after shutDown).
  var conn: Option[ConsumerConnection] = None

  override def toString(): String = {
    "<consumer:" + address + ">"
  }

  /**
   * Blocks until a connection has been obtained AND started, retrying
   * every 2000 ms, no matter what goes wrong.
   */
  private def connect(): Unit = {
    while (conn == None) {
      try {
        val fresh = HornetQ.getConsumerConnection(settings, address, queue, handler)
        conn = Some(fresh)
        fresh.start()
        println(this + " we got a connection")
      }
      catch {
        case (th: Throwable) => {
          println(this + " " + th.toString)
          // A connection that was created but failed to start is released
          // right away (stop() cleans up even if not running) instead of
          // being held open for the whole back-off period.
          conn foreach { c => c.stop() }
          conn = None
          println(this + " waiting 2000 millis")
          Thread.sleep(2000)
        }
      }
    }
  }

  /** Connects, blocking until a connection is established. */
  def startUp(): Unit = {
    println ("starting " + this)
    connect()
  }

  /**
   * Stops the current connection, if any, and forgets it. Clearing `conn`
   * is what allows a later startUp() to connect again; otherwise the
   * connect loop would see the stopped connection and do nothing.
   */
  def shutDown(): Unit = {
    println ("stopping " + this)
    try {
      conn foreach { c => c.stop() }
    }
    finally {
      conn = None
    }
  }
}
- // ----------------------------------------------------------------------
/**
 * A message producer for a single address. Sending retries until the
 * message has gone out, reconnecting as often as necessary.
 *
 * @param settings connection settings
 * @param address  address messages are sent to
 */
class Producer(val settings: Settings, val address: String) {

  // Current connection; None while not connected.
  var conn: Option[ProducerConnection] = None

  override def toString(): String = {
    "<producer:" + address + ">"
  }

  /**
   * Blocks until a connection has been obtained AND started, retrying
   * every 2000 ms, no matter what goes wrong.
   */
  private def connect(): Unit = {
    while (conn == None) {
      println (this + " attempting to connect")
      try {
        val fresh = HornetQ.getProducerConnection(settings, address)
        conn = Some(fresh)
        fresh.start()
        println(this + " we got a connection")
      }
      catch {
        case (th: Throwable) => {
          println(this + " " + th.toString)
          // A connection that was created but failed to start must be
          // stopped AND forgotten. Without resetting `conn` the loop
          // condition would be satisfied by a dead connection.
          conn foreach { c => c.stop() } // Cleans up even if not running.
          conn = None
          println(this + " waiting 2000 millis")
          Thread.sleep(2000)
        }
      }
    }
  }

  /** Sends `message` without any properties. */
  def send(message: String): Unit = {
    send(Map.empty[String, String], message)
  }

  /**
   * Sends `message` with the given properties. On any failure (including
   * not being connected) the current connection is stopped, a new one is
   * established, and the send is attempted again; the call returns only
   * after a send has succeeded.
   *
   * @param properties string properties to attach to the message
   * @param message    the message body
   */
  def send(properties: Map[String, String], message: String): Unit = {
    var retry = true
    while (retry) {
      try {
        conn match {
          case None =>
            throw new Exception("Producer not started or connection down.")
          case Some(c) =>
            c.send(HornetMessage(properties, message))
            retry = false
        }
      }
      catch {
        case (th: Throwable) => {
          println(this + " " + th.toString)
          conn foreach { c => c.stop() } // Cleans up even if not running.
          println(this + " trying to reconnect after a failed send")
          conn = None
          connect()
        }
      }
    }
  }

  /** Connects, blocking until a connection is established. */
  def startUp(): Unit = {
    println("starting " + this)
    connect()
  }

  /**
   * Stops the current connection, if any, and forgets it, so that a later
   * startUp() connects again instead of keeping the stopped connection.
   */
  def shutDown(): Unit = {
    try {
      conn foreach { c => c.stop() }
    }
    finally {
      conn = None
    }
  }
}
- }
Add Comment
Please, Sign In to add comment