Transform Stream in Deno

by mmyoji

1 min read

I wrote a Node.js version of this earlier; see that post for comparison.

The Web Streams API used here is a bit different from `node:stream`.

Code

/** A single post record: the element type of the in-memory `posts` table. */
interface Post {
  /** Numeric identifier of the post. */
  id: number;
  /** Title of the post. */
  title: string;
}

// In-memory stand-in for a database table: ten posts with ids 1..10 and
// single-letter titles "a".."j", in that order.
const posts: Post[] = Array.from("abcdefghij", (title, index) => ({
  id: index + 1,
  title,
}));

// Dummy ORM API
// Minimal imitation of an ORM client, backed by the in-memory `posts` array.
const db = {
  post: {
    /**
     * Resolves to one page of posts: the window `posts.slice(skip, skip + take)`.
     * The result is an empty array when the window lies past the end.
     */
    async findMany(
      query: { take: number; skip: number },
    ): Promise<Post[]> {
      const first = query.skip;
      const last = first + query.take;
      return posts.slice(first, last);
    },
  },
};

/**
 * Creates a stream that emits every post returned by `db.post.findMany`,
 * loading them one page of `take` records at a time.
 *
 * Each `pull` fetches the next page and enqueues its records individually.
 * The stream closes when a page comes back with fewer than `take` records,
 * which includes an empty page.
 *
 * @param take Page size; also used as the stream's `highWaterMark`. Must be a
 *   positive integer, or `Infinity` to fetch everything in a single page.
 * @returns A `ReadableStream` of posts in the order `findMany` returns them.
 * @throws {RangeError} If `take` is zero, negative, fractional, or `NaN`.
 */
function postStream(take: number): ReadableStream<Post> {
  // A page size of 0 never produces a "short" page, so the stream would never
  // close. A fractional page size cannot be matched exactly by a record count,
  // so the short-page check misfires (with the slice-based dummy ORM the
  // stream ends after the first page). Reject both up front.
  const isWholeCount = Number.isInteger(take) || take === Infinity;
  if (!isWholeCount || take <= 0) {
    throw new RangeError(`take must be a positive integer, got ${take}`);
  }

  // Offset of the next page to request.
  let skip = 0;

  return new ReadableStream<Post>(
    {
      async pull(controller) {
        const page = await db.post.findMany({ skip, take });
        for (const post of page) {
          controller.enqueue(post);
        }

        // A short (or empty) page means there is nothing left to fetch.
        if (page.length < take) {
          controller.close();
          return;
        }

        skip += take;
      },
    },
    { highWaterMark: take },
  );
}

/**
 * Creates a transform that turns posts into CSV text chunks.
 *
 * The first chunk emitted is the header `id,title`; each post written to the
 * stream then becomes one `id,title` record chunk. Chunks carry no line
 * terminator, so the consumer decides how to separate them.
 *
 * Fields containing a comma, a double quote or a line break are wrapped in
 * double quotes with embedded quotes doubled; other fields are emitted as-is.
 *
 * @param take Used as the `highWaterMark` of the writable side's queue.
 * @returns A `TransformStream` from `Post` to CSV record strings.
 */
function toCSV(take: number): TransformStream<Post, string> {
  // Characters that force a field to be quoted.
  const needsQuoting = /[",\r\n]/;
  const escapeField = (value: string | number): string => {
    const text = String(value);
    return needsQuoting.test(text) ? `"${text.replace(/"/g, '""')}"` : text;
  };

  return new TransformStream<Post, string>(
    {
      start(controller) {
        controller.enqueue(`id,title`);
      },
      transform(chunk, controller) {
        controller.enqueue(
          `${escapeField(chunk.id)},${escapeField(chunk.title)}`,
        );
      },
      // No `flush` is needed: the readable side is closed automatically once
      // the writable side closes and all chunks have been transformed.
    },
    {
      highWaterMark: take,
    },
  );
}

// Stream all posts as CSV, fetching three records per page, and print each
// chunk on its own line.
const take = 3;
const stream = postStream(take).pipeThrough(toCSV(take));
await stream.pipeTo(
  new WritableStream<string>({
    write(chunk) {
      console.log(chunk);
    },
  }),
);

References