Transform Stream in Node.js
by mmyoji
1 min read
Example
import {
  pipeline,
  Readable,
  Transform,
  type TransformCallback,
} from "node:stream";
import { type Post, PrismaClient } from "@prisma/client";
// Module-level Prisma client; `postsStream` issues its queries through it.
const prisma = new PrismaClient();
/**
 * Creates an object-mode readable stream that emits `Post` rows in ascending
 * `id` order, loading up to `take` rows per query via Prisma cursor pagination.
 *
 * The first page is fetched without a cursor, so the stream does not depend on
 * a row with any particular `id` existing. Every later page uses the `id` of
 * the last emitted row as its cursor and skips that cursor row.
 *
 * If a query rejects, the stream is destroyed with that error so consumers see
 * it as a stream error rather than as an unhandled promise rejection.
 *
 * @param take - Number of rows requested per query; also used as the stream's
 *   `highWaterMark`.
 * @returns A `Readable` whose chunks are `Post` objects.
 */
function postsStream(take: number): Readable {
  // Stays `undefined` until at least one row has been emitted.
  let lastId: number | undefined;

  return new Readable({
    objectMode: true,
    highWaterMark: take,
    async read() {
      try {
        const posts = await prisma.post.findMany({
          take,
          // Paginate from a cursor only once a row has actually been seen;
          // `skip: 1` keeps the cursor row from being emitted twice.
          ...(lastId === undefined ? {} : { skip: 1, cursor: { id: lastId } }),
          orderBy: { id: "asc" },
        });

        for (const post of posts) {
          this.push(post);
          lastId = post.id;
        }

        // An empty or short page means there is nothing left to fetch.
        if (posts.length === 0 || posts.length < take) {
          this.push(null);
        }
      } catch (error) {
        this.destroy(error instanceof Error ? error : new Error(String(error)));
      }
    },
  });
}
/**
 * Renders one value as a CSV field. Values containing a comma, double quote,
 * or line break are wrapped in double quotes with embedded quotes doubled;
 * every other value is emitted as its plain string form.
 */
function escapeCsvField(value: unknown): string {
  const text = String(value);
  return /[",\r\n]/.test(text) ? `"${text.replace(/"/g, '""')}"` : text;
}

/**
 * Creates a transform stream that accepts `Post` objects on its writable side
 * and emits one `id,title` CSV line (terminated by `\n`) per post.
 *
 * @param take - Used as the `writableHighWaterMark` of the object-mode
 *   writable side.
 * @returns A `Transform` turning `Post` objects into CSV text.
 */
function toCSV(take: number): Transform {
  return new Transform({
    writableObjectMode: true,
    writableHighWaterMark: take,
    transform(
      chunk: Post,
      _encoding: BufferEncoding,
      callback: TransformCallback,
    ) {
      // Escape fields so a title with commas, quotes, or newlines stays one row.
      const line = `${escapeCsvField(chunk.id)},${escapeCsvField(chunk.title)}\n`;
      callback(null, line);
    },
  });
}
// Page size for the database reader, also passed to the CSV transform as its
// writable-side high-water mark.
const take = 100;

// Unlike chained `.pipe()` calls, `pipeline` reports an error from any stage
// to the callback and tears down the remaining stages.
pipeline(postsStream(take), toCSV(take), process.stdout, (error) => {
  if (error) {
    console.error(error);
    process.exitCode = 1;
  }
});