#!/usr/bin/env python
#
# idle.py - Run functions on an idle loop or in a separate thread.
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
"""This module provides functions and classes for running tasks
asynchronously, either in an idle loop, or on a separate thread.


.. note:: The *idle* functions in this module are intended to be run from
          within a ``wx`` application. However, they will still work without
          ``wx``, albeit with slightly modified behaviour.


Idle tasks
----------

.. autosummary::
   :nosignatures:

   block
   idle
   idleWhen
   inIdle
   cancelIdle
   idleReset
   getIdleTimeout
   setIdleTimeout


The :func:`idle` function is a simple way to run a task on a ``wx``
``EVT_IDLE`` event handler. This effectively performs the same job as the
:func:`run` function, but is more suitable for short tasks which do not
warrant running in a separate thread.


The ``EVT_IDLE`` event is generated automatically by ``wx``. However, there
are some circumstances in which ``EVT_IDLE`` will not be generated, and
pending events may be left on the queue. For this reason, the
:func:`_wxIdleLoop` will occasionally use a ``wx.Timer`` to ensure that it
continues to be called. The time-out used by this ``Timer`` can be queried
and set via the :func:`getIdleTimeout` and :func:`setIdleTimeout` functions.


Thread tasks
------------

.. autosummary::
   :nosignatures:

   run
   wait
   TaskThread


The :func:`run` function simply runs a task in a separate thread.  This
doesn't seem like a worthy task to have a function of its own, but the
:func:`run` function additionally provides the ability to schedule another
function to run on the ``wx.MainLoop`` when the original function has
completed.  This therefore gives us a simple way to run a computationally
intensive task off the main GUI thread (preventing the GUI from locking up),
and to perform some clean up/refresh afterwards.


The :func:`wait` function is given one or more ``Thread`` instances, and a
task to run. It waits until all the threads have finished, and then runs
the task (via :func:`idle`).


The :class:`TaskThread` class is a simple thread which runs a queue of tasks.


Other facilities
----------------


The ``idle`` module also defines the :func:`mutex` decorator, which is
intended to be used to mark the methods of a class as being mutually exclusive.
The ``mutex`` decorator uses the :class:`MutexFactory` class to do its work.
"""


import time
import atexit
import logging
import functools
import threading
import collections

try:                import queue
except ImportError: import Queue as queue


# Module-level logger, named after this module.
log = logging.getLogger(__name__)


def run(task, onFinish=None, onError=None, name=None):
    """Execute ``task`` on a new thread, with optional completion handlers.

    :arg task:     Callable to execute. It is called with no arguments.

    :arg onFinish: Optional callable which is invoked, with no arguments,
                   after ``task`` has returned. It is scheduled via
                   :func:`idle` when a GUI is available, and is called
                   directly otherwise.

    :arg onError:  Optional callable which is invoked if ``task`` (or, when
                   called directly, ``onFinish``) raises an ``Exception``.
                   It is dispatched in the same way as ``onFinish``, and
                   is passed the exception object.

    :arg name:     Optional label identifying this task in log messages.
                   Defaults to ``task.__name__``, or ``'<unknown>'`` if the
                   task has no such attribute.

    :returns: The ``threading.Thread`` which is running the task, or
              ``None`` if the task was executed directly.

    .. note:: When ``fslplatform.haveGui`` is false, no thread is created -
              the ``task`` and its handlers are called synchronously,
              before this function returns.
    """

    from fsl.utils.platform import platform as fslplatform

    useThread = fslplatform.haveGui

    if name is not None: label = name
    else:                label = getattr(task, '__name__', '<unknown>')

    # Invokes a handler (onFinish/onError), either
    # on the idle loop or directly. A handler of
    # None is silently ignored.
    def callback(handler, *hargs, **hkwargs):

        if handler is None:
            return

        if useThread:
            idle(handler, *hargs, **hkwargs)
        else:
            handler(*hargs, **hkwargs)

    # Runs the task, then dispatches
    # the appropriate handler.
    def wrapper():

        try:
            task()
            log.debug('Task "{}" finished'.format(label))
            callback(onFinish)

        except Exception as err:

            log.warning('Task "{}" crashed'.format(label), exc_info=True)
            callback(onError, err)

    # No GUI - run synchronously
    if not useThread:
        log.debug('Running task "{}" directly'.format(label))
        wrapper()
        return None

    # GUI available - run on a new thread
    log.debug('Running task "{}" on thread'.format(label))

    worker = threading.Thread(target=wrapper)
    worker.start()
    return worker


