Transform Stream in Node.js

by mmyoji

1 min read

Example

import {
  pipeline,
  Readable,
  Transform,
  type TransformCallback,
} from "node:stream";
import { type Post, PrismaClient } from "@prisma/client";

// Module-wide Prisma client; postsStream() uses it to query the `post` model.
const prisma = new PrismaClient();

/**
 * Creates an object-mode Readable that emits posts in ascending `id` order,
 * loading them from the database one page at a time with cursor-based
 * pagination.
 *
 * @param take - Number of rows fetched per query; also used as the stream's
 *   `highWaterMark`. Must be a positive integer.
 * @returns A Readable that pushes one post object per chunk and ends after the
 *   first page that contains fewer than `take` rows.
 * @throws RangeError if `take` is not a positive integer.
 */
function postsStream(take: number): Readable {
  if (!Number.isInteger(take) || take < 1) {
    throw new RangeError(`take must be a positive integer, got ${take}`);
  }

  // id of the last post pushed so far; undefined until the first page arrives.
  let lastId: number | undefined;

  return new Readable({
    objectMode: true,
    highWaterMark: take,
    async read() {
      try {
        const posts = await prisma.post.findMany({
          // The first page is read without a cursor so the stream does not
          // depend on a post with one particular id existing. Later pages
          // start at the last emitted post and skip it.
          skip: lastId === undefined ? 0 : 1,
          take,
          cursor: lastId === undefined ? undefined : { id: lastId },
          orderBy: {
            id: "asc",
          },
        });

        for (const post of posts) {
          this.push(post);
        }

        const lastPost = posts[posts.length - 1];

        // A short (or empty) page means there is nothing left to fetch.
        if (lastPost === undefined || posts.length < take) {
          this.push(null);
          return;
        }

        lastId = lastPost.id;
      } catch (error) {
        // read() is async, so a rejected query would otherwise surface as an
        // unhandled promise rejection; report it through the stream instead.
        this.destroy(error instanceof Error ? error : new Error(String(error)));
      }
    },
  });
}

/**
 * Formats a single CSV field: the value is stringified and, if it contains a
 * comma, double quote, or line break, wrapped in double quotes with embedded
 * quotes doubled (RFC 4180 style). Other values pass through unchanged.
 */
function escapeCSVField(value: unknown): string {
  const text = String(value);

  return /[",\r\n]/.test(text) ? `"${text.replace(/"/g, '""')}"` : text;
}

/**
 * Creates a Transform that accepts post objects on its writable side and
 * emits one `id,title` CSV line (terminated by `\n`) per post on its readable
 * side.
 *
 * @param take - Used as the writable side's `highWaterMark` (object count).
 * @returns A Transform with an object-mode writable side and a text/byte
 *   readable side.
 */
function toCSV(take: number): Transform {
  return new Transform({
    writableObjectMode: true,
    writableHighWaterMark: take,
    transform(
      chunk: Post,
      _encoding: BufferEncoding,
      callback: TransformCallback,
    ) {
      // Escape each field so titles containing commas, quotes, or newlines
      // cannot break the row/column structure of the output.
      const line = `${escapeCSVField(chunk.id)},${escapeCSVField(chunk.title)}\n`;

      // Passing the data to the callback is equivalent to push() + callback().
      callback(null, line);
    },
  });
}

// Page size for database reads; also the buffer size of both streams.
const take = 100;

// pipeline() is used instead of chained .pipe() calls because .pipe() does not
// forward errors between stages: a failure in the source or the transform
// would be an unhandled 'error' event. pipeline() reports the first error to
// one callback and tears down the other streams.
pipeline(postsStream(take), toCSV(take), process.stdout, (error) => {
  if (error) {
    console.error(error);
    process.exitCode = 1;
  }

  // Close the database connection once streaming has finished or failed.
  void prisma.$disconnect().catch((disconnectError: unknown) => {
    console.error(disconnectError);
    process.exitCode = 1;
  });
});