Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- class ZMQSubscriber[T <: Transaction, B <: Block](
- socket: InetSocketAddress,
- hashTxListener: Option[HashDigest => Future[Unit]],
- hashBlockListener: Option[HashDigest => Future[Unit]],
- rawTxListener: Option[Transaction => Future[Unit]],
- rawBlockListener: Option[Block => Future[Unit]]) {
- private val logger = BitcoinSLogger.logger
- def begin()(implicit ec: ExecutionContext) = {
- val context = ZMQ.context(1)
- // First, connect our subscriber socket
- val subscriber = context.socket(ZMQ.SUB)
- val uri = socket.getHostString + ":" + socket.getPort
- //subscribe to the appropriate feed
- hashTxListener.map { _ =>
- subscriber.subscribe(HashTx.topic.getBytes(ZMQ.CHARSET))
- logger.debug("subscribed to the transaction hashes from zmq")
- }
- rawTxListener.map { _ =>
- subscriber.subscribe(RawTx.topic.getBytes(ZMQ.CHARSET))
- logger.debug("subscribed to raw transactions from zmq")
- }
- hashBlockListener.map { _ =>
- subscriber.subscribe(HashBlock.topic.getBytes(ZMQ.CHARSET))
- logger.debug("subscribed to the hashblock stream from zmq")
- }
- rawBlockListener.map { _ =>
- subscriber.subscribe(RawBlock.topic.getBytes(ZMQ.CHARSET))
- logger.debug("subscribed to raw block")
- }
- subscriber.connect(uri)
- subscriber.setRcvHWM(0)
- logger.info("Connection to zmq client successful")
- while (true) {
- val notificationTypeStr = subscriber.recvStr(ZMQ.DONTWAIT)
- val body = subscriber.recv(ZMQ.DONTWAIT)
- Future(processMsg(notificationTypeStr, body))
- }
- }
- private def processMsg(topic: String, body: Seq[Byte])(implicit ec: ExecutionContext): Future[Unit] = Future {
- val notification = ZMQNotification.fromString(topic)
- val res: Option[Future[Unit]] = notification.flatMap {
- case HashTx =>
- hashTxListener.map { f =>
- val hash = Future(DoubleSha256Digest.fromBytes(body))
- hash.flatMap(f(_))
- }
- case RawTx =>
- rawTxListener.map { f =>
- val tx = Future(Transaction.fromBytes(body))
- tx.flatMap(f(_))
- }
- case HashBlock =>
- hashBlockListener.map { f =>
- val hash = Future(DoubleSha256Digest.fromBytes(body))
- hash.flatMap(f(_))
- }
- case RawBlock =>
- rawBlockListener.map { f =>
- val block = Future(Block.fromBytes(body))
- block.flatMap(f(_))
- }
- }
- }
- }
Add Comment
Please, Sign In to add comment