@@ -6,8 +6,8 @@ const { Duplex } = require('stream');
66const { randomFillSync } = require ( 'crypto' ) ;
77
88const PerMessageDeflate = require ( './permessage-deflate' ) ;
9- const { EMPTY_BUFFER } = require ( './constants' ) ;
10- const { isValidStatusCode } = require ( './validation' ) ;
9+ const { EMPTY_BUFFER , kWebSocket , NOOP } = require ( './constants' ) ;
10+ const { isBlob , isValidStatusCode } = require ( './validation' ) ;
1111const { mask : applyMask , toBuffer } = require ( './buffer-util' ) ;
1212
1313const kByteLength = Symbol ( 'kByteLength' ) ;
@@ -16,6 +16,10 @@ const RANDOM_POOL_SIZE = 8 * 1024;
1616let randomPool ;
1717let randomPoolPointer = RANDOM_POOL_SIZE ;
1818
19+ const DEFAULT = 0 ;
20+ const DEFLATING = 1 ;
21+ const GET_BLOB_DATA = 2 ;
22+
1923/**
2024 * HyBi Sender implementation.
2125 */
@@ -42,8 +46,10 @@ class Sender {
4246 this . _compress = false ;
4347
4448 this . _bufferedBytes = 0 ;
45- this . _deflating = false ;
4649 this . _queue = [ ] ;
50+ this . _state = DEFAULT ;
51+ this . onerror = NOOP ;
52+ this [ kWebSocket ] = undefined ;
4753 }
4854
4955 /**
@@ -210,7 +216,7 @@ class Sender {
210216 rsv1 : false
211217 } ;
212218
213- if ( this . _deflating ) {
219+ if ( this . _state !== DEFAULT ) {
214220 this . enqueue ( [ this . dispatch , buf , false , options , cb ] ) ;
215221 } else {
216222 this . sendFrame ( Sender . frame ( buf , options ) , cb ) ;
@@ -232,6 +238,9 @@ class Sender {
232238 if ( typeof data === 'string' ) {
233239 byteLength = Buffer . byteLength ( data ) ;
234240 readOnly = false ;
241+ } else if ( isBlob ( data ) ) {
242+ byteLength = data . size ;
243+ readOnly = false ;
235244 } else {
236245 data = toBuffer ( data ) ;
237246 byteLength = data . length ;
@@ -253,7 +262,13 @@ class Sender {
253262 rsv1 : false
254263 } ;
255264
256- if ( this . _deflating ) {
265+ if ( isBlob ( data ) ) {
266+ if ( this . _state !== DEFAULT ) {
267+ this . enqueue ( [ this . getBlobData , data , false , options , cb ] ) ;
268+ } else {
269+ this . getBlobData ( data , false , options , cb ) ;
270+ }
271+ } else if ( this . _state !== DEFAULT ) {
257272 this . enqueue ( [ this . dispatch , data , false , options , cb ] ) ;
258273 } else {
259274 this . sendFrame ( Sender . frame ( data , options ) , cb ) ;
@@ -275,6 +290,9 @@ class Sender {
275290 if ( typeof data === 'string' ) {
276291 byteLength = Buffer . byteLength ( data ) ;
277292 readOnly = false ;
293+ } else if ( isBlob ( data ) ) {
294+ byteLength = data . size ;
295+ readOnly = false ;
278296 } else {
279297 data = toBuffer ( data ) ;
280298 byteLength = data . length ;
@@ -296,7 +314,13 @@ class Sender {
296314 rsv1 : false
297315 } ;
298316
299- if ( this . _deflating ) {
317+ if ( isBlob ( data ) ) {
318+ if ( this . _state !== DEFAULT ) {
319+ this . enqueue ( [ this . getBlobData , data , false , options , cb ] ) ;
320+ } else {
321+ this . getBlobData ( data , false , options , cb ) ;
322+ }
323+ } else if ( this . _state !== DEFAULT ) {
300324 this . enqueue ( [ this . dispatch , data , false , options , cb ] ) ;
301325 } else {
302326 this . sendFrame ( Sender . frame ( data , options ) , cb ) ;
@@ -330,6 +354,9 @@ class Sender {
330354 if ( typeof data === 'string' ) {
331355 byteLength = Buffer . byteLength ( data ) ;
332356 readOnly = false ;
357+ } else if ( isBlob ( data ) ) {
358+ byteLength = data . size ;
359+ readOnly = false ;
333360 } else {
334361 data = toBuffer ( data ) ;
335362 byteLength = data . length ;
@@ -357,40 +384,94 @@ class Sender {
357384
358385 if ( options . fin ) this . _firstFragment = true ;
359386
360- if ( perMessageDeflate ) {
361- const opts = {
362- [ kByteLength ] : byteLength ,
363- fin : options . fin ,
364- generateMask : this . _generateMask ,
365- mask : options . mask ,
366- maskBuffer : this . _maskBuffer ,
367- opcode ,
368- readOnly ,
369- rsv1
370- } ;
371-
372- if ( this . _deflating ) {
373- this . enqueue ( [ this . dispatch , data , this . _compress , opts , cb ] ) ;
387+ const opts = {
388+ [ kByteLength ] : byteLength ,
389+ fin : options . fin ,
390+ generateMask : this . _generateMask ,
391+ mask : options . mask ,
392+ maskBuffer : this . _maskBuffer ,
393+ opcode ,
394+ readOnly ,
395+ rsv1
396+ } ;
397+
398+ if ( isBlob ( data ) ) {
399+ if ( this . _state !== DEFAULT ) {
400+ this . enqueue ( [ this . getBlobData , data , this . _compress , opts , cb ] ) ;
374401 } else {
375- this . dispatch ( data , this . _compress , opts , cb ) ;
402+ this . getBlobData ( data , this . _compress , opts , cb ) ;
376403 }
404+ } else if ( this . _state !== DEFAULT ) {
405+ this . enqueue ( [ this . dispatch , data , this . _compress , opts , cb ] ) ;
377406 } else {
378- this . sendFrame (
379- Sender . frame ( data , {
380- [ kByteLength ] : byteLength ,
381- fin : options . fin ,
382- generateMask : this . _generateMask ,
383- mask : options . mask ,
384- maskBuffer : this . _maskBuffer ,
385- opcode,
386- readOnly,
387- rsv1 : false
388- } ) ,
389- cb
390- ) ;
407+ this . dispatch ( data , this . _compress , opts , cb ) ;
391408 }
392409 }
393410
411+ /**
412+ * Gets the contents of a blob as binary data.
413+ *
414+ * @param {Blob } blob The blob
415+ * @param {Boolean } [compress=false] Specifies whether or not to compress
416+ * the data
417+ * @param {Object } options Options object
418+ * @param {Boolean } [options.fin=false] Specifies whether or not to set the
419+ * FIN bit
420+ * @param {Function } [options.generateMask] The function used to generate the
421+ * masking key
422+ * @param {Boolean } [options.mask=false] Specifies whether or not to mask
423+ * `data`
424+ * @param {Buffer } [options.maskBuffer] The buffer used to store the masking
425+ * key
426+ * @param {Number } options.opcode The opcode
427+ * @param {Boolean } [options.readOnly=false] Specifies whether `data` can be
428+ * modified
429+ * @param {Boolean } [options.rsv1=false] Specifies whether or not to set the
430+ * RSV1 bit
431+ * @param {Function } [cb] Callback
432+ * @private
433+ */
434+ getBlobData ( blob , compress , options , cb ) {
435+ this . _bufferedBytes += options [ kByteLength ] ;
436+ this . _state = GET_BLOB_DATA ;
437+
438+ blob
439+ . arrayBuffer ( )
440+ . then ( ( arrayBuffer ) => {
441+ if ( this . _socket . destroyed ) {
442+ const err = new Error (
443+ 'The socket was closed while the blob was being read'
444+ ) ;
445+
446+ //
447+ // `callCallbacks` is called in the next tick to ensure that errors
448+ // that might be thrown in the callbacks behave like errors thrown
449+ // outside the promise chain.
450+ //
451+ process . nextTick ( callCallbacks , this , err , cb ) ;
452+ return ;
453+ }
454+
455+ this . _bufferedBytes -= options [ kByteLength ] ;
456+ const data = toBuffer ( arrayBuffer ) ;
457+
458+ if ( ! compress ) {
459+ this . _state = DEFAULT ;
460+ this . sendFrame ( Sender . frame ( data , options ) , cb ) ;
461+ this . dequeue ( ) ;
462+ } else {
463+ this . dispatch ( data , compress , options , cb ) ;
464+ }
465+ } )
466+ . catch ( ( err ) => {
467+ //
468+ // `onError` is called in the next tick for the same reason that
469+ // `callCallbacks` above is.
470+ //
471+ process . nextTick ( onError , this , err , cb ) ;
472+ } ) ;
473+ }
474+
394475 /**
395476 * Dispatches a message.
396477 *
@@ -423,27 +504,19 @@ class Sender {
423504 const perMessageDeflate = this . _extensions [ PerMessageDeflate . extensionName ] ;
424505
425506 this . _bufferedBytes += options [ kByteLength ] ;
426- this . _deflating = true ;
507+ this . _state = DEFLATING ;
427508 perMessageDeflate . compress ( data , options . fin , ( _ , buf ) => {
428509 if ( this . _socket . destroyed ) {
429510 const err = new Error (
430511 'The socket was closed while data was being compressed'
431512 ) ;
432513
433- if ( typeof cb === 'function' ) cb ( err ) ;
434-
435- for ( let i = 0 ; i < this . _queue . length ; i ++ ) {
436- const params = this . _queue [ i ] ;
437- const callback = params [ params . length - 1 ] ;
438-
439- if ( typeof callback === 'function' ) callback ( err ) ;
440- }
441-
514+ callCallbacks ( this , err , cb ) ;
442515 return ;
443516 }
444517
445518 this . _bufferedBytes -= options [ kByteLength ] ;
446- this . _deflating = false ;
519+ this . _state = DEFAULT ;
447520 options . readOnly = false ;
448521 this . sendFrame ( Sender . frame ( buf , options ) , cb ) ;
449522 this . dequeue ( ) ;
@@ -456,7 +529,7 @@ class Sender {
456529 * @private
457530 */
458531 dequeue ( ) {
459- while ( ! this . _deflating && this . _queue . length ) {
532+ while ( this . _state === DEFAULT && this . _queue . length ) {
460533 const params = this . _queue . shift ( ) ;
461534
462535 this . _bufferedBytes -= params [ 3 ] [ kByteLength ] ;
@@ -495,3 +568,35 @@ class Sender {
495568}
496569
497570module . exports = Sender ;
571+
572+ /**
573+ * Calls queued callbacks with an error.
574+ *
575+ * @param {Sender } sender The `Sender` instance
576+ * @param {Error } err The error to call the callbacks with
577+ * @param {Function } [cb] The first callback
578+ * @private
579+ */
580+ function callCallbacks ( sender , err , cb ) {
581+ if ( typeof cb === 'function' ) cb ( err ) ;
582+
583+ for ( let i = 0 ; i < sender . _queue . length ; i ++ ) {
584+ const params = sender . _queue [ i ] ;
585+ const callback = params [ params . length - 1 ] ;
586+
587+ if ( typeof callback === 'function' ) callback ( err ) ;
588+ }
589+ }
590+
591+ /**
592+ * Handles a `Sender` error.
593+ *
594+ * @param {Sender } sender The `Sender` instance
595+ * @param {Error } err The error
596+ * @param {Function } [cb] The first pending callback
597+ * @private
598+ */
599+ function onError ( sender , err , cb ) {
600+ callCallbacks ( sender , err , cb ) ;
601+ sender . onerror ( err ) ;
602+ }
0 commit comments