# Module-level state shared by the idle, _wxIdleLoop, inIdle,
# cancelIdle, getIdleTimeout and setIdleTimeout functions. The
# idleReset function restores each of these to the initial
# value shown here (creating a fresh, empty queue).
_idleRegistered = False
"""Boolean flag indicating whether the :func:`_wxIdleLoop` function has
been registered as a ``wx.EVT_IDLE`` event handler. Checked and set
in the :func:`idle` function.
"""


_idleQueue = queue.Queue()
"""A ``Queue`` of functions which are to be run on the ``wx.EVT_IDLE``
loop.
"""


_idleQueueDict = {}
"""A ``dict`` containing the names of all named tasks which are
currently queued on the idle loop (see the ``name`` parameter to the
:func:`idle` function).
"""


_idleTimer = None
"""A ``wx.Timer`` instance which is used to periodically trigger the
:func:`_wxIdleLoop` in circumstances where ``wx.EVT_IDLE`` events may not
be generated. This is created in the first call to :func:`idle`.
"""


_idleCallRate = 200
"""Minimum time (in milliseconds) between consecutive calls to
:func:`_wxIdleLoop`. If ``wx.EVT_IDLE`` events are not being fired, the
:attr:`_idleTimer` is used to maintain the idle loop at this rate.
"""


def idleReset():
    """Reset the internal :func:`idle` queue state.

    The idle timer (if one has been created) is stopped, and all of the
    module-level idle state - the registration flag, the task queue, the
    dictionary of named tasks, the timer, and the call rate - is restored
    to its initial value. Any tasks which were still queued are discarded.

    In a normal execution environment, this function will never need to be
    called.  However, in an execution environment where multiple ``wx.App``
    instances are created, run, and destroyed sequentially, this function
    will need to be called after each ``wx.App`` has been destroyed.
    Otherwise the ``idle`` function will not work during execution of
    subsequent ``wx.App`` instances.
    """
    global _idleRegistered
    global _idleQueue
    global _idleQueueDict
    global _idleTimer
    global _idleCallRate

    if _idleTimer is not None:
        _idleTimer.Stop()

    # If we're atexit, the ref
    # to the queue module might
    # have been cleared.
    if queue is not None: newQueue = queue.Queue()
    else:                 newQueue = None

    _idleRegistered = False
    _idleQueue      = newQueue
    _idleQueueDict  = {}
    _idleTimer      = None
    _idleCallRate   = 200


# Register idleReset as an exit handler, so
# that the _idleTimer (if one has been created)
# is stopped when the interpreter shuts down.
atexit.register(idleReset)


def getIdleTimeout():
    """Return the current value of :attr:`_idleCallRate` - the ``wx`` idle
    loop time out/call rate, in milliseconds.
    """
    return _idleCallRate


def setIdleTimeout(timeout=None):
    """Change the ``wx`` idle loop time out/call rate.

    :arg timeout: New call rate in milliseconds. If omitted, or ``None``,
                  the call rate is restored to 200 milliseconds.
    """

    global _idleCallRate

    newRate = 200 if timeout is None else timeout

    log.debug('Idle loop timeout changed to {}'.format(newRate))

    _idleCallRate = newRate


