巧妙的 Node.js:Node.JS中的Webstreams

巧妙的 Node.js:Node.JS中的Webstreams

概述

NodeJs 中的 Webstream 有助于为浏览器应用程序处理小尺寸的数据。网络流使我们能够以小块的形式处理大量数据。因此,Webstreams 有助于在处理数据时减少负载并消耗更少的内存。node js 中的 Webstream 有助于压缩数据、解压缩数据、编码数据、解码数据和应用视频效果。我们还可以在 nodejs 中使用 webstreams 对 HTTP 响应进行编码和解码。

介绍

想想看,你点了一个披萨。现在有两种吃披萨的方法

  • 第一:一次吃一整块披萨。
  • 第二:将其分成小块。

你会选择第二种方式,因为咀嚼和吃小块披萨既容易又舒适。同样,我们使用流来获取、处理和提供小块数据。这些小块称为

因此,我们使用流来处理大量数据块。此数据的源可以是任何类型,如文本文件、javascript 文件或服务器代码。Node js 中的 Webstreams 允许我们使用 WHATWG Streams 标准,该标准可作为 Web Streams API 使用。我们可以使用 Web Stream API 来读取数据、馈送数据和转换数据。通常,节点模块 stream/web 用于创建流并利用 Web Streams API。

注意:WHATWG 代表 Web 超文本应用技术工作组。WHATWG 是一个实施 Web 技术标准的社区。

流的类型

  • 通常,Node Js 中有三种类型的 Webstream
    • 可读流
    • 可写流
    • 转换流
  • 现在,我们将创建一个文件夹 nodejs-web-streams,转到当前文件夹并启动一个节点项目。
 
mkdir nodejs-web-streams
cd nodejs-web-streams
npm init -y
  • 最初,我们的文件夹结构如下所示。
 
nodejs-web-streams/
├─ readableStream.js
├─ writableStream.js
├─ transformStream.js
└─ package.json

可读流

可读的 Stream 就像一个可供读取的数据源。我们使用 ReadableStream 类来创建可读流的实例。ReadableStream 有两个参数:underlyingSource 和 queuingStrategy 。这两个参数都是可选的。

  • underlyingSource:一个对象,用于决定流如何操作和处理块。它具有以下方法和属性。

    • start(controller):当我们创建可读流的实例时,start 方法会立即执行。参数控制器控制添加到内部队列的数据。
    • pull(controller):此方法的工作方式与 start(controller) 方法相同。但是,每当流的内部队列变为空时,就会调用它以获取更多数据(如果可用)。
    • cancel(reason):当我们取消流时调用此方法。我们还可以提供取消流的原因。
    • type:可以是字节或未定义。type 的默认值设置为 undefined
    • autoAllocateChunkSize:它定义块的总大小以确定 AarrayBuffer 的大小。使用 autoAllocateChunkSize 时,我们必须将 type 属性设置为 bytes。
  • queuingStrategy:一个对象有助于控制流中块的流动,并通知内部队列是否已满。它包括以下属性和方法:

    • highWaterMark:它表示流可以处理的总块的最高限制。highWaterMark 的默认值设置为 1。
    • size():它计算每个块的大小,并且必须返回一个数字。控制器还使用它来计算属性 desiredSize 的值。

可读流的可访问属性和方法

  • locked:我们使用 locked 属性来了解可读流是否被锁定。
  • cancel():它使用 underlyingSource 对象的 cancel() 方法。我们使用 cancel() 方法来取消流。cancel() 方法返回错误 If a stream is locked。
  • getReader():用于获取读取器并锁定当前读取器的流。因此,在使用 releaseLock() 方法解锁流之前,我们不能使用另一个读取器。
  • tee():它创建一个可读流的副本,并返回一个由两个可读流组成的数组。
  • pipeTo():此方法用于将可读流的块传递给另一个流。 -pipeThrough():用于将可读流的块传递给其他流并转换数据。

可读流示例

1. 创建可读流

 
// file: readableStream.js
const { ReadableStream } = require("stream/web");

const readableStream = new ReadableStream(
    {
        start(controller) {
            console.log("> Getting started");
            controller.enqueue("Hello!");
            controller.enqueue("I am readbale stream");
        },
        cancel() {
            console.log("!Stop reading chunks!")
        }
    },
    {
        size(chunk) {
            return chunk.length
        }
    }
);

2. 读取可读流

 
// file: readableStream.js
...
const obtainReader = readableStream.getReader();

// read line by line
obtainReader.read().then(console.log);
obtainReader.read().then(console.log);
obtainReader.read().then(console.log);

3. 异步迭代器

 
// file: readableStream.js
...

//Read using async iterator
(async () => {
    for await (const value of readbleStream) {
        console.log(value);
    }
})();

可写流:

可写流充当可写入数据的目标。我们使用 WritableStream 类来创建可写流的实例。WritableStream 有两个参数:underlyingSink 和 queuingStrategy。这两个参数都是可选的。underlyingSink 是一个对象,用于决定可写流如何操作和处理块。它具有以下方法和属性。

-start():当我们创建可写流的实例时,start 方法会立即执行。参数控制器控制写入内部队列的数据。

  • write():当编写器准备好写入流时,将调用此方法。
  • close():如果没有什么可以写入流,则调用此方法。
  • abort():调用此方法以取消流。即使队列中有任何数据要写入。

可写流的可访问属性和方法

  • locked:我们使用锁定的属性来了解可写流是否被锁定。
  • getWriter():我们使用此方法获取写入器并将数据写入流
  • close():当我们完成对流的写入时,我们调用 close 方法。
  • abort():我们调用 abort 方法来中断抛出错误的可写流,即使要写入的队列中存在数据。

可写流示例

1. 创建可写流

 
// file: writableStream.js
const { WritableStream } = require("stream/web")

const writableStream = new WritableStream(
    {
        start(controller) {
            console.log("Getting strated...")
        },
        write(chunk, controller) {
            console.log(chunk)
        },
        close() {
            console.log("Writable stream closed!")
        },
        abort(reason) {
            console.log("Aborting writable stream...")
        }
    },
    {
        size(chunk) {
            return chunk.length
        }
    }
)

2. 写入可写流

 
// file: writableStream.js
...

const obtainWriter = writableStream.getWriter()

obtainWriter.ready
obtainWriter.write("Hello!")
obtainWriter.write("I am writing")

转换流

它可以根据接收到的输出读取和写入数据。如果数据来自可读流,则转换流有助于将数据写入流,如果数据来自可写流,则传输流有助于从流中读取数据。Transformer 用于处理可读和可写的块

  • start(controller):在创建 Transform Stream 的实例后立即调用它。我们使用 start() 来读取队列中的数据。
  • transform(chunk, controller):我们使用这种方法来写入和转换块。它可能类似于将块转换为大写。
  • flush(controller):当转换流转换所有可写块时调用它。

可写流的可访问属性和方法

  • readable:用于读取块并访问转换流的可读端。
  • Writable 用于将块写入转换流的可写端并访问转换流的可写端。

转换流示例

1. 创建转换流

 
// file: transformStream.js
const { TransformStream } = require("stream/web")

const transformStream = new TransformStream({
    start(controller) {
        controller.enqueue("I am transformer")
    },
    transform(chunk, controller) {
        console.log("Transforming chunk....");
        controller.enqueue(chunk.toUpperCase());
    },
    flush(controller) {
        controller.terminate()
        console.log("Controller Terminated!");
    }
});

2. 读取和写入转换流

 
// file: transformStream.js
...

(async () => {
    const writableStream = transformStream.writable;
    const readableStream = transformStream.readable;
    
    // Obtain writer and write to transform stream
    const obtainWriter = writableStream.getWriter()
    for (const char of "break me!") {
        obtainWriter.write(char)
    }
    obtainWriter.close()
    
    // Read from transform stream using async iterators
    let allChunks = []
    for await (const value of readableStream) {
        allChunks.push(value)
        console.log(allChunks)
    }
})();

管链

管道是帮助将信息从一个流传递到另一个流的过程。我们可以用管道连接多个流。管道超过两条流称为管链。我们使用以下方法对两个或多个流进行管道传输:

  • pipeTo() 中
  • 管道通过()

 
