Commit 8caed2de authored by Paul McCarthy 🚵

Merge branch 'rf/subtable_merge' into 'master'

RF: Re-work DataTable.merge - now merges multiple subtables in one go, with

See merge request !60
parents 71b01b8c 847a7667
......@@ -2,6 +2,22 @@ FUNPACK changelog
=================
2.3.0 (Under development)
-------------------------
Changed
^^^^^^^
* Modified the :func:`.binariseCategorical` function so that it parallelises
tasks internally, instead of being called in parallel for different
variables. This should give superior performance.
* Revisited the :meth:`.DataTable.merge` method to optimise performance in all
  scenarios.
* Improved performance of the :mod:`.fmrib` date/time normalisation routines.
2.2.1 (Monday 4th May 2020)
---------------------------
......
......@@ -6,7 +6,7 @@
#
__version__ = '2.2.1'
__version__ = '2.3.0.dev0'
"""The ``funpack`` versioning scheme roughly follows Semantic Versioning
conventions.
"""
......
......@@ -139,9 +139,7 @@ def applyCleaningFunctions(dtable):
log.debug('%u cleaning tasks complete - merging results '
'into main data table.', len(subtables))
for subtable in subtables:
dtable.merge(subtable)
dtable.merge(subtables)
def _runChildValues(dtable, exprs, cvals, vid):
......
......@@ -5,14 +5,14 @@ Variable Process
# generate useful descriptions for each column (and note
# the ICD/OPCS/other diagnosis vars have already been
# converted to numeric values in the cleaning stage).
independent,6150,6155,20003,20199 binariseCategorical(acrossVisits=True, acrossInstances=True, metaproc='codingdesc')
independent,20001,20002,20004,40011,40012 binariseCategorical(acrossVisits=True, acrossInstances=True, metaproc='hierarchycodedesc')
independent,40001,40002,40006,40013,41200,41201,41204,41205,41210,41256,41258,41272,41273 binariseCategorical(acrossVisits=True, acrossInstances=True, metaproc='hierarchynumdesc')
6150,6155,20003,20199 binariseCategorical(acrossVisits=True, acrossInstances=True, metaproc='codingdesc')
20001,20002,20004,40011,40012 binariseCategorical(acrossVisits=True, acrossInstances=True, metaproc='hierarchycodedesc')
40001,40002,40006,40013,41200,41201,41204,41205,41210,41256,41258,41272,41273 binariseCategorical(acrossVisits=True, acrossInstances=True, metaproc='hierarchynumdesc')
# Main ICD vars are binarised, but instead of having binary
# 1/0 values, each column contains the date of diagnosis,
# or '0' indicating no diagnosis.
independent,41202,41203,41270,41271 binariseCategorical(acrossVisits=True, acrossInstances=True, metaproc='hierarchynumdesc', fillval=0, broadcast_take=[41262,41263,41280,41281])
41202,41203,41270,41271 binariseCategorical(acrossVisits=True, acrossInstances=True, metaproc='hierarchynumdesc', fillval=0, broadcast_take=[41262,41263,41280,41281])
# Columns will be dropped if they do not meet these criteria:
# - have at least 51 non-na values
......
......@@ -17,6 +17,7 @@ import string
import logging
import contextlib
import collections
import collections.abc as abc
import pandas as pd
......@@ -205,8 +206,7 @@ class DataTable(util.Singleton):
3. Merge the results back into the main table::
for subtable in subtables:
dtable.merge(subtable)
dtable.merge(subtables)
Modifications must occur through the :meth:`DataTable.__setitem__`
interface, so it can keep track of which columns have been modified.
......@@ -515,6 +515,8 @@ class DataTable(util.Singleton):
# dataframe. We've already checked for
# column conflicts, and we assume that
# row indices are aligned.
log.debug('Merging %u new columns into main data table.', len(series))
series = pd.concat(series, axis=1, verify_integrity=False)
self.__data = pd.concat((self.__data, series),
axis=1,
......@@ -618,6 +620,9 @@ class DataTable(util.Singleton):
columns. It is intended to be used when parallelising tasks, so that
child processes are given a view of only the relevant columns.
.. note:: The :meth:`merge` method cannot be used to merge in a
subtable that contains a subset of rows.
:arg columns: List of :class:`Column` objects.
:arg rows: Sequence of row indices.
:returns: A new :class:`DataTable`.
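A hedged usage sketch (the column selection is illustrative)::

    cols = dtable.dataColumns[:10]
    sub  = dtable.subtable(cols)   # view of ten columns, covering all rows

    # A worker process can modify sub via DataTable.__setitem__,
    # and the parent can later merge it back via DataTable.merge.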
......@@ -643,36 +648,74 @@ class DataTable(util.Singleton):
subtable=True)
def merge(self, subtable):
"""Merge the data from the given ``subtable`` into this ``DataTable``.
def merge(self, subtables):
"""Merge the data from the given ``subtables`` into this ``DataTable``.
It is assumed that ``subtable`` contains a sub-set of the columns
in this ``DataTable``.
:arg subtable: ``DataTable`` returned by :meth:`subtable`.
"""
It is assumed that the ``subtables`` each contain a subset of the
columns in this ``DataTable``.
if self.shape[0] == subtable.shape[0]:
subrows = slice(None)
else:
subrows = subtable.index
.. note:: The :meth:`merge` method cannot be used to merge in a
subtable that contains a subset of rows.
# only copy modified columns - we assume
# that all changes to the subtable
# occurred via DataTable.__setitem__
subcols = [c.name for c in subtable.dataColumns
if MODIFIED_COLUMN in subtable.getFlags(c)]
:arg subtables: A single ``DataTable``, or a sequence of ``DataTable``
instances, returned by :meth:`subtable`.
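For example (the subtables and the modification are illustrative)::

    subtables = [dtable.subtable([col]) for col in dtable.dataColumns]
    for sub in subtables:
        name         = sub.dataColumns[0].name
        sub[:, name] = sub[:, name] * 2   # any change via __setitem__
    dtable.merge(subtables)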
"""
if not isinstance(subtables, abc.Sequence):
subtables = [subtables]
# Gather a list of all subtable dataframes,
# and a list of all columns to be copied.
# We only copy modified columns - we assume
# that all changes to the subtable occurred
# via DataTable.__setitem__
subdfs = []
subcols = []
for subtable in subtables:
subtcols = [c.name for c in subtable.dataColumns
if MODIFIED_COLUMN in subtable.getFlags(c)]
subdfs.append(subtable[:, subtcols])
subcols.extend(subtcols)
# if there are columns to merge from
# any subtable, create a single dataframe
# containing all of them - quicker than
# merging them separately
if len(subcols) > 0:
self.__data.loc[subrows, subcols] = subtable[subrows, subcols]
for subcol in subtable.dataColumns:
log.debug('merging %u subtable dataframes (%u columns)',
len(subdfs), len(subcols))
mycol = self.__colmap[subcol.name]
myflags = self.__flags[mycol]
subflags = subtable.getFlags(subcol)
subflags = subflags.difference((MODIFIED_COLUMN,))
self.__flags[mycol] = myflags.union(subflags)
if subcol.metadata is not None:
mycol.metadata = subcol.metadata
if len(subdfs) > 1:
subdf = pd.concat(subdfs,
axis='columns',
verify_integrity=False,
copy=False)
else:
subdf = subdfs[0]
# merge subtable data into the
# main dataframe, preserving
# column ordering
colorder = self.__data.columns
self.__data = pd.concat((self.__data.drop(columns=subcols),
subdf.loc[:, subcols]),
axis='columns',
verify_integrity=False,
copy=False)
self.__data = self.__data[colorder]
# copy column metadata
# over from subtables
for subtable in subtables:
for subcol in subtable.dataColumns:
mycol = self.__colmap[subcol.name]
myflags = self.__flags[mycol]
subflags = subtable.getFlags(subcol)
subflags = subflags.difference((MODIFIED_COLUMN,))
self.__flags[mycol] = myflags.union(subflags)
if subcol.metadata is not None:
mycol.metadata = subcol.metadata
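The drop/concat/reorder strategy used above can be sketched in isolation with
plain pandas (the data values are illustrative, not from the source)::

    import numpy  as np
    import pandas as pd

    main = pd.DataFrame(np.zeros((4, 3)), columns=['a', 'b', 'c'])
    subs = [pd.DataFrame({'b' : np.arange(4)}),
            pd.DataFrame({'c' : np.arange(4) * 2})]

    # combine all modified subtable columns into a single frame
    subdf   = pd.concat(subs, axis='columns', verify_integrity=False, copy=False)
    subcols = list(subdf.columns)

    # drop the stale columns, append the merged ones,
    # then restore the original column ordering
    colorder = main.columns
    merged   = pd.concat((main.drop(columns=subcols), subdf.loc[:, subcols]),
                         axis='columns', verify_integrity=False, copy=False)
    merged   = merged[colorder]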
......@@ -11,7 +11,6 @@ which contain data on imaged subjects.
import functools as ft
import datetime as dt
import calendar
import pandas as pd
import numpy as np
......@@ -89,52 +88,54 @@ def load_FMRIBImaging(infile):
return df
def dateAsYearFraction(d):
"""Normalise dates so they are represented as a year plus year fraction.
"""
try:
# this normalisation results in
# a non-linear representation of
# time, but this is apparently
# not important.
d = d.timetuple()
daysinyear = 366 if calendar.isleap(d.tm_year) else 365
return d.tm_year + (d.tm_yday - 1) / float(daysinyear)
except Exception:
return np.nan
@funpack.formatter('FMRIBImagingDate')
def format_dateAsYearFraction(dtable, column, series):
"""Formats dates using :func:`dateAsYearFraction`. """
return series.apply(dateAsYearFraction)
def normalisedDate(dtable, column, series):
"""Converts date values into a numeric fractional year representation.
def normalisedAcquisitionTime(t):
"""Normalises timestamps so they are represented as a year plus year
fraction, where days are normalised to lie between 7am and 8pm (as
no scans take place outside of these hours).
Converts a date into a single value ``x``, where ``floor(x)`` is the calendar
year and ``x mod 1`` is the day within the year, expressed as a fraction. The
conversion takes leap years into account.
"""
datetimes = series.to_numpy()
years = datetimes.astype('datetime64[Y]')
days = datetimes.astype('datetime64[D]')
try:
# convert to day of year
# calculate fraction of day
days = (days - years).astype(np.float32)
years = (years + 1970) .astype(np.float32)
leaps = pd.DatetimeIndex(datetimes).is_leap_year + 365
# see note about non-linearity
# in dateAsYearFraction. This
# could also potentially be non-
# monotonic because we are not
# taking into account leap-seconds.
t = t.timetuple()
daysinyear = 366 if calendar.isleap(t.tm_year) else 365
dayfrac = ((t.tm_hour - 7) +
(t.tm_min / 60.0) +
(t.tm_sec / 3600.0)) / 13.0
return t.tm_year + ((t.tm_yday - 1) + dayfrac) / float(daysinyear)
except Exception:
return np.nan
# calculate and return fraction of year
return pd.Series(years + (days / leaps), name=series.name)
@funpack.formatter('FMRIBImagingTime')
def format_normalisedAcquisitionTime(dtable, column, series):
"""Formats timestamps using :func:`normalisedAcquisitionTime`. """
return series.apply(normalisedAcquisitionTime)
def normalisedAcquisitionTime(dtable, column, series):
"""Converts timestamps into a numeric fractional year representation.
Converts a date or date+time into a single value ``x``, where ``floor(x)`` is
the calendar year and ``x mod 1`` is the fractional day/time within the year,
*except* that 'a day' is redefined as the time between 7am and 8pm (UK Biobank
scanning only takes place within these hours).
"""
datetimes = series.to_numpy()
years = datetimes.astype('datetime64[Y]')
days = datetimes.astype('datetime64[D]')
hours = datetimes.astype('datetime64[h]')
mins = datetimes.astype('datetime64[m]')
secs = datetimes.astype('datetime64[s]')
# convert to day of year, hour
# of day, second of hour, then
# calculate fraction of day
secs = (secs - mins) .astype(np.float32)
mins = (mins - hours).astype(np.float32)
hours = (hours - days) .astype(np.float32)
days = (days - years).astype(np.float32)
years = (years + 1970) .astype(np.float32)
dayfracs = ((hours - 7) + (mins / 60) + (secs / 3600)) / 13
leaps = pd.DatetimeIndex(datetimes).is_leap_year + 365
# calculate and return fraction of year
return pd.Series(years + (days + dayfracs) / leaps, name=series.name)
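As an illustrative check of the year-fraction formulae above (the date and time
are made up)::

    import calendar
    import datetime as dt

    t          = dt.datetime(2020, 7, 1, 13, 30)   # 13:30 on day 183 of a leap year
    tt         = t.timetuple()
    daysinyear = 366 if calendar.isleap(tt.tm_year) else 365

    # date-only normalisation: ~2020.497
    datefrac = tt.tm_year + (tt.tm_yday - 1) / daysinyear

    # acquisition time normalisation, with the "day" spanning 7am-8pm: ~2020.499
    dayfrac  = ((tt.tm_hour - 7) + tt.tm_min / 60 + tt.tm_sec / 3600) / 13
    timefrac = tt.tm_year + ((tt.tm_yday - 1) + dayfrac) / daysinyear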
......@@ -59,7 +59,7 @@ the following:
independent,1,2,3 processName
- ``'all'``: The process is applied to all vids.::
- ``'all'``: The process is applied to all vids::
all processName
......@@ -303,8 +303,9 @@ def runProcess(proc, dtable, vids, workDir, parallel, bcastIdxs):
# to columns, and column flags/
# metadata. Added/removed columns
# are handled below.
for subtable in subtables:
dtable.merge(subtable)
log.debug('Processing for %u vid groups complete - merging '
'results into main data table.', len(subtables))
dtable.merge(subtables)
remove = []
add = []
......
......@@ -288,6 +288,10 @@ def binariseCategorical(dtable,
one new column for each value, containing ``1`` for subjects
with that value, and ``0`` otherwise.
The :func:`.processing_functions_core.binariseCategorical` function is called
for each variable in ``vids`` - it internally parallelises generation of the
new columns using multiprocessing.
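As context, a minimal sketch of the underlying per-variable call (the module
path, column name and values are assumptions)::

    import pandas as pd
    import funpack.processing_functions_core as core

    coldata           = pd.DataFrame({'1-0.0' : [1, 2, 1, 3]})
    binarised, values = core.binariseCategorical(coldata, njobs=2)
    # one uint8 column is expected for each unique value (1, 2 and 3)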
:arg dtable: The :class:`.DataTable`
:arg vids: Sequence of variable IDs to (independently) apply the
......@@ -403,7 +407,8 @@ def binariseCategorical(dtable,
binarised, values = core.binariseCategorical(data,
minpres=minpres,
take=tkdata,
token=vid)
token=vid,
njobs=dtable.njobs)
if replace: remove.extend(cols)
if replaceTake and (takecols is not None): remove.extend(takecols)
......
......@@ -19,28 +19,36 @@
"""
import logging
import collections
import enum
import logging
import collections
import functools as ft
import multiprocessing as mp
import multiprocessing.dummy as mpd
import numpy as np
import pandas as pd
import pandas.api.types as pdtypes
from typing import Optional, Tuple, List, Union, Any
import funpack.util as util
import numpy as np
import pandas as pd
import pandas.api.types as pdtypes
import funpack.util as util
log = logging.getLogger(__name__)
def isSparse(data,
ctype=None,
minpres=None,
minstd=None,
mincat=None,
maxcat=None,
abspres=True,
abscat=True,
naval=None):
def isSparse(
data : pd.Series,
ctype : Optional[enum.Enum] = None,
minpres : Optional[float] = None,
minstd : Optional[float] = None,
mincat : Optional[float] = None,
maxcat : Optional[float] = None,
abspres : bool = True,
abscat : bool = True,
naval : Optional[Any] = None
) -> Tuple[bool, Union[str, None], Any]:
"""Returns ``True`` if the given data looks sparse, ``False`` otherwise.
Used by :func:`removeIfSparse`.
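A hedged usage sketch (the data and threshold are illustrative)::

    import numpy  as np
    import pandas as pd

    data                  = pd.Series([1, np.nan, np.nan, np.nan, 2])
    sparse, reason, value = isSparse(data, minpres=4)
    # sparse is expected to be True here, as fewer
    # than four non-NA values are present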
......@@ -179,7 +187,11 @@ def isSparse(data,
return False, None, None
def naCorrelation(namask, nathres, nacounts=None):
def naCorrelation(
namask : np.ndarray,
nathres : float,
nacounts : Optional[np.ndarray] = None
) -> np.ndarray:
"""Compares the missingness correlation of every pair of columns in
``namask``.
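A hedged usage sketch (the mask and threshold are illustrative)::

    import numpy as np

    namask = np.array([[True,  True,  False],
                       [False, False, True ],
                       [True,  True,  False]])
    nacorr = naCorrelation(namask, nathres=0.9)
    # nacorr is expected to flag column pairs (such as the first
    # two columns here) with highly correlated missingness patterns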
......@@ -221,7 +233,12 @@ def naCorrelation(namask, nathres, nacounts=None):
return nacorr
def pairwiseRedundantColumns(data, colpairs, corrthres, token=None):
def pairwiseRedundantColumns(
data : pd.DataFrame,
colpairs : np.ndarray,
corrthres : float,
token : Optional[str] = None
) -> List[int]:
"""Identifies redundant columns based on their correlation with each
other by comparing each pair of columns one by one.
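A hedged usage sketch, assuming ``colpairs`` holds pairs of column indices::

    import numpy  as np
    import pandas as pd

    data      = pd.DataFrame({'a' : [1, 2, 3, 4],
                              'b' : [2, 4, 6, 8],    # perfectly correlated with 'a'
                              'c' : [4, 1, 3, 2]})
    colpairs  = np.array([[0, 1], [0, 2], [1, 2]])
    redundant = pairwiseRedundantColumns(data, colpairs, corrthres=0.9)
    # expected to flag one of the 'a'/'b' pair as redundant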
......@@ -272,7 +289,10 @@ def pairwiseRedundantColumns(data, colpairs, corrthres, token=None):
return list(redundant)
def matrixRedundantColumns(data, corrthres, nathres=None):
def matrixRedundantColumns(
data : pd.DataFrame,
corrthres : float,
nathres : Optional[float] = None) -> np.ndarray:
"""Identifies redundant columns based on their correlation with each
other using dot products to calculate a correlation matrix.
......@@ -370,11 +390,17 @@ def matrixRedundantColumns(data, corrthres, nathres=None):
return np.unique(redundant)
def binariseCategorical(data, minpres=None, take=None, token=None):
"""Takes one or more columns containing categorical data,, and generates a new
set of binary columns (as ``np.uint8``), one for each unique categorical
value, containing ``1`` in rows where the value was present, ``0``
otherwise.
def binariseCategorical(
data : pd.DataFrame,
minpres : Optional[int] = None,
take : Optional[pd.DataFrame] = None,
token : Optional[str] = None,
njobs : Optional[int] = None
) -> Tuple[np.ndarray, np.ndarray]:
"""Takes one or more columns containing categorical data,, and generates a
new set of binary columns (as ``np.uint8``), one for each unique
categorical value, containing ``1`` in rows where the value was present,
``0`` otherwise.
:arg data: A ``pandas.DataFrame`` containing the input columns
......@@ -382,11 +408,20 @@ def binariseCategorical(data, minpres=None, take=None, token=None):
occurrences will not be included in the output
:arg take: Optional ``pandas.DataFrame`` containing values to use in
the output, instead of using binary ``0``/``1`` values.
Must contain the same number of columns (and rows) as
``data``.
the output. Instead of using binary ``0``/``1`` values,
rows where a unique value is present will be populated with
the corresponding value from ``take``, and rows where a
unique value is not present will be populated with
``np.nan``. Must contain the same number of columns (and
rows) as ``data``.
:arg token: Unique token to identify this function call, to make it
re-entrant (in case multiple calls are made in a parallel
execution environment).
:arg token: Unique token to use in log messages.
:arg njobs: Number of jobs to parallelise tasks with - the
:func:`generateBinaryColumns` function is called in parallel
for different blocks of unique values.
:returns: A tuple containing:
......@@ -397,6 +432,12 @@ def binariseCategorical(data, minpres=None, take=None, token=None):
the unique values that are encoded in the binary columns.
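For illustration, a hedged sketch of the ``take`` behaviour (the values are
made up)::

    import numpy  as np
    import pandas as pd

    data = pd.DataFrame({'code' : [10,     20,     10]})
    take = pd.DataFrame({'date' : [2015.5, 2016.5, 2017.5]})

    binarised, values = binariseCategorical(data, take=take)

    # values is expected to be [10, 20], and binarised to contain
    # the date where each code is present, and NaN elsewhere:
    #   [[2015.5,    nan],
    #    [   nan, 2016.5],
    #    [2017.5,    nan]]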
"""
if njobs is None: njobs = 1
if njobs < 1: njobs = 1
if njobs == 1: Pool = mpd.Pool
else: Pool = mp.Pool
if (take is not None) and take.shape != data.shape:
takes = take.shape
datas = data.shape
......@@ -431,23 +472,143 @@ def binariseCategorical(data, minpres=None, take=None, token=None):
log.debug('%sCounted %u unique values [minpres: %u]',
token, len(uniq), minpres)
# Make a new subjects * unique_values array.
# if take is provided, we assume that
# every column in it has the same dtype
# Prepare inputs for parallelising the
# binarisation via generateBinaryColumns
data = data.to_numpy()
# Figure out the type and fill value
# of the output array. if take is
# provided, we assume that every
# column in it has the same dtype
if take is None:
dtype, fill = np.uint8, 0
dtype = np.uint8
fill = 0
else:
dtype = take.dtypes[0]
take = take.to_numpy()
if np.issubdtype(dtype, np.integer): fill = 0
else: fill = np.nan
# We parallelise binarisation across
# blocks of unique values - each call
# to generateBinaryColumns will binarise
# one block. Only read access is needed
# for the uniq, data, and take arrays,
# so they are made available at the
# module level (and thus accessible to
# the worker processes in shared parent
# process memory).
#
# Write access is needed for the bindata
# array (where the result is written),
# so it is shared as a mp.RawArray.
#
# If take is None, bindata is a uint8
# array; otherwise it is a take.dtype
# array. If take has a dtype of
# np.datetime64, we need a hack, as
# we can't create a sharedmem array
# of that type. So for datetime, we
# make bindata uint64, and then cast
# it to datetime64 afterwards.
if np.issubdtype(dtype, np.datetime64):
cdtype = np.uint64
else:
cdtype = dtype
# Shared array to store binarised
# columns. If not parallelising,
# we use a regular numpy array
binshape = (len(data), len(uniq))
if njobs > 1:
rawbindata = mp.RawArray(np.ctypeslib.as_ctypes_type(cdtype),
int(np.prod(binshape)))
bindata = np.ctypeslib.as_array(rawbindata).reshape(binshape)
else:
dtype, fill = take.dtypes[0], np.nan
take = take.to_numpy()
bindata = np.empty(binshape, dtype=dtype)
# make the inputs module-level accessible
# before creating worker processes. We
# use a unique token for this invocation
# in case this function has been called
# multiple times
if not hasattr(generateBinaryColumns, 'inputs'):
generateBinaryColumns.inputs = {}
generateBinaryColumns.inputs[token] = (uniq, data, bindata, take)
# create an offset into the uniq
# list for each worker process
valsperjob = int(np.ceil(len(uniq) / njobs))
offsets = np.arange(0, len(uniq), valsperjob)
func = ft.partial(generateBinaryColumns,
nvals=valsperjob,
fill=fill,
token=token)
try:
with Pool(njobs) as pool:
pool.map(func, offsets)
pool.close()
pool.join()
finally:
generateBinaryColumns.inputs.pop(token)
# cast the result if necessary
if dtype != cdtype:
bindata = bindata.astype(dtype)
return bindata, uniq
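The shared-memory pattern used above can be sketched in isolation as follows
(the shape and dtype are illustrative)::

    import numpy           as np
    import multiprocessing as mp

    shape  = (10, 4)
    raw    = mp.RawArray(np.ctypeslib.as_ctypes_type(np.uint8),
                         int(np.prod(shape)))
    shared = np.ctypeslib.as_array(raw).reshape(shape)

    # worker processes created after this point (e.g. via mp.Pool on a
    # fork-based platform) inherit raw, and can write results directly
    # into shared without copying data back to the parent process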
def generateBinaryColumns(offset, nvals, fill, token):
"""Called by :func:`binariseCategorical`. Generates binarised columns for
a subset of unique values for a data set.
The following arguments are passed internally from
:func:`binariseCategorical` to this function:
- Sequence of unique values
- Numpy array containing the data
- Numpy array to store the output
- Numpy array to take values from
:arg offset: Offset into the sequence of unique values
:arg nvals: Number of unique values to binarise
:arg fill: Default value
:arg token: Unique token used to retrieve the data for one invocation of
this function.
"""
data = data.to_numpy()
bindata = np.full((len(data), len(uniq)), fill, dtype=dtype)