Skip to content

Commit 55ac090

Browse files
committed
Basic idea building
1 parent b264dbb commit 55ac090

File tree

12 files changed

+344
-850
lines changed

12 files changed

+344
-850
lines changed

src/client/datascience/baseJupyterSession.ts

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import { Telemetry } from './constants';
1919
import { JupyterKernelPromiseFailedError } from './jupyter/kernels/jupyterKernelPromiseFailedError';
2020
import { KernelSelector } from './jupyter/kernels/kernelSelector';
2121
import { LiveKernelModel } from './jupyter/kernels/types';
22-
import { IJupyterKernelSpec, IJupyterSession, IKernelSession, KernelSocketInformation } from './types';
22+
import { IJupyterKernelSpec, IJupyterSession, ISessionWithSocket, KernelSocketInformation } from './types';
2323

2424
/**
2525
* Exception raised when starting a Jupyter Session fails.
@@ -37,10 +37,10 @@ export class JupyterSessionStartError extends Error {
3737
}
3838

3939
export abstract class BaseJupyterSession implements IJupyterSession {
40-
protected get session(): IKernelSession | undefined {
40+
protected get session(): ISessionWithSocket | undefined {
4141
return this._session;
4242
}
43-
protected set session(session: IKernelSession | undefined) {
43+
protected set session(session: ISessionWithSocket | undefined) {
4444
const oldSession = this._session;
4545
this._session = session;
4646

@@ -89,10 +89,10 @@ export abstract class BaseJupyterSession implements IJupyterSession {
8989
return this.connected;
9090
}
9191
protected onStatusChangedEvent: EventEmitter<ServerStatus> = new EventEmitter<ServerStatus>();
92-
protected statusHandler: Slot<IKernelSession, Kernel.Status>;
92+
protected statusHandler: Slot<ISessionWithSocket, Kernel.Status>;
9393
protected connected: boolean = false;
94-
protected restartSessionPromise: Promise<IKernelSession | undefined> | undefined;
95-
private _session: IKernelSession | undefined;
94+
protected restartSessionPromise: Promise<ISessionWithSocket | undefined> | undefined;
95+
private _session: ISessionWithSocket | undefined;
9696
private _kernelSocket = new ReplaySubject<KernelSocketInformation | undefined>();
9797
private _jupyterLab?: typeof import('@jupyterlab/services');
9898

@@ -141,7 +141,7 @@ export abstract class BaseJupyterSession implements IJupyterSession {
141141
}
142142

143143
public async changeKernel(kernel: IJupyterKernelSpec | LiveKernelModel, timeoutMS: number): Promise<void> {
144-
let newSession: IKernelSession | undefined;
144+
let newSession: ISessionWithSocket | undefined;
145145

146146
// If we are already using this kernel in an active session just return back
147147
if (this.kernelSpec?.name === kernel.name && this.session) {
@@ -319,19 +319,19 @@ export abstract class BaseJupyterSession implements IJupyterSession {
319319
protected abstract startRestartSession(): void;
320320
protected abstract async createRestartSession(
321321
kernelSpec: IJupyterKernelSpec | LiveKernelModel | undefined,
322-
session: IKernelSession,
322+
session: ISessionWithSocket,
323323
cancelToken?: CancellationToken
324-
): Promise<IKernelSession>;
324+
): Promise<ISessionWithSocket>;
325325

326326
// Sub classes need to implement their own kernel change specific code
327327
protected abstract createNewKernelSession(
328328
kernel: IJupyterKernelSpec | LiveKernelModel,
329329
timeoutMS: number
330-
): Promise<IKernelSession>;
330+
): Promise<ISessionWithSocket>;
331331

332332
protected async shutdownSession(
333-
session: IKernelSession | undefined,
334-
statusHandler: Slot<IKernelSession, Kernel.Status> | undefined
333+
session: ISessionWithSocket | undefined,
334+
statusHandler: Slot<ISessionWithSocket, Kernel.Status> | undefined
335335
): Promise<void> {
336336
if (session && session.kernel) {
337337
const kernelId = session.kernel.id;

src/client/datascience/jupyter/jupyterSession.ts

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import { BaseJupyterSession, JupyterSessionStartError } from '../baseJupyterSess
2626
import { Identifiers, Telemetry } from '../constants';
2727
import { reportAction } from '../progress/decorator';
2828
import { ReportableAction } from '../progress/types';
29-
import { IJupyterConnection, IJupyterKernelSpec, IKernelSession } from '../types';
29+
import { IJupyterConnection, IJupyterKernelSpec, ISessionWithSocket } from '../types';
3030
import { JupyterInvalidKernelError } from './jupyterInvalidKernelError';
3131
import { JupyterWaitForIdleError } from './jupyterWaitForIdleError';
3232
import { JupyterWebSockets } from './jupyterWebSocket';
@@ -107,8 +107,8 @@ export class JupyterSession extends BaseJupyterSession {
107107
public async createNewKernelSession(
108108
kernel: IJupyterKernelSpec | LiveKernelModel,
109109
timeoutMS: number
110-
): Promise<IKernelSession> {
111-
let newSession: IKernelSession | undefined;
110+
): Promise<ISessionWithSocket> {
111+
let newSession: ISessionWithSocket | undefined;
112112

113113
try {
114114
// Don't immediately assume this kernel is valid. Try creating a session with it first.
@@ -144,15 +144,15 @@ export class JupyterSession extends BaseJupyterSession {
144144

145145
protected async createRestartSession(
146146
kernelSpec: IJupyterKernelSpec | LiveKernelModel | undefined,
147-
session: IKernelSession,
147+
session: ISessionWithSocket,
148148
cancelToken?: CancellationToken
149-
): Promise<IKernelSession> {
149+
): Promise<ISessionWithSocket> {
150150
// We need all of the above to create a restart session
151151
if (!session || !this.contentsManager || !this.sessionManager) {
152152
throw new Error(localize.DataScience.sessionDisposed());
153153
}
154154

155-
let result: IKernelSession | undefined;
155+
let result: ISessionWithSocket | undefined;
156156
let tryCount = 0;
157157
// tslint:disable-next-line: no-any
158158
let exception: any;
@@ -187,7 +187,7 @@ export class JupyterSession extends BaseJupyterSession {
187187
}
188188

189189
@captureTelemetry(Telemetry.WaitForIdleJupyter, undefined, true)
190-
private async waitForIdleOnSession(session: IKernelSession | undefined, timeout: number): Promise<void> {
190+
private async waitForIdleOnSession(session: ISessionWithSocket | undefined, timeout: number): Promise<void> {
191191
if (session && session.kernel) {
192192
traceInfo(`Waiting for idle on (kernel): ${session.kernel.id} -> ${session.kernel.status}`);
193193
// tslint:disable-next-line: no-any
@@ -209,14 +209,14 @@ export class JupyterSession extends BaseJupyterSession {
209209
}
210210
};
211211

212-
let statusChangeHandler: Slot<IKernelSession, Kernel.Status> | undefined;
212+
let statusChangeHandler: Slot<ISessionWithSocket, Kernel.Status> | undefined;
213213
const kernelStatusChangedPromise = new Promise((resolve, reject) => {
214-
statusChangeHandler = (_: IKernelSession, e: Kernel.Status) => statusHandler(resolve, reject, e);
214+
statusChangeHandler = (_: ISessionWithSocket, e: Kernel.Status) => statusHandler(resolve, reject, e);
215215
session.statusChanged.connect(statusChangeHandler);
216216
});
217-
let kernelChangedHandler: Slot<IKernelSession, Session.IKernelChangedArgs> | undefined;
217+
let kernelChangedHandler: Slot<ISessionWithSocket, Session.IKernelChangedArgs> | undefined;
218218
const statusChangedPromise = new Promise((resolve, reject) => {
219-
kernelChangedHandler = (_: IKernelSession, e: Session.IKernelChangedArgs) =>
219+
kernelChangedHandler = (_: ISessionWithSocket, e: Session.IKernelChangedArgs) =>
220220
statusHandler(resolve, reject, e.newValue?.status);
221221
session.kernelChanged.connect(kernelChangedHandler);
222222
});
@@ -265,7 +265,7 @@ export class JupyterSession extends BaseJupyterSession {
265265
kernelSpec: IJupyterKernelSpec | LiveKernelModel | undefined,
266266
contentsManager: ContentsManager,
267267
cancelToken?: CancellationToken
268-
): Promise<IKernelSession> {
268+
): Promise<ISessionWithSocket> {
269269
// Create a temporary notebook for this session.
270270
this.notebookFiles.push(await contentsManager.newUntitled({ type: 'notebook' }));
271271

src/client/datascience/jupyter/jupyterWebSocket.ts

Lines changed: 3 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
'use strict';
44
import * as WebSocketWS from 'ws';
55
import { traceError } from '../../common/logger';
6+
import { KernelSocketWrapper } from '../kernelSocketWrapper';
67
import { IKernelSocket } from '../types';
78

89
// tslint:disable: no-any
@@ -11,12 +12,8 @@ export const JupyterWebSockets = new Map<string, WebSocketWS & IKernelSocket>();
1112
// We need to override the websocket that jupyter lab services uses to put in our cookie information
1213
// Do this as a function so that we can pass in variables the the socket will have local access to
1314
export function createJupyterWebSocket(cookieString?: string, allowUnauthorized?: boolean) {
14-
class JupyterWebSocket extends WebSocketWS {
15+
class JupyterWebSocket extends KernelSocketWrapper(WebSocketWS) {
1516
private kernelId: string | undefined;
16-
private receiveHooks: ((data: WebSocketWS.Data) => Promise<void>)[];
17-
private sendHooks: ((data: any, cb?: (err?: Error) => void) => Promise<void>)[];
18-
private msgChain: Promise<any>;
19-
private sendChain: Promise<any>;
2017

2118
constructor(url: string, protocols?: string | string[] | undefined) {
2219
let co: WebSocketWS.ClientOptions = {};
@@ -36,12 +33,8 @@ export function createJupyterWebSocket(cookieString?: string, allowUnauthorized?
3633

3734
super(url, protocols, co);
3835

39-
// Make sure the message chain is initialized
40-
this.msgChain = Promise.resolve();
41-
this.sendChain = Promise.resolve();
42-
4336
// Parse the url for the kernel id
44-
const parsed = /.*\/kernels\/(.*)\/.*/.exec(this.url);
37+
const parsed = /.*\/kernels\/(.*)\/.*/.exec(url);
4538
if (parsed && parsed.length > 1) {
4639
this.kernelId = parsed[1];
4740
}
@@ -53,57 +46,6 @@ export function createJupyterWebSocket(cookieString?: string, allowUnauthorized?
5346
} else {
5447
traceError('KernelId not extracted from Kernel WebSocket URL');
5548
}
56-
this.receiveHooks = [];
57-
this.sendHooks = [];
58-
}
59-
60-
public send(data: any, a2: any): void {
61-
if (this.sendHooks) {
62-
// Stick the send hooks into the send chain. We use chain
63-
// to ensure that:
64-
// a) Hooks finish before we fire the event for real
65-
// b) Event fires
66-
// c) Next message happens after this one (so the UI can handle the message before another event goes through)
67-
this.sendChain = this.sendChain
68-
.then(() => Promise.all(this.sendHooks.map((s) => s(data, a2))))
69-
.then(() => super.send(data, a2));
70-
} else {
71-
super.send(data, a2);
72-
}
73-
}
74-
75-
public emit(event: string | symbol, ...args: any[]): boolean {
76-
if (event === 'message' && this.receiveHooks.length) {
77-
// Stick the receive hooks into the message chain. We use chain
78-
// to ensure that:
79-
// a) Hooks finish before we fire the event for real
80-
// b) Event fires
81-
// c) Next message happens after this one (so this side can handle the message before another event goes through)
82-
this.msgChain = this.msgChain
83-
.then(() => Promise.all(this.receiveHooks.map((p) => p(args[0]))))
84-
.then(() => super.emit(event, ...args));
85-
// True value indicates there were handlers. We definitely have 'message' handlers.
86-
return true;
87-
} else {
88-
return super.emit(event, ...args);
89-
}
90-
}
91-
92-
public addReceiveHook(hook: (data: WebSocketWS.Data) => Promise<void>) {
93-
this.receiveHooks.push(hook);
94-
}
95-
public removeReceiveHook(hook: (data: WebSocketWS.Data) => Promise<void>) {
96-
this.receiveHooks = this.receiveHooks.filter((l) => l !== hook);
97-
}
98-
99-
// tslint:disable-next-line: no-any
100-
public addSendHook(patch: (data: any, cb?: (err?: Error) => void) => Promise<void>): void {
101-
this.sendHooks.push(patch);
102-
}
103-
104-
// tslint:disable-next-line: no-any
105-
public removeSendHook(patch: (data: any, cb?: (err?: Error) => void) => Promise<void>): void {
106-
this.sendHooks = this.sendHooks.filter((p) => p !== patch);
10749
}
10850
}
10951
return JupyterWebSocket;
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
import * as WebSocketWS from 'ws';
4+
import { ClassType } from '../ioc/types';
5+
import { IKernelSocket } from './types';
6+
7+
// tslint:disable: no-any prefer-method-signature
8+
export type IWebSocketLike = {
9+
onopen: (event: { target: any }) => void;
10+
onerror: (event: { error: any; message: string; type: string; target: any }) => void;
11+
onclose: (event: { wasClean: boolean; code: number; reason: string; target: any }) => void;
12+
onmessage: (event: { data: WebSocketWS.Data; type: string; target: any }) => void;
13+
emit(event: string | symbol, ...args: any[]): boolean;
14+
send(data: any, a2: any): void;
15+
};
16+
17+
/**
18+
* This is called a mixin class in TypeScript.
19+
* Allows us to have different base classes but inherit behavior (workaround for not allowing multiple inheritance).
20+
* Essentially it sticks a temp class in between the base class and the class you're writing.
21+
* Something like this:
22+
*
23+
* class Base {
24+
* doStuff() {
25+
*
26+
* }
27+
* }
28+
*
29+
* function Mixin = (SuperClass) {
30+
* return class extends SuperClass {
31+
* doExtraStuff() {
32+
* super.doStuff();
33+
* }
34+
* }
35+
* }
36+
*
37+
* function SubClass extends Mixin(Base) {
38+
* doBar() : {
39+
* super.doExtraStuff();
40+
* }
41+
* }
42+
*
43+
*/
44+
45+
export function KernelSocketWrapper<T extends ClassType<IWebSocketLike>>(SuperClass: T) {
46+
return class BaseKernelSocket extends SuperClass implements IKernelSocket {
47+
private receiveHooks: ((data: WebSocketWS.Data) => Promise<void>)[];
48+
private sendHooks: ((data: any, cb?: (err?: Error) => void) => Promise<void>)[];
49+
private msgChain: Promise<any>;
50+
private sendChain: Promise<any>;
51+
52+
constructor(...rest: any[]) {
53+
super(...rest);
54+
// Make sure the message chain is initialized
55+
this.msgChain = Promise.resolve();
56+
this.sendChain = Promise.resolve();
57+
this.receiveHooks = [];
58+
this.sendHooks = [];
59+
}
60+
61+
public send(data: any, a2: any): void {
62+
if (this.sendHooks) {
63+
// Stick the send hooks into the send chain. We use chain
64+
// to ensure that:
65+
// a) Hooks finish before we fire the event for real
66+
// b) Event fires
67+
// c) Next message happens after this one (so the UI can handle the message before another event goes through)
68+
this.sendChain = this.sendChain
69+
.then(() => Promise.all(this.sendHooks.map((s) => s(data, a2))))
70+
.then(() => super.send(data, a2));
71+
} else {
72+
super.send(data, a2);
73+
}
74+
}
75+
76+
public emit(event: string | symbol, ...args: any[]): boolean {
77+
if (event === 'message' && this.receiveHooks.length) {
78+
// Stick the receive hooks into the message chain. We use chain
79+
// to ensure that:
80+
// a) Hooks finish before we fire the event for real
81+
// b) Event fires
82+
// c) Next message happens after this one (so this side can handle the message before another event goes through)
83+
this.msgChain = this.msgChain
84+
.then(() => Promise.all(this.receiveHooks.map((p) => p(args[0]))))
85+
.then(() => super.emit(event, ...args));
86+
// True value indicates there were handlers. We definitely have 'message' handlers.
87+
return true;
88+
} else {
89+
return super.emit(event, ...args);
90+
}
91+
}
92+
93+
public addReceiveHook(hook: (data: WebSocketWS.Data) => Promise<void>) {
94+
this.receiveHooks.push(hook);
95+
}
96+
public removeReceiveHook(hook: (data: WebSocketWS.Data) => Promise<void>) {
97+
this.receiveHooks = this.receiveHooks.filter((l) => l !== hook);
98+
}
99+
100+
// tslint:disable-next-line: no-any
101+
public addSendHook(patch: (data: any, cb?: (err?: Error) => void) => Promise<void>): void {
102+
this.sendHooks.push(patch);
103+
}
104+
105+
// tslint:disable-next-line: no-any
106+
public removeSendHook(patch: (data: any, cb?: (err?: Error) => void) => Promise<void>): void {
107+
this.sendHooks = this.sendHooks.filter((p) => p !== patch);
108+
}
109+
};
110+
}

0 commit comments

Comments
 (0)