#-* encoding: utf8 -*-
"""Command Pattern"""
from threading import Thread, Lock
from Queue import Queue
from log import get_logger
import unittest
# Module-level logger named after this module. `get_logger` comes from the
# non-stdlib `log` module (presumably project-local -- confirm); nothing in
# the code visible here references LOG.
LOG = get_logger(__name__)
class Command(object):
    """Bundle a callable with its positional arguments for a later call.

    Attributes:
        func: the callable to invoke.
        args: positional arguments handed to ``func``; an empty tuple when
            none (or a falsy value) were supplied.
        description: free-form text; ``""`` when none was supplied.
    """

    def __init__(self, func, args=None, description=None):
        """Store the callable and its arguments.

        Args:
            func: any callable.
            args: iterable of positional arguments for ``func``. It must be
                a sequence of arguments, not a single bare argument.
            description: optional text describing the command.

        Raises:
            AssertionError: if ``func`` is not callable (only when asserts
                are enabled, i.e. not under ``python -O``).

        Example:
            >>> add = Command(lambda x, y: x + y, (1, 2))
            >>> add()
            3
        """
        # Wrap func in a 1-tuple: with a bare tuple on the right-hand side,
        # "%" would try to spread its items over the single %r and raise
        # TypeError instead of the intended AssertionError.
        assert callable(func), "%r is not callable" % (func,)
        self.func = func
        self.args = args or tuple()
        self.description = description or ""

    def __call__(self):
        """Call the stored callable with the stored arguments.

        Returns:
            Whatever ``func(*args)`` returns; exceptions propagate.
        """
        # Argument unpacking replaces the deprecated apply() builtin, which
        # no longer exists in Python 3.
        return self.func(*self.args)

    def __repr__(self):
        """Return ``ClassName(<func-name args>)`` for debugging."""
        # Not every callable has __name__ (functools.partial objects and
        # instances defining __call__ do not); fall back to their repr.
        func_name = getattr(self.func, "__name__", None)
        if func_name is None:
            func_name = repr(self.func)
        return "%s(<%s %s>)" % (self.__class__.__name__, func_name,
                                repr(self.args))
class CommandExecutor(Thread):
    """Execute commands in another thread.

    Commands are queued on an internal ``Executor`` worker thread. When this
    thread runs, it starts that worker, waits for it to finish, and calls
    hooks on an optional delegate before and after processing the queue and
    before and after each command.

    The delegate, when set, must provide:
        on_execution_started(executor)
        on_execution_ended(executor)
        on_command_start(command)
        on_command_end(command, total, done)

    Attributes:
        executor: the worker that actually runs the commands.
        size: number of commands queued so far through this object; passed
            to the delegate as ``total``.
        delegate: object receiving the hooks above, or ``None``.
    """

    def __init__(self, delegate=None):
        """Create the worker, wire its notifications back to this object."""
        super(CommandExecutor, self).__init__()
        self.executor = Executor()
        # The worker reports per-command progress to this object, which
        # forwards it to the user-supplied delegate.
        self.executor.delegate = self
        self.size = 0
        self.delegate = delegate
        # Held while a delegate callback runs, so callbacks are serialised.
        self._lock = Lock()
        self.daemon = True

    def started(self):
        """Tell the delegate that queue processing is about to start."""
        if self.delegate is not None:
            with self._lock:
                self.delegate.on_execution_started(self)

    def ended(self):
        """Tell the delegate that queue processing has finished."""
        if self.delegate is not None:
            with self._lock:
                self.delegate.on_execution_ended(self)

    def command_execution_started(self, command):
        """Worker hook: forward "command is about to run" to the delegate."""
        if self.delegate is not None:
            with self._lock:
                self.delegate.on_command_start(command)

    def command_execution_ended(self, command):
        """Worker hook: forward "command has run" to the delegate.

        The delegate also receives the number of queued commands and the
        worker's current ``items_done`` counter.
        """
        if self.delegate is not None:
            with self._lock:
                self.delegate.on_command_end(command, self.size,
                                             self.executor.items_done)

    def add_command(self, func, args=None, description=None):
        """Create a Command instance and add it to the worker's queue.

        Args:
            func: callable to run.
            args: iterable of positional arguments for ``func``.
            description: optional text describing the command.

        Returns:
            Whatever the worker's ``submit`` returns.
        """
        command = Command(func, args, description)
        # Count the command before handing it over, so the total reported
        # to the delegate already includes it if the worker is running.
        # Counting here (rather than only in execute_all) keeps the total
        # right for commands added one by one.
        self.size += 1
        return self.executor.submit(command)

    def execute_all(self, commands):
        """Queue every command of an iterable.

        Args:
            commands: iterable of argument sequences for ``add_command``,
                e.g. ``(func, args)`` or ``(func, args, description)``.
        """
        for command in commands:
            self.add_command(*command)

    def run(self):
        """Notify start, run the worker to completion, then notify end."""
        self.started()
        self.executor.start()
        self.executor.join()
        self.ended()
