From cfac81392f7549e200b5186e0ef3b021a95b8de6 Mon Sep 17 00:00:00 2001 From: Paul McCarthy <pauld.mccarthy@gmail.com> Date: Wed, 29 Jun 2016 11:47:11 +0100 Subject: [PATCH] New thing in async module - the TaskThread, a simple task queue. --- fsl/utils/async.py | 158 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 149 insertions(+), 9 deletions(-) diff --git a/fsl/utils/async.py b/fsl/utils/async.py index ef6bfbfff..767c0c9e3 100644 --- a/fsl/utils/async.py +++ b/fsl/utils/async.py @@ -4,12 +4,40 @@ # # Author: Paul McCarthy <pauldmccarthy@gmail.com> # -"""This module provides functions for running tasks asynchronously. +"""This module provides functions and classes for running tasks +asynchronously, either in an idle loop, or on a separate thread. -.. note:: The functions in this module are intended to be run from within a - ``wx`` application. However, they will still work without ``wx``, - albeit with slightly modified behaviour. +.. note:: The *idle* functions in this module are intended to be run from + within a ``wx`` application. However, they will still work without + ``wx``, albeit with slightly modified behaviour. + + +Idle tasks +---------- + +.. autosummary:: + :nosignatures: + + idle + inIdle + + +The :func:`idle` function is a simple way to run a task on an ``wx`` +``EVT_IDLE`` event handler. This effectively performs the same job as the +:func:`run` function, but is more suitable for short tasks which do not +warrant running in a separate thread. + + +Thread tasks +------------ + +.. autosummary:: + :nosignatures: + + run + wait + TaskThread The :func:`run` function simply runs a task in a separate thread. This @@ -21,15 +49,14 @@ intensitve task off the main GUI thread (preventing the GUI from locking up), and to perform some clean up/refresh afterwards. -The :func:`idle` function is a simple way to run a task on an ``wx`` -``EVT_IDLE`` event handler. This effectively performs the same job as the -:func:`run` function, but is more suitable for short tasks which do not -warrant running in a separate thread. - The :func:`wait` function is given one or more ``Thread`` instances, and a task to run. It waits until all the threads have finished, and then runs the task (via :func:`idle`). + +The :class:`TaskThread` class is a simple thread which runs a queue of tasks. + + .. todo:: You could possibly use ``props.callqueue`` to drive the idle loop. """ @@ -324,3 +351,116 @@ def wait(threads, task, *args, **kwargs): else: joinAll() return None + + +class Task(object): + """Container object which encapsulates a task that is run by a + :class:`TaskThread`. + """ + def __init__(self, name, func, args, kwargs): + self.name = name + self.func = func + self.args = args + self.kwargs = kwargs + self.enabled = True + + +class TaskThread(threading.Thread): + """The ``TaskThread`` is a simple thread which runs tasks. Tasks may be + enqueued and dequeued. + + .. note:: + """ + + + def __init__(self, *args, **kwargs): + """Create a ``TaskThread`` """ + + threading.Thread.__init__(self, *args, **kwargs) + + self.__q = queue.Queue() + self.__enqueued = {} + self.__stop = False + + log.debug('New task thread') + + + def enqueue(self, name, func, *args, **kwargs): + """Enqueue a task to be executed. + + :arg name: Task name. Does not necessarily have to be a string, + but must be hashable. + :arg func: The task function. + + All other arguments will be passed through to the task when it is + executed. + """ + + log.debug('Enqueueing task: {} [{}]'.format( + name, getattr(func, '__name__', '<unknown>'))) + + t = Task(name, func, args, kwargs) + self.__enqueued[name] = t + self.__q.put(t) + + + def isQueued(self, name): + """Returns ``True`` if a task with the given name is enqueued, + ``False`` otherwise. + """ + return name in self.__enqueued + + + def dequeue(self, name): + """Dequeues a previously enqueued task. + + :arg name: The task to dequeue. + """ + task = self.__enqueued.get(name, None) + if task is not None: + + log.debug('Dequeueing task: {}'.format(name)) + task.enabled = False + + + def stop(self): + """Stop the ``TaskThread`` after any currently running task has + completed. + """ + log.debug('Stopping task thread') + self.__stop = True + + + def run(self): + """Run the ``TaskThread``. """ + + while True: + + try: + task = self.__q.get(timeout=1) + + except queue.Empty: + continue + + finally: + if self.__stop: + break + + self.__enqueued.pop(task.name, None) + + if not task.enabled: + continue + + log.debug('Running task: {} [{}]'.format( + task.name, + getattr(task.func, '__name__', '<unknown>'))) + + task.func(*task.args, **task.kwargs) + + log.debug('Task completed: {} [{}]'.format( + task.name, + getattr(task.func, '__name__', '<unknown>'))) + + self.__q = None + self.__enqueued = None + log.debug('Task thread finished') -- GitLab