async stream in Node.js

Situation

I had to pass a stream of CSV contents, but generating the content needed to use async functions. I’d struggled with the stream API and left the solution that I’ve found for now.

Premises

This is a sample async function that get data.

const fetchData = async () => {
  return [
    { id: 1, name: 'book', price: 10 },
    { id: 2, name: 'coke', price: 5 },
    { id: 3, name: 'milk', price: 3 },
  ]
}

Node.js v10.x

'use strict'

const { Readable } = require('stream')

class CSVReadableStream extends Readable {
  async _read(size) {
    try {
      const data = await fetchData()
      for (const row of data) {
        this.push(row.name)
      }
      this.push(null)
    } catch (err) {
      this.emit('error', err)
    }
  }
}

const stream = new CSVReadableStream()

stream.on('data', chunk => {
  console.log(chunk.toString());
})

or use Readable directly:

'use strict'

const { Readable } = require('stream')

const stream = new Readable({
  async read(size) {
    try {
      const data = await fetchData()
      for (const row of data) {
        this.push(row.name)
      }
      this.push(null)
    } catch (err) {
      this.emit('error', err)
    }
  }
})

stream.on('data', chunk => {
  console.log(chunk.toString());
})

Node.js v12.x

You can use stream.Readable.from in this version.

'use strict'

const { Readable } = require('stream')

async function* iterable() {
  const data = await fetchData()
  for (const row of data) {
    yield row.name
  }
}

const stream = Readable.from(iterable())

stream.on('data', chunk => {
  console.log(chunk.toString());
})

I prefer this implementation because looks simple :)

Contents