Transform Stream in Deno
by mmyoji
1 min read
I also wrote a Node.js version of this; see that post for comparison.
The Web Streams API is a bit different from `node:stream`.
Code
/** A single post record as returned by the dummy ORM. */
interface Post {
  /** Numeric identifier of the post. */
  id: number;
  /** Title text of the post. */
  title: string;
}
// Fixture data: ten posts with ids 1..10 and one-letter titles "a".."j".
const posts: Post[] = Array.from("abcdefghij", (title, index) => ({
  id: index + 1,
  title,
}));
// Dummy ORM API
// Stands in for a real database client; it pages over the in-memory `posts`.
const db = {
  post: {
    /**
     * Resolves with at most `take` posts, starting at offset `skip`
     * of the `posts` array.
     */
    findMany: (query: { take: number; skip: number }): Promise<Post[]> => {
      const start = query.skip;
      const end = start + query.take;
      return Promise.resolve(posts.slice(start, end));
    },
  },
};
/**
 * Creates a readable stream that emits posts one by one, loading them from
 * `db.post.findMany` a page at a time.
 *
 * Every `pull` fetches up to `take` posts at the current offset and enqueues
 * each of them. A page shorter than `take` is treated as the last page and
 * closes the stream.
 *
 * @param take Page size, also used as the stream's `highWaterMark`.
 *   Must be a positive integer.
 * @returns A stream of posts in the order the pages are returned.
 * @throws {RangeError} If `take` is not a positive integer.
 */
function postStream(take: number): ReadableStream<Post> {
  // A page size of 0 never advances the offset and never yields a "short"
  // page, so the stream would never close. A fractional size makes every
  // integer page length of at most `take` look short, so the stream would
  // close after the first page and drop the rest.
  if (!Number.isInteger(take) || take <= 0) {
    throw new RangeError(`take must be a positive integer, got ${take}`);
  }

  let skip = 0;

  return new ReadableStream<Post>(
    {
      async pull(controller) {
        const page = await db.post.findMany({ skip, take });

        for (const post of page) {
          controller.enqueue(post);
        }

        // Fewer rows than requested means there is nothing left to fetch.
        if (page.length < take) {
          controller.close();
          return;
        }

        skip += take;
      },
    },
    { highWaterMark: take },
  );
}
/**
 * Creates a transform stream that turns posts into CSV lines.
 *
 * The header line `id,title` is emitted first, followed by one line per post.
 * Lines are emitted without a trailing line break; the consumer decides how
 * to join them.
 *
 * @param take Used as the `highWaterMark` of the writable side.
 * @returns A transform stream from `Post` objects to CSV line strings.
 */
function toCSV(take: number): TransformStream<Post, string> {
  // Quote a field when it contains a delimiter, a quote or a line break, and
  // double any embedded quotes (RFC 4180). Plain values pass through as-is.
  const needsQuoting = /[",\r\n]/;
  const escapeField = (value: string): string =>
    needsQuoting.test(value) ? `"${value.replace(/"/g, '""')}"` : value;

  return new TransformStream<Post, string>(
    {
      start(controller) {
        controller.enqueue(`id,title`);
      },
      transform(chunk, controller) {
        controller.enqueue(`${chunk.id},${escapeField(chunk.title)}`);
      },
      // No flush handler: the readable side is closed automatically once the
      // writable side has been closed, so an explicit terminate() is not needed.
    },
    {
      highWaterMark: take,
    },
  );
}
// Page size shared by the source stream and the CSV transform.
const take = 3;

// Pipe the paged posts through the CSV transform and print every line.
const stream = postStream(take).pipeThrough(toCSV(take));
const reader = stream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    break;
  }
  console.log(value);
}
reader.releaseLock();