class Executor(Thread):
    """Worker thread that runs queued callables one after another.

    Attributes:
        q: queue holding the submitted items.
        items_done: number of items that have been called so far.
        delegate: optional object notified around each item through
            ``command_execution_started(item)`` and
            ``command_execution_ended(item)``.
    """

    def __init__(self, q=None):
        """Create the worker.

        Args:
            q: queue to consume; a fresh ``Queue`` is created when omitted.
        """
        super(Executor, self).__init__()
        if q is None:
            q = Queue()
        self.q = q
        self.items_done = 0
        self.delegate = None
        self.daemon = True

    def submit(self, item):
        """Put a zero-argument callable on the queue."""
        self.q.put(item)

    def run(self):
        """Call queued items until the queue is observed empty.

        Items submitted after the queue was seen empty are not executed.
        An exception raised by an item is not caught here, so it ends the
        loop.
        """
        # NOTE: empty() followed by a blocking get() is only safe while this
        # thread is the sole consumer of q.
        while not self.q.empty():
            item = self.q.get()
            if self.delegate is not None:
                self.delegate.command_execution_started(item)
            item()
            # Count the item before notifying, so a delegate reading
            # items_done from the "ended" hook sees this item included.
            self.items_done += 1
            if self.delegate is not None:
                self.delegate.command_execution_ended(item)
class CommandTestCase(unittest.TestCase):
    """Unit tests for Command."""

    def testCommand(self):
        """Calling a command passes the stored arguments to the callable."""
        command = Command(lambda x, y: x + y, (1, 2))
        # assertEqual is the supported spelling; assertEquals is a
        # deprecated alias.
        self.assertEqual(command(), 3)
class CommandExecutorTestCase(unittest.TestCase):
    """Unit tests for CommandExecutor."""

    def testAddCommand(self):
        """A command queued with add_command is run by the executor."""
        results = []
        ce = CommandExecutor()
        ce.add_command(results.append, (1,))
        ce.start()
        ce.join()
        self.assertEqual(results, [1])

    def testDelegate(self):
        """Each delegate hook fires the expected number of times."""
        witness = []

        class Delegate(object):
            def on_execution_started(self, ce):
                witness.append(1)

            def on_execution_ended(self, ce):
                witness.append(2)

            def on_command_start(self, command):
                witness.append(3)

            def on_command_end(self, command, total, done):
                witness.append(4)

        ce = CommandExecutor()
        ce.delegate = Delegate()
        commands = [(lambda x: x + 1, (1,), dict()) for _ in range(2)]
        ce.execute_all(commands)
        ce.start()
        ce.join()
        # One start/end pair for the run, one start/end pair per command.
        self.assertEqual(sorted(witness), [1, 2, 3, 3, 4, 4])
# Run the test cases defined above when the module is executed as a script.
if __name__ == "__main__":
    unittest.main()