Skip to content

Commit c9da24a

Browse files
committed
feat: datachannel handshake successful
- first successful datachannel message exchanged between ambianic-ui(PeerJS) and ambianic-edge(PeerJS-python)
1 parent 15c9958 commit c9da24a

File tree

3 files changed

+48
-31
lines changed

3 files changed

+48
-31
lines changed

src/peerjs/dataconnection.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ def _apply_options(
8484
# }
8585
# }
8686

87-
self._dc: RTCDataChannel
88-
# self._encodingQueue = EncodingQueue()
87+
self._dc: RTCDataChannel = None
88+
self._encodingQueue = None # EncodingQueue()
8989
# @self._encodingQueue.on('done')
9090
# def on_eq_done(ab): # ab : ArrayBuffer
9191
# self._bufferedSend(ab)
@@ -101,10 +101,10 @@ def _apply_options(
101101
async def start(self):
102102
"""Start data connection negotiation."""
103103
payload_options = self._payload or {'originator': True}
104-
log.info('Starting new connection with payload: %r '
105-
'and payload_option: %r',
106-
self._payload,
107-
payload_options)
104+
log.debug('\n Starting new connection with payload: \n %r '
105+
'\n and payload_options: %r',
106+
self._payload,
107+
payload_options)
108108
await self._negotiator.startConnection(**payload_options)
109109

110110
def initialize(self, dc: RTCDataChannel) -> None:
@@ -127,7 +127,9 @@ async def on_datachannel_open():
127127

128128
@self.dataChannel.on('message')
129129
async def on_datachannel_message(e):
130-
log.debug(f'DC#${self.connectionId} dc onmessage: {e.data}')
130+
log.info('DataChannel message from %s, \n: %r',
131+
self.peerId,
132+
e)
131133
await self._handleDataMessage(e)
132134

133135
@self.dataChannel.on('close')

src/peerjs/negotiator.py

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -106,14 +106,21 @@ def _setupListeners(self, peerConnection: RTCPeerConnection = None):
106106
# });
107107
# };
108108

109+
@peerConnection.on('icegatheringstatechange')
110+
def peerconn_icegatheringstatechange():
111+
if peerConnection.iceGatheringState == 'complete':
112+
log.warning('Local ICE candidates gathered.')
113+
109114
@peerConnection.on('iceconnectionstatechange')
110115
def peerconn_oniceconnectionstatechange():
111116
state_change = BaseEventEmitter()
112117
@state_change.once("failed")
113-
def on_failed():
118+
def on_failed(error=None):
114119
log.warning(
115-
f"iceConnectionState is failed, "
116-
"closing connections to {peerId}"
120+
"\n IceConnectionState failed with error %s "
121+
"\n Closing connections to peer id %s",
122+
error,
123+
peerId
117124
)
118125
self.connection.emit(
119126
ConnectionEventType.Error,
@@ -148,7 +155,7 @@ def on_disconnected():
148155

149156
@state_change.once("completed")
150157
def on_completed():
151-
log.info(
158+
log.warning(
152159
'iceConnectionState completed for peer id %s',
153160
peerId
154161
)
@@ -160,6 +167,10 @@ def on_completed():
160167
# peerconn_onicecandidate)
161168
pass
162169

170+
log.warning(
171+
'iceConnectionState event: %s',
172+
peerConnection.iceConnectionState
173+
)
163174
# forward connection stage change event to local handlers
164175
state_change.emit(peerConnection.iceConnectionState)
165176
# notify higher level connection listeners
@@ -171,9 +182,8 @@ def on_completed():
171182
# Fired between offer and answer, so options should already be saved
172183
# in the options hash.
173184
@peerConnection.on("datachannel")
174-
def peerconn_ondatachanel(evt):
175-
log.info("Received data channel.")
176-
dataChannel = evt.channel
185+
def peerconn_ondatachanel(dataChannel):
186+
log.warning("Received data channel %r", dataChannel)
177187
connection = provider.getConnection(peerId, connectionId)
178188
connection.initialize(dataChannel)
179189

@@ -207,7 +217,10 @@ async def cleanup(self) -> None:
207217
dataChannelNotClosed = dataChannel.readyState and \
208218
dataChannel.readyState != "closed"
209219
if peerConnectionNotClosed or dataChannelNotClosed:
210-
await peerConnection.close()
220+
try:
221+
await peerConnection.close()
222+
except Exception as err:
223+
log.exception('Error while closing connection', err)
211224

212225
async def _makeOffer(self):
213226
peerConnection = self.connection.peerConnection
@@ -262,21 +275,23 @@ async def _makeAnswer(self) -> None:
262275
provider = self.connection.provider
263276
try:
264277
answer = await peerConnection.createAnswer()
265-
log.info('\n Created answer: \n %r', answer)
266-
log.info('\n Connection options: %r',
267-
self.connection.options)
268278
sdpTransformFunction = \
269279
self.connection.options.get('sdpTransform', None)
270280
if sdpTransformFunction and \
271281
callable(sdpTransformFunction):
272282
answer.sdp = sdpTransformFunction(answer.sdp) \
273283
or answer.sdp
274-
json_answer = object_to_string(answer)
284+
log.info('\n Created answer header: \n %r', answer)
285+
log.info('\n Connection options: %r',
286+
self.connection.options)
275287
try:
288+
log.info('Gathering ICE candidates to complete answer...')
276289
await peerConnection.setLocalDescription(answer)
277-
log.info(f"Set localDescription: {answer}"
278-
f"for:{self.connection.peer}")
279-
290+
answer = peerConnection.localDescription
291+
json_answer = object_to_string(answer)
292+
log.warning('\n Sending SDP ANSWER to peer id %s: \n %r ',
293+
self.connection.peer,
294+
json_answer)
280295
await provider.socket.send({
281296
'type': ServerMessageType.Answer,
282297
'payload': {
@@ -299,12 +314,12 @@ async def handleSDP(self, type: str = None, sdp: str = None) -> None:
299314
rsd = RTCSessionDescription(type=type, sdp=sdp)
300315
peerConnection = self.connection.peerConnection
301316
provider = self.connection.provider
302-
log.info("Setting remote session description %r", rsd)
317+
log.debug("\n Setting remote session description \n %r", rsd)
303318
try:
304319
await peerConnection.setRemoteDescription(rsd)
305-
log.info('Set remoteDescription type: %s'
306-
'for peer: %r',
307-
type, self.connection.peer)
320+
log.debug('\n Set remoteDescription type: %s'
321+
'\n for peer: %r',
322+
type, self.connection.peer)
308323
if type == "offer":
309324
await self._makeAnswer()
310325
except Exception as err:

src/peerjs/peer.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -198,9 +198,9 @@ async def _handleMessage(self, message: ServerMessage) -> None:
198198
peerId = message.src
199199
payload = message.payload
200200

201-
log.info('\n Handling server message \n type %s, '
202-
'\n source peer/client id %s, \n payload %s, \n message %r',
203-
type, peerId, payload, message)
201+
log.debug('\n Handling server message \n type %s, '
202+
'\n source peer/client id %s, \n payload %s, \n message %r',
203+
type, peerId, payload, message)
204204

205205
server_messenger = AsyncIOEventEmitter()
206206

@@ -273,8 +273,8 @@ async def _handle_offer(self, peerId=None, payload=None):
273273
"""Handle remote peer offer for a direct connection."""
274274
# we should consider switching this to CALL/CONNECT,
275275
# but this is the least breaking option.
276-
log.info("Remote peer id %s offering connection. Payload %s",
277-
peerId, payload)
276+
log.debug("\n Remote peer id %s offering connection. \n Payload %s",
277+
peerId, payload)
278278
connectionId = payload['connectionId']
279279
connection = self.getConnection(peerId, connectionId)
280280

0 commit comments

Comments
 (0)