Skip to content
Snippets Groups Projects
Forked from FSL / fslpy
2652 commits behind the upstream repository.
async.py 9.44 KiB
#!/usr/bin/env python
#
# async.py - Run a function in a separate thread.
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
"""This module provides functions for running tasks asynchronously.


.. note:: The functions in this module are intended to be run from within a
          ``wx`` application. However, they will still work without ``wx``,
          albeit with slightly modified behaviour.


The :func:`run` function simply runs a task in a separate thread.  This
doesn't seem like a worthy task to have a function of its own, but the
:func:`run` function additionally provides the ability to schedule another
function to run on the ``wx.MainLoop`` when the original function has
completed.  This therefore gives us a simple way to run a computationally
intensive task off the main GUI thread (preventing the GUI from locking up),
and to perform some clean up/refresh afterwards.


The :func:`idle` function is a simple way to run a task on a ``wx``
``EVT_IDLE`` event handler. This effectively performs the same job as the
:func:`run` function, but is more suitable for short tasks which do not
warrant running in a separate thread.

The :func:`wait` function is given one or more ``Thread`` instances, and a
task to run. It waits until all the threads have finished, and then runs
the task (via :func:`idle`).

.. todo:: You could possibly use ``props.callqueue`` to drive the idle loop.
"""


import time
import logging
import threading
import collections

try:    import queue
except: import Queue as queue


log = logging.getLogger(__name__)


def _haveWX():
    """Returns ``True`` if wqe are running within a ``wx`` application,
    ``False`` otherwise.
    """
    
    try:
        import wx
        return wx.GetApp() is not None
    
    except ImportError:
        return False


def run(task, onFinish=None, onError=None, name=None):
    """Run the given ``task`` in a separate thread.

    :arg task:     The function to run. Must accept no arguments.

    :arg onFinish: An optional function to schedule (on the ``wx.MainLoop``,
                   via :func:`idle`) once the ``task`` has finished.

    :arg onError:  An optional function to be called (on the ``wx.MainLoop``,
                   via :func:`idle`) if the ``task`` raises an error. Passed
                   the ``Exception`` that was raised.

    :arg name:     An optional name to use for this task in log statements.
                   Defaults to the ``__name__`` of the ``task``, or
                   ``'<unknown>'`` if it does not have one.

    :returns: A reference to the ``Thread`` that was created.

    .. note:: If a ``wx`` application is not running, the ``task`` and
              ``onFinish`` functions will simply be called directly, and
              the return value will be ``None``.

    .. note:: When the ``onFinish`` function is called directly (no ``wx``
              application), it runs inside the same ``try`` block as the
              ``task``. An ``Exception`` raised by ``onFinish`` is therefore
              also logged as a task crash, and passed to ``onError``.
    """

    if name is None:
        name = getattr(task, '__name__', '<unknown>')

    haveWX = _haveWX()

    # Calls the onFinish or onError handler - scheduled
    # on the idle loop if we are running under wx, or
    # called directly otherwise.
    def callback(cb, *args, **kwargs):

        if cb is None:
            return

        if haveWX: idle(cb, *args, **kwargs)
        else:      cb(      *args, **kwargs)

    # Runs the task, and calls
    # callback functions as needed.
    def wrapper():

        try:
            task()
            log.debug('Task "{}" finished'.format(name))
            callback(onFinish)

        except Exception as e:

            # Logger.warn is a deprecated alias
            # for Logger.warning, so use the latter.
            log.warning('Task "{}" crashed'.format(name), exc_info=True)
            callback(onError, e)

    # If WX, run on a thread
    if haveWX:

        log.debug('Running task "{}" on thread'.format(name))

        thread = threading.Thread(target=wrapper)
        thread.start()
        return thread

    # Otherwise run directly
    else:
        log.debug('Running task "{}" directly'.format(name))
        wrapper()
        return None


# Module-level state shared by the idle(), inIdle() and
# _wxIdleLoop() functions defined in this module.

