Guest User

blockEventListener.js

a guest
Oct 16th, 2020
81
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. "use strict"
  2.  
  3. const { Wallets, Gateway } = require("fabric-network")
  4. const fs = require("fs")
  5. const path = require("path")
  6. const net = require("net")
  7.  
  8. const blockProcessing = require("./blockProcessing.js")
  9.  
  10. const config = require("./config.json")
  11. const channelid = config.channelid
  12. const sdkUser = config.sdk_user
  13. const sdkAddress = config.sdk_address
  14. const sdkPort = config.sdk_port
  15. const healthPort = config.healthcheck_port
  16.  
  17. class BlockMap {
  18.     constructor() {
  19.         this.list = []
  20.     }
  21.     get(channel, key) {
  22.         key = parseInt(key, 10).toString()
  23.         return this.list[`${channel}${key}`]
  24.     }
  25.     set(channel, key, value) {
  26.         this.list[`${channel}${key}`] = value
  27.     }
  28.     remove(channel, key) {
  29.         key = parseInt(key, 10).toString()
  30.         delete this.list[`${channel}${key}`]
  31.     }
  32. }
  33.  
  34. let ProcessingMap = new BlockMap()
  35.  
  36. async function main() {
  37.     try {
  38.         const http = require('http');
  39.         const server = http.createServer((req, res)=>{
  40.             console.log(req.url, req.method, req. headers);
  41.             res.write('<html>');
  42.             res.write('<head><title>HLF Event Listener </title></head>');
  43.             res.write(' <body>HLF Event Listener Service is Up!</body>');
  44.             res.write('</html>');
  45.             res.end();
  46.         });
  47.         server.listen(healthPort);
  48.  
  49.         const walletPath = path.join(process.cwd(), "wallet")
  50.         const wallet = await Wallets.newFileSystemWallet(walletPath)
  51.         console.log(`Wallet path: ${walletPath}`)
  52.  
  53.         const userExists = await wallet.get(sdkUser)
  54.         if (!userExists) {
  55.             console.log(`An identity for the user "${sdkUser}" does not exist in the wallet`)
  56.             console.log("Run the enrollUser.js application before retrying")
  57.             return
  58.         }
  59.  
  60.         const ccpPath = path.resolve("connection-mtr.json")
  61.         const ccp = JSON.parse(fs.readFileSync(ccpPath, "utf8"))
  62.         const gateway = new Gateway()
  63.         await gateway.connect(ccp, { wallet, identity: sdkUser, discovery: { enabled: false, asLocalhost: false } })
  64.  
  65.         let network, listener, configPath
  66.         for (var i=0; i<channelid.length; i++) {
  67.             let cid = channelid[i]
  68.             let nextBlock = 1
  69.             configPath = path.resolve(__dirname, `${cid}.nextBlock`)
  70.             if (fs.existsSync(configPath)) {
  71.                 nextBlock = fs.readFileSync(configPath, "utf8")
  72.             } else {
  73.                 fs.writeFileSync(configPath, parseInt(nextBlock, 10))
  74.             }
  75.  
  76.             console.log(`Adding listener for channel ${cid}`)
  77.             network = await gateway.getNetwork(cid)
  78.             listener = await network.addBlockListener(
  79.                 async (event) => {
  80.                     await ProcessingMap.set(cid, event.blockData.header.number, event.blockData)
  81.                     console.log(`[${cid}] Added block ${event.blockData.header.number} to ProcessingMap`)
  82.                 },
  83.                 { startBlock: parseInt(nextBlock, 10) }
  84.             )
  85.  
  86.             console.log(`[${cid}]: Listening for block events, nextblock: ${nextBlock}`)
  87.             processPendingBlocks(ProcessingMap, cid, configPath)
  88.         };
  89.     } catch (error) {
  90.         console.error(`Failed to evaluate transaction: ${error}`)
  91.         process.exit(1)
  92.     }
  93. }
  94.  
  95. async function processPendingBlocks(ProcessingMap, channelid, configPath) {
  96.     setTimeout(async () => {
  97.         let nextBlockNumber = fs.readFileSync(configPath, "utf8")
  98.         let processBlock
  99.  
  100.         do {
  101.             processBlock = ProcessingMap.get(channelid, nextBlockNumber)
  102.             if (processBlock == undefined) {
  103.                 break
  104.             }
  105.  
  106.             try {
  107.                 await blockProcessing.processBlockEvent(channelid, processBlock, sdkAddress, sdkPort, configPath)
  108.             } catch (error) {
  109.                 console.error(`Failed to process block: ${error}`)
  110.             }
  111.  
  112.             ProcessingMap.remove(channelid, nextBlockNumber)
  113.             fs.writeFileSync(configPath, parseInt(nextBlockNumber, 10) + 1)
  114.             nextBlockNumber = fs.readFileSync(configPath, "utf8")
  115.         } while (true)
  116.  
  117.         processPendingBlocks(ProcessingMap, channelid, configPath)
  118.     }, 250)
  119. }
  120.  
  121. main()
  122.  
Add Comment
Please, Sign In to add comment