Functional Pipelines

A pipeline, yesterday

Before we begin, two bits of housekeeping:

  • Firstly, I've been up against mega-deadlines for the last few weeks, so I missed a post last week. I can't guarantee that I won't miss some more until this work is done.
  • Secondly, I've realised my writing style goes off exploring some weird diversion, then returns to the actual point after the digression. I can't really apologise for it, as it's how I think and who I am. This post is particularly tangential; please stick with it, as I'm quite proud of my learning here.

A brief history lesson

Here's an incoherent and inaccurate timeline.

  • 1997: I first got paid for writing code (in Delphi)
  • 1988: I wrote my first programme (in BASIC)[1]
  • 1993: I read about the "dependency protocol" in Smalltalk
  • 2024: I realised that event handlers are just streams

What does that last one even mean?

Let's find out.

Working Effectively with Legacy Code

I've been reading "Working Effectively with Legacy Code" by Michael Feathers. It's a pretty old book, published in 2004, about how to wrap tests around legacy code so you can then have confidence to refactor it to make it more understandable and then add features without breaking things.

Apart from the fact that I bought the Kindle edition, which is incredibly hard to read, one of the issues with the book is a lot of it feels quite out of date. Not the techniques, just the code samples.

It's mainly written in Java, a very verbose language. Although that's not the problem (unlike the C++ examples which are impossible to read).

The issue is it has code like this (I've translated it into ruby because I can't be arsed with Java).

def do_something_with_a_list_of_items items
  matching_items = []
  items.each do |item|
    if item.matches_some_criteria?
      matching_items << item 
    end
  end
  results = []
  matching_items.each do |item|
    results << item.do_something
  end
  return results
end

So this function is given a list of items. It goes through that list finding all items that match some_criteria and calls do_something on them, whilst ignoring the ones that do not match.

A few years ago I would have thought "that's a bit ugly" but I would have approached the code in the same way.

But now I would write it like this:

def do_something_with items
  items.select { |i| i.matches_some_criteria? }
       .collect { |i| i.do_something } 
end

The end result is the same, but the way the code is structured is what functional programming people would call a "pipeline".

We have a collection and we build a new collection by filtering the items from the original. Then we build a third collection by calling do_something on each item in the filtered results and collecting the results.
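
To check that the two versions really do give the same result, here's a small runnable sketch. The Item struct, its value field and the "even numbers match" rule are all made up for illustration; they aren't from the book.

```ruby
# Hypothetical item: it "matches" when its value is even, and
# "does something" by doubling that value.
Item = Struct.new(:value) do
  def matches_some_criteria?
    value.even?
  end

  def do_something
    value * 2
  end
end

# The explicit version, with hand-built intermediate collections.
def do_something_with_a_list_of_items(items)
  matching_items = []
  items.each do |item|
    matching_items << item if item.matches_some_criteria?
  end
  results = []
  matching_items.each do |item|
    results << item.do_something
  end
  results
end

# The pipelined version: filter, then transform.
def do_something_with(items)
  items.select { |i| i.matches_some_criteria? }
       .collect { |i| i.do_something }
end

items = [1, 2, 3, 4].map { |n| Item.new(n) }
puts do_something_with_a_list_of_items(items).inspect
puts do_something_with(items).inspect
```

Both calls print the same array, because only the items holding 2 and 4 match and each of those is doubled.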

Is this better? Is it more readable?

A while back I would have said no as the first version is more explicit about what it's doing with the items in each collection.

Now I think it's both more readable, because it's more concise, and a significantly better implementation, because it's using a pipeline.

But first, here's yet another digression.

Unix pipes

When you're using a unix machine, there are a whole load of very small command line tools built in. Things like cat and grep and sed and awk.

Each of these tools does very little on its own. In fact, they're practically useless.

But as soon as you pipe the inputs and outputs together, they become incredibly powerful.

Let's say you've got a Rails app on a server somewhere, and you want to see how many times the PeopleController's show action was called. You could load the log file into your favourite editor (as long as the editor can handle big files) and do a "find" and use the editor's "here's how many results I found" panel.

Or you could pipe the grep command into the wc command.
grep -o "PeopleController#show" log/production.log | wc -l

grep goes through your file looking for any text that matches PeopleController#show and outputs it to STDOUT. The pipe feeds that into the wc command, which reads from STDIN and counts the number of lines it has received. grep returns one line for every instance of your chosen phrase then wc counts them. grep is optimised for hunting through huge files and is extremely efficient at its one job. wc is optimised for counting and is extremely efficient at its one job.

Two commands piped together to get a useful result.

You can build something even more useful by piping together multiple commands.

cat log/production.log | grep -E '.*GET\s"(.*)"' | awk '{print $3}' | sort | uniq -c | sort -n | tail -r -n 5

Here, we're using

  • cat to output our log file
  • grep to only output lines that contain the text GET "some/uri"
  • awk to extract and output the actual URI from the line
  • sort to output identical URIs on consecutive lines
  • uniq -c to count how many URIs are next to each other, outputting the count and the URI as a single line
  • sort -n to sort the lines numerically by count, so the highest counts end up last
  • tail -r -n 5 to take the last 5 lines (the highest counts) and output them in reverse order, most visited first

And with that one line of shell script we have a log file analyser that shows which pages on your site have received the most visits.
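
For comparison, the same shape of pipeline can be sketched in Ruby. This is only an illustration: the LOG_LINES sample and the top_pages method are invented here, and a real Rails log will need a different pattern.

```ruby
# Hypothetical log lines, standing in for log/production.log.
LOG_LINES = [
  'Started GET "/people" for 127.0.0.1',
  'Started GET "/people/1" for 127.0.0.1',
  'Started POST "/people" for 127.0.0.1',
  'Started GET "/people" for 127.0.0.1',
  'Started GET "/" for 127.0.0.1',
  'Started GET "/people" for 127.0.0.1',
  'Started GET "/people/1" for 127.0.0.1'
].freeze

# Each step hands a brand new collection to the next one.
def top_pages(lines, count)
  lines.filter_map { |line| line[/GET "(.*?)"/, 1] } # grep + awk: keep GET lines, extract the URI
       .tally                                        # sort | uniq -c: count each URI
       .sort_by { |_uri, hits| -hits }               # sort -n (reversed): highest count first
       .first(count)                                 # tail: keep only the top few
end

puts top_pages(LOG_LINES, 2).inspect
```

As with the shell version, each stage can be tried on its own by cutting the chain short and inspecting what comes out.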

By the way, when I ran this on my production server it didn't work. The BSD version of tail (as found on the Mac) includes the -r option while the GNU version does not. And, my server logs are actually in a different format to my development logs, as I use lograge to make the logfiles more concise.

On my server (which uses dokku to handle deployments) I changed the command to this:

dokku logs app -n 50000 | grep -E '.*path=\/.*\s' | awk '{print $11}' | sed 's/path=//' | sort | uniq -c | sort -n | tail -n 5 | sort -rn

However, making this change was easy to do.

I only ever needed to concentrate on one stage of the pipeline at a time. As I was experimenting with each stage, I could rerun the previous stages again to get some input data. Or, even better, I could store the previous output in a temporary file and then cat that file into the stage I was working on. Almost like test-driven development, where you set up a known state, run your code, then check the outputs. But on the command line.

The key to this is

  • each individual tool does one thing and one thing well
  • each tool is designed to read its input from STDIN and write its output to STDOUT
  • we can then "pipe" the output from one command as the input to another command, whose output is then used as the input to the next command
  • at no point are any of the inputs to the previous command altered by any of the subsequent commands

This final point is very important. The output of each stage is immutable, which is why I was safe to experiment as I was building the server version of my pipeline.
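
The same property is easy to see in Ruby. In this sketch (the URIS data is made up) the input is frozen, so any stage that tried to change it in place would raise an error. Each stage returns a new collection instead, and every intermediate result stays available to inspect.

```ruby
# A frozen array raises an error if anything tries to modify it in place.
URIS = ["/b", "/a", "/b", "/c"].freeze

SORTED  = URIS.sort       # like sort: a new array, URIS is untouched
COUNTED = SORTED.tally    # like uniq -c: a new hash, SORTED is untouched
RANKED  = COUNTED.sort_by { |_uri, hits| -hits } # highest count first

puts URIS.inspect    # still in its original order
puts SORTED.inspect
puts COUNTED.inspect
puts RANKED.first.inspect
```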

Multiple pipes in a line ... it's a pipeline.

Enumerables

I first learnt about each from Smalltalk. Like ruby, Smalltalk has blocks and Smalltalk's collections use blocks to "visit" each item in the collection in turn. In fact, this style is commonly known as an "internal iterator".

But each is only the beginning. Ruby's select (also known as filter) and collect (also known as map)[2] give these collections some real power.

When I rewrote Michael Feathers' code above, I took an input collection, filtered it (using select) and then gathered the results of a method call (using collect).

It's the exact same principle as the unix pipes, shown above.

Technically, in Michael Feathers' code, he's doing the same thing. It's just he's explicitly creating and filling some intermediate collections - matching_items and results.

So if his version is doing exactly the same thing as my version why have I come to the conclusion that the "pipelined" version is better?

Because in the original version, the developer is responsible for creating and filling the intermediate collections. In the pipelined version, select and collect have that responsibility.

Functional programming

I've never really got functional programming. I've always been an OO person.

But these pipelines show why it's so powerful.

With unix, the sort command is not just a very small command, it's also incredibly optimised and well understood. It's a black box that takes input in a particular format and produces output in a different format. Given the same inputs it will always produce the same outputs. And the implementation, which is completely hidden from us, is very efficient, using multiple threads if needed and ensuring that it can process huge files without using tons of RAM.

By contrast, when we use each to iterate through every item in the collection, it's up to us to decide whether to add the item to the matching_items collection. We are taking responsibility for the results of the filtering operation. matching_items is a collection that we created and we are the ones who control what goes into it.

However, if we use select we're saying "I don't care how you do it, just give me some results". select is responsible for the results collection and can build it any way it sees fit. It has become a black box that takes a given input and produces a given output.

Just like sort, it could spawn multiple threads to do the calculations in parallel. Or even send the filtering calculation over the network so that the load is spread across multiple machines[3]. And once select has received its responses, it can choose the best way to reassemble those responses into a new collection. Maybe, instead of an Array it uses a LinkedList (because it knows it's only going to be appending items). In some cases that could be faster than repeatedly allocating and reallocating fixed size blocks of memory.
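
Ruby's built-in select doesn't spawn threads or talk to the network, but there is a small, real example of the black box changing its strategy: lazy enumerators. In this sketch (squares_of_evens is a made-up method) the pipeline is written once. Given a normal range it builds full intermediate arrays; given a lazy enumerator it pulls items through one at a time, which is why it can cope with an infinite range.

```ruby
# The pipeline is described once; how it gets executed depends on
# what kind of collection it is handed.
def squares_of_evens(numbers, count)
  numbers.select(&:even?)
         .map { |n| n * n }
         .first(count)
end

# Eager: select and map each build a complete intermediate array.
puts squares_of_evens(1..20, 3).inspect

# Lazy: nothing is computed until first asks for items, so an
# infinite range is fine.
puts squares_of_evens((1..Float::INFINITY).lazy, 3).inspect
```

The calling code is identical in both cases. We only said what we wanted, not how to build it.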

The point I'm trying to get to is we often believe that the code we're writing is unique and unlike anything that's ever happened before. But most things we end up doing are actually just combinations of very simple, fundamental, operations.

If we think about things in terms of those operations and pipe them together in the correct sequence, we can make use of optimisations and algorithms, designed many years ago, with millions of hours of battle-testing and optimisations against them.

Plus, as each stage never alters the output of the previous stage it makes it trivial to experiment and test without breaking anything. Just like the set up code in your test suite.

What does this have to do with event handling?

So I've been on this journey of getting my head around pipelines and functional programming. And it's been quite a revelation.

When I read about the dependency protocol in Smalltalk, I never stopped to think about it again. I always envisaged it as two objects loosely tying themselves together - the looseness of the connection being the major advantage.

Again, in ruby, you might have something like this.

@my_source_object = SomeObservable.new
@my_listener = SomeObserver.new

@my_source_object.add_observer "some_event", @my_listener

So far so good.

But @my_listener doesn't want to just observe @my_source_object.

What if there's @my_second_source_object that we need to observe as well? We just add another add_observer call, of course. However, that gets messy very quickly, as now @my_listener needs to know the lifecycles of all these source objects. It needs to hook in whenever a new source is created (so it can subscribe) and then get notified when that source is destroyed (so it can unsubscribe)[4].

However, if you think of events as streams, passing through a pipeline, this kind of processing becomes much easier.

@my_pipeline = SomePipeline.new

# The sources are the things being observed
@my_first_source = SomeObservable.new
@my_first_source.add_observer "some_event", @my_pipeline

# The listener only ever subscribes to the pipeline
@my_listener = SomeObserver.new
@my_pipeline.add_observer "some_event", @my_listener

@my_second_source = SomeObservable.new
@my_second_source.add_observer "some_event", @my_pipeline

All we've done here is stick a "pipeline" into the middle. But it means that @my_listener will automatically receive events from both @my_first_source and @my_second_source. And this pipeline also means we start to gain those same benefits as the unix pipes.
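
Here's a minimal runnable sketch of that idea. Every class in it (SimpleObservable, Source, Pipeline, Listener) is hypothetical, written just for this example and only loosely following the add_observer and notify_observers calls above.

```ruby
# Hypothetical plumbing: keeps a list of observers per event name.
module SimpleObservable
  def add_observer(event, observer)
    observers_for(event) << observer
  end

  def notify_observers(event, payload)
    observers_for(event).each { |observer| observer.on_event(event, payload) }
  end

  private

  def observers_for(event)
    @observers ||= Hash.new { |hash, key| hash[key] = [] }
    @observers[event]
  end
end

class Source
  include SimpleObservable

  def emit(payload)
    notify_observers "some_event", payload
  end
end

# The pipeline is an observer and an observable at the same time:
# it re-broadcasts whatever it receives.
class Pipeline
  include SimpleObservable

  def on_event(event, payload)
    notify_observers event, payload
  end
end

class Listener
  attr_reader :received

  def initialize
    @received = []
  end

  def on_event(_event, payload)
    @received << payload
  end
end

def run_two_source_demo
  pipeline = Pipeline.new
  listener = Listener.new
  pipeline.add_observer "some_event", listener

  first_source = Source.new
  first_source.add_observer "some_event", pipeline
  second_source = Source.new
  second_source.add_observer "some_event", pipeline

  first_source.emit "from first"
  second_source.emit "from second"
  listener.received
end

puts run_two_source_demo.inspect
```

The listener subscribes once, to the pipeline, and never needs to know how many sources exist or when they come and go.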

Emoji all the animals!

Let's crawl down the pipe in a different direction.

Imagine we're writing something that observes keypresses. Maybe we're wanting to take each keypress and display it on some other device - like a big screen in another room.

So we write an observer that receives notifications from the keyboard and, in response, displays those keystrokes on the display device.

class KeyboardObserver < SomeAbstractObserver
  def initialize source:, display_device: 
    @display_device = display_device 
    source.add_observer "keypress", self
  end

  def on_keypress_event keystroke
    @display_device.show keystroke
  end
end
  
@keyboard = SomeObservableKeyboardDevice.new 
@display_device = SomeDisplayDevice.new 

@my_keyboard_observer = KeyboardObserver.new source: @keyboard, display_device: @display_device

But our boss comes along and says "You know what would be fantastic? If the user types 'cat' we could replace it with a 😺".

How would we approach this? We could examine each incoming keystroke as it arrives[5].

Is the keystroke a "C"?
  No - send it to the display and go back to the start
  Yes - do not send it to the display
    Is the keystroke an "A"?
      No - send "C" and then the current keystroke to the display and go back to the start
      Yes - do not send it to the display
        Is the keystroke a "T"?
          No - send "C", "A" and then the current keystroke to the display and go back to the start
          Yes - send "😺" to the display and go back to the start

We update our observer to implement this simple algorithm, using a buffer of received characters, so we can examine where we are in the little flowchart as new keystroke events arrive.
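
As a rough sketch, the flowchart could look like this when written as a plain method over a list of keystrokes. The rewrite_cat name is made up, and it works on an array rather than live events to keep the example short.

```ruby
# Follows the flowchart: hold back keystrokes while they still match
# "CAT", and release them unchanged as soon as the match fails.
def rewrite_cat(keystrokes)
  sequence = "CAT"
  buffer = ""
  output = []

  keystrokes.each do |keystroke|
    if keystroke == sequence[buffer.length]
      buffer += keystroke
      if buffer == sequence
        output << "😺"
        buffer = ""
      end
    else
      # Send whatever we were holding, then the current keystroke
      output.concat(buffer.chars) << keystroke
      buffer = ""
    end
  end

  output
end

puts rewrite_cat("A CAT SAT".chars).join
puts rewrite_cat("CAB".chars).join
```

Like the flowchart, it goes back to the start after a failed match, so the keystroke that broke the match is never treated as the beginning of a new "CAT". Anything still held in the buffer when the input runs out is not sent either.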

We put it into production and everyone loves it.

Now our boss comes along and says "The cat thing was such a success, we need to do the same with dogs".

In the past, before I understood this pipeline stuff, I would have added the dog handling code into the same observer as the cat code. We would effectively add another branch into that flowchart - Is the keystroke a "D"? - which takes us down a different path.

This makes the code a bit messier as not only do we need to keep track of our current position within the "CAT" or "DOG" sequence, but we also need to keep track of which sequence we're actually in.

Of course, I (nearly) always write tests so I can be confident that the code works correctly, but it's starting to look a bit less readable now.

Then our boss says "Elephants! We need elephants".

So that adds a third branch to our algorithm. Luckily, this time it's pretty easy - another sequence that works the same way as "DOG"; it just happens to be longer. In fact, I realise that my boss is probably going to come back with even more animals so I put the sequences into an array, making it really simple to add new ones in.

Until the boss excitedly starts yelling "I've got it! Capybaras! Everyone loves capybaras[6]".

I think "yeah, that's easy, it's just another sequence". But then the realisation dawns.

"CAT" and "CAPYBARA" both start with "CA". Meaning that our algorithm could potentially be in two branches at once.

We either have to keep track of multiple branches, or we need to rewrite the algorithm so we can branch when we're already part way through a sequence.

What started out as a simple little observer has become a monster. And the code is getting less and less readable as we go.

But what if we used a pipeline?

We keep the very plain and simple KeyboardObserver implementation above - but we rename it to DisplayWriter. Its job is not to observe the keyboard, but to write characters to the display device.

Then we add in a new observer class, the EmojiRewriter[7]:

class EmojiRewriter < SomeAbstractObserver
  def initialize source:, sequence:, emoji:
    @sequence = sequence
    @emoji = emoji
    @buffer = ""
    source.add_observer "keypress", self
  end

  def on_keypress_event keystroke
    if keystroke == @sequence[@buffer.length]
      # Still matching the sequence: hold the keystroke back for now
      @buffer += keystroke
      broadcast @emoji if @buffer == @sequence
    else
      # No match: release anything we were holding, then the new keystroke
      @buffer.dup.each_char { |buffered_keystroke| broadcast buffered_keystroke }
      broadcast keystroke
    end
  end

  private

  # Passes a keystroke on to the next stage and starts matching afresh
  def broadcast keystroke
    notify_observers "keypress", keystroke
    @buffer = ""
  end
end

class DisplayWriter < SomeAbstractObserver
  def initialize source:, display_device: 
    @display_device = display_device 
    source.add_observer "keypress", self
  end

  def on_keypress_event keystroke
    @display_device.show keystroke
  end
end

Then we build a pipeline (noting that each stage uses the previous stage as its source):

@keyboard = SomeObservableKeyboardDevice.new 
@display_device = SomeDisplayDevice.new 

@cat_rewriter = EmojiRewriter.new source: @keyboard, sequence: "CAT", emoji: "😺"

@dog_rewriter = EmojiRewriter.new source: @cat_rewriter, sequence: "DOG", emoji: "🐶"

@elephant_rewriter = EmojiRewriter.new source: @dog_rewriter, sequence: "ELEPHANT", emoji: "🐘"

@capybara_rewriter = EmojiRewriter.new source: @elephant_rewriter, sequence: "CAPYBARA", emoji: "🐹"

@display_writer = DisplayWriter.new source: @capybara_rewriter, display_device: @display_device

The keyboard produces "key pressed" events.

The events go through the pipeline - cats, dogs, elephants and capybaras[8] - being rewritten if necessary or being passed along unchanged if not.

The events finally reach the display_writer at the end of the pipeline.

The display writer outputs whichever keystroke it has received - it may be an actual letter or it may be an emoji. The display writer does not care what it receives because all the processing was done earlier. It can concentrate on its one job.

Each stage in the pipeline is extremely simple and optimised for its own job.

We can easily reuse the EmojiRewriter code to deal with lots of different sequences and emojis without adding any complexity at all. When the boss comes along wanting us to add porpoises 🐬 and bees 🐝, it's nothing to worry about. One line of code, one additional stage in the pipeline, per animal.
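
To show the "one stage per animal" idea end to end, here's a self-contained sketch that builds the whole pipeline from a list of animals. KeypressObservable, FakeKeyboard and RecordingDisplay are stand-ins invented for this example, and the rewriter is a simplified take on the one above, not a production implementation.

```ruby
# Hypothetical plumbing so the sketch runs on its own.
module KeypressObservable
  def add_observer(event, observer)
    observers[event] << observer
  end

  def notify_observers(event, keystroke)
    observers[event].each { |observer| observer.on_keypress_event(keystroke) }
  end

  private

  def observers
    @observers ||= Hash.new { |hash, event| hash[event] = [] }
  end
end

class FakeKeyboard
  include KeypressObservable

  def type(text)
    text.each_char { |keystroke| notify_observers "keypress", keystroke }
  end
end

class EmojiRewriter
  include KeypressObservable

  def initialize(source:, sequence:, emoji:)
    @sequence = sequence
    @emoji = emoji
    @buffer = ""
    source.add_observer "keypress", self
  end

  def on_keypress_event(keystroke)
    if keystroke == @sequence[@buffer.length]
      @buffer += keystroke
      flush_with [@emoji] if @buffer == @sequence
    else
      flush_with @buffer.chars + [keystroke]
    end
  end

  private

  # Empties the buffer and passes the given keystrokes down the pipe.
  def flush_with(keystrokes)
    @buffer = ""
    keystrokes.each { |keystroke| notify_observers "keypress", keystroke }
  end
end

# Stands in for the display: it just remembers what it was shown.
class RecordingDisplay
  attr_reader :shown

  def initialize(source:)
    @shown = []
    source.add_observer "keypress", self
  end

  def on_keypress_event(keystroke)
    @shown << keystroke
  end
end

ANIMALS = { "CAT" => "😺", "DOG" => "🐶", "ELEPHANT" => "🐘", "CAPYBARA" => "🐹" }.freeze

# Each animal becomes one stage, using the previous stage as its source.
def type_through_pipeline(text)
  keyboard = FakeKeyboard.new
  last_stage = ANIMALS.reduce(keyboard) do |source, (sequence, emoji)|
    EmojiRewriter.new(source: source, sequence: sequence, emoji: emoji)
  end
  display = RecordingDisplay.new(source: last_stage)
  keyboard.type text
  display.shown.join
end

puts type_through_pipeline("MY CAT AND DOG")
puts type_through_pipeline("CAPYBARA")
```

Adding porpoises or bees is then one more entry in ANIMALS. Note that "CAPYBARA" still works even though the cat stage briefly holds on to the "CA": once the "P" arrives, the cat stage releases all three keystrokes and the capybara stage further down picks them up.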

And, when the pipeline code is laid out as above, it's very easy to understand what's going on.

All this simplicity and reliability comes from looking at things in a different way. We break down what we're trying to do into its fundamental operations. Then we sequence them together into a pipeline, where each operation acts as a self-contained unit.

Just like unix pipes.


  1. 10 print "Baz is cool"
    20 goto 10

    Probably ↩︎

  2. I'm sure Python people are screaming with rage at the fact that these methods have multiple names. But I love it, because it means I can pick the one that makes the most sense for the given situation.

    In fact, the other day, I built a pipeline that used both collect and map in the same sequence, because they were the correct words to use in each individual context. The programming language is there to be used by humans, we're not there to be defined by the programming language.

    The actual code was this:
    files.collect(&:filename).map(&:to_s)

    • collect all the filename objects and convert (map) them into strings.
    ↩︎
  3. In fact, this is how Hadoop and "big data" databases do their work ↩︎

  4. This is a major source of memory leaks. Observers maintain a reference back to an Observable, so some garbage collectors will think the Observable is still in use, even though it's actually safe to dispose of (along with the attached Observers) ↩︎

  5. As I've mentioned before, writing stuff out in english before I think about the code really helps me get my head around what I'm trying to do ↩︎

  6. Is there even an emoji for capybaras? If not, there should be ↩︎

  7. Note - this is a shitty, untested, implementation. If I were doing it properly, I would use a Finite State Machine, which are amazing. In fact, this version is also a Finite State Machine, just a crappily written one. ↩︎

  8. So it looks like there isn't a capybara emoji. I've used a hamster instead. ↩︎

Rahoul Baruah

Rubyist since 1.8.6. Freelancer since 2007, dedicated to building incredible, low-cost, bespoke software for tiny businesses. Also CTO at Collabor8Online.
Leeds, England