from os.path import join, exists
from glob import glob
from sys import exit
-from multiprocessing.connection import Listener
+from xmlrpc.server import SimpleXMLRPCServer
from threading import Thread
from panda3d.core import Filename
from direct.gui.OnscreenText import OnscreenText
from ya2.build.build import _branch
-class ListenerThread(Thread):
+class RPCServer(SimpleXMLRPCServer):
+
+ def __init__(self, callbacks):
+ super().__init__(('localhost', 6000), allow_none=True)
+ self._callbacks = callbacks
+ self.register_introspection_functions()
+ self.register_function(self.screenshot, 'screenshot')
+ self.register_function(self.enforce_res, 'enforce_res')
+ self.register_function(self.verify, 'verify')
+ self.register_function(self.set_idx, 'set_idx')
+ self.register_function(self.enforce_resolution, 'enforce_resolution')
+ self.register_function(self.destroy, 'destroy')
+
+ def screenshot(self, arg):
+ taskMgr.doMethodLater(.01, self._callbacks[0], 'cb0', [arg])
+
+ def enforce_res(self, arg):
+ taskMgr.doMethodLater(.01, self._callbacks[1], 'cb1', [arg])
+
+ def verify(self):
+ taskMgr.doMethodLater(.01, self._callbacks[2], 'cb2')
+
+ def set_idx(self, arg):
+ taskMgr.doMethodLater(.01, self._callbacks[3], 'cb3', [arg])
+
+ def enforce_resolution(self, arg):
+ taskMgr.doMethodLater(.01, self._callbacks[4], 'cb4', [arg])
+
+ def destroy(self):
+ self._BaseServer__shutdown_request = True
+
+
+class RPCServerThread(Thread):
def __init__(self, callbacks):
Thread.__init__(self)
- address = ('localhost', 6000)
- self._listener = Listener(address)
- self._listener._listener._socket.settimeout(15)
- try:
- self._conn = self._listener.accept()
- except TimeoutError:
- info('listener timeout')
self._callbacks = callbacks
def run(self):
- running = hasattr(self, '_conn')
- while running:
- try:
- msg = self._conn.recv()
- if msg[0] == 'screenshot':
- taskMgr.doMethodLater(.01, self._callbacks[0], 'cb0', [msg[1]])
- elif msg[0] == 'enforce_res':
- taskMgr.doMethodLater(.01, self._callbacks[1], 'cb1', [msg[1]])
- elif msg[0] == 'verify':
- taskMgr.doMethodLater(.01, self._callbacks[2], 'cb2')
- elif msg[0] == 'set_idx':
- taskMgr.doMethodLater(.01, self._callbacks[3], 'cb3', [msg[1]])
- elif msg[0] == 'enforce_resolution':
- taskMgr.doMethodLater(.01, self._callbacks[4], 'cb4', [msg[1]])
- except EOFError:
- running = False
+ self.server = RPCServer(self._callbacks)
+ self.server.serve_forever()
class FunctionalTest(GameObject):
def __init__(self, ref):
super().__init__()
- self._listener = ListenerThread([self._do_screenshot, self._do_enforce_res, self.__verify, self._set_idx, self._do_enforce_resolution])
- self._listener.start()
+ RPCServerThread([self._do_screenshot, self._do_enforce_res, self.__verify, self._set_idx, self._do_enforce_resolution]).start()
self.txt = OnscreenText('', fg=(1, 0, 0, 1), scale=.16)
#self._path = ''
#if self.eng.is_appimage: