Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * Logger provider handed to kafka-node's logging module.
 *
 * Returns a logger whose four level methods write straight to the
 * matching `console` method.
 *
 * @param {string} name - Logger name supplied by the caller. It is accepted
 *   to satisfy the provider signature but is not used: every logger returned
 *   here writes to the same console methods.
 * @returns {{debug: Function, info: Function, warn: Function, error: Function}}
 *   Logger object with one bound console method per level.
 */
function consoleLoggerProvider (name) {
  // Bind each method to `console` so it still works when the consumer of
  // this logger calls it detached from the object.
  return {
    debug: console.debug.bind(console),
    info: console.info.bind(console),
    warn: console.warn.bind(console),
    error: console.error.bind(console)
  };
}
// Install the console logger provider first. The original script does this
// ahead of require('kafka-node'); the order is kept here.
// NOTE(review): presumably kafka-node modules pick up their logger at load
// time, which is why the order matters — confirm against the library docs.
const kafkaLogging = require('kafka-node/logging');
kafkaLogging.setLoggerProvider(consoleLoggerProvider);

const kafka = require('kafka-node');
const ConsumerGroup = kafka.ConsumerGroup;
const fs = require('fs');

// TLS material for the broker connection, read synchronously at startup.
// A missing file throws here, before any connection is attempted.
const sslOptions = {
  key: fs.readFileSync('service.key'),
  cert: fs.readFileSync('service.cert'),
  ca: [ fs.readFileSync('ca.pem') ]
};

const options = {
  autoCommit: true,
  kafkaHost: 'my-thingy.aivencloud.com:15957',
  groupId: 'my-testing-group',
  protocol: ['roundrobin'],
  fromOffset: 'earliest',
  // ConsumerGroup takes the TLS options hash through `ssl` itself. With
  // `ssl: true` and the hash under a separate `sslOptions` key, the
  // key/cert/CA never reached the underlying client.
  ssl: sslOptions,
  id: "my-id",
  // NOTE(review): `debug` is not a ConsumerGroup option I can confirm —
  // it looks like a librdkafka setting. Kept as-is; verify or remove.
  debug: "all"
};

const consumerGroup = new ConsumerGroup(options, ['my.topic']);

// Dump every received message as-is.
consumerGroup.on('message', (msg) => {
  console.log(msg);
});

// Without an 'error' listener an emitted error would be thrown by the
// EventEmitter, so always log it.
consumerGroup.on('error', (error) => {
  console.error(error);
});

consumerGroup.on('connect', () => {
  console.log("connect");
});

consumerGroup.on('offsetOutOfRange', () => {
  console.log("offsetOutOfRange");
});
Add Comment
Please, Sign In to add comment