I have used the tqdm library a lot. I find it very useful to monitor the execution of expensive operations, and it is trivial to set up in for loops. But what happens if we want to parallelize our workloads? It might seem very unintuitive to set up, but, as I want to share today, it is not that hard. In fact, it will help us understand better how parallelism works.
Setup Example
I am going to use several tools to showcase this. If you want to follow along, this is the requirements.txt file:

tqdm==4.64.1
numpy==1.24.1
pandas==1.5.3
dask[dataframe,distributed]==2023.1.1  # We will use this later
The Base Case
Let’s create our use case first.

Let’s say that we have a DataFrame of groups, names, and scores. We want to get the name with the maximum score for each group. Let’s generate our data and our business logic function.
import string
import random

import pandas as pd
import numpy as np

np.random.seed(42)


def generate_random_data(n: int) -> pd.DataFrame:
    return pd.DataFrame(
        {
            "group": np.random.choice(["a", "b", "c"], n),
            "name": ["".join(random.choices(string.ascii_uppercase, k=5)) for _ in range(n)],
            "score": np.random.normal(size=n),
        }
    )


def get_name_max_score(df):
    max_score_index = df.score.idxmax()
    return df.loc[max_score_index, ["name", "score"]].to_dict()
We can apply get_name_max_score to the whole DataFrame or to a single group:

>>> df = generate_random_data(100)
>>> print(get_name_max_score(df))
{'name': 'MDRQN', 'score': 13.827179631775369}
To apply it to all the groups, we can accumulate the results into a list:
>>> results = []
>>> for g, df_g in df.groupby("group"):
...     results.append(get_name_max_score(df_g))
>>> print(len(results))
676
>>> print(results[0])
{'name': 'OCWGY', 'score': 9.567390469558628}
This computation can take some time, so let’s add a progress bar:
>>> from tqdm.auto import tqdm
>>> results = []
>>> for g, df_g in tqdm(df.groupby("group")):
...     results.append(get_name_max_score(df_g))
100%|██████████| 676/676 [00:01<00:00, 377.20it/s]
>>> print(len(results))
676
>>> print(results[0])
{'name': 'OCWGY', 'score': 9.567390469558628}
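As an aside, tqdm also ships a pandas integration that can drive this kind of loop for us. Here is a minimal sketch of the same sequential computation using it; note that the result is a pandas Series of dicts indexed by group rather than a plain list:

from tqdm.auto import tqdm

# Register the `progress_apply` method on pandas objects (DataFrame, Series, GroupBy)
tqdm.pandas(desc="groups")

# Same computation as the loop above, but pandas drives the iteration
results_series = df.groupby("group").progress_apply(get_name_max_score)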
Going Parallel
We are going to use the native Python library concurrent.futures to parallelize these operations. We need to define a pool of workers and launch the computation in parallel, distributing the workload between them. This is quite easy to do with the executor.map method:

>>> from concurrent.futures import ThreadPoolExecutor
>>> with ThreadPoolExecutor() as executor:
...     params = [df_g for g, df_g in df.groupby("group")]
...     parallel_results = executor.map(get_name_max_score, params)
>>> print(parallel_results)
<generator object Executor.map.<locals>.result_iterator at 0x17706d7e0>
As you can see, the method returns a generator object. We can iterate over it or cast it to a list:

>>> results = list(parallel_results)
>>> print(len(results))
676
>>> print(results[0])
{'name': 'OCWGY', 'score': 9.567390469558628}
The first result is still the same. But can we guarantee that in a parallel execution?
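It turns out we can: executor.map yields results in the same order as its inputs, even when the underlying tasks finish out of order. Here is a minimal sketch, independent of our DataFrame example, that illustrates this behavior:

import time
from concurrent.futures import ThreadPoolExecutor

def slow_identity(x: int) -> int:
    # Earlier inputs sleep longer, so they finish later...
    time.sleep(1.0 / (x + 1))
    return x

with ThreadPoolExecutor() as executor:
    # ...but map still yields results in input order: [0, 1, 2, 3]
    print(list(executor.map(slow_identity, range(4))))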
Adding tqdm for Parallel Work
The challenge here becomes “where can I monitor the completion of my tasks?”. In the sequential case it was clear. Now we have parallelized our workload between our workers, and our tasks can finish at different moments and simultaneously. Let’s explore our options:
tqdm on the generator
The parallel_results object is a generator that we can cast to a list. This means we can wrap it with tqdm:
>>> with ThreadPoolExecutor() as executor:
...     params = [df_g for g, df_g in df.groupby("group")]
...     results = list(
...         tqdm(
...             executor.map(get_name_max_score, params),
...             total=len(params),
...         )
...     )
...
100%|██████████| 676/676 [00:00<00:00, 771.81it/s]
That looks like it worked. We have also increased the number of iterations per second from roughly 377 to 771, which means the parallel execution is doing something.
For most purposes this is good enough. However, there is an important catch here.
The generator returns the results in the original order. However, we might not need that; in fact, we might want to get a result record as soon as it is available.
From a tqdm and monitoring perspective, this means the bar is not an accurate representation of the execution speed: some tasks might finish sooner, but we don’t know until the generator reaches them.

futures + as_completed + tqdm
The concept of a future is a bit weird for Python users. It represents a result that is not guaranteed to be completed at the moment you hold it. Under the hood, executor.map creates futures and yields their results. We can also create and consult futures directly: instead of map, we call executor.submit, which launches a single computation per call, and we store the resulting futures in a list.
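To get a feel for the Future object itself, here is a minimal sketch outside our DataFrame example (slow_square is just an illustrative placeholder):

import time
from concurrent.futures import ThreadPoolExecutor

def slow_square(x: int) -> int:
    time.sleep(1)
    return x * x

with ThreadPoolExecutor() as executor:
    future = executor.submit(slow_square, 3)  # launches a single computation
    print(future.done())    # most likely False: the result is not ready yet
    print(future.result())  # blocks until the result is available, then prints 9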
But how do we know when the futures are completed? Rather than polling each one ourselves, we will use the as_completed function. It returns an iterator that yields futures as soon as they complete, and we can wrap it with tqdm again to monitor the progress. In the end, the setup is very similar to the previous one. What has changed?
>>> from concurrent.futures import ThreadPoolExecutor, as_completed
>>> with ThreadPoolExecutor() as executor:
...     futures = []
...     results = []
...     for g, df_g in df.groupby("group"):
...         futures.append(executor.submit(get_name_max_score, df_g))
...     for future in tqdm(as_completed(futures), total=len(futures)):
...         results.append(future.result())
100%|██████████| 676/676 [00:00<00:00, 1629.06it/s]
>>> print(len(results))
676
>>> print(results[0])
{'name': 'CRYSU', 'score': 10.58879967892412}
So a couple of things:
- The loop looks way faster, at 1629.06 iterations per second. However, this is not quite real: in this scenario the computation is so fast that, by the time we reach the tqdm loop, some futures may already be completed. This effect would not be significant for more expensive operations.
- The first result is different than before! This is because the first result to complete was not the first one submitted. The previous method kept the original order, while this one does not.
Depending on your use case you might want one or the other. But at least we know what is happening behind the scenes.
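If you want the responsiveness of as_completed but still need to know which group each record came from, a common pattern (a sketch on top of our example, not from the original code) is to keep a dict that maps each future back to its group key:

from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm.auto import tqdm

with ThreadPoolExecutor() as executor:
    # Map each future back to the group that produced it
    future_to_group = {
        executor.submit(get_name_max_score, df_g): g
        for g, df_g in df.groupby("group")
    }
    results_by_group = {}
    for future in tqdm(as_completed(future_to_group), total=len(future_to_group)):
        results_by_group[future_to_group[future]] = future.result()

This way the progress bar still advances as soon as any task finishes, but the results can be reassembled in whatever order we need afterwards.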
Bonus Track: Using the Dask Progress Bar
Dask is a popular pandas-like parallelization framework. We can take this same use case and parallelize it with Dask DataFrames.
>>> import pandas as pd
>>> import dask.dataframe as dd
>>> ddf = dd.from_pandas(df, npartitions=4)
We can use the built-in tqdm.dask.TqdmCallback to monitor the execution:

>>> from tqdm.dask import TqdmCallback
>>> with TqdmCallback(desc="compute"):
...     results: pd.Series = ddf.groupby("group").apply(get_name_max_score, meta=pd.Series([{}])).compute()
compute: 100%|█████████████████████| 18/18 [00:05<00:00, 3.09it/s]
Just keep in mind that this changes the result type: we now have a pandas Series instead of a list. Also, the progress bar shows the progress of the compute graph. As you can see, there were 18 steps in that graph; the number depends on the partitions of the data and the operations applied over them.
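Dask also bundles its own progress bar in dask.diagnostics, which works with the local single-machine schedulers without involving tqdm at all. A minimal sketch of the same computation with it (the meta argument mirrors the one used above):

from dask.diagnostics import ProgressBar

# Same computation as above, monitored by Dask's own ProgressBar
with ProgressBar():
    results = ddf.groupby("group").apply(get_name_max_score, meta=pd.Series([{}])).compute()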