
I’ve released a new package with an implementation of a pipe feature for Python - function-pipes. It can be used as a package, or, because it depends only on the standard library, copied as a single file and dropped into a project.

There are a few packages that already do this, but mine works better with type checkers and should generally have less overhead than the other approaches.

I have overthought what is quite a simple function, so I am writing up my overthinking. Read on for adventures in type checking and marginal Python efficiency gains.

Why do we need a pipe?

R has a very nice syntax for passing the result of a function as the argument into another function.

This means instead of:

summary(head(iris))

You can do:

iris |> head() |> summary()

In the magrittr version of the pipe (%>%) the value being piped through can be moved to different positions using . as a stand-in for the value, while the base R version cannot move the value around.

This idea of a pipe is quite useful, and there have been a few different attempts to write packages that port it to Python.

I wasn’t quite happy with any of these (apparently neither were the previous authors, who each wrote their own), and so I tried to piece together an approach that I liked.

The rough criteria were:

  • It should look more like Python than another language.
  • It should not require learning a lot of new things to understand and use.
  • It should work well with type-checking.
  • It should have as few overhead costs to use as possible.

The API of the pipe

Looking through existing packages, there were four general approaches.

Add new syntax

One library (robinhilliard/pipes) handles the idea of a pipe by using a rarely used operator (>>) as a stand-in, and then using a decorator to rewrite the AST of the function into the form Python expects.


@rewrite_pipes
def wrapper(value):
  return value >> func

result = wrapper(value)

This is one of the options that produces the most efficient final code because, as far as Python is concerned, it is just syntactic sugar for the hard-to-read but fast nested-call approach.

The drawback is that it has to run inside a decorated function (and so can’t be used everywhere), and that linters and type-checkers don’t understand it.

Comparator Class approach

This is the approach in the pipe21 library, and it looks quite nice! It also has the advantage that the whole thing is about 10 lines of code, and can easily be dropped into a module.

result = value | Pipe(func)

This approach works using the __ror__ reflected operator method. Binary operators are normally handled by passing the right-hand item into the method of the left-hand item, but if that doesn’t produce a result, Python looks for the reflected method on the right-hand item - in this case, the rarely used ‘reverse or’. This means that the smart functions in the Pipe class can be introduced after the value, and at any point the chain can stop and return the correct value.
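
As a rough sketch of the mechanism (not the actual pipe21 source), the whole thing can be written something like this:

# A minimal sketch of the __ror__ trick; names and details are illustrative.
class Pipe:
    def __init__(self, func, *args, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def __ror__(self, value):
        # value | Pipe(func) resolves to Pipe(func).__ror__(value)
        return self.func(value, *self.args, **self.kwargs)

result = 5 | Pipe(str)  # "5"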

The pipe21 library has the advantage of being describable in a very tiny way. It’s less than 10 lines that can be copied into a project.

There are problems with this approach. The | operator (especially in typed applications) is becoming more popular for its intended ‘or’ use, which does not have a direction in the way the pipe does, so using it this way is not consistent with the operator’s other uses. This approach can be type-hinted, but it runs into a specific problem with lambdas that I’ll explain further down.

The similar approach of using an Infix operator can be a bit more explicit about the direction by making the operation itself a class that sits between two values, but it introduces quite a lot of abstraction and repetitive syntax.

result = value | pipe_to | func
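
A minimal sketch of that infix idea, with pipe_to as an illustrative name rather than any particular library’s API, might look like this:

# A sketch of an infix pipe operator built from __ror__ and __or__.
class Infix:
    def __init__(self, func):
        self.func = func

    def __ror__(self, value):
        # value | pipe_to captures the value and waits for the function
        return Infix(lambda func: self.func(value, func))

    def __or__(self, func):
        # ... | func applies the captured value to the function
        return self.func(func)

pipe_to = Infix(lambda value, func: func(value))

result = 5 | pipe_to | str  # "5"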

Container Class approach

This uses method chaining to add new functions in a way that isn’t dissimilar to how pandas addresses it. A container object takes in the value, and then successive functions are applied to it using a pipe method call.

result = Pipe(value).pipe(func).end()

The big problem with this approach is that there is no way for the Pipe to know which entry is the last, so the value has to be accessed explicitly with an end method. This version can work OK with type checkers, including lambdas.

(note: I’ve lost the link to the blog post that discussed this version, will re-add if I find later.)

A function approach

This approach is very simple: a pipe function takes the value and a series of functions, and returns the final result. A very simple version of this (four lines) can be found in the functoolz library.

result = pipe(value, func)

This doesn’t rely on non-Python syntax, and is easy to follow. Although the typing is tricky to write, it can be done in a way that type-checkers understand. This is the approach I’m building on.

How to handle extra arguments

A problem with pipes is what to do when a function needs extra arguments, or needs the current value somewhere other than the first position.

I had a working approach that looked like this, built on functools.partial, with a placeholder that could be filled in with the current value (as with the magrittr library). It was similar to the way the pipetools library uses its X object - a stand-in object that can be replaced later.

pipe(value, pipe.func(function_that_expects_the_value_as_keyword, foo=pipe.value))

This solved the problem - but it requires people to understand a few new things, and it cannot be made to play well with type checkers. I eventually concluded that Python already has an OK tool for exactly this situation - lambdas - which have the added advantage of being understood by type checkers already.

pipe(value, lambda x: function_that_expects_the_value_as_keyword(foo=x))

So at the user facing end there is no clever stuff to learn. Just a function that takes a value and a list of functions. If you need to add arguments, use a lambda.
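
Assuming pipe is imported from the package (or from a copy of the module dropped into the project), everyday use looks something like this:

# Hypothetical usage example; the import path assumes the packaged version.
from function_pipes import pipe

result = pipe(
    " 42 ",
    str.strip,
    int,
    lambda x: round(x / 7, 2),  # a lambda where extra arguments are needed
)
# result == 6.0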

The clever bits are then all behind the scenes, with the only piece of non-standard Python being the idea of the pipe function itself.

Type checking

I quite like type checking, and even if it’s not good for all projects, I think that a basic function like a pipe should play nicely.

Most of the approaches above can be type-hinted, but the Comparator class approach can only be hinted in a way that is incompatible with lambdas - which shows an interesting limit of how type checkers work.

Function approach

This isn’t a class, and so can’t inherit from Generic to define the relationship between the arguments. The approach that works instead isn’t elegant, but it does work.

A basic version that knows it wants a value and then callables can be written like this:


from typing import Any, Callable

def pipe(value: Any, *funcs: Callable[[Any], Any]) -> Any:
    ...

This also gets across the important point that each function expects only one real parameter, but it does not capture the chain aspect - that the input of the first function is the same type as the initial value, and the input of the second function is the output of the first. For this we can’t use the *args approach, and need to describe each function individually. Here is the simplest example:


from typing import Callable, TypeVar

InputType = TypeVar("InputType")
Output1 = TypeVar("Output1")
Output2 = TypeVar("Output2")

def pipe(value: InputType, op1: Callable[[InputType], Output1], op2: Callable[[Output1], Output2]) -> Output2:
    ...

How do you scale this? You just keep adding overload options:


from typing import Any, Callable, TypeVar, overload

InputType = TypeVar("InputType")
Output1 = TypeVar("Output1")
Output2 = TypeVar("Output2")
Output3 = TypeVar("Output3")

@overload
def pipe(value: InputType, op1: Callable[[InputType], Output1], op2: Callable[[Output1], Output2]) -> Output2:
    ...

@overload
def pipe(value: InputType, op1: Callable[[InputType], Output1], op2: Callable[[Output1], Output2], op3: Callable[[Output2], Output3]) -> Output3:
    ...

def pipe(value: Any, *funcs: Any):  # type: ignore
    ...
  

This gets annoying to write manually, so I’ve made a Jinja template that generates it automatically.
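
To give a flavour of what that generation looks like, here is a toy sketch of the idea (this is not the template used in function-pipes, and it assumes jinja2 is available):

# Toy sketch: generate the stacked overloads instead of writing them by hand.
from jinja2 import Template

TEMPLATE = Template(
    "{% for n in range(1, max_ops + 1) %}"
    "@overload\n"
    "def pipe(value: InputType"
    "{% for i in range(1, n + 1) %}"
    ", op{{ i }}: Callable[[{{ 'InputType' if i == 1 else 'Output' ~ (i - 1) }}], Output{{ i }}]"
    "{% endfor %}"
    ") -> Output{{ n }}:\n"
    "    ...\n\n"
    "{% endfor %}"
)

print(TEMPLATE.render(max_ops=3))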

This approach does mean there is a hard limit to the number of functions, but in practice the pipes I’ve wanted to write rarely go past five or six functions. I’ve set the limit in the library to 20. In principle, you could add a final catch-all option for an infinitely long pipe, but I’ve chosen not to do this.

Container class

This approach works great for typing. The trick is that the pipe method returns (or pretends to return) a new instance of the Pipe object, so each instance only has to know the type of its own input value, and takes that type from the output of the previous function where one is available.

from __future__ import annotations

from typing import TypeVar, Generic, Callable

InputValue = TypeVar("InputValue")
OutputValue = TypeVar("OutputValue")


class Pipe(Generic[InputValue]):
    def __init__(self, value: InputValue):
        self.value = value

    def pipe(self, func: Callable[[InputValue], OutputValue]) -> Pipe[OutputValue]:
        return Pipe(func(self.value))

    def end(self) -> InputValue:
        return self.value

Comparator class

This approach can even be typed in a way that will work for pre-defined functions and classes.

from typing import Any, Callable, Concatenate, Generic, ParamSpec, TypeVar, overload

I = TypeVar("I")
T = TypeVar("T")
P = ParamSpec("P")

class Pipe(Generic[I, P, T]):

    @overload
    def __init__(self, f: Callable[[I], T], *args: Any, **kwargs: Any):
        ...

    @overload
    def __init__(self, f: Callable[Concatenate[I, P], T], *args: P.args, **kwargs: P.kwargs):
        ...
    
    def __init__(self, f: Any, *args: Any, **kwargs: Any):
        self.f = f
        self.args = args
        self.kwargs = kwargs

    def __ror__(self, other: I) -> T:
        return self.f(other, *self.args, **self.kwargs)

The different overload options handle the situation where additional arguments for the function are passed to the Pipe alongside it. So this would be typed correctly, if foo accepted an extra_param keyword argument:

result = "" | Pipe(foo, extra_param=True)

Static type checkers can handle the following, because the str class can say what the output is in advance.

result = 5 | Pipe(str) # result is string

However, this doesn’t work for lambdas, because the parameters of a lambda are unknown until a type checker can infer them from context. That was fine in the container class version, because the Pipe object encountered the input type first (when Pipe(value) was created), could feed this into the lambda, and the checker could infer the output value. Here, the object encounters the lambda when the instance of Pipe is created, before it later encounters the 5 on the other side of the operator. This means that the lambda’s x is unknown, and so the value returned by the lambda is also unknown.

You can fix this by adding some typing around the lambda, but this again means learning something new and adding extra layers to simple calls.

SingleInput = TypeVar("SingleInput")
LambdaOutput = TypeVar("LambdaOutput")


class TypedPipe(Generic[SingleInput, LambdaOutput]):
    def __init__(self, f: Callable[[SingleInput], LambdaOutput]):
        self.f = f

    def __ror__(self, other: SingleInput) -> LambdaOutput:
        return self.f(other)


t = range(5) | Pipe(str) | Pipe(",".join) | TypedPipe[str, str](lambda x: x + "hello")
# t is str

While the function approach has the ugliest typing, it still works, and it means a relatively simple syntax can be used. Importantly, this syntax can also be optimized far more than the other approaches.

Optimizing the pipe

Having got the basic approach, I had a think about whether there was any way of reducing the overhead of using a pipe.

My final approach generates large amounts of boilerplate code to avoid loops and value assignments, and includes a decorator that rewrites the AST to speed up the process.

The most basic version of a pipe function looks like this:


def pipe(value, *funcs):
    for f in funcs:
        value = f(value)
    return value

This is very compact, but has a speed penalty over the hard-to-parse nested code it replaces.

For instance, rewriting d(c(b(a(value)))) as pipe(value, a, b, c, d) requires going through a loop, which unpacks each function into f and keeps reassigning value. The original just keeps passing the value up without assigning it to any intermediate variables. As calling the pipe function at all has an overhead, this is worth reducing.
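
A rough way to see the gap being described (exact numbers will vary by machine) is a quick timeit comparison of nested calls against the looped pipe:

# Sketch benchmark: nested calls versus the looped pipe from above.
import timeit

def a(x): return x + 1
def b(x): return x * 2
def c(x): return x - 3

def loop_pipe(value, *funcs):
    for f in funcs:
        value = f(value)
    return value

print(timeit.timeit(lambda: c(b(a(1)))))             # nested calls
print(timeit.timeit(lambda: loop_pipe(1, a, b, c)))  # looped pipe, typically slower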

One way of addressing this would be:

def pipe(value, op0, op1, op2, op3):
    return op3(op2(op1(op0(value))))

This keeps only the overhead of the function call itself, with far fewer value assignments.

I tried a few different approaches to making this work for variable-length pipes, and the fastest turned out to be:

def pipe(value, op0, op1=None, op2=None, op3=None):
    if not op1:
        return op0(value)
    if not op2:
        return op1(op0(value))
    if not op3:
        return op2(op1(op0(value)))
    return op3(op2(op1(op0(value))))

Here, unpacking the values in the function signature turned out to be faster than unpacking a tuple later on, and a basic truthiness check was faster than an explicit comparison to None. The new match/case syntax does let you branch on the number of functions passed (as sketched below), but this was slower than the approach above.
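
For reference, the match/case alternative described above would look something like this sketch (the slower option):

# Sketch of the rejected match/case approach: branch on how many functions were passed.
def pipe_match(value, *funcs):
    match len(funcs):
        case 1:
            return funcs[0](value)
        case 2:
            return funcs[1](funcs[0](value))
        case 3:
            return funcs[2](funcs[1](funcs[0](value)))
    raise ValueError("too many functions for this sketch")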

Like the type hinting, this is inelegant, but can be easily generated through a Jinja template. There is some unnecessary overhead in assigning default values to parameters that are never used, but even with the limit set at 20 this was still quicker than the basic looped version of the pipe. There’s a test that checks this approach stays faster than the basic method.

You could make this even quicker by providing fixed-length pipe2, pipe3, pipe4 functions, but that seemed to be adding extra things to learn. Instead, where performance is absolutely required, we can abstract the entire process away and do the same thing by a more complicated route.

AST rewrite

Way above, I linked to the robinhilliard/pipes library, which rewrites the AST of functions to make the >> operator work like a pipe.

That version does not work for type checking because it introduces unexpected syntax into Python. However, if the rewrite takes something that type checkers do understand, and just rewrites it to be faster behind the scenes, this approach plays well with them.

Taking the basic code from that library, I created a new set of rewrite rules that reorder how functions are called:


@fast_pipes
def func():
  return pipe(value, a,b,c,d,e)

# is equiv to

def func():
  return e(d(c(b(a(value)))))

I then took this a bit further. Using a lambda introduces an extra function call into the pipe, so why not expand those out at the same time? I added some new rewrite rules that express the effect of a lambda in the pipe in a more basic way and avoid the extra function call.

@fast_pipes
def func():
  return pipe(value, lambda x: foo(value_slot=x))  

# is equiv to...

def func():
   return foo(value_slot=value)

Where the value is used multiple times in the lambda, the rewrite introduces a walrus operator (:=) to cache the value of the previous step:

@fast_pipes
def func():
  return pipe(value, bar, lambda x: foo(value_slot=x) + x)  

# is equiv to...

def func():
   return foo(value_slot=(v := bar(value))) + v

This would be horrible to read if you actually wrote it that way, but it is a useful efficiency gain in the rewrite.

Testing

This function is now solving the same problem three different ways: the type hinting logic has to work, the normal Python logic has to work, and the rewritten AST logic has to work. Type hinting is tested through pytest-pyright. The package also contains tests that rewritten functions are functionally equivalent to the originals and that they run faster than the un-rewritten versions.
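
A sketch of the kind of equivalence test this implies, assuming the package exposes pipe and fast_pipes at the top level as in the examples above (this is not the package’s actual test suite):

# Hypothetical pytest-style check that the AST rewrite doesn't change results.
from function_pipes import fast_pipes, pipe

def plain():
    return pipe(5, str, ",".join, len)

@fast_pipes
def rewritten():
    return pipe(5, str, ",".join, len)

def test_fast_pipes_matches_plain_pipe():
    assert plain() == rewritten()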

Learning is fun

One thing I learned looking at all the different approaches is that no one likes using anyone else’s pipe library and everyone has their own approach. This is mine; I learned a lot along the way, and if other people find it useful, that’s nice.