Skip to content

Commit 70b9eca

Browse files
committed
fix: progressing with port
1 parent 4a76bac commit 70b9eca

File tree

3 files changed

+165
-119
lines changed

3 files changed

+165
-119
lines changed

src/cli.py

Lines changed: 156 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,27 @@
11
"""Register with PNP server and wait for remote peers to connect."""
2-
import argparse
2+
# import argparse
33
import asyncio
44
import logging
5-
import time
65
from .peerroom import PeerRoom
7-
from aiortc import RTCIceCandidate, RTCPeerConnection, RTCSessionDescription
8-
from .pnpsignaling import AmbianicPnpSignaling
6+
from aiortc import RTCIceCandidate, RTCSessionDescription
7+
from .peer import Peer
98

109
log = logging.getLogger(__name__)
1110

12-
13-
def _server_register(channel, t, message):
14-
"""Register this peer with signaling server."""
15-
print('Registering this client with peer discovery server')
16-
17-
18-
def _room_join(channel, t, message):
19-
"""Join room with other local network peers to allow auto discovery."""
20-
print('Joining room with local network peers')
21-
22-
23-
def _channel_log(channel, t, message):
24-
print("channel(%s) %s %s" % (channel.label, t, message))
25-
26-
27-
def _channel_send(channel, message):
28-
_channel_log(channel, ">", message)
29-
channel.send(message)
30-
11+
peer = None
12+
myPeerId = None
13+
AMBIANIC_PNP_HOST = 'ambianic-pnp.herokuapp.com'
14+
AMBIANIC_PNP_PORT = 80
15+
AMBIANIC_PNP_SECURE = True
16+
time_start = None
17+
peerConnectionStatus = None
18+
discoveryLoop = None
3119

3220
async def _consume_signaling(pc, signaling):
3321
while True:
3422
obj = await signaling.receive()
35-
3623
if isinstance(obj, RTCSessionDescription):
3724
await pc.setRemoteDescription(obj)
38-
3925
if obj.type == "offer":
4026
# send answer
4127
await pc.setLocalDescription(await pc.createAnswer())
@@ -46,102 +32,165 @@ async def _consume_signaling(pc, signaling):
4632
print("Exiting")
4733
break
4834

49-
time_start = None
50-
51-
52-
def _current_stamp():
53-
global time_start
54-
if time_start is None:
55-
time_start = time.time()
56-
return 0
57-
else:
58-
return int((time.time() - time_start) * 1000000)
59-
6035

61-
async def _run_answer(pc, signaling):
62-
await signaling.connect()
63-
@pc.on("datachannel")
64-
def on_datachannel(channel):
65-
_channel_log(channel, "-", "created by remote party")
66-
@channel.on("message")
67-
def on_message(message):
68-
_channel_log(channel, "<", message)
69-
if isinstance(message, str) and message.startswith("ping"):
70-
# reply
71-
_channel_send(channel, "pong" + message[4:])
72-
await _consume_signaling(pc, signaling)
73-
74-
75-
async def _run_offer(pc, signaling):
76-
await signaling.connect()
77-
channel = pc.createDataChannel("chat")
78-
_channel_log(channel, "-", "created by local party")
36+
async def join_peer_room(peer=None):
37+
"""Join a peer room with other local peers."""
38+
# first try to find the remote peer ID in the same room
39+
assert peer
40+
myRoom = PeerRoom(peer)
41+
log.debug('Fetching room members', myRoom)
42+
peerIds = await myRoom.getRoomMembers()
43+
log.debug('myRoom members %r', peerIds)
44+
45+
46+
def _setPnPServiceConnectionHandlers(peer=None):
47+
global myPeerId
48+
@peer.on('open')
49+
async def peer_open(id):
50+
global myPeerId
51+
# Workaround for peer.reconnect deleting previous id
52+
if peer.id is None:
53+
log.warning('pnpService: Received null id from peer open')
54+
peer.id = myPeerId
55+
else:
56+
if myPeerId != peer.id:
57+
log.info(
58+
'pnpService: Service returned new peerId. Old %s, New %s',
59+
myPeerId,
60+
peer.id
61+
)
62+
myPeerId = peer.id
63+
log.info('pnpService: myPeerId: ', peer.id)
64+
65+
@peer.on('disconnected')
66+
async def peer_disconnected():
67+
global myPeerId
68+
log.info('pnpService: Connection lost. Please reconnect')
69+
# Workaround for peer.reconnect deleting previous id
70+
if not peer.id:
71+
log.info('BUG WORKAROUND: Peer lost ID. '
72+
'Resetting to last known ID.')
73+
peer._id = myPeerId
74+
peer._lastServerId = myPeerId
75+
peer.reconnect()
76+
77+
@peer.on('close')
78+
def peer_close():
79+
# peerConnection = null
80+
log.info('pnpService: Connection closed')
81+
82+
@peer.on('error')
83+
def peer_error(err):
84+
log.warning('pnpService %s', err)
85+
log.info('peerConnectionStatus', peerConnectionStatus)
86+
# retry peer connection in a few seconds
87+
asyncio.call_later(3, pnp_service_connect)
88+
89+
# remote peer tries to initiate connection
90+
@peer.on('connection')
91+
async def peer_connection(peerConnection):
92+
log.info('remote peer trying to establish connection')
93+
_setPeerConnectionHandlers(peerConnection)
94+
95+
96+
def _setPeerConnectionHandlers(peerConnection):
97+
@peerConnection.on('open')
98+
async def pc_open():
99+
log.info('pnpService: Connected to: %s', peerConnection.peer)
100+
101+
# Handle incoming data (messages only since this is the signal sender)
102+
@peerConnection.on('data')
103+
async def pc_data(data):
104+
log.info('pnpService: data received from remote peer %r', data)
105+
pass
79106

80-
async def send_pings():
107+
@peerConnection.on('close')
108+
async def pc_close():
109+
log.info('Connection to remote peer closed')
110+
111+
112+
# async def _run_answer(pc, signaling):
113+
# await signaling.connect()
114+
# @pc.on("datachannel")
115+
# def on_datachannel(channel):
116+
# _channel_log(channel, "-", "created by remote party")
117+
# @channel.on("message")
118+
# def on_message(message):
119+
# _channel_log(channel, "<", message)
120+
# if isinstance(message, str) and message.startswith("ping"):
121+
# # reply
122+
# _channel_send(channel, "pong" + message[4:])
123+
# await _consume_signaling(pc, signaling)
124+
125+
async def pnp_service_connect() -> Peer:
126+
"""Create a Peer instance and register with PnP signaling server."""
127+
# if connection to pnp service already open, then nothing to do
128+
global peer
129+
if peer and peer.open:
130+
log.info('peer already connected')
131+
return
132+
# Create own peer object with connection to shared PeerJS server
133+
log.info('pnpService: creating peer')
134+
# If we already have an assigned peerId, we will reuse it forever.
135+
# We expect that peerId is crypto secure. No need to replace.
136+
# Unless the user explicitly requests a refresh.
137+
global myPeerId
138+
log.info('pnpService: last saved myPeerId', myPeerId)
139+
peer = Peer(myPeerId, {
140+
'host': AMBIANIC_PNP_HOST,
141+
'port': AMBIANIC_PNP_PORT,
142+
'secure': AMBIANIC_PNP_SECURE,
143+
'debug': 2
144+
})
145+
log.info('pnpService: peer created')
146+
await peer.start()
147+
log.info('pnpService: peer activated')
148+
_setPnPServiceConnectionHandlers(peer)
149+
make_discoverable(peer=peer)
150+
return peer
151+
152+
153+
async def make_discoverable(peer=None):
154+
"""Enable remote peers to find and connect to this peer."""
155+
assert peer
156+
157+
async def periodic():
81158
while True:
82-
_channel_send(channel, "ping %d" % _current_stamp())
83-
await asyncio.sleep(1)
159+
log.info('Making peer discoverable.')
160+
join_peer_room(peer=peer)
161+
await asyncio.sleep(5)
84162

85-
@channel.on("open")
86-
def on_open():
87-
asyncio.ensure_future(send_pings())
163+
def stop():
164+
discoveryLoop.cancel()
88165

89-
@channel.on("message")
90-
def on_message(message):
91-
_channel_log(channel, "<", message)
92-
if isinstance(message, str) and message.startswith("pong"):
93-
elapsed_ms = (_current_stamp() - int(message[5:])) / 1000
94-
print(" RTT %.2f ms" % elapsed_ms)
166+
discoveryLoop = asyncio.create_task(periodic())
95167

96-
# send offer
97-
await pc.setLocalDescription(await pc.createOffer())
98-
await signaling.send(pc.localDescription)
99-
await _consume_signaling(pc, signaling)
100-
101-
myPeerId = None
102-
remotePeerId = None
103-
104-
105-
async def discoverRemotePeerId(peer=None):
106-
"""Try to find a remote peer in the same local room."""
107-
global remotePeerId
108-
if not remotePeerId:
109-
# first try to find the remote peer ID in the same room
110-
myRoom = PeerRoom(peer)
111-
log.debug('Fetching room members', myRoom)
112-
peerIds = await myRoom.getRoomMembers()
113-
log.debug('myRoom members', peerIds)
114-
try:
115-
remotePeerId = [pid for pid in peerIds if pid != myPeerId][0]
116-
return remotePeerId
117-
except IndexError:
118-
# no other peers in room
119-
return None
120-
else:
121-
return remotePeerId
122168

123169
if __name__ == "__main__":
124-
args = None
125-
parser = argparse.ArgumentParser(description="Data channels ping/pong")
126-
parser.add_argument("role", choices=["offer", "answer"])
127-
parser.add_argument("--verbose", "-v", action="count")
170+
# args = None
171+
# parser = argparse.ArgumentParser(description="Data channels ping/pong")
172+
# parser.add_argument("role", choices=["offer", "answer"])
173+
# parser.add_argument("--verbose", "-v", action="count")
128174
# add_signaling_arguments(parser)
129-
args = parser.parse_args()
130-
if args.verbose:
131-
logging.basicConfig(level=logging.DEBUG)
175+
# args = parser.parse_args()
176+
# if args.verbose:
177+
logging.basicConfig(level=logging.DEBUG)
132178
# signaling = create_signaling(args)
133-
signaling = AmbianicPnpSignaling(args)
134-
pc = RTCPeerConnection()
135-
if args.role == "offer":
136-
coro = _run_offer(pc, signaling)
137-
else:
138-
coro = _run_answer(pc, signaling)
179+
# signaling = AmbianicPnpSignaling(args)
180+
# pc = RTCPeerConnection()
181+
# if args.role == "offer":
182+
# coro = _run_offer(pc, signaling)
183+
# else:
184+
# coro = _run_answer(pc, signaling)
185+
coro = pnp_service_connect()
139186
# run event loop
140187
loop = asyncio.get_event_loop()
141188
try:
142189
loop.run_until_complete(coro)
143190
except KeyboardInterrupt:
191+
log.info('KeyboardInterrupt detected. Exiting...')
144192
pass
145193
finally:
146-
loop.run_until_complete(pc.close())
147-
loop.run_until_complete(signaling.close())
194+
loop.run_until_complete(peer.destroy())
195+
# loop.run_until_complete(pc.close())
196+
# loop.run_until_complete(signaling.close())

src/dataconnection.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -174,10 +174,8 @@ def close(self) -> None:
174174
self.provider._removeConnection(self)
175175
self.provider = None
176176
if self.dataChannel:
177-
self.dataChannel.onopen = None
178-
self.dataChannel.onmessage = None
179-
self.dataChannel.onclose = None
180-
self._dc = null
177+
self.dataChannel.removeAllListeners()
178+
self._dc = None
181179
if self._encodingQueue:
182180
self._encodingQueue.destroy()
183181
self._encodingQueue.removeAllListeners()

src/peer.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -144,14 +144,18 @@ def __init__(self, id: str = None, **options):
144144
return
145145

146146
if userId:
147-
self._initialize(userId)
147+
self._id = id
148148
else:
149149
try:
150150
id = await self._api.retrieveId()
151-
self._initialize(id)
151+
self._id = id
152152
except Exception as e:
153153
self._abort(PeerErrorType.ServerError, e)
154154

155+
def start(self):
156+
"""Activate Peer instance."""
157+
await self.socket.start(id, self._options.token)
158+
155159
@property
156160
def id(self, ) -> None:
157161
return self._id
@@ -191,7 +195,7 @@ def disconnected(self, ):
191195
return self._disconnected
192196

193197
@property
194-
def _createServerConnection(self, ) -> Socket:
198+
def _createServerConnection(self) -> Socket:
195199
socket = Socket(
196200
self._options.secure,
197201
self._options.host,
@@ -223,11 +227,6 @@ def on_close():
223227

224228
return socket
225229

226-
def _initialize(self, id: str) -> None:
227-
""" Initialize a connection with the server."""
228-
self._id = id
229-
self.socket.start(id, self._options.token)
230-
231230
def _handleMessage(self, message: ServerMessage) -> None:
232231
"""Handles messages from the server."""
233232
type = message.type

0 commit comments

Comments
 (0)