Guest User

Untitled

a guest
Jun 10th, 2018
135
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 15.33 KB | None | 0 0
  1. package zentrope.hornet {
  2.  
  3. import scala.actors._
  4. import org.hornetq.api.core._
  5. import org.hornetq.api.core.client._
  6.  
  7. // ----------------------------------------------------------------------
  8.  
  9. private object Util {
  10.  
  11. def withWarning(body: => Unit) = {
  12. // Wraps an exceptional condition we want to warn
  13. // about, but let pass.
  14.  
  15. try {
  16. body
  17. }
  18.  
  19. catch {
  20. case (th: Throwable) => {
  21. println("WARN: " + th.toString())
  22. }
  23. }
  24. }
  25.  
  26. def withLoggedExceptions(prompt: String)(body: => Unit) = {
  27. // Executes BODY, and logs exceptions, if there are any, but
  28. // doesn't do anything special to handle them.
  29.  
  30. try {
  31. body
  32. }
  33.  
  34. catch {
  35. case (th: Throwable) =>
  36. println("ERROR: (when '" + prompt + "')") ;
  37. println("ERROR: " + th.toString) ;
  38. // th.printStackTrace()
  39. }
  40. }
  41. }
  42.  
  43. // ----------------------------------------------------------------------
  44.  
  45. case class HornetMessage(properties: Map[String,String], body: String)
  46.  
  47. // ----------------------------------------------------------------------
  48.  
  49. class Settings(
  50. val discovery: String = "231.7.7.7",
  51. val port: Int = 9876,
  52. val user: String = "guest",
  53. val pass: String = "guest")
  54. {
  55.  
  56. val xa: Boolean = false
  57. val autoCommitSends: Boolean = true
  58. val autoCommitAcks: Boolean = true
  59. val preAcknowledge: Boolean = false
  60. val ackBatchSize: Int = HornetQClient.DEFAULT_ACK_BATCH_SIZE
  61.  
  62. override
  63. def toString(): String = {
  64. format("<settings::d:%s,p:%d,u:%s,p:%s>", discovery, port, user, pass)
  65. }
  66. }
  67.  
  68. // ----------------------------------------------------------------------
  69.  
  70. protected class Connection(factory: ClientSessionFactory, session: ClientSession) {
  71.  
  72. // A HornetQ connection, stuff common to both consumers and
  73. // producers.
  74.  
  75. override def toString(): String =
  76. String.format("<%s:@%x>", this.getClass().getSimpleName(),
  77. hashCode.asInstanceOf[AnyRef])
  78.  
  79. def start() {
  80. session.start()
  81. }
  82.  
  83. def stop() {
  84. Util.withLoggedExceptions("stopping session") {
  85. session.stop()
  86. }
  87.  
  88. Util.withLoggedExceptions("closing session") {
  89. session.close()
  90. }
  91.  
  92. Util.withLoggedExceptions("closing factory") {
  93. factory.close()
  94. }
  95. }
  96.  
  97. }
  98.  
  99. // ----------------------------------------------------------------------
  100.  
  101. protected class ProducerConnection(factory: ClientSessionFactory, session: ClientSession,
  102. producer: ClientProducer) extends Connection(factory, session) {
  103.  
  104. // A HornetQ connection, specialized for producers.
  105.  
  106. val NON_DURABLE: Boolean = false
  107. val DURABLE: Boolean = true
  108.  
  109. def send(msg: HornetMessage) {
  110. val clientMessage: ClientMessage = session.createMessage(NON_DURABLE)
  111. clientMessage.getBodyBuffer().writeString(msg.body)
  112.  
  113. msg.properties foreach { case (key, value) =>
  114. clientMessage.putStringProperty(key, value)
  115. }
  116.  
  117. producer.send(clientMessage)
  118. }
  119.  
  120. override def stop() {
  121. super.stop()
  122. Util.withLoggedExceptions("closing producer " + this) {
  123. producer.close()
  124. }
  125. }
  126. }
  127.  
  128. // ----------------------------------------------------------------------
  129.  
  130. protected class ConsumerConnection(factory: ClientSessionFactory, val session: ClientSession,
  131. consumer: ClientConsumer) extends Connection(factory, session) {
  132.  
  133. // A HornetQ connection, specialized for consumers.
  134.  
  135. override def stop() {
  136. super.stop()
  137.  
  138. Util.withLoggedExceptions("closing consumer " + this) {
  139. consumer.close()
  140. }
  141. }
  142. }
  143.  
  144. // ----------------------------------------------------------------------
  145.  
  146. private object HornetQ {
  147.  
  148. // A convenience class to make it seem as if using HornetQ from
  149. // Scala is really easy. ;)
  150.  
  151. import scala.collection.JavaConversions._
  152.  
  153. private type Consumer = ClientConsumer
  154. private type Factory = ClientSessionFactory
  155. private type Session = ClientSession
  156. private type Producer = ClientProducer
  157.  
  158. private def getFactory(settings: Settings): Factory = {
  159. val factory: Factory =
  160. HornetQClient.createClientSessionFactory(settings.discovery,
  161. settings.port)
  162.  
  163. factory.setDiscoveryInitialWaitTimeout(20000)
  164. factory.setBlockOnAcknowledge(true)
  165. factory.setBlockOnNonDurableSend(true)
  166. factory.setBlockOnDurableSend(true)
  167.  
  168. factory.setReconnectAttempts(-1)
  169. factory.setRetryInterval(2000)
  170. factory.setConnectionTTL(-1) // Don't die of no data
  171.  
  172. factory
  173. }
  174.  
  175. private def getSession(settings: Settings, factory: Factory): Session = {
  176. factory.createSession(settings.user,
  177. settings.pass, settings.xa, settings.autoCommitSends,
  178. settings.autoCommitAcks, settings.preAcknowledge, settings.ackBatchSize)
  179. }
  180.  
  181. private def createTempQueue(session: Session, address: String, queue: String) {
  182. Util.withWarning {
  183. session.createQueue(address, queue, false)
  184. }
  185. }
  186.  
  187. private class Handler(session: Session, func: (HornetMessage) => Unit) extends MessageHandler {
  188. // Needs a session if we want to rollback
  189.  
  190. def onMessage(msg: ClientMessage): Unit = {
  191. try {
  192. val msgBody = msg.getBodyBuffer().readString()
  193. var properties = Map[String, String]()
  194.  
  195. msg.getPropertyNames() foreach { name =>
  196. val value = msg.getStringProperty(name)
  197. var key = name.toString()
  198. properties = properties + ((key, value))
  199. }
  200.  
  201. func(HornetMessage(properties, msgBody))
  202. msg.acknowledge()
  203. }
  204.  
  205. catch {
  206. case (th: Throwable) => {
  207. println("ERROR: [msg.receive] " + th.toString)
  208.  
  209. try {
  210. session.rollback()
  211. }
  212.  
  213. catch {
  214. case (th: Throwable) => {
  215. println("ERROR: [msg.rollback] " + th.toString)
  216. }
  217. }
  218. }
  219. }
  220. }
  221. }
  222.  
  223. // Shortcut type for a function that takes a message and returns void
  224. private type HandlerFunc = (HornetMessage) => Unit
  225.  
  226. private def createConsumer(session: Session, queue: String, handler: HandlerFunc): Consumer = {
  227. val consumer = session.createConsumer(queue)
  228. consumer.setMessageHandler(new Handler(session, handler))
  229. return consumer
  230. }
  231.  
  232. private def createProducer(s: Session, a: String) =
  233. s.createProducer(a)
  234.  
  235.  
  236. private def swallow(function: => Unit) = {
  237. try {
  238. function
  239. }
  240.  
  241. catch {
  242. case (th: Throwable) => ; // ignore
  243. }
  244. }
  245.  
  246. private
  247. def cleanFactory(f: Option[Factory]) = f match {
  248. case Some(x) => swallow { x.close() }
  249. case None => ;
  250. }
  251.  
  252.  
  253. private
  254. def cleanSession(s: Option[Session]) = s match {
  255. case Some(x) => swallow { x.close() }
  256. case None => ;
  257. }
  258.  
  259. private
  260. def cleanProducer(p: Option[Producer]) = p match {
  261. case Some(x) => swallow { x.close() }
  262. case None => ;
  263. }
  264.  
  265. private
  266. def cleanConsumer(c: Option[Consumer]) = c match {
  267. case Some(x) => swallow { x.close() }
  268. case None => ;
  269. }
  270.  
  271. override def toString(): String = "<HornetQ>"
  272.  
  273. def getConsumerConnection(settings: Settings, address: String,
  274. queue: String, handler: HandlerFunc): ConsumerConnection = {
  275.  
  276. println(this + " getting consumer connection")
  277.  
  278. var factory: Option[Factory] = None
  279. var session: Option[Session] = None
  280. var consumer: Option[Consumer] = None
  281. var conn: Option[ConsumerConnection] = None
  282.  
  283. try {
  284. factory = Some(getFactory(settings))
  285. session = Some(getSession(settings, factory.get))
  286.  
  287. // TODO: Have a way to indicate what kind of queue to create
  288. createTempQueue(session.get, address, queue)
  289.  
  290. consumer = Some(createConsumer(session.get, queue, handler))
  291.  
  292. conn = Some(new ConsumerConnection(factory.get, session.get, consumer.get))
  293. }
  294.  
  295. catch {
  296. case (th: Throwable) => {
  297.  
  298. println("warn: Unable to connect: " + th.getMessage())
  299. // Gotta close the resources, or the underlying
  300. // HornetQ objects will leak.
  301. cleanFactory(factory)
  302. cleanSession(session)
  303. cleanConsumer(consumer)
  304. throw th
  305. }
  306. }
  307.  
  308. return conn.get
  309.  
  310. }
  311.  
  312. def getProducerConnection(settings: Settings, address: String): ProducerConnection = {
  313. println(this + " getting producer connection")
  314.  
  315. var conn: Option[ProducerConnection] = None
  316. var factory: Option[Factory] = None
  317. var session: Option[Session] = None
  318. var producer: Option[Producer] = None
  319.  
  320. try {
  321. factory = Some(getFactory(settings))
  322. session = Some(getSession(settings, factory.get))
  323. producer = Some(createProducer(session.get, address))
  324.  
  325. val p = new ProducerConnection(factory.get, session.get, producer.get)
  326. conn = Some(p)
  327. }
  328.  
  329. catch {
  330. case (th: Throwable) => {
  331. println(this + " warn: Unable to producer.connect: " + th.getMessage())
  332. cleanFactory(factory)
  333. cleanSession(session)
  334. cleanProducer(producer)
  335. throw th
  336. }
  337. }
  338.  
  339. return conn.get
  340. }
  341. }
  342.  
  343. // ----------------------------------------------------------------------
  344.  
  345. class Consumer(
  346. val settings: Settings,
  347. val address: String,
  348. val queue: String,
  349. handler: (HornetMessage) => Unit)
  350. {
  351. var conn: Option[ConsumerConnection] = None
  352.  
  353. override def toString(): String = {
  354. return "<consumer:" + address + ">"
  355. }
  356.  
  357. private def connect(): Unit = {
  358. // Keep trying to get a good connection, no matter what.
  359.  
  360. while (conn == None) {
  361. try {
  362. conn = Some(HornetQ.getConsumerConnection(settings, address, queue, handler))
  363. conn.get.start()
  364.  
  365. println(this + " we got a connection")
  366. }
  367.  
  368. catch {
  369. case (th: Throwable) => {
  370. println(this + " " + th.toString)
  371. println(this + " waiting 2000 millis")
  372. Thread.sleep(2000)
  373. if (conn != None) {
  374. conn.get.stop() // Will clean up resources even if not running.
  375. conn = None
  376. }
  377. }
  378. }
  379. }
  380.  
  381. }
  382.  
  383. def startUp(): Unit = {
  384. println ("starting " + this)
  385. connect()
  386. }
  387.  
  388. def shutDown(): Unit = {
  389. println ("stopping " + this)
  390. conn match {
  391. case None => ;
  392. case Some(c) => c.stop()
  393. }
  394. }
  395.  
  396. }
  397.  
  398. // ----------------------------------------------------------------------
  399.  
  400. class Producer(val settings: Settings, val address: String) {
  401.  
  402. var conn: Option[ProducerConnection] = None
  403.  
  404. override def toString(): String = {
  405. return "<producer:" + address + ">"
  406. }
  407.  
  408. private def connect(): Unit = {
  409. // Keep trying to get a good connection, no matter what.
  410.  
  411. while (conn == None) {
  412. println (this + " attempting to connect")
  413. try {
  414. conn = Some(HornetQ.getProducerConnection(settings, address))
  415. conn.get.start()
  416. println(this + " we got a connection")
  417. }
  418.  
  419. catch {
  420. case (th: Throwable) => {
  421. println(this + " " + th.toString)
  422. if (conn != None)
  423. conn.get.stop() // Will clean up resources even if not running.
  424. println(this + " waiting 2000 millis")
  425. Thread.sleep(2000)
  426. }
  427. }
  428. }
  429.  
  430. }
  431.  
  432. def send(message: String): Unit = {
  433. send(Map.empty, message)
  434. }
  435.  
  436. def send(properties: Map[String, String], message: String): Unit = {
  437. var retry = true
  438.  
  439. while (retry) {
  440. try {
  441. conn match {
  442. case None =>
  443. throw new Exception("Producer not started or connection down.")
  444. case Some(c) =>
  445. c.send(HornetMessage(properties, message)) ;
  446. // println("message successfully sent") ;
  447. retry = false
  448. }
  449. }
  450.  
  451. catch {
  452. case (th: Throwable) => {
  453. println(this + " " + th.toString)
  454. if (conn != None)
  455. conn.get.stop() // Will clean up resources even if not running.
  456. println(this + " trying to reconnect after a failed send")
  457. conn = None
  458. connect()
  459. }
  460. }
  461. }
  462. }
  463.  
  464. def startUp(): Unit = {
  465. println("starting " + this)
  466. connect()
  467. }
  468.  
  469. def shutDown(): Unit = {
  470. conn match {
  471. case None => ;
  472. case Some(c) => c.stop()
  473. }
  474. }
  475. }
  476.  
  477. }
Add Comment
Please, Sign In to add comment