Crucial parameters for streaming in Polars

In this post we see how Polars sets some crucial parameters that affect streaming mode. Understanding these concepts is important if you want to optimize the performance of a large streaming query or handle out-of-memory errors on a streaming query.

Want to accelerate your analysis with Polars? Join over 2,000 learners on my highly-rated Up & Running with Polars course

In streaming mode Polars processes data in chunks rather than all at once. This allows Polars to handle larger-than-memory datasets without much additional effort from you. This raises the question - how does Polars decide how big the chunks should be?

When streaming mode was first introduced the answer was simple: the chunk size was fixed at 50,000 rows. More recently, the chunk size has been changed to be a function of the number of columns in the dataframe and the number of threads available.

Creating a simple example dataset

We create a simple test set of three CSV files containing random data:

import polars as pl
import numpy as np

# Write three small CSV files, each with an id column and three columns of random floats
for i in range(3):
    (
        pl.DataFrame(
            np.random.standard_normal((10,3))
        )
        .with_row_count('id')
        .write_csv(f"test_{i}.csv")
    )

Each DataFrame looks something like this:

shape: (10, 4)
┌─────┬───────────┬───────────┬───────────┐
│ id  ┆ column_0  ┆ column_1  ┆ column_2  │
│ --- ┆ ---       ┆ ---       ┆ ---       │
│ i64 ┆ f64       ┆ f64       ┆ f64       │
╞═════╪═══════════╪═══════════╪═══════════╡
│ 0   ┆ 1.189349  ┆ -0.288161 ┆ -0.156299 │
│ 1   ┆ 0.602317  ┆ -0.108463 ┆ -1.029294 │
│ 2   ┆ 0.230017  ┆ 0.18799   ┆ 1.251513  │
│ 3   ┆ -0.706367 ┆ 0.407965  ┆ -0.509364 │
│ …   ┆ …         ┆ …         ┆ …         │
│ 6   ┆ -0.996497 ┆ 0.359672  ┆ -0.382435 │
│ 7   ┆ 0.310096  ┆ -1.165665 ┆ 1.444398  │
│ 8   ┆ -1.427343 ┆ -0.762879 ┆ -0.077449 │
│ 9   ┆ 1.248644  ┆ 0.498989  ┆ 0.96604   │
└─────┴───────────┴───────────┴───────────┘

We use a simple query where we group by the id column and sum the floating-point columns:

(
    pl.scan_csv("test_*.csv")
    .groupby('id')
    .agg(
        pl.col(pl.Float64).sum()
    )
)

As we use pl.scan_csv this is a lazy query and so nothing happens at this point.

For any lazy query we can see the optimised query plan using the explain method. For this query with no arguments passed to explain we see:

print(
    pl.scan_csv("test_*.csv")
    .groupby('id')
    .agg(
        pl.col(pl.Float64).sum()
    )
    .explain()
)

AGGREGATE
	[col("column_0").sum(), col("column_1").sum(), col("column_2").sum()] BY [col("id")] FROM
	FAST_PROJECT: [column_0, column_1, column_2, id]
  UNION:
  PLAN 0:

      CSV SCAN test_0.csv
      PROJECT */4 COLUMNS
  PLAN 1:

      CSV SCAN test_1.csv
      PROJECT */4 COLUMNS
  PLAN 2:

      CSV SCAN test_2.csv
      PROJECT */4 COLUMNS
  END UNION

We read this query plan from the bottom up:

  • The CSV SCAN nodes read the CSV files and the PROJECT part selects the columns we want (all 4 columns in this case).
  • The UNION node combines the results from the three CSV files into a single DataFrame.
  • The AGGREGATE node does the groupby and sum.

This is the query plan that Polars will execute if we use the default non-streaming engine with .collect().

Is my query running in streaming mode?

What would happen if we were to try executing the query in streaming mode with .collect(streaming=True)? We can see the query plan for this by passing streaming=True to the explain method:

print(
    pl.scan_csv("test_*.csv")
    .groupby("id")
    .agg(
        pl.col(pl.Float64).sum()
    )
    .explain(streaming=True)
)

STREAMING CHUNK SIZE: 12500 rows
--- PIPELINE
AGGREGATE
	[col("column_0").sum(), col("column_1").sum(), col("column_2").sum()] BY [col("id")] FROM
	FAST_PROJECT: [column_0, column_1, column_2, id]
  UNION:
  PLAN 0:

      CSV SCAN test_0.csv
      PROJECT */4 COLUMNS
  PLAN 1:

      CSV SCAN test_1.csv
      PROJECT */4 COLUMNS
  PLAN 2:

      CSV SCAN test_2.csv
      PROJECT */4 COLUMNS
  END UNION  --- END PIPELINE

  DF []; PROJECT */0 COLUMNS; SELECTION: "None"

The --- PIPELINE and --- END PIPELINE lines show us that the query will be executed in streaming mode. The STREAMING CHUNK SIZE tells us that Polars will process the data in chunks of (up to) 12500 rows.

Be aware - this functionality for explain(streaming=True) is experimental and still in alpha stage.
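If you would rather check this in code than by eye, you can inspect the plan text as a string. The following is a minimal sketch, assuming the plan keeps the format shown above (the --- PIPELINE markers and the STREAMING CHUNK SIZE header). The helper names is_streaming_plan and chunk_size_from_plan are hypothetical and not part of Polars, and because this output is experimental the format may change.

```python
import re
from typing import Optional


def is_streaming_plan(plan: str) -> bool:
    """Return True if the plan text contains a streaming pipeline section."""
    return "--- PIPELINE" in plan and "--- END PIPELINE" in plan


def chunk_size_from_plan(plan: str) -> Optional[int]:
    """Extract the chunk size from the plan header, or None if it is absent."""
    match = re.search(r"STREAMING CHUNK SIZE: (\d+) rows", plan)
    return int(match.group(1)) if match else None


# A shortened copy of the streaming plan text shown above
plan = (
    "STREAMING CHUNK SIZE: 12500 rows\n"
    "--- PIPELINE\n"
    "AGGREGATE\n"
    "  END UNION  --- END PIPELINE\n"
)

print(is_streaming_plan(plan))     # True
print(chunk_size_from_plan(plan))  # 12500
```

In practice you would pass in the string returned by explain(streaming=True) instead of the hard-coded sample.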

How is the chunk size calculated?

A key parameter affecting performance in streaming mode is the chunk size. The chunk size is the number of rows that Polars processes in each batch. At the top of the streaming query plan we see the chunk size is 12500 rows. This value is set by Polars based on:

  • the number of threads Polars will use and
  • the number of columns in the dataset

We see below how we can alter the chunk size, but first we will look at how Polars currently calculates the default chunk size for each query.

We start by looking at the number of threads.

How many threads?

Before we look at the chunk size formula we need to understand how many threads Polars will use. By default the number of threads is set equal to the number of CPUs on your machine (or perhaps the maximum number of CPUs available inside a Docker container).

You can check the maximum number of threads Polars will use with the pl.threadpool_size() function:

import polars as pl
pl.threadpool_size()

On my machine this gives a value of 12, unless I’m in Docker where I’ve set the max CPUs to 7.

You can override the maximum number of threads by setting the POLARS_MAX_THREADS environment variable.

import os
os.environ["POLARS_MAX_THREADS"] = "4"
import polars as pl

Be aware - you must set this environment variable before importing Polars. If you set it after importing Polars it will have no effect.
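Because the ordering is easy to get wrong in a long script or notebook, a small guard can help. This is a sketch only: set_polars_max_threads is a hypothetical helper, not a Polars function, and it assumes that checking sys.modules is a good enough test of whether Polars has already been imported in the current process.

```python
import os
import sys


def set_polars_max_threads(n_threads: int) -> None:
    """Set POLARS_MAX_THREADS, refusing if Polars has already been imported."""
    if "polars" in sys.modules:
        raise RuntimeError(
            "Polars is already imported; set POLARS_MAX_THREADS before importing it"
        )
    os.environ["POLARS_MAX_THREADS"] = str(n_threads)


set_polars_max_threads(4)
# import polars as pl  # import Polars only after the variable is set
```

The guard raises an error if the import has already happened, so the setting cannot be silently ignored.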

In my case I find that the default number of threads works well and I almost never change it. However, you may want to reduce the number of threads if you are running other CPU-intensive tasks at the same time as Polars.

If you have a dedicated machine for your queries you may want to increase the number of threads. My experiments on this are inconclusive - in some cases I’ve found that increasing the number of threads improves performance and in other cases it has little effect or makes performance worse 🤷.

Chunk size formula

The current formula for setting the chunk size is defined in the Polars source code, but it boils down to this:

thread_factor = max(12 / n_threads, 1)
STREAMING_CHUNK_SIZE = max(50_000 / n_cols * thread_factor, 1000)

The thread_factor is set to be the maximum of 1 and 12 / n_threads. The thread_factor will be 1 if you have 12 or more threads and can be greater than 1 if you have fewer threads.

The chunk size is set to be the greater of:

  • 1000 or
  • 50,000 divided by the number of columns in the DataFrame, multiplied by thread_factor.

In my case with 12 threads the thread_factor is 1, so the chunk size is simply 50,000 divided by the number of columns in the LazyFrame. As my LazyFrame has 4 columns, the chunk size is 12,500 rows.
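To try the formula with other column and thread counts, here is a small pure-Python sketch. It rests on two assumptions that you should check against your Polars version. First, the divisions are integer divisions, as they would be if the underlying code works on integers; under that assumption any thread count from 7 to 11 still gives a thread_factor of 1. Second, the guard against zero columns is added for this sketch only. The function name default_chunk_size is hypothetical.

```python
def default_chunk_size(n_cols: int, n_threads: int) -> int:
    """Sketch of the default streaming chunk size formula described above."""
    # Assumption: integer (floor) division, so 12 // 7 == 1
    thread_factor = max(12 // n_threads, 1)
    # Guard added for this sketch to avoid dividing by zero
    n_cols = max(n_cols, 1)
    return max(50_000 // n_cols * thread_factor, 1000)


print(default_chunk_size(n_cols=4, n_threads=12))    # 12500
print(default_chunk_size(n_cols=4, n_threads=4))     # 37500
print(default_chunk_size(n_cols=100, n_threads=12))  # 1000
```

The first call reproduces the 12,500 rows reported in the query plan above, and the last call shows the floor of 1,000 rows taking effect for a wide dataset.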

I expect this formula to evolve over time as Polars is optimized for different use cases.

Setting chunk size manually

The optimal value for chunk size will depend on your data, query and hardware. You can set the chunk size manually with the pl.Config.set_streaming_chunk_size function. In this example I set it to 50,000 rows:

pl.Config.set_streaming_chunk_size(50000)

Unlike the POLARS_MAX_THREADS environment variable, you can set the chunk size at any time. For example, if I set this to 50,000 rows and then run explain(streaming=True) again I get the following output:

STREAMING CHUNK SIZE: 50000 rows
--- PIPELINE
AGGREGATE
	[col("column_0").sum(), col("column_1").sum(), col("column_2").sum()] BY [col("id")] FROM
	FAST_PROJECT: [column_0, column_1, column_2, id]
  UNION:
  PLAN 0:

      CSV SCAN test_0.csv
      PROJECT */4 COLUMNS
  PLAN 1:

      CSV SCAN test_1.csv
      PROJECT */4 COLUMNS
  PLAN 2:

      CSV SCAN test_2.csv
      PROJECT */4 COLUMNS
  END UNION  --- END PIPELINE

  DF []; PROJECT */0 COLUMNS; SELECTION: "None"

Here we see that the streaming chunk size is now 50,000 rows.

If you are working with large datasets I recommend you experiment with different chunk sizes to find the optimal value for your data and hardware - this can make a major difference to performance.
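One way to run such an experiment is a simple timing loop. The sketch below is deliberately generic: time_chunk_sizes is a hypothetical helper that takes a callable to run the query and a callable to set the chunk size, so it runs without Polars installed. The commented-out call shows how it might be wired up with pl.Config.set_streaming_chunk_size and collect(streaming=True), where query stands for your own LazyFrame and the listed sizes are arbitrary.

```python
import time
from typing import Callable, Dict, Iterable


def time_chunk_sizes(
    run_query: Callable[[], object],
    set_chunk_size: Callable[[int], object],
    chunk_sizes: Iterable[int],
) -> Dict[int, float]:
    """Run the query once per chunk size and return the elapsed seconds for each."""
    timings = {}
    for size in chunk_sizes:
        set_chunk_size(size)
        start = time.perf_counter()
        run_query()
        timings[size] = time.perf_counter() - start
    return timings


# With Polars this might be called as follows (not run here):
# timings = time_chunk_sizes(
#     run_query=lambda: query.collect(streaming=True),
#     set_chunk_size=pl.Config.set_streaming_chunk_size,
#     chunk_sizes=[10_000, 50_000, 200_000],
# )

# Stand-in callables so the sketch runs without Polars
calls = []
timings = time_chunk_sizes(lambda: sum(range(1000)), calls.append, [10_000, 50_000])
print(sorted(timings))  # [10000, 50000]
```

A single run per chunk size is noisy, so for a real comparison you would repeat each measurement a few times on a representative dataset.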

This post is licensed under CC BY 4.0 by the author.