概述
NodeJs 中的 Webstream 有助于为浏览器应用程序处理小尺寸的数据。网络流使我们能够以小块的形式处理大量数据。因此,Webstreams 有助于在处理数据时减少负载并消耗更少的内存。node js 中的 Webstream 有助于压缩数据、解压缩数据、编码数据、解码数据和应用视频效果。我们还可以在 nodejs 中使用 webstreams 对 HTTP 响应进行编码和解码。
介绍
想想看,你点了一个披萨。现在有两种吃披萨的方法。
- 第一:一次吃一整块披萨。
- 第二:将其分成小块。
你会选择第二种方式,因为咀嚼和吃小块披萨既容易又舒适。同样,我们使用流来获取、处理和提供小块数据。这些小块称为块。
因此,我们使用流来处理大量数据块。此数据的源可以是任何类型,如文本文件、javascript 文件或服务器代码。Node js 中的 Webstreams 允许我们使用 WHATWG Streams 标准,该标准可作为 Web Streams API 使用。我们可以使用 Web Stream API 来读取数据、馈送数据和转换数据。通常,节点模块 stream/web 用于创建流并利用 Web Streams API。
注意:WHATWG 代表 Web 超文本应用技术工作组。WHATWG 是一个实施 Web 技术标准的社区。
流的类型
- 通常,Node Js 中有三种类型的 Webstream:
- 可读流
- 可写流
- 转换流
- 现在,我们将创建一个文件夹 nodejs-web-streams,转到当前文件夹并启动一个节点项目。
mkdir nodejs-web-streams
cd nodejs-web-streams
npm init -y
- 最初,我们的文件夹结构如下所示。
nodejs-web-streams/
├─ readableStream.js
├─ writableStream.js
├─ transformStream.js
└─ package.json
可读流
可读的 Stream 就像一个可供读取的数据源。我们使用 ReadableStream 类来创建可读流的实例。ReadableStream 有两个参数:underlyingSource 和 queuingStrategy 。这两个参数都是可选的。
-
underlyingSource:一个对象,用于决定流如何操作和处理块。它具有以下方法和属性。
- start(controller):当我们创建可读流的实例时,start 方法会立即执行。参数控制器控制添加到内部队列的数据。
- pull(controller):此方法的工作方式与 start(controller) 方法相同。但是,每当流的内部队列变为空时,就会调用它以获取更多数据(如果可用)。
- cancel(reason):当我们取消流时调用此方法。我们还可以提供取消流的原因。
- type:可以是字节或未定义。type 的默认值设置为 undefined
- autoAllocateChunkSize:它定义块的总大小以确定 AarrayBuffer 的大小。使用 autoAllocateChunkSize 时,我们必须将 type 属性设置为 bytes。
-
queuingStrategy:一个对象有助于控制流中块的流动,并通知内部队列是否已满。它包括以下属性和方法:
- highWaterMark:它表示流可以处理的总块的最高限制。highWaterMark 的默认值设置为 1。
- size():它计算每个块的大小,并且必须返回一个数字。控制器还使用它来计算属性 desiredSize 的值。
可读流的可访问属性和方法
- locked:我们使用 locked 属性来了解可读流是否被锁定。
- cancel():它使用 underlyingSource 对象的 cancel() 方法。我们使用 cancel() 方法来取消流。cancel() 方法返回错误 If a stream is locked。
- getReader():用于获取读取器并锁定当前读取器的流。因此,在使用 releaseLock() 方法解锁流之前,我们不能使用另一个读取器。
- tee():它创建一个可读流的副本,并返回一个由两个可读流组成的数组。
- pipeTo():此方法用于将可读流的块传递给另一个流。 -pipeThrough():用于将可读流的块传递给其他流并转换数据。
可读流示例
1. 创建可读流
// file: readableStream.js
const { ReadableStream } = require("stream/web");
const readableStream = new ReadableStream(
{
start(controller) {
console.log("> Getting started");
controller.enqueue("Hello!");
controller.enqueue("I am readbale stream");
},
cancel() {
console.log("!Stop reading chunks!")
}
},
{
size(chunk) {
return chunk.length
}
}
);
2. 读取可读流
// file: readableStream.js
...
const obtainReader = readableStream.getReader();
// read line by line
obtainReader.read().then(console.log);
obtainReader.read().then(console.log);
obtainReader.read().then(console.log);
3. 异步迭代器
// file: readableStream.js
...
//Read using async iterator
(async () => {
for await (const value of readbleStream) {
console.log(value);
}
})();
可写流:
可写流充当可写入数据的目标。我们使用 WritableStream 类来创建可写流的实例。WritableStream 有两个参数:underlyingSink 和 queuingStrategy。这两个参数都是可选的。underlyingSink 是一个对象,用于决定可写流如何操作和处理块。它具有以下方法和属性。
-start():当我们创建可写流的实例时,start 方法会立即执行。参数控制器控制写入内部队列的数据。
- write():当编写器准备好写入流时,将调用此方法。
- close():如果没有什么可以写入流,则调用此方法。
- abort():调用此方法以取消流。即使队列中有任何数据要写入。
可写流的可访问属性和方法
- locked:我们使用锁定的属性来了解可写流是否被锁定。
- getWriter():我们使用此方法获取写入器并将数据写入流
- close():当我们完成对流的写入时,我们调用 close 方法。
- abort():我们调用 abort 方法来中断抛出错误的可写流,即使要写入的队列中存在数据。
可写流示例
1. 创建可写流
// file: writableStream.js
const { WritableStream } = require("stream/web")
const writableStream = new WritableStream(
{
start(controller) {
console.log("Getting strated...")
},
write(chunk, controller) {
console.log(chunk)
},
close() {
console.log("Writable stream closed!")
},
abort(reason) {
console.log("Aborting writable stream...")
}
},
{
size(chunk) {
return chunk.length
}
}
)
2. 写入可写流
// file: writableStream.js
...
const obtainWriter = writableStream.getWriter()
obtainWriter.ready
obtainWriter.write("Hello!")
obtainWriter.write("I am writing")
转换流
它可以根据接收到的输出读取和写入数据。如果数据来自可读流,则转换流有助于将数据写入流,如果数据来自可写流,则传输流有助于从流中读取数据。Transformer 用于处理可读和可写的块
- start(controller):在创建 Transform Stream 的实例后立即调用它。我们使用 start() 来读取队列中的数据。
- transform(chunk, controller):我们使用这种方法来写入和转换块。它可能类似于将块转换为大写。
- flush(controller):当转换流转换所有可写块时调用它。
可写流的可访问属性和方法
- readable:用于读取块并访问转换流的可读端。
- Writable 用于将块写入转换流的可写端并访问转换流的可写端。
转换流示例
1. 创建转换流
// file: transformStream.js
const { TransformStream } = require("stream/web")
const transformStream = new TransformStream({
start(controller) {
controller.enqueue("I am transformer")
},
transform(chunk, controller) {
console.log("Transforming chunk....");
controller.enqueue(chunk.toUpperCase());
},
flush(controller) {
controller.terminate()
console.log("Controller Terminated!");
}
});
2. 读取和写入转换流
// file: transformStream.js
...
(async () => {
const writableStream = transformStream.writable;
const readableStream = transformStream.readable;
// Obtain writer and write to transform stream
const obtainWriter = writableStream.getWriter()
for (const char of "break me!") {
obtainWriter.write(char)
}
obtainWriter.close()
// Read from transform stream using async iterators
let allChunks = []
for await (const value of readableStream) {
allChunks.push(value)
console.log(allChunks)
}
})();
管链
管道是帮助将信息从一个流传递到另一个流的过程。我们可以用管道连接多个流。管道超过两条流称为管链。我们使用以下方法对两个或多个流进行管道传输:
- pipeTo() 中
- 管道通过()
例
// file: pipeChain.js
const { ReadableStream, WritableStream, TransformStream } = require("stream/web");
// Create a readable stream
const readbleStream = new ReadableStream(
{
start(controller) {
console.log("> Getting Started")
controller.enqueue("Hello!");
controller.enqueue("I am readbale stream");
controller.enqueue("It's done");
controller.enqueue("Enough for now");
controller.close()
},
cancel() {
console.log("!Stop Reading Chunks!")
}
}
);
// Create a transform stream
const transformStream = new TransformStream({
start(controller) {
console.log("> Transformer Started")
},
transform(chunk, controller) {
console.log("> Transforming Chunk");
// converting chunk to uppercase
controller.enqueue(chunk.toUpperCase());
},
flush(controller) {
controller.terminate()
console.log("!Controller Terminated!");
}
});
// Writable Stream use this array to write data
let writeArray = []
// Create a writable stream
const writableStream = new WritableStream({
start(controller) {
console.log("> Writable Stream Started")
},
write(chunk, controller) {
writeArray.push(chunk)
console.log(writeArray)
}
});
// Create pipe chains
readbleStream.pipeThrough(transformStream).pipeTo(writableStream)
背压
当我们应用管链时,数据有可能在接收器端溢出。因此,接收器使用管链向后发送信号。而这种信号流的过程称为背压。例如:在上一节中,我们使用了以下管链。
readbleStream.pipeThrough(transformStream).pipeTo(writableStream)
- 在上面的代码块中,如果 writableStream 已满,则 pipeTo() 方法接收来自 writableStream 的信号,并停止从 transformSream 的可读端读取数据。
- 现在,transformStream 的可写端被数据重载,并向 pipeThrough() 方法发送信号。然后,pipeThrough() 方法停止从 readableStream 读取数据。
Node.js中的 Web 流
- NodeJs 模块 stream/web 用于支持 Web 流,正如我们在本文中已经使用的那样。
- NodeJs 为 Fetch API 提供内置的 Web 流支持。我们可以将 html 正文的响应用作可读流。
const response = await fetch('https://xyz.***');
const readableStream = response.body;
字节流
// file: readableByteStream.js
const { ReadableStream } = require("stream/web");
const { randomFill } = require("crypto");
const readableByteStream = new ReadableStream(
{
type: "bytes",
pull(controller) {
// This object represents the current read request and gain a***ess to the provided bufferArray/typedArray
const byobRequest = controller.byobRequest;
// Fill bufferArray with ramdomData
return randomFill(byobRequest.view, (err) => {
if (err) {
console.log(err);
}
// Return no. of bytes added to buffer array
byobRequest.respond(byobRequest.view.byteLength);
});
},
}
);
// obtain the reader
const obtainReader = readableByteStream.getReader({ mode: "byob" });
(async () => {
console.log(await obtainReader.read(new Uint8Array(10)));
})();
结论
- Stream 用于以小块形式获取和传送大量数据。
- Webstream API 是 WHATWG 标准的实现,有助于在 nodejs 中使用 Web 流。
- 我们使用可读流从流中读取,使用可写流写入流,使用转换流来读取和写入块。
- 可以使用 Readable Stream 类创建可读流。我们使用 getReader() 和 reader() 方法来读取流的块。
- 可以使用 Writable Stream 类创建可写流。我们使用 getWriter() 和 write() 方法来写入块。
- 可以使用 Transform Stream 类创建转换流。我们使用 getReader() 和 reader() 方法来读取流的块。此外,我们可以使用 getWriter() 和 write() 方法编写要流的块。
- 我们可以使用管道链将可读流通过管道传输到其他流(即可写流、转换流)。我们使用 pipeTo() 和 pipe Through() 方法来管道流。
- 我们可以在创建流的实例时应用属性类型:“bytes”创建一个字节流。