Reading and writing files on S3 with Polars

In this post we see how to read and write CSV and Parquet files in cloud storage with Polars. We also see how Polars applies query optimisations to reduce the amount of data transferred across the network. We use S3 as the example, but the same principles apply to other cloud storage services such as Google Cloud Storage or Azure Blob Storage.

Want to accelerate your analysis with Polars? Join over 3,000 learners on my highly-rated Up & Running with Polars course.

Setting up

For this example we create a time series DataFrame with data every minute for 2 years. This produces a DataFrame with just over 1 million rows.

from datetime import datetime
import polars as pl
import s3fs

# Start and end dates for the time series
start_datetime = datetime(2020,1,1)
end_datetime = datetime(2022,1,1)
df = (
    pl.DataFrame(
        {
            "datetime": pl.datetime_range(start_datetime, end_datetime, interval="1m", eager=True),
        }
    )
    # Add a column of values
    .with_row_index(name="value")
    .with_columns(
        # Add some more columns with time series features
        pl.col("datetime").dt.hour().alias("hour"),
        pl.col("datetime").dt.day().alias("day"),
    )
    .select(["datetime","hour","day","value"])
)

And this is what the first rows of the datetime and value columns look like:

shape: (5, 2)
┌─────────────────────┬───────┐
│ datetime            ┆ value │
│ ---                 ┆ ---   │
│ datetime[μs]        ┆ u32   │
╞═════════════════════╪═══════╡
│ 2020-01-01 00:00:00 ┆ 0     │
│ 2020-01-01 00:01:00 ┆ 1     │
│ 2020-01-01 00:02:00 ┆ 2     │
│ 2020-01-01 00:03:00 ┆ 3     │
│ 2020-01-01 00:04:00 ┆ 4     │
└─────────────────────┴───────┘

Writing a file to S3

We now write this DataFrame to both a CSV file and a Parquet file on S3 using s3fs. The s3fs library lets you write files to S3 with syntax similar to working on a local file system.

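# Placeholder bucket name (S3 bucket names may only contain lowercase letters, numbers, dots and hyphens)
bucket_name = "my-bucket"
csv_key = "test_write.csv"
parquet_key = "test_write.parquet"

# Create the S3 filesystem object; s3fs picks up credentials from the standard AWS configuration
fs = s3fs.S3FileSystem()

# Write the DataFrame to each file through a writable S3 file object
with fs.open(f"{bucket_name}/{csv_key}", mode="wb") as f:
    df.write_csv(f)
with fs.open(f"{bucket_name}/{parquet_key}", mode="wb") as f:
    df.write_parquet(f)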

I recommend using the Parquet format over CSV as it:

  • produces smaller files,
  • preserves dtypes and
  • makes subsequent reads faster (as we see below).

Reading a file from S3

We can use Polars to read the entire file back from S3. Note that for the Parquet file we need to specify the region of the bucket and we pass this as a storage_options dictionary to the read_parquet function.

df_csv = pl.read_csv(f"s3://{bucket_name}/{csv_key}")

storage_options = {
    "region": "us-east-2",
}
df_parquet = pl.read_parquet(
    f"s3://{bucket_name}/{parquet_key}",
    storage_options=storage_options,
)

While the CSV and Parquet approaches are similar from a user perspective, they work quite differently internally.

For a CSV file Polars downloads the remote file into a cache using fsspec and then reads from the cache into a DataFrame. This is fast, but it requires the whole file to be downloaded first.

For a Parquet file Polars internally uses the Rust object_store crate. The object_store crate, from the Apache Arrow project, provides a common interface to object stores such as S3, Google Cloud Storage and Azure Blob Storage by talking to their HTTP APIs directly.

By using object_store, Polars has more granular control over file interactions, which is valuable for Parquet files. For example, this control allows Polars to first read only the metadata of the Parquet file. It also allows Polars to download data in parallel by either columns or row groups.

Authentication

To read and write files on S3 you need to authenticate with AWS, and you may have noticed that the code above has no authentication step. This is because Polars has some automatic ways to authenticate with AWS and the other major cloud providers.

In the case of querying a file on S3, for example, Polars checks if the boto3 package is installed. If it is, Polars uses boto3 methods internally to authenticate with AWS. In the simplest case this means that boto3 parses the AWS credentials in your ~/.aws/credentials file and uses them to authenticate. If you have set an AWS_PROFILE environment variable, boto3 uses the credentials of the profile you specify. And if you are using AWS session tokens as part of an AWS Single Sign-On (SSO) setup, boto3 uses these tokens to authenticate.

If you need more control over the authentication details then you can pass a dictionary of options to the storage_options parameter that specify parameters such as the aws_access_key_id, aws_secret_access_key and aws_region:

storage_options = {
    "aws_access_key_id": "<secret>",
    "aws_secret_access_key": "<secret>",
    "aws_region": "us-east-1",
}
df = pl.read_csv(
    f"s3://{bucket_name}/{csv_key}",
    storage_options=storage_options,
)
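If your credentials come from somewhere else, such as temporary SSO credentials, a small helper can assemble the storage_options dictionary for you. The helper name build_storage_options is hypothetical, and the aws_session_token key is an assumption based on the object_store configuration keys, so check it against your Polars version:

```python
from typing import Optional


def build_storage_options(
    access_key_id: str,
    secret_access_key: str,
    session_token: Optional[str] = None,
    region: Optional[str] = None,
) -> dict:
    """Assemble a storage_options dict for Polars cloud reads from explicit AWS credentials."""
    options = {
        "aws_access_key_id": access_key_id,
        "aws_secret_access_key": secret_access_key,
    }
    # Temporary credentials (for example from AWS SSO) also need a session token
    if session_token is not None:
        options["aws_session_token"] = session_token
    if region is not None:
        options["aws_region"] = region
    return options


# Placeholder values only
storage_options = build_storage_options("<access key id>", "<secret>", region="us-east-1")
```

With boto3, for example, boto3.Session(profile_name="my-profile").get_credentials().get_frozen_credentials() returns an object whose access_key, secret_key and token attributes can be passed to this helper (the profile name is a placeholder).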

Scanning a Parquet file on S3 with query optimisation

With a Parquet file we can scan the file on S3 and build a lazy query. Polars can then apply query optimisations that limit the number of rows and columns it downloads from S3.

The two query optimisations that are particularly relevant are:

  • predicate pushdown meaning that Polars tries to limit the number of rows to read and
  • projection pushdown meaning that Polars tries to limit the number of columns to read

We build this lazy query with pl.scan_parquet, reading only the datetime and value columns and only the rows where the value column is less than 100,000.

# The source of the Parquet file on S3
source = f"s3://{bucket_name}/{parquet_key}"
# The storage options for the S3 bucket
storage_options = {
    "region": "us-east-2",
}

df = (
    pl.scan_parquet(source, storage_options=storage_options)
    .filter(pl.col("value") < 100_000)
    .select("datetime","value")
    .collect()
)

In this query we have both predicate pushdown and projection pushdown query optimisations.

For the projection pushdown (i.e. selecting only the columns we need) Polars only reads the columns datetime and value from the Parquet file on S3.

The predicate pushdown (i.e. filtering the rows we read) works differently from the projection pushdown. Internally, a Parquet file is divided into row groups. By default Polars writes row groups of 512^2 = 262,144 rows. When Polars writes a Parquet file it also writes metadata about each row group to the file footer, including the minimum and maximum values of each column in each row group. When we scan a Parquet file with a filter condition, Polars reads this metadata and uses it to decide which row groups to read. In this case Polars only needs to read the first row group, as it is the only one containing rows where the value column is less than 100,000. This can be much faster and more memory efficient than reading the whole file.

Scanning a CSV file on S3 with query optimisation

We can also scan a CSV file in cloud storage:

source_csv = f"s3://{bucket_name}/{csv_key}"

df = (
    # Scan the file on S3
    pl.scan_csv(source_csv, storage_options=storage_options)
    # Apply a filter condition to limit the rows we read
    .filter(pl.col("value") < 100_000)
    # Select only the columns we need
    .select("datetime","value")
    # Evaluate the query
    .collect()
)

As a CSV file is a big blob of data without the column and row group structure of a Parquet file, query optimisations cannot be as effective. In this case Polars has to download the whole CSV file before it can start to filter rows and select columns. A performance comparison shows the difference between the file formats: on my machine the query on the Parquet file takes about 1 second while the query on the CSV file takes about 5 seconds.

Polars still has some tricks up its sleeve. For example, when Polars downloads a CSV file from the cloud it stores it in a cache for a period of time. If you run another query against the same source within this period, Polars uses the cached file rather than downloading it again. On my machine, for example, a second run of this query takes 0.5 seconds instead of 5 seconds. We can control how long the file is kept in the cache with the file_cache_ttl argument (in seconds) to pl.scan_csv.

For example, to scan a CSV file with a 2 hour cache we would use:

df = (
    # Keep the downloaded file in the cache for 7200 seconds (2 hours)
    pl.scan_csv(source_csv, storage_options=storage_options, file_cache_ttl=7200)
    .collect()
)

This caching also applies when scanning NDJSON files from the cloud with pl.scan_ndjson.

Wrap-up

In this post we have seen how to read and write files from S3 with Polars.

Again, I invite you to take my Polars course if you want to learn the Polars API and how to use Polars in the real world.


This post is licensed under CC BY 4.0 by the author.