Commit 6e544503 authored by Paul McCarthy

Merge branch 'enh/perf' into 'master'

Enh/perf

See merge request !58
parents 122010a0 9c7e1d7c
Paul McCarthy <paul.mccarthy@ndcn.ox.ac.uk>
Thomas Nichols <tom.nichols@bdi.ox.ac.uk>
Jessie Liu <Zhangdaihong.Liu@warwick.ac.uk>
\ No newline at end of file
Jessie Liu <Zhangdaihong.Liu@warwick.ac.uk>
Steve Smith <steve@fmrib.ox.ac.uk>
@@ -2,8 +2,8 @@ FUNPACK changelog
=================
2.2.0 (Under development)
-------------------------
2.2.0 (Friday 1st May 2020)
---------------------------
Changed
@@ -13,6 +13,12 @@ Changed
* Substantial performance improvements to the :func:`.codeToNumeric` cleaning
function, and to :func:`.removeIfRedundant`, :func:`.binariseCategorical`,
and other processing functions.
* The default implementation of :func:`.removeIfRedundant` now uses matrix
algebra rather than pairwise comparisons. This requires more memory, but
is much faster.
* Added [`threadpoolctl`](https://github.com/joblib/threadpoolctl/) as a
dependency, for setting the number of threads to use when parallelising
``numpy`` operations.
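For reference, capping the native thread pools used by ``numpy`` with ``threadpoolctl`` looks roughly like this (the limit of 4 is an arbitrary illustration):

```python
import threadpoolctl

# Limit the BLAS/OpenMP thread pools that numpy uses to (e.g.) 4 threads
threadpoolctl.threadpool_limits(limits=4)
```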
Fixed
@@ -21,6 +27,8 @@ Fixed
* The :func:`.removeIfRedundant` processing function was not testing columns
with no missing values when a NA correlation threshold was being used.
* :func:`.removeIfRedundant` was also potentially producing inconsistent
results for columns with no present values, or with a constant value.
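The constant-value case is plausibly explained by Pearson correlation being undefined for a zero-variance column; a quick illustration (not taken from the funpack code):

```python
import pandas as pd

constant = pd.Series([1.0, 1.0, 1.0])
varying  = pd.Series([1.0, 2.0, 3.0])

# A zero-variance column has an undefined Pearson correlation
print(constant.corr(varying))  # nan
```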
2.1.0 (Tuesday 21st April 2020)
@@ -6,7 +6,7 @@
#
__version__ = '2.2.0.dev0'
__version__ = '2.2.0'
"""The ``funpack`` versioning scheme roughly follows Semantic Versioning
conventions.
"""
@@ -17,6 +17,7 @@ import calendar
import pandas as pd
import pandas.api.types as pdtypes
import threadpoolctl
import funpack
import funpack.util as util
@@ -50,6 +51,10 @@ def main(argv=None):
args, argv = config.parseArgsWithConfigFile(argv)
date = datetime.date.today()
# Set the number of threads
# that numpy should use
threadpoolctl.threadpool_limits(args.num_jobs)
# Now that args are parsed,
# we can set up logging properly.
configLogging(args)
@@ -134,14 +134,17 @@ def removeIfSparse(dtable,
@custom.processor()
def removeIfRedundant(dtable, vids, corrthres, nathres=None):
"""removeIfRedundant(corrthres, [nathres])
def removeIfRedundant(dtable, vids, corrthres, nathres=None, pairwise=False):
"""removeIfRedundant(corrthres, [nathres], [pairwise])
Removes columns deemed to be redundant.
Removes columns from the variables in ``vids`` if they are redundant.
Redundancy is determined by calculating the correlation between pairs
of columns - see the :func:`.isRedundant` function.
If ``pairwise`` is ``True``, an alternative implementation is used which
may be faster on data sets with high missingness correlation.
"""
# Ignore non-numeric columns
@@ -150,22 +153,72 @@ def removeIfRedundant(dtable, vids, corrthres, nathres=None):
colnames = [c.name for c in cols]
data = dtable[:, colnames]
if pairwise:
redundant = _pairwiseRemoveIfRedundant(
dtable, data, corrthres, nathres)
else:
redundant = _removeIfRedundant(dtable, data, corrthres, nathres)
redundant = util.dedup(sorted(redundant))
if len(redundant) > 0:
log.debug('Dropping %u redundant columns: %s ...',
len(redundant), redundant[:5])
return [cols[r] for r in redundant]
def _removeIfRedundant(dtable, data, corrthres, nathres=None):
"""Default fast implementation of redundancy check. Used when the
``pairwise`` option to :func:`removeIfRedundant` is ``False``.
:arg dtable: The :class:`.DataTable` containing all data
:arg data: ``pandas.DataFrame`` containing the data to check
:arg corrthres: Correlation threshold - see :func:`.redundantColumns`.
:arg nathres: Missingness correlation threshold - see
:func:`.redundantColumns`.
:returns: A sequence of indices denoting the redundant columns.
"""
return core.matrixRedundantColumns(data, corrthres, nathres)
def _pairwiseRemoveIfRedundant(dtable, data, corrthres, nathres=None):
"""Alternative implementation of redundancy check. Used when the
``pairwise`` option to :func:`removeIfRedundant` is ``True``.
:arg dtable: The :class:`.DataTable` containing all data
:arg data: ``pandas.DataFrame`` containing the data to check
:arg corrthres: Correlation threshold - see :func:`.redundantColumns`.
:arg nathres: Missingness correlation threshold - see
:func:`.redundantColumns`.
:returns: A sequence of indices denoting the redundant columns.
"""
ncols = len(data.columns)
# If we are correlating missingness,
# we use the naRedundantColumns function
# to generate all of the column pairs
# we use the naCorrelation function
# to identify all of the column pairs
# which are na-correlated - the pairs
# which fail this test do not need to
# be subjected to the correlation test
# (and therefore pass the redundancy
# check)
if nathres is not None:
colpairs = core.naRedundantColumns(data, nathres)
nacorr = core.naCorrelation(pd.isna(data), nathres)
# As the matrix is symmetric, we can
# drop column pairs where x >= y.
nacorr = np.triu(nacorr, k=1)
colpairs = np.where(nacorr)
colpairs = np.vstack(colpairs).T
# Otherwise we generate an array
# containing indices of all column
# pairs.
else:
xs, ys = np.triu_indices(len(cols), k=1)
xs, ys = np.triu_indices(ncols, k=1)
colpairs = np.vstack((xs, ys)).T
# we need at least
@@ -175,8 +228,9 @@ def removeIfRedundant(dtable, vids, corrthres, nathres=None):
# evaluate all pairs at once
if not dtable.parallel:
log.debug('Checking %u columns for redundancy', len(cols))
redundant = core.redundantColumns(data, colpairs, corrthres=corrthres)
log.debug('Checking %u columns for redundancy', ncols)
redundant = core.pairwiseRedundantColumns(
data, colpairs, corrthres=corrthres)
# evaluate in parallel
else:
@@ -188,7 +242,7 @@ def removeIfRedundant(dtable, vids, corrthres, nathres=None):
for i in range(0, len(colpairs), chunksize)]
log.debug('Checking %u columns for redundancy (%u tasks)',
len(cols), len(pairchunks))
ncols, len(pairchunks))
with dtable.pool() as pool:
results = []
@@ -199,7 +253,7 @@ def removeIfRedundant(dtable, vids, corrthres, nathres=None):
# read-accessible via shared memory.
token = 'task {} / {}'.format(i + 1, len(pairchunks))
result = pool.apply_async(
core.redundantColumns,
core.pairwiseRedundantColumns,
kwds=dict(data=data,
colpairs=chunk,
corrthres=corrthres,
@@ -207,26 +261,13 @@ def removeIfRedundant(dtable, vids, corrthres, nathres=None):
results.append(result)
# wait for the tasks to complete,
# and gather the results (names of
# redundant columns) into a list
# and gather the results (indices
# of redundant columns) into a list
redundant = []
for result in results:
redundant.extend(result.get())
redundant = util.dedup(sorted(redundant))
if len(redundant) > 0:
log.debug('Dropping %u redundant columns: %s ...',
len(redundant), redundant[:5])
cols = collections.OrderedDict(zip(colnames, cols))
return [cols[r] for r in redundant]
# maximum chunk size (in bytes)
# that the removeIfRedundant function
# will pass to worker processes
removeIfRedundant.CHUNK_SIZE_LIMIT = 750000000
return redundant
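As an aside, `np.triu_indices` is what generates each unordered column pair exactly once in the code above; a quick demonstration:

```python
import numpy as np

# Strict upper triangle of a 4x4 grid: every unordered pair once
xs, ys = np.triu_indices(4, k=1)
print(np.vstack((xs, ys)).T.tolist())
# [[0, 1], [0, 2], [0, 3], [1, 2], [1, 3], [2, 3]]
```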
@custom.processor(auxvids=['take'])
@@ -11,8 +11,9 @@
:nosignatures:
isSparse
naRedundantColumns
redundantColumns
naCorrelation
pairwiseRedundantColumns
matrixRedundantColumns
binariseCategorical
expandCompound
"""
@@ -178,63 +179,51 @@ def isSparse(data,
return False, None, None
def naRedundantColumns(data, nathres):
def naCorrelation(namask, nathres, nacounts=None):
"""Compares the missingness correlation of every pair of columns in
``data``.
``namask``.
:arg data: ``pandas.DataFrame`` containing the data
:arg namask: 2D ``numpy`` array of shape ``(nsamples, ncolumns)``
containing binary missingness classification
:arg nathres: Correlation threshold to use for missing values. If
provided, columns must have a correlation greater than
``corrthres`` *and* a missing-value correlation greater
than ``nathres`` to be identified as redundant.
:arg nathres: Threshold above which column pairs should be classed
as correlated.
:arg nacounts: 1D ``numpy`` array containing the number of missing values
in each column. Calculated if not provided.
:returns: A ``(N, 2)`` ``numpy`` array containing the indices of
all column pairs which had a NA correlation greater than
or equal to ``nathres``.
:returns: A ``(ncolumns, ncolumns)`` boolean ``numpy`` array
containing ``True`` for column pairs which exceed
``nathres``, ``False`` otherwise.
"""
log.debug('Calculating missingness correlation '
'(%u columns)', len(data.columns))
namask = np.asarray(pd.isna(data), dtype=np.float32)
# normalise each column of the namask
# array to unit standard deviation, but
# only for those columns which have a
# stddev greater than 0 (those that
# correspond to columns with either all
# missing values, or no missing values).
nameans = namask.mean(axis=0)
nastds = namask.std( axis=0)
nahasdev = nastds > 0
namask -= nameans
namask[:, nahasdev] /= nastds[nahasdev]
nacorr = np.dot(namask.T, namask) / len(data)
# Insert nacorr=1 for all columns which
# do not have any missing values, as we
# want them to pass the test
nacorr[~nahasdev, :] = 1
nacorr[:, ~nahasdev] = 1
# Generate indices for column pairs
# which exceed the missingness correlation
# threshold. As the matrix is symmetric,
# we can drop column pairs where x >= y.
nacorr = np.abs(np.triu(nacorr, k=1))
nacolpairs = np.where(nacorr >= nathres)
nacolpairs = np.vstack(nacolpairs).T
log.debug('%u column pairs have missingness correlation '
'>= %0.2f', nacolpairs.shape[0], nathres)
return nacolpairs
def redundantColumns(data, colpairs, corrthres, token=None):
'(%u columns)', namask.shape[1])
if nacounts is None:
nacounts = namask.sum(axis=0)
# must be float32 to take
# advantage of parallelism
namask = namask.astype(np.float32)
nacorr = np.corrcoef(namask.T)
nacorr = np.abs(nacorr) > nathres
# columns with no/all missing values
# will contain invalid correlation
# values. But we pass them because
# these results will be combined with
# a data correlation test.
flatmask = (nacounts == 0) | (nacounts == namask.shape[0])
nacorr[flatmask, :] = True
nacorr[:, flatmask] = True
return nacorr
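A minimal sketch of the same missingness-correlation test using plain `numpy`/`pandas` (the 0.8 threshold is illustrative, and this is not funpack API):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({'a': [1,      np.nan, 3, np.nan],
                   'b': [2,      np.nan, 1, np.nan],
                   'c': [np.nan, 5,      6, 7]})

# Binary missingness mask, correlated column-wise
namask = df.isna().to_numpy().astype(np.float32)
nacorr = np.corrcoef(namask.T)

# Columns a and b are missing in exactly the same rows,
# so they (and only they) exceed the threshold
print(np.abs(nacorr) > 0.8)
```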
def pairwiseRedundantColumns(data, colpairs, corrthres, token=None):
"""Identifies redundant columns based on their correlation with each
other.
other by comparing each pair of columns one by one.
:arg data: ``pandas.DataFrame`` containing the data
@@ -246,9 +235,12 @@ def redundantColumns(data, colpairs, corrthres, token=None):
:arg token: Identifier string for log messages.
:returns: List of redundant column names.
:returns: Sequence of redundant column indices.
"""
if len(colpairs) == 0:
return []
if token is None: token = ''
else: token = '[{}] '.format(token)
@@ -264,7 +256,7 @@ def redundantColumns(data, colpairs, corrthres, token=None):
datai = data.iloc[:, coli]
dataj = data.iloc[:, colj]
corr = datai.corr(dataj)
corr = np.abs(datai.corr(dataj))
# i and j are highly correlated -
# flag the one with more missing
@@ -273,15 +265,111 @@ def redundantColumns(data, colpairs, corrthres, token=None):
if nacounts[coli] > nacounts[colj]: drop, keep = coli, colj
else: drop, keep = colj, coli
drop = data.columns[drop]
keep = data.columns[keep]
log.debug('%sColumn %s is redundant (correlation with %s: %f)',
token, drop, keep, corr)
token, data.columns[drop], data.columns[keep], corr)
redundant.add(drop)
return list(redundant)
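For context, `pandas.Series.corr` only uses rows where both values are present, which is why the pair loop above works directly on columns containing missing data; a small illustration:

```python
import numpy as np
import pandas as pd

x = pd.Series([1.0, 2.0, np.nan, 4.0])
y = pd.Series([2.0, 4.1, 6.0,    np.nan])

# Only rows 0 and 1 are present in both series, so the correlation
# is computed over those two points alone (two points give |r| = 1)
print(x.corr(y))  # 1.0
```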
def matrixRedundantColumns(data, corrthres, nathres=None):
"""Identifies redundant columns based on their correlation with each
other using dot products to calculate a correlation matrix.
:arg data: ``pandas.DataFrame`` containing the data
:arg corrthres: Correlation threshold - columns with a correlation greater
than this are identified as redundant.
:arg nathres: Correlation threshold to use for missing values. If
provided, columns must have a correlation greater than
``corrthres`` *and* a missing-value correlation greater
than ``nathres`` to be identified as redundant.
:returns: Sequence of redundant column indices.
"""
if len(data.columns) < 2:
return []
data = data.to_numpy(dtype=np.float32, copy=True)
namask = np.isnan(data)
nacounts = namask.sum(axis=0)
# missingness correlation
if nathres is None: nacorr = None
else: nacorr = naCorrelation(namask, nathres, nacounts)
log.debug('Calculating pairwise correlations '
'(matrix shape: %s)', data.shape)
# zero out nan elements
data[namask] = 0
# p=present elements
namask = (~namask).astype(np.float32)
Ap = Bp = namask
A = B = data
# sum x_i y_i
xy = A.T @ B
# number of items defined both in x and y
n = Ap.T @ Bp
# mean values in x, calculated across items defined both in x and y
# mean values in y, calculated across items defined both in x and y
mx = (A.T @ Bp) / n
my = (Ap.T @ B) / n
# sum x^2_i, calculated across items defined both in x and y
# sum y^2_i, calculated across items defined both in x and y
x2 = (A * A).T @ Bp
y2 = Ap.T @ (B * B)
# sx, sy - standard deviations
# sx = sqrt(x2 - n .* (mx.^2));
# sy = sqrt(y2 - n .* (my.^2));
sx = x2 - n * (mx ** 2)
sx[sx < 0] = 0
sx = np.sqrt(sx)
sy = y2 - n * (my ** 2)
sy[sy < 0] = 0
sy = np.sqrt(sy)
# correlation coefficient
coef = (xy - n * mx * my) / (sx * sy)
# ignore nans/infs, binarise
coef[~np.isfinite(coef)] = 0
coef = np.abs(coef) > corrthres
# columns must have a correlation greater than
# corrthres and a missing-value correlation greater
# than nathres to be identified as redundant.
if nacorr is not None:
coef = coef & nacorr
# generate indices of correlated column
# pairs; we only need to consider one half
# of the triangle
coef = np.triu(coef, k=1)
colpairs = np.vstack(np.where(coef))
# for each correlated pair, we flag the
# one with more missing values as redundant
def mostna(coli, colj):
if nacounts[coli] > nacounts[colj]: return coli
else: return colj
mostna = np.vectorize(mostna, [np.uint32])
redundant = mostna(colpairs[0], colpairs[1])
return np.unique(redundant)
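To sanity-check the masked dot-product formulation above, here is a self-contained sketch (illustrative only, not part of funpack's test suite) comparing it against pandas' pairwise-complete correlation:

```python
import numpy as np
import pandas as pd

rng  = np.random.default_rng(0)
data = rng.normal(size=(100, 4))
data[rng.random(data.shape) < 0.2] = np.nan   # inject missingness

namask = np.isnan(data)
A  = np.where(namask, 0.0, data)              # zero out missing entries
Ap = (~namask).astype(float)                  # 1 where a value is present

n  = Ap.T @ Ap                                # jointly-present counts
mx = (A.T @ Ap) / n                           # means over jointly-present rows
my = (Ap.T @ A) / n
sx = np.sqrt(np.clip((A * A).T @ Ap - n * mx ** 2, 0, None))
sy = np.sqrt(np.clip(Ap.T @ (A * A) - n * my ** 2, 0, None))
coef = ((A.T @ A) - n * mx * my) / (sx * sy)

# pandas computes pairwise-complete Pearson correlation by default
assert np.allclose(coef, pd.DataFrame(data).corr().to_numpy(), atol=1e-6)
```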
def binariseCategorical(data, minpres=None, take=None, token=None):
"""Takes one or more columns containing categorical data,, and generates a new
set of binary columns (as ``np.uint8``), one for each unique categorical
%% Cell type:markdown id: tags:
![win logo](win.png)
# `funpack` (https://git.fmrib.ox.ac.uk/fsl/funpack)
> Paul McCarthy &lt;paul.mccarthy@ndcn.ox.ac.uk&gt;
> ([WIN@FMRIB](https://www.win.ox.ac.uk/))
`funpack` is a command-line program which you can use to extract data from UK
BioBank (and other tabular) data sets.
You can give `funpack` one or more input files (e.g. `.csv`, `.tsv`), and it
will merge them together, perform some preprocessing, and produce a single
output file.
A large number of rules are built into `funpack` which are specific to the UK
BioBank data set. But you can control and customise everything that `funpack`
does to your data, including which rows and columns to extract, and which
cleaning/processing steps to perform on each column.
`funpack` comes installed with recent versions of
[FSL](https://fsl.fmrib.ox.ac.uk/fsl/fslwiki/). You can also install `funpack`
via `conda`:
> ```
> conda install -c conda-forge fmrib-unpack
> ```
Or using `pip`:
> ```
> pip install fmrib-unpack
> ```
Get command-line help by typing:
> ```
> funpack -h
> ```
**Important** The examples in this notebook assume that you have installed `funpack`
2.2.0.dev0 or newer.
2.2.0 or newer.
%% Cell type:code id: tags:
``` bash
funpack -V
```
%% Cell type:markdown id: tags:
### Contents
1. [Overview](#Overview)
1. [Import](#1.-Import)
2. [Cleaning](#2.-Cleaning)
3. [Processing](#3.-Processing)
4. [Export](#4.-Export)
2. [Examples](#Examples)
3. [Import examples](#Import-examples)
1. [Selecting variables (columns)](#Selecting-variables-(columns))
1. [Selecting individual variables](#Selecting-individual-variables)
2. [Selecting variable ranges](#Selecting-variable-ranges)
3. [Selecting variables with a file](#Selecting-variables-with-a-file)
4. [Selecting variables from pre-defined categories](#Selecting-variables-from-pre-defined-categories)
2. [Selecting subjects (rows)](#Selecting-subjects-(rows))
1. [Selecting individual subjects](#Selecting-individual-subjects)
2. [Selecting subject ranges](#Selecting-subject-ranges)
3. [Selecting subjects from a file](#Selecting-subjects-from-a-file)
4. [Selecting subjects by variable value](#Selecting-subjects-by-variable-value)
5. [Excluding subjects](#Excluding-subjects)
3. [Selecting visits](#Selecting-visits)
1. [Evaluating expressions across visits](#Evaluating-expressions-across-visits)
4. [Merging multiple input files](#Merging-multiple-input-files)
1. [Merging by subject](#Merging-by-subject)
2. [Merging by column](#Merging-by-column)
3. [Naive merging](#Naive-merging)
4. [Cleaning examples](#Cleaning-examples)
1. [NA insertion](#NA-insertion)
2. [Variable-specific cleaning functions](#Variable-specific-cleaning-functions)
3. [Categorical recoding](#Categorical-recoding)
4. [Child value replacement](#Child-value-replacement)
5. [Processing examples](#Processing-examples)
1. [Sparsity check](#Sparsity-check)
2. [Redundancy check](#Redundancy-check)
3. [Categorical binarisation](#Categorical-binarisation)
6. [Custom cleaning, processing and loading - funpack plugins](#Custom-cleaning,-processing-and-loading---funpack-plugins)
1. [Custom cleaning functions](#Custom-cleaning-functions)
2. [Custom processing functions](#Custom-processing-functions)
3. [Custom file loaders](#Custom-file-loaders)
7. [Miscellaneous topics](#Miscellaneous-topics)
1. [Non-numeric data](#Non-numeric-data)
2. [Dry run](#Dry-run)
3. [Built-in rules](#Built-in-rules)
4. [Using a configuration file](#Using-a-configuration-file)
5. [Working with unknown/uncategorised variables](#Working-with-unknown/uncategorised-variables)
# Overview
`funpack` performs the following steps:
## 1. Import
All data files are loaded in, unwanted columns and subjects are dropped, and
the data files are merged into a single table (a.k.a. data frame). Multiple
files can be merged according to an index column (e.g. subject ID). Or, if the
input files contain the same columns/subjects, they can be naively
concatenated along rows or columns.
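Conceptually, the two strategies look like the following pandas sketch (an illustration only, not funpack's actual implementation):

```python
import pandas as pd

a = pd.DataFrame({'eid': [1, 2], 'v1': [10, 20]}).set_index('eid')
b = pd.DataFrame({'eid': [1, 2], 'v2': [0.1, 0.2]}).set_index('eid')

merged = a.join(b, how='outer')     # merge on the subject-ID index
naive  = pd.concat([a, b], axis=1)  # naive concatenation along columns
```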
## 2. Cleaning
The following cleaning steps are applied to each column:
1. **NA value replacement:** Specific values for some columns are replaced
with NA, for example, variables where a value of `-1` indicates *Do not
know*.
2. **Variable-specific cleaning functions:** Certain columns are
re-formatted; for example, the [ICD10](https://en.wikipedia.org/wiki/ICD-10)
disease codes can be converted to integer representations.
3. **Categorical recoding:** Certain categorical columns are re-coded.
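As a rough illustration of steps 1 and 3 with pandas (not funpack's actual code; the variable and codings are hypothetical):

```python
import numpy as np
import pandas as pd

col = pd.Series([3, -1, 2, -1, 1])

# NA insertion: -1 codes "Do not know" for this variable
col = col.replace(-1, np.nan)

# Categorical recoding: map the raw codes onto a new coding
col = col.replace({1: 100, 2: 200, 3: 300})
```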