Skip to content
Snippets Groups Projects
Commit eca48715 authored by Paul McCarthy's avatar Paul McCarthy
Browse files

async.TaskThread allows a finish callback function to be specified. And

TaskThread also doesn't crash when an enqueued task raises an error.
parent 4ea95458
No related branches found
No related tags found
No related merge requests found
......@@ -457,7 +457,8 @@ class ImageWrapper(notifier.Notifier):
else:
name = '{}_read_{}'.format(id(self), slices)
if not self.__taskThread.isQueued(name):
self.__taskThread.enqueue(name, self.__expandCoverage, slices)
self.__taskThread.enqueue(
name, self.__expandCoverage, None, slices)
def __updateDataRangeOnWrite(self, slices, data):
......@@ -513,7 +514,8 @@ class ImageWrapper(notifier.Notifier):
else:
name = '{}_write_{}'.format(id(self), slices)
if not self.__taskThread.isQueued(name):
self.__taskThread.enqueue(name, self.__expandCoverage, slices)
self.__taskThread.enqueue(
name, self.__expandCoverage, None, slices)
def __getitem__(self, sliceobj):
......
......@@ -381,12 +381,13 @@ class Task(object):
"""Container object which encapsulates a task that is run by a
:class:`TaskThread`.
"""
def __init__(self, name, func, args, kwargs):
self.name = name
self.func = func
self.args = args
self.kwargs = kwargs
self.enabled = True
def __init__(self, name, func, onFinish, args, kwargs):
self.name = name
self.func = func
self.onFinish = onFinish
self.args = args
self.kwargs = kwargs
self.enabled = True
class TaskThread(threading.Thread):
......@@ -409,21 +410,28 @@ class TaskThread(threading.Thread):
log.debug('New task thread')
def enqueue(self, name, func, *args, **kwargs):
def enqueue(self, name, func, onFinish, *args, **kwargs):
"""Enqueue a task to be executed.
:arg name: Task name. Does not necessarily have to be a string,
but must be hashable.
:arg func: The task function.
:arg name: Task name. Does not necessarily have to be a string,
but must be hashable.
:arg func: The task function.
:arg onFinish: An optional function to be called (via :func:`idle`)
when the task funtion has finished.
All other arguments will be passed through to the task when it is
executed.
.. note:: If the specified ``name`` is not unique (i.e. another task
with the same name may already be enqueued), the
:meth:`isQueued` method will probably return invalid
results.
"""
log.debug('Enqueueing task: {} [{}]'.format(
name, getattr(func, '__name__', '<unknown>')))
t = Task(name, func, args, kwargs)
t = Task(name, func, onFinish, args, kwargs)
self.__enqueued[name] = t
self.__q.put(t)
......@@ -485,11 +493,23 @@ class TaskThread(threading.Thread):
task.name,
getattr(task.func, '__name__', '<unknown>')))
task.func(*task.args, **task.kwargs)
log.debug('Task completed: {} [{}]'.format(
task.name,
getattr(task.func, '__name__', '<unknown>')))
try:
task.func(*task.args, **task.kwargs)
if task.onFinish is not None:
idle(task.onFinish)
log.debug('Task completed: {} [{}]'.format(
task.name,
getattr(task.func, '__name__', '<unknown>')))
except Exception as e:
log.debug('Task crashed: {} [{}]: {}: {}'.format(
task.name,
getattr(task.func, '__name__', '<unknown>'),
type(e).__name__,
str(e)),
exc_info=True)
self.__q = None
self.__enqueued = None
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment