Forked from
FSL / fslpy
1295 commits behind the upstream repository.
-
Paul McCarthy authoredPaul McCarthy authored
test_idle.py 12.24 KiB
#!/usr/bin/env python
#
# test_idle.py -
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
import gc
import time
import threading
import random
from six.moves import reload_module
import pytest
import mock
import fsl.utils.idle as idle
from fsl.utils.platform import platform as fslplatform
def _run_with_wx(func, *args, **kwargs):
gc.collect()
propagateRaise = kwargs.pop('propagateRaise', True)
startingDelay = kwargs.pop('startingDelay', 500)
finishingDelay = kwargs.pop('finishingDelay', 500)
callAfterApp = kwargs.pop('callAfterApp', None)
import wx
result = [None]
raised = [None]
app = [wx.App()]
frame = wx.Frame(None)
if callAfterApp is not None:
callAfterApp()
def wrap():
try:
if func is not None:
result[0] = func(*args, **kwargs)
except Exception as e:
print(e)
raised[0] = e
finally:
def finish():
frame.Destroy()
app[0].ExitMainLoop()
wx.CallLater(finishingDelay, finish)
frame.Show()
wx.CallLater(startingDelay, wrap)
app[0].MainLoop()
time.sleep(1)
idle.idleReset()
if raised[0] and propagateRaise:
raise raised[0]
del app[0]
return result[0]
def _run_without_wx(func, *args, **kwargs):
with mock.patch.dict('sys.modules', wx=None):
return func(*args, **kwargs)
def _wait_for_idle_loop_to_clear():
if fslplatform.haveGui:
import wx
idleDone = [False]
def busywait():
idleDone[0] = True
idle.idle(busywait)
while not idleDone[0]:
wx.GetApp().Yield()
@pytest.mark.wxtest
def test_run_with_gui(): _run_with_wx( _test_run)
def test_run_without_gui(): _run_without_wx(_test_run)
def _test_run():
taskRun = [False]
onFinishCalled = [False]
onErrorCalled = [False]
def task():
taskRun[0] = True
def errtask():
taskRun[0] = True
raise Exception('Task which was supposed to crash crashed!')
def onFinish():
onFinishCalled[0] = True
def onError(e):
onErrorCalled[0] = True
t = idle.run(task)
if t is not None:
t.join()
assert taskRun[0]
taskRun[0] = False
t = idle.run(task, onFinish, onError)
if t is not None:
t.join()
_wait_for_idle_loop_to_clear()
taskRun[ 0] = False
onFinishCalled[0] = False
t = idle.run(errtask, onFinish, onError)
if t is not None:
t.join()
_wait_for_idle_loop_to_clear()
assert taskRun[ 0]
assert not onFinishCalled[0]
assert onErrorCalled[ 0]
@pytest.mark.wxtest
def test_idleTimeout_with_gui(): _run_with_wx( _test_idleTimeout)
def test_idleTimeout_without_gui(): _run_without_wx(_test_idleTimeout)
def _test_idleTimeout():
idle.idleReset()
default = idle.getIdleTimeout()
idle.setIdleTimeout(999)
assert idle.getIdleTimeout() == 999
idle.setIdleTimeout()
assert idle.getIdleTimeout() == default
@pytest.mark.wxtest
def test_block_with_gui(): _run_with_wx( _test_block)
def test_block_without_gui(): _run_without_wx(_test_block)
def _test_block():
called = [False]
if fslplatform.haveGui:
import wx
def idlefunc():
called[0] = True
wx.CallLater(1000, idlefunc)
start = time.time()
idle.block(2)
end = time.time()
# Be relaxed about precision - timing
# can sometimes be pretty sloppy when
# running in a docker container.
assert abs((end - start) < 2) < 0.05
if fslplatform.haveGui:
assert called[0]
@pytest.mark.wxtest
def test_idle():
called = [False]
def task(arg, kwarg1=None):
called[0] = arg == 1 and kwarg1 == 2
def errtask(arg, kwarg1=None):
raise Exception('Task which was supposed to crash crashed!')
assert idle.getIdleTimeout() > 0
# Run directly
_run_without_wx(idle.idle, task, 1, kwarg1=2, name='direct')
assert called[0]
called[0] = False
# Run on wx idle loop
_run_with_wx(idle.idle, task, 1, kwarg1=2)
assert called[0]
# Run a crashing task directly
with pytest.raises(Exception):
idle.idle(errtask, 1, kwarg1=2)
# Run a crashing task on idle loop - error should not propagate
_run_with_wx(idle.idle, errtask, 1, kwarg1=2)
@pytest.mark.wxtest
def test_inidle():
called = [False]
name = 'mytask'
def task():
called[0] = True
def queuetask():
idle.idle(task, after=0.01, name=name)
assert idle.inIdle(name)
_run_with_wx(queuetask)
assert called[0]
@pytest.mark.wxtest
def test_cancelidle():
called = [False]
name = 'mytask'
def task():
called[0] = True
def queuetask():
idle.idle(task, after=0.01, name=name)
idle.cancelIdle(name)
_run_with_wx(queuetask)
assert not called[0]
@pytest.mark.wxtest
def test_idle_skipIfQueued():
task1called = [False]
task2called = [False]
name = 'mytask'
def task1():
task1called[0] = True
def task2():
task2called[0] = True
def queuetask():
idle.idle(task1, after=0.01, name=name)
idle.idle(task2, after=0.01, name=name, skipIfQueued=True)
_run_with_wx(queuetask)
assert task1called[0]
assert not task2called[0]
@pytest.mark.wxtest
def test_idle_dropIfQueued():
task1called = [False]
task2called = [False]
name = 'mytask'
def task1():
print('task1 called')
task1called[0] = True
def task2():
print('task2 called')
task2called[0] = True
def queuetask():
print('Queuetask running')
idle.idle(task1, after=0.01, name=name)
idle.idle(task2, after=0.01, name=name, dropIfQueued=True)
print('Queuetask finished')
import sys
print('running with wx')
sys.stdout.flush()
_run_with_wx(queuetask)
print('run with wx finished')
sys.stdout.flush()
assert not task1called[0]
assert task2called[0]
@pytest.mark.wxtest
def test_idle_alwaysQueue1():
# Test scheduling the task before
# a wx.App has been created.
called = [False]
def task():
called[0] = True
# In this scenario, an additional call
# to idle (after the App has been created)
# is necessary, otherwise the originally
# queued task will not be called.
def nop():
pass
# The task should be run
# when the mainloop starts
idle.idle(task, alwaysQueue=True)
# Second call to idle.idle
_run_with_wx(idle.idle, nop)
assert called[0]
@pytest.mark.wxtest
def test_idle_alwaysQueue2():
# Test scheduling the task
# after a wx.App has been craeted,
# but before MainLoop has started
called = [False]
def task():
called[0] = True
def queue():
idle.idle(task, alwaysQueue=True)
_run_with_wx(None, callAfterApp=queue)
assert called[0]
@pytest.mark.wxtest
def test_idle_alwaysQueue3():
# Test scheduling the task
# after a wx.App has been craeted
# and the MainLoop has started.
# In this case, alwaysQueue should
# have no effect - the task should
# just be queued and executed as
# normal.
called = [False]
def task():
called[0] = True
_run_with_wx(idle.idle, task, alwaysQueue=True)
assert called[0]
@pytest.mark.wxtest
def test_idle_alwaysQueue4():
# Test scheduling the task when
# wx is not present - the task
# should just be executed immediately
called = [False]
def task():
called[0] = True
import fsl.utils.platform
with mock.patch.dict('sys.modules', {'wx' : None}):
# idle uses the platform module to
# determine whether a GUI is available,
# so we have to reload it
reload_module(fsl.utils.platform)
idle.idle(task, alwaysQueue=True)
with pytest.raises(ImportError):
import wx
reload_module(fsl.utils.platform)
assert called[0]
@pytest.mark.wxtest
def test_idle_timeout():
called = [False]
def task():
called[0] = True
_run_with_wx(idle.idle, task, timeout=0.0000000000000001)
assert not called[0]
@pytest.mark.wxtest
def test_idleWhen():
called = [False]
timesPolled = [0]
def condition():
timesPolled[0] += 1
return timesPolled[0] == 50
def task():
called[0] = True
idle.setIdleTimeout(1)
_run_with_wx(idle.idleWhen, task, condition, pollTime=0.001)
assert called[0]
assert timesPolled[0] == 50
@pytest.mark.wxtest
def test_wait_with_gui(): _run_with_wx(_test_wait, finishingDelay=1100)
def test_wait_without_gui(): _test_wait()
def _test_wait():
ntasks = 10
def threadtask(num):
time.sleep(random.random())
threadtaskscalled[num] = True
def waittask():
waittaskcalled[0] = True
for wait_direct in [False, True]:
threadtaskscalled = [False] * ntasks
waittaskcalled = [False]
threads = [threading.Thread(target=threadtask, args=(n,))
for n in range(ntasks)]
for t in threads:
t.start()
t = idle.wait(threads, waittask, wait_direct=wait_direct)
if t is not None:
t.join()
_wait_for_idle_loop_to_clear()
assert all(threadtaskscalled)
assert waittaskcalled[0]
def test_TaskThread():
called = [False]
def task():
called[0] = True
tt = idle.TaskThread()
tt.start()
tt.enqueue(task)
time.sleep(0.5)
tt.stop()
tt.join()
assert called[0]
def test_TaskThread_onFinish():
taskCalled = [False]
onFinishCalled = [False]
def task():
taskCalled[0] = True
def onFinish():
onFinishCalled[0] = True
tt = idle.TaskThread()
tt.start()
tt.enqueue(task, onFinish=onFinish)
time.sleep(0.5)
tt.stop()
tt.join()
assert taskCalled[0]
assert onFinishCalled[0]
def test_TaskThread_isQueued():
called = [False]
def busyTask():
time.sleep(0.5)
def realTask():
called[0] = True
tt = idle.TaskThread()
tt.start()
tt.enqueue(busyTask)
tt.enqueue(realTask, taskName='realTask')
time.sleep(0.25)
queued = tt.isQueued('realTask')
time.sleep(0.3)
tt.stop()
tt.join()
assert queued
assert called[0]
def test_TaskThread_dequeue():
called = [False]
def busyTask():
time.sleep(0.5)
def realTask():
called[0] = True
tt = idle.TaskThread()
tt.start()
tt.enqueue(busyTask)
tt.enqueue(realTask, taskName='realTask')
time.sleep(0.25)
tt.dequeue('realTask')
time.sleep(0.3)
tt.stop()
tt.join()
assert not called[0]
def test_TaskThread_TaskVeto():
taskCalled = [False]
onFinishCalled = [False]
def task():
taskCalled[0] = True
raise idle.TaskThreadVeto()
def onFinish():
onFinishCalled[0] = True
tt = idle.TaskThread()
tt.start()
tt.enqueue(task, onFinish=onFinish)
time.sleep(0.5)
tt.stop()
tt.join()
assert taskCalled[0]
assert not onFinishCalled[0]
def test_mutex():
class Thing(object):
@idle.mutex
def method1(self):
self.method1start = time.time()
time.sleep(0.01)
self.method1end = time.time()
@idle.mutex
def method2(self):
self.method2start = time.time()
time.sleep(0.01)
self.method2end = time.time()
for i in range(200):
t = [Thing()]
def thread1():
t[0].method1()
def thread2():
t[0].method2()
for i in range(10):
t[0].method1start = None
t[0].method2start = None
t[0].method1end = None
t[0].method2end = None
t1 = threading.Thread(target=thread1)
t2 = threading.Thread(target=thread2)
t1.start()
t2.start()
t1.join()
t2.join()
# Either t1 has to start and
# finish before t2 or vice versa
assert (t[0].method2start >= t[0].method1end or
t[0].method1start >= t[0].method2end)