Guest User

Untitled

a guest
Oct 3rd, 2018
324
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.80 KB | None | 0 0
  1. const fs = require('fs')
  2. const ip = require('ip')
  3. const cluster = require('cluster')
  4.  
  5. const { Kafka, logLevel } = require('../index')
  6.  
  // Process-level error events and termination signals that the cluster
  // master and the workers hook into further down in this file.
  const errorTypes = ['unhandledRejection', 'uncaughtException']
  const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']

  // Broker host: HOST_IP from the environment when it is set to a non-empty
  // value, otherwise whatever ip.address() reports.
  const host = process.env.HOST_IP || ip.address()

  // Ports of the three brokers this example connects to on `host`.
  const brokerPorts = [9094, 9097, 9100]

  // Reads a PEM file synchronously and returns its contents as a UTF-8 string.
  const readPem = pemPath => fs.readFileSync(pemPath, 'utf-8')

  // Kafka client configured with a client certificate/key over SSL and
  // SASL "plain" credentials. Certificate paths are relative to the current
  // working directory.
  const kafka = new Kafka({
    logLevel: logLevel.INFO,
    brokers: brokerPorts.map(port => `${host}:${port}`),
    clientId: 'example-consumer',
    ssl: {
      servername: 'localhost',
      cert: readPem('./testHelpers/certs/client_cert.pem'),
      key: readPem('./testHelpers/certs/client_key.pem'),
      ca: [readPem('./testHelpers/certs/ca_cert.pem')],
    },
    sasl: {
      mechanism: 'plain',
      username: 'test',
      password: 'testtest',
    },
  })

  // Topic the consumer subscribes to.
  const topic = 'topic-test'
  29.  
  /**
   * Creates a consumer in the 'test-group' group, connects it, subscribes it
   * to `topic`, and starts consuming, logging every received message.
   *
   * @returns {Promise<object>} Resolves to the consumer once it has been
   *   started, so the caller can later call `disconnect()` on it.
   *   Rejects if connecting, subscribing, or starting the consumer fails.
   */
  const run = async () => {
    const consumer = kafka.consumer({ groupId: 'test-group' })
    await consumer.connect()
    await consumer.subscribe({ topic })

    // consumer.run() returns a promise (see the KafkaJS consumer docs); await
    // it so a startup failure rejects run() and reaches the caller's handler
    // instead of being left as a floating rejection.
    await consumer.run({
      // `eachBatch: async ({ batch }) => { ... }` can be used here instead to
      // process whole batches rather than individual messages.
      eachMessage: async ({ topic, partition, message }) => {
        const prefix = `${topic}[${partition} | ${message.offset}] / ${message.timestamp}`
        console.log(`- ${prefix} ${message.key}#${message.value}`)
      },
    })

    return consumer
  }
  46.  
  // Number of worker processes (each running its own consumer) to fork.
  const numWorkers = 6

  if (cluster.isMaster) {
    // Master: fork the workers and coordinate shutdown; it consumes nothing itself.
    console.log(`Master ${process.pid} is running`)

    for (let i = 0; i < numWorkers; i++) {
      cluster.fork()
    }

    cluster.on('exit', worker => {
      console.log(`Worker ${worker.process.pid} died`)
    })

    // On an unhandled error, log it, disconnect all workers, then exit.
    errorTypes.forEach(type => {
      process.on(type, e => {
        try {
          console.log(`process.on ${type}`)
          console.error(e)
          cluster.disconnect(() => {
            console.log('disconnected on error')
            process.exit(0)
          })
        } catch (_) {
          process.exit(1)
        }
      })
    })

    // On a termination signal, disconnect all workers. The return value of an
    // event listener is ignored, so no promise is returned; a synchronous
    // failure surfaces through the 'uncaughtException' handler above.
    signalTraps.forEach(type => {
      process.once(type, () => {
        cluster.disconnect(() => {
          console.log('disconnected')
        })
      })
    })
  } else {
    // Worker: start a consumer and make sure it is disconnected on shutdown.
    console.log(`Worker ${process.pid} is running`)

    // run() is async, so this is a promise for the consumer, not the consumer
    // itself. If startup fails the error is logged and the promise resolves
    // to undefined.
    const consumerPromise = run().catch(e => console.error(`[example/consumer] ${e.message}`, e))

    // Waits for startup to settle, then disconnects the consumer if one was
    // created. Note: this waits for a still-pending startup to finish first.
    const disconnectConsumer = async () => {
      const consumer = await consumerPromise
      if (consumer) {
        await consumer.disconnect()
      }
    }

    // On an unhandled error, log it, disconnect the consumer, then exit.
    errorTypes.forEach(type => {
      process.on(type, async e => {
        try {
          console.log(`process.on ${type}`)
          console.error(e)
          await disconnectConsumer()
          process.exit(0)
        } catch (_) {
          process.exit(1)
        }
      })
    })

    // On a termination signal, disconnect the consumer and then re-raise the
    // signal. The `once` listener has already been removed by then, so the
    // re-raised signal is no longer intercepted by it.
    signalTraps.forEach(type => {
      process.once(type, async () => {
        try {
          await disconnectConsumer()
        } finally {
          process.kill(process.pid, type)
        }
      })
    })
  }
Add Comment
Please, Sign In to add comment