Advertisement
Guest User

Untitled

a guest
Mar 20th, 2024
39
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  // Source stream handed to the parser below. getFileFromS3 is defined
  // elsewhere; presumably it returns a Node.js Readable for the stored
  // object — TODO confirm it is synchronous and does not need `await`.
  const content: Readable = getFileFromS3()

  /**
   * Feeds `iterable` chunk by chunk into an event-based parser and yields the
   * parser events collected so far as batches.
   *
   * One batch is yielded per input chunk (possibly empty when the chunk
   * produced no events). After the input is exhausted the parser is closed,
   * and whatever that flushes out — including the `'end'` event — is yielded
   * as one final batch.
   *
   * NOTE(review): the handler names and the `xmlMode` option look like
   * htmlparser2's `WritableStream` (a Node.js `Writable`), despite the
   * `saxesParser` variable name — confirm against the file's imports. The
   * shutdown logic below relies on the Node.js `Writable` event API.
   *
   * @param iterable - Readable stream of document chunks to parse.
   * @returns An async generator of event batches, in document order.
   * @throws Whatever error the parser reports through `onerror`, or the
   *         stream's own `'error'` event while closing.
   */
  async *processReadableParser(iterable: Readable) {
      type ParserEvent =
          | { eventType: 'opentag'; value: { name: string; attributes: Record<string, string> } }
          | { eventType: 'closetag'; value: { name: string } }
          | { eventType: 'text'; value: string }
          | { eventType: 'end'; value?: undefined }

      let events: ParserEvent[] = []
      let error: unknown

      const saxesParser = new WritableStream(
          {
              onopentag(name, attributes) {
                  events.push({ eventType: 'opentag', value: { name, attributes } })
              },
              onclosetag(name) {
                  events.push({ eventType: 'closetag', value: { name } })
              },
              ontext(text) {
                  events.push({ eventType: 'text', value: text })
              },
              onend() {
                  events.push({ eventType: 'end' })
              },
              // Without this handler `error` was never assigned, so the
              // checks below could never fire and parse errors were lost.
              onerror(err) {
                  error = err
              },
          },
          { xmlMode: true },
      )

      for await (const chunk of iterable) {
          // Handlers run while the chunk is being written, so `events` and
          // `error` reflect this chunk as soon as write() returns.
          saxesParser.write(chunk)
          if (error) {
              throw error
          }
          yield events
          events = []
      }

      // Close the parser so trailing buffered input is flushed and `onend`
      // fires. Wait for 'finish' because the point at which a Writable runs
      // its final step relative to end() differs between Node.js versions.
      await new Promise<void>((resolve, reject) => {
          saxesParser.once('finish', resolve)
          saxesParser.once('error', reject)
          saxesParser.end()
      })

      if (error) {
          throw error
      }
      if (events.length > 0) {
          yield events
      }
  }
  34.  
  // Consume the parser event batches and forward completed records to
  // `handler` in groups of up to BUFFER_MAX_SIZE.
  // `buffer`, `data`, `shouldWriteToKafka`, `handler` and BUFFER_MAX_SIZE are
  // defined outside this snippet.
  for await (const events of this.processReadableParser(content)) {
      for (const event of events) {
          const node = event.value
          if (event.eventType === 'opentag') {
              // construct object when opentag
          } else if (event.eventType === 'closetag') {
              if (shouldWriteToKafka) {
                  buffer.push(data)

                  // Hand off a full batch and start a new one.
                  if (buffer.length >= BUFFER_MAX_SIZE) {
                      await handler(buffer)
                      buffer = []
                  }
              }
          }
      }
  }

  // Flush the final partial batch: without this, up to BUFFER_MAX_SIZE - 1
  // records buffered at end of stream were never passed to `handler`.
  if (buffer.length > 0) {
      await handler(buffer)
      buffer = []
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement