Advertisement
aldikhan13

KafkaJs Custom Method For Kafka

May 21st, 2022 (edited)
670
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. /**
  2.  For details on every configuration option, see the documentation: https://kafka.js.org
  3. **/
  4.  
  import {
    CompressionTypes,
    Consumer,
    ConsumerConfig,
    ConsumerSubscribeTopics,
    EachBatchHandler,
    EachMessagePayload,
    Kafka as KafkaJs,
    KafkaConfig,
    Producer,
    ProducerBatch,
    ProducerConfig,
    ProducerRecord,
    Transaction
  } from 'kafkajs'
  19.  
  /**
   * Extra options that `Kafka.subscriber` spreads into `consumer.run()`
   * next to the per-message callback.
   *
   * Every field is optional. The field names mirror the kafkajs run options;
   * see the kafkajs documentation for their exact semantics.
   */
  interface ConsumerRunConfig {
    /** Optional batch-level handler. */
    eachBatch?: EachBatchHandler
    eachBatchAutoResolve?: boolean
    partitionsConsumedConcurrently?: number
    autoCommit?: boolean
    /** May be `null` as well as omitted. */
    autoCommitInterval?: number | null
    /** May be `null` as well as omitted. */
    autoCommitThreshold?: number | null
  }
  28.  
  /** Arguments accepted by `Kafka.subscriber`. */
  interface SubscriberPayload {
    /** Passed to `consumer.subscribe()`. */
    subscribeConfig: ConsumerSubscribeTopics
    /** Passed to the kafkajs client when the consumer is created. */
    consumerConfig: ConsumerConfig
    /**
     * Extra options spread into `consumer.run()`.
     * Optional: `Kafka.subscriber` already falls back to an empty object
     * when it is missing, so callers should not be forced to supply it.
     */
    runConfig?: ConsumerRunConfig
  }
  34.  
  /** Arguments accepted by `Kafka.publisher`. */
  interface PublisherPayload {
    /** Optional settings used when the producer is created. */
    producerConfig?: ProducerConfig
    /**
     * What to send. It is treated as a `ProducerRecord` when `type` is
     * `'single'` and as a `ProducerBatch` otherwise.
     */
    sendConfig: ProducerRecord | ProducerBatch
    /** Selects `send` (`'single'`) or `sendBatch` (`'multiple'`). */
    type: 'single' | 'multiple'
  }
  40.  
  /** Arguments accepted by `Kafka.publisherTransaction`. */
  interface PublisherTransactionPayload {
    /**
     * Optional settings used when the producer is created.
     * NOTE(review): kafkajs transactions presumably need a `transactionalId`
     * here — confirm against the kafkajs documentation.
     */
    producerConfig?: ProducerConfig
    /**
     * What to send. It is treated as a `ProducerRecord` when `type` is
     * `'single'` and as a `ProducerBatch` otherwise.
     */
    sendConfig: ProducerRecord | ProducerBatch
    /** Selects `send` (`'single'`) or `sendBatch` (`'multiple'`). */
    type: 'single' | 'multiple'
  }
  46.  
  /**
   * Thin convenience wrapper around a kafkajs client.
   *
   * Each publish call creates its own producer, sends, and disconnects again,
   * so no producer state is shared between concurrent calls. `subscriber`
   * creates a consumer that keeps running after the method returns.
   *
   * All public methods log failures with `console.error` and resolve normally;
   * they never reject.
   */
  class Kafka {
    private readonly config: KafkaConfig
    private readonly kafka: KafkaJs

    /**
     * @param config - Client configuration handed unchanged to kafkajs.
     */
    constructor(config: KafkaConfig) {
      this.config = config
      this.kafka = new KafkaJs(this.config)
    }

    /**
     * Sends one record (`type: 'single'`) or one batch (`type: 'multiple'`).
     *
     * @param options - What to send and optional producer settings.
     * @returns Resolves once the send finished or the failure was logged.
     */
    async publisher(options: PublisherPayload): Promise<void> {
      try {
        const producer = this.kafka.producer(options.producerConfig ?? {})
        this.notification('publisher', producer)
        await producer.connect()

        try {
          if (options.type === 'single') {
            await producer.send(options.sendConfig as ProducerRecord)
          } else {
            await producer.sendBatch(options.sendConfig as ProducerBatch)
          }
        } finally {
          // A new producer is created per call; release its connection so
          // repeated calls do not leak sockets.
          await producer.disconnect()
        }
      } catch (e: unknown) {
        console.error(`publisher is not working: ${e}`)
      }
    }

    /**
     * Sends one record or one batch inside a kafkajs transaction. The
     * transaction is committed on success and aborted if sending or
     * committing fails.
     *
     * kafkajs requires the producer to be configured for transactions
     * (`transactionalId`, and typically `idempotent: true` with
     * `maxInFlightRequests: 1`) through `options.producerConfig`.
     *
     * @param options - What to send and the transactional producer settings.
     * @returns Resolves once the transaction ended or the failure was logged.
     */
    async publisherTransaction(options: PublisherTransactionPayload): Promise<void> {
      try {
        const producer = this.kafka.producer(options.producerConfig ?? {})
        this.notification('publisher', producer)
        // The producer must be connected before a transaction can be opened.
        await producer.connect()

        try {
          const transaction = await producer.transaction()
          try {
            // Send through the transaction object; sending through the plain
            // producer would leave the messages outside commit/abort.
            if (options.type === 'single') {
              await transaction.send(options.sendConfig as ProducerRecord)
            } else {
              await transaction.sendBatch(options.sendConfig as ProducerBatch)
            }
            await transaction.commit()
          } catch (e: unknown) {
            if (transaction.isActive()) await transaction.abort()
            throw e
          }
        } finally {
          await producer.disconnect()
        }
      } catch (e: unknown) {
        console.error(`publisher transaction is not working: ${e}`)
      }
    }

    /**
     * Creates a consumer, subscribes it and starts dispatching every message
     * to `cb`.
     *
     * @param options - Consumer, subscription and optional run settings.
     * @param cb - Invoked for each consumed message.
     * @returns Resolves once the consumer was started or the failure was logged.
     */
    async subscriber(options: SubscriberPayload, cb: (payload: EachMessagePayload) => Promise<void>): Promise<void> {
      try {
        const consumer = this.kafka.consumer(options.consumerConfig)
        this.notification('subscriber', consumer)

        await consumer.connect()
        await consumer.subscribe(options.subscribeConfig)
        await consumer.run({ ...(options.runConfig ?? {}), eachMessage: cb })
      } catch (e: unknown) {
        console.error(`subscriber is not working: ${e}`)
      }
    }

    /**
     * Attaches logging listeners to a freshly created producer or consumer.
     *
     * Listeners close over the concrete client they were registered on, so a
     * later call can never redirect them to a different client.
     *
     * @param type - `'publisher'` for a producer, `'subscriber'` for a consumer.
     * @param handler - The client to instrument.
     */
    private notification(type: 'publisher' | 'subscriber', handler: Producer | Consumer): void {
      try {
        if (type === 'subscriber') {
          const consumer = handler as Consumer
          consumer.on('consumer.connect', () => console.info('consumer kafka connected'))
          consumer.on('consumer.network.request_timeout', () => console.error('consumer kafka network timeout'))
          consumer.on('consumer.crash', async (): Promise<void> => {
            console.error('consumer kafka crash')
            try {
              await consumer.disconnect()
            } catch (e: unknown) {
              console.error(`consumer kafka disconnect after crash failed: ${e}`)
            }
          })
          // Only log here: calling disconnect()/stop() from inside their own
          // event would re-trigger the very event being handled.
          consumer.on('consumer.disconnect', () => console.error('consumer kafka disconnect'))
          consumer.on('consumer.stop', () => console.error('consumer kafka stop'))
        }

        if (type === 'publisher') {
          const producer = handler as Producer
          producer.on('producer.connect', (): void => console.info('producer kafka connected'))
          producer.on('producer.network.request_timeout', (): void => console.error('producer kafka network timeout'))
          // Only log; see the consumer disconnect listener above.
          producer.on('producer.disconnect', (): void => console.error('producer kafka disconnect'))
        }
      } catch (e: unknown) {
        console.error(`notification is not working: ${e}`)
      }
    }
  }
  142.  
  // Shared wrapper instance used by both demos below.
  const kafka = new Kafka({
    brokers: ['localhost:9092'],
    clientId: 'kafka:broker',
    ssl: false
  })

  // Publisher demo: sends a single gzip-compressed record.
  // The topic name avoids ':' because Kafka topic names only allow
  // ASCII letters, digits, '.', '_' and '-'.
  void kafka.publisher({
    type: 'single',
    sendConfig: {
      topic: 'send-message',
      messages: [{ key: 'msg', value: 'Hello my name is restu wahyu saputra' }],
      acks: 0,
      // kafkajs expects one of its own codec identifiers here, not a zlib
      // compression level.
      compression: CompressionTypes.GZIP
    },
    producerConfig: { allowAutoTopicCreation: false }
  })

  // Subscriber demo: logs every message of the topic from the beginning.
  void kafka.subscriber(
    {
      subscribeConfig: { topics: ['send-message'], fromBeginning: true },
      consumerConfig: { groupId: 'kafka:group' },
      runConfig: { autoCommit: true }
    },
    async (payload: EachMessagePayload): Promise<void> => console.log(payload.message)
  )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement