--- /dev/null
+from panda3d.core import load_prc_file_data
+load_prc_file_data('', 'window-type none')
+from pathlib import Path
+import sys
+if '' in sys.path: sys.path.remove('')
+sys.path.append(str(Path(__file__).parent.parent.parent))
+from os import remove
+from os.path import exists
+from glob import glob
+from unittest import TestCase
+from threading import Thread
+from panda3d.core import NodePath
+from direct.showbase.ShowBase import ShowBase
+from direct.showbase.DirectObject import DirectObject
+from ya2.p3d.asserts import Assert
+
+
+class TestApp(ShowBase): pass
+
+
+class AssertsTests(TestCase):
+
+ def setUp(self):
+ self.__app = TestApp()
+
+ def tearDown(self):
+ self.__app.destroy()
+
+ def test_render3d(self):
+ np = NodePath('new node')
+ np.reparent_to(render)
+ with self.assertRaises(Exception):
+ Assert.assert_render3d()
+ np.remove_node()
+ Assert.assert_render3d()
+
+ def test_render2d(self):
+ np = NodePath('new node')
+ np.reparent_to(render2d)
+ with self.assertRaises(Exception):
+ Assert.assert_render2d()
+ np.remove_node()
+ Assert.assert_render2d()
+
+ def test_aspect2d(self):
+ np = NodePath('new node')
+ np.reparent_to(aspect2d)
+ with self.assertRaises(Exception):
+ Assert.assert_aspect2d()
+ np.remove_node()
+ Assert.assert_aspect2d()
+
+ def test_events(self):
+ d = DirectObject()
+ d.accept('new event', self.__accept)
+ with self.assertRaises(Exception):
+ Assert.assert_events()
+ d.ignore('new event')
+ Assert.assert_events()
+
+ def __accept(): pass
+
+ def test_tasks(self):
+ t = taskMgr.add(self.__new_task)
+ with self.assertRaises(Exception):
+ Assert.assert_tasks()
+ taskMgr.remove(t)
+ Assert.assert_tasks()
+
+ def __new_task(self, task):
+ return task.again
+
+ def test_threads(self):
+ self.__flag = False
+ t = Thread(target=self.__while_flag)
+ t.start()
+ with self.assertRaises(Exception):
+ Assert.assert_threads()
+ self.__flag = True
+ t.join()
+ Assert.assert_threads()
+
+ def __while_flag(self):
+ while not self.__flag: pass
import threading
-def assert_render3d():
- preserve = ['camera', 'DIRECT']
- for child in render.children:
- if child.name not in preserve:
- render.ls()
- msg = 'unexpected render3d node: %s' % child.name
- raise Exception(msg)
-
-def assert_aspect2d():
- preserve = [
- 'a2dBackground', 'a2dTopCenter', 'a2dTopCenterNS',
- 'a2dBottomCenter', 'a2dBottomCenterNS', 'a2dLeftCenter',
- 'a2dLeftCenterNS', 'a2dRightCenter', 'a2dRightCenterNS',
- 'a2dTopLeft', 'a2dTopLeftNS', 'a2dTopRight',
- 'a2dTopRightNS', 'a2dBottomLeft', 'a2dBottomLeftNS',
- 'a2dBottomRight', 'a2dBottomRightNS', 'test_txt']
- for child in aspect2d.children:
- if child.name not in preserve and not child.has_python_tag('preserve'):
- aspect2d.ls()
- msg = 'unexpected aspect2d node: %s' % child.name
- raise Exception(msg)
-
-def assert_render2d():
- for child in render2d.children:
+
+class Assert:
+
+ @staticmethod
+ def assert_render3d():
+ preserve = ['camera', 'DIRECT']
+ unexpected_nodes = [c for c in render.children if child.name not in preserve]
+ for node in unexpected_nodes: Assert.__process_render3d_exception(node)
+
+ @staticmethod
+ def __process_render3d_exception(node):
+ render.ls()
+ message = f'unexpected render3d node: {node.name}'
+ raise Exception(message)
+
+ @staticmethod
+ def assert_aspect2d():
+ preserve = [
+ 'a2dBackground', 'a2dTopCenter', 'a2dTopCenterNS',
+ 'a2dBottomCenter', 'a2dBottomCenterNS', 'a2dLeftCenter',
+ 'a2dLeftCenterNS', 'a2dRightCenter', 'a2dRightCenterNS',
+ 'a2dTopLeft', 'a2dTopLeftNS', 'a2dTopRight',
+ 'a2dTopRightNS', 'a2dBottomLeft', 'a2dBottomLeftNS',
+ 'a2dBottomRight', 'a2dBottomRightNS', 'test_txt']
+ unexpected_nodes = [c for c in aspect2d.children
+ if c.name not in preserve and not c.has_python_tag('preserve')]
+ for node in unexpected_nodes: Assert.__process_aspect2d_exception(node)
+
+ @staticmethod
+ def __process_aspect2d_exception(node):
+ aspect2d.ls()
+ message = f'unexpected aspect2d node: {node.name}'
+ raise Exception(message)
+
+ @staticmethod
+ def assert_render2d():
preserve = ['aspect2d', 'pixel2d', 'camera2d']
- if child.name not in preserve and not child.has_python_tag('preserve'):
- render2d.ls()
- msg = 'unexpected render2d node: %s' % child.name
- raise Exception(msg)
-
-def assert_events():
- preserve = ['window-event', 'window-closed', 'async_loader_0',
- 'render-texture-targets-changed', 'aspectRatioChanged']
- for evt in messenger.getEvents():
- if evt not in preserve:
- if (acc := messenger.who_accepts(evt)):
- for key, _acc in acc.items():
- if _acc[2]:
- print('the event %s accepted by <%s, %s> is persistent' % (evt, key, _acc[0]))
- else:
- msg = 'unexpected event: %s, %s' % (evt, str(acc))
- raise Exception(msg)
-
-def assert_tasks():
- preserve = [
- 'ivalLoop', 'garbageCollectStates', 'collisionLoop',
- 'igLoop', 'audioLoop', 'resetPrevTransform', 'dataLoop',
- 'eventManager', 'simplepbr update', 'on frame music',
- 'assert_fps', 'DIRECTContextTask']
- for task in taskMgr.getTasks() + taskMgr.getDoLaters():
- if task.name not in preserve and not hasattr(task, 'preserve'):
- msg = 'unexpected task: %s' % task.name
- raise Exception(msg)
-
-def assert_buffers():
- pass
- #for buffer in RenderToTexture.buffers:
- # raise Error()
-
-def assert_threads():
- thr_names = [thread.name for thread in threading.enumerate()]
- preserve = ['MainThread', 'rpc_server']
- for thr_name in thr_names:
- if thr_name not in preserve:
- msg = 'unexpected thread: %s' % thr_name
- raise Exception(msg)
+ unexpected_nodes = [c for c in render2d.children if c.name not in preserve and not c.has_python_tag('preserve')]
+ for node in unexpected_nodes: Assert.__process_render2d_exception(node)
+
+ @staticmethod
+ def __process_render2d_exception(self, node):
+ render2d.ls()
+ message = f'unexpected render2d node: {node.name}'
+ raise Exception(message)
+
+ @staticmethod
+ def assert_events():
+ preserve = ['window-event', 'window-closed', 'async_loader_0', 'async_loader_1',
+ 'render-texture-targets-changed', 'aspectRatioChanged']
+ unexpected_events = [e for e in messenger.getEvents() if e not in preserve]
+ for e in unexpected_events: Assert.__process_event_exception(e)
+
+ @staticmethod
+ def __process_event_exception(event):
+ if (acc := messenger.who_accepts(event)):
+ Assert.__process_event_acceptors(acc, event)
+
+ @staticmethod
+ def __process_event_acceptors(acceptors, event):
+ for key, a in acceptors.items(): Assert.__process_event_acceptor(key, a, event)
+
+ @staticmethod
+ def __process_event_acceptor(key, acceptor, event):
+ #if acceptor[2]:
+ # print(f'the event {event} accepted by <{key}, {acceptor[0]}> is persistent')
+ #else:
+ Assert.__actually_process_event_exception(acceptor, event)
+
+ @staticmethod
+ def __actually_process_event_exception(acceptor, event):
+ message = f'unexpected event: {event}, {str(acceptor)}'
+ raise Exception(message)
+
+ @staticmethod
+ def assert_tasks():
+ preserve = [
+ 'ivalLoop', 'garbageCollectStates', 'collisionLoop',
+ 'igLoop', 'audioLoop', 'resetPrevTransform', 'dataLoop',
+ 'eventManager', 'simplepbr update', 'on frame music',
+ 'assert_fps', 'DIRECTContextTask']
+ unexpected_tasks = [t for t in taskMgr.getTasks() + taskMgr.getDoLaters()
+ if t.name not in preserve and not hasattr(t, 'preserve')]
+ for t in unexpected_tasks: Assert.__process_task_exception(t)
+
+ @staticmethod
+ def __process_task_exception(task):
+ message = f'unexpected task: {task.name}'
+ raise Exception(message)
+
+ @staticmethod
+ def assert_buffers():
+ pass
+ #if RenderToTexture.buffers:
+ # raise Error()
+
+ @staticmethod
+ def assert_threads():
+ thread_names = [thread.name for thread in threading.enumerate()]
+ preserve = ['MainThread', 'rpc_server']
+ unexpected_tasks = [t for t in thread_names if t not in preserve]
+ for t in unexpected_tasks: Assert.__process_unexpected_thread(t)
+
+ @staticmethod
+ def __process_unexpected_thread(thread_name):
+ message = f'unexpected thread: {thread_name}'
+ raise Exception(message)