In this post we see how Polars sets some crucial parameters that affect streaming mode. Understanding these concepts is important if you want to optimize the performance of a large streaming query or handle out-of-memory errors when streaming.
Want to accelerate your analysis with Polars? Join over 2,000 learners on my highly-rated Up & Running with Polars course
In streaming mode Polars processes data in chunks rather than all at once. This allows Polars to handle larger-than-memory datasets without much additional effort from you. This raises the question - how does Polars decide how big the chunks should be?
When streaming mode was first introduced the answer was simple: the chunk size was fixed at 50,000 rows. More recently, the chunk size has become a function of the number of columns in the DataFrame and the number of threads available.
Creating a simple example dataset
We will create a simple test set of 3 CSV files with random data:
import polars as pl
import numpy as np

for i in range(3):
    (
        pl.DataFrame(
            np.random.standard_normal((10, 3))
        )
        .with_row_count('id')
        .write_csv(f"test_{i}.csv")
    )
Each DataFrame looks something like this:
shape: (10, 4)
┌─────┬───────────┬───────────┬───────────┐
│ id ┆ column_0 ┆ column_1 ┆ column_2 │
│ --- ┆ --- ┆ --- ┆ --- │
│ i64 ┆ f64 ┆ f64 ┆ f64 │
╞═════╪═══════════╪═══════════╪═══════════╡
│ 0 ┆ 1.189349 ┆ -0.288161 ┆ -0.156299 │
│ 1 ┆ 0.602317 ┆ -0.108463 ┆ -1.029294 │
│ 2 ┆ 0.230017 ┆ 0.18799 ┆ 1.251513 │
│ 3 ┆ -0.706367 ┆ 0.407965 ┆ -0.509364 │
│ … ┆ … ┆ … ┆ … │
│ 6 ┆ -0.996497 ┆ 0.359672 ┆ -0.382435 │
│ 7 ┆ 0.310096 ┆ -1.165665 ┆ 1.444398 │
│ 8 ┆ -1.427343 ┆ -0.762879 ┆ -0.077449 │
│ 9 ┆ 1.248644 ┆ 0.498989 ┆ 0.96604 │
└─────┴───────────┴───────────┴───────────┘
We use a simple query where we group by the id column and sum the floating-point columns:
(
    pl.scan_csv("test_*.csv")
    .groupby('id')
    .agg(
        pl.col(pl.Float64).sum()
    )
)
As we use pl.scan_csv this is a lazy query and so nothing happens at this point.
For any lazy query we can see the optimised query plan using the explain method. For this query, with no arguments passed to explain, we see:
print(
    (
        pl.scan_csv("test_*.csv")
        .groupby('id')
        .agg(
            pl.col(pl.Float64).sum()
        )
        .explain()
    )
)
AGGREGATE
  [col("column_0").sum(), col("column_1").sum(), col("column_2").sum()] BY [col("id")] FROM
  FAST_PROJECT: [column_0, column_1, column_2, id]
  UNION:
    PLAN 0:
      CSV SCAN test_0.csv
      PROJECT */4 COLUMNS
    PLAN 1:
      CSV SCAN test_1.csv
      PROJECT */4 COLUMNS
    PLAN 2:
      CSV SCAN test_2.csv
      PROJECT */4 COLUMNS
  END UNION
We read this query plan from the bottom up:
- The CSV SCAN nodes read the CSV files and the PROJECT part selects the columns we want (all 4 columns in this case).
- The UNION node combines the results from the three CSV files into a single DataFrame.
- The AGGREGATE node does the groupby and sum.
This is the query plan that Polars will execute if we use the default non-streaming engine with .collect().
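For completeness, running the query with the default engine looks something like this (a minimal sketch using the test_*.csv files created above):

result = (
    pl.scan_csv("test_*.csv")
    .groupby('id')
    .agg(
        pl.col(pl.Float64).sum()
    )
    # collect() executes the optimised plan and returns the full DataFrame in memory
    .collect()
)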
Is my query running in streaming mode?
What would happen if we were to execute the query in streaming mode with .collect(streaming=True)?
We can see the query plan for this by passing streaming=True to the explain method:
print(
    pl.scan_csv("test*.csv")
    .groupby("id")
    .agg(
        pl.col(pl.Float64).sum()
    )
    .explain(streaming=True)
)
STREAMING CHUNK SIZE: 12500 rows
--- PIPELINE
AGGREGATE
  [col("column_0").sum(), col("column_1").sum(), col("column_2").sum()] BY [col("id")] FROM
  FAST_PROJECT: [column_0, column_1, column_2, id]
  UNION:
    PLAN 0:
      CSV SCAN test_0.csv
      PROJECT */4 COLUMNS
    PLAN 1:
      CSV SCAN test_1.csv
      PROJECT */4 COLUMNS
    PLAN 2:
      CSV SCAN test_2.csv
      PROJECT */4 COLUMNS
  END UNION
--- END PIPELINE
DF []; PROJECT */0 COLUMNS; SELECTION: "None"
The --- PIPELINE and --- END PIPELINE lines show us that the query will be executed in streaming mode.
The STREAMING CHUNK SIZE line tells us that Polars will process the data in chunks of (up to) 12,500 rows.
Be aware - this explain(streaming=True) functionality is experimental and still in an alpha stage.
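To actually execute the query in streaming mode, rather than just inspect its plan, we pass streaming=True to collect. A minimal sketch with the same query:

result = (
    pl.scan_csv("test_*.csv")
    .groupby('id')
    .agg(
        pl.col(pl.Float64).sum()
    )
    # collect(streaming=True) runs the pipeline above in chunks
    .collect(streaming=True)
)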
How is the chunk size calculated?
A key parameter affecting performance in streaming mode is the chunk size: the number of rows that Polars processes in each batch. At the top of the streaming query plan we see the chunk size is 12,500 rows. Polars sets this value based on:
- the number of threads Polars will use and
- the number of columns in the dataset
We see below how we can alter the chunk size, but first we will look at how Polars currently calculates the default chunk size for each query.
The chunk size formula is a function of the number of columns and the number of threads. We start by looking at the number of threads.
How many threads?
Before we look at the chunk size formula we need to understand how many threads Polars will use. By default the number of threads is set equal to the number of CPUs on your machine (or perhaps the maximum number of CPUs available inside a Docker container).
You can check the maximum number of threads Polars will use with the pl.threadpool_size() function:
import polars as pl
pl.threadpool_size()
On my machine this gives a value of 12, unless I’m in Docker where I’ve set the max CPUs to 7.
You can override the maximum number of threads by setting the POLARS_MAX_THREADS environment variable.
import os
os.environ["POLARS_MAX_THREADS"] = "4"
import polars as pl
Be aware - you must set this environment variable before importing Polars. If you set it after importing Polars it will have no effect.
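A quick way to confirm the override took effect is to check the thread pool size after the import - a sketch, assuming a fresh Python session:

import os
os.environ["POLARS_MAX_THREADS"] = "4"

import polars as pl
pl.threadpool_size()  # should now report 4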
In my case I find that the default number of threads works well and I almost never change it. However, you may want to reduce the number of threads if you are running other CPU intensive tasks at the same time as Polars.
If you have a dedicated machine for your queries you may want to increase the number of threads. My experiments on this are inconclusive - in some cases I’ve found that increasing the number of threads improves performance and in other cases it has little effect or degrades performance 🤷.
Chunk size formula
The current formula for setting the chunk size is set out in the Polars source code, but it boils down to this:
thread_factor = max(12 / n_threads, 1)
STREAMING_CHUNK_SIZE = max(50_000 / n_cols * thread_factor, 1000)
The thread_factor is the maximum of 1 and 12 / n_threads. It will be 1 if you have 12 or more threads and greater than 1 if you have fewer threads.
The chunk size is then set to the greater of:
- 1,000 or
- 50,000 divided by the number of columns in the DataFrame, multiplied by the thread_factor.
In my case with 12 threads the thread_factor is 1 and so the chunk size is simply 50,000 divided by the number of columns in the LazyFrame. As my LazyFrame has 4 columns, the chunk size is 12,500 rows.
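As a sanity check we can reproduce this calculation with a small helper - an illustrative sketch of the formula above, not part of the Polars API:

def default_chunk_size(n_cols: int, n_threads: int) -> int:
    # mirror the formula: fewer threads and fewer columns mean bigger chunks
    thread_factor = max(12 / n_threads, 1)
    return int(max(50_000 / n_cols * thread_factor, 1000))

default_chunk_size(n_cols=4, n_threads=12)    # 12500
default_chunk_size(n_cols=4, n_threads=6)     # 25000
default_chunk_size(n_cols=100, n_threads=12)  # 1000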
I expect this formula to evolve over time as Polars is optimized for different use cases.
Setting chunk size manually
The optimal value for the chunk size will depend on your data, query and hardware. You can set the chunk size manually with the pl.Config.set_streaming_chunk_size function. In this example I set it to 50,000 rows:
pl.Config.set_streaming_chunk_size(50000)
Unlike the POLARS_MAX_THREADS environment variable, you can set the chunk size at any time. For example, if I set this to 50,000 rows and then run explain(streaming=True) again I get the following output:
STREAMING CHUNK SIZE: 50000 rows
--- PIPELINE
AGGREGATE
  [col("column_0").sum(), col("column_1").sum(), col("column_2").sum()] BY [col("id")] FROM
  FAST_PROJECT: [column_0, column_1, column_2, id]
  UNION:
    PLAN 0:
      CSV SCAN test_0.csv
      PROJECT */4 COLUMNS
    PLAN 1:
      CSV SCAN test_1.csv
      PROJECT */4 COLUMNS
    PLAN 2:
      CSV SCAN test_2.csv
      PROJECT */4 COLUMNS
  END UNION
--- END PIPELINE
DF []; PROJECT */0 COLUMNS; SELECTION: "None"
Here we see that the streaming chunk size is now 50,000 rows.
If you are working with large datasets I recommend you experiment with different chunk sizes to find the optimal value for your data and hardware - this can make a major difference to performance.
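As a starting point for that experimentation, here is a rough sketch that times the same streaming query at a few candidate chunk sizes (the candidate values are arbitrary):

import time

for chunk_size in [12_500, 50_000, 200_000]:
    pl.Config.set_streaming_chunk_size(chunk_size)
    start = time.perf_counter()
    (
        pl.scan_csv("test_*.csv")
        .groupby('id')
        .agg(
            pl.col(pl.Float64).sum()
        )
        .collect(streaming=True)
    )
    print(f"chunk size {chunk_size}: {time.perf_counter() - start:.3f}s")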
Want to accelerate your analysis with Polars? Join over 2,000 learners on my highly-rated Up & Running with Polars course
For more on streaming check out these posts:
- using streaming mode defensively
- writing large queries to Parquet with streaming
- working with multiple large files
- ordering of unique and groupby and their relationship to streaming (https://www.rhosignal.com/posts/polars-ordering/)
or this video where I process a 30 GB dataset on a not-very-impressive laptop.
Next steps
Want to know more about Polars for high performance data science? Then you can: