Skip to content

Commit ffe854a

Browse files
committed
fix: various fixes of port errors
1 parent 87dcc2e commit ffe854a

File tree

5 files changed

+148
-131
lines changed

5 files changed

+148
-131
lines changed

run-dev.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@ SCRIPTPATH=$(dirname "$SCRIPT")
55
echo $SCRIPTPATH
66

77
set -ex
8-
pip3 install --editable src
8+
# pip3 install --editable src
99
python3 $SCRIPTPATH/cli.py

src/peerjs/dataconnection.py

Lines changed: 113 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
"""Convenience wrapper around RTCDataChannel."""
2-
from util import util
2+
from .util import util
33
import logging
44
from negotiator import Negotiator
55
from enums import \
@@ -12,38 +12,45 @@
1212
from .encodingqueue import EncodingQueue
1313
import json
1414
from aiortc import RTCDataChannel
15+
import asyncio
16+
from .peer import Peer
1517

1618
log = logging.getLogger(__name__)
1719

1820

1921
class DataConnection(BaseConnection):
20-
"""Wraps a DataChannel between two Peers."""
22+
"""Wrap a DataChannel between two Peers."""
23+
2124
ID_PREFIX = "dc_"
2225
MAX_BUFFERED_AMOUNT = 8 * 1024 * 1024
2326

2427
@property
2528
def type():
29+
"""Return ConnectionType.Data."""
2630
return ConnectionType.Data
2731

2832
@property
29-
def dataChannel() -> RTCDataChannel:
33+
def dataChannel(self) -> RTCDataChannel:
34+
"""Return the RTCDataChannel of this data connection."""
3035
return self._dc
3136

32-
def bufferSize() -> int:
37+
def bufferSize(self) -> int:
38+
"""Return current data buffer size."""
3339
return self._bufferSize
3440

35-
def __init__(self, peerId: str, provider: Peer, **options):
41+
def __init__(self, peerId: str = None, provider: Peer = None, **options):
42+
"""Create a DataConnection instance."""
3643
super(peerId, provider, options)
37-
_negotiator: Negotiator
38-
label: str
39-
serialization: SerializationType
40-
reliable: boolean
41-
stringify: lambda data: json.dumps(data)
42-
parse: lambda jsn: json.loads(jsn)
43-
_buffer: []
44-
_bufferSize = 0
45-
_buffering = False
46-
_chunkedData = {}
44+
self._negotiator: Negotiator = None
45+
self.label: str = None
46+
self.serialization: SerializationType = None
47+
self.reliable: bool = None
48+
self.stringify = lambda data: json.dumps(data)
49+
self.parse = lambda jsn: json.loads(jsn)
50+
self._buffer: []
51+
self._bufferSize = 0
52+
self._buffering = False
53+
self._chunkedData = {}
4754
# {
4855
# [id: number]: {
4956
# data: Blob[],
@@ -52,94 +59,99 @@ def __init__(self, peerId: str, provider: Peer, **options):
5259
# }
5360
# }
5461

55-
_dc: RTCDataChannel
56-
_encodingQueue = EncodingQueue()
57-
if self.options.connectionId:
58-
self.connectionId = self.options.connectionId
62+
self._dc: RTCDataChannel
63+
self._encodingQueue = EncodingQueue()
64+
if options.connectionId:
65+
self.connectionId = options.connectionId
5966
else:
6067
self.connectionId = DataConnection.ID_PREFIX + util.randomToken()
61-
if self.options.label:
62-
self.label = this.options.label
68+
if options.label:
69+
self.label = options.label
6370
else:
6471
self.label = self.connectionId
65-
self.serialization = self.options.serialization if \
66-
self.options.serialization else SerializationType.Binary
72+
self.serialization = options.serialization if \
73+
options.serialization else SerializationType.Binary
6774
self.reliable = self.options.reliable
6875
@self._encodingQueue.on('done')
69-
def on_eq_done(ab: ArrayBuffer):
76+
def on_eq_done(ab): # ab : ArrayBuffer
7077
self._bufferedSend(ab)
7178

