ya2 · news · projects · code · about

refactoring of functional tests
[pmachines.git] / lib / build / mtprocesser.py
1 '''A multi-threaded command dispatcher.'''
2 from datetime import datetime
3 from multiprocessing import cpu_count
4 from threading import Thread, RLock
5 from os import system
6 from logging import info, debug
7
8
class ProcesserThread(Thread):
    '''A worker thread that drains commands from a shared command list.'''

    def __init__(self, cmd_lst, lock):
        '''The constructor.

        cmd_lst: commands to execute; the same list object can be handed to
            several processers, each of which removes the entries it runs
        lock: lock shared between the processers, held while the command
            list is inspected and modified'''
        Thread.__init__(self)
        self.cmd_lst = cmd_lst
        self.lock = lock

    def run(self):
        '''Pops and executes commands until the shared list is empty.

        Every command is logged at info level with a timestamp and then
        handed to os.system. A non-zero status is reported as a warning
        rather than being silently discarded; processing continues with the
        next command either way.'''
        # Local import: the module itself only pulls `info` and `debug`.
        from logging import warning
        while True:
            # The lock is held only while the list is touched, so the
            # commands themselves can run concurrently.
            with self.lock:
                if not self.cmd_lst:
                    return
                cmd = self.cmd_lst.pop(0)
            info('%s %s', datetime.now().strftime("%H:%M:%S"), cmd)
            status = system(cmd)
            if status:
                # Best effort: report the failure, do not abort the worker.
                warning('%s (failed, status %s) %s',
                        datetime.now().strftime("%H:%M:%S"), status, cmd)
29
30
class SyncProcesser:
    '''Executes a command list sequentially in the calling thread.'''

    def __init__(self, cmd_lst):
        '''The constructor.

        cmd_lst: the commands to execute, in order'''
        self.cmd_lst = cmd_lst

    def run(self):
        '''Runs every command in order, leaving the list itself untouched.

        A timestamped "(executing)" line is logged at info level before each
        command and an "(executed)" line at debug level after it. A non-zero
        status from os.system is reported as a warning instead of being
        silently discarded; the remaining commands are still run.'''
        # Local import: the module itself only pulls `info` and `debug`.
        from logging import warning
        for cmd in self.cmd_lst:
            info('%s %s',
                 datetime.now().strftime("(executing) %H:%M:%S"), cmd)
            status = system(cmd)
            if status:
                # Best effort: report the failure, keep going.
                warning('%s %s (status %s)',
                        datetime.now().strftime("(failed) %H:%M:%S"),
                        cmd, status)
            debug('%s %s',
                  datetime.now().strftime("(executed) %H:%M:%S"), cmd)
47
48
class ProcesserMgr:
    '''Collects commands and then executes them, blocking until all are done;
    several worker threads are used unless a single core is configured.'''

    def __init__(self, cores):
        '''The constructor.

        cores: how many workers are used to process the commands; a falsy
            value (0, None) selects a quarter of the detected cpu cores plus
            one (a single detected core is assumed when the count cannot be
            determined)'''
        if cores:
            self.cores = cores
        else:
            try:
                detected = cpu_count()
            except NotImplementedError:
                detected = 1
            self.cores = detected // 4 + 1
        debug('processer-mgr: using %s cores', self.cores)
        self.cmd_lst = []

    def add(self, cmd):
        '''Adds cmd to the list that will be processed.'''
        self.cmd_lst.append(cmd)

    def run(self):
        '''Performs the commands that have been added.

        With exactly one core the commands are run one after another in the
        calling thread; otherwise one ProcesserThread per core is started on
        the shared command list and this method waits for all of them.'''
        if self.cores == 1:
            SyncProcesser(self.cmd_lst).run()
            return
        # One lock guards the command list for every worker.
        lock = RLock()
        workers = [ProcesserThread(self.cmd_lst, lock)
                   for _ in range(self.cores)]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()