diff --git a/fsl/utils/async.py b/fsl/utils/async.py index 82b4c14a993cf1946716f0f795d7fb3c82bf67ae..0f18f884934ff62d810bd04e2d179f2425f1ca5f 100644 --- a/fsl/utils/async.py +++ b/fsl/utils/async.py @@ -57,12 +57,22 @@ the task (via :func:`idle`). The :class:`TaskThread` class is a simple thread which runs a queue of tasks. +Other facilities +---------------- + + +The ``async`` module also defines the :func:`mutex` decorator, which is +intended to be used to mark the methods of a class as being mutually exclusive. +The ``mutex`` decorator uses the :class:`MutexFactory` class to do its work. + + .. todo:: You could possibly use ``props.callqueue`` to drive the idle loop. """ import time import logging +import functools import threading import collections @@ -484,3 +494,98 @@ class TaskThread(threading.Thread): self.__q = None self.__enqueued = None log.debug('Task thread finished') + + + + +def mutex(*args, **kwargs): + """Decorator for use on methods of a class, which makes the method + call mutually exclusive. + + If you define a class which has one or more methods that must only + be accessed by one thread at a time, you can use the ``mutex`` decorator + to enforce this restriction. As a contrived example:: + + + class Example(object): + + def __init__(self): + self.__sharedData = [] + + @mutex + def dangerousMethod1(self, message): + sefl.__sharedData.append(message) + + @mutex + def dangerousMethod2(self): + return sefl.__sharedData.pop() + + + + The ``@mutex`` decorator will ensure that, at any point in time, only + one thread is running either of the ``dangerousMethod1`` or + ``dangerousMethod2`` methods. + + See the :class:`MutexFactory`` + """ + return MutexFactory(*args, **kwargs) + + +class MutexFactory(object): + """The ``MutexFactory`` is a placeholder for methods which have been + decorated with the :func:`mutex` decorator. When the method of a class + is decorated with ``@mutex``, a ``MutexFactory`` is created. + + Later on, when the method is accessed on an instance, the :meth:`__get__` + method creates the true decorator function, and replaces the instance + method with that decorator. + + .. note:: The ``MutexFactory`` adds an attribute called + ``_async_mutex_lock`` to all instances that have + ``@mutex``-decorated methods. + """ + + + def __init__(self, function): + """Create a ``MutexFactory``. + """ + self.__func = function + + + def __get__(self, instance, cls): + """When this ``MutexFactory`` is accessed through an instance, + a decorator function is created which enforces mutually exclusive + access to the decorated method. A single ``threading.Lock`` object + is shared between all ``@mutex``-decorated methods on a single + instance. + + If this ``MutexFactory`` is accessed through a class, the + decorated function is returned. + """ + + # Class-level access + if instance is None: + return self.__func + + # Get the lock object, creating if it necessary + lock = getattr(instance, '_async_mutex_lock', None) + if lock is None: + lock = threading.Lock() + instance._async_mutex_lock = lock + + # The true decorator function: + # - Acquire the lock (blocking until it has been released) + # - Run the decorated method + # - Release the lock + def decorator(*args, **kwargs): + lock.acquire() + try: + return self.__func(instance, *args, **kwargs) + finally: + lock.release() + + # Replace this MutexFactory with + # the decorator on the instance + decorator = functools.update_wrapper(decorator, self.__func) + setattr(instance, self.__func.__name__, decorator) + return decorator diff --git a/fsl/utils/cache.py b/fsl/utils/cache.py new file mode 100644 index 0000000000000000000000000000000000000000..6d39d916cac771cd5f8f4917c8bd762c9cd09831 --- /dev/null +++ b/fsl/utils/cache.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python +# +# cache.py - A simple cache based on an OrderedDict. +# +# Author: Paul McCarthy <pauldmccarthy@gmail.com> +# +"""This module provides the :class:`.Cache` class., a simple in-memory cache. +""" + + +import time +import collections + + +class Expired(Exception): + """``Exception`` raised by the :meth:`Cache.get` metho when an attempt is + made to access a cache item that has expired. + """ + pass + + +class CacheItem(object): + """Internal container class used to store :class:`Cache` items. """ + + def __init__(self, key, value, expiry=0): + self.key = key + self.value = value + self.expiry = expiry + self.storetime = time.time() + + +class Cache(object): + """The ``Cache`` is a simple in-memory cache built on a + ``collections.OrderedDict``. The ``Cache`` class has the following + features: + + - When an item is added to a full cache, the oldest entry is + automatically dropped. + + - Expiration times can be specified for individual items. If a request + is made to access an expired item, an :class:`Expired` exception is + raised. + """ + + def __init__(self, maxsize=100): + """Create a ``Cache``. + + :arg maxsize: Maximum number of items allowed in the ``Cache`` before + it starts dropping old items + """ + self.__cache = collections.OrderedDict() + self.__maxsize = maxsize + + + def put(self, key, value, expiry=0): + """Put an item in the cache. + + :arg key: Item identifier (must be hashable). + + :arg value: The item to store. + + :arg expiry: Expiry time in seconds. An item with an expiry time of + ``0`` will not expire. + """ + + if len(self.__cache) == self.__maxsize: + self.__cache.popitem(last=False) + + self.__cache[key] = CacheItem(key, value, expiry) + + + def get(self, key, *args, **kwargs): + """Get an item from the cache. + + :arg key: Item identifier. + :arg default: Default value to return if the item is not in the cache, + or has expired. + """ + + defaultSpecified, default = self.__parseDefault(*args, **kwargs) + + # Default value specified - return + # it if the key is not in the cache + if defaultSpecified: + + entry = self.__cache.get(key, None) + + if entry is None: + return default + + # No default value specified - + # allow KeyErrors to propagate + else: + entry = self.__cache[key] + + if entry.expiry > 0: + if time.time() - entry.storetime > entry.expiry: + + self.__cache.pop(key) + + if defaultSpecified: return default + else: raise Expired(key) + + return entry.value + + + def clear(self): + """Remove all items fromthe cache. + """ + self.__cache = collections.OrderedDict() + + + def __parseDefault(self, *args, **kwargs): + """Used by the :meth:`get` method. Parses the ``default`` argument, + which may be specified as either a positional or keyword argumnet. + + :returns: A tuple containing two values: + + - ``True`` if a default argument was specified, ``False`` + otherwise. + + - The specifeid default value, or ``None`` if it wasn't + specified. + """ + + nargs = len(args) + len(kwargs) + + # Nothing specified (ok), or too + # many arguments specified (not ok) + if nargs == 0: return False, None + elif nargs != 1: raise ValueError() + + # The default value is either specified as a + # positional argument, or as a keyword argument + if len(args) == 1: return True, args[0] + elif len(kwargs) == 1: return True, kwargs['default']