#!/usr/bin/env python
#
# main.py - funpack entry point
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
"""This module contains the ``funpack`` entry point. """


import multiprocessing     as mp
import os.path             as op
import                        sys
import                        logging
import                        warnings
import                        datetime
import                        calendar

import pandas              as pd

import funpack
import funpack.util        as util
import funpack.icd10       as icd10
import funpack.config      as config
import funpack.custom      as custom
import funpack.dryrun      as dryrun
import funpack.cleaning    as cleaning
import funpack.importing   as importing
import funpack.exporting   as exporting
import funpack.hierarchy   as hierarchy
import funpack.processing  as processing
import funpack.loadtables  as loadtables


log = logging.getLogger(__name__)


def main(argv=None):
    """``funpack`` entry point. """

    # Make sure built-in plugins are
    # registered, as they are queried
    # in the command-line help. Set
    # logging to critical until we've
    # parsed command-line args.
    logging.getLogger().setLevel(logging.CRITICAL)
    custom.registerBuiltIns()

    args, argv = config.parseArgsWithConfigFile(argv)
    date = datetime.date.today()

    # Now that args are parsed,
    # we can set up logging properly.
    configLogging(args)

    log.info('funpack %s', funpack.__version__)
    log.info('Date: %s (%s)', date, calendar.day_name[date.weekday()])
    log.info('Command-line arguments: %s', ' '.join(argv))
    log.debug('Running with the following options')
    for name, val in args.__dict__.items():
        if val is not None:
            val = str(val)
            if len(val) <= 30: log.debug('  %s: %s',    name, val)
            else:              log.debug('  %s: %s...', name, val[:30])

    # Load any custom plugins
    # that have been specified.
    if args.plugin_file is not None:
        for p in args.plugin_file:
            custom.loadPluginFile(p)

    # default output format inferred
    # from output filename or, failing
    # that, tsv
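    # (e.g. 'out.h5' -> 'hdf5', 'out.tsv' -> 'tsv', and
    # 'out.abc' -> 'tsv', assuming no 'abc' exporter is
    # registered)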
    if args.format is None:
        fmt = op.splitext(args.outfile)[1].lower().strip('.')
        if fmt in ('h5', 'hdf'):
            fmt = 'hdf5'
        if not custom.exists('exporter', fmt):
            fmt = 'tsv'
        args.format = fmt

    # error if any loaders/formats are
    # invalid (we can only perform this
    # check after plugins have been
    # loaded)
    if args.loader is not None:
        for f, l in args.loader.items():
            if not custom.exists('loader', l):
                raise ValueError('Unknown loader {} [{}]'.format(l, f))
    if not custom.exists('exporter', args.format):
        raise ValueError('Unknown output format {}'.format(args.format))
    if args.date_format is not None and \
       not custom.exists('formatter', args.date_format):
        raise ValueError('Unknown date format {}'.format(args.date_format))
    if args.time_format is not None and \
       not custom.exists('formatter', args.time_format):
        raise ValueError('Unknown time format {}'.format(args.time_format))
    if args.tsv_var_format is not None:
        for v, f in args.tsv_var_format.items():
            if not custom.exists('formatter', f):
                raise ValueError('Unknown formatter {} [{}]'.format(f, v))

    if args.num_jobs > 1:
        log.debug('Running up to %i jobs in parallel', args.num_jobs)
        mgr = mp.Manager()

        # We need to initialise icd10
        # before the worker processes
        # are created, so its state is
        # shared by all processes.
        icd10.initialise(mgr)
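        # (A note on the mechanism: mp.Manager starts a
        # server process, and the proxies it hands out
        # remain usable from the pool workers created below.)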

        pool = mp.Pool(args.num_jobs)

    else:
        pool = None
        mgr  = None

    try:
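        # util.timed logs the elapsed time on exit; the
        # %+iMB in the fmt string suggests it also reports
        # the change in memory usage.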
        with util.timed(None, log, fmt='Total time: %s (%+iMB)'):

            dtable, unknowns, uncategorised, drop = doImport(args, pool, mgr)

            if args.dry_run:
                dryrun.doDryRun(dtable, unknowns, uncategorised, drop, args)
            else:
                doCleanAndProcess(  dtable, args)
                doUnknownsExport(   dtable, args, unknowns, uncategorised)
                doExport(           dtable, args)
                doICD10Export(              args)
                doDescriptionExport(dtable, args)
                doSummaryExport(    dtable, args)

    finally:
        # shut down the pool gracefully
        if pool is not None:
            pool.close()
            pool.join()
            pool = None

    return 0


def doImport(args, pool, mgr):
    """Data import stage.

    :arg args: :class:`argparse.Namespace` object containing command line
               arguments
    :arg pool: :class:`multiprocessing.Pool` object for parallelisation (may
               be ``None``)
    :arg mgr:  :class:`multiprocessing.Manager` object for parallelisation (may
               be ``None``)

    :returns:  A tuple containing:

                - A :class:`.DataTable` containing the data
                - A sequence of :class:`.Column` objects representing the
                  unknown columns.
                - A sequence of :class:`.Column` objects representing columns
                  which are uncategorised, and have no processing or cleaning
                  rules specified on them.
                - A list of :class:`.Column` objects that were not loaded from
                  each input file.
    """

    with util.timed('Table import', log):
        vartable, proctable, cattable, unknowns, uncategorised = \
            loadtables.loadTables(
                args.infile,
                args.variable_file,
                args.datacoding_file,
                args.type_file,
                args.processing_file,
                args.category_file,
                noBuiltins=args.no_builtins,
                naValues=args.na_values,
                childValues=args.child_values,
                recoding=args.recoding,
                clean=args.clean,
                typeClean=args.type_clean,
                globalClean=args.global_clean,
                skipProcessing=args.skip_processing,
                prependProcess=args.prepend_process,
                appendProcess=args.append_process,
                sniffers=args.loader,
                indexes=args.index)

    subjects, exprs = args.subject
    variables       = args.variable
    categories      = args.category
    columns         = args.column

    # Import data
    with util.timed('Data import', log):
        dtable, drop = importing.importData(
            datafiles=args.infile,
            vartable=vartable,
            proctable=proctable,
            cattable=cattable,
            variables=variables,
            colnames=columns,
            categories=categories,
            subjects=subjects,
            subjectExprs=exprs,
            exclude=args.exclude,
            encoding=args.encoding,
            indexes=args.index,
            trustTypes=args.trust_types,
            mergeAxis=args.merge_axis,
            mergeStrategy=args.merge_strategy,
            indexVisits=args.index_visits,
            loaders=args.loader,
            pool=pool,
            mgr=mgr,
            dryrun=args.dry_run)

    # Filter unknown/uncategorised
    # column lists to only contain
    # those that were loaded
    allcols       = dtable.dataColumns
    unknowns      = [c for c in unknowns      if c in allcols]
    uncategorised = [c for c in uncategorised if c in allcols]

    # if it appears that we're doing
    # a full run on a large data set,
    # emit warnings about unknown/
    # uncategorised variables.
    bigrun = any((args.variable_file   is not None,
                  args.datacoding_file is not None,
                  args.processing_file is not None,
                  args.category_file   is not None))

    if bigrun:
        for u in unknowns:
            log.warning('Variable %s [file %s, column %s, assigned '
                        'variable ID %s] is unknown.',
                        u.name, u.datafile, u.index, u.vid)
        for u in uncategorised:
            log.warning('Variable %s [file %s, column %s, assigned '
                        'variable ID %s] is uncategorised and does not '
                        'have any cleaning or processing rules set.',
                        u.name, u.datafile, u.index, u.vid)

    return dtable, unknowns, uncategorised, drop


def doCleanAndProcess(dtable, args):
    """Data cleaning and processing stage.

    :arg dtable: :class:`.DataTable` containing the data
    :arg args:   :class:`argparse.Namespace` object containing command line
                 arguments
    """

    # Clean data (cleanData times each step individually)
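    # The skip flags appear to map onto cleanData's stages:
    # NA insertion, cleaning functions, child value
    # replacement, and recoding.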
    cleaning.cleanData(
        dtable,
        skipNAInsertion=args.skip_insertna,
        skipCleanFuncs=args.skip_clean_funcs,
        skipChildValues=args.skip_childvalues,
        skipRecoding=args.skip_recoding)

    # Process data
    with util.timed('Data processing', log):
        processing.processData(dtable)


def doExport(dtable, args):
    """Data export stage.

    :arg dtable: :class:`.DataTable` containing the data
    :arg args:   :class:`argparse.Namespace` object containing command line
                 arguments
    """

    # If exporting to TSV, and not parallelising,
    # we export the entire file in one go. Because
    # what's the point in chunked export if we're
    # not parallelising across chunks?
    if args.num_jobs <= 1:
        args.num_rows = len(dtable)

    with util.timed('Data export', log):
        exporting.exportData(
            dtable,
            args.outfile,

            # General export options
            fileFormat=args.format,
            dateFormat=args.date_format,
            timeFormat=args.time_format,
            numRows=args.num_rows,

            # TSV options
            sep=args.tsv_sep,
            missingValues=args.tsv_missing_values,
            formatters=args.tsv_var_format,
            nonNumericFile=args.non_numeric_file,

            # HDF5 options
            key=args.hdf5_key,
            style=args.hdf5_style)


def doICD10Export(args):
    """If a ``--icd10_map_file`` has been specified, the ICD10 codes present
    in the data (and their converted values) are saved out to the file.
    """
    if args.icd10_map_file is None:
        return

    with util.timed('ICD10 mapping export', log):
        try:
            ihier = hierarchy.getHierarchyFilePath(name='icd10')
            ihier = hierarchy.loadHierarchyFile(ihier)
            icd10.saveCodes(args.icd10_map_file, ihier)

        except Exception as e:
            log.warning('Failed to export ICD10 mappings: {}'.format(e),
                        exc_info=True)


def doDescriptionExport(dtable, args):
    """If a ``--description_file`` has been specified, a description for every
    column is saved out to the file.
    """
    if args.description_file is None:
        return

    with util.timed('Description export', log):
        cols = dtable.dataColumns

        try:
            with open(args.description_file, 'wt') as f:
                for col in cols:
                    desc = generateDescription(dtable, col)
                    f.write('{}\t{}\n'.format(col.name, desc))

        except Exception as e:
            log.warning('Failed to export descriptions: {}'.format(e),
                        exc_info=True)


def generateDescription(dtable, col):
    """Called by :func:`doDescriptionExport`. Generates and returns a
    suitable description for the given column.

    :arg dtable: :class:`.DataTable` instance
    :arg col:    :class:`.Column` instance
    """
    vartable = dtable.vartable
    desc     = vartable.loc[col.vid, 'Description']

    if pd.isna(desc) or (desc == col.name):
        desc = 'n/a'

    # If metadata has been added to the column,
    # we add it to the description. See the
    # binariseCategorical processing function
    # for an example of this.
    if col.metadata is not None:
        suffix = ' ({})'.format(col.metadata)
    else:
        suffix = ' ({}.{})'.format(col.visit, col.instance)
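
    # e.g. 'Age at recruitment (0.0)' for visit 0, instance 0,
    # or 'Diagnoses - ICD10 (A04)' when column metadata has been
    # set (descriptions and codes here are illustrative only).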

    return '{}{}'.format(desc, suffix)


def doUnknownsExport(dtable, args, unknowns, uncategorised):
    """If the ``--unknown_vars_file`` argument was used, the unknown/
    uncategorised columns are saved out to a file.

    :arg dtable:        :class:`.DataTable` containing the data
    :arg args:          :class:`argparse.Namespace` object containing command
                        line arguments
    :arg unknowns:      List of :class:`.Column` objects representing the
                        unknown columns.
    :arg uncategorised: A sequence of :class:`.Column` objects representing
                        columns which are uncategorised, and have no processing
                        or cleaning rules specified on them.
    """

    if args.unknown_vars_file is None:
        return

    if len(unknowns) + len(uncategorised) == 0:
        return

    # Save unknown/uncategorised
    # vars list to a file with
    # the following columns:
    #  - name      - column name
    #  - file      - originating input file
    #  - class     - unknown or uncategorised
    #  - exported  - whether column passed processing and was exported
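    #
    # e.g. a row might read (illustrative values only):
    #   1-0.0    data.csv    unknown    1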
    allcols     = list(dtable.dataColumns)
    allunknowns = list(unknowns + uncategorised)

    names       = [u.name            for u in allunknowns]
    files       = [u.datafile        for u in allunknowns]
    classes     = ['unknown'         for u in unknowns] + \
                  ['uncategorised'   for u in uncategorised]
    exported    = [int(u in allcols) for u in allunknowns]
    rows        = ['{}\t{}\t{}\t{}'.format(n, f, c, e)
                   for n, f, c, e in zip(names, files, classes, exported)]

    log.debug('Saving unknown/uncategorised variables to %s',
              args.unknown_vars_file)

    try:
        with open(args.unknown_vars_file, 'wt') as f:
            f.write('name\tfile\tclass\texported\n')
            f.write('\n'.join(rows))

    except Exception as e:
        log.warning('Error saving unknown variables to {}: '
                    '{}'.format(args.unknown_vars_file, e),
                    exc_info=True)


def doSummaryExport(dtable, args):
    """If a ``--summary_file`` has been specified, a summary of the cleaning
    steps that have been applied to each variable is saved out to the file.
    """
    if args.summary_file is None:
        return

    vartable = dtable.vartable
    vids     = sorted(dtable.variables)[1:]
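    # (the [1:] appears to skip variable 0, which holds the index)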
    sumdf    = pd.DataFrame(columns=['ID', 'NAValues',
                                     'RawLevels', 'NewLevels',
                                     'ParentValues', 'ChildValues',
                                     'Clean', 'Flags']).set_index('ID')

    with util.timed('Summary export', log):
        for vid in vids:
            sumdf.at[vid, 'NAValues']     = vartable.at[vid, 'NAValues']
            sumdf.at[vid, 'RawLevels']    = vartable.at[vid, 'RawLevels']
            sumdf.at[vid, 'NewLevels']    = vartable.at[vid, 'NewLevels']
            sumdf.at[vid, 'ParentValues'] = vartable.at[vid, 'ParentValues']
            sumdf.at[vid, 'ChildValues']  = vartable.at[vid, 'ChildValues']

            clean = vartable.at[vid, 'Clean']
            if pd.notna(clean):
                sumdf.at[vid, 'Clean'] = list(clean.values())

            flagstr  = []
            cols     = dtable.columns(vid)
            colflags = {c : dtable.getFlags(c) for c in cols}
            flags    = set.union(*colflags.values())

            for flag in flags:
                if all([flag in colflags[c] for c in cols]):
                    flagstr.append(flag)
                else:
                    names = [c.name for c in cols if flag in colflags[c]]
                    flagstr.append('{} [{}]'.format(flag, ', '.join(names)))
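
            # e.g. a flag set on every column of the variable
            # is written bare, while one set on only some
            # columns is qualified with those column names, as
            # in 'sparse [1-0.0, 1-1.0]' (illustrative names).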

            sumdf.at[vid, 'Flags'] = ';'.join(flagstr)

        sumdf.to_csv(args.summary_file, sep='\t')


def configLogging(args):
    """Configures ``funpack`` logging.

    :arg args: ``argparse.Namespace`` object containing parsed command line
               arguments.
    """

    # Custom log handler which
    # colours messages
    class LogHandler(logging.StreamHandler):

        def emit(self, record):

            levelno = record.levelno
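
            # ANSI SGR colour codes: bold red for warnings
            # and errors, bold default foreground for info,
            # bright black (grey) for debug.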

            if   levelno >= logging.WARNING:  colour = '\x1b[31;1m'
            elif levelno >= logging.INFO:     colour = '\x1b[39;1m'
            elif levelno >= logging.DEBUG:    colour = '\x1b[90;1m'
            else:                             colour = ''

            # Reset terminal attributes
            # after each message.
            record.msg = '{}{}\x1b[0m'.format(colour, record.msg)

            return super(LogHandler, self).emit(record)

    logger = logging.getLogger('funpack')
    fmt    = logging.Formatter('%(asctime)s '
                               '%(levelname)8.8s '
                               '%(filename)20.20s '
                               '%(lineno)4d: '
                               '%(funcName)-15.15s - '
                               '%(message)s',
                               '%H:%M:%S')

    if args.log_file is None: handler = LogHandler()
    else:                     handler = logging.FileHandler(args.log_file)

    handler.setFormatter(fmt)
    logger.addHandler(handler)

    # configure verbosity
    if   args.quiet:      loglevel = logging.CRITICAL
    elif args.noisy == 0: loglevel = logging.INFO
    else:                 loglevel = logging.DEBUG
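    # e.g. args.quiet -> CRITICAL; the default -> INFO;
    # one or more "noisy" increments -> DEBUG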

    logger.setLevel(loglevel)

    if args.quiet or args.noisy < 3:
        warnings.filterwarnings('ignore',  module='pandas')
        warnings.filterwarnings('ignore',  module='numpy')
        warnings.filterwarnings('ignore',  module='tables')

    if args.noisy == 1:
        makequiet = ['funpack.expression',
                     'funpack.custom',
                     'funpack.cleaning_functions',
                     'funpack.processing_functions']
    elif args.noisy == 2:
        makequiet = ['funpack.expression',
                     'funpack.custom']
    else:
        makequiet = []

    for mod in makequiet:
        logging.getLogger(mod).setLevel(logging.INFO)


if __name__ == '__main__':
    sys.exit(main())