// file: pipeChain.js
const { ReadableStream, WritableStream, TransformStream } = require("stream/web");

// Create a readable stream
const readbleStream = new ReadableStream(
    {
        start(controller) {
            console.log("> Getting Started")
            controller.enqueue("Hello!");
            controller.enqueue("I am readbale stream");
            controller.enqueue("It's done");
            controller.enqueue("Enough for now");
            controller.close()
        },
        cancel() {
            console.log("!Stop Reading Chunks!")
        }
    }
);

// Create a transform stream
const transformStream = new TransformStream({
    start(controller) {
        console.log("> Transformer Started")
    },
    transform(chunk, controller) {
        console.log("> Transforming Chunk");
        // converting chunk to uppercase
        controller.enqueue(chunk.toUpperCase());
    },
    flush(controller) {
        controller.terminate()
        console.log("!Controller Terminated!");
    }
});

// Writable Stream use this array to write data
let writeArray = []

// Create a writable stream
const writableStream = new WritableStream({
    start(controller) {
        console.log("> Writable Stream Started")
    },
    write(chunk, controller) {
        writeArray.push(chunk)
        console.log(writeArray)
    }
});

// Create pipe chains
readbleStream.pipeThrough(transformStream).pipeTo(writableStream)

背压

当我们应用管链时,数据有可能在接收器端溢出。因此,接收器使用管链向后发送信号。而这种信号流的过程称为背压。例如:在上一节中,我们使用了以下管链。

 
readbleStream.pipeThrough(transformStream).pipeTo(writableStream)
  • 在上面的代码块中,如果 writableStream 已满,则 pipeTo() 方法接收来自 writableStream 的信号,并停止从 transformSream 的可读端读取数据。
  • 现在,transformStream 的可写端被数据重载,并向 pipeThrough() 方法发送信号。然后,pipeThrough() 方法停止从 readableStream 读取数据。

Node.js中的 Web 流

  • NodeJs 模块 stream/web 用于支持 Web 流,正如我们在本文中已经使用的那样。
  • NodeJs 为 Fetch API 提供内置的 Web 流支持。我们可以将 html 正文的响应用作可读流。
 
const response = await fetch('https://xyz.***');
const readableStream = response.body;

字节流

 
// file: readableByteStream.js
const { ReadableStream } = require("stream/web");
const { randomFill } = require("crypto");


const readableByteStream = new ReadableStream(
    {
        type: "bytes",
        pull(controller) {

            // This object represents the current read request and gain a***ess to the provided bufferArray/typedArray
            const byobRequest = controller.byobRequest;

            // Fill bufferArray with ramdomData
            return randomFill(byobRequest.view, (err) => {
                if (err) {
                    console.log(err);
                }
                
                // Return no. of bytes added to buffer array
                byobRequest.respond(byobRequest.view.byteLength);
            });
        },
    }
);

// obtain the reader
const obtainReader = readableByteStream.getReader({ mode: "byob" });

(async () => {
    console.log(await obtainReader.read(new Uint8Array(10)));
})();

结论

  • Stream 用于以小块形式获取和传送大量数据。
  • Webstream API 是 WHATWG 标准的实现,有助于在 nodejs 中使用 Web 流。
  • 我们使用可读流从流中读取,使用可写流写入流,使用转换流来读取和写入块。
  • 可以使用 Readable Stream 类创建可读流。我们使用 getReader() 和 reader() 方法来读取流的块。
  • 可以使用 Writable Stream 类创建可写流。我们使用 getWriter() 和 write() 方法来写入块。
  • 可以使用 Transform Stream 类创建转换流。我们使用 getReader() 和 reader() 方法来读取流的块。此外,我们可以使用 getWriter() 和 write() 方法编写要流的块。
  • 我们可以使用管道链将可读流通过管道传输到其他流(即可写流、转换流)。我们使用 pipeTo() 和 pipe Through() 方法来管道流。
  • 我们可以在创建流的实例时应用属性类型:“bytes”创建一个字节流。
转载请说明出处内容投诉
CSS教程_站长资源网 » 巧妙的 Node.js:Node.JS中的Webstreams

发表评论

欢迎 访客 发表评论

一个令你着迷的主题!

查看演示 官网购买