Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
    """Start a one-shot classic rpyc server in a subprocess and connect to it.

    Split-brain mode is enabled first. The server script is expected to
    print the line ``rpyc-oneshot`` on stdout, followed by a line holding
    ``host<TAB>port``; the test fails if the first line is missing or
    different. The resulting connection is stored in ``self.conn`` and the
    subprocess handle in ``self.proc``.
    """
    enable_splitbrain()

    # The server script is looked up relative to this test file: ../bin/rpyc_classic.py
    here = os.path.dirname(os.path.abspath(__file__))
    server_file = os.path.join(here, "..", "bin", "rpyc_classic.py")

    command = [sys.executable, server_file, "--mode=oneshot", "--host=localhost", "-p0"]
    self.proc = subprocess.Popen(
        command,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    # First stdout line is the start-up banner; an empty read means the
    # server produced no output, so dump its stderr to help diagnosis.
    banner = self.proc.stdout.readline().strip()
    if not banner:
        print(self.proc.stderr.read())
        self.fail("server failed to start")
    self.assertEqual(banner, b("rpyc-oneshot"), "server failed to start")

    # Second stdout line carries the address the server is listening on.
    address_line = self.proc.stdout.readline().strip()
    host, port = address_line.split(b("\t"))
    self.conn = rpyc.classic.connect(host, int(port))
def setUp(self):
    """Spawn a one-shot classic rpyc server subprocess and connect to it.

    The server is expected to announce itself on stdout with the line
    ``rpyc-oneshot`` and then a ``host<TAB>port`` line. A missing or
    unexpected banner fails the test. The subprocess is kept in
    ``self.proc`` and the connection in ``self.conn``.
    """
    # Resolve ../bin/rpyc_classic.py relative to the directory of this file.
    test_dir = os.path.dirname(os.path.abspath(__file__))
    server_file = os.path.join(test_dir, "..", "bin", "rpyc_classic.py")

    argv = [sys.executable, server_file, "--mode=oneshot", "--host=localhost", "-p0"]
    pipes = dict(stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    self.proc = subprocess.Popen(argv, **pipes)

    server_out = self.proc.stdout
    first_line = server_out.readline().strip()
    if not first_line:
        # Nothing on stdout: show whatever the server wrote to stderr.
        print(self.proc.stderr.read())
        self.fail("server failed to start")
    self.assertEqual(first_line, b("rpyc-oneshot"), "server failed to start")

    # The next line is "<host>\t<port>" describing where to connect.
    host, port = server_out.readline().strip().split(b("\t"))
    self.conn = rpyc.classic.connect(host, int(port))
def _get_myclass(self, proto_config):
    """Reconnect under an updated server protocol config and return a remote ``MyClass``.

    The current connection is closed, ``proto_config`` is merged into the
    server's ``protocol_config``, and a new connection to the server's port
    on localhost replaces ``self.conn``.

    Args:
        proto_config: Mapping of protocol settings to merge into the
            server's existing configuration.

    Returns:
        The result of calling ``MyClass()`` on the new connection's root.
    """
    # Drop the old connection before changing the server-side settings.
    self.conn.close()

    server = self.server
    server.protocol_config.update(proto_config)

    self.conn = rpyc.connect("localhost", server.port)
    remote_root = self.conn.root
    return remote_root.MyClass()
def doListDupes(on_file):
    """Ask the rpyc service on localhost:12345 for files similar to ``on_file``.

    Calls the service's exposed ``listDupes`` method with
    ``filePath=on_file`` and prints whatever it returns.

    Args:
        on_file: Path forwarded to the remote ``listDupes`` call as
            ``filePath``.
    """
    print("Finding files similar to: '{}'".format(on_file))
    remote = rpyc.connect("localhost", 12345)
    try:
        commons = remote.root.listDupes(filePath=on_file)
        print("result:")
        # Print while the connection is still open: the result is presumably
        # a remote proxy that needs a live connection to be rendered.
        print(commons)
    finally:
        # Release the socket deterministically, even if the remote call
        # raises, instead of relying on garbage collection.
        remote.close()
    print("Wut?")
def setUp(self):
    """Start a threaded rpyc server and prepare the globals used by timeit.

    A ``ThreadedServer`` exposing ``MyService`` is created on port 0 with
    pickling allowed and started in a background thread. Two client
    connections are opened with the same config, and both connections plus
    a random ``DF_ROWS`` x ``DF_COLS`` DataFrame are published in
    ``cfg_tests.timeit``.
    """
    self.cfg = {'allow_pickle': True}

    # The server gets its own copy of the config so later updates to its
    # protocol_config do not alter self.cfg.
    server = rpyc.utils.server.ThreadedServer(
        MyService,
        port=0,
        protocol_config=self.cfg.copy(),
    )
    self.server = server
    server.logger.quiet = False
    self.thd = server._start_in_thread()

    self.conn = rpyc.connect("localhost", server.port, config=self.cfg)
    self.conn2 = rpyc.connect("localhost", server.port, config=self.cfg)

    # globals are made available to timeit, prepare them
    frame = pd.DataFrame(np.random.rand(DF_ROWS, DF_COLS))
    timeit_globals = cfg_tests.timeit
    timeit_globals['conn'] = self.conn
    timeit_globals['conn2'] = self.conn2
    timeit_globals['df'] = frame
def test_parallelism(self):
    """Check that 50 remote one-second sleeps finish within two seconds.

    Fifty classic connections are opened to localhost:18878. Each one runs
    the remote ``time.sleep(1)`` in its own greenlet, and the wall-clock
    time for all of them to finish must not exceed 2 seconds. Every
    connection is closed afterwards.
    """
    conns = []
    for _ in range(50):
        conns.append(rpyc.classic.connect("localhost", port=18878))

    try:
        start = time.time()
        sleepers = [gevent.spawn(conn.modules.time.sleep, 1) for conn in conns]
        gevent.joinall(sleepers)
        stop = time.time()

        elapsed = stop - start
        self.assertLessEqual(elapsed, 2)
    finally:
        # Close every connection whether or not the assertion held.
        for conn in conns:
            conn.close()
def setUp(self):
    """Create the connection used by the tests via ``rpyc.classic.connect_thread``."""
    connection = rpyc.classic.connect_thread()
    self.conn = connection
def test_connection(self):
    """Execute a statement remotely and read the result back two ways.

    Connects to localhost:18878 in classic mode, runs ``x = 5`` on the
    remote side, then checks both the remote namespace entry and a remote
    expression that uses it.
    """
    with rpyc.classic.connect("localhost", port=18878) as conn:
        conn.execute("x = 5")

        stored = conn.namespace["x"]
        self.assertEqual(stored, 5)

        computed = conn.eval("1+x")
        self.assertEqual(computed, 6)
def setUp(self):
    """Store a connection obtained from ``rpyc.classic.connect_thread`` on ``self.conn``."""
    threaded_conn = rpyc.classic.connect_thread()
    self.conn = threaded_conn
def test_multiple_connections(self):
    """Check that three server-side handlers share the client's OS thread.

    Three classic connections are opened to localhost:18878. The original
    (un-patched) ``threading.get_ident`` is called locally and through each
    connection's remote ``gevent`` module, and all returned identifiers
    must be equal.
    """
    def get_ident(gevent_module):
        # Use the original get_ident, fetched through gevent.monkey, so the
        # real OS thread id is reported rather than a greenlet id.
        original = gevent_module.monkey.get_original('threading', 'get_ident')
        return original()

    first = rpyc.classic.connect("localhost", port=18878)
    second = rpyc.classic.connect("localhost", port=18878)
    third = rpyc.classic.connect("localhost", port=18878)

    with first, second, third:
        local_id = get_ident(gevent)
        remote_ids = [get_ident(conn.modules.gevent) for conn in (first, second, third)]

        # all server greenlets and clients running in same OS thread ;)
        self.assertEqual(local_id, remote_ids[0])
        self.assertEqual(remote_ids[0], remote_ids[1])
        self.assertEqual(remote_ids[0], remote_ids[2])