class IdleTask(object):
    """Simple record describing one task which has been queued on the
    idle loop. Instances are created by :func:`idle`, and consumed by
    :func:`_wxIdleLoop`.
    """

    def __init__(self, name, task, schedtime, after, timeout, args, kwargs):
        """Create an ``IdleTask``. All arguments are stored, unmodified,
        as attributes of the same name.

        :arg name:      Task name, or ``None`` for an un-named task.
        :arg task:      The function to call.
        :arg schedtime: Time (as returned by ``time.time()``) at which the
                        task was scheduled.
        :arg after:     Seconds to wait, after ``schedtime``, before the
                        task may be run.
        :arg timeout:   Seconds after ``schedtime`` beyond which the task
                        is no longer run. ``0`` means no time out.
        :arg args:      Positional arguments to pass to ``task``.
        :arg kwargs:    Keyword arguments to pass to ``task``.
        """

        # Identity of the task
        self.name = name
        self.task = task

        # Scheduling constraints
        self.schedtime = schedtime
        self.after = after
        self.timeout = timeout

        # Arguments for the task function
        self.args = args
        self.kwargs = kwargs


def _wxIdleLoop(ev):
    """Function which is called on ``wx.EVT_IDLE`` events, and occasionally
    on ``wx.EVT_TIMER`` events via the :attr:`_idleTimer`. If there
    is a function on the :attr:`_idleQueue`, it is popped and called.

    One task is handled per call. The task is:

      - re-queued, if its ``after`` delay has not yet elapsed;
      - run, if it has no time out, or has not yet timed out;
      - dropped otherwise, i.e. if it has timed out, or has been
        cancelled via :func:`cancelIdle`.

    Errors raised by a task are logged, and otherwise ignored.

    :arg ev: The ``wx`` event object. ``Skip`` is always called on it, and
             ``RequestMore`` is called on it if more tasks are waiting.

    .. note:: The ``wx.EVT_IDLE`` event is only triggered on user interaction
              (e.g. mouse movement). This means that a situation may arise
              whereby a function is queued via the :func:`idle` function, but
              no ``EVT_IDLE`` event gets generated. Therefore, the
              :attr:`_idleTimer` object is occasionally used to call this
              function as well.
    """

    import wx
    global _idleQueue
    global _idleQueueDict
    global _idleTimer
    global _idleCallRate

    ev.Skip()

    try:
        task = _idleQueue.get_nowait()

    except queue.Empty:

        # Make sure that we get called periodically,
        # if EVT_IDLE decides to stop firing. If
        # _idleTimer is None, then idleReset has
        # probably been called.
        if _idleTimer is not None:
            _idleTimer.Start(_idleCallRate, wx.TIMER_ONE_SHOT)
        return

    now             = time.time()
    elapsed         = now - task.schedtime
    queueSizeOffset = 0
    taskName        = task.name
    funcName        = getattr(task.task, '__name__', '<unknown>')

    if taskName is None: taskName = funcName
    else:                taskName = '{} [{}]'.format(taskName, funcName)

    # Has enough time elapsed
    # since the task was scheduled?
    # If not, re-queue the task.
    # If this is the only task on the
    # queue, the idle loop will be
    # called again after
    # _idleCallRate millisecs.
    if elapsed < task.after:
        log.debug('Re-queueing function ({}) on wx idle loop'.format(taskName))
        _idleQueue.put_nowait(task)
        queueSizeOffset = 1

    # Has the task timed out?
    elif task.timeout == 0 or (elapsed < task.timeout):

        log.debug('Running function ({}) on wx idle loop'.format(taskName))

        try:
            task.task(*task.args, **task.kwargs)
        except Exception as e:
            log.warning('Idle task {} crashed - {}: {}'.format(
                taskName, type(e).__name__, str(e)), exc_info=True)

        if task.name is not None:
            try:             _idleQueueDict.pop(task.name)
            except KeyError: pass

    # The task has timed out, or has been
    # cancelled (cancelIdle sets the timeout
    # to -1). It is not run, but its name must
    # still be removed from the dictionary,
    # otherwise inIdle would report it as being
    # queued forever. The identity test makes
    # sure that we don't remove a newer task
    # which has been queued under the same
    # name (see dropIfQueued in idle).
    else:
        log.debug('Dropping function ({}) from wx idle loop'.format(taskName))

        if task.name is not None and \
           _idleQueueDict.get(task.name, None) is task:
            _idleQueueDict.pop(task.name, None)

    # More tasks on the queue?
    # Request another event
    if _idleQueue.qsize() > queueSizeOffset:
        ev.RequestMore()

    # Otherwise use the idle
    # timer to make sure that
    # the loop keeps ticking
    # over. The timer will be
    # None if the task that was
    # just run called idleReset.
    elif _idleTimer is not None:
        _idleTimer.Start(_idleCallRate, wx.TIMER_ONE_SHOT)


def inIdle(taskName):
    """Test whether a named task is known to the idle loop.

    :arg taskName: Name that the task was given when passed to :func:`idle`.

    :returns: ``True`` if ``taskName`` is present in the
              :attr:`_idleQueueDict` (i.e. the task is queued, or is
              currently running), ``False`` otherwise.
    """
    return taskName in _idleQueueDict


def cancelIdle(taskName):
    """Cancel the queued idle task called ``taskName``.

    The task is not removed from the queue - its time out is instead set
    to ``-1``, which causes the idle loop to discard it rather than run it.
    A task which is already running cannot be cancelled.

    :arg taskName: Name that the task was given when passed to :func:`idle`.

    :raises: A ``KeyError`` if no task called ``taskName`` exists.
    """

    queued = _idleQueueDict[taskName]
    queued.timeout = -1


def block(secs, delta=0.01):
    """Pause the caller for ``secs`` seconds, while allowing the main ``wx``
    loop to keep processing events.

    When a GUI is not available (``fslplatform.haveGui`` is false), this
    is simply a call to ``time.sleep(secs)``.

    :arg secs:  Number of seconds to block for.
    :arg delta: Number of seconds to sleep in between each yield to ``wx``.
    """

    from fsl.utils.platform import platform as fslplatform

    if fslplatform.haveGui:
        import wx

        began = time.time()

        # Alternate between letting wx process
        # pending events, and sleeping briefly,
        # until the requested time has passed.
        while (time.time() - began) < secs:
            wx.YieldIfNeeded()
            time.sleep(delta)

    else:
        time.sleep(secs)


def idle(task, *args, **kwargs):
    """Schedule ``task`` to be called on a ``wx.EVT_IDLE`` event.

    The following options are recognised. Each of them is optional, and
    each must be passed as a keyword argument:

    :arg task:         The function to call.

    :arg name:         A name for the task, which can subsequently be
                       passed to :func:`inIdle` and :func:`cancelIdle`.

    :arg after:        Delay, in seconds - the task will not be run until
                       at least this much time has passed since it was
                       scheduled.

    :arg timeout:      Time out, in seconds. If the task has not been run
                       by the time this much time has passed since it was
                       scheduled, it is dropped from the queue without
                       being called. The default, ``0``, means no time
                       out.

    :arg dropIfQueued: If ``True``, and a task called ``name`` is already
                       enqueued, the old task is cancelled and this new
                       task is enqueued. Defaults to ``False``. Takes
                       precedence over ``skipIfQueued``.

    :arg skipIfQueued: If ``True``, and a task called ``name`` is already
                       enqueued (or is running), this new task is not
                       scheduled. Defaults to ``False``.

    :arg alwaysQueue:  If ``True``, and a ``wx.MainLoop`` is not running,
                       the task is enqueued regardless, on the assumption
                       that a ``wx.MainLoop`` will be started later on.
                       Note that, if a ``wx.App`` does not exist yet,
                       ``idle`` must be called again once the app has been
                       created, in order for the original task to be
                       executed. This option is ignored if ``wx`` is not
                       available - the task is then called directly.


    Every other positional and keyword argument is passed on to ``task``.


    If a ``wx.App`` is not running, ``timeout``, ``name`` and
    ``skipIfQueued`` have no effect - this function instead sleeps for
    ``after`` seconds, and then calls ``task`` directly.


    .. note:: Tasks which use ``after`` are not guaranteed to be run in
              the order in which they were scheduled, because a task
              which is popped from the queue too early is put back on
              the end of the queue.

    .. note:: If several tasks are scheduled under the same ``name``
              without using ``skipIfQueued`` or ``dropIfQueued``, every
              one of them will be run, but only the most recently
              scheduled one can be queried or cancelled.

    .. note:: Task functions which themselves accept keyword arguments
              called ``name``, ``skipIfQueued``, ``dropIfQueued``,
              ``after``, ``timeout``, or ``alwaysQueue`` will not receive
              them, as they are consumed by this function.
    """

    from fsl.utils.platform import platform as fslplatform

    global _idleRegistered
    global _idleTimer
    global _idleQueue
    global _idleQueueDict

    schedtime = time.time()

    # Remove the options which are intended
    # for us - whatever is left in kwargs
    # belongs to the task function.
    timeout      = kwargs.pop('timeout',      0)
    after        = kwargs.pop('after',        0)
    name         = kwargs.pop('name',         None)
    dropIfQueued = kwargs.pop('dropIfQueued', False)
    skipIfQueued = kwargs.pop('skipIfQueued', False)
    alwaysQueue  = kwargs.pop('alwaysQueue',  False)

    canHaveGui = fslplatform.canHaveGui
    haveGui    = fslplatform.haveGui

    # The alwaysQueue option is only honoured
    # if there is some possibility of a gui
    # becoming available in the future.
    shouldQueue = haveGui or (alwaysQueue and canHaveGui)

    # No gui - wait for the requested
    # delay, and call the task directly.
    if not shouldQueue:
        time.sleep(after)
        log.debug('Running idle task directly')
        task(*args, **kwargs)
        return

    import wx
    app = wx.GetApp()

    # Hook the idle loop up to the app, if
    # there is one, and if we have not already
    # done so.
    #
    # n.b. The 'app is not None' test is not
    # reliable in scenarios where several
    # wx.Apps have been instantiated, as a
    # previously created app may be returned.
    if (app is not None) and (not _idleRegistered):

        log.debug('Registering async idle loop')

        app.Bind(wx.EVT_IDLE, _wxIdleLoop)

        _idleTimer      = wx.Timer(app)
        _idleRegistered = True

        _idleTimer.Bind(wx.EVT_TIMER, _wxIdleLoop)

    if name is not None and inIdle(name):

        if dropIfQueued:

            # cancelIdle sets the time out of the
            # old task to -1, which prevents it from
            # being run. The old task stays on the
            # _idleQueue, but its _idleQueueDict
            # entry is replaced by the new task
            # further down.
            cancelIdle(name)
            log.debug('Idle task ({}) is already queued - '
                      'dropping the old task'.format(name))

        elif skipIfQueued:
            log.debug('Idle task ({}) is already queued '
                      '- skipping it'.format(name))
            return

    log.debug('Scheduling idle task ({}) on wx idle '
              'loop'.format(getattr(task, '__name__', '<unknown>')))

    queued = IdleTask(name, task, schedtime, after, timeout, args, kwargs)

    _idleQueue.put_nowait(queued)

    if name is not None:
        _idleQueueDict[name] = queued


def idleWhen(func, condition, *args, **kwargs):
    """Schedule ``func`` on :func:`idle` once ``condition`` returns ``True``.

    The ``condition`` function is called immediately. If it returns
    ``True``, ``func`` is passed to :func:`idle`. Otherwise this function
    re-schedules itself (via :func:`idle`) to try again ``pollTime``
    seconds later.

    :arg func:      Function to call.

    :arg condition: Function which accepts no arguments, and returns
                    ``True`` or ``False``. The ``func`` function is only
                    scheduled once this function has returned ``True``.

    :arg pollTime:  Must be passed as a keyword argument. Time (in seconds)
                    to wait between successive calls to ``condition``.
                    Defaults to ``0.2``.

    All other arguments are passed through to :func:`idle`, along with
    ``func``.
    """

    pollTime = kwargs.get('pollTime', 0.2)

    if condition():
        # Ready - pollTime is for us, not for func
        kwargs.pop('pollTime', None)
        idle(func, *args, **kwargs)

    else:
        # Not ready - try again later. pollTime is left
        # in (a copy of) kwargs for the next attempt.
        idle(idleWhen, func, condition, *args, after=pollTime, **dict(kwargs))


def wait(threads, task, *args, **kwargs):
    """Creates and starts a new ``Thread`` which waits for all of the ``Thread``
    instances to finish (by ``join``ing them), and then runs the given
    ``task`` via :func:`idle`.

    If the ``wait_direct`` parameter is ``True``, or a ``wx.App`` is not
    running, this function ``join``s the threads directly instead of creating
    a new ``Thread`` to do so.

    :arg threads:      A ``Thread``, or a sequence of ``Thread`` instances to
                       join. Elements in the sequence may be ``None``.

    :arg task:         The task to run once all ``threads`` have completed.

    :arg wait_direct:  Must be passed as a keyword argument.  If ``True``, this
                       function call will ``join`` all of the ``threads``, and
                       then call the ``task``. Otherwise (the default), this
                       function will create a new thread to ``join`` the
                       ``threads``, and will return immediately.


    All other arguments are passed to the ``task`` function.

    :returns: The ``Thread`` which was created to join the ``threads``, or
              ``None`` if they were joined directly.


    .. note:: This function will not support ``task`` functions which expect
              a keyword argument called ``wait_direct``.
    """

    from fsl.utils.platform import platform as fslplatform

    # The abstract base classes were moved to
    # collections.abc in Python 3.3, and the
    # aliases in collections were removed in
    # Python 3.10. Fall back to the old location
    # for Python 2.
    try:                from collections.abc import Sequence
    except ImportError: from collections     import Sequence

    direct = kwargs.pop('wait_direct', False)

    # Allow a single thread to be passed in
    if not isinstance(threads, Sequence):
        threads = [threads]

    haveWX = fslplatform.haveGui

    # Blocks until every thread has finished,
    # then hands the task over to idle.
    def joinAll():
        log.debug('Wait thread joining on all targets')
        for t in threads:
            if t is not None:
                t.join()

        log.debug('Wait thread scheduling task on idle loop')
        idle(task, *args, **kwargs)

    if haveWX and not direct:
        thread = threading.Thread(target=joinAll)
        thread.start()
        return thread

    else:
        joinAll()
        return None


class Task(object):
    """Simple record describing one task which has been enqueued on a
    :class:`TaskThread`.
    """
    def __init__(self, name, func, onFinish, args, kwargs):
        """Create a ``Task``. All arguments are stored, unmodified, as
        attributes of the same name.

        :arg name:     Task name (may be ``None``).
        :arg func:     The task function.
        :arg onFinish: Function to call once ``func`` has finished (may be
                       ``None``).
        :arg args:     Positional arguments to pass to ``func``.
        :arg kwargs:   Keyword arguments to pass to ``func``.
        """

        # Identity of the task
        self.name = name
        self.func = func
        self.onFinish = onFinish

        # Arguments for the task function
        self.args = args
        self.kwargs = kwargs

        # Tasks start out enabled. The TaskThread
        # does not run tasks which have been disabled.
        self.enabled = True


class TaskThreadVeto(Exception):
    """Exception which may be raised by a task function that has been
    added to a :class:`TaskThread`, to indicate that the ``onFinish``
    handler for the task (if there is one) should not be called. See the
    :meth:`TaskThread.enqueue` method for more details.
    """


