Skip to content

Commit 3564b3b

Browse files
committed
feat: initial code commit. Dev draft. Doesn't work yet.
1 parent 46aeb46 commit 3564b3b

File tree

9 files changed

+1567
-0
lines changed

9 files changed

+1567
-0
lines changed

src/base/base-connection.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import { EventEmitter } from "eventemitter3";
2+
import { Peer } from "./peer";
3+
import { ServerMessage } from "./servermessage";
4+
import { ConnectionType } from "./enums";
5+
6+
export abstract class BaseConnection extends EventEmitter {
7+
protected _open = false;
8+
9+
readonly metadata: any;
10+
connectionId: string;
11+
12+
peerConnection: RTCPeerConnection;
13+
14+
abstract get type(): ConnectionType;
15+
16+
get open() {
17+
return this._open;
18+
}
19+
20+
constructor(
21+
readonly peer: string,
22+
public provider: Peer,
23+
readonly options: any
24+
) {
25+
super();
26+
27+
this.metadata = options.metadata;
28+
}
29+
30+
abstract close(): void;
31+
32+
abstract handleMessage(message: ServerMessage): void;
33+
}

src/base/data-connection.py

Lines changed: 335 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,335 @@
1+
import { util } from "./util";
2+
import logger from "./logger";
3+
import { Negotiator } from "./negotiator";
4+
import {
5+
ConnectionType,
6+
ConnectionEventType,
7+
SerializationType,
8+
ServerMessageType
9+
} from "./enums";
10+
import { Peer } from "./peer";
11+
import { BaseConnection } from "./baseconnection";
12+
import { ServerMessage } from "./servermessage";
13+
import { EncodingQueue } from './encodingQueue';
14+
import { DataConnection as IDataConnection } from '../index';
15+
16+
/**
17+
* Wraps a DataChannel between two Peers.
18+
*/
19+
export class DataConnection extends BaseConnection implements IDataConnection {
20+
private static readonly ID_PREFIX = "dc_";
21+
private static readonly MAX_BUFFERED_AMOUNT = 8 * 1024 * 1024;
22+
23+
private _negotiator: Negotiator;
24+
readonly label: string;
25+
readonly serialization: SerializationType;
26+
readonly reliable: boolean;
27+
stringify: (data: any) => string = JSON.stringify;
28+
parse: (data: string) => any = JSON.parse;
29+
30+
get type() {
31+
return ConnectionType.Data;
32+
}
33+
34+
private _buffer: any[] = [];
35+
private _bufferSize = 0;
36+
private _buffering = false;
37+
private _chunkedData: {
38+
[id: number]: {
39+
data: Blob[],
40+
count: number,
41+
total: number
42+
}
43+
} = {};
44+
45+
private _dc: RTCDataChannel;
46+
private _encodingQueue = new EncodingQueue();
47+
48+
get dataChannel(): RTCDataChannel {
49+
return this._dc;
50+
}
51+
52+
get bufferSize(): number { return this._bufferSize; }
53+
54+
constructor(peerId: string, provider: Peer, options: any) {
55+
super(peerId, provider, options);
56+
57+
this.connectionId =
58+
this.options.connectionId || DataConnection.ID_PREFIX + util.randomToken();
59+
60+
this.label = this.options.label || this.connectionId;
61+
this.serialization = this.options.serialization || SerializationType.Binary;
62+
this.reliable = !!this.options.reliable;
63+
64+
this._encodingQueue.on('done', (ab: ArrayBuffer) => {
65+
this._bufferedSend(ab);
66+
});
67+
68+
this._encodingQueue.on('error', () => {
69+
logger.error(`DC#${this.connectionId}: Error occured in encoding from blob to arraybuffer, close DC`);
70+
this.close();
71+
});
72+
73+
this._negotiator = new Negotiator(this);
74+
75+
this._negotiator.startConnection(
76+
this.options._payload || {
77+
originator: true
78+
}
79+
);
80+
}
81+
82+
/** Called by the Negotiator when the DataChannel is ready. */
83+
initialize(dc: RTCDataChannel): void {
84+
this._dc = dc;
85+
this._configureDataChannel();
86+
}
87+
88+
private _configureDataChannel(): void {
89+
if (!util.supports.binaryBlob || util.supports.reliable) {
90+
this.dataChannel.binaryType = "arraybuffer";
91+
}
92+
93+
this.dataChannel.onopen = () => {
94+
logger.log(`DC#${this.connectionId} dc connection success`);
95+
this._open = true;
96+
this.emit(ConnectionEventType.Open);
97+
};
98+
99+
this.dataChannel.onmessage = (e) => {
100+
logger.log(`DC#${this.connectionId} dc onmessage:`, e.data);
101+
this._handleDataMessage(e);
102+
};
103+
104+
this.dataChannel.onclose = () => {
105+
logger.log(`DC#${this.connectionId} dc closed for:`, this.peer);
106+
this.close();
107+
};
108+
}
109+
110+
// Handles a DataChannel message.
111+
private _handleDataMessage({ data }: { data: Blob | ArrayBuffer | string }): void {
112+
const datatype = data.constructor;
113+
114+
const isBinarySerialization = this.serialization === SerializationType.Binary ||
115+
this.serialization === SerializationType.BinaryUTF8;
116+
117+
let deserializedData: any = data;
118+
119+
if (isBinarySerialization) {
120+
if (datatype === Blob) {
121+
// Datatype should never be blob
122+
util.blobToArrayBuffer(data as Blob, (ab) => {
123+
const unpackedData = util.unpack(ab);
124+
this.emit(ConnectionEventType.Data, unpackedData);
125+
});
126+
return;
127+
} else if (datatype === ArrayBuffer) {
128+
deserializedData = util.unpack(data as ArrayBuffer);
129+
} else if (datatype === String) {
130+
// String fallback for binary data for browsers that don't support binary yet
131+
const ab = util.binaryStringToArrayBuffer(data as string);
132+
deserializedData = util.unpack(ab);
133+
}
134+
} else if (this.serialization === SerializationType.JSON) {
135+
deserializedData = this.parse(data as string);
136+
}
137+
138+
// Check if we've chunked--if so, piece things back together.
139+
// We're guaranteed that this isn't 0.
140+
if (deserializedData.__peerData) {
141+
this._handleChunk(deserializedData);
142+
return;
143+
}
144+
145+
super.emit(ConnectionEventType.Data, deserializedData);
146+
}
147+
148+
private _handleChunk(data: { __peerData: number, n: number, total: number, data: Blob }): void {
149+
const id = data.__peerData;
150+
const chunkInfo = this._chunkedData[id] || {
151+
data: [],
152+
count: 0,
153+
total: data.total
154+
};
155+
156+
chunkInfo.data[data.n] = data.data;
157+
chunkInfo.count++;
158+
this._chunkedData[id] = chunkInfo;
159+
160+
if (chunkInfo.total === chunkInfo.count) {
161+
// Clean up before making the recursive call to `_handleDataMessage`.
162+
delete this._chunkedData[id];
163+
164+
// We've received all the chunks--time to construct the complete data.
165+
const data = new Blob(chunkInfo.data);
166+
this._handleDataMessage({ data });
167+
}
168+
}
169+
170+
/**
171+
* Exposed functionality for users.
172+
*/
173+
174+
/** Allows user to close connection. */
175+
close(): void {
176+
this._buffer = [];
177+
this._bufferSize = 0;
178+
this._chunkedData = {};
179+
180+
if (this._negotiator) {
181+
this._negotiator.cleanup();
182+
this._negotiator = null;
183+
}
184+
185+
if (this.provider) {
186+
this.provider._removeConnection(this);
187+
188+
this.provider = null;
189+
}
190+
191+
if (this.dataChannel) {
192+
this.dataChannel.onopen = null;
193+
this.dataChannel.onmessage = null;
194+
this.dataChannel.onclose = null;
195+
this._dc = null;
196+
}
197+
198+
if (this._encodingQueue) {
199+
this._encodingQueue.destroy();
200+
this._encodingQueue.removeAllListeners();
201+
this._encodingQueue = null;
202+
}
203+
204+
if (!this.open) {
205+
return;
206+
}
207+
208+
this._open = false;
209+
210+
super.emit(ConnectionEventType.Close);
211+
}
212+
213+
/** Allows user to send data. */
214+
send(data: any, chunked?: boolean): void {
215+
if (!this.open) {
216+
super.emit(
217+
ConnectionEventType.Error,
218+
new Error(
219+
"Connection is not open. You should listen for the `open` event before sending messages."
220+
)
221+
);
222+
return;
223+
}
224+
225+
if (this.serialization === SerializationType.JSON) {
226+
this._bufferedSend(this.stringify(data));
227+
} else if (
228+
this.serialization === SerializationType.Binary ||
229+
this.serialization === SerializationType.BinaryUTF8
230+
) {
231+
const blob = util.pack(data);
232+
233+
if (!chunked && blob.size > util.chunkedMTU) {
234+
this._sendChunks(blob);
235+
return;
236+
}
237+
238+
if (!util.supports.binaryBlob) {
239+
// We only do this if we really need to (e.g. blobs are not supported),
240+
// because this conversion is costly.
241+
this._encodingQueue.enque(blob);
242+
} else {
243+
this._bufferedSend(blob);
244+
}
245+
} else {
246+
this._bufferedSend(data);
247+
}
248+
}
249+
250+
private _bufferedSend(msg: any): void {
251+
if (this._buffering || !this._trySend(msg)) {
252+
this._buffer.push(msg);
253+
this._bufferSize = this._buffer.length;
254+
}
255+
}
256+
257+
// Returns true if the send succeeds.
258+
private _trySend(msg: any): boolean {
259+
if (!this.open) {
260+
return false;
261+
}
262+
263+
if (this.dataChannel.bufferedAmount > DataConnection.MAX_BUFFERED_AMOUNT) {
264+
this._buffering = true;
265+
setTimeout(() => {
266+
this._buffering = false;
267+
this._tryBuffer();
268+
}, 50);
269+
270+
return false;
271+
}
272+
273+
try {
274+
this.dataChannel.send(msg);
275+
} catch (e) {
276+
logger.error(`DC#:${this.connectionId} Error when sending:`, e);
277+
this._buffering = true;
278+
279+
this.close();
280+
281+
return false;
282+
}
283+
284+
return true;
285+
}
286+
287+
// Try to send the first message in the buffer.
288+
private _tryBuffer(): void {
289+
if (!this.open) {
290+
return;
291+
}
292+
293+
if (this._buffer.length === 0) {
294+
return;
295+
}
296+
297+
const msg = this._buffer[0];
298+
299+
if (this._trySend(msg)) {
300+
this._buffer.shift();
301+
this._bufferSize = this._buffer.length;
302+
this._tryBuffer();
303+
}
304+
}
305+
306+
private _sendChunks(blob: Blob): void {
307+
const blobs = util.chunk(blob);
308+
logger.log(`DC#${this.connectionId} Try to send ${blobs.length} chunks...`);
309+
310+
for (let blob of blobs) {
311+
this.send(blob, true);
312+
}
313+
}
314+
315+
handleMessage(message: ServerMessage): void {
316+
const payload = message.payload;
317+
318+
switch (message.type) {
319+
case ServerMessageType.Answer:
320+
this._negotiator.handleSDP(message.type, payload.sdp);
321+
break;
322+
case ServerMessageType.Candidate:
323+
this._negotiator.handleCandidate(payload.candidate);
324+
break;
325+
default:
326+
logger.warn(
327+
"Unrecognized message type:",
328+
message.type,
329+
"from peer:",
330+
this.peer
331+
);
332+
break;
333+
}
334+
}
335+
}

0 commit comments

Comments
 (0)