from importlib import import_module
from inspect import isclass
from webbrowser import open_new_tab
+from xmlrpc.client import ServerProxy
from panda3d.core import Texture, TextNode, WindowProperties, LVector2i, \
TextProperties, TextPropertiesManager
from direct.gui.DirectGui import DirectButton, DirectCheckButton, \
self._widgets += [DirectButton(
text=_('Credits'), pos=(0, 1, -.2), command=self.on_credits,
**self._common_btn)]
+ def btn_exit():
+ if self._fun_test:
+ ServerProxy('http://localhost:6000').destroy()
+ exit()
self._widgets += [DirectButton(
- text=_('Exit'), pos=(0, 1, -.6), command=lambda: exit(),
+ text=_('Exit'), pos=(0, 1, -.6), command=lambda: btn_exit(),
**self._common_btn)]
self._rearrange_width()
self.accept('enforce_resolution', self.enforce_res)
import datetime
from time import sleep
from os import getcwd, system
-from multiprocessing.connection import Client
+from xmlrpc.client import ServerProxy
from logging import debug, info
from pathlib import Path
from shutil import rmtree
info('test idx: %s' % idx)
self._offset = offset
sleep(5)
- address = ('localhost', 6000)
- self._conn = Client(address)
+ self._proxy = ServerProxy('http://localhost:6000')
self._curr_time = 0
self._tasks = []
self._prev_time = 0
taskMgr.add(self.on_frame_unpausable, 'on-frame-unpausable')
- self._conn.send(['set_idx', idx])
+ self._proxy.set_idx(idx)
self._do_screenshots(idx)
def _do_screenshot(self, name):
time = datetime.datetime.now().strftime('%y%m%d%H%M%S')
name = name + '.png'
- self._conn.send(['screenshot', name])
+ self._proxy.screenshot(name)
info('screenshot %s' % name)
def _screenshot(self, time, name):
def _enforce_res(self, time, res):
def cback():
- self._conn.send(['enforce_res', res])
+ self._proxy.enforce_res(res)
info('enforce_res %s' % res)
self._tasks += [(
self._curr_time + time,
def _enforce_resolution(self, time, res):
def cback():
- self._conn.send(['enforce_resolution', res])
+ self._proxy.enforce_resolution(res)
info('enforce_resolution %s (send)' % res)
self._tasks += [(
self._curr_time + time,
def _verify(self):
def __verify():
- self._conn.send(['verify'])
+ self._proxy.verify()
info('verify')
self._tasks += [(
self._curr_time + 3,
def _exit(self):
def do_exit():
- self._conn.close()
+ self._proxy.close()
exit()
self._tasks += [(
self._curr_time + 3,
def _do_screenshots_1(self):
info('_do_screenshots_1')
self._screenshot(FunctionalTest.start_time, 'main_menu')
- self._do_screenshots_credits()
- self._do_screenshots_options()
+ #self._do_screenshots_credits()
+ #self._do_screenshots_options()
self._do_screenshots_exit()
def _do_screenshots_credits(self):
from os.path import join, exists
from glob import glob
from sys import exit
-from multiprocessing.connection import Listener
+from xmlrpc.server import SimpleXMLRPCServer
from threading import Thread
from panda3d.core import Filename
from direct.gui.OnscreenText import OnscreenText
from ya2.build.build import _branch
-class ListenerThread(Thread):
+class RPCServer(SimpleXMLRPCServer):
+
+ def __init__(self, callbacks):
+ super().__init__(('localhost', 6000), allow_none=True)
+ self._callbacks = callbacks
+ self.register_introspection_functions()
+ self.register_function(self.screenshot, 'screenshot')
+ self.register_function(self.enforce_res, 'enforce_res')
+ self.register_function(self.verify, 'verify')
+ self.register_function(self.set_idx, 'set_idx')
+ self.register_function(self.enforce_resolution, 'enforce_resolution')
+ self.register_function(self.destroy, 'destroy')
+
+ def screenshot(self, arg):
+ taskMgr.doMethodLater(.01, self._callbacks[0], 'cb0', [arg])
+
+ def enforce_res(self, arg):
+ taskMgr.doMethodLater(.01, self._callbacks[1], 'cb1', [arg])
+
+ def verify(self):
+ taskMgr.doMethodLater(.01, self._callbacks[2], 'cb2')
+
+ def set_idx(self, arg):
+ taskMgr.doMethodLater(.01, self._callbacks[3], 'cb3', [arg])
+
+ def enforce_resolution(self, arg):
+ taskMgr.doMethodLater(.01, self._callbacks[4], 'cb4', [arg])
+
+ def destroy(self):
+ self._BaseServer__shutdown_request = True
+
+
+class RPCServerThread(Thread):
def __init__(self, callbacks):
Thread.__init__(self)
- address = ('localhost', 6000)
- self._listener = Listener(address)
- self._listener._listener._socket.settimeout(15)
- try:
- self._conn = self._listener.accept()
- except TimeoutError:
- info('listener timeout')
self._callbacks = callbacks
def run(self):
- running = hasattr(self, '_conn')
- while running:
- try:
- msg = self._conn.recv()
- if msg[0] == 'screenshot':
- taskMgr.doMethodLater(.01, self._callbacks[0], 'cb0', [msg[1]])
- elif msg[0] == 'enforce_res':
- taskMgr.doMethodLater(.01, self._callbacks[1], 'cb1', [msg[1]])
- elif msg[0] == 'verify':
- taskMgr.doMethodLater(.01, self._callbacks[2], 'cb2')
- elif msg[0] == 'set_idx':
- taskMgr.doMethodLater(.01, self._callbacks[3], 'cb3', [msg[1]])
- elif msg[0] == 'enforce_resolution':
- taskMgr.doMethodLater(.01, self._callbacks[4], 'cb4', [msg[1]])
- except EOFError:
- running = False
+ self.server = RPCServer(self._callbacks)
+ self.server.serve_forever()
class FunctionalTest(GameObject):
def __init__(self, ref):
super().__init__()
- self._listener = ListenerThread([self._do_screenshot, self._do_enforce_res, self.__verify, self._set_idx, self._do_enforce_resolution])
- self._listener.start()
+ RPCServerThread([self._do_screenshot, self._do_enforce_res, self.__verify, self._set_idx, self._do_enforce_resolution]).start()
self.txt = OnscreenText('', fg=(1, 0, 0, 1), scale=.16)
#self._path = ''
#if self.eng.is_appimage: