|
| 1 | +类型: |
| 2 | +自定义 ReadStream |
| 3 | +自定义 WriteStream |
| 4 | +自定义 DuplexStream |
| 5 | +自定义 TransformStream |
| 6 | + |
| 7 | +模式对比: |
| 8 | +string/buffer |
| 9 | +object mode |
| 10 | + |
| 11 | + |
| 12 | +缓存:(buffering、highWaterMark) |
| 13 | + |
| 14 | +两种视角: |
| 15 | +stream使用 |
| 16 | +stream实现 |
| 17 | + |
| 18 | + |
| 19 | +## Readable Stream |
| 20 | + |
| 21 | +可以通过两种方式从一个Readable Stream中读取数据: |
| 22 | + |
| 23 | +1. non-flowing:默认 |
| 24 | +2. flowing |
| 25 | + |
| 26 | +### non-flowing mode |
| 27 | + |
| 28 | +以下代码从标准输入中读取内容,并写回到标准输出。 |
| 29 | + |
| 30 | +1. read()方法是同步调用,默认返回buffer,也可以通过 readStream.setEncoding(charset) 使得取得的内容为字符串。(从内部的缓冲区里读取内容) |
| 31 | +2. 当内部缓冲区有数据可供读取时,readable触发(可能触发多次)。read() 方法会循环调用,直到返回null。此时,停止数据读取,直到下一次readable触发,或者end触发。 |
| 32 | +3. 回车:process.stdin.read() 返回,程序打印标准输入的内容。 |
| 33 | +4. EOF:触发end事件,CTRL+D(linux)、CTRL+Z(windows)。 |
| 34 | + |
| 35 | +```javascript |
| 36 | +process.stdin |
| 37 | + .on('readable', () => { |
| 38 | + let chunk; |
| 39 | + while ((chunk = process.stdin.read()) !== null) { |
| 40 | + console.log(`Buffer.isBuffer(chunk): ${Buffer.isBuffer(chunk)}`); // true |
| 41 | + console.log(`Chunk read: ${chunk.toString()}`); |
| 42 | + } |
| 43 | + }) |
| 44 | + .on('end', () => { |
| 45 | + process.stdout.write('End of Stream'); |
| 46 | + }); |
| 47 | +``` |
| 48 | + |
| 49 | +### flowing mode |
| 50 | + |
| 51 | +跟non-flowing mode的区别: |
| 52 | + |
| 53 | +1. non-flowing:当内部缓冲区有数据,触发readable事件。用户需要主动调用 read() 方法读取数据。(如果用户在 readable 事件触发时,没有调用 read() 方法,会怎么样?) |
| 54 | +2. flowing:当有数据到来时,'data' 事件触发,同时内部缓冲区的数据会被带到回调参数里。 |
| 55 | + |
| 56 | +```javascript |
| 57 | +process.stdin |
| 58 | + .on('data', (chunk) => { |
| 59 | + console.log(`Buffer.isBuffer(chunk): ${Buffer.isBuffer(chunk)}`); |
| 60 | + console.log(`Chunk read: ${chunk.toString()}`); |
| 61 | + }) |
| 62 | + .on('end', () => { |
| 63 | + process.stdout.write('End of Stream'); |
| 64 | + }); |
| 65 | +``` |
| 66 | + |
| 67 | +flowing mode是对旧版本stream接口的继承(换个翻译方式?Stream1),在控制数据的流向方面灵活性一般。随着Stream2接口的引入,flowing mode不是默认的模式。 |
| 68 | + |
| 69 | +要将stream切换到flowing mode,有两种方式: |
| 70 | + |
| 71 | +1. 添加 'data' 回调。 |
| 72 | +2. 调用 resume() 方法。 |
| 73 | + |
| 74 | +要让stream暂时停止抛出 'data' 事件,可以调用 pause() 方法。注意,这样并不能将stream切换到non-flowing mode,只是暂停 'data' 事件的触发,后续进来的数据会被缓存在内部缓冲区。 |
| 75 | + |
| 76 | +## 实现Readable Stream |
| 77 | + |
| 78 | +```javascript |
| 79 | +// randomStream.js |
| 80 | +const { Readable } = require('stream'); |
| 81 | + |
| 82 | +const arr = []; |
| 83 | + |
| 84 | +class RandomStream extends Readable { |
| 85 | + constructor (options) { |
| 86 | + super(options); |
| 87 | + } |
| 88 | + |
| 89 | + _read () { |
| 90 | + arr.push(`[RandomStream] _read() is called`); |
| 91 | + let num = Math.random(); |
| 92 | + this.push(num.toString() + ' ', 'utf8'); |
| 93 | + |
| 94 | + if (num <= 0.1) { |
| 95 | + this.push(null); // end |
| 96 | + } |
| 97 | + } |
| 98 | +} |
| 99 | + |
| 100 | +const rs = new RandomStream(); |
| 101 | +rs |
| 102 | +.on('readable', () => { |
| 103 | + arr.push(`[readable] before loop`); |
| 104 | + |
| 105 | + let chunk; |
| 106 | + while ((chunk = rs.read()) !== null) { |
| 107 | + arr.push(`chunk read: ${chunk}`); |
| 108 | + } |
| 109 | + |
| 110 | + arr.push(`[readable] after loop`); |
| 111 | +}) |
| 112 | +.on('end', () => { |
| 113 | + arr.push(`[end]`); |
| 114 | + console.log(arr.join('\n')); |
| 115 | +}) |
| 116 | +``` |
| 117 | + |
| 118 | +运行结果输出如下(顺序有点不大对劲?): |
| 119 | + |
| 120 | +```bash |
| 121 | +[RandomStream] _read() is called |
| 122 | +[readable] before loop |
| 123 | +[RandomStream] _read() is called |
| 124 | +chunk read: 0.9455902221151478 0.4752694596188789 |
| 125 | +[RandomStream] _read() is called |
| 126 | +chunk read: 0.9372690495391933 |
| 127 | +[RandomStream] _read() is called |
| 128 | +chunk read: 0.053975422709547694 |
| 129 | +[readable] after loop |
| 130 | +[readable] before loop |
| 131 | +[readable] after loop |
| 132 | +[readable] before loop |
| 133 | +[readable] after loop |
| 134 | +[readable] before loop |
| 135 | +[readable] after loop |
| 136 | +[end] |
| 137 | +``` |
| 138 | + |
| 139 | + |
| 140 | +## Write Stream |
| 141 | + |
| 142 | +通过 write() 写入数据。如果chunk是buffer类型,encoding可以忽略不计。如果chunk是string类型,则通过encoding指定编码,默认是utf8。当chunk写入完成,callback被调用。 |
| 143 | + |
| 144 | +write(chunk, [encoding], [callback]) |
| 145 | + |
| 146 | +通过 end() 结束写入。chunk、encoding、callback 参数作用跟 write() 方法相同。这里的 callback,作用跟 .on('finish', onFinishCallback) 中的 onFinishCallback 相同。 |
| 147 | + |
| 148 | +end(chunk, [encoding], [callback]) |
| 149 | + |
| 150 | + |
| 151 | +```javascript |
| 152 | +const http = require('http'); |
| 153 | +const port = 3000; |
| 154 | + |
| 155 | +http.createServer((req, res) => { |
| 156 | + let num; |
| 157 | + while ((num = Math.random()) > 0.1) { |
| 158 | + res.write('res.write(): ' + num.toString() + '\n'); |
| 159 | + } |
| 160 | + res.end('res.end(): the end'); |
| 161 | + res.on('finish', () => console.log('finished.')); |
| 162 | +}).listen(port); |
| 163 | +``` |
| 164 | + |
| 165 | +请求: |
| 166 | + |
| 167 | +```bash |
| 168 | +curl http://127.0.0.1:3000 |
| 169 | +``` |
| 170 | + |
| 171 | +输出: |
| 172 | + |
| 173 | +```bash |
| 174 | +res.write(): 0.3070578038171923 |
| 175 | +res.write(): 0.6395702937677197 |
| 176 | +res.write(): 0.7310690728411677 |
| 177 | +res.write(): 0.9383379632316118 |
| 178 | +res.write(): 0.47331240688271636 |
| 179 | +res.write(): 0.1311702075669403 |
| 180 | +res.write(): 0.7170623464834849 |
| 181 | +res.write(): 0.3973024871804054 |
| 182 | +res.write(): 0.7583489396978729 |
| 183 | +res.write(): 0.5808965383971327 |
| 184 | +res.write(): 0.22983892514760362 |
| 185 | +res.write(): 0.25565119168375583 |
| 186 | +res.end(): the end |
| 187 | +``` |
| 188 | + |
| 189 | +备注:如果是通过浏览器访问,浏览器本身可能会对响应进行缓存,因此,多次调用res.write(),浏览器里有可能是一次性把内容展示出来。 |
| 190 | + |
| 191 | +## Duplex Stream |
| 192 | + |
| 193 | +Duplex Stream可读、可写。开发者需要同时实现 _read()、_write() 方法。简单的例子如下: |
| 194 | + |
| 195 | +```javascript |
| 196 | +const { Duplex } = require('stream'); |
| 197 | + |
| 198 | +class DP extends Duplex { |
| 199 | + constructor (options = {}) { |
| 200 | + super(options); |
| 201 | + this._innerChunks = []; |
| 202 | + } |
| 203 | + |
| 204 | + _write (chunk, encoding, callback) { |
| 205 | + this._innerChunks.push({chunk, encoding}); |
| 206 | + callback(); |
| 207 | + } |
| 208 | + |
| 209 | + _read () { |
| 210 | + this._innerChunks.forEach(item => { |
| 211 | + let upperCasedAlphabet = item.chunk.toString().toUpperCase(); |
| 212 | + this.push(upperCasedAlphabet); |
| 213 | + }); |
| 214 | + this.push(null); // end |
| 215 | + } |
| 216 | +} |
| 217 | + |
| 218 | +const dp = new DP(); |
| 219 | +dp.pipe(process.stdout); |
| 220 | + |
| 221 | +dp.write('a'); |
| 222 | +dp.write('b'); |
| 223 | +dp.write('c'); |
| 224 | +dp.end(); |
| 225 | +``` |
| 226 | + |
| 227 | +相比 readstream、writestream,支持另外的配置参数: |
| 228 | + |
| 229 | +* allowHalfOpen:默认是true。如果设置为false,当 read side 结束时,write side 也会被结束掉。 |
| 230 | +* readableObjectMode:默认是false。设置read side的objectMode。 |
| 231 | +* writableObjectMode:默认是false。设置write side的objectMode。 |
| 232 | +* readableHighWaterMark:设置read side的highWaterMark。如果有 highWaterMark 设置项存在,这个设置项会被忽略。 |
| 233 | +* writableHighWaterMark:设置write side的highWaterMark。如果有 highWaterMark 设置项存在,这个设置项会被忽略。 |
| 234 | + |
| 235 | +## Transform Stream |
| 236 | + |
| 237 | +需要自定义 _transform()、_flush() 方法。代码如下: |
| 238 | + |
| 239 | +```javascript |
| 240 | +const { Transform } = require('stream'); |
| 241 | + |
| 242 | +class TR extends Transform { |
| 243 | + constructor (options = {}) { |
| 244 | + super(options); |
| 245 | + } |
| 246 | + |
| 247 | + _transform (chunk, encoding, callback) { |
| 248 | + let upperCasedAlphabet = chunk.toString().toUpperCase(); |
| 249 | + this.push(upperCasedAlphabet); |
| 250 | + callback(); |
| 251 | + } |
| 252 | + |
| 253 | + _flush (callback) { |
| 254 | + this.push('!'); |
| 255 | + callback(); |
| 256 | + } |
| 257 | +} |
| 258 | + |
| 259 | +const tr = new TR(); |
| 260 | +// tr.pipe(process.stdout); |
| 261 | +tr.on('data', (chunk) => console.log(`ondata: ${chunk}`)); |
| 262 | + |
| 263 | +tr.write('a'); |
| 264 | +tr.write('b'); |
| 265 | +tr.write('c'); |
| 266 | +tr.end(); |
| 267 | + |
| 268 | +// ondata: A |
| 269 | +// ondata: B |
| 270 | +// ondata: C |
| 271 | +// ondata: ! |
| 272 | +``` |
| 273 | + |
| 274 | +## TODO |
| 275 | + |
| 276 | +### read stream |
| 277 | + |
| 278 | +process.stdin.read() vs process.stdin.read(size) 在终端上的表现。 |
| 279 | + |
| 280 | +readable 事件触发,用户没有调用 read() 方法,会有什么影响?(丢失数据?还是数据保留在内部缓冲区,但新的数据不进去了?) |
| 281 | + |
| 282 | +_read([size]) 方法,有没有传 size ,两者实现的区别?内部调用 push() 时,如果 返回 false,该如何处理?(返回false时,当前想push的data是否需要重新push?) |
| 283 | + |
| 284 | +实现 Readable Stream,打印的 readable 有点不大对? |
| 285 | + |
| 286 | +### write stream |
| 287 | + |
| 288 | +write(chunk) 调用,如果写入的 chunk 太多,且远超过 backpressure 的值,会有什么影响(internal buffer 也容不下的情况)? |
| 289 | + |
| 290 | +backpressure 对read stream、write stream 的影响。 |
| 291 | + |
| 292 | + |
| 293 | +### transform stream |
| 294 | + |
| 295 | +readStream.on('data', fn) 与 readStream.pipe(stream) 的区别。多次调用 write(),on('data') 输出会换行。pipe() 不会换行(参考 Transform Stream小节)。 |
| 296 | + |
| 297 | +## 参考资料 |
| 298 | + |
| 299 | +Node.js Design Patterns |
0 commit comments