Skip to content
Snippets Groups Projects
Commit 9739482f authored by Paul McCarthy's avatar Paul McCarthy :mountain_bicyclist:
Browse files

Async unit tests are now idle unit tests

parent 8b7f2cfa
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python #!/usr/bin/env python
# #
# test_async.py - # test_idle.py -
# #
# Author: Paul McCarthy <pauldmccarthy@gmail.com> # Author: Paul McCarthy <pauldmccarthy@gmail.com>
# #
...@@ -14,12 +14,12 @@ from six.moves import reload_module ...@@ -14,12 +14,12 @@ from six.moves import reload_module
import pytest import pytest
import mock import mock
import fsl.utils.async as async import fsl.utils.idle as idle
from fsl.utils.platform import platform as fslplatform from fsl.utils.platform import platform as fslplatform
# We use a single wx.App object because wx.GetApp() # We use a single wx.App object because wx.GetApp()
# will still return an old App objectd after its # will still return an old App objectd after its
# mainloop has exited. and therefore async.idle # mainloop has exited. and therefore idle.idle
# will potentially register on EVT_IDLE with the # will potentially register on EVT_IDLE with the
# wrong wx.App object. # wrong wx.App object.
_wxapp = None _wxapp = None
...@@ -65,7 +65,7 @@ def _run_with_wx(func, *args, **kwargs): ...@@ -65,7 +65,7 @@ def _run_with_wx(func, *args, **kwargs):
wx.CallLater(startingDelay, wrap) wx.CallLater(startingDelay, wrap)
_wxapp.MainLoop() _wxapp.MainLoop()
async.idleReset() idle.idleReset()
if raised[0] and propagateRaise: if raised[0] and propagateRaise:
raise raised[0] raise raised[0]
...@@ -82,7 +82,7 @@ def _wait_for_idle_loop_to_clear(): ...@@ -82,7 +82,7 @@ def _wait_for_idle_loop_to_clear():
def busywait(): def busywait():
idleDone[0] = True idleDone[0] = True
async.idle(busywait) idle.idle(busywait)
while not idleDone[0]: while not idleDone[0]:
wx.Yield() wx.Yield()
...@@ -109,7 +109,7 @@ def _test_run(): ...@@ -109,7 +109,7 @@ def _test_run():
def onError(e): def onError(e):
onErrorCalled[0] = True onErrorCalled[0] = True
t = async.run(task) t = idle.run(task)
if t is not None: if t is not None:
t.join() t.join()
...@@ -118,7 +118,7 @@ def _test_run(): ...@@ -118,7 +118,7 @@ def _test_run():
taskRun[0] = False taskRun[0] = False
t = async.run(task, onFinish, onError) t = idle.run(task, onFinish, onError)
if t is not None: if t is not None:
t.join() t.join()
...@@ -128,7 +128,7 @@ def _test_run(): ...@@ -128,7 +128,7 @@ def _test_run():
taskRun[ 0] = False taskRun[ 0] = False
onFinishCalled[0] = False onFinishCalled[0] = False
t = async.run(errtask, onFinish, onError) t = idle.run(errtask, onFinish, onError)
if t is not None: if t is not None:
t.join() t.join()
...@@ -141,12 +141,12 @@ def _test_run(): ...@@ -141,12 +141,12 @@ def _test_run():
def test_idleTimeout(): def test_idleTimeout():
async.idleReset() idle.idleReset()
default = async.getIdleTimeout() default = idle.getIdleTimeout()
async.setIdleTimeout(999) idle.setIdleTimeout(999)
assert async.getIdleTimeout() == 999 assert idle.getIdleTimeout() == 999
async.setIdleTimeout() idle.setIdleTimeout()
assert async.getIdleTimeout() == default assert idle.getIdleTimeout() == default
def test_idle(): def test_idle():
...@@ -159,24 +159,24 @@ def test_idle(): ...@@ -159,24 +159,24 @@ def test_idle():
def errtask(arg, kwarg1=None): def errtask(arg, kwarg1=None):
raise Exception('Task which was supposed to crash crashed!') raise Exception('Task which was supposed to crash crashed!')
assert async.getIdleTimeout() > 0 assert idle.getIdleTimeout() > 0
# Run directly # Run directly
async.idle(task, 1, kwarg1=2, name='direct') idle.idle(task, 1, kwarg1=2, name='direct')
assert called[0] assert called[0]
called[0] = False called[0] = False
# Run on wx idle loop # Run on wx idle loop
_run_with_wx(async.idle, task, 1, kwarg1=2) _run_with_wx(idle.idle, task, 1, kwarg1=2)
assert called[0] assert called[0]
# Run a crashing task directly # Run a crashing task directly
with pytest.raises(Exception): with pytest.raises(Exception):
async.idle(errtask, 1, kwarg1=2) idle.idle(errtask, 1, kwarg1=2)
# Run a crashing task on idle loop - error should not propagate # Run a crashing task on idle loop - error should not propagate
_run_with_wx(async.idle, errtask, 1, kwarg1=2) _run_with_wx(idle.idle, errtask, 1, kwarg1=2)
def test_inidle(): def test_inidle():
...@@ -189,8 +189,8 @@ def test_inidle(): ...@@ -189,8 +189,8 @@ def test_inidle():
def queuetask(): def queuetask():
async.idle(task, after=0.01, name=name) idle.idle(task, after=0.01, name=name)
assert async.inIdle(name) assert idle.inIdle(name)
_run_with_wx(queuetask) _run_with_wx(queuetask)
...@@ -207,8 +207,8 @@ def test_cancelidle(): ...@@ -207,8 +207,8 @@ def test_cancelidle():
def queuetask(): def queuetask():
async.idle(task, after=0.01, name=name) idle.idle(task, after=0.01, name=name)
async.cancelIdle(name) idle.cancelIdle(name)
_run_with_wx(queuetask) _run_with_wx(queuetask)
...@@ -229,8 +229,8 @@ def test_idle_skipIfQueued(): ...@@ -229,8 +229,8 @@ def test_idle_skipIfQueued():
def queuetask(): def queuetask():
async.idle(task1, after=0.01, name=name) idle.idle(task1, after=0.01, name=name)
async.idle(task2, after=0.01, name=name, skipIfQueued=True) idle.idle(task2, after=0.01, name=name, skipIfQueued=True)
_run_with_wx(queuetask) _run_with_wx(queuetask)
...@@ -252,8 +252,8 @@ def test_idle_dropIfQueued(): ...@@ -252,8 +252,8 @@ def test_idle_dropIfQueued():
def queuetask(): def queuetask():
async.idle(task1, after=0.01, name=name) idle.idle(task1, after=0.01, name=name)
async.idle(task2, after=0.01, name=name, dropIfQueued=True) idle.idle(task2, after=0.01, name=name, dropIfQueued=True)
_run_with_wx(queuetask) _run_with_wx(queuetask)
...@@ -279,10 +279,10 @@ def test_idle_alwaysQueue1(): ...@@ -279,10 +279,10 @@ def test_idle_alwaysQueue1():
# The task should be run # The task should be run
# when the mainloop starts # when the mainloop starts
async.idle(task, alwaysQueue=True) idle.idle(task, alwaysQueue=True)
# Second call to async.idle # Second call to idle.idle
_run_with_wx(async.idle, nop) _run_with_wx(idle.idle, nop)
assert called[0] assert called[0]
...@@ -299,7 +299,7 @@ def test_idle_alwaysQueue2(): ...@@ -299,7 +299,7 @@ def test_idle_alwaysQueue2():
called[0] = True called[0] = True
def queue(): def queue():
async.idle(task, alwaysQueue=True) idle.idle(task, alwaysQueue=True)
_run_with_wx(None, callAfterApp=queue) _run_with_wx(None, callAfterApp=queue)
...@@ -321,7 +321,7 @@ def test_idle_alwaysQueue3(): ...@@ -321,7 +321,7 @@ def test_idle_alwaysQueue3():
def task(): def task():
called[0] = True called[0] = True
_run_with_wx(async.idle, task, alwaysQueue=True) _run_with_wx(idle.idle, task, alwaysQueue=True)
assert called[0] assert called[0]
...@@ -339,11 +339,11 @@ def test_idle_alwaysQueue4(): ...@@ -339,11 +339,11 @@ def test_idle_alwaysQueue4():
import fsl.utils.platform import fsl.utils.platform
with mock.patch.dict('sys.modules', {'wx' : None}): with mock.patch.dict('sys.modules', {'wx' : None}):
# async uses the platform module to # idle uses the platform module to
# determine whether a GUI is available, # determine whether a GUI is available,
# so we have to reload it # so we have to reload it
reload_module(fsl.utils.platform) reload_module(fsl.utils.platform)
async.idle(task, alwaysQueue=True) idle.idle(task, alwaysQueue=True)
with pytest.raises(ImportError): with pytest.raises(ImportError):
import wx import wx
...@@ -360,7 +360,7 @@ def test_idle_timeout(): ...@@ -360,7 +360,7 @@ def test_idle_timeout():
def task(): def task():
called[0] = True called[0] = True
_run_with_wx(async.idle, task, timeout=0.0000000000000001) _run_with_wx(idle.idle, task, timeout=0.0000000000000001)
assert not called[0] assert not called[0]
...@@ -378,9 +378,9 @@ def test_idleWhen(): ...@@ -378,9 +378,9 @@ def test_idleWhen():
def task(): def task():
called[0] = True called[0] = True
async.setIdleTimeout(1) idle.setIdleTimeout(1)
_run_with_wx(async.idleWhen, task, condition, pollTime=0.001) _run_with_wx(idle.idleWhen, task, condition, pollTime=0.001)
assert called[0] assert called[0]
assert timesPolled[0] == 50 assert timesPolled[0] == 50
...@@ -409,7 +409,7 @@ def _test_wait(): ...@@ -409,7 +409,7 @@ def _test_wait():
for t in threads: for t in threads:
t.start() t.start()
t = async.wait(threads, waittask, wait_direct=wait_direct) t = idle.wait(threads, waittask, wait_direct=wait_direct)
if t is not None: if t is not None:
t.join() t.join()
...@@ -427,7 +427,7 @@ def test_TaskThread(): ...@@ -427,7 +427,7 @@ def test_TaskThread():
def task(): def task():
called[0] = True called[0] = True
tt = async.TaskThread() tt = idle.TaskThread()
tt.start() tt.start()
tt.enqueue(task) tt.enqueue(task)
...@@ -451,7 +451,7 @@ def test_TaskThread_onFinish(): ...@@ -451,7 +451,7 @@ def test_TaskThread_onFinish():
def onFinish(): def onFinish():
onFinishCalled[0] = True onFinishCalled[0] = True
tt = async.TaskThread() tt = idle.TaskThread()
tt.start() tt.start()
tt.enqueue(task, onFinish=onFinish) tt.enqueue(task, onFinish=onFinish)
...@@ -475,7 +475,7 @@ def test_TaskThread_isQueued(): ...@@ -475,7 +475,7 @@ def test_TaskThread_isQueued():
def realTask(): def realTask():
called[0] = True called[0] = True
tt = async.TaskThread() tt = idle.TaskThread()
tt.start() tt.start()
tt.enqueue(busyTask) tt.enqueue(busyTask)
...@@ -504,7 +504,7 @@ def test_TaskThread_dequeue(): ...@@ -504,7 +504,7 @@ def test_TaskThread_dequeue():
def realTask(): def realTask():
called[0] = True called[0] = True
tt = async.TaskThread() tt = idle.TaskThread()
tt.start() tt.start()
tt.enqueue(busyTask) tt.enqueue(busyTask)
...@@ -529,12 +529,12 @@ def test_TaskThread_TaskVeto(): ...@@ -529,12 +529,12 @@ def test_TaskThread_TaskVeto():
def task(): def task():
taskCalled[0] = True taskCalled[0] = True
raise async.TaskThreadVeto() raise idle.TaskThreadVeto()
def onFinish(): def onFinish():
onFinishCalled[0] = True onFinishCalled[0] = True
tt = async.TaskThread() tt = idle.TaskThread()
tt.start() tt.start()
tt.enqueue(task, onFinish=onFinish) tt.enqueue(task, onFinish=onFinish)
...@@ -553,13 +553,13 @@ def test_mutex(): ...@@ -553,13 +553,13 @@ def test_mutex():
class Thing(object): class Thing(object):
@async.mutex @idle.mutex
def method1(self): def method1(self):
self.method1start = time.time() self.method1start = time.time()
time.sleep(0.5) time.sleep(0.5)
self.method1end = time.time() self.method1end = time.time()
@async.mutex @idle.mutex
def method2(self): def method2(self):
self.method2start = time.time() self.method2start = time.time()
time.sleep(0.5) time.sleep(0.5)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment