The easy way to concurrency and parallelism with Python stdlib
Because life doesn't have to be hard all the time
Summary
Concurrency and parallelism are hard, plus Python could do better in that regard.
Yet there are quite a few problems that don't fall into that category: scraping the web, keeping a UI from blocking, zipping many files…
For those, Python actually comes with pretty decent tools: the pool executors.
You can distribute work to a bunch of process workers or thread workers with a few lines of code:
from concurrent.futures import ThreadPoolExecutor, as_completed
with ThreadPoolExecutor(max_workers=5) as executor:
    executor.submit(do_something_blocking)
This simple approach only covers a limited set of use cases, but for those, it works surprisingly well.
What's more, those use cases are the ones you are most likely to have to solve yourself, as the others often already have infrastructure or library solutions.
The 10 types of people
The material about concurrency and parallelism is generally written by two types of people: beginners who have no idea what they are talking about, and hyper-specialized experts.
The second camp will tell you about the very hard problems they solved with it, like processing a 64 GB matrix with 21 workers in 2us, or creating a server that handles millions of connections so that people can share their cat pictures 300ms faster.
The first camp will just repeat what they read without thinking, usually taking the second camp's conclusions and applying them to their own, much smaller problems.
This can give you the impression that concurrency and parallelism need to be complicated, with statements like:
sharing resources is hard;
Python is slow;
the GIL neuters threads;
process intercommunication is costly;
etc.
While all this is true, it doesn't paint the whole picture:
Python is really good at delegating tasks to infrastructure like your DB, your cache store, your reverse proxy, etc. They will do the concurrency and parallelism for you.
There are a lot of solved problems in many spaces, at least at the scale you are likely going to encounter. E.G: WSGI servers will pop several processes to deal with concurrent requests while long tasks will be sent to a queue. This works very decently for the vast majority of web projects.
While we can't solve NP-hard problems exactly in theory, IRL we are usually happy to just ship a partial solution and call it a day. Or a product. It's the same for concurrency and parallelism: simplify the problem or the requirements, and you may very well make your client happy with a basic solution.
But above all, many day-to-day problems are just... not that complicated.
And it's likely that you can solve them efficiently with what Python provides for threading or multiprocessing.
Of course, you can argue that dealing with threads and processes in itself is hard, and again, you would be right.
But the Python standard library comes with a beautiful abstraction for them I see too few people use: the pool executors.
So this article is about them, and how adding them to your toolbox will make a whole class of tasks solvable with little effort.
How to use the pool executors
If you read "Asyncio, twisted, tornado, gevent walk into a bar", you may remember this little script that makes a GET request to a bunch of URLs, loads each page and prints its title:
import re
import time
from urllib.request import Request, urlopen
URLs = [
"https://www.bitecode.dev/p/relieving-your-python-packaging-pain",
"https://www.bitecode.dev/p/hype-cycles",
"https://www.bitecode.dev/p/why-not-tell-people-to-simply-use",
"https://www.bitecode.dev/p/nobody-ever-paid-me-for-code",
"https://www.bitecode.dev/p/python-cocktail-mix-a-context-manager",
"https://www.bitecode.dev/p/the-costly-mistake-so-many-makes",
"https://www.bitecode.dev/p/the-weirdest-python-keyword",
]
title_pattern = re.compile(r"<title[^>]*>(.*?)</title>", re.IGNORECASE)
user_agent = (
"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/116.0"
)
def fetch_url(url):
    start_time = time.time()
    headers = {"User-Agent": user_agent}
    with urlopen(Request(url, headers=headers)) as response:
        html_content = response.read().decode("utf-8")
    match = title_pattern.search(html_content)
    title = match.group(1) if match else "Unknown"
    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"Time taken ({url}): {elapsed_time:.4f} seconds\n", end="")
    return title
def main():
    for url in URLs:
        title = fetch_url(url)
        print(f"URL: {url}\nTitle: {title}", flush=True)
if __name__ == "__main__":
    # Let's time how long all this takes
    global_start_time = time.time()
    main()
    global_elapsed_time = time.time() - global_start_time
    print(f"Total time taken for all URLs: {global_elapsed_time:.4f} seconds")
Because it's blocking on every request, waiting for the network to answer before starting the next request, the global execution time is the sum of the time each request takes:
Time taken (https://www.bitecode.dev/p/relieving-your-python-packaging-pain): 0.1869 seconds
URL: https://www.bitecode.dev/p/relieving-your-python-packaging-pain
Title: Relieving your Python packaging pain - Bite code!
Time taken (https://www.bitecode.dev/p/hype-cycles): 0.1888 seconds
URL: https://www.bitecode.dev/p/hype-cycles
Title: XML is the future - Bite code!
Time taken (https://www.bitecode.dev/p/why-not-tell-people-to-simply-use): 0.1875 seconds
URL: https://www.bitecode.dev/p/why-not-tell-people-to-simply-use
Title: Why not tell people to "simply" use pyenv, poetry or anaconda
Time taken (https://www.bitecode.dev/p/nobody-ever-paid-me-for-code): 0.2070 seconds
URL: https://www.bitecode.dev/p/nobody-ever-paid-me-for-code
Title: Nobody ever paid me for code - Bite code!
Time taken (https://www.bitecode.dev/p/python-cocktail-mix-a-context-manager): 0.6739 seconds
URL: https://www.bitecode.dev/p/python-cocktail-mix-a-context-manager
Title: Python cocktail: mix a context manager and an iterator in equal parts
Time taken (https://www.bitecode.dev/p/the-costly-mistake-so-many-makes): 0.3111 seconds
URL: https://www.bitecode.dev/p/the-costly-mistake-so-many-makes
Title: The costly mistake so many make with numpy and pandas
Time taken (https://www.bitecode.dev/p/the-weirdest-python-keyword): 0.3057 seconds
URL: https://www.bitecode.dev/p/the-weirdest-python-keyword
Title: The weirdest Python keyword - Bite code!
Total time taken for all URLs: 2.0623 seconds
While we have shown you can get good performance on this problem using asyncio or gevent, the simplest tool for the job with decent performance is a ThreadPoolExecutor.
The code is mostly the same. One import is added at the top:
from concurrent.futures import ThreadPoolExecutor, as_completed
Then only the main() function changes:
def main():
    with ThreadPoolExecutor(max_workers=len(URLs)) as executor:
        tasks = {}
        for url in URLs:
            future = executor.submit(fetch_url, url)
            tasks[future] = url
        for future in as_completed(tasks):
            title = future.result()
            url = tasks[future]
            print(f"URL: {url}\nTitle: {title}")
Let's get into the details on how it works:
with ThreadPoolExecutor(max_workers=len(URLs)) as executor:
This creates the pool executor, with its workers and queues:
The pool is an object that will start, hold and manage x threads transparently for you. Here, the number of threads is equal to the number of URLs because that's how many requests we want to run in parallel.
It will create, manage and hold the queues used to send work to the threads and to get results back from them.
When the context manager exits, it waits for the threads to finish all tasks, then shuts them down.
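If you'd rather not use the context manager, the same lifecycle can be driven by hand with shutdown(). A minimal sketch, where the sleeping function is just a stand-in for real work:

import time
from concurrent.futures import ThreadPoolExecutor

def do_something_blocking():
    time.sleep(0.1)  # stand-in for a real blocking call
    return "done"

executor = ThreadPoolExecutor(max_workers=5)
try:
    future = executor.submit(do_something_blocking)
    print(future.result())
finally:
    # This is roughly what the context manager does on exit:
    # wait for pending tasks to finish, then release the worker threads.
    executor.shutdown(wait=True)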
Then:
tasks = {}
for url in URLs:
    future = executor.submit(fetch_url, url)
    tasks[future] = url
This submits the function fetch_url to the executor, which puts it in a queue of tasks to execute. Note that we don't do fetch_url(url): we don't want to call the function ourselves, so we pass the function and its argument separately.
This submission returns a Future object. Futures hold a reference to the task you just submitted, and will let you get the result later on, when it's ready.
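A Future is a small object with a handful of handy methods. Here is a quick sketch with a throwaway function, just to show the ones you will use the most:

import time
from concurrent.futures import ThreadPoolExecutor

def slow_square(x):
    time.sleep(0.5)  # pretend this is real work
    return x * x

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_square, 4)
    print(future.done())             # probably False: the task is still running
    print(future.result(timeout=2))  # blocks until the result is ready: 16
    print(future.exception())        # None, since slow_square didn't raise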
Because the tasks are asynchronous, we don't know in which order they will finish, so we build a dict mapping each future to the URL it's working on, to print the results later on.
This loop blocks very little.
Finally:
for future in as_completed(tasks):
    title = future.result()
    url = tasks[future]
    print(f"URL: {url}\nTitle: {title}")
We use as_completed() to loop over the futures (remember, looping over a dict gives you the keys) as the tasks they refer to finish. This loop blocks until all tasks are completed.
future.result() will get us the result that fetch_url returned, or raise the exception that fetch_url encountered during execution.
We use our dictionary to find out which future maps to which URL, since they arrive in the order they finish, not the order we put them in.
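As an aside, if you don't need to handle each result as soon as it is ready, executor.map() gives an even shorter version of main(). This is a sketch reusing the fetch_url and URLs defined above:

def main():
    with ThreadPoolExecutor(max_workers=len(URLs)) as executor:
        # map() yields results in the order of URLs, so a slow URL delays
        # printing the ones after it, even though they all run concurrently.
        for url, title in zip(URLs, executor.map(fetch_url, URLs)):
            print(f"URL: {url}\nTitle: {title}")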
The result is much faster:
Time taken (https://www.bitecode.dev/p/nobody-ever-paid-me-for-code): 0.2688 seconds
URL: https://www.bitecode.dev/p/nobody-ever-paid-me-for-code
Title: Nobody ever paid me for code - Bite code!
Time taken (https://www.bitecode.dev/p/relieving-your-python-packaging-pain): 0.2901 seconds
Time taken (https://www.bitecode.dev/p/the-weirdest-python-keyword): 0.2881 seconds
URL: https://www.bitecode.dev/p/relieving-your-python-packaging-pain
Title: Relieving your Python packaging pain - Bite code!
URL: https://www.bitecode.dev/p/the-weirdest-python-keyword
Title: The weirdest Python keyword - Bite code!
Time taken (https://www.bitecode.dev/p/the-costly-mistake-so-many-makes): 0.3892 seconds
URL: https://www.bitecode.dev/p/the-costly-mistake-so-many-makes
Title: The costly mistake so many make with numpy and pandas
Time taken (https://www.bitecode.dev/p/python-cocktail-mix-a-context-manager): 0.4014 seconds
URL: https://www.bitecode.dev/p/python-cocktail-mix-a-context-manager
Title: Python cocktail: mix a context manager and an iterator in equal parts
Time taken (https://www.bitecode.dev/p/why-not-tell-people-to-simply-use): 0.4482 seconds
URL: https://www.bitecode.dev/p/why-not-tell-people-to-simply-use
Title: Why not tell people to "simply" use pyenv, poetry or anaconda
Time taken (https://www.bitecode.dev/p/hype-cycles): 0.4617 seconds
URL: https://www.bitecode.dev/p/hype-cycles
Title: XML is the future - Bite code!
Total time taken for all URLs: 0.4644 seconds
When a request is waiting on the network, another thread is executing.
The whole script:
import re
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.request import Request, urlopen
URLs = [
"https://www.bitecode.dev/p/relieving-your-python-packaging-pain",
"https://www.bitecode.dev/p/hype-cycles",
"https://www.bitecode.dev/p/why-not-tell-people-to-simply-use",
"https://www.bitecode.dev/p/nobody-ever-paid-me-for-code",
"https://www.bitecode.dev/p/python-cocktail-mix-a-context-manager",
"https://www.bitecode.dev/p/the-costly-mistake-so-many-makes",
"https://www.bitecode.dev/p/the-weirdest-python-keyword",
]
title_pattern = re.compile(r"<title[^>]*>(.*?)</title>", re.IGNORECASE)
# We'll pretend to be Firefox or substack is going to kick us
user_agent = (
"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/116.0"
)
def fetch_url(url):
    start_time = time.time()
    headers = {"User-Agent": user_agent}
    with urlopen(Request(url, headers=headers)) as response:
        html_content = response.read().decode("utf-8")
    match = title_pattern.search(html_content)
    title = match.group(1) if match else "Unknown"
    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"Time taken ({url}): {elapsed_time:.4f} seconds\n", end="")
    return title
def main():
    with ThreadPoolExecutor(max_workers=len(URLs)) as executor:
        tasks = {}
        for url in URLs:
            future = executor.submit(fetch_url, url)
            tasks[future] = url
        for future in as_completed(tasks):
            title = future.result()
            url = tasks[future]
            print(f"URL: {url}\nTitle: {title}")
if __name__ == "__main__":
    global_start_time = time.time()
    main()
    global_elapsed_time = time.time() - global_start_time
    print(f"Total time taken for all URLs: {global_elapsed_time:.4f} seconds")
What would a version with multiprocessing look like? Pretty much the same, but we use ProcessPoolExecutor instead.
So we change the import:
from concurrent.futures import ProcessPoolExecutor, as_completed
And the call:
with ProcessPoolExecutor(max_workers=5) as executor:
Note that here the number of workers maps to the number of CPU cores I want to dedicate to the program. Processes are way more expensive than threads, as each starts a new Python instance.
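To make the difference concrete, here is a small CPU-bound sketch where a process pool actually uses several cores. The prime-counting function is made up for the example and deliberately naive:

import os
from concurrent.futures import ProcessPoolExecutor

def count_primes(limit):
    # Pure-Python CPU work: the GIL would serialize this in threads.
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count

if __name__ == "__main__":
    chunks = [50_000] * 8  # eight identical chunks of work, spread over the cores
    with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        print(list(executor.map(count_primes, chunks)))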
What are the pool executors good for?
Using separate processes or GIL-locked threads has a limited set of good use cases, and using a pool executor on top of that shrinks the group to an even smaller niche.
Thread pools are good for:
Tasks (network, file, etc.) that need fewer than 10_000 I/O interactions per second. The number is higher than you would expect, because threads are surprisingly cheap nowadays, and you can spawn a lot of them without bloating memory too much. The limit is more the price of context switching. This is not a scientific number, it's a general direction that you should challenge by measuring your own particular case.
When you need to share data between the tasks.
When you are not CPU bound.
When you are OK with executing tasks a bit more slowly to ensure you are not blocking any of them (E.G: a user UI and a long calculation).
When you are CPU bound, but the CPU calculations are delegated to a C extension that releases the GIL, such as numpy. Free parallelism on the cheap, yeah!
E.G: a web scraper, a GUI to zip files, a development server, sending emails without blocking web page rendering, etc. (the zipping case is sketched just below).
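For instance, the "zip many files" case could look like this sketch. The documents folder is hypothetical, and each file gets its own archive so the tasks share nothing:

import zipfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

def zip_one(path):
    # Compress a single file into its own archive next to it.
    archive = path.parent / (path.name + ".zip")
    with zipfile.ZipFile(archive, "w", zipfile.ZIP_DEFLATED) as zf:
        zf.write(path, arcname=path.name)
    return archive

def zip_all(folder):
    files = [p for p in Path(folder).iterdir() if p.is_file()]
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(zip_one, p): p for p in files}
        for future in as_completed(futures):
            print(f"{futures[future]} -> {future.result()}")

zip_all("documents")  # hypothetical folder full of files to compress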
Process pools are good for:
When you don't need to share data between tasks.
When you are CPU bound.
When you don't have too many tasks to run at the same time.
When you need true parallelism and want to exercise your juicy cores.
Both are bad if you need to cancel tasks, collaborate a lot between tasks, deal precisely with the task lifecycle, need a huge number of workers or want to milk out every single bit of performance. You won't get anywhere near Rust levels of speed.
It’s still very useful, though.
Tips
if __name__ == "__main__" is important for multiprocessing, because it will spawn a new Python interpreter that imports the module. You don't want this module to spawn a new Python that imports the module that will spawn a new Python...
If the function you submit to the executor needs complicated arguments, use a lambda or functools.partial (process pools need picklable callables, so lambdas only work with thread pools). See the sketch below.
max_workers=1 is a very nice way to get a poor man's task queue (also sketched below).
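A minimal sketch combining those last two tips; the send_email function and its arguments are made up for the example:

import functools
import time
from concurrent.futures import ThreadPoolExecutor

def send_email(to, subject, body, retries=3):
    time.sleep(0.1)  # stand-in for a real SMTP call
    print(f"Sent {subject!r} to {to}")

# A single worker processes submissions one at a time, in order:
# a poor man's task queue.
email_queue = ThreadPoolExecutor(max_workers=1)

# functools.partial bundles the function and its arguments into one
# callable, so submit() doesn't have to know about them.
task = functools.partial(send_email, "bob@example.com", "Hello", "Long time no see", retries=5)
email_queue.submit(task)

email_queue.shutdown(wait=True)  # wait for the queue to drain before exiting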
How would you implement rate limiting in this? It would kind of involve shared state between all threads, no? Or can you somehow apply a limit to the executor?
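One possible answer, as a sketch rather than a definitive recipe: yes, it takes a bit of shared state, but a small lock-protected limiter shared by all the workers is usually enough. The RateLimiter class and the 5 requests per second figure are made up for the example, and fetch_url is the function defined earlier:

import threading
import time

class RateLimiter:
    """Allow at most `rate` calls per second, shared by all threads."""

    def __init__(self, rate):
        self.min_interval = 1.0 / rate
        self.lock = threading.Lock()
        self.next_slot = time.monotonic()

    def wait(self):
        with self.lock:
            now = time.monotonic()
            # Reserve the next free slot, then sleep outside the lock until it arrives.
            slot = max(self.next_slot, now)
            self.next_slot = slot + self.min_interval
        time.sleep(max(0.0, slot - now))

limiter = RateLimiter(rate=5)  # at most ~5 requests per second across all threads

def fetch_url_politely(url):
    limiter.wait()
    return fetch_url(url)  # the fetch_url defined earlier in the article

You would then submit fetch_url_politely to the executor instead of fetch_url, and the workers would pace themselves without any change to the pool itself.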