# Set to True by idle() the first time it binds the _wxIdleLoop
# handler to wx.EVT_IDLE. (The string below refers to the handler
# as "wxIdleLoop"; the function is actually named _wxIdleLoop.)
_idleRegistered = False
"""Boolean flag indicating whether the :func:`wxIdleLoop` function has
been registered as a ``wx.EVT_IDLE`` event handler. Checked and set
in the :func:`idle` function.
"""


# IdleTask instances are put on this queue by idle(),
# and popped off one at a time by _wxIdleLoop().
_idleQueue = queue.Queue()
"""A ``Queue`` of functions which are to be run on the ``wx.EVT_IDLE``
loop.
"""


# Names are added by idle(), discarded by
# _wxIdleLoop(), and queried by inIdle().
_idleQueueSet = set()
"""A ``set`` containing the names of all named tasks which are
currently queued on the idle loop (see the ``name`` parameter to the
:func:`idle` function).
"""


class IdleTask(object):
    """Simple record describing one task queued on the idle loop. Created
    by the :func:`idle` function, and consumed by the :func:`_wxIdleLoop`
    function.
    """

    def __init__(self, name, task, schedtime, after, timeout, args, kwargs):
        """Create an ``IdleTask``. All arguments are stored, unmodified, as
        attributes of the same name.

        :arg name:      Name of the task (may be ``None``).
        :arg task:      The function to call.
        :arg schedtime: Time at which the task was scheduled.
        :arg after:     Delay, in seconds, before the task may be run.
        :arg timeout:   Time out, in seconds (``0`` for no time out).
        :arg args:      Positional arguments to pass to ``task``.
        :arg kwargs:    Keyword arguments to pass to ``task``.
        """

        # What to run, and with which arguments
        self.name   = name
        self.task   = task
        self.args   = args
        self.kwargs = kwargs

        # When it may be run
        self.schedtime = schedtime
        self.after     = after
        self.timeout   = timeout



def _wxIdleLoop(ev):
    """Function which is called on ``wx.EVT_IDLE`` events. If there
    is a task on the :attr:`_idleQueue`, it is popped and processed:

      - If the ``after`` delay of the task has not yet elapsed, the task
        is put back on the queue.

      - Otherwise, if the task has no time out (``timeout == 0``), or has
        not yet timed out, it is called.

      - Otherwise the task has timed out, and is dropped without being
        called.

    Once a task has been called or dropped, its name (if it has one) is
    removed from the :attr:`_idleQueueSet`, even if the task raised an
    error. Errors raised by the task are not caught here.

    If the queue is not empty afterwards, more idle events are requested
    via ``ev.RequestMore()``.

    :arg ev: The ``wx`` idle event. ``Skip`` is always called on it.
    """

    ev.Skip()

    try:
        task = _idleQueue.get_nowait()
    except queue.Empty:
        return

    elapsed  = time.time() - task.schedtime
    funcName = getattr(task.task, '__name__', '<unknown>')

    try:

        # Has enough time elapsed
        # since the task was scheduled?
        # If not, re-queue the task.
        if elapsed < task.after:
            log.debug('Re-queueing function ({}) on wx idle '
                      'loop'.format(funcName))
            _idleQueue.put_nowait(task)

        else:
            try:

                # Only run the task if
                # it has not timed out.
                if task.timeout == 0 or (elapsed < task.timeout):
                    log.debug('Running function ({}) on wx idle '
                              'loop'.format(funcName))
                    task.task(*task.args, **task.kwargs)

                else:
                    log.debug('Dropping timed out function ({}) from wx '
                              'idle loop'.format(funcName))

            # The task is no longer queued, whether it ran,
            # crashed, or timed out - make sure that inIdle
            # does not keep reporting it as queued.
            finally:
                if task.name is not None:
                    _idleQueueSet.discard(task.name)

    # Keep the idle loop going while there are tasks
    # left on the queue, even if this task crashed.
    finally:
        if _idleQueue.qsize() > 0:
            ev.RequestMore()


def inIdle(taskName):
    """Queries whether a named task is known to the idle loop.

    :arg taskName: Name of the task, as passed to :func:`idle` via its
                   ``name`` argument.

    :returns:      ``True`` if ``taskName`` is in the set of currently
                   queued (or running) task names, ``False`` otherwise.
    """
    isQueued = taskName in _idleQueueSet
    return isQueued
    

def idle(task, *args, **kwargs):
    """Run the given task on a ``wx.EVT_IDLE`` event.

    :arg task:    The task to run.

    :arg name:    Optional. If provided, must be provided as a keyword
                  argument. Specifies a name that can be used to query
                  the state of this task via the :func:`inIdle` function.

    :arg after:   Optional. If provided, must be provided as a keyword
                  argument. A time, in seconds, which specifies the amount
                  of time to wait before running this task after it has
                  been scheduled.

    :arg timeout: Optional. If provided, must be provided as a keyword
                  argument. Specifies a time out, in seconds. If this
                  amount of time passes before the function gets
                  scheduled to be called on the idle loop, the function
                  is not called, and is dropped from the queue.


    All other arguments are passed through to the task function.


    If a ``wx.App`` is not running, the ``name``, ``after`` and ``timeout``
    arguments are ignored, and the task is called directly.


    .. note:: If the ``after`` argument is used, there is no guarantee that
              the task will be executed in the order that it is scheduled.
              This is because, if the required time has not elapsed when
              the task is popped from the queue, it will be re-queued.
    """

    global _idleRegistered

    # The scheduling options are removed from kwargs,
    # so that they are not passed through to the task.
    schedtime = time.time()
    timeout   = kwargs.pop('timeout', 0)
    after     = kwargs.pop('after',   0)
    name      = kwargs.pop('name',    None)

    if _haveWX():
        import wx

        # The idle handler is bound to the
        # wx app the first time it is needed.
        if not _idleRegistered:
            wx.GetApp().Bind(wx.EVT_IDLE, _wxIdleLoop)
            _idleRegistered = True

        log.debug('Scheduling idle task ({}) on wx idle '
                  'loop'.format(getattr(task, '__name__', '<unknown>')))

        idleTask = IdleTask(name,
                            task,
                            schedtime,
                            after,
                            timeout,
                            args,
                            kwargs)

        # The name must be registered before the task is
        # enqueued - this function may be called from a
        # thread other than the one running the idle loop,
        # which could otherwise run the task and discard
        # its name before the name has been added, leaving
        # a stale entry in the set.
        if name is not None:
            _idleQueueSet.add(name)

        _idleQueue.put_nowait(idleTask)

    else:
        log.debug('Running idle task directly')
        task(*args, **kwargs)


def wait(threads, task, *args, **kwargs):
    """Creates and starts a new ``Thread`` which waits for all of the ``Thread``
    instances to finish (by ``join``ing them), and then runs the given
    ``task`` via :func:`idle`.

    If a ``wx.App`` is not running, this function ``join``s the threads
    directly instead of creating a new ``Thread`` to do so.

    :arg threads: A ``Thread``, or a sequence of ``Thread`` instances to
                  join. Elements in the sequence may be ``None``.

    :arg task:    The task to run.

    All other arguments are passed to the ``task`` function (via
    :func:`idle`).

    :returns: The ``Thread`` that was created to join the ``threads``, or
              ``None`` if a ``wx.App`` is not running.
    """

    # The abstract base classes live in collections.abc on
    # Python 3; the aliases in the collections module were
    # removed in Python 3.10. Fall back to collections for
    # Python 2, which has no collections.abc module.
    try:
        from collections.abc import Sequence
    except ImportError:
        from collections     import Sequence

    # Allow a single Thread to be passed in
    if not isinstance(threads, Sequence):
        threads = [threads]

    haveWX = _haveWX()

    def joinAll():
        log.debug('Wait thread joining on all targets')
        for t in threads:
            if t is not None:
                t.join()

        log.debug('Wait thread scheduling task on idle loop')
        idle(task, *args, **kwargs)

    # Under wx, join on a separate thread so
    # that the calling thread is not blocked.
    if haveWX:
        thread = threading.Thread(target=joinAll)
        thread.start()
        return thread

    else:
        joinAll()
        return None