Guest User

Untitled

a guest
Apr 22nd, 2018
71
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.38 KB | None | 0 0
  /**
    * Subscribes to a ZeroMQ publisher and dispatches each notification to the
    * listener registered for its topic.
    *
    * A topic is only subscribed to when the corresponding listener is defined.
    *
    * @tparam T not referenced by this class; kept for source compatibility
    * @tparam B not referenced by this class; kept for source compatibility
    * @param socket            host and port of the ZeroMQ publisher
    * @param hashTxListener    invoked with the digest carried by a `HashTx` notification
    * @param hashBlockListener invoked with the digest carried by a `HashBlock` notification
    * @param rawTxListener     invoked with the transaction parsed from a `RawTx` notification
    * @param rawBlockListener  invoked with the block parsed from a `RawBlock` notification
    */
  class ZMQSubscriber[T <: Transaction, B <: Block](
      socket: InetSocketAddress,
      hashTxListener: Option[HashDigest => Future[Unit]],
      hashBlockListener: Option[HashDigest => Future[Unit]],
      rawTxListener: Option[Transaction => Future[Unit]],
      rawBlockListener: Option[Block => Future[Unit]]) {
    private val logger = BitcoinSLogger.logger

    /** How long a single receive blocks before the loop re-checks the stop flag. */
    private val receiveTimeoutMillis = 1000

    /** Cleared by [[stop]]; read by the receive loop in [[begin]]. */
    @volatile private var running = false

    /**
      * Connects to the publisher and processes notifications on the calling
      * thread until [[stop]] is called. The socket and context are released
      * before this method returns, whether it ends normally or by exception.
      *
      * @param ec execution context on which listeners are run
      */
    def begin()(implicit ec: ExecutionContext): Unit = {
      running = true
      val context = ZMQ.context(1)
      val subscriber = context.socket(ZMQ.SUB)
      // ZeroMQ endpoints need an explicit transport scheme.
      val uri = s"tcp://${socket.getHostString}:${socket.getPort}"

      try {
        // Socket options other than subscriptions only apply to connections
        // made afterwards, so they have to be set before connect().
        subscriber.setRcvHWM(0)
        subscriber.setReceiveTimeOut(receiveTimeoutMillis)

        //subscribe to the appropriate feed
        hashTxListener.foreach { _ =>
          subscriber.subscribe(HashTx.topic.getBytes(ZMQ.CHARSET))
          logger.debug("subscribed to the transaction hashes from zmq")
        }

        rawTxListener.foreach { _ =>
          subscriber.subscribe(RawTx.topic.getBytes(ZMQ.CHARSET))
          logger.debug("subscribed to raw transactions from zmq")
        }

        hashBlockListener.foreach { _ =>
          subscriber.subscribe(HashBlock.topic.getBytes(ZMQ.CHARSET))
          logger.debug("subscribed to the hashblock stream from zmq")
        }

        rawBlockListener.foreach { _ =>
          subscriber.subscribe(RawBlock.topic.getBytes(ZMQ.CHARSET))
          logger.debug("subscribed to raw block")
        }

        subscriber.connect(uri)
        logger.info("Connection to zmq client successful")

        while (running) {
          // Blocking receive bounded by the receive timeout; null means the
          // timeout expired without a message.
          Option(subscriber.recvStr(0)).foreach { topic =>
            val body =
              if (subscriber.hasReceiveMore) Option(subscriber.recv(0)) else None

            // Drain any trailing frames so the next iteration starts on a
            // topic frame. NOTE(review): bitcoind is documented to append a
            // sequence-number frame - confirm against the node version in use.
            while (subscriber.hasReceiveMore) {
              subscriber.recv(0)
            }

            body.foreach { bytes =>
              processMsg(topic, bytes.toSeq).failed.foreach { err =>
                logger.error(s"Failed to process zmq notification for topic $topic", err)
              }
            }
          }
        }
      } finally {
        subscriber.close()
        context.term()
      }
    }

    /**
      * Asks the receive loop started by [[begin]] to exit. The loop notices the
      * request after the current receive completes or times out.
      */
    def stop(): Unit = {
      running = false
    }

    /**
      * Parses a notification and hands it to the listener for its topic.
      *
      * @param topic topic frame of the notification
      * @param body  payload frame of the notification
      * @return the listener's future; an already-completed future when the topic
      *         is not recognised or has no listener; a failed future when
      *         parsing or the listener fails
      */
    private def processMsg(topic: String, body: Seq[Byte])(
        implicit ec: ExecutionContext): Future[Unit] = {
      // Parsing runs inside Future so that an exception becomes a failed future
      // instead of escaping onto the receive thread.
      Future(ZMQNotification.fromString(topic)).flatMap { notification =>
        val dispatched: Option[Future[Unit]] = notification.flatMap {
          case HashTx =>
            hashTxListener.map { f =>
              Future(DoubleSha256Digest.fromBytes(body)).flatMap(hash => f(hash))
            }
          case RawTx =>
            rawTxListener.map { f =>
              Future(Transaction.fromBytes(body)).flatMap(tx => f(tx))
            }
          case HashBlock =>
            hashBlockListener.map { f =>
              Future(DoubleSha256Digest.fromBytes(body)).flatMap(hash => f(hash))
            }
          case RawBlock =>
            rawBlockListener.map { f =>
              Future(Block.fromBytes(body)).flatMap(block => f(block))
            }
        }
        dispatched.getOrElse(Future.successful(()))
      }
    }
  }
Add Comment
Please, Sign In to add comment