From 7414c862db1c7f10a00d465b3d86ef9f2f4c952b Mon Sep 17 00:00:00 2001
From: Paul McCarthy <pauldmccarthy@gmail.com>
Date: Mon, 9 Mar 2020 10:42:57 +0000
Subject: [PATCH] Nearly there - just need a mac version of memusage

---
 advanced_topics/07_threading.ipynb | 148 ++++++++++++++++-------------
 advanced_topics/07_threading.md    | 144 ++++++++++++++++------------
 2 files changed, 164 insertions(+), 128 deletions(-)

diff --git a/advanced_topics/07_threading.ipynb b/advanced_topics/07_threading.ipynb
index 6f2b57d..c602e47 100644
--- a/advanced_topics/07_threading.ipynb
+++ b/advanced_topics/07_threading.ipynb
@@ -11,14 +11,12 @@
     "[`threading`](https://docs.python.org/3/library/threading.html) module, and\n",
     "true parallelism in the\n",
     "[`multiprocessing`](https://docs.python.org/3/library/multiprocessing.html)\n",
-    "and\n",
-    "[`concurrent.futures`](https://docs.python.org/3/library/concurrent.futures.html)\n",
-    "modules. If you want to be impressed, skip straight to the section on\n",
+    "module. If you want to be impressed, skip straight to the section on\n",
     "[`multiprocessing`](todo).\n",
     "\n",
     "\n",
     "> *Note*: If you are familiar with a \"real\" programming language such as C++\n",
-    "> or Java, you will be disappointed with the native support for parallelism in\n",
+    "> or Java, you might be disappointed with the native support for parallelism in\n",
     "> Python. Python threads do not run in parallel because of the Global\n",
     "> Interpreter Lock, and if you use `multiprocessing`, be prepared to either\n",
     "> bear the performance hit of copying data between processes, or jump through\n",
@@ -374,6 +372,11 @@
     "the `threading` module, and a powerful higher-level API.\n",
     "\n",
     "\n",
+    "> Python also provides the\n",
+    "> [`concurrent.futures`](https://docs.python.org/3/library/concurrent.futures.html)\n",
+    "> module, which offers a simpler alternative API to `multiprocessing`. It\n",
+    "> offers no functionality beyond `multiprocessing`, so is not covered here.\n",
+    "\n",
     "### `threading`-equivalent API\n",
     "\n",
     "\n",
@@ -413,6 +416,21 @@
     "`map`, `starmap` and `apply_async` methods.\n",
     "\n",
     "\n",
+    "\n",
+    "The `Pool` class is a context manager, so can be used in a `with` statement,\n",
+    "e.g.:\n",
+    "\n",
+    "> ```\n",
+    "> with mp.Pool(processes=16) as pool:\n",
+    ">     # do stuff with the pool\n",
+    "> ```\n",
+    "\n",
+    "It is possible to create a `Pool` outside of a `with` statement, but in this\n",
+    "case you must ensure that you call its `close` method when you are finished.\n",
+    "Using a `Pool` in a `with` statement is therefore recommended, because you know\n",
+    "that it will be shut down correctly, even in the event of an error.\n",
+    "\n",
+    "\n",
     "> The best number of processes to use for a `Pool` will depend on the system\n",
     "> you are running on (number of cores), and the tasks you are running (e.g.\n",
     "> I/O bound or CPU bound).\n",
@@ -459,13 +477,14 @@
     "\n",
     "imgfiles = ['{:02d}.nii.gz'.format(i) for i in range(20)]\n",
     "\n",
-    "p = mp.Pool(processes=16)\n",
-    "\n",
     "print('Crunching images...')\n",
     "\n",
-    "start = time.time()\n",
-    "results = p.map(crunchImage, imgfiles)\n",
-    "end = time.time()\n",
+    "start = time.time()\n",
+    "\n",
+    "with mp.Pool(processes=16) as p:\n",
+    "    results = p.map(crunchImage, imgfiles)\n",
+    "\n",
+    "end = time.time()\n",
     "\n",
     "print('Total execution time: {:0.2f} seconds'.format(end - start))"
    ]
@@ -505,15 +524,16 @@
     "    ['t2_{:02d}.nii.gz'.format(i) for i in range(10)]\n",
     "modalities = ['t1'] * 10 + ['t2'] * 10\n",
     "\n",
-    "pool = mp.Pool(processes=16)\n",
-    "\n",
     "args = [(f, m) for f, m in zip(imgfiles, modalities)]\n",
     "\n",
     "print('Crunching images...')\n",
     "\n",
-    "start = time.time()\n",
-    "results = pool.starmap(crunchImage, args)\n",
-    "end = time.time()\n",
+    "start = time.time()\n",
+    "\n",
+    "with mp.Pool(processes=16) as pool:\n",
+    "    results = pool.starmap(crunchImage, args)\n",
+    "\n",
+    "end = time.time()\n",
     "\n",
     "print('Total execution time: {:0.2f} seconds'.format(end - start))"
    ]
@@ -574,24 +594,24 @@
     "t1s = ['{:02d}_t1.nii.gz'.format(i) for i in range(20)]\n",
     "std = 'MNI152_T1_2mm.nii.gz'\n",
     "\n",
-    "pool = mp.Pool(processes=16)\n",
-    "\n",
     "print('Running structural-to-standard registration '\n",
     "      'on {} subjects...'.format(len(t1s)))\n",
     "\n",
     "# Run linear registration on all the T1s.\n",
-    "#\n",
-    "# We build a list of AsyncResult objects\n",
-    "linresults = [pool.apply_async(linear_registration, (t1, std))\n",
-    "              for t1 in t1s]\n",
-    "\n",
-    "# Then we wait for each job to finish,\n",
-    "# and replace its AsyncResult object\n",
-    "# with the actual result - an affine\n",
-    "# transformation matrix.\n",
     "start = time.time()\n",
-    "for i, r in enumerate(linresults):\n",
-    "    linresults[i] = r.get()\n",
+    "with mp.Pool(processes=16) as pool:\n",
+    "\n",
+    "    # We build a list of AsyncResult objects\n",
+    "    linresults = [pool.apply_async(linear_registration, (t1, std))\n",
+    "                  for t1 in t1s]\n",
+    "\n",
+    "    # Then we wait for each job to finish,\n",
+    "    # and replace its AsyncResult object\n",
+    "    # with the actual result - an affine\n",
+    "    # transformation matrix.\n",
+    "    for i, r in enumerate(linresults):\n",
+    "        linresults[i] = r.get()\n",
+    "\n",
     "end = time.time()\n",
     "\n",
     "print('Linear registrations completed in '\n",
     "\n",
     "# Run non-linear registration on all the T1s,\n",
     "# using the linear registrations to initialise.\n",
-    "nlinresults = [pool.apply_async(nonlinear_registration, (t1, std, aff))\n",
-    "              for (t1, aff) in zip(t1s, linresults)]\n",
-    "\n",
-    "# Wait for each non-linear reg to finish,\n",
-    "# and store the resulting warp field.\n",
     "start = time.time()\n",
-    "for i, r in enumerate(nlinresults):\n",
-    "    nlinresults[i] = r.get()\n",
+    "with mp.Pool(processes=16) as pool:\n",
+    "    nlinresults = [pool.apply_async(nonlinear_registration, (t1, std, aff))\n",
+    "                   for (t1, aff) in zip(t1s, linresults)]\n",
+    "\n",
+    "    # Wait for each non-linear reg to finish,\n",
+    "    # and store the resulting warp field.\n",
+    "    for i, r in enumerate(nlinresults):\n",
+    "        nlinresults[i] = r.get()\n",
+    "\n",
     "end = time.time()\n",
     "\n",
     "print('Non-linear registrations completed in '\n",
@@ -714,7 +736,9 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "import time\n",
+    "import time\n",
+    "import multiprocessing as mp\n",
+    "import numpy as np\n",
     "\n",
     "memusage('before creating data')\n",

     "    time.sleep(1)\n",
     "    return data[offset:offset + nelems].sum()\n",
     "\n",
+    "# Generate an offset into the data for each job -\n",
+    "# we will call process_chunk for each offset\n",
+    "offsets = range(0, len(data), nelems)\n",
+    "\n",
     "# Create our worker process pool\n",
-    "pool = mp.Pool(4)\n",
+    "with mp.Pool(4) as pool:\n",
     "\n",
-    "# Generate an offset into the data for each\n",
-    "# job, and call process_chunk for each offset\n",
-    "offsets = range(0, len(data), nelems)\n",
-    "results = pool.map_async(process_chunk, offsets)\n",
+    "    results = pool.map_async(process_chunk, offsets)\n",
     "\n",
-    "# Wait for all of the jobs to finish\n",
-    "elapsed = 0\n",
-    "while not results.ready():\n",
-    "    memusage('after {} seconds'.format(elapsed))\n",
-    "    time.sleep(1)\n",
-    "    elapsed += 1\n",
+    "    # Wait for all of the jobs to finish\n",
+    "    elapsed = 0\n",
+    "    while not results.ready():\n",
+    "        memusage('after {} seconds'.format(elapsed))\n",
+    "        time.sleep(1)\n",
+    "        elapsed += 1\n",
+    "\n",
+    "    results = results.get()\n",
     "\n",
-    "results = results.get()\n",
     "print('Total sum: ', sum(results))\n",
     "print('Sanity check:', data.sum())"
    ]
   },

     "   > data[offset:offset + nelems] += 1\n",
     "   > ```\n",
     "\n",
-    "2. Re-run the code block, and watch what happens to the memory usage.\n",
+    "2. Restart the Jupyter notebook kernel (*Kernel -> Restart*) - this example is\n",
+    "   somewhat dependent on the behaviour of the Python garbage collector, so it\n",
+    "   helps to start afresh\n",
+    "\n",
+    "\n",
+    "3. Re-run the two code blocks, and watch what happens to the memory usage.\n",
     "\n",
     "\n",
     "What happened? Well, you are seeing [copy-on-write](wiki-copy-on-write) in\n",

     "\n",
     "    # Create a pool of worker\n",
     "    # processes and run the jobs.\n",
-    "    pool = mp.Pool(processes=nprocs)\n",
-    "\n",
-    "    pool.starmap(process_chunk, args)\n",
+    "    with mp.Pool(processes=nprocs) as pool:\n",
+    "        pool.starmap(process_chunk, args)\n",
     "\n",
     "    return outdata"
    ]

    "metadata": {},
    "outputs": [],
    "source": [
-    "data = np.array(np.arange(64).reshape((4, 4, 4)), dtype=np.float64)\n",
-    "\n",
-    "outdata = process_dataset(data)\n",
+    "indata = np.array(np.arange(64).reshape((4, 4, 4)), dtype=np.float64)\n",
+    "outdata = process_dataset(indata)\n",
     "\n",
     "print('Input')\n",
-    "print(data)\n",
+    "print(indata)\n",
     "\n",
     "print('Output')\n",
     "print(outdata)"
    ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "## The `concurrent.futures` library\n",
-    "\n",
-    "\n",
-    "The `concurrent.futures` module provides a simpler alternative API to the\n",
-    "`multiprocessing` module - it focuses specifically on asynchronous execution\n",
-    "of tasks, so can be used instead of the `Pool.map_async` and\n",
-    "`Pool.apply_async` methods."
-   ]
   }
  ],
 "metadata": {},
diff --git a/advanced_topics/07_threading.md b/advanced_topics/07_threading.md
index 25cac53..f46afaa 100644
--- a/advanced_topics/07_threading.md
+++ b/advanced_topics/07_threading.md
@@ -5,14 +5,12 @@ The Python language has built-in support for multi-threading in the
 [`threading`](https://docs.python.org/3/library/threading.html) module, and
 true parallelism in the
 [`multiprocessing`](https://docs.python.org/3/library/multiprocessing.html)
-and
-[`concurrent.futures`](https://docs.python.org/3/library/concurrent.futures.html)
-modules. If you want to be impressed, skip straight to the section on
+module. If you want to be impressed, skip straight to the section on
 [`multiprocessing`](todo).
 
 
 > *Note*: If you are familiar with a "real" programming language such as C++
-> or Java, you will be disappointed with the native support for parallelism in
+> or Java, you might be disappointed with the native support for parallelism in
 > Python. Python threads do not run in parallel because of the Global
 > Interpreter Lock, and if you use `multiprocessing`, be prepared to either
 > bear the performance hit of copying data between processes, or jump through
@@ -303,6 +301,11 @@ from. It provides two APIs - a "traditional" equivalent to that provided by
 the `threading` module, and a powerful higher-level API.
 
 
+> Python also provides the
+> [`concurrent.futures`](https://docs.python.org/3/library/concurrent.futures.html)
+> module, which offers a simpler alternative API to `multiprocessing`. It
+> offers no functionality beyond `multiprocessing`, so is not covered here.
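+>
+> That said, for a flavour of what it looks like, a very rough equivalent of
+> the `Pool.map` example shown below might be the following (`crunchImage`
+> and `imgfiles` are assumed to be defined as in that example):
+>
+> ```
+> from concurrent.futures import ProcessPoolExecutor
+>
+> with ProcessPoolExecutor(max_workers=16) as executor:
+>     results = list(executor.map(crunchImage, imgfiles))
+> ```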
+
 ### `threading`-equivalent API
 
 
@@ -342,6 +345,21 @@ use its methods to automatically parallelise tasks. The most useful are the
 `map`, `starmap` and `apply_async` methods.
 
 
+
+The `Pool` class is a context manager, so can be used in a `with` statement,
+e.g.:
+
+> ```
+> with mp.Pool(processes=16) as pool:
+>     # do stuff with the pool
+> ```
+
+It is possible to create a `Pool` outside of a `with` statement, but in this
+case you must ensure that you call its `close` method when you are finished.
+Using a `Pool` in a `with` statement is therefore recommended, because you know
+that it will be shut down correctly, even in the event of an error.
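+
+If you do manage a `Pool` yourself, a minimal sketch of a safe manual shutdown
+(`close` stops new jobs from being submitted, and `join` waits for the worker
+processes to exit) might look like this:
+
+> ```
+> pool = mp.Pool(processes=16)
+> try:
+>     pass  # do stuff with the pool
+> finally:
+>     pool.close()  # stop accepting new jobs
+>     pool.join()   # wait for the workers to exit
+> ```
+
+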
 > The best number of processes to use for a `Pool` will depend on the system
 > you are running on (number of cores), and the tasks you are running (e.g.
 > I/O bound or CPU bound).
@@ -383,13 +401,14 @@ def crunchImage(imgfile):
 
 imgfiles = ['{:02d}.nii.gz'.format(i) for i in range(20)]
 
-p = mp.Pool(processes=16)
-
 print('Crunching images...')
 
-start = time.time()
-results = p.map(crunchImage, imgfiles)
-end = time.time()
+start = time.time()
+
+with mp.Pool(processes=16) as p:
+    results = p.map(crunchImage, imgfiles)
+
+end = time.time()
 
 print('Total execution time: {:0.2f} seconds'.format(end - start))
 ```
@@ -421,15 +440,16 @@ imgfiles = ['t1_{:02d}.nii.gz'.format(i) for i in range(10)] + \
            ['t2_{:02d}.nii.gz'.format(i) for i in range(10)]
 modalities = ['t1'] * 10 + ['t2'] * 10
 
-pool = mp.Pool(processes=16)
-
 args = [(f, m) for f, m in zip(imgfiles, modalities)]
 
 print('Crunching images...')
 
-start = time.time()
-results = pool.starmap(crunchImage, args)
-end = time.time()
+start = time.time()
+
+with mp.Pool(processes=16) as pool:
+    results = pool.starmap(crunchImage, args)
+
+end = time.time()
 
 print('Total execution time: {:0.2f} seconds'.format(end - start))
 ```
@@ -482,24 +502,24 @@ def nonlinear_registration(src, ref, affine):
 t1s = ['{:02d}_t1.nii.gz'.format(i) for i in range(20)]
 std = 'MNI152_T1_2mm.nii.gz'
 
-pool = mp.Pool(processes=16)
-
 print('Running structural-to-standard registration '
       'on {} subjects...'.format(len(t1s)))
 
 # Run linear registration on all the T1s.
-#
-# We build a list of AsyncResult objects
-linresults = [pool.apply_async(linear_registration, (t1, std))
-              for t1 in t1s]
-
-# Then we wait for each job to finish,
-# and replace its AsyncResult object
-# with the actual result - an affine
-# transformation matrix.
 start = time.time()
-for i, r in enumerate(linresults):
-    linresults[i] = r.get()
+with mp.Pool(processes=16) as pool:
+
+    # We build a list of AsyncResult objects
+    linresults = [pool.apply_async(linear_registration, (t1, std))
+                  for t1 in t1s]
+
+    # Then we wait for each job to finish,
+    # and replace its AsyncResult object
+    # with the actual result - an affine
+    # transformation matrix.
+    for i, r in enumerate(linresults):
+        linresults[i] = r.get()
+
 end = time.time()
 
 print('Linear registrations completed in '
@@ -507,14 +527,16 @@ print('Linear registrations completed in '
 # Run non-linear registration on all the T1s,
 # using the linear registrations to initialise.
-nlinresults = [pool.apply_async(nonlinear_registration, (t1, std, aff))
-               for (t1, aff) in zip(t1s, linresults)]
-
-# Wait for each non-linear reg to finish,
-# and store the resulting warp field.
 start = time.time()
-for i, r in enumerate(nlinresults):
-    nlinresults[i] = r.get()
+with mp.Pool(processes=16) as pool:
+    nlinresults = [pool.apply_async(nonlinear_registration, (t1, std, aff))
+                   for (t1, aff) in zip(t1s, linresults)]
+
+    # Wait for each non-linear reg to finish,
+    # and store the resulting warp field.
+    for i, r in enumerate(nlinresults):
+        nlinresults[i] = r.get()
+
 end = time.time()
 
 print('Non-linear registrations completed in '
@@ -606,7 +628,9 @@ of memory usage as the task progresses:
 
 
 ```
-import time
+import time
+import multiprocessing as mp
+import numpy as np
 
 memusage('before creating data')

     time.sleep(1)
     return data[offset:offset + nelems].sum()
 
+# Generate an offset into the data for each job -
+# we will call process_chunk for each offset
+offsets = range(0, len(data), nelems)
+
 # Create our worker process pool
-pool = mp.Pool(4)
+with mp.Pool(4) as pool:
 
-# Generate an offset into the data for each
-# job, and call process_chunk for each offset
-offsets = range(0, len(data), nelems)
-results = pool.map_async(process_chunk, offsets)
+    results = pool.map_async(process_chunk, offsets)
 
-# Wait for all of the jobs to finish
-elapsed = 0
-while not results.ready():
-    memusage('after {} seconds'.format(elapsed))
-    time.sleep(1)
-    elapsed += 1
+    # Wait for all of the jobs to finish
+    elapsed = 0
+    while not results.ready():
+        memusage('after {} seconds'.format(elapsed))
+        time.sleep(1)
+        elapsed += 1
+
+    results = results.get()
 
-results = results.get()
 print('Total sum: ', sum(results))
 print('Sanity check:', data.sum())
 ```
@@ -662,7 +688,12 @@ data? Go back to the code block above and:
    > data[offset:offset + nelems] += 1
    > ```
 
-2. Re-run the code block, and watch what happens to the memory usage.
+2. Restart the Jupyter notebook kernel (*Kernel -> Restart*) - this example is
+   somewhat dependent on the behaviour of the Python garbage collector, so it
+   helps to start afresh
+
+
+3. Re-run the two code blocks, and watch what happens to the memory usage.
 
 
 What happened? Well, you are seeing [copy-on-write](wiki-copy-on-write) in
@@ -827,9 +858,8 @@ def process_dataset(data):
 
     # Create a pool of worker
     # processes and run the jobs.
-    pool = mp.Pool(processes=nprocs)
-
-    pool.starmap(process_chunk, args)
+    with mp.Pool(processes=nprocs) as pool:
+        pool.starmap(process_chunk, args)
 
     return outdata
 ```
 
 
 Now we can call our `process_dataset` function just like any other function:
 
 
 ```
-data = np.array(np.arange(64).reshape((4, 4, 4)), dtype=np.float64)
-
-outdata = process_dataset(data)
+indata = np.array(np.arange(64).reshape((4, 4, 4)), dtype=np.float64)
+outdata = process_dataset(indata)
 
 print('Input')
-print(data)
+print(indata)
 
 print('Output')
 print(outdata)
 ```
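+
+
+> How is `outdata` shared between the worker processes? One way to create a
+> `numpy` array backed by shared memory (a sketch of the general pattern, not
+> necessarily the exact code used in this section) is to allocate the buffer
+> with `mp.RawArray`, and wrap it with `np.frombuffer`:
+>
+> ```
+> import multiprocessing as mp
+> import numpy as np
+>
+> # allocate 64 doubles in shared memory, and create
+> # a numpy array which wraps the shared buffer
+> sharedbuf = mp.RawArray('d', 64)
+> shared    = np.frombuffer(sharedbuf, dtype=np.float64).reshape((4, 4, 4))
+> ```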
-
-
-## The `concurrent.futures` library
-
-
-The `concurrent.futures` module provides a simpler alternative API to the
-`multiprocessing` module - it focuses specifically on asynchronous execution
-of tasks, so can be used instead of the `Pool.map_async` and
-`Pool.apply_async` methods.
-- 
GitLab