Which operations work in streaming mode in Polars?

In a recent post I set out the key ideas behind streaming mode in Polars. However, streaming is still a developing part of Polars and not all operations support streaming.

Be aware: the Polars developers are working on a new streaming engine that will be more flexible and allow more queries to be executed in streaming mode. I will update this post when the new streaming engine is released.

Want to get going with Polars? Check out my Polars course here

My approach is to define a simple DataFrame with 2 floating point columns which I then convert to a LazyFrame (see code at the bottom of the post). I then try to call every expression on the LazyFrame. I print the streaming query plan and check whether the expression works in streaming mode or not. I’m working with Polars version 0.20.5 here.

This isn’t a comprehensive test, but it should help give you an intuition for what works and what doesn’t. The expressions that currently work in streaming mode with these floating point columns include:

'__abs__, __add__, __and__, __eq__, __floordiv__, __ge__, __getstate__, __gt__, __init__, __init_subclass__, __invert__, __le__, __lt__, __mod__, __mul__, __ne__, __neg__, __or__, __pos__, __pow__, __sizeof__, __truediv__, __xor__, abs, add, alias, and_, apply, arccos, arccosh, arcsin, arcsinh, arctan, arctanh, cast, cbrt, ceil, clip, cos, cosh, cot, degrees, eq, eq_missing, exp, fill_nan, fill_null, floor, floordiv, gather_every, ge, gt, hash, inspect, is_between, is_finite, is_in, is_infinite, is_nan, is_not, is_not_nan, is_not_null, is_null, keep_name, le, log, log10, log1p, lower_bound, lt, mod, mul, ne, ne_missing, not_, or_, pow, prefix, radians, rechunk, reinterpret, replace, round, round_sig_figs, set_sorted, shrink_dtype, sign, sin, sinh, sqrt, sub, suffix, tan, tanh, to_physical, truediv, upper_bound, xor'

Note that map_elements isn’t yet included in my list but should also work in streaming mode.

In contrast, I find that the following expressions don’t work in streaming mode with the floating point columns:

'__array_ufunc__, agg_groups, all, any, append, approx_n_unique, arg_max, arg_min, arg_sort, arg_true, arg_unique, backward_fill, bottom_k, count, cum_count, cum_max, cum_min, cum_prod, cum_sum, cumcount, cummax, cummin, cumprod, cumsum, cumulative_eval, diff, dot, drop_nans, drop_nulls, entropy, ewm_mean, ewm_std, ewm_var, explode, extend_constant, filter, first, flatten, forward_fill, gather, get, head, hist, implode, interpolate, is_duplicated, is_first, is_first_distinct, is_last, is_last_distinct, is_unique, kurtosis, last, len, limit, max, mean, median, min, mode, n_unique, nan_max, nan_min, null_count, over, pct_change, peak_max, peak_min, product, quantile, rank, repeat_by, reshape, reverse, rle, rle_id, sample, search_sorted, shift, shift_and_fill, shuffle, skew, slice, sort, sort_by, std, sum, tail, take, top_k, unique, unique_counts, value_counts, var'

So what do we learn?

We can use this to understand a bit more about why an expression might appear in the first streaming list or the second non-streaming list:

An expression is much easier to do in streaming batches if the expression doesn’t require information from other rows to calculate. For example, add is easy to do batchwise because we can just add the values in each row together.
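To make the elementwise case concrete, here is a minimal plain-Python sketch (no Polars involved). The `batches` helper and the batch size of 2 are my own illustrative assumptions and say nothing about how Polars actually chunks data. It shows that adding two columns batch by batch gives the same answer as adding them in one go:

```python
# Hypothetical helper (not a Polars API): split a list into consecutive batches.
def batches(values, size):
    return [values[i:i + size] for i in range(0, len(values), size)]


a = [0.0, 1.0, 2.0, 3.0, 4.0]
b = [10.0, 20.0, 30.0, 40.0, 50.0]

# Whole-column result, computed in one pass over all rows.
whole = [x + y for x, y in zip(a, b)]

# Batchwise result: each batch is processed on its own, then concatenated.
batched = []
for batch_a, batch_b in zip(batches(a, 2), batches(b, 2)):
    batched.extend(x + y for x, y in zip(batch_a, batch_b))

# Each output row depends only on the same row of the inputs,
# so the batch boundaries make no difference.
print(batched == whole)
```

Because no row needs to look at any other row, each batch can be processed and discarded independently.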

In contrast, cumsum is harder to do batchwise because we need to know the value of the previous row to calculate the value of the current row. At some point the previous row would be in a different batch from the current row.
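The same kind of plain-Python sketch shows where a cumulative sum goes wrong if each batch is treated in isolation. Again, the `batches` helper and the batch size are illustrative assumptions, not Polars internals:

```python
from itertools import accumulate


# Hypothetical helper (not a Polars API): split a list into consecutive batches.
def batches(values, size):
    return [values[i:i + size] for i in range(0, len(values), size)]


a = [1.0, 2.0, 3.0, 4.0, 5.0]

# Correct cumulative sum over the whole column.
whole = list(accumulate(a))

# Naive batchwise version: the running total restarts at every batch,
# because a batch knows nothing about the rows that came before it.
naive = []
for batch in batches(a, 2):
    naive.extend(accumulate(batch))

print(whole)
print(naive)
```

The two lists agree only within the first batch. From the second batch onwards the naive version is missing the total carried over from earlier rows.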

So we see that the first list has more elementwise operations and the second list has more operations that require information from other rows.

Of course, this challenge of using information from other rows doesn’t mean that these expressions won’t make their way into the streaming engine in the future. It just means that the expressions are not supported right now.
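As an illustration of why this is a matter of engineering effort rather than impossibility, here is a plain-Python sketch of a cumulative sum that does work batchwise by carrying a running total from one batch to the next. The `streaming_cumsum` function is a hypothetical name of my own, and this is not a description of how Polars implements or plans to implement anything:

```python
from itertools import accumulate


# Hypothetical function (not a Polars API): cumulative sum over an iterable of
# batches, keeping only one number of state between batches.
def streaming_cumsum(batch_iter):
    running_total = 0.0
    for batch in batch_iter:
        # Cumulative sum within the batch, offset by everything seen so far.
        out = [running_total + v for v in accumulate(batch)]
        if out:
            running_total = out[-1]
        yield out


incoming = [[1.0, 2.0], [3.0, 4.0], [5.0]]
result = [v for out in streaming_cumsum(incoming) for v in out]
print(result)
```

The extra cost is that the operation is no longer stateless: the batches must be processed in order, and the engine has to keep track of the carried value.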

If you are interested in the methodology, see my Python script at the bottom of the post.

Next steps

I’ll update this post as I cover more use cases, such as what happens with different dtypes like strings and categoricals, and what happens with methods like filter, group_by and join (all of which have at least partial support for streaming). If you have any questions or comments then get in touch on social media (links below).

If you would like more detailed support on working with Polars then I provide consulting on optimising your data processing pipelines with Polars. You can also check out my online course to get you up-and-running with Polars by clicking on the bear below

Here’s my methodology for testing which expressions work in streaming mode:

import polars as pl
import numpy as np

df = pl.DataFrame({"a": [0.0, 1.0], "b": [2.0, 3.0]}).lazy()
# Make a list of expressions
exprs = dir(pl.Expr)
# Define bespoke operations for expressions that
# can't just be called as pl.col("a").expr()
bespoke_expr_dict = {
    "__add__": pl.col("a").__add__(pl.col("b")),
    "__and__": pl.col("a").__and__(pl.col("b")),
    "__array_ufunc__": np.cos(pl.col("a")),
    "__eq__": pl.col("a").__eq__(pl.col("b")),
    "__floordiv__": pl.col("a").__floordiv__(pl.col("b")),
    "__ge__": pl.col("a").__ge__(pl.col("b")),
    "__gt__": pl.col("a").__gt__(pl.col("b")),
    "__le__": pl.col("a").__le__(pl.col("b")),
    "__lt__": pl.col("a").__lt__(pl.col("b")),
    "__mod__": pl.col("a").__mod__(pl.col("b")),
    "__mul__": pl.col("a").__mul__(pl.col("b")),
    "__ne__": pl.col("a").__ne__(pl.col("b")),
    "__or__": pl.col("a").__or__(pl.col("b")),
    "__pow__": pl.col("a").__pow__(2),
    "__truediv__": pl.col("a").__truediv__(pl.col("b")),
    "__xor__": pl.col("a").__xor__(pl.col("b")),
    "add": pl.col("a").add(pl.col("b")),
    "alias": pl.col("a").alias("foo"),
    "append": pl.col("a").append(pl.col("b")),
    "apply": pl.col("a").apply(lambda x: x),
    "cast": pl.col("a").cast(pl.Int32),
    "cumulative_eval": pl.col("a").cumulative_eval(pl.col("b")),
    "dot": pl.col("a").dot(pl.col("b")),
    "eq": pl.col("a").eq(pl.col("b")),
    "eq_missing": pl.col("a").eq_missing(pl.col("b")),
    "ewm_mean": pl.col("a").ewm_mean(1),
    "ewm_std": pl.col("a").ewm_std(1),
    "ewm_var": pl.col("a").ewm_var(1),
    "extend_constant": pl.col("a").extend_constant(1, n=1),
    "fill_nan": pl.col("a").fill_nan(1),
    "fill_null": pl.col("a").fill_null(1),
    "filter": pl.col("a").filter(pl.col("b")),
    "floordiv": pl.col("a").floordiv(pl.col("b")),
    "gather": pl.col("a").gather(1),
    "gather_every": pl.col("a").gather_every(1),
    "ge": pl.col("a").ge(pl.col("b")),
    "get": pl.col("a").get(1),
    "gt": pl.col("a").gt(pl.col("b")),
    "is_between": pl.col("a").is_between(1, 2),
    "is_in": pl.col("a").is_in([1, 2]),
    "le": pl.col("a").le(pl.col("b")),
    "lt": pl.col("a").lt(pl.col("b")),
    "mod": pl.col("a").mod(pl.col("b")),
    "mul": pl.col("a").mul(pl.col("b")),
    "ne": pl.col("a").ne(pl.col("b")),
    "ne_missing": pl.col("a").ne_missing(pl.col("b")),
    "over": pl.col("a").over(pl.col("b")),
    "pow": pl.col("a").pow(2),
    "prefix": pl.col("a").prefix("foo"),
    "quantile": pl.col("a").quantile(0.5),
    "repeat_by": pl.col("a").repeat_by(2),
    "replace": pl.col("a").replace(2, 100),
    "reshape": pl.col("a").reshape((2, 1)),
    "round_sig_figs": pl.col("a").round_sig_figs(2),
    "search_sorted": pl.col("a").search_sorted(0),
    "shift_and_fill": pl.col("a").shift(1, fill_value=1),
    "slice": pl.col("a").slice(1, 2),
    "sort_by": pl.col("a").sort_by(pl.col("b")),
    "sub": pl.col("a").sub(pl.col("b")),
    "suffix": pl.col("a").suffix("foo"),
    "take": pl.col("a").take(2),
    "truediv": pl.col("a").truediv(pl.col("b")),
    "xor": pl.col("a").xor(pl.col("b")),
}
# Make a list of query plans
plans = []
# Loop through all expressions
for expr in exprs:
    try:
        # Try to call each expression naively
        plan = df.select(getattr(pl.col("a"), expr)()).explain(streaming=True)
        plans.append({"expr": expr, "plan": plan, "failed": None})
    except Exception as e:
        # If the naive call fails, try to call the expression with bespoke_expr_dict
        if expr in bespoke_expr_dict:
            try:
                plan = df.select(bespoke_expr_dict[expr]).explain(streaming=True)
                plans.append({"expr": expr, "plan": plan, "failed": None})
            except Exception as err:
                # Store the error message as a string so the column has a plain dtype
                plans.append({"expr": expr, "plan": None, "failed": str(err)})
# Make a dataframe and add a streaming boolean column
plans_df = (
    pl.DataFrame(plans)
    .with_columns(pl.col("plan").str.starts_with("--- STREAMING").alias("streams"))
    .sort("streams")
)
# Make a summary dataframe
summary_df = (
    plans_df.group_by("streams")
    .agg(pl.col("expr"))
    .with_columns(pl.col("expr").list.join(separator=", "))
)

This post is licensed under CC BY 4.0 by the author.