From eca48715049b05b506a92d57ebb06c346561e781 Mon Sep 17 00:00:00 2001
From: Paul McCarthy <pauld.mccarthy@gmail.com>
Date: Tue, 26 Jul 2016 15:10:58 +0100
Subject: [PATCH] async.TaskThread allows a finish callback function to be
 specified. And TaskThread also doesn't crash when an enqueued task raises an
 error.

---
 fsl/data/imagewrapper.py |  6 +++--
 fsl/utils/async.py       | 52 +++++++++++++++++++++++++++-------------
 2 files changed, 40 insertions(+), 18 deletions(-)

diff --git a/fsl/data/imagewrapper.py b/fsl/data/imagewrapper.py
index 2651e144e..066469b33 100644
--- a/fsl/data/imagewrapper.py
+++ b/fsl/data/imagewrapper.py
@@ -457,7 +457,8 @@ class ImageWrapper(notifier.Notifier):
         else:
             name = '{}_read_{}'.format(id(self), slices)
             if not self.__taskThread.isQueued(name):
-                self.__taskThread.enqueue(name, self.__expandCoverage, slices)
+                self.__taskThread.enqueue(
+                    name, self.__expandCoverage, None, slices)
 
 
     def __updateDataRangeOnWrite(self, slices, data):
@@ -513,7 +514,8 @@ class ImageWrapper(notifier.Notifier):
         else:
             name = '{}_write_{}'.format(id(self), slices)
             if not self.__taskThread.isQueued(name):
-                self.__taskThread.enqueue(name, self.__expandCoverage, slices)
+                self.__taskThread.enqueue(
+                    name, self.__expandCoverage, None, slices)
 
             
     def __getitem__(self, sliceobj):
diff --git a/fsl/utils/async.py b/fsl/utils/async.py
index 0f18f8849..51e6ae5cd 100644
--- a/fsl/utils/async.py
+++ b/fsl/utils/async.py
@@ -381,12 +381,13 @@ class Task(object):
     """Container object which encapsulates a task that is run by a
     :class:`TaskThread`.
     """
-    def __init__(self, name, func, args, kwargs):
-        self.name    = name
-        self.func    = func
-        self.args    = args
-        self.kwargs  = kwargs
-        self.enabled = True
+    def __init__(self, name, func, onFinish, args, kwargs):
+        self.name     = name
+        self.func     = func
+        self.onFinish = onFinish
+        self.args     = args
+        self.kwargs   = kwargs
+        self.enabled  = True
 
 
 class TaskThread(threading.Thread):
@@ -409,21 +410,28 @@ class TaskThread(threading.Thread):
         log.debug('New task thread')
 
 
-    def enqueue(self, name, func, *args, **kwargs):
+    def enqueue(self, name, func, onFinish, *args, **kwargs):
         """Enqueue a task to be executed.
 
-        :arg name: Task name. Does not necessarily have to be a string,
-                    but must be hashable.
-        :arg func: The task function.
+        :arg name:     Task name. Does not necessarily have to be a string,
+                       but must be hashable.
+        :arg func:     The task function.
+        :arg onFinish: An optional function to be called (via :func:`idle`)
+                       when the task funtion has finished.
 
         All other arguments will be passed through to the task when it is
         executed.
+
+        .. note:: If the specified ``name`` is not unique (i.e. another task
+                  with the same name may already be enqueued), the
+                  :meth:`isQueued` method will probably return invalid
+                  results.
         """
 
         log.debug('Enqueueing task: {} [{}]'.format(
             name, getattr(func, '__name__', '<unknown>')))
 
-        t = Task(name, func, args, kwargs)
+        t = Task(name, func, onFinish, args, kwargs)
         self.__enqueued[name] = t
         self.__q.put(t)
 
@@ -485,11 +493,23 @@ class TaskThread(threading.Thread):
                 task.name,
                 getattr(task.func, '__name__', '<unknown>')))
 
-            task.func(*task.args, **task.kwargs)
-
-            log.debug('Task completed: {} [{}]'.format(
-                task.name,
-                getattr(task.func, '__name__', '<unknown>')))
+            try:
+                task.func(*task.args, **task.kwargs)
+
+                if task.onFinish is not None:
+                    idle(task.onFinish)
+
+                log.debug('Task completed: {} [{}]'.format(
+                    task.name,
+                    getattr(task.func, '__name__', '<unknown>')))
+                
+            except Exception as e:
+                log.debug('Task crashed: {} [{}]: {}: {}'.format(
+                    task.name,
+                    getattr(task.func, '__name__', '<unknown>'),
+                    type(e).__name__,
+                    str(e)),
+                    exc_info=True)
 
         self.__q        = None
         self.__enqueued = None
-- 
GitLab