# Node.js中有几种stream ## 前言 在Node.js中,流(Stream)是处理流式数据的抽象接口,是Node.js最重要的核心模块之一。流提供了一种高效处理数据的方式,特别是当处理大量数据或数据从外部来源逐渐到达时。本文将深入探讨Node.js中的流类型、实现原理以及实际应用场景。 ## 一、流的基本概念 ### 1.1 什么是流 流是数据的集合,就像数组或字符串一样。不同的是,流的数据可能不会一次性全部可用,也不需要一次性全部加载到内存中。这使得流在处理大型数据集或来自外部源的数据时特别高效。 ### 1.2 为什么需要流 传统的数据处理方式需要将全部数据加载到内存中才能进行处理,这会导致: - 高内存消耗 - 处理延迟(需要等待所有数据加载完成) - 不适用于实时数据处理 流解决了这些问题,它允许我们: - 分段处理数据 - 内存使用更高效 - 实现实时处理 ### 1.3 流的基本类型 Node.js中有四种基本的流类型: 1. 可读流(Readable) 2. 可写流(Writable) 3. 双工流(Duplex) 4. 转换流(Transform) ## 二、可读流(Readable Stream) ### 2.1 可读流概述 可读流是数据的源头,表示可以从中读取数据的流。常见例子包括: - 文件读取流 - HTTP请求 - TCP sockets - 标准输入(stdin) ### 2.2 可读流的两种模式 可读流有两种读取模式: #### 2.2.1 流动模式(Flowing Mode) 数据自动从底层系统读取,并通过事件尽可能快地提供给应用程序。 ```javascript const fs = require('fs'); const readable = fs.createReadStream('largefile.txt'); readable.on('data', (chunk) => { console.log(`Received ${chunk.length} bytes of data.`); });
必须显式调用stream.read()
方法来从流中读取数据片段。
const readable = fs.createReadStream('largefile.txt'); readable.on('readable', () => { let chunk; while ((chunk = readable.read()) !== null) { console.log(`Received ${chunk.length} bytes of data.`); } });
const { Readable } = require('stream'); class MyReadable extends Readable { constructor(options) { super(options); this.data = ['a', 'b', 'c', 'd', 'e']; this.index = 0; } _read(size) { if (this.index >= this.data.length) { this.push(null); // 结束流 } else { this.push(this.data[this.index++]); } } } const myReadable = new MyReadable(); myReadable.on('data', (chunk) => { console.log(chunk.toString()); });
可写流是数据的目标,表示可以向其中写入数据的流。常见例子包括: - 文件写入流 - HTTP响应 - TCP sockets - 标准输出(stdout) - 标准错误(stderr)
const fs = require('fs'); const writable = fs.createWriteStream('output.txt'); writable.write('Hello, '); writable.write('World!'); writable.end(); // 结束写入
const { Writable } = require('stream'); class MyWritable extends Writable { constructor(options) { super(options); this.data = []; } _write(chunk, encoding, callback) { this.data.push(chunk.toString()); callback(); // 表示写入完成 } } const myWritable = new MyWritable(); myWritable.write('Hello'); myWritable.write('World'); myWritable.end();
双工流既是可读的又是可写的,可以看作是可读流和可写流的组合。常见例子包括: - TCP sockets - WebSocket连接 - 压缩/解压缩流
const { Duplex } = require('stream'); class MyDuplex extends Duplex { constructor(options) { super(options); this.data = []; } _write(chunk, encoding, callback) { this.data.push(chunk.toString()); callback(); } _read(size) { if (this.data.length === 0) { this.push(null); } else { this.push(this.data.shift()); } } } const duplex = new MyDuplex(); duplex.on('data', (chunk) => { console.log('Received:', chunk.toString()); }); duplex.write('Hello'); duplex.write('World'); duplex.end();
转换流是一种特殊的双工流,它的输出与输入是相关的。常见例子包括: - zlib压缩/解压缩 - crypto加密/解密 - 数据格式转换
const { Transform } = require('stream'); class UpperCaseTransform extends Transform { _transform(chunk, encoding, callback) { this.push(chunk.toString().toUpperCase()); callback(); } } const upperCase = new UpperCaseTransform(); process.stdin.pipe(upperCase).pipe(process.stdout);
一种特殊的转换流,只是简单地传递数据而不做任何修改。
const { PassThrough } = require('stream'); const passThrough = new PassThrough(); passThrough.on('data', (chunk) => { console.log('Received:', chunk.toString()); }); passThrough.write('Hello'); passThrough.end();
默认情况下,流处理的是Buffer或String类型的数据。对象模式允许流处理任意JavaScript对象。
const { Readable } = require('stream'); const objectStream = new Readable({ objectMode: true, read() {} }); objectStream.push({ name: 'Alice' }); objectStream.push({ name: 'Bob' }); objectStream.push(null); objectStream.on('data', (obj) => { console.log('Received object:', obj); });
管道是将多个流连接在一起的机制,数据自动从一个流流向另一个流。
const fs = require('fs'); const zlib = require('zlib'); fs.createReadStream('input.txt') .pipe(zlib.createGzip()) .pipe(fs.createWriteStream('input.txt.gz'));
流中的错误需要通过事件监听器捕获:
const stream = fs.createReadStream('nonexistent.txt'); stream.on('error', (err) => { console.error('Stream error:', err); });
当数据生产速度大于消费速度时,流会自动处理背压问题。
使用destroy()
方法可以手动销毁流并释放资源。
// 文件复制 const readStream = fs.createReadStream('source.mp4'); const writeStream = fs.createWriteStream('destination.mp4'); readStream.pipe(writeStream);
const http = require('http'); const server = http.createServer((req, res) => { const fileStream = fs.createReadStream('largefile.txt'); fileStream.pipe(res); }); server.listen(3000);
const csv = require('csv-parser'); const { Transform } = require('stream'); const { createWriteStream } = require('fs'); // CSV转JSON fs.createReadStream('data.csv') .pipe(csv()) .pipe(new Transform({ objectMode: true, transform: (obj, encoding, callback) => { callback(null, JSON.stringify(obj) + '\n'); } })) .pipe(fs.createWriteStream('data.jsonl'));
根据场景选择适当的流类型可以提高性能: - 纯读取:Readable - 纯写入:Writable - 双向独立:Duplex - 数据转换:Transform
const readStream = fs.createReadStream('largefile.txt', { highWaterMark: 1024 * 1024 // 1MB });
const { pipeline } = require('stream'); const zlib = require('zlib'); pipeline( fs.createReadStream('input.txt'), zlib.createGzip(), fs.createWriteStream('input.txt.gz'), (err) => { if (err) { console.error('Pipeline failed:', err); } else { console.log('Pipeline succeeded'); } } );
确保正确处理流结束和错误事件,避免未关闭的流导致内存泄漏。
确保正确处理end
事件,避免在流结束前关闭资源。
调整highWaterMark
或使用pause()
/resume()
控制流量。
使用pipeline
代替pipe
可以更好地传播错误。
确保流之间的编码和数据格式兼容。
Node.js持续优化流API,提高性能和易用性。
Node.js正在逐步实现Web Streams标准,提高与浏览器API的兼容性。
Node.js流现在支持异步迭代器,提供了新的消费流的方式:
async function processStream() { const readable = fs.createReadStream('data.txt'); for await (const chunk of readable) { console.log(chunk.toString()); } }
Node.js中的流是处理数据的高效工具,主要分为四种基本类型:可读流、可写流、双工流和转换流。每种流类型都有其特定的用途和优势:
掌握这些流类型及其特性,可以帮助开发者构建高效、可扩展的Node.js应用程序,特别是在处理I/O密集型任务时。通过合理使用流,可以显著降低内存使用,提高应用程序性能,并实现实时数据处理能力。
随着Node.js的发展,流API也在不断改进,引入了更多现代JavaScript特性如异步迭代器,以及与Web标准的对齐。这些改进使得流API更加易用和强大,成为Node.js生态系统中不可或缺的一部分。
fs
:文件系统流http
/https
:HTTP流zlib
:压缩/解压缩流crypto
:加密流net
/dgram
:TCP/UDP流child_process
:子进程流stream
:核心流模块”`
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。