diff --git a/fsl/data/imagewrapper.py b/fsl/data/imagewrapper.py index 2651e144e6cd4858ef54979e87dca06dd1ecca68..066469b332d6c6a1b8ba80183f826d36d19b4612 100644 --- a/fsl/data/imagewrapper.py +++ b/fsl/data/imagewrapper.py @@ -457,7 +457,8 @@ class ImageWrapper(notifier.Notifier): else: name = '{}_read_{}'.format(id(self), slices) if not self.__taskThread.isQueued(name): - self.__taskThread.enqueue(name, self.__expandCoverage, slices) + self.__taskThread.enqueue( + name, self.__expandCoverage, None, slices) def __updateDataRangeOnWrite(self, slices, data): @@ -513,7 +514,8 @@ class ImageWrapper(notifier.Notifier): else: name = '{}_write_{}'.format(id(self), slices) if not self.__taskThread.isQueued(name): - self.__taskThread.enqueue(name, self.__expandCoverage, slices) + self.__taskThread.enqueue( + name, self.__expandCoverage, None, slices) def __getitem__(self, sliceobj): diff --git a/fsl/utils/async.py b/fsl/utils/async.py index 0f18f884934ff62d810bd04e2d179f2425f1ca5f..51e6ae5cd396d42947f1984b10f9a96913cb8ef5 100644 --- a/fsl/utils/async.py +++ b/fsl/utils/async.py @@ -381,12 +381,13 @@ class Task(object): """Container object which encapsulates a task that is run by a :class:`TaskThread`. """ - def __init__(self, name, func, args, kwargs): - self.name = name - self.func = func - self.args = args - self.kwargs = kwargs - self.enabled = True + def __init__(self, name, func, onFinish, args, kwargs): + self.name = name + self.func = func + self.onFinish = onFinish + self.args = args + self.kwargs = kwargs + self.enabled = True class TaskThread(threading.Thread): @@ -409,21 +410,28 @@ class TaskThread(threading.Thread): log.debug('New task thread') - def enqueue(self, name, func, *args, **kwargs): + def enqueue(self, name, func, onFinish, *args, **kwargs): """Enqueue a task to be executed. - :arg name: Task name. Does not necessarily have to be a string, - but must be hashable. - :arg func: The task function. + :arg name: Task name. Does not necessarily have to be a string, + but must be hashable. + :arg func: The task function. + :arg onFinish: An optional function to be called (via :func:`idle`) + when the task funtion has finished. All other arguments will be passed through to the task when it is executed. + + .. note:: If the specified ``name`` is not unique (i.e. another task + with the same name may already be enqueued), the + :meth:`isQueued` method will probably return invalid + results. """ log.debug('Enqueueing task: {} [{}]'.format( name, getattr(func, '__name__', '<unknown>'))) - t = Task(name, func, args, kwargs) + t = Task(name, func, onFinish, args, kwargs) self.__enqueued[name] = t self.__q.put(t) @@ -485,11 +493,23 @@ class TaskThread(threading.Thread): task.name, getattr(task.func, '__name__', '<unknown>'))) - task.func(*task.args, **task.kwargs) - - log.debug('Task completed: {} [{}]'.format( - task.name, - getattr(task.func, '__name__', '<unknown>'))) + try: + task.func(*task.args, **task.kwargs) + + if task.onFinish is not None: + idle(task.onFinish) + + log.debug('Task completed: {} [{}]'.format( + task.name, + getattr(task.func, '__name__', '<unknown>'))) + + except Exception as e: + log.debug('Task crashed: {} [{}]: {}: {}'.format( + task.name, + getattr(task.func, '__name__', '<unknown>'), + type(e).__name__, + str(e)), + exc_info=True) self.__q = None self.__enqueued = None