Polars can significantly accelerate Parquet file reads. In this post, we demonstrate how to leverage Polars query optimizations to enhance the efficiency of reading from a Parquet file.
Want to accelerate your analysis with Polars? Join over 2,000 learners on my highly-rated Up & Running with Polars course
But what are Parquet files?
Apache Parquet is a file format designed for efficient storage and retrieval of analytics data. Think of it as a smart filing cabinet that organizes data in a way that can make it super fast to access and analyze large datasets. However, reading from the file is only fast if you have a smart tool doing the reading. That’s where Polars comes in.
Reading subsets of columns
Imagine you’re working with a dataset of customer transactions:
transactions = {
    'customer_id': [1001, 1002, 1003],
    'timestamp': ['2024-01-01 10:00:00', '2024-01-01 10:05:00', '2024-01-01 10:10:00'],
    'product_id': ['A123', 'B456', 'C789'],
    'amount': [99.99, 50.00, 75.50],
    'store_location': ['NY', 'CA', 'TX']
}
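If you want to follow along, a quick way to create the transactions.parquet file used in the rest of this post is to write this sample data with Polars:

import polars as pl

# Write the sample transactions to a Parquet file
pl.DataFrame(transactions).write_parquet('transactions.parquet')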
With CSV files, if you only need to analyze the customer_id and amount columns, you still need to read the entire file. Parquet, however, lets you read just the columns you need.
This means that the following Polars query will only read the customer_id and amount columns from the Parquet file:
import polars as pl

df = (
    pl.scan_parquet('transactions.parquet')
    .select(['customer_id', 'amount'])
    .collect()
)
This query optimization is what’s known as a projection pushdown. The good news is that Polars can still do projection pushdown even if your Parquet file is stored in a cloud storage service like AWS S3 or Google Cloud Storage.
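As a rough sketch (the S3 path here is made up, and the storage_options keys depend on your cloud provider and credential setup), a cloud read looks much the same as a local one:

import polars as pl

df = (
    pl.scan_parquet(
        's3://my-bucket/transactions.parquet',  # hypothetical bucket path
        # Region/credentials can be passed via storage_options;
        # the keys depend on your provider and setup
        storage_options={'aws_region': 'us-east-1'},
    )
    .select(['customer_id', 'amount'])
    .collect()
)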
Reading subsets of rows
A lesser-known advantage of Parquet files is that the data is stored in row groups, so we can limit how much of the file we read when we only need a subset of the rows.
This means that the following Polars query will only read the row groups that contain rows where the customer_id is less than 10000:
df = (
    pl.scan_parquet('transactions.parquet')
    .filter(pl.col('customer_id') < 10000)
    .collect()
)
This query optimization is known as predicate pushdown. It is called a pushdown because the predicate is pushed down to the Parquet reader, rather than reading the full file into a DataFrame and then filtering the rows. Again, Polars can still do this for Parquet files in cloud storage.
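If you want to confirm that the predicate really has been pushed down, you can print the optimized query plan. The exact text varies between Polars versions, but the filter should show up as part of the Parquet scan rather than as a separate filter step:

import polars as pl

query = (
    pl.scan_parquet('transactions.parquet')
    .filter(pl.col('customer_id') < 10000)
)

# Print the optimized plan that Polars will execute
print(query.explain())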
There are some things to be aware of if you want to take advantage of the predicate pushdown over row groups…
Firstly, in order for Polars to identify which row groups to read, the Parquet file must have been written with row group statistics metadata. Polars has done this by default for some time now, but not all Parquet writers write statistics by default. The statistics metadata is pretty simple, by the way - it just keeps track of the minimum and maximum values of each column in each row group.
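As a sketch of what that metadata looks like, you can inspect a file’s statistics with PyArrow (assuming you have pyarrow installed; the indices here just pick out the first column of the first row group):

import pyarrow.parquet as pq

metadata = pq.ParquetFile('transactions.parquet').metadata
# Statistics for the first column of the first row group
stats = metadata.row_group(0).column(0).statistics
if stats is not None:  # None if the writer did not record statistics
    print(stats.min, stats.max)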
Secondly, you will only get a speed-up from this query optimization if the data you are filtering for is concentrated in a subset of the row groups. If the customer_ids less than 10000 are spread across all row groups then you won’t see any speed-up. You can enforce this clustering for a column by sorting the data before writing it to Parquet (see the sketch after the next point).
Thirdly, in my experience you will only get a performance improvement from row group filtering if your dataset is sufficiently large. The default row group size in Polars is 512^2 - about 260k rows. So your dataset needs to have at least a few million rows to see a performance improvement from row group filtering. I have tried experimenting with much smaller row group sizes and then writing a filter that targets these small row groups, but I didn’t see much improvement over the default row group size. This reflects the overhead of other parts of the query processing.
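Putting the last two points together, here is a minimal sketch of sorting on customer_id and setting an explicit row group size when writing - the output filename and the row group size of 100,000 are just illustrative values:

import polars as pl

df = pl.read_parquet('transactions.parquet')

# Cluster similar customer_ids into the same row groups and
# write with an explicit (illustrative) row group size
df.sort('customer_id').write_parquet(
    'transactions_sorted.parquet',
    row_group_size=100_000,
)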
Controlling parallelism
Polars reads a Parquet file in parallel. It can do this by either reading columns in parallel or row groups in parallel. So how does it choose which to do?
The basic rule is that Polars counts how many columns and row groups there are and then parallelizes the reading of the larger of the two. So if there are 10 row groups and 20 columns, Polars will read the columns in parallel. If there are 20 row groups and 10 columns, Polars will read the row groups in parallel. This is a solid basic strategy but it isn’t guaranteed to be the fastest for your data.
We can instead tell Polars to parallelize over columns or row_groups by setting the parallel argument in the scan_parquet function. For example, to read columns in parallel:
df = (
    pl.scan_parquet('transactions.parquet', parallel='columns')
    .collect()
)
There is also a newer alternative strategy called prefiltered, which is a combination of column and row group parallelism. The prefiltered strategy first evaluates the pushed-down predicates in parallel and determines which rows need to be read. Then it parallelizes over both the columns and the row groups while filtering out rows that do not need to be read.
In some cases with large files and significant filtering the prefiltered strategy can provide a significant speedup. In other cases, the prefiltered strategy may be slower. You can see for yourself by setting the parallel argument in the scan_parquet function.
df = (
    pl.scan_parquet('transactions.parquet', parallel='prefiltered')
    .collect()
)
Overall, if querying a large Parquet file is a bottleneck in your pipeline it may be worth experimenting with this argument to see if you can get a speed-up.
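One rough way to run that experiment (timings will of course depend on your data and hardware, and the filter here is just the example from above) is to time the same query under each strategy:

import time
import polars as pl

for strategy in ['auto', 'columns', 'row_groups', 'prefiltered']:
    start = time.perf_counter()
    (
        pl.scan_parquet('transactions.parquet', parallel=strategy)
        .filter(pl.col('customer_id') < 10000)
        .collect()
    )
    elapsed = time.perf_counter() - start
    print(f'{strategy}: {elapsed:.3f}s')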
Get in touch on social media if you have any questions or comments.
Next steps
Want to know more about Polars for high performance data science? Then you can: