Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
// Example: a clustered Kafka consumer. The master process forks `numWorkers`
// workers; each worker runs its own consumer in the same consumer group.
const fs = require('fs')
const ip = require('ip')
const cluster = require('cluster')
const { Kafka, logLevel } = require('../index')

// Process-level events treated as fatal errors.
const errorTypes = ['unhandledRejection', 'uncaughtException']
// Signals on which a shutdown is attempted before the process goes away.
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']

// Broker host: HOST_IP when set, otherwise the address reported by the `ip` module.
const host = process.env.HOST_IP || ip.address()

const kafka = new Kafka({
  logLevel: logLevel.INFO,
  brokers: [`${host}:9094`, `${host}:9097`, `${host}:9100`],
  clientId: 'example-consumer',
  ssl: {
    servername: 'localhost',
    cert: fs.readFileSync('./testHelpers/certs/client_cert.pem', 'utf-8'),
    key: fs.readFileSync('./testHelpers/certs/client_key.pem', 'utf-8'),
    ca: [fs.readFileSync('./testHelpers/certs/ca_cert.pem', 'utf-8')],
  },
  sasl: {
    mechanism: 'plain',
    username: 'test',
    password: 'testtest',
  },
})

const topic = 'topic-test'
const groupId = 'test-group'

/**
 * Connects a consumer, subscribes it to `topic` and starts logging every
 * message it receives.
 *
 * @param {object} [consumer] - Consumer to start. When omitted, a new consumer
 *   in group `groupId` is created, so existing `run()` callers keep working.
 * @returns {Promise<object>} Resolves with the consumer once it has been started.
 */
const run = async (consumer = kafka.consumer({ groupId })) => {
  await consumer.connect()
  await consumer.subscribe({ topic })

  // Awaited so that a failure to start rejects this function instead of
  // surfacing as a detached promise rejection.
  // An `eachBatch: async ({ batch }) => { ... }` handler can be supplied here
  // instead of `eachMessage` to receive whole batches.
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const prefix = `${topic}[${partition} | ${message.offset}] / ${message.timestamp}`
      console.log(`- ${prefix} ${message.key}#${message.value}`)
    },
  })

  return consumer
}

const numWorkers = 6

if (cluster.isMaster) {
  console.log(`Master ${process.pid} is running`)

  for (let i = 0; i < numWorkers; i++) {
    cluster.fork()
  }

  cluster.on('exit', worker => {
    console.log(`Worker ${worker.process.pid} died`)
  })

  // On a fatal error: log it, disconnect the workers, then exit.
  errorTypes.forEach(type => {
    process.on(type, e => {
      try {
        console.log(`process.on ${type}`)
        console.error(e)
        cluster.disconnect(() => {
          console.log('disconnected on error')
          process.exit(0)
        })
      } catch (_) {
        process.exit(1)
      }
    })
  })

  // On a trapped signal: disconnect the workers. A synchronous throw from
  // `cluster.disconnect` propagates to the 'uncaughtException' handler above.
  signalTraps.forEach(type => {
    process.once(type, () => {
      cluster.disconnect(() => {
        console.log('disconnected')
      })
    })
  })
} else {
  console.log(`Worker ${process.pid} is running`)

  // Create the consumer synchronously so the shutdown handlers below hold the
  // consumer itself. (Holding the promise returned by `run()` would make
  // `consumer.disconnect` undefined and every shutdown path would throw.)
  const consumer = kafka.consumer({ groupId })
  run(consumer).catch(e => console.error(`[example/consumer] ${e.message}`, e))

  // On a fatal error: log it, disconnect the consumer, then exit.
  errorTypes.forEach(type => {
    process.on(type, async e => {
      try {
        console.log(`process.on ${type}`)
        console.error(e)
        await consumer.disconnect()
        process.exit(0)
      } catch (_) {
        process.exit(1)
      }
    })
  })

  // On a trapped signal: disconnect, then re-send the signal to this process.
  // The handler was registered with `once`, so it is no longer installed when
  // the signal is re-sent.
  signalTraps.forEach(type => {
    process.once(type, async () => {
      try {
        await consumer.disconnect()
      } finally {
        process.kill(process.pid, type)
      }
    })
  })
}
Add Comment
Please, Sign In to add comment