class TaskThread(threading.Thread):
    """The ``TaskThread`` is a simple thread which runs tasks. Tasks may be
    enqueued and dequeued.
    """


    def __init__(self, *args, **kwargs):
        """Create a ``TaskThread``. All arguments are passed through to
        ``threading.Thread.__init__``.
        """

        threading.Thread.__init__(self, *args, **kwargs)

        # Queue of Task objects, a {name : Task}
        # dict of tasks which have been enqueued
        # but not yet started, and a flag which
        # is set by the stop method.
        self.__q        = queue.Queue()
        self.__enqueued = {}
        self.__stop     = False

        log.debug('New task thread')


    def enqueue(self, func, *args, **kwargs):
        """Enqueue a task to be executed.

        :arg func:     The task function.

        :arg taskName: Task name. Must be specified as a keyword
                       argument. Does not necessarily have to be a string, but
                       must be hashable. If you wish to use the :meth:`dequeue`
                       or :meth:`isQueued` methods, you must provide a task
                       name.

        :arg onFinish: An optional function to be called (via :func:`idle`)
                       when the task function has finished. Must be provided
                       as a keyword argument. If the ``func`` raises a
                       :class:`TaskThreadVeto` error, this function will not
                       be called.

        All other arguments are passed through to the task function when it is
        executed.

        .. note:: If the specified ``taskName`` is not unique (i.e. another
                  task with the same name may already be enqueued), the
                  :meth:`isQueued` method will probably return invalid
                  results.

        .. warning:: Make sure that your task function is not expecting keyword
                     arguments called ``taskName`` or ``onFinish``!
        """

        name     = kwargs.pop('taskName', None)
        onFinish = kwargs.pop('onFinish', None)

        log.debug('Enqueueing task: {} [{}]'.format(
            name, getattr(func, '__name__', '<unknown>')))

        t = Task(name, func, onFinish, args, kwargs)
        self.__enqueued[name] = t
        self.__q.put(t)


    def isQueued(self, name):
        """Returns ``True`` if a task with the given name is enqueued,
        ``False`` otherwise.
        """
        return name in self.__enqueued


    def dequeue(self, name):
        """Dequeues a previously enqueued task. The task is not removed
        from the queue - it is flagged as disabled, so that it is skipped
        when it reaches the front of the queue. Nothing happens if no task
        with the given name is enqueued.

        :arg name: The task to dequeue.
        """
        task = self.__enqueued.get(name, None)
        if task is not None:

            log.debug('Dequeueing task: {}'.format(name))
            task.enabled = False


    def stop(self):
        """Stop the ``TaskThread`` after any currently running task has
        completed.
        """
        log.debug('Stopping task thread')
        self.__stop = True


    def waitUntilIdle(self):
        """Causes the calling thread to block until the task queue is empty.
        """
        self.__q.join()


    def run(self):
        """Run the ``TaskThread``. Tasks are taken from the queue and run,
        one at a time, until :meth:`stop` is called.
        """

        while True:

            # Clear ref to previous task if any. This
            # is very important, because otherwise, if
            # no tasks get posted to the queue, this
            # loop will spin on queue.Empty exceptions,
            # and the previous Task object will preserve
            # a hanging ref to its function/method. Not
            # ideal if the ref is to a method of the
            # object which created this TaskThread, and
            # needs to be GC'd!
            #
            # An example: Without clearing the task
            # reference, the following code would
            # result in the TaskThread spinning on empty
            # forever, and would prevent the Blah
            # instance from being GC'd:
            #
            #     class Blah(object):
            #         def __init__(self):
            #             tt = TaskThread()
            #             tt.enqueue(self.method)
            #             tt.start()
            #
            #         def method(self):
            #             pass
            #
            #     b = Blah()
            #     del b
            task = None

            try:
                task = self.__q.get(timeout=1)

            # Nothing on the queue - fall through
            # to the stop check, then poll again.
            except queue.Empty:
                pass

            # Any other error typically indicates
            # that this is a daemon thread, and
            # the TaskThread object has been GC'd
            except Exception:
                break

            # The stop flag is checked once per
            # iteration, whether or not a task was
            # retrieved. A task which has just been
            # retrieved is discarded without being
            # run. (This check used to be a 'break'
            # inside a 'finally' clause, which
            # silently swallows in-flight exceptions,
            # and is a SyntaxWarning as of PEP 765.)
            if self.__stop:
                break

            if task is None:
                continue

            self.__enqueued.pop(task.name, None)

            # The task was dequeued
            # before we got to it
            if not task.enabled:
                self.__q.task_done()
                continue

            log.debug('Running task: {} [{}]'.format(
                task.name,
                getattr(task.func, '__name__', '<unknown>')))

            try:
                task.func(*task.args, **task.kwargs)

                if task.onFinish is not None:
                    idle(task.onFinish)

                log.debug('Task completed: {} [{}]'.format(
                    task.name,
                    getattr(task.func, '__name__', '<unknown>')))

            # If the task raises a TaskThreadVeto error,
            # we just have to skip the onFinish handler
            except TaskThreadVeto:
                log.debug('Task completed (vetoed onFinish): {} [{}]'.format(
                    task.name,
                    getattr(task.func, '__name__', '<unknown>')))

            except Exception as e:
                log.warning('Task crashed: {} [{}]: {}: {}'.format(
                    task.name,
                    getattr(task.func, '__name__', '<unknown>'),
                    type(e).__name__,
                    str(e)),
                    exc_info=True)
            finally:
                self.__q.task_done()

        # Release refs to the queue
        # and to any remaining tasks
        self.__q        = None
        self.__enqueued = None
        log.debug('Task thread finished')


def mutex(*args, **kwargs):
    """Decorator which makes calls to the methods of a class mutually
    exclusive.

    If one or more methods of a class must never be executed by more than
    one thread at a time, they can be decorated with ``mutex`` to enforce
    this. As a contrived example::


        class Example(object):

            def __init__(self):
                self.__sharedData = []

            @mutex
            def dangerousMethod1(self, message):
                self.__sharedData.append(message)

            @mutex
            def dangerousMethod2(self):
                return self.__sharedData.pop()



    With the ``@mutex`` decorator in place, at most one thread can be
    running either of ``dangerousMethod1`` or ``dangerousMethod2`` (on a
    given ``Example`` instance) at any point in time.

    All arguments are passed to the :class:`MutexFactory`, which does the
    real work.
    """
    factory = MutexFactory(*args, **kwargs)
    return factory


class MutexFactory(object):
    """The ``MutexFactory`` is a placeholder for methods which have been
    decorated with the :func:`mutex` decorator. When the method of a class
    is decorated with ``@mutex``, a ``MutexFactory`` is created.

    Later on, when the method is accessed on an instance, the :meth:`__get__`
    method creates the true decorator function, and replaces the instance
    method with that decorator.

    .. note:: The ``MutexFactory`` adds an attribute called
              ``_idle_mutex_lock`` to all instances that have
              ``@mutex``-decorated methods.
    """


    createLock = threading.Lock()
    """This lock is used by all ``MutexFactory`` instances when a decorated
    instance method is accessed for the first time.

    The first time that a mutexed method is accessed on an instance, a new
    ``threading.Lock`` is created, to be shared by all mutexed methods of that
    instance. The ``createLock`` is used to ensure that this can only occur
    once for each instance.
    """


    def __init__(self, function):
        """Create a ``MutexFactory``.

        :arg function: The method which is being decorated.
        """
        self.__func = function


    def __get__(self, instance, cls):
        """When this ``MutexFactory`` is accessed through an instance,
        a decorator function is created which enforces mutually exclusive
        access to the decorated method. A single ``threading.Lock`` object
        is shared between all ``@mutex``-decorated methods on a single
        instance.

        If this ``MutexFactory`` is accessed through a class, the
        decorated function is returned.

        :arg instance: The instance through which the method was accessed,
                       or ``None`` if it was accessed through the class.
        :arg cls:      The class through which the method was accessed
                       (unused).
        """

        # Class-level access
        if instance is None:
            return self.__func

        # Get the lock object, creating if it necessary.
        # We use the createLock in case multiple threads
        # access a method at the same time, in which case
        # only one of them will be able to create the
        # instance lock.
        with MutexFactory.createLock:

            lock = getattr(instance, '_idle_mutex_lock', None)
            if lock is None:
                lock                      = threading.Lock()
                instance._idle_mutex_lock = lock

            # The true decorator function
            def decorator(*args, **kwargs):
                with instance._idle_mutex_lock:
                    return self.__func(instance, *args, **kwargs)

            # Replace this MutexFactory with the decorator
            # on the instance. As a MutexFactory does not
            # define __set__, the instance attribute takes
            # precedence on all subsequent accesses, so
            # this method is called at most once per
            # method, per instance (barring a race on
            # first access).
            decorator = functools.update_wrapper(decorator, self.__func)
            setattr(instance, self.__func.__name__, decorator)
            return decorator