Skip to content

Commit 1126dfe

Browse files
aks-addaleax
authored andcommitted
lib: merge onread handlers for http2 streams & net.Socket
Refs: #20993 Co-authored-by: Anna Henningsen <anna@addaleax.net> PR-URL: #22449 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
1 parent 4a0466f commit 1126dfe

File tree

3 files changed

+68
-95
lines changed

3 files changed

+68
-95
lines changed

lib/internal/http2/core.js

Lines changed: 5 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,10 @@ const {
105105
const {
106106
createWriteWrap,
107107
writeGeneric,
108-
writevGeneric
108+
writevGeneric,
109+
onStreamRead,
110+
kMaybeDestroy,
111+
kUpdateTimer
109112
} = require('internal/stream_base_commons');
110113
const {
111114
kTimeout,
@@ -142,7 +145,6 @@ const kHandle = Symbol('handle');
142145
const kID = Symbol('id');
143146
const kInit = Symbol('init');
144147
const kInfoHeaders = Symbol('sent-info-headers');
145-
const kMaybeDestroy = Symbol('maybe-destroy');
146148
const kLocalSettings = Symbol('local-settings');
147149
const kOptions = Symbol('options');
148150
const kOwner = owner_symbol;
@@ -156,7 +158,6 @@ const kServer = Symbol('server');
156158
const kSession = Symbol('session');
157159
const kState = Symbol('state');
158160
const kType = Symbol('type');
159-
const kUpdateTimer = Symbol('update-timer');
160161
const kWriteGeneric = Symbol('write-generic');
161162

162163
const kDefaultSocketTimeout = 2 * 60 * 1000;
@@ -374,36 +375,6 @@ function onStreamClose(code) {
374375
}
375376
}
376377

377-
// Receives a chunk of data for a given stream and forwards it on
378-
// to the Http2Stream Duplex for processing.
379-
function onStreamRead(nread, buf) {
380-
const stream = this[kOwner];
381-
if (nread >= 0 && !stream.destroyed) {
382-
debug(`Http2Stream ${stream[kID]} [Http2Session ` +
383-
`${sessionName(stream[kSession][kType])}]: receiving data chunk ` +
384-
`of size ${nread}`);
385-
stream[kUpdateTimer]();
386-
if (!stream.push(buf)) {
387-
if (!stream.destroyed) // we have to check a second time
388-
this.readStop();
389-
}
390-
return;
391-
}
392-
393-
// Last chunk was received. End the readable side.
394-
debug(`Http2Stream ${stream[kID]} [Http2Session ` +
395-
`${sessionName(stream[kSession][kType])}]: ending readable.`);
396-
397-
// defer this until we actually emit end
398-
if (!stream.readable) {
399-
stream[kMaybeDestroy]();
400-
} else {
401-
stream.on('end', stream[kMaybeDestroy]);
402-
stream.push(null);
403-
stream.read(0);
404-
}
405-
}
406-
407378
// Called when the remote peer settings have been updated.
408379
// Resets the cached settings.
409380
function onSettings() {
@@ -2145,6 +2116,7 @@ function afterOpen(session, options, headers, streamOptions, err, fd) {
21452116
class ServerHttp2Stream extends Http2Stream {
21462117
constructor(session, handle, id, options, headers) {
21472118
super(session, options);
2119+
handle.owner = this;
21482120
this[kInit](id, handle);
21492121
this[kProtocol] = headers[HTTP2_HEADER_SCHEME];
21502122
this[kAuthority] = headers[HTTP2_HEADER_AUTHORITY];

lib/internal/stream_base_commons.js

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
'use strict';
22

33
const { Buffer } = require('buffer');
4-
const errors = require('internal/errors');
54
const { internalBinding } = require('internal/bootstrap/loaders');
65
const { WriteWrap } = internalBinding('stream_wrap');
6+
const { UV_EOF } = internalBinding('uv');
7+
const { errnoException } = require('internal/errors');
8+
const { owner_symbol } = require('internal/async_hooks').symbols;
79

8-
const errnoException = errors.errnoException;
10+
const kMaybeDestroy = Symbol('kMaybeDestroy');
11+
const kUpdateTimer = Symbol('kUpdateTimer');
912

1013
function handleWriteReq(req, data, encoding) {
1114
const { handle } = req;
@@ -82,8 +85,54 @@ function afterWriteDispatched(self, req, err, cb) {
8285
}
8386
}
8487

88+
function onStreamRead(nread, buf) {
89+
const handle = this;
90+
const stream = this[owner_symbol];
91+
92+
stream[kUpdateTimer]();
93+
94+
if (nread > 0 && !stream.destroyed) {
95+
if (!stream.push(buf)) {
96+
handle.reading = false;
97+
if (!stream.destroyed) {
98+
const err = handle.readStop();
99+
if (err)
100+
stream.destroy(errnoException(err, 'read'));
101+
}
102+
}
103+
104+
return;
105+
}
106+
107+
if (nread === 0) {
108+
return;
109+
}
110+
111+
if (nread !== UV_EOF) {
112+
return stream.destroy(errnoException(nread, 'read'));
113+
}
114+
115+
// defer this until we actually emit end
116+
if (stream._readableState.endEmitted) {
117+
if (stream[kMaybeDestroy])
118+
stream[kMaybeDestroy]();
119+
} else {
120+
if (stream[kMaybeDestroy])
121+
stream.on('end', stream[kMaybeDestroy]);
122+
123+
// push a null to signal the end of data.
124+
// Do it before `maybeDestroy` for correct order of events:
125+
// `end` -> `close`
126+
stream.push(null);
127+
stream.read(0);
128+
}
129+
}
130+
85131
module.exports = {
86132
createWriteWrap,
87133
writevGeneric,
88-
writeGeneric
134+
writeGeneric,
135+
onStreamRead,
136+
kMaybeDestroy,
137+
kUpdateTimer,
89138
};

lib/net.js

Lines changed: 11 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,7 @@ const assert = require('assert');
3737
const { internalBinding } = require('internal/bootstrap/loaders');
3838
const {
3939
UV_EADDRINUSE,
40-
UV_EINVAL,
41-
UV_EOF
40+
UV_EINVAL
4241
} = internalBinding('uv');
4342

4443
const { Buffer } = require('buffer');
@@ -62,7 +61,9 @@ const {
6261
const {
6362
createWriteWrap,
6463
writevGeneric,
65-
writeGeneric
64+
writeGeneric,
65+
onStreamRead,
66+
kUpdateTimer
6667
} = require('internal/stream_base_commons');
6768
const errors = require('internal/errors');
6869
const {
@@ -210,7 +211,7 @@ function initSocketHandle(self) {
210211
// Handle creation may be deferred to bind() or connect() time.
211212
if (self._handle) {
212213
self._handle[owner_symbol] = self;
213-
self._handle.onread = onread;
214+
self._handle.onread = onStreamRead;
214215
self[async_id_symbol] = getNewAsyncId(self._handle);
215216
}
216217
}
@@ -516,6 +517,12 @@ Object.defineProperty(Socket.prototype, 'bufferSize', {
516517
}
517518
});
518519

520+
Object.defineProperty(Socket.prototype, kUpdateTimer, {
521+
get: function() {
522+
return this._unrefTimer;
523+
}
524+
});
525+
519526

520527
// Just call handle.readStart until we have enough in the buffer
521528
Socket.prototype._read = function(n) {
@@ -617,61 +624,6 @@ Socket.prototype._destroy = function(exception, cb) {
617624
}
618625
};
619626

620-
621-
// This function is called whenever the handle gets a
622-
// buffer, or when there's an error reading.
623-
function onread(nread, buffer) {
624-
var handle = this;
625-
var self = handle[owner_symbol];
626-
assert(handle === self._handle, 'handle != self._handle');
627-
628-
self._unrefTimer();
629-
630-
debug('onread', nread);
631-
632-
if (nread > 0) {
633-
debug('got data');
634-
635-
// read success.
636-
// In theory (and in practice) calling readStop right now
637-
// will prevent this from being called again until _read() gets
638-
// called again.
639-
640-
// Optimization: emit the original buffer with end points
641-
var ret = self.push(buffer);
642-
643-
if (handle.reading && !ret) {
644-
handle.reading = false;
645-
debug('readStop');
646-
var err = handle.readStop();
647-
if (err)
648-
self.destroy(errnoException(err, 'read'));
649-
}
650-
return;
651-
}
652-
653-
// if we didn't get any bytes, that doesn't necessarily mean EOF.
654-
// wait for the next one.
655-
if (nread === 0) {
656-
debug('not any data, keep waiting');
657-
return;
658-
}
659-
660-
// Error, possibly EOF.
661-
if (nread !== UV_EOF) {
662-
return self.destroy(errnoException(nread, 'read'));
663-
}
664-
665-
debug('EOF');
666-
667-
// push a null to signal the end of data.
668-
// Do it before `maybeDestroy` for correct order of events:
669-
// `end` -> `close`
670-
self.push(null);
671-
self.read(0);
672-
}
673-
674-
675627
Socket.prototype._getpeername = function() {
676628
if (!this._peername) {
677629
if (!this._handle || !this._handle.getpeername) {

0 commit comments

Comments
 (0)