In this post we see how to read and write from a CSV or Parquet file from cloud storage with Polars. We also see how Polars applies query optimisations to reduce the amount of data transferred across the network. We use S3 as the example in this case but the same principles apply to other cloud storage services like Google Cloud Storage or Azure Blob Storage.
Want to accelerate your analysis with Polars? Join over 2,000 learners on my highly-rated Up & Running with Polars course.
Setting up
For this example we create a time series DataFrame with data every minute for 2 years. This produces a DataFrame with just over 1 million rows.
from datetime import datetime

import polars as pl
import s3fs

# Start and end dates for the time series
start_datetime = datetime(2020, 1, 1)
end_datetime = datetime(2022, 1, 1)

df = (
    pl.DataFrame(
        {
            "datetime": pl.datetime_range(
                start_datetime, end_datetime, interval="1m", eager=True
            ),
        }
    )
    # Add a column of values
    .with_row_index(name="value")
    .with_columns(
        # Add some more columns with time series features
        pl.col("datetime").dt.hour().alias("hour"),
        pl.col("datetime").dt.day().alias("day"),
    )
    .select(["datetime", "hour", "day", "value"])
)
And this is what the DataFrame looks like (showing the datetime and value columns):
shape: (5, 2)
┌─────────────────────┬───────┐
│ datetime            ┆ value │
│ ---                 ┆ ---   │
│ datetime[μs]        ┆ u32   │
╞═════════════════════╪═══════╡
│ 2020-01-01 00:00:00 ┆ 0     │
│ 2020-01-01 00:01:00 ┆ 1     │
│ 2020-01-01 00:02:00 ┆ 2     │
│ 2020-01-01 00:03:00 ┆ 3     │
│ 2020-01-01 00:04:00 ┆ 4     │
└─────────────────────┴───────┘
Writing a file to S3
We now write this DataFrame to both a CSV and a Parquet file on S3 using s3fs. The s3fs library allows you to write files to S3 with similar syntax to working on a local file system.
bucket_name = "my_bucket"
csv_key = "test_write.csv"
parquet_key = "test_write.parquet"

# Create an s3fs filesystem object to handle the connection to S3
fs = s3fs.S3FileSystem()

with fs.open(f"{bucket_name}/{csv_key}", mode="wb") as f:
    df.write_csv(f)

with fs.open(f"{bucket_name}/{parquet_key}", mode="wb") as f:
    df.write_parquet(f)
I recommend using the Parquet format over CSV as it:
- produces a smaller file size due to compression,
- preserves dtypes, and
- makes subsequent reads faster (as we see below).
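If you want to see the file size difference for yourself, you can compare the size of the two objects we just wrote. This is a minimal sketch that reuses the fs object and keys from the writing example above; the exact sizes depend on your data.
# Compare the size of the CSV and Parquet objects we wrote above
# (assumes the fs, bucket_name, csv_key and parquet_key variables from earlier)
csv_size = fs.info(f"{bucket_name}/{csv_key}")["size"]
parquet_size = fs.info(f"{bucket_name}/{parquet_key}")["size"]
print(f"CSV: {csv_size / 1e6:.1f} MB, Parquet: {parquet_size / 1e6:.1f} MB")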
Reading a file from S3
We can use Polars to read the entire file back from S3. Note that for the Parquet file we need to specify the region of the bucket and we pass this as a storage_options dictionary to the read_parquet function.
df_csv = pl.read_csv(f"s3://{bucket_name}/{csv_key}")

storage_options = {
    "region": "us-east-2",
}
df_parquet = pl.read_parquet(
    f"s3://{bucket_name}/{parquet_key}",
    storage_options=storage_options,
)
While the CSV and Parquet approaches are similar from a user perspective, they work quite differently internally.
For a CSV file Polars downloads the remote file into a cache using fsspec and then reads from the cache into a DataFrame. This is a fast approach but it does require that the whole file is downloaded first.
For a Parquet file Polars internally uses the Rust object_store crate. The object_store crate is a wrapper for the Rust implementation of the AWS Software Development Kit (SDK), similar to using the AWS CLI.
By using object_store Polars has more granular control over file interactions, which is valuable for Parquet files. For example, this control allows Polars to first read only the metadata of the Parquet file. This control also allows Polars to download data in parallel by either columns or row groups as described here.
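As a small illustration of the metadata-first approach, resolving the schema of a lazy Parquet scan only needs the file footer rather than the column data itself. The sketch below assumes a recent Polars version where LazyFrame.collect_schema is available; we come back to pl.scan_parquet in more detail further down.
# Resolving the schema of a remote Parquet scan only reads the file footer,
# not the column data itself
schema = pl.scan_parquet(
    f"s3://{bucket_name}/{parquet_key}",
    storage_options={"region": "us-east-2"},
).collect_schema()
print(schema)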
Authentication
To read and write files from S3 you need to authenticate with AWS and you may have noticed the lack of authentication in the code above. This is because Polars has some automatic ways to authenticate with AWS and the other major cloud providers.
In the case of querying a file on S3, for example, Polars checks if the boto3 package is installed. If it is, then Polars uses boto3 methods internally to authenticate with AWS. In the simplest cases this means that boto3 will parse the AWS credentials in your .aws/credentials file and use them to authenticate with AWS. If you have set an AWS_PROFILE environment variable then boto3 will use the credentials in the profile you specify. And if you are using AWS session tokens as part of an AWS Single Sign-On (SSO) setup then boto3 will use these tokens to authenticate.
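For example, if your credentials live in a named profile, one lightweight way to point boto3 (and therefore Polars) at them is to set AWS_PROFILE before running your query. This is only a sketch; the profile name "my-profile" is a placeholder for whatever profile you have configured.
import os

import polars as pl

# Point boto3 (and therefore Polars) at a named profile from ~/.aws/credentials
# "my-profile" is a placeholder profile name
os.environ["AWS_PROFILE"] = "my-profile"

df_parquet = pl.read_parquet(
    f"s3://{bucket_name}/{parquet_key}",
    storage_options={"region": "us-east-2"},
)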
If you need more control over the authentication details then you can pass a dictionary of options to the storage_options parameter that specifies parameters such as the aws_access_key_id, aws_secret_access_key and aws_region:
storage_options = {
    "aws_access_key_id": "<secret>",
    "aws_secret_access_key": "<secret>",
    "aws_region": "us-east-1",
}
df = pl.read_csv(
    f"s3://{bucket_name}/{csv_key}",
    storage_options=storage_options,
)
Scanning a Parquet file on S3 with query optimisation
With a Parquet file we can scan the file on S3 and build a lazy query. With Parquet we can apply some query optimisations to limit the number of rows and columns we download from S3.
The two query optimisations that are particularly relevant are:
- predicate pushdown, meaning that Polars tries to limit the number of rows to read, and
- projection pushdown, meaning that Polars tries to limit the number of columns to read.
We do this lazy query with pl.scan_parquet where we only want to read the datetime and value columns and only the rows where the value column is less than 100,000.
# The source of the Parquet file on S3
source = f"s3://{bucket_name}/{parquet_key}"

# The storage options for the S3 bucket
storage_options = {
    "region": "us-east-2",
}

df = (
    pl.scan_parquet(source, storage_options=storage_options)
    .filter(pl.col("value") < 100_000)
    .select("datetime", "value")
    .collect()
)
In this query we have both predicate pushdown and projection pushdown query optimisations.
For the projection pushdown (i.e. selecting only the columns we need) Polars only reads the columns datetime and value from the Parquet file on S3.
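One way to check that the optimisations are applied is to print the optimised query plan before collecting. This is a sketch; the exact wording of the plan output varies between Polars versions, but you should see the projection and the filter pushed into the Parquet scan.
lazy_query = (
    pl.scan_parquet(source, storage_options=storage_options)
    .filter(pl.col("value") < 100_000)
    .select("datetime", "value")
)
# Print the optimised plan: the scan node should show that only 2 of the 4
# columns are projected and that the filter is applied during the scan
print(lazy_query.explain())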
The predicate pushdown (i.e. filtering the rows we read) works in a different way to the projection pushdown. Internally, a Parquet file is divided into row groups. By default in Polars these row groups are 512² rows (about 256k rows). When we write a Parquet file with Polars, then Polars writes metadata about each row group to the file footer. This metadata includes the minimum and maximum values of each column in each row group. When we scan a Parquet file with a filter condition, Polars reads this metadata and uses it to decide which row groups to read. In this case Polars would read only the first row group, as that is the only row group where the value column is less than 100,000. This can be much faster and more memory efficient than reading the whole file.
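If you want to see the row group statistics that this relies on, you can inspect the Parquet metadata directly. The sketch below uses pyarrow (which you may need to install separately) and the fs object from the writing example; it only illustrates the per-row-group min/max values of the value column.
import pyarrow.parquet as pq

# Read only the Parquet metadata (the file footer) and print the min/max
# statistics of the "value" column for each row group
with fs.open(f"{bucket_name}/{parquet_key}", mode="rb") as f:
    metadata = pq.ParquetFile(f).metadata
    for i in range(metadata.num_row_groups):
        # Column index 3 is the "value" column (datetime, hour, day, value)
        stats = metadata.row_group(i).column(3).statistics
        print(f"row group {i}: min={stats.min}, max={stats.max}")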
Scanning a CSV file on S3 with query optimisation
We can also scan a CSV file in cloud storage:
source_csv = f"s3://{bucket_name}/{csv_key}"

df = (
    # Scan the file on S3
    pl.scan_csv(source_csv, storage_options=storage_options)
    # Apply a filter condition to limit the rows we read
    .filter(pl.col("value") < 100_000)
    # Select only the columns we need
    .select("datetime", "value")
    # Evaluate the query
    .collect()
)
As a CSV is a big blob of data without the column and row group structure of a Parquet file, query optimisations cannot be as effective. In this case Polars has to download the whole CSV file before it can start to filter rows and select columns. We can see the difference in effectiveness between the file formats with a performance comparison: on my machine the query on the Parquet file takes about 1 second while the query on the CSV file takes about 5 seconds.
Polars still has some tricks up its sleeve. For example, when Polars downloads the CSV file from the cloud it stores it in a cache for a period of time. This means that if you run another query against the same source within this time period then Polars will use the cached file rather than downloading it again. On my machine, for example, a second run of this query takes 0.5 seconds instead of 5 seconds.
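If you want to see the effect of the cache yourself, a rough way is to time two consecutive runs of the same query. This is only a sketch and the numbers are machine- and network-dependent.
import time

# Run the same scan twice: the second run should hit the local file cache
# rather than downloading the CSV from S3 again
for run in range(2):
    start = time.perf_counter()
    (
        pl.scan_csv(source_csv, storage_options=storage_options)
        .filter(pl.col("value") < 100_000)
        .select("datetime", "value")
        .collect()
    )
    print(f"run {run}: {time.perf_counter() - start:.1f}s")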
We can control the length of time the file is kept in the cache with the file_cache_ttl argument to pl.scan_csv.
For example, to scan a CSV file with a 2 hour cache we would use:
df = (
    pl.scan_csv(source_csv, storage_options=storage_options, file_cache_ttl=7200)
    .collect()
)
This caching also applies to scanning NDJSON files from the cloud with pl.scan_ndjson.
Wrap-up
In this post we have seen how to read and write files from S3 with Polars.
Again, I invite you to take my Polars course if you want to learn the Polars API and how to use Polars in the real world.
Next steps
Want to know more about Polars for high performance data science? Then you can: