Skip to content

Commit d561803

Browse files
author
adrian
committed
Refactored simulation client. Added keyed subscription substitute. Refactored app entry point. Added args. Added requirements.txt.
1 parent faedcc8 commit d561803

File tree

6 files changed

+96
-53
lines changed

6 files changed

+96
-53
lines changed

requirements.txt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
ray==0.8.0
2+
sanic==19.9.0
3+
numpy==1.17.3
4+
tensorflow==2.0.0
5+
torch==1.3.1
6+
gym==0.15.4
7+
pulsar-client==2.4.0
8+
protobuf==3.11.0
9+
apache-bookkeeper-client==4.10.0
10+
11+

src/backend/messaging/__init__.py

Whitespace-only changes.

src/backend/messaging/client.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import pulsar
2+
from pulsar.schema import *
3+
from schema.simulation import SimulationRequestV1, CycleV1
4+
import logging
5+
import uuid
6+
import ray
7+
8+
@ray.remote
9+
class SimulationClient:
10+
client = None
11+
consumer = None
12+
def __init__(self, request):
13+
self.client = pulsar.Client('pulsar://localhost:6650')
14+
self.logger = logging.getLogger(__name__)
15+
self.consumer = self.client.subscribe("simulation-result", subscription_name="simulator-%s" % request.simId, schema=JsonSchema(CycleV1))
16+
17+
def __del__(self):
18+
if self.client:
19+
self.consumer.close()
20+
self.client.close()
21+
if ray.is_initialized():
22+
ray.shutdown()
23+
24+
def request(self, request):
25+
producer = self.client.create_producer('simulation-request', schema=JsonSchema(SimulationRequestV1))
26+
producer.send(request)
27+
28+
def listen(self, request):
29+
msg = self.consumer.receive()
30+
cycle = msg.value()
31+
try:
32+
# todo use a topic for each or key based subscription would be better
33+
# python client not caught up to java client yet
34+
if msg.partition_key() == request.simId:
35+
self.consumer.acknowledge(msg)
36+
return cycle
37+
except Exception as e:
38+
print(e)
39+
self.consumer.negative_acknowledge(msg)
40+
return None
41+
42+
@staticmethod
43+
def requestToRequest(r):
44+
j = json.loads(r)
45+
return SimulationRequestV1(**j)
46+
47+
@staticmethod
48+
async def run(request, ws):
49+
if not ray.is_initialized():
50+
ray.init()
51+
while True:
52+
msg = await ws.recv()
53+
msg = SimulationClient.requestToRequest(msg)
54+
sim = SimulationClient.remote(msg)
55+
sim.request.remote(msg)
56+
for i in range(msg.cycles):
57+
result = ray.get(sim.listen.remote(msg))
58+
if result:
59+
await ws.send(json.dumps(result.__dict__))
60+
else:
61+
i=i-1
62+
await ws.close()
63+
del sim

src/backend/server.py

Lines changed: 5 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -3,63 +3,20 @@
33
from sanic.websocket import WebSocketProtocol
44
from os.path import dirname, abspath, join
55
import json
6-
import pulsar
7-
from pulsar.schema import *
8-
# todo make library
6+
97
from schema.simulation import SimulationRequestV1, FoodV1, AgentV1, CycleV1, ToDTO
10-
import logging
11-
import uuid
8+
from backend.messaging.client import SimulationClient
9+
import ray
1210

1311
_CURDIR = dirname(abspath(__file__))
1412

1513
app = Sanic()
1614
app.static('/', join(_CURDIR, './dist'))
1715

18-
class SimulationClient:
19-
def __init__(self):
20-
self.client = pulsar.Client('pulsar://localhost:6650')
21-
self.logger = logging.getLogger(__name__)
22-
23-
def __del__(self):
24-
self.client.close()
25-
26-
def request(self, request):
27-
producer = self.client.create_producer('simulation-request', schema=JsonSchema(SimulationRequestV1))
28-
producer.send(request)
29-
return request.simId
30-
31-
async def listen(self, request, ws):
32-
consumer = self.client.subscribe("simulation-result", subscription_name="simulator", schema=JsonSchema(CycleV1))
33-
for _ in range(request.cycles):
34-
msg = consumer.receive()
35-
cycle = msg.value()
36-
try:
37-
# todo use a topic for each or key based subscription would be better
38-
# if msg.partition_key() == request.simId | 1 == 1:
39-
consumer.acknowledge(msg)
40-
# Acknowledge successful processing of the message
41-
await ws.send(json.dumps(cycle.__dict__))
42-
#else:
43-
# consumer.negative_acknowledge(msg)
44-
except Exception as e:
45-
# Message failed to be processed
46-
print(e)
47-
consumer.negative_acknowledge(msg)
48-
consumer.close()
49-
50-
def requestToRequest(r):
51-
j = json.loads(r)
52-
return SimulationRequestV1(**j)
53-
54-
sim = SimulationClient()
55-
5616
@app.websocket('/environment')
5717
async def environment(request, ws):
58-
while True:
59-
msg = await ws.recv()
60-
msg = requestToRequest(msg)
61-
id = sim.request(msg)
62-
await sim.listen(msg, ws)
18+
await SimulationClient.run(request, ws)
19+
del sim
6320

6421
def start():
6522
app.run(host="0.0.0.0", port=3000, protocol=WebSocketProtocol, debug=True)

src/main.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,21 @@
22
import sys
33
from simulator.messaging.messaging import MessageClient
44
from schema.simulation import SimulationRequestV1, FoodV1, AgentV1, ToDTO
5+
from backend.server import start
6+
import argparse
7+
8+
def getArgs():
9+
parser = argparse.ArgumentParser(description='Start frontend or workers. Add --frontend for server.')
10+
parser.add_argument('--frontend',
11+
default=False,
12+
action="store_true",
13+
help='start socket server and front end app')
14+
return parser.parse_args()
515

616
if __name__ == "__main__":
7-
client = MessageClient()
8-
client.listen()
17+
args = getArgs()
18+
if not args.frontend:
19+
client = MessageClient()
20+
client.listen()
21+
else:
22+
start()

src/simulator/messaging/messaging.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,8 @@ def runSimulation(self, request : SimulationRequestV1):
3232
producer = self.client.create_producer('simulation-result', schema=JsonSchema(CycleV1))
3333
for i in range(request.cycles):
3434
cycle = sim.step(i)
35-
print(cycle)
3635
cyclev1 = ToDTO.toCycle(cycle)
37-
print("SENGING")
38-
producer.send(cyclev1)
36+
producer.send(cyclev1, partition_key=request.simId, sequence_id=i)
3937

4038
# subscribe to a simulation request topic
4139
def listen(self):

0 commit comments

Comments
 (0)