7279
@self._encodingQueue.on('error')
7380
def on_eq_error():
74-
log.error(f'DC#${this.connectionId}: '
81+
log.error(f'DC#${self.connectionId}: '
7582
'Error occured in encoding from blob to arraybuffer, '
7683
'closing Data Connection.')
7784
self.close()
7885
self._negotiator = Negotiator(self)
79-
payload_option = self.options._payload or {'originator': true}
80-
self._negotiator.startConnection()
86+
payload_option = options._payload or {'originator': True}
87+
self._negotiator.startConnection(payload_option)
8188

8289
def initialize(self, dc: RTCDataChannel) -> None:
83-
"""Called by the Negotiator when the DataChannel is ready."""
90+
"""Configure datachannel when available.
91+
92+
Called by the Negotiator when the DataChannel is ready.
93+
"""
8494
self._dc = dc
8595
self._configureDataChannel()
8696

8797
def _configureDataChannel(self) -> None:
8898
if not util.supports.binaryBlob or util.supports.reliable:
89-
this.dataChannel.binaryType = "arraybuffer"
99+
self.dataChannel.binaryType = "arraybuffer"
90100

91101
@self.dataChannel.on('open')
92-
def on_datachannel_open():
93-
log.debug(f'DC#${this.connectionId} dc connection success')
102+
async def on_datachannel_open():
103+
log.debug(f'DC#${self.connectionId} dc connection success')
94104
self._open = True
95105
self.emit(ConnectionEventType.Open)
96106

97107
@self.dataChannel.on('message')
98-
def on_datachannel_message(e):
108+
async def on_datachannel_message(e):
99109
log.debug(f'DC#${self.connectionId} dc onmessage: {e.data}')
100-
self._handleDataMessage(e)
110+
await self._handleDataMessage(e)
101111

102112
@self.dataChannel.on('close')
103-
def on_datachannel_close(e):
113+
async def on_datachannel_close(e):
104114
log.debug(f'DC#${self.connectionId} dc closed for: {self.peer}')
105115
self.close()
106116

107-
def _handleDataMessage(self, data) -> None:
108-
"""Handles a DataChannel message."""
109-
datatype = data.constructor
110-
111-
isBinarySerialization = \
112-
self.serialization == SerializationType.Binary or \
113-
self.serialization == SerializationType.BinaryUTF8
114-
117+
async def _handleDataMessage(self, data) -> None:
118+
"""Handle a DataChannel message."""
119+
# datatype = data.constructor
120+
# isBinarySerialization = \
121+
# self.serialization == SerializationType.Binary or \
122+
# self.serialization == SerializationType.BinaryUTF8
115123
deserializedData = data
116-
117-
if isBinarySerialization:
118-
if datatype == Blob:
119-
# Datatype should never be blob
120-
ab = await util.blobToArrayBuffer(data)
121-
unpackedData = util.unpack(ab)
122-
self.emit(ConnectionEventType.Data, unpackedData)
123-
return
124-
if datatype == ArrayBuffer:
125-
deserializedData = util.unpack(data)
126-
elif datatype == String:
127-
# String fallback for binary data for browsers
128-
# that don't support binary yet
129-
ab = util.binaryStringToArrayBuffer(data)
130-
deserializedData = util.unpack(ab)
124+
# Peerjs JavaScript version uses a messagepack library
125+
# which is not ported to Python yet
126+
# if isBinarySerialization:
127+
# if datatype == Blob:
128+
# # Datatype should never be blob
129+
# ab = await util.blobToArrayBuffer(data)
130+
# unpackedData = util.unpack(ab)
131+
# self.emit(ConnectionEventType.Data, unpackedData)
132+
# return
133+
# if datatype == ArrayBuffer:
134+
# deserializedData = util.unpack(data)
135+
# elif datatype == String:
136+
# # String fallback for binary data for browsers
137+
# # that don't support binary yet
138+
# ab = util.binaryStringToArrayBuffer(data)
139+
# deserializedData = util.unpack(ab)
140+
if self.serialization == SerializationType.Raw:
141+
# no special massaging of deserialized data
142+
pass
131143
elif self.serialization == SerializationType.JSON:
132144
deserializedData = self.parse(data)
133145

134146
# Check if we've chunked--if so, piece things back together.
135147
# We're guaranteed that this isn't 0.
136148
if deserializedData.__peerData:
137-
self._handleChunk(deserializedData)
149+
await self._handleChunk(deserializedData)
138150
return
139151

140152
self.emit(ConnectionEventType.Data, deserializedData)
141153

142-
def _handleChunk(self, data) -> None:
154+
async def _handleChunk(self, data) -> None:
143155
id = data.__peerData
144156
chunkInfo = self._chunkedData[id] or {
145157
'data': [],
@@ -148,15 +160,17 @@ def _handleChunk(self, data) -> None:
148160
}
149161
chunkInfo.data[data.n] = data.data
150162
chunkInfo.count += 1
151-
this._chunkedData[id] = chunkInfo
163+
self._chunkedData[id] = chunkInfo
152164
if chunkInfo.total == chunkInfo.count:
153165
# Clean up before making
154166
# the recursive call to `_handleDataMessage`.
155167
del self._chunkedData[id]
156168
# We've received all the chunks--time
157169
# to construct the complete data.
158-
data = Blob(chunkInfo.data)
159-
this._handleDataMessage(data)
170+
# Blog is a browser JavaScript type.
171+
# Not applicable in the Python port.
172+
# data = Blob(chunkInfo.data)
173+
await self._handleDataMessage(data)
160174

161175
#
162176
# Exposed functionality for users.
@@ -190,7 +204,7 @@ def send(self, data, chunked: bool) -> None:
190204
if not self.open:
191205
self.emit(
192206
ConnectionEventType.Error,
193-
RuntimrError(
207+
RuntimeError(
194208
'Connection is not open. '
195209
'You should listen for the `open` '
196210
'event before sending messages.'
@@ -199,76 +213,78 @@ def send(self, data, chunked: bool) -> None:
199213
return
200214

201215
if self.serialization == SerializationType.JSON:
202-
self._bufferedSend(this.stringify(data))
203-
elif \
204-
self.serialization == SerializationType.Binary or \
205-
self.serialization == SerializationType.BinaryUTF8:
206-
blob = util.pack(data)
207-
if not chunked and blob.size > util.chunkedMTU:
208-
self._sendChunks(blob)
209-
return
210-
211-
if not util.supports.binaryBlob:
212-
# We only do this if we really need to
213-
# (e.g. blobs are not supported),
214-
# because this conversion is costly.
215-
self._encodingQueue.enque(blob)
216-
else:
217-
self._bufferedSend(blob)
216+
self._bufferedSend(self.stringify(data))
217+
# Blob is a JavaScript browser type. Not supported in Python.
218+
# elif \
219+
# self.serialization == SerializationType.Binary or \
220+
# self.serialization == SerializationType.BinaryUTF8:
221+
# blob = util.pack(data)
222+
# if not chunked and blob.size > util.chunkedMTU:
223+
# self._sendChunks(blob)
224+
# return
225+
#
226+
# if not util.supports.binaryBlob:
227+
# # We only do this if we really need to
228+
# # (e.g. blobs are not supported),
229+
# # because this conversion is costly.
230+
# self._encodingQueue.enque(blob)
231+
# else:
232+
# self._bufferedSend(blob)
218233
else:
219234
self._bufferedSend(data)
220235

221-
def _bufferedSend(self, msg: any) -> None:
222-
if self._buffering or not self._trySend(msg):
236+
async def _bufferedSend(self, msg: any) -> None:
237+
if self._buffering or not await self._trySend(msg):
223238
self._buffer.push(msg)
224-
self._bufferSize = this._buffer.length
239+
self._bufferSize = self._buffer.length
225240

226241
async def _trySend(self, msg) -> bool:
227-
"""Returns true if the send succeeds."""
228-
if not this.open:
229-
return false
242+
"""Return true if the send succeeds."""
243+
if not self.open:
244+
return False
230245
if self.dataChannel.bufferedAmount > \
231246
DataConnection.MAX_BUFFERED_AMOUNT:
232-
self._buffering = true
247+
self._buffering = True
233248

234-
def delayBuf():
249+
async def delayBuf():
235250
# wait for 50ms before trying buffer
236251
await asyncio.sleep(0.05)
237252
self._buffering = False
238253
self._tryBuffer()
239-
asyncio.create_task(delayBuf)
254+
asyncio.create_task(delayBuf())
240255
return False
241256
try:
242257
self.dataChannel.send(msg)
243258
except Exception as e:
244-
log.error(f'DC#:${this.connectionId} Error when sending: {e}')
259+
log.error(f'DC#:${self.connectionId} Error when sending: {e}')
245260
self._buffering = True
246261
self.close()
247262
return False
248263
return True
249264

250265
def _tryBuffer(self) -> None:
251266
"""Try to send the first message in the buffer."""
252-
if not this.open:
267+
if not self.open:
253268
return
254269
if self._buffer.length == 0:
255270
return
256271

257-
msg = this._buffer[0]
272+
msg = self._buffer[0]
258273

259274
if self._trySend(msg):
260275
self._buffer.shift()
261276
self._bufferSize = self._buffer.length
262277
self._tryBuffer()
263278

264-
def _sendChunks(self, blob: Blob) -> None:
265-
blobs = util.chunk(blob)
266-
log.debug(f'DC#${this.connectionId} '
267-
f'Try to send ${blobs.length} chunks...')
268-
for blob in blobs:
269-
self.send(blob, True)
279+
# def _sendChunks(self, blob: Blob) -> None:
280+
# blobs = util.chunk(blob)
281+
# log.debug(f'DC#${this.connectionId} '
282+
# f'Try to send ${blobs.length} chunks...')
283+
# for blob in blobs:
284+
# self.send(blob, True)
270285

271286
def handleMessage(self, message: ServerMessage) -> None:
287+
"""Handle signaling server message."""
272288
payload = message.payload
273289
if message.type == ServerMessageType.Answer:
274290
self._negotiator.handleSDP(message.type, payload.sdp)

src/peerjs/socket.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from .enums import SocketEventType
55
import json
66
import websockets
7-
from websockets import WebSocket, ConnectionClosed
7+
from websockets.exceptions import ConnectionClosedError
88
import asyncio
99

1010
log = logging.getLogger(__name__)
@@ -32,7 +32,7 @@ def __init__(
3232
self._disconnected: bool = True
3333
self._id: str = None
3434
self._messagesQueue: list = []
35-
self._websocket: WebSocket = None
35+
self._websocket: websockets.client.WebSocketClientProtocol = None
3636
self._receiver: asyncio.Task = None
3737

3838
async def _connect(self, wss_url=None):
@@ -61,8 +61,8 @@ async def _receive(self, websocket=None):
6161
log.warning("Invalid server message: {}, error {}",
6262
message, e)
6363
await self.emit('message', message)
64-
except ConnectionClosed:
65-
log.debug("Websocket connection closed.")
64+
except ConnectionClosedError as err:
65+
log.warning("Websocket connection closed with error. %s", err)
6666
except RuntimeError as e:
6767
log.warning("Websocket connection error: {}", e)
6868
finally:

0 commit comments

Comments
 (0)