@@ -9,6 +9,7 @@ import * as fs from 'fs-extra';
99import { inject , injectable } from 'inversify' ;
1010import * as path from 'path' ;
1111import { Observable } from 'rxjs/Observable' ;
12+ import { Subscriber } from 'rxjs/Subscriber' ;
1213import * as uuid from 'uuid/v4' ;
1314import * as vscode from 'vscode' ;
1415
@@ -148,7 +149,7 @@ export class JupyterServer implements INotebookServer {
148149 output = cells ;
149150 } ,
150151 ( error ) => {
151- deferred . resolve ( output ) ;
152+ deferred . reject ( error ) ;
152153 } ,
153154 ( ) => {
154155 deferred . resolve ( output ) ;
@@ -197,23 +198,34 @@ export class JupyterServer implements INotebookServer {
197198 }
198199
199200 public executeSilently = ( code : string ) : Promise < void > => {
200- // If we have a session, execute the code now.
201- if ( this . session ) {
202- // Generate a new request and wrap it in a promise as we wait for it to finish
203- const request = this . generateRequest ( code , true ) ;
204-
205- return new Promise ( ( resolve , reject ) => {
206- // Just wait for our observable to finish
207- const observable = this . generateExecuteObservable ( code , 'file' , - 1 , '0' , request ) ;
208- // tslint:disable-next-line:no-empty
209- observable . subscribe ( ( ) => {
210- } ,
211- reject ,
212- resolve ) ;
213- } ) ;
214- }
215-
216- return Promise . reject ( new Error ( localize . DataScience . sessionDisposed ( ) ) ) ;
201+ return new Promise ( ( resolve , reject ) => {
202+ // If we have a session, execute the code now.
203+ if ( this . session ) {
204+ // Generate a new request and resolve when it's done.
205+ const request = this . generateRequest ( code , true ) ;
206+
207+ if ( request ) {
208+
209+ // // For debugging purposes when silently is failing.
210+ // request.onIOPub = (msg: KernelMessage.IIOPubMessage) => {
211+ // try {
212+ // this.logger.logInformation(`Execute silently message ${msg.header.msg_type} : hasData=${'data' in msg.content}`);
213+ // } catch (err) {
214+ // this.logger.logError(err);
215+ // }
216+ // };
217+
218+ request . done . then ( ( ) => {
219+ this . logger . logInformation ( `Execute for ${ code } silently finished.` ) ;
220+ resolve ( ) ;
221+ } ) . catch ( reject ) ;
222+ } else {
223+ reject ( new Error ( localize . DataScience . sessionDisposed ( ) ) ) ;
224+ }
225+ } else {
226+ reject ( new Error ( localize . DataScience . sessionDisposed ( ) ) ) ;
227+ }
228+ } ) ;
217229 }
218230
219231 public get onStatusChanged ( ) : vscode . Event < boolean > {
@@ -300,7 +312,7 @@ export class JupyterServer implements INotebookServer {
300312 return this . session . kernel . requestExecute (
301313 {
302314 // Replace windows line endings with unix line endings.
303- code : code . replace ( ' \r\n' , '\n' ) ,
315+ code : code . replace ( / \r \n / g , '\n' ) ,
304316 stop_on_error : false ,
305317 allow_stdin : false ,
306318 silent : silent
@@ -422,68 +434,6 @@ export class JupyterServer implements INotebookServer {
422434 } ) ;
423435 }
424436
425- private changeDirectoryObservable = ( file : string ) : Observable < boolean > => {
426- return new Observable < boolean > ( subscriber => {
427- // Execute some code and when its done, finish our subscriber
428- const dir = path . dirname ( file ) ;
429- this . executeSilently ( `%cd "${ dir } "` )
430- . then ( ( ) => {
431- subscriber . next ( true ) ;
432- subscriber . complete ( ) ;
433- } )
434- . catch ( err => subscriber . error ( err ) ) ;
435- } ) ;
436- }
437-
438- private chainObservables < T > ( first : Observable < T > , second : ( ) => Observable < ICell > ) : Observable < ICell > {
439- return new Observable < ICell > ( subscriber => {
440- first . subscribe (
441- ( ) => { return ; } ,
442- ( err ) => subscriber . error ( err ) ,
443- ( ) => {
444- // When the first completes, tell the second to go
445- second ( ) . subscribe ( ( cell : ICell ) => {
446- subscriber . next ( cell ) ;
447- } ,
448- ( err ) => {
449- subscriber . error ( err ) ;
450- } ,
451- ( ) => {
452- subscriber . complete ( ) ;
453- } ) ;
454- }
455- ) ;
456- } ) ;
457- }
458-
459- private executeCodeObservable = ( code : string , file : string , line : number ) : Observable < ICell > => {
460-
461- if ( this . session ) {
462- // Send a magic that changes the current directory if we aren't already sending a magic
463- if ( line >= 0 && fs . existsSync ( file ) ) {
464- return this . chainObservables (
465- this . changeDirectoryObservable ( file ) ,
466- ( ) => this . executeCodeObservableInternal ( code , file , line ) ) ;
467- } else {
468- // We're inside of an execute silently already, don't recurse
469- return this . executeCodeObservableInternal ( code , file , line ) ;
470- }
471- }
472-
473- return new Observable < ICell > ( subscriber => {
474- subscriber . error ( new Error ( localize . DataScience . sessionDisposed ( ) ) ) ;
475- subscriber . complete ( ) ;
476- } ) ;
477- }
478-
479- private executeCodeObservableInternal = ( code : string , file : string , line : number ) : Observable < ICell > => {
480- // Send an execute request with this code
481- const id = uuid ( ) ;
482- const request = this . session ? this . generateRequest ( code , false ) : undefined ;
483-
484- return this . generateExecuteObservable ( code , file , line , id , request ) ;
485- }
486-
487437 private appendLineFeed ( arr : string [ ] , modifier ? : ( s : string ) => string ) {
488438 return arr . map ( ( s : string , i : number ) => {
489439 const out = modifier ? modifier ( s ) : s ;
@@ -514,7 +464,76 @@ export class JupyterServer implements INotebookServer {
514464 } ) ;
515465 }
516466
517- private generateExecuteObservable ( code : string , file : string , line : number , id : string , request : Kernel . IFuture | undefined ) : Observable < ICell > {
467+ private changeDirectoryIfPossible = async ( file : string , line : number ) : Promise < void > => {
468+ if ( line >= 0 && await fs . pathExists ( file ) ) {
469+ const dir = path . dirname ( file ) ;
470+ await this . executeSilently ( `%cd "${ dir } "` ) ;
471+ }
472+ }
473+
474+ private handleCodeRequest = ( subscriber : Subscriber < ICell > , startTime : number , cell : ICell , code : string ) => {
475+ // Generate a new request.
476+ const request = this . generateRequest ( code , false ) ;
477+
478+ // Transition to the busy stage
479+ cell . state = CellState . executing ;
480+
481+ // Listen to the reponse messages and update state as we go
482+ if ( request ) {
483+ request . onIOPub = ( msg : KernelMessage . IIOPubMessage ) => {
484+ try {
485+ if ( KernelMessage . isExecuteResultMsg ( msg ) ) {
486+ this . handleExecuteResult ( msg as KernelMessage . IExecuteResultMsg , cell ) ;
487+ } else if ( KernelMessage . isExecuteInputMsg ( msg ) ) {
488+ this . handleExecuteInput ( msg as KernelMessage . IExecuteInputMsg , cell ) ;
489+ } else if ( KernelMessage . isStatusMsg ( msg ) ) {
490+ this . handleStatusMessage ( msg as KernelMessage . IStatusMsg ) ;
491+ } else if ( KernelMessage . isStreamMsg ( msg ) ) {
492+ this . handleStreamMesssage ( msg as KernelMessage . IStreamMsg , cell ) ;
493+ } else if ( KernelMessage . isDisplayDataMsg ( msg ) ) {
494+ this . handleDisplayData ( msg as KernelMessage . IDisplayDataMsg , cell ) ;
495+ } else if ( KernelMessage . isErrorMsg ( msg ) ) {
496+ this . handleError ( msg as KernelMessage . IErrorMsg , cell ) ;
497+ } else {
498+ this . logger . logWarning ( `Unknown message ${ msg . header . msg_type } : hasData=${ 'data' in msg . content } ` ) ;
499+ }
500+
501+ // Set execution count, all messages should have it
502+ if ( msg . content . execution_count ) {
503+ cell . data . execution_count = msg . content . execution_count as number ;
504+ }
505+
506+ // Show our update if any new output
507+ subscriber . next ( cell ) ;
508+ } catch ( err ) {
509+ // If not a restart error, then tell the subscriber
510+ if ( startTime > this . sessionStartTime ) {
511+ this . logger . logError ( `Error during message ${ msg . header . msg_type } ` ) ;
512+ subscriber . error ( err ) ;
513+ }
514+ }
515+ } ;
516+
517+ // Create completion and error functions so we can bind our cell object
518+ // tslint:disable-next-line:no-any
519+ const completion = ( error ?: any ) => {
520+ cell . state = error ? CellState . error : CellState . finished ;
521+ // Only do this if start time is still valid. Dont log an error to the subscriber. Error
522+ // state should end up in the cell output.
523+ if ( startTime > this . sessionStartTime ) {
524+ subscriber . next ( cell ) ;
525+ }
526+ subscriber . complete ( ) ;
527+ } ;
528+
529+ // When the request finishes we are done
530+ request . done . then ( completion ) . catch ( completion ) ;
531+ } else {
532+ subscriber . error ( new Error ( localize . DataScience . sessionDisposed ( ) ) ) ;
533+ }
534+ }
535+
536+ private executeCodeObservable ( code : string , file : string , line : number ) : Observable < ICell > {
518537 return new Observable < ICell > ( subscriber => {
519538 // Start out empty;
520539 const cell : ICell = {
@@ -525,7 +544,7 @@ export class JupyterServer implements INotebookServer {
525544 metadata : { } ,
526545 execution_count : 0
527546 } ,
528- id : id ,
547+ id : uuid ( ) ,
529548 file : file ,
530549 line : line ,
531550 state : CellState . init
@@ -534,64 +553,20 @@ export class JupyterServer implements INotebookServer {
534553 // Keep track of when we started.
535554 const startTime = Date . now ( ) ;
536555
537- // Tell our listener.
556+ // Tell our listener. NOTE: have to do this asap so that markdown cells don't get
557+ // run before our cells.
538558 subscriber . next ( cell ) ;
539559
540- // Transition to the busy stage
541- cell . state = CellState . executing ;
542-
543- // Listen to the reponse messages and update state as we go
544- if ( request ) {
545- request . onIOPub = ( msg : KernelMessage . IIOPubMessage ) => {
546- try {
547- if ( KernelMessage . isExecuteResultMsg ( msg ) ) {
548- this . handleExecuteResult ( msg as KernelMessage . IExecuteResultMsg , cell ) ;
549- } else if ( KernelMessage . isExecuteInputMsg ( msg ) ) {
550- this . handleExecuteInput ( msg as KernelMessage . IExecuteInputMsg , cell ) ;
551- } else if ( KernelMessage . isStatusMsg ( msg ) ) {
552- this . handleStatusMessage ( msg as KernelMessage . IStatusMsg ) ;
553- } else if ( KernelMessage . isStreamMsg ( msg ) ) {
554- this . handleStreamMesssage ( msg as KernelMessage . IStreamMsg , cell ) ;
555- } else if ( KernelMessage . isDisplayDataMsg ( msg ) ) {
556- this . handleDisplayData ( msg as KernelMessage . IDisplayDataMsg , cell ) ;
557- } else if ( KernelMessage . isErrorMsg ( msg ) ) {
558- this . handleError ( msg as KernelMessage . IErrorMsg , cell ) ;
559- } else {
560- this . logger . logWarning ( `Unknown message ${ msg . header . msg_type } : hasData=${ 'data' in msg . content } ` ) ;
561- }
562-
563- // Set execution count, all messages should have it
564- if ( msg . content . execution_count ) {
565- cell . data . execution_count = msg . content . execution_count as number ;
566- }
567-
568- // Show our update if any new output
569- subscriber . next ( cell ) ;
570- } catch ( err ) {
571- // If not a restart error, then tell the subscriber
572- if ( startTime > this . sessionStartTime ) {
573- this . logger . logError ( `Error during message ${ msg . header . msg_type } ` ) ;
574- subscriber . error ( err ) ;
575- }
576- }
577- } ;
578-
579- // Create completion and error functions so we can bind our cell object
580- const completion = ( error : boolean ) => {
581- cell . state = error ? CellState . error : CellState . finished ;
582- // Only do this if start time is still valid
583- if ( startTime > this . sessionStartTime ) {
584- subscriber . next ( cell ) ;
585- }
586- subscriber . complete ( ) ;
587- } ;
588-
589- // When the request finishes we are done
590- request . done . then ( ( ) => completion ( false ) , ( ) => completion ( true ) ) ;
591- } else {
592- subscriber . error ( new Error ( localize . DataScience . sessionDisposed ( ) ) ) ;
593- }
594-
560+ // Attempt to change to the current directory. When that finishes
561+ // send our real request
562+ this . changeDirectoryIfPossible ( file , line )
563+ . then ( ( ) => {
564+ this . handleCodeRequest ( subscriber , startTime , cell , code ) ;
565+ } )
566+ . catch ( ( ) => {
567+ // Ignore errors if they occur. Just execute normally
568+ this . handleCodeRequest ( subscriber , startTime , cell , code ) ;
569+ } ) ;
595570 } ) ;
596571 }
597572
0 commit comments