Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
// Input stream that is handed to processReadableParser in the for-await loop below.
// NOTE(review): presumably the XML object fetched from S3 — confirm against getFileFromS3.
- const content: Readable = getFileFromS3()
/**
 * Feeds `iterable` chunk by chunk into an event-based XML parser and yields,
 * after each chunk, the batch of parser events that chunk produced.
 *
 * Every yielded batch is an array of `{ eventType, value }` records where
 * `eventType` is `'opentag'`, `'closetag'`, `'text'` or `'end'` (the `'end'`
 * record carries no `value`). A batch may be empty when a chunk did not
 * complete any token.
 *
 * Once the input is exhausted the parser is ended, so text that was still
 * buffered inside the parser and the final `'end'` event are delivered in one
 * last batch instead of being dropped.
 *
 * NOTE(review): `WritableStream` is assumed to be a Node `Writable` wrapping an
 * htmlparser2-style parser (callbacks object + `{ xmlMode }` options) — confirm
 * against the file's imports.
 *
 * @param iterable Source stream whose chunks are written to the parser.
 * @returns Async generator of event batches, one per input chunk plus a final
 *          batch produced when the parser is ended.
 * @throws The error reported through the parser's `onerror` callback, or the
 *         stream `'error'` raised while ending the parser.
 */
async *processReadableParser(iterable: Readable) {
  let events = []
  // Assigned from the `onerror` callback; checked after every write.
  let error: Error | undefined
  const saxesParser = new WritableStream(
    {
      onopentag(name, attributes) {
        events.push({ eventType: 'opentag', value: { name, attributes } })
      },
      onclosetag(name) {
        events.push({ eventType: 'closetag', value: { name } })
      },
      ontext(text) {
        events.push({ eventType: 'text', value: text })
      },
      // Without this callback `error` could never be set and the checks
      // below would be dead code.
      onerror(err) {
        error = err
      },
      onend() {
        events.push({ eventType: 'end' })
      },
    },
    { xmlMode: true },
  )
  for await (const chunk of iterable) {
    saxesParser.write(chunk)
    if (error) {
      throw error
    }
    yield events
    // Start a fresh array so the batch already handed out is not mutated.
    events = []
  }
  // End the parser and wait for the stream to finish so the callbacks fired
  // during finalisation (trailing text, `onend`) have run before we yield.
  await new Promise<void>((resolve, reject) => {
    saxesParser.once('error', reject)
    saxesParser.once('finish', resolve)
    saxesParser.end()
  })
  if (error) {
    throw error
  }
  if (events.length > 0) {
    yield events
  }
}
// Drain the parser one batch at a time and react to each event in order.
// NOTE(review): nothing visible here hands a partially filled `buffer` to
// `handler` once the stream ends — confirm a final flush exists elsewhere.
for await (const batch of this.processReadableParser(content)) {
  for (const event of batch) {
    const node = event.value
    switch (event.eventType) {
      case 'opentag':
        // construct object when opentag
        break
      case 'closetag': {
        // Only completed elements flagged for Kafka are buffered.
        if (!shouldWriteToKafka) break
        buffer.push(data)
        // Keep accumulating until the batch size threshold is reached.
        if (buffer.length < BUFFER_MAX_SIZE) break
        await handler(buffer)
        buffer = []
        break
      }
    }
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement