In a recent post I set out the key ideas behind streaming mode in Polars. However, streaming is still a developing part of Polars and not all operations support streaming.
Be aware: the Polars developers are working on a new streaming engine that will be more flexible and allow more queries to be executed in streaming mode. I will update this post when the new streaming engine is released.
Want to get going with Polars? Check out my Polars course here
My approach is to define a simple DataFrame with two floating point columns, which I then convert to a LazyFrame (see the code at the bottom of the post). I then try to call every expression on the LazyFrame, print the streaming query plan and check whether the expression works in streaming mode or not. I’m working with Polars version 0.20.5 here.
This isn’t a comprehensive test, but it should give you an intuition for what works and what doesn’t. The expressions that currently work in streaming mode with these floating point columns include:
'__abs__, __add__, __and__, __eq__, __floordiv__, __ge__, __getstate__, __gt__, __init__, __init_subclass__, __invert__, __le__, __lt__, __mod__, __mul__, __ne__, __neg__, __or__, __pos__, __pow__, __sizeof__, __truediv__, __xor__, abs, add, alias, and_, apply, arccos, arccosh, arcsin, arcsinh, arctan, arctanh, cast, cbrt, ceil, clip, cos, cosh, cot, degrees, eq, eq_missing, exp, fill_nan, fill_null, floor, floordiv, gather_every, ge, gt, hash, inspect, is_between, is_finite, is_in, is_infinite, is_nan, is_not, is_not_nan, is_not_null, is_null, keep_name, le, log, log10, log1p, lower_bound, lt, mod, mul, ne, ne_missing, not_, or_, pow, prefix, radians, rechunk, reinterpret, replace, round, round_sig_figs, set_sorted, shrink_dtype, sign, sin, sinh, sqrt, sub, suffix, tan, tanh, to_physical, truediv, upper_bound, xor'
Note that map_elements isn’t yet included in my list but should also work in streaming mode.
In contrast, I find that the following expressions don’t work in streaming mode with the floating point columns:
'__array_ufunc__, agg_groups, all, any, append, approx_n_unique, arg_max, arg_min, arg_sort, arg_true, arg_unique, backward_fill, bottom_k, count, cum_count, cum_max, cum_min, cum_prod, cum_sum, cumcount, cummax, cummin, cumprod, cumsum, cumulative_eval, diff, dot, drop_nans, drop_nulls, entropy, ewm_mean, ewm_std, ewm_var, explode, extend_constant, filter, first, flatten, forward_fill, gather, get, head, hist, implode, interpolate, is_duplicated, is_first, is_first_distinct, is_last, is_last_distinct, is_unique, kurtosis, last, len, limit, max, mean, median, min, mode, n_unique, nan_max, nan_min, null_count, over, pct_change, peak_max, peak_min, product, quantile, rank, repeat_by, reshape, reverse, rle, rle_id, sample, search_sorted, shift, shift_and_fill, shuffle, skew, slice, sort, sort_by, std, sum, tail, take, top_k, unique, unique_counts, value_counts, var'
So what do we learn? The split gives us some intuition for why an expression appears in the first (streaming) list or the second (non-streaming) list:
An expression is much easier to run in streaming batches if it doesn’t require information from other rows. For example, add is easy to do batchwise because we just add the two values in each row. In contrast, cum_sum is harder to do batchwise because each value depends on the running total from the previous rows, and at some point the previous row will be in a different batch from the current row.
So we see that the first list has more elementwise operations and the second list has more operations that require information from other rows.
Of course, this challenge of using information from other rows doesn’t mean that these expressions won’t make their way into the streaming engine in the future. It just means that the expressions are not supported right now.
If you are interested in the methodology, see my Python script at the bottom of the post.
Next steps
I’ll update this post as I cover more use cases, such as what happens with different dtypes like strings and categoricals, and what happens with methods like filter, group_by and join (all of which have at least partial support for streaming). If you have any questions or comments then get in touch on social media (links below).
If you would like more detailed support on working with Polars then I provide consulting on optimising your data processing pipelines with Polars. You can also check out my online course to get you up-and-running with Polars by clicking on the bear below
Want to know more about Polars for high performance data science? Then you can:
- get in touch to discuss your data processing challenges
- follow me on bluesky
- follow me on twitter
- connect with me at linkedin
- check out my youtube videos
Here’s my methodology for testing which expressions work in streaming mode:
import polars as pl
import numpy as np
df = pl.DataFrame({"a": [0.0, 1.0], "b": [2.0, 3.0]}).lazy()
# Make a list of expressions
exprs = dir(pl.Expr)
# Define bespoke operations for expressions that
# can't just be called as pl.col("a").expr()
bespoke_expr_dict = {
    "__add__": pl.col("a").__add__(pl.col("b")),
    "__and__": pl.col("a").__and__(pl.col("b")),
    "__array_ufunc__": np.cos(pl.col("a")),
    "__eq__": pl.col("a").__eq__(pl.col("b")),
    "__floordiv__": pl.col("a").__floordiv__(pl.col("b")),
    "__ge__": pl.col("a").__ge__(pl.col("b")),
    "__gt__": pl.col("a").__gt__(pl.col("b")),
    "__le__": pl.col("a").__le__(pl.col("b")),
    "__lt__": pl.col("a").__lt__(pl.col("b")),
    "__mod__": pl.col("a").__mod__(pl.col("b")),
    "__mul__": pl.col("a").__mul__(pl.col("b")),
    "__ne__": pl.col("a").__ne__(pl.col("b")),
    "__or__": pl.col("a").__or__(pl.col("b")),
    "__pow__": pl.col("a").__pow__(2),
    "__truediv__": pl.col("a").__truediv__(pl.col("b")),
    "__xor__": pl.col("a").__xor__(pl.col("b")),
    "add": pl.col("a").add(pl.col("b")),
    "alias": pl.col("a").alias("foo"),
    "append": pl.col("a").append(pl.col("b")),
    "apply": pl.col("a").apply(lambda x: x),
    "cast": pl.col("a").cast(pl.Int32),
    "cumulative_eval": pl.col("a").cumulative_eval(pl.col("b")),
    "dot": pl.col("a").dot(pl.col("b")),
    "eq": pl.col("a").eq(pl.col("b")),
    "eq_missing": pl.col("a").eq_missing(pl.col("b")),
    "ewm_mean": pl.col("a").ewm_mean(1),
    "ewm_std": pl.col("a").ewm_std(1),
    "ewm_var": pl.col("a").ewm_var(1),
    "extend_constant": pl.col("a").extend_constant(1, n=1),
    "fill_nan": pl.col("a").fill_nan(1),
    "fill_null": pl.col("a").fill_null(1),
    "filter": pl.col("a").filter(pl.col("b")),
    "floordiv": pl.col("a").floordiv(pl.col("b")),
    "gather": pl.col("a").gather(1),
    "gather_every": pl.col("a").gather_every(1),
    "ge": pl.col("a").ge(pl.col("b")),
    "get": pl.col("a").get(1),
    "gt": pl.col("a").gt(pl.col("b")),
    "is_between": pl.col("a").is_between(1, 2),
    "is_in": pl.col("a").is_in([1, 2]),
    "le": pl.col("a").le(pl.col("b")),
    "lt": pl.col("a").lt(pl.col("b")),
    "mod": pl.col("a").mod(pl.col("b")),
    "mul": pl.col("a").mul(pl.col("b")),
    "ne": pl.col("a").ne(pl.col("b")),
    "ne_missing": pl.col("a").ne_missing(pl.col("b")),
    "over": pl.col("a").over(pl.col("b")),
    "pow": pl.col("a").pow(2),
    "prefix": pl.col("a").prefix("foo"),
    "quantile": pl.col("a").quantile(0.5),
    "repeat_by": pl.col("a").repeat_by(2),
    "replace": pl.col("a").replace(2, 100),
    "reshape": pl.col("a").reshape((2, 1)),
    "round_sig_figs": pl.col("a").round_sig_figs(2),
    "search_sorted": pl.col("a").search_sorted(0),
    "shift_and_fill": pl.col("a").shift(1, fill_value=1),
    "slice": pl.col("a").slice(1, 2),
    "sort_by": pl.col("a").sort_by(pl.col("b")),
    "sub": pl.col("a").sub(pl.col("b")),
    "suffix": pl.col("a").suffix("foo"),
    "take": pl.col("a").take(2),
    "truediv": pl.col("a").truediv(pl.col("b")),
    "xor": pl.col("a").xor(pl.col("b")),
}
# Collect the query plan for each expression
plans = []
# Loop through all expressions
for expr in exprs:
    try:
        # Try to call each expression naively
        plan = df.select(getattr(pl.col("a"), expr)()).explain(streaming=True)
        plans.append({"expr": expr, "plan": plan, "failed": None})
    except Exception:
        # If the naive call fails, try the bespoke version from bespoke_expr_dict
        if expr in bespoke_expr_dict:
            try:
                plan = df.select(bespoke_expr_dict[expr]).explain(streaming=True)
                plans.append({"expr": expr, "plan": plan, "failed": None})
            except Exception as e:
                # Store the error message as a string so it fits in a DataFrame column
                plans.append({"expr": expr, "plan": None, "failed": str(e)})
# Make a DataFrame and add a streaming boolean column
plans_df = (
    pl.DataFrame(plans)
    .with_columns(pl.col("plan").str.starts_with("--- STREAMING").alias("streams"))
    .sort("streams")
)
# Make a summary DataFrame
summary_df = (
    plans_df.group_by("streams")
    .agg(pl.col("expr"))
    .with_columns(pl.col("expr").list.join(separator=", "))
)