# Source code for vinstall.core.command

# -*- coding: utf-8 -*-


"""Command Pattern"""


from threading import Thread, Lock
from Queue import Queue
from log import get_logger
import unittest


# Module-level logger, created by passing this module's name to the
# project's ``log.get_logger`` helper.
LOG = get_logger(__name__)


class Command(object):
    """A callable bundled with the positional arguments to call it with.

    Instances are themselves callable: calling one invokes the stored
    callable with the stored arguments and returns its result.
    """

    def __init__(self, func, args=None, description=None):
        """Store the callable and its arguments.

        func -- the callable to run later.
        args -- sequence of positional arguments for ``func``; a falsy value
                (the default ``None`` included) becomes an empty tuple.
        description -- free-form text; a falsy value becomes ``""``.

        Raises AssertionError if ``func`` is not callable.

        Example:

        >>> add = Command(lambda x, y: x + y, (1, 2))
        >>> add()
        3

        """
        # Wrap func in a 1-tuple so that a tuple passed by mistake is
        # formatted as a single value instead of breaking the % operator.
        assert callable(func), "%r is not callable" % (func,)
        self.func = func
        self.args = args or tuple()
        self.description = description or ""

    def __call__(self):
        """Call the command and return the result of the stored callable."""
        # Argument unpacking replaces the deprecated ``apply`` builtin.
        return self.func(*self.args)

    def __repr__(self):
        """Return ``ClassName(<func_name args>)``."""
        # Not every callable carries ``__name__`` (callable instances,
        # functools.partial objects), so fall back to the callable's repr.
        func_name = getattr(self.func, "__name__", None) or repr(self.func)
        return "%s(<%s %s>)" % (
            self.__class__.__name__, func_name, repr(self.args))
class CommandExecutor(Thread):
    """Execute commands in another thread.

    Call hooks before and after executing commands, and before and after
    processing the queue.

    The optional delegate receives these calls:

    * ``on_execution_started(executor)`` before the queue is processed
    * ``on_execution_ended(executor)`` after the queue has been processed
    * ``on_command_start(command)`` before each command
    * ``on_command_end(command, total, done)`` after each command

    Every delegate call is made while holding an internal lock.
    """

    def __init__(self, delegate=None):
        """Create the worker ``Executor`` and register as its delegate."""
        super(CommandExecutor, self).__init__()
        self.executor = Executor()
        self.executor.delegate = self
        # Number of commands queued so far; reported to the delegate as the
        # total in on_command_end.
        self.size = 0
        self.delegate = delegate
        self._lock = Lock()
        self.daemon = True

    def started(self):
        """Execution started"""
        if self.delegate is not None:
            with self._lock:
                self.delegate.on_execution_started(self)

    def ended(self):
        """Execution ended"""
        if self.delegate is not None:
            with self._lock:
                self.delegate.on_execution_ended(self)

    def command_execution_started(self, command):
        """Command started hook"""
        if self.delegate is not None:
            with self._lock:
                self.delegate.on_command_start(command)

    def command_execution_ended(self, command):
        """Command ended hook"""
        if self.delegate is not None:
            with self._lock:
                self.delegate.on_command_end(
                    command, self.size, self.executor.items_done)

    def add_command(self, func, args=None, description=None):
        """Create a Command instance and add it to the queue.

        Returns whatever the executor's ``submit`` returns.
        """
        command = Command(func, args, description)
        # Count every queued command so the total passed to on_command_end
        # is right even when execute_all is never used.
        self.size += 1
        return self.executor.submit(command)

    def execute_all(self, commands):
        """Queue every entry of ``commands``.

        Each entry is a sequence unpacked into ``add_command``, i.e.
        ``(func,)``, ``(func, args)`` or ``(func, args, description)``.
        ``commands`` may be any iterable; the running total in ``size`` is
        maintained by ``add_command``.
        """
        for command in commands:
            self.add_command(*command)

    def run(self):
        """Notify start, run the worker thread to completion, notify end."""
        self.started()
        self.executor.start()
        self.executor.join()
        self.ended()
class Executor(Thread):
    """Worker thread that runs the callables found in a queue, in order.

    ``delegate``, when set, is notified through
    ``command_execution_started(item)`` and ``command_execution_ended(item)``
    around each item. ``items_done`` counts the items run so far.
    """

    def __init__(self, q=None):
        """Use the queue ``q``, or create a fresh ``Queue`` when it is None."""
        super(Executor, self).__init__()
        if q is None:
            q = Queue()
        self.q = q
        self.items_done = 0
        self.delegate = None
        self.daemon = True

    def submit(self, item):
        """Put the callable ``item`` on the queue."""
        self.q.put(item)

    def run(self):
        """Run queued items until the queue is found empty.

        Exceptions raised by an item are not caught here, so a failing item
        ends the loop.
        """
        while not self.q.empty():
            item = self.q.get()
            if self.delegate is not None:
                self.delegate.command_execution_started(item)
            item()
            # Update the counter before notifying, so the delegate sees a
            # count that already includes the item that just finished.
            self.items_done += 1
            if self.delegate is not None:
                self.delegate.command_execution_ended(item)
class CommandTestCase(unittest.TestCase):
    """Unit tests for Command."""

    def testCommand(self):
        """Calling a command returns func applied to args."""
        command = Command(lambda x, y: x + y, (1, 2))
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(command(), 3)
class CommandExecutorTestCase(unittest.TestCase):
    """Unit tests for CommandExecutor."""

    def testAddCommand(self):
        """A command added with add_command runs once the thread starts."""
        results = []
        ce = CommandExecutor()
        ce.add_command(results.append, (1,))
        ce.start()
        ce.join()
        # The command must have been executed exactly once.
        self.assertEqual(results, [1])

    def testDelegate(self):
        """Each delegate hook fires the expected number of times."""
        witness = []

        class Delegate(object):
            def on_execution_started(self, ce):
                witness.append(1)

            def on_execution_ended(self, ce):
                witness.append(2)

            def on_command_start(self, command):
                witness.append(3)

            def on_command_end(self, command, total, done):
                witness.append(4)

        ce = CommandExecutor()
        ce.delegate = Delegate()
        # Each entry is (func, args, description), unpacked by execute_all
        # into add_command.
        commands = [(lambda x: x + 1, (1,), "increment") for _ in range(2)]
        ce.execute_all(commands)
        ce.start()
        ce.join()
        # One start/end pair for the run, one start/end pair per command.
        self.assertEqual(sorted(witness), [1, 2, 3, 3, 4, 4])
# Run this module's test cases when it is executed as a script.
if __name__ == "__main__":
    unittest.main()