diff --git a/advanced_topics/07_threading.ipynb b/advanced_topics/07_threading.ipynb
index 621921ea336826467a646f240384a1cda1e3b234..6f2b57dcea2a03c380781d03941baa64ee9ffad7 100644
--- a/advanced_topics/07_threading.ipynb
+++ b/advanced_topics/07_threading.ipynb
@@ -621,7 +621,7 @@
 "cell_type": "markdown",
 "metadata": {},
 "source": [
- "### Sharing data between processes\n",
+ "## Sharing data between processes\n",
 "\n",
 "\n",
 "When you use the `Pool.map` method (or any of the other methods we have shown)\n",
@@ -675,6 +675,9 @@
 "[wiki-fork]: https://en.wikipedia.org/wiki/Fork_(system_call)\n",
 "\n",
 "\n",
+ "### Read-only sharing\n",
+ "\n",
+ "\n",
 "Let's see this in action with a simple example. We'll start by defining a\n",
 "little helper function which allows us to track the total memory usage, using\n",
 "the unix `free` command:" ] }, {
@@ -729,7 +732,7 @@
 "# starting from the specified offset\n",
 "def process_chunk(offset):\n",
 "    time.sleep(1)\n",
- "    return data[offset:offset + nelems].mean()\n",
+ "    return data[offset:offset + nelems].sum()\n",
 "\n",
 "# Create our worker process pool\n",
 "pool = mp.Pool(4)\n",
@@ -747,7 +750,8 @@
 "    elapsed += 1\n",
 "\n",
 "results = results.get()\n",
- "print('Total sum:', sum(results))"
+ "print('Total sum: ', sum(results))\n",
+ "print('Sanity check:', data.sum())" ] }, {
@@ -757,15 +761,35 @@
 "You should be able to see that only one copy of `data` is created, and is\n",
 "shared by all of the worker processes without any copying taking place.\n",
 "\n",
- "So if you only need read-only acess ...\n",
+ "So things are reasonably straightforward if you only need read-only access to\n",
+ "your data. But what if your worker processes need to be able to modify the\n",
+ "data? Go back to the code block above and:\n",
+ "\n",
+ "1. 
Modify the `process_chunk` function so that it modifies every element of\n",
+ "    its assigned portion of the data before calculating and returning the sum.\n",
+ "    For example:\n",
+ "\n",
+ "    > ```\n",
+ "    > data[offset:offset + nelems] += 1\n",
+ "    > ```\n",
+ "\n",
+ "2. Re-run the code block, and watch what happens to the memory usage.\n",
 "\n",
- "But what if your worker processes need ...\n",
 "\n",
- "Go back to the code block above and ...\n",
+ "What happened? Well, you are seeing [copy-on-write][wiki-copy-on-write] in\n",
+ "action. When the `process_chunk` function is invoked, it is given a\n",
+ "reference to the original data array in the memory space of the parent\n",
+ "process. But as soon as an attempt is made to modify it, a copy of the data,\n",
+ "in the memory space of the child process, is created. The modifications are\n",
+ "then applied to this copy, and not to the original array in the parent\n",
+ "process. So the total memory usage has blown out to twice as much as before,\n",
+ "and the changes made by each child process are being lost!\n",
 "\n",
 "\n",
+ "[wiki-copy-on-write]: https://en.wikipedia.org/wiki/Copy-on-write\n",
 "\n",
 "\n",
+ "### Read/write sharing\n",
 "\n",
 "\n",
 "> If you have worked with a real programming language with true parallelism\n",
@@ -776,27 +800,32 @@
 "> coding in *Java* instead of Python. Ugh. 
I need to take a shower.\n", "\n", "\n", - "\n", - "\n", - "The `multiprocessing` module provides the [`Value`, `Array`, and `RawArray`\n", + "In order to truly share memory between multiple processes, the\n", + "`multiprocessing` module provides the [`Value`, `Array`, and `RawArray`\n", "classes](https://docs.python.org/3/library/multiprocessing.html#shared-ctypes-objects),\n", "which allow you to share individual values, or arrays of values, respectively.\n", "\n", "\n", "The `Array` and `RawArray` classes essentially wrap a typed pointer (from the\n", - "built-in [`ctypes`](https://docs.python.org/3/library/ctypes.html) module)\n", - "to a block of memory. We can use the `Array` or `RawArray` class to share a\n", - "Numpy array between our worker processes. The difference between an `Array`\n", - "and a `RawArray` is that the former offers synchronised (i.e. process-safe)\n", - "access to the shared memory. This is necessary if your child processes will be\n", - "modifying the same parts of your data.\n", + "built-in [`ctypes`](https://docs.python.org/3/library/ctypes.html) module) to\n", + "a block of memory. We can use the `Array` or `RawArray` class to share a Numpy\n", + "array between our worker processes. The difference between an `Array` and a\n", + "`RawArray` is that the former offers low-level synchronised\n", + "(i.e. process-safe) access to the shared memory. 
This is necessary if your\n", + "child processes will be modifying the same parts of your data.\n", + "\n", + "\n", + "> If you need fine-grained control over synchronising access to shared data by\n", + "> multiple processes, all of the [synchronisation\n", + "> primitives](https://docs.python.org/3/library/multiprocessing.html#synchronization-between-processes)\n", + "> from the `multiprocessing` module are at your disposal.\n", "\n", "\n", - "Due to the way that shared memory works, in order to share a Numpy array\n", - "between different processes you need to structure your code so that the\n", - "array(s) you want to share are accessible at the _module level_. Furthermore,\n", - "we need to make sure that our input and output arrays are located in shared\n", - "memory - we can do this via the `Array` or `RawArray`.\n", + "The requirements for sharing memory between processes still apply here - we\n", + "need to make our data accessible at the *module level*, and we need to create\n", + "our data before creating the `Pool`. And to achieve read and write capability,\n", + "we also need to make sure that our input and output arrays are located in\n", + "shared memory - we can do this via the `Array` or `RawArray`.\n", "\n", "\n", "As an example, let's say we want to parallelise processing of an image by\n", @@ -882,11 +911,18 @@ " # Make the input/output data\n", " # accessible to the process_chunk\n", " # function. 
This must be done\n", - " # *before* the worker pool is created.\n", + " # *before* the worker pool is\n", + " # created - even though we are\n", + " # doing things differently to the\n", + " # read-only example, we are still\n", + " # making the data arrays accessible\n", + " # at the *module* level, so the\n", + " # memory they are stored in can be\n", + " # shared with the child processes.\n", " process_chunk.input_data = sindata\n", " process_chunk.output_data = soutdata\n", "\n", - " # number of boxels to be computed\n", + " # number of voxels to be computed\n", " # by each worker process.\n", " nvox = int(data.size / nprocs)\n", "\n", @@ -905,12 +941,9 @@ " # process a list of indices, which\n", " # specify the data items which that\n", " # worker process needs to compute.\n", - " xs = [xs[nvox * i:nvox * i + nvox] for i in range(nprocs)] + \\\n", - " [xs[nvox * nprocs:]]\n", - " ys = [ys[nvox * i:nvox * i + nvox] for i in range(nprocs)] + \\\n", - " [ys[nvox * nprocs:]]\n", - " zs = [zs[nvox * i:nvox * i + nvox] for i in range(nprocs)] + \\\n", - " [zs[nvox * nprocs:]]\n", + " xs = [xs[nvox * i:nvox * i + nvox] for i in range(nprocs)] + [xs[nvox * nprocs:]]\n", + " ys = [ys[nvox * i:nvox * i + nvox] for i in range(nprocs)] + [ys[nvox * nprocs:]]\n", + " zs = [zs[nvox * i:nvox * i + nvox] for i in range(nprocs)] + [zs[nvox * nprocs:]]\n", "\n", " # Build the argument lists for\n", " # each worker process.\n", @@ -918,7 +951,7 @@ "\n", " # Create a pool of worker\n", " # processes and run the jobs.\n", - " pool = mp.Pool(processes=nprocs)\n", + " pool = mp.Pool(processes=nprocs)\n", "\n", " pool.starmap(process_chunk, args)\n", "\n", @@ -948,6 +981,19 @@ "print('Output')\n", "print(outdata)" ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## The `concurrent.futures` library\n", + "\n", + "\n", + "The `concurrent.futures` module provides a simpler alternative API to the\n", + "`multiprocessing` module - it focuses specifically on 
asynchronous execution\n",
+ "of tasks, so can be used instead of the `Pool.map_async` and\n",
+ "`Pool.apply_async` methods." ] } ], "metadata": {},
diff --git a/advanced_topics/07_threading.md b/advanced_topics/07_threading.md
index a021b6d9652a14368d67dc5a03c58e78b9bafc4e..25cac532b66d08c9c2caab7e5600291a0c753a08 100644
--- a/advanced_topics/07_threading.md
+++ b/advanced_topics/07_threading.md
@@ -526,7 +526,7 @@ for t1, result in zip(t1s, nlinresults):
 ```
-### Sharing data between processes
+## Sharing data between processes
 
 
 When you use the `Pool.map` method (or any of the other methods we have shown)
@@ -580,6 +580,9 @@ any copying required.
 [wiki-fork]: https://en.wikipedia.org/wiki/Fork_(system_call)
 
 
+### Read-only sharing
+
+
 Let's see this in action with a simple example. We'll start by defining a
 little helper function which allows us to track the total memory usage, using
 the unix `free` command:
@@ -621,7 +624,7 @@ memusage('after creating data')
 # starting from the specified offset
 def process_chunk(offset):
     time.sleep(1)
-    return data[offset:offset + nelems].mean()
+    return data[offset:offset + nelems].sum()
 
 # Create our worker process pool
 pool = mp.Pool(4)
@@ -639,23 +642,44 @@ while not results.ready():
     elapsed += 1
 
 results = results.get()
-print('Total sum:', sum(results))
+print('Total sum: ', sum(results))
+print('Sanity check:', data.sum())
 ```
 
 
 You should be able to see that only one copy of `data` is created, and is
 shared by all of the worker processes without any copying taking place.
 
-So if you only need read-only acess ...
+So things are reasonably straightforward if you only need read-only access to
+your data. But what if your worker processes need to be able to modify the
+data? Go back to the code block above and:
+
+1. Modify the `process_chunk` function so that it modifies every element of
+   its assigned portion of the data before calculating and returning the sum. 
+   For example:
+
+   > ```
+   > data[offset:offset + nelems] += 1
+   > ```
 
-But what if your worker processes need ...
+2. Re-run the code block, and watch what happens to the memory usage.
 
-Go back to the code block above and ...
 
+What happened? Well, you are seeing [copy-on-write][wiki-copy-on-write] in
+action. When the `process_chunk` function is invoked, it is given a
+reference to the original data array in the memory space of the parent
+process. But as soon as an attempt is made to modify it, a copy of the data,
+in the memory space of the child process, is created. The modifications are
+then applied to this copy, and not to the original array in the parent
+process. So the total memory usage has blown out to twice as much as before,
+and the changes made by each child process are being lost!
 
 
+[wiki-copy-on-write]: https://en.wikipedia.org/wiki/Copy-on-write
 
 
+### Read/write sharing
 
 
 > If you have worked with a real programming language with true parallelism
 > and shared memory via within-process multi-threading, feel free to take a
@@ -665,27 +689,32 @@ Go back to the code block above and ...
 > coding in *Java* instead of Python. Ugh. I need to take a shower.
 
 
-
-
-The `multiprocessing` module provides the [`Value`, `Array`, and `RawArray`
+In order to truly share memory between multiple processes, the
+`multiprocessing` module provides the [`Value`, `Array`, and `RawArray`
 classes](https://docs.python.org/3/library/multiprocessing.html#shared-ctypes-objects),
 which allow you to share individual values, or arrays of values, respectively.
 
 
 The `Array` and `RawArray` classes essentially wrap a typed pointer (from the
-built-in [`ctypes`](https://docs.python.org/3/library/ctypes.html) module)
-to a block of memory. We can use the `Array` or `RawArray` class to share a
-Numpy array between our worker processes. The difference between an `Array`
-and a `RawArray` is that the former offers synchronised (i.e. process-safe)
-access to the shared memory. 
This is necessary if your child processes will be -modifying the same parts of your data. +built-in [`ctypes`](https://docs.python.org/3/library/ctypes.html) module) to +a block of memory. We can use the `Array` or `RawArray` class to share a Numpy +array between our worker processes. The difference between an `Array` and a +`RawArray` is that the former offers low-level synchronised +(i.e. process-safe) access to the shared memory. This is necessary if your +child processes will be modifying the same parts of your data. -Due to the way that shared memory works, in order to share a Numpy array -between different processes you need to structure your code so that the -array(s) you want to share are accessible at the _module level_. Furthermore, -we need to make sure that our input and output arrays are located in shared -memory - we can do this via the `Array` or `RawArray`. +> If you need fine-grained control over synchronising access to shared data by +> multiple processes, all of the [synchronisation +> primitives](https://docs.python.org/3/library/multiprocessing.html#synchronization-between-processes) +> from the `multiprocessing` module are at your disposal. + + +The requirements for sharing memory between processes still apply here - we +need to make our data accessible at the *module level*, and we need to create +our data before creating the `Pool`. And to achieve read and write capability, +we also need to make sure that our input and output arrays are located in +shared memory - we can do this via the `Array` or `RawArray`. As an example, let's say we want to parallelise processing of an image by @@ -758,11 +787,18 @@ def process_dataset(data): # Make the input/output data # accessible to the process_chunk # function. This must be done - # *before* the worker pool is created. 
+ # *before* the worker pool is + # created - even though we are + # doing things differently to the + # read-only example, we are still + # making the data arrays accessible + # at the *module* level, so the + # memory they are stored in can be + # shared with the child processes. process_chunk.input_data = sindata process_chunk.output_data = soutdata - # number of boxels to be computed + # number of voxels to be computed # by each worker process. nvox = int(data.size / nprocs) @@ -781,12 +817,9 @@ def process_dataset(data): # process a list of indices, which # specify the data items which that # worker process needs to compute. - xs = [xs[nvox * i:nvox * i + nvox] for i in range(nprocs)] + \ - [xs[nvox * nprocs:]] - ys = [ys[nvox * i:nvox * i + nvox] for i in range(nprocs)] + \ - [ys[nvox * nprocs:]] - zs = [zs[nvox * i:nvox * i + nvox] for i in range(nprocs)] + \ - [zs[nvox * nprocs:]] + xs = [xs[nvox * i:nvox * i + nvox] for i in range(nprocs)] + [xs[nvox * nprocs:]] + ys = [ys[nvox * i:nvox * i + nvox] for i in range(nprocs)] + [ys[nvox * nprocs:]] + zs = [zs[nvox * i:nvox * i + nvox] for i in range(nprocs)] + [zs[nvox * nprocs:]] # Build the argument lists for # each worker process. @@ -794,7 +827,7 @@ def process_dataset(data): # Create a pool of worker # processes and run the jobs. - pool = mp.Pool(processes=nprocs) + pool = mp.Pool(processes=nprocs) pool.starmap(process_chunk, args) @@ -816,3 +849,12 @@ print(data) print('Output') print(outdata) ``` + + +## The `concurrent.futures` library + + +The `concurrent.futures` module provides a simpler alternative API to the +`multiprocessing` module - it focuses specifically on asynchronous execution +of tasks, so can be used instead of the `Pool.map_async` and +`Pool.apply_async` methods.
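The closing cell of this patch mentions `concurrent.futures` without showing it in use. As a sketch (not part of the patch itself), the earlier read-only summing example could be rewritten around `ProcessPoolExecutor` like this. The array contents, worker count, and the explicit `fork` context are illustrative choices: deterministic data replaces the tutorial's random array so the result can be checked, and forcing the `fork` start method matches the tutorial's assumption that workers inherit `data` from the parent rather than recreating it.

```python
import multiprocessing as mp
import numpy as np
from concurrent.futures import ProcessPoolExecutor

# Deterministic data, so that the chunk sums can be checked
# against a direct sum over the whole array.
data   = np.arange(1000000, dtype=np.float64)
nelems = len(data) // 4

# Calculate the sum of one chunk of data,
# starting from the specified offset.
def process_chunk(offset):
    return data[offset:offset + nelems].sum()

offsets = range(0, len(data), nelems)

# Force the fork start method, as assumed throughout this
# tutorial, so that workers share the parent's copy of data.
ctx = mp.get_context('fork')

# Executor.map plays the role of Pool.map/Pool.map_async,
# and Executor.submit is the analogue of Pool.apply_async.
with ProcessPoolExecutor(max_workers=4, mp_context=ctx) as executor:
    results = list(executor.map(process_chunk, offsets))

print('Total sum:   ', sum(results))
print('Sanity check:', data.sum())
```

Note that `Executor.map` returns immediately with a lazy iterator of results, so the `ready()`/`sleep` polling loop from the `Pool.map_async` example is not needed; iterating over the results blocks until each one is available.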