{
  "$type": "site.standard.document",
  "canonicalUrl": "https://rednafi.com/python/concurrent-futures/",
  "description": "Master Python's concurrent.futures module for easy threading and multiprocessing. Learn ThreadPoolExecutor and ProcessPoolExecutor with examples.",
  "path": "/python/concurrent-futures/",
  "publishedAt": "2020-04-21T00:00:00.000Z",
  "site": "at://did:plc:fgtm2c26vfcj74rfmeggbyqj/site.standard.publication/3mnl6f7ob462z",
  "tags": [
    "Python",
    "Concurrency"
  ],
  "textContent": "Writing concurrent code in Python can be tricky. Before you even start, you have to worry\nabout all this icky stuff like whether the task at hand is I/O or CPU bound or whether\nputting the extra effort to achieve concurrency is even going to give you the boost you\nneed. Also, the presence of Global Interpreter Lock, [GIL] foists further limitations on\nwriting truly concurrent code. But for the sake of sanity, you can oversimplify it like this\nwithout being blatantly incorrect:\n\n> In Python, if the task at hand is I/O bound, you can use the standard library's\n> threading module or if the task is CPU bound then multiprocessing module can be your\n> friend. These APIs give you a lot of control and flexibility but they come at the cost of\n> having to write relatively low-level verbose code that adds extra layers of complexity on\n> top of your core logic. Sometimes when the target task is complicated, it's often\n> impossible to avoid complexity while adding concurrency. However, a lot of simpler tasks\n> can be made concurrent without adding too much verbosity.\n\nPython standard library also houses a module called the concurrent.futures. This module\nwas added in Python 3.2 for providing the developers a high-level interface to launch\nasynchronous tasks. It's a generalized abstraction layer on top of threading and\nmultiprocessing modules for providing an interface to run tasks concurrently using pools\nof threads or processes. It's the perfect tool when you just want to run a piece of eligible\ncode concurrently and don't need the added modularity that the threading and\nmultiprocessing APIs expose.\n\nAnatomy of concurrent.futures\n\nFrom the official docs,\n\n> The concurrent.futures module provides a high-level interface for asynchronously executing\n> callables.\n\nWhat it means is you can run your subroutines asynchronously using either threads or\nprocesses through a common high-level interface. 
Basically, the module provides an abstract\nclass called Executor. You can't instantiate it directly; rather, you need to use one of\nthe two subclasses that it provides, ThreadPoolExecutor or ProcessPoolExecutor, to run\nyour tasks.\n\nInternally, these two classes interact with the pools and manage the workers. Futures are\nused for managing results computed by the workers. To use a pool of workers, an application\ncreates an instance of the appropriate executor class and then submits tasks for it to run.\nWhen each task is submitted, a Future instance is returned. When the result of the task is\nneeded, an application can use the Future object to block until the result is available.\nVarious APIs are provided to make it convenient to wait for tasks to complete, so that the\nFuture objects don't need to be managed directly.\n\nExecutor objects\n\nSince both ThreadPoolExecutor and ProcessPoolExecutor expose the same interface, in\nboth cases I'll primarily talk about two methods that they provide. Their descriptions have\nbeen collected from the official docs verbatim.\n\nsubmit(fn, *args, **kwargs)\n\nSchedules the callable, fn, to be executed as fn(*args, **kwargs) and returns a Future\nobject representing the execution of the callable.\n\nmap(func, *iterables, timeout=None, chunksize=1)\n\nSimilar to map(func, *iterables) except:\n\n- the iterables are collected immediately rather than lazily;\n- func is executed asynchronously and several calls to func may be made concurrently.\n\n    The returned iterator raises a concurrent.futures.TimeoutError if __next__() is\n    called and the result isn't available after timeout seconds from the original call to\n    Executor.map(). Timeout can be an int or a float. 
If timeout is not specified or\n    None, there is no limit to the wait time.\n\n    If a func call raises an exception, then that exception will be raised when its value is\n    retrieved from the iterator.\n\n    When using ProcessPoolExecutor, this method chops iterables into a number of chunks\n    which it submits to the pool as separate tasks. The (approximate) size of these chunks\n    can be specified by setting chunksize to a positive integer. For very long iterables,\n    using a large value for chunksize can significantly improve performance compared to\n    the default size of 1. With ThreadPoolExecutor, chunksize has no effect.\n\nGeneric workflows for running tasks concurrently\n\nA lot of my scripts contain some variant of the following pattern: a loop that iterates\nover get_tasks() and calls perform(task) on each item.\n\nHere, get_tasks returns an iterable that contains the target tasks or arguments to which a\nparticular task function needs to be applied. Tasks are usually blocking callables and they run\none after another, with only one task running at a time. The logic is simple to reason about\nbecause of its sequential execution flow. This is fine when the number of tasks is small or\nthe individual tasks are quick and simple. However, this\ncan quickly get out of hand when the number of tasks is huge or the individual tasks are\ntime consuming.\n\nA general rule of thumb is to use ThreadPoolExecutor when the tasks are primarily I/O\nbound, such as sending multiple HTTP requests to many URLs or saving a large number of files to\ndisk. 
ProcessPoolExecutor should be used for tasks that are primarily CPU bound, such as\nrunning callables that are computation heavy, applying preprocessing methods over a large\nnumber of images, or manipulating many text files at once.\n\nRunning tasks with executor.submit\n\nWhen you have a number of tasks, you can schedule them in one go, wait for them all to\ncomplete, and then collect the results.\n\nHere you start by creating an Executor, which manages all the tasks that are running,\neither in separate processes or threads. Using the executor in a with statement\nensures any stray threads or processes get cleaned up by calling the\nexecutor.shutdown() method implicitly when you're done.\n\nIn real code, you'd need to replace the Executor with a ThreadPoolExecutor or a\nProcessPoolExecutor depending on the nature of the callables. A set comprehension is\nthen used to start all the tasks. The executor.submit() method schedules each task.\nThis creates a Future object, which represents the task to be done. Once all the tasks have\nbeen scheduled, the function concurrent.futures.as_completed() is called, which yields the\nfutures as they're done, that is, as each task completes. The fut.result() method gives\nyou the return value of perform(task), or raises an exception in case of failure.\n\nThe executor.submit() method schedules the tasks asynchronously and doesn't hold any\ncontext regarding the original tasks. So if you want to map the results to the original\ntasks, you need to track those yourself.\n\nNotice the variable futures, where the original tasks are mapped to their corresponding\nfutures using a dictionary.\n\nRunning tasks with executor.map\n\nTo collect the results in the same order the tasks were scheduled, you can use the\nexecutor.map() method.\n\nNotice how the map function takes the entire iterable at once. 
It collects the iterable\nimmediately rather than lazily and yields the results in the same order they're scheduled. If any unhandled\nexception occurs in one of the calls, it'll be raised when that result is retrieved from the\niterator, and the iteration won't go any further.\n\nIn Python 3.5+, executor.map() receives an optional argument: chunksize. While using\nProcessPoolExecutor, for very long iterables, using a large value for chunksize can\nsignificantly improve performance compared to the default size of 1. With\nThreadPoolExecutor, chunksize has no effect.\n\nA few real world examples\n\nBefore proceeding with the examples, let's write a small decorator that'll be helpful to\nmeasure and compare the execution time between concurrent and sequential code.\n\nThe decorator can be used like this:\n\nThis will print out the name of the method and how long it took to execute it.\n\nDownload & save files from URLs with multi-threading\n\nFirst, let's download some PDF files from a bunch of URLs and save them to the disk. This is\npresumably an I/O bound task and we'll be using the ThreadPoolExecutor class to carry out\nthe operation. But first, let's do this sequentially.\n\nIn the above code snippet, I have primarily defined two functions. The download_one function\ndownloads a PDF file from a given URL and saves it to the disk. It checks whether the file\nin the URL has an extension and, in the absence of an extension, it raises RuntimeError. If an\nextension is found in the file name, it downloads the file chunk by chunk and saves it to the\ndisk. The second function, download_all, just iterates through a sequence of URLs and\napplies the download_one function to each of them. The sequential code takes about 22.8\nseconds to run. Now let's see how our threaded version of the same code performs.\n\nThe concurrent version of the code takes only about a quarter of the time of its sequential\ncounterpart. 
Notice that in this concurrent version, the download_one function is the same as\nbefore but in the download_all function, a ThreadPoolExecutor context manager wraps the\nexecutor.map() method. The download_one function is passed into the map along with the\niterable containing the URLs. The timeout parameter sets how long, in seconds from the\noriginal call to executor.map(), the returned iterator will wait for a result before raising a\nTimeoutError. The max_workers parameter sets how many\nworker threads the pool may use. A general rule of thumb is using\n2 * multiprocessing.cpu_count() + 1. My machine has 6 physical cores with 12 threads. Counting\nthe physical cores, 13 is the value I chose.\n\n> Note: You can also try running the above functions with ProcessPoolExecutor via the same\n> interface and notice that the threaded version performs slightly better due to the\n> nature of the task.\n\nThere is one small problem with the example above. The executor.map() method returns a\ngenerator which lets you iterate through the results once they're ready. That means if any error\noccurs inside map, it's not possible to handle that and resume the generator after the\nexception occurs. From [PEP-255]:\n\n> If an unhandled exception-- including, but not limited to, StopIteration --is raised by,\n> or passes through, a generator function, then the exception is passed on to the caller in\n> the usual way, and subsequent attempts to resume the generator function raise\n> StopIteration. In other words, an unhandled exception terminates a generator's useful\n> life.\n\nTo get around that, you can use the executor.submit() method to create futures,\naccumulate the futures in a list, iterate through the futures and handle the exceptions\nmanually. See the following example:\n\nThe above snippet should print out similar messages as before.\n\nRunning multiple CPU bound subroutines with multi-processing\n\nThe following example shows a CPU bound hashing function. 
The primary function will\nsequentially run a compute intensive hash algorithm multiple times. Then another function\nwill again run the primary function multiple times. Let's run the function sequentially\nfirst.\n\nIf you analyze the hash_one and hash_all functions, you can see that together, they are\nactually running two compute intensive nested for loops. The above code takes roughly 18\nseconds to run in sequential mode. Now let's run it in parallel using ProcessPoolExecutor.\n\nIf you look closely, even in the concurrent version, the for loop in the hash_one function\nis running sequentially. However, the other for loop in the hash_all function is being\nexecuted through multiple processes. Here, I have used 10 workers and a chunksize of 2. The\nnumber of workers and chunksize were adjusted to achieve maximum performance. As you can see,\nthe concurrent version of the above CPU intensive operation is about 11 times faster than\nits sequential counterpart.\n\nAvoiding concurrency pitfalls\n\nSince concurrent.futures provides such a simple API, you might be tempted to apply\nconcurrency to every simple task at hand. However, that's not a good idea. First, the\nsimplicity has its fair share of constraints. With this module, you can apply concurrency only to\nthe simplest of tasks, usually mapping a function to an iterable or running a few\nsubroutines simultaneously. If your task at hand requires queuing or spawning multiple threads\nfrom multiple processes, then you will still need to resort to the lower level threading\nand multiprocessing modules.\n\nAnother pitfall of using concurrency is the deadlock situations that might occur while using\nThreadPoolExecutor. 
When a callable associated with a Future waits on the results of\nanother Future, they might never release their control of the threads and cause a deadlock.\nLet's see a slightly modified example from the official docs.\n\nIn the above example, function wait_on_b depends on the result (result of the Future\nobject) of function wait_on_a and at the same time the latter function's result depends on\nthat of the former function. So the code block in the context manager will never complete due\nto this interdependency. This creates the deadlock situation. Let's look at another\ndeadlock situation from the official docs.\n\nThe above situation usually happens when a subroutine produces a nested Future object and\nruns on a single thread. In the function wait_on_future, the executor.submit(pow, 5, 2)\ncreates another Future object. Since the pool has only a single worker thread,\nthe outer task occupies it while waiting on the inner future, and the inner task can never\nbe scheduled because no thread is free. This situation can be avoided\nusing multiple threads but in general, this is a bad design itself.\n\nThen there are situations when you might get lower performance from concurrent code\nthan from its sequential counterpart. This could happen for multiple reasons:\n\n1. Threads were used to perform CPU bound tasks.\n2. Multiple processes were used to perform I/O bound tasks.\n3. The tasks were too trivial to justify using either threads or multiple processes.\n\nSpawning and tearing down multiple threads or processes brings extra overhead. Usually threads\nare much faster than processes to spawn and tear down. However, using the wrong type of\nconcurrency can actually slow down your code rather than making it more performant. Below is\na trivial example where both ThreadPoolExecutor and ProcessPoolExecutor perform worse\nthan their sequential counterpart.\n\nThe above example verifies whether a number in a list is prime or not. 
We ran the function\non 1000 numbers to determine if they're prime or not. The sequential version took roughly\n67ms to do that. However, look below where the threaded version of the same code takes more\nthan double the time (140ms) to do the same task.\n\nThe multiprocessing version of the same code is even slower. The task doesn't justify\nopening so many processes.\n\nAlthough intuitively, it may seem like the task of checking prime numbers should be a CPU\nbound operation, it's also important to determine if the task itself is computationally\nheavy enough to justify spawning multiple threads or processes. Otherwise, you might end up\nwith complicated code that performs worse than the naive solution.\n\nFurther reading\n\n- [concurrent.futures - official docs]\n- [Easy concurrency in Python]\n- [Adventures in Python with concurrent.futures]\n\n\n\n\n[gil]:\n    https://wiki.python.org/moin/GlobalInterpreterLock\n\n[pep-255]:\n    https://www.python.org/dev/peps/pep-0255/#specification-generators-and-exception-propagation\n\n[concurrent.futures - official docs]:\n    https://docs.python.org/3/library/concurrent.futures.html\n\n[easy concurrency in python]:\n    http://pljung.de/posts/easy-concurrency-in-python/\n\n[adventures in python with concurrent.futures]:\n    https://alexwlchan.net/2019/10/adventures-with-concurrent-futures/",
  "title": "Effortless concurrency with Python's concurrent.futures"
}