Intensive Systems Consulting, Inc

Stream Processing in Clojure

Resources

The git repo for Conduit is here.
The Clojars package is here.
The example code is here.
An example of how Conduit can be used is here.

Part 2 of this tutorial is here.

Starting out

Most people who have used Unix/Linux from the command line know how to chain commands together using the pipe character.

grep sometext somefile.txt | (some fancy sed command) | less

That command uses grep to find the lines in a file that contain a specific piece of text. Those lines are processed by the sed command and then displayed by less. The output of each command is fed as the input to the next in the sequence. As simple as the pipe operation is, it's used to do some very complex things. The 'Unix way' is to have specialized executables that do one thing well and share a common interface, and then to compose them using the pipe operator.

And yet, this is just scratching the surface. The book 'Enterprise Integration Patterns' is a hefty tome that explores this topic in great detail. And the Apache Camel project allows you to implement those patterns in Java. Since we're working in a functional language like Clojure, we can do much better.

Streams and procs

Consider what we could do if, instead of chaining programs on the command line, we could chain individual functions together to build applications. Just as executables have a standard interface (stdin, stdout and stderr), functions that are written to a standard interface can be composed. This standard interface is represented as a stream.

You can think of a stream as a sequence of data values. In the real world, a stream can be a string of characters, a list of numbers, a sequence of messages received by a thread, a series of mouse events and key presses from a GUI, or a list of records from a database query. Or a sequence of AMQP messages in a messaging system. In fact, a single application might use streams of several of these types, depending on the level of the app that you're looking at. In the EIP book, streams are referred to as 'pipes'.

A complementary idea is that of a stream processor, or a proc. A proc is nothing more than a piece of code that produces a data value at its output every time it is given a data value at its input. In EIP, procs are called 'filters'.

So conceptually, procs are linked together using streams to carry data values from one proc to the next. Each proc is written so that it accepts a value from its input stream and optionally produces a value for its output stream. When writing an application in this paradigm, you focus on what you want each stage to do and leave the grunt work of connecting the processing stages to the streams. By using the correct type of stream, you can connect stages that are in the same thread, multiple threads or processes, or even on other machines. You can also bring data into your app from sources that look like streams and provide output data as a sequence of values on a stream.
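One way to make these ideas concrete without any library is sketched below. It assumes a stream can be modeled as an ordinary Clojure seq and a proc as a function from one input value to a vector holding zero or one output values. The names `lift`, `run-stream` and `keep-evens` are hypothetical and are not part of Conduit.

```clojure
;; Hypothetical model, not Conduit's API:
;; a stream is a seq; a proc maps one input value to a vector
;; holding zero or one output values.

(defn lift
  "Turn a one-argument function into a proc that always emits one value."
  [f]
  (fn [x] [(f x)]))

(defn run-stream
  "Feed every value of the input seq through proc, concatenating the outputs."
  [proc xs]
  (mapcat proc xs))

;; A proc that only emits a value for some inputs.
(def keep-evens
  (fn [x] (if (even? x) [x] [])))

(run-stream (lift inc) (range 5))   ;; => (1 2 3 4 5)
(run-stream keep-evens (range 5))   ;; => (0 2 4)
```

Returning a vector rather than a bare value is one way to let a proc produce no output for some inputs, which covers the case where a proc only optionally produces a value.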

As a side note, the stream paradigm is actually one form of programming with arrows, as detailed in this paper by John Hughes. Arrows are in turn a generalization of monads. And that's the extent of the theory behind streams that this tutorial will cover.

Operators

The whole point of stream programming is to focus on the parts of your app that actually DO something and have the drudgery of moving data from place to place taken care of for you. It shouldn't matter whether procs are on a different thread or a different machine. Composing them should not just be easy, but invisible. The Conduit library provides this capability to Clojure programmers and the remainder of this tutorial will use it to explain the concepts. We'll start by taking a look at the basic operators in Conduit and how they're used before looking at a couple of useful stream types.

a-arr

Streams aren't seqs and stream procs aren't just Clojure functions. A way is needed to convert a Clojure function to a stream proc, and 'a-arr' is it. It takes a Clojure function as a parameter and returns a stream proc. The Clojure function must accept one argument and return a value. The resulting stream proc will call the given Clojure function once for every value in its input stream and place the resulting value onto the output stream.

(with-arrow conduit
   (def conduit-inc (a-arr inc)))

(is (= [1 2 3 4 5]
       (conduit-map conduit-inc
                    (range 5))))

Since procs aren't normal Clojure functions, conduit-map is used to map a proc over a sequence of values as if it were an input stream. The output of conduit-map is a sequence of values.

The 'with-arrow conduit' macro declares that the 'conduit' arrow is to be used to provide the definition of the 'a-arr' function. This is very similar to the way the 'with-monad' macro works. This command assigns to 'conduit-inc' a stream proc that will accept a stream of integers and increment each one, producing a stream of integers. (Can you see where this is going?) Future references to 'conduit-inc' do not have to be wrapped in a 'with-arrow' macro.

A really common task is to define an anonymous function, convert it to a proc and assign it to a name. To remove the boilerplate, Conduit provides the 'def-arr' macro.

(with-arrow conduit
   (def-arr conduit-double [x]
      (* 2 x)))
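As a rough illustration of the boilerplate def-arr removes, here is a sketch of a similar macro written against a hypothetical `lift` helper that stands in for a-arr. The macro name `def-lifted` is made up, and Conduit's real def-arr may expand differently.

```clojure
;; Hypothetical stand-in for a-arr: wrap a one-argument function as a proc
;; that maps one input value to a vector holding one output value.
(defn lift [f]
  (fn [x] [(f x)]))

;; Sketch of a def-arr-like macro: build the anonymous function and
;; bind the lifted proc to a name in one step.
(defmacro def-lifted [name args & body]
  `(def ~name (lift (fn ~args ~@body))))

(def-lifted doubler [x]
  (* 2 x))

(mapcat doubler (range 5))   ;; => (0 2 4 6 8)
```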

a-comp

The first, most obvious, operator is the sequencing one. It takes any number of procs and creates a new proc that feeds the data through the procs in order from first to last. The procs don't need to accept or produce the same types as one another; each proc only needs to accept the type(s) produced by the previous proc in the chain and to produce type(s) accepted by the next one. The name is meant to resemble Clojure's 'comp' function, which has similar functionality.

(with-arrow conduit
   (def inc-ints (a-comp conduit-inc conduit-double)))

(is (= [2 4 6 8 10]
       (conduit-map inc-ints
                    (range 5))))
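The sequencing idea can be sketched in plain Clojure using the same hypothetical model (a proc maps one input to a vector of zero or one outputs). `seq-procs` is an illustrative stand-in for a-comp, not its implementation. It runs procs first to last, as described for a-comp, whereas clojure.core/comp applies its functions right to left.

```clojure
;; Hypothetical model: a proc maps one input to a vector of 0 or 1 outputs.
(defn lift [f] (fn [x] [(f x)]))

(defn seq-procs
  "Chain procs first to last: every output of one proc becomes
  an input to the next."
  [& procs]
  (fn [x]
    (reduce (fn [vals proc] (vec (mapcat proc vals)))
            [x]
            procs)))

(def inc-then-double (seq-procs (lift inc) (lift #(* 2 %))))

(mapcat inc-then-double (range 5))   ;; => (2 4 6 8 10)

;; With plain functions the same pipeline is ordinary comp,
;; with the arguments in the opposite order.
(map (comp #(* 2 %) inc) (range 5))  ;; => (2 4 6 8 10)
```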

a-all

The next most obvious way to compose stream procs is to execute them in parallel. There are a couple of ways to do this and the first we'll look at is the case where a single input value is given to a number of procs and produces an output value that is a collection of the output values of each proc.

(with-arrow conduit
   (def conduit-dec (a-arr dec))

   (def inc-n-dec (a-all conduit-inc
                         conduit-dec)))

(is (= [[1 -1] [2 0] [3 1] [4 2] [5 3]]
       (conduit-map inc-n-dec
                    (range 5))))
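For procs built from single-valued functions, a-all is analogous to clojure.core/juxt, which passes one argument to several functions and collects the results in a vector. The snippet below shows only that analogy; it says nothing about how Conduit runs procs on threads or machines.

```clojure
;; juxt passes the same input to each function and collects
;; the results in order, much like a-all over simple procs.
(def inc-n-dec (juxt inc dec))

(map inc-n-dec (range 5))
;; => ([1 -1] [2 0] [3 1] [4 2] [5 3])
```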

a-par

The a-all operator highlighted the fact that a stream can be a sequence of collections of values. For such a stream, it is possible to construct a stream proc that operates on each value of each collection in parallel and produces a stream of collections as output.

(with-arrow conduit
   (def par-inc-n-dec (a-par conduit-inc
                             conduit-dec)))

(is (= [[2 9] [4 14] [6 17] [9 11]]
       (conduit-map par-inc-n-dec
                    [[1 10] [3 15] [5 18] [8 12]])))

An interesting thing to notice is that a-par can be used to construct a-all.

(with-arrow conduit
   (def new-inc-n-dec (a-comp (a-arr (partial repeat 2))
                              par-inc-n-dec)))

(is (= [[1 -1] [2 0] [3 1] [4 2] [5 3]]
       (conduit-map new-inc-n-dec
                    (range 5))))
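The a-par idea, and the trick of building a-all from it, can be checked with plain functions. `par` is a hypothetical stand-in that applies the i-th function to the i-th element of each input collection; like the examples above, it assumes each collection has one element per function.

```clojure
(defn par
  "Hypothetical model of a-par for plain functions: apply the i-th
  function to the i-th element of the input collection."
  [& fs]
  (fn [xs]
    (mapv (fn [f x] (f x)) fs xs)))

(def par-inc-n-dec (par inc dec))

(map par-inc-n-dec [[1 10] [3 15] [5 18] [8 12]])
;; => ([2 9] [4 14] [6 17] [9 11])

;; Duplicating each input and then applying par gives the same
;; result as juxt, the plain-function analogue of a-all.
(= (map (comp par-inc-n-dec #(repeat 2 %)) (range 5))
   (map (juxt inc dec) (range 5)))
;; => true
```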

a-nth

A logical question to ask when dealing with streams of collections is, 'How can I operate on only one item in a collection and leave the others unchanged?' This could be done with the operators we've seen so far using Clojure's 'identity' function. But that is clumsy, and there's a better way: the 'a-nth' operator. This operator takes an integer and a stream proc as arguments and returns a stream proc that accepts a stream of collections, passes the nth value of each collection through the given stream proc and produces a stream of new collections as its output.

(with-arrow conduit
   (def inc-first (a-nth 0 conduit-inc)))

(is (= [[13 9] [9 :a] [100 8] [22 :b]]
       (conduit-map inc-first
                    [[12 9] [8 :a] [99 8] [21 :b]])))
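With plain functions and vectors, the a-nth behavior corresponds to clojure.core/update on an index. `nth-proc` below is a hypothetical name for that sketch, not Conduit code.

```clojure
(defn nth-proc
  "Hypothetical model of a-nth for plain functions: apply f to the
  element at index n and leave the other elements untouched."
  [n f]
  (fn [coll]
    (update (vec coll) n f)))

(def inc-first (nth-proc 0 inc))

(map inc-first [[12 9] [8 :a] [99 8] [21 :b]])
;; => ([13 9] [9 :a] [100 8] [22 :b])
```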

a-select

An essential part of programming is being able to choose an execution path using various criteria. A similar concept applies in stream programming, choosing one of several stream procs to use based on the input value. In Conduit, this is expressed using the 'a-select' operator.

(with-arrow conduit
   (def inc-odd-dec-even (a-select
                            :odd conduit-inc
                            :even conduit-dec)))

(is (= [2 1 6 8]
    (conduit-map inc-odd-dec-even [[:odd 1] [:even 2] [:odd 5] [:odd 7]])))

The parameters to a-select are very similar to a call to 'cond' in that they must be pairs. The first value of each pair is the selection value. The second value is a stream proc. The 'a-select' operator constructs a stream proc that accepts a stream of value pairs. If the first element of an input pair matches one of the selection values, the second element of the input pair is passed to the corresponding stream proc.

A natural question is: what happens when no selection value matches the input? If one of the selection values is '_, then the corresponding stream proc will be used. If there is no '_ as a selection value and the input doesn't match any other selection value, the input is discarded.

(with-arrow conduit
   (def pass-it-on (a-select
                      '_ (a-arr identity)
                      :odd conduit-inc
                      :even conduit-dec)))

(is (= [2 1 5 7]
    (conduit-map pass-it-on [[:odd 1] [:even 2] [:bogus 5] [:oops 7]])))

(with-arrow conduit
   (def discarding (a-select
                      :odd conduit-inc
                      :even conduit-dec)))

(is (= [2 1]
    (conduit-map discarding [[:odd 1] [:even 2] [:bogus 5] [:oops 7]])))
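The dispatch rules above (match on the selector, fall back to '_, otherwise discard) can be modeled as a lookup table. `select-proc` is a hypothetical sketch, not Conduit's implementation. It returns a vector of zero or one outputs so that `mapcat` drops discarded inputs.

```clojure
(defn select-proc
  "Hypothetical model of a-select: dispatch a [selector value] pair to the
  function registered for selector, fall back to the '_ entry if present,
  and otherwise emit nothing. Returns a vector of zero or one outputs."
  [& pairs]
  (let [table (apply hash-map pairs)]
    (fn [[k v]]
      (if-let [f (get table k (get table '_))]
        [(f v)]
        []))))

(def pass-it-on (select-proc '_ identity :odd inc :even dec))
(def discarding (select-proc :odd inc :even dec))

(def inputs [[:odd 1] [:even 2] [:bogus 5] [:oops 7]])

(mapcat pass-it-on inputs)   ;; => (2 1 5 7)
(mapcat discarding inputs)   ;; => (2 1)
```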

a-loop

With the operators so far, we can compose stream procs sequentially or in parallel, and we can choose which proc to use based on the incoming data value. These operators all produce stream procs that are stateless; that is, each input value is handled independently of the input values that have preceded it. While it is possible to build custom, stateful stream procs, that breaks the paradigm. So a different kind of operator is needed: 'a-loop'.

(with-arrow conduit
    (def accumulate (a-loop
                      (a-arr (partial apply +))
                      0))

    (is (= [0 1 3 6 10 15 21]
           (conduit-map accumulate (range 7)))))

The a-loop operator accepts a proc and an initial value. This operator is a little harder to understand. The idea is that each new value on the input stream is processed in conjunction with a state value that is derived from all previous input values. It's entirely possible to write a proc that maintains this state value internally, but that breaks the pure function paradigm. So there's a need for an operator that accomplishes this with a 'standard' proc whose output depends only on its input.

Such a proc needs to accept a collection containing two values, a state value and a new input value, and produce an updated state value and an output value. The updated state value and the output value may be the same value. The updated state value needs to be 'looped back' to be paired with the next input value as input to the proc, while also being placed on the output stream.

The a-loop operator does this by building a proc that pairs a state value with an input value, passes that pair to the proc given to a-loop, and then captures the result, which is put on the output stream and also paired with the next input value. The initial state value is the other parameter to the a-loop operator and is paired with the first input value. So the output stream of the inner proc is essentially duplicated, with one branch used as the output and the other looped back around as input.

Therefore, the proc passed to a-loop needs to accept a pair composed of the state value and an input value. The output of the constructed proc is thus the output of the internal proc.

The question that quickly arises is what to do when the updated state value emitted by the proc is not in the form needed as an input state value. There needs to be some way of transforming the looped-back stream while leaving the output stream untouched. The a-loop operator takes an optional third parameter: a proc that is applied to the loopback stream to generate the stream of state values to be paired with the input values.
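The looping behavior can be modeled with a lazy sequence that threads the state through. In this hypothetical `loop-proc`, the inner function receives a `[state input]` pair, its result is emitted, and the optional loopback function turns that result into the next state (defaulting to `identity`). Because state has to persist from one value to the next, the sketch takes a whole input seq rather than a single value. It is a model of the semantics described above, not Conduit's implementation.

```clojure
(defn loop-proc
  "Hypothetical model of a-loop for plain functions. f receives a
  [state input] pair; its result is emitted and, after passing through
  the optional loopback function, becomes the state paired with the
  next input."
  ([f init] (loop-proc f init identity))
  ([f init loopback]
   (fn [xs]
     (letfn [(step [state xs]
               (lazy-seq
                 (when-let [s (seq xs)]
                   (let [out (f [state (first s)])]
                     (cons out (step (loopback out) (rest s)))))))]
       (step init xs)))))

;; Running sum, mirroring the accumulate example.
(def accumulate (loop-proc (partial apply +) 0))

(accumulate (range 7))   ;; => (0 1 3 6 10 15 21)

;; With a loopback: f emits a [state value] pair and `first`
;; extracts just the state to loop back.
(def numbered (loop-proc (fn [[n x]] [(inc n) x]) 0 first))

(numbered [:a :b :c])    ;; => ([1 :a] [2 :b] [3 :c])
```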

First, some simple procs to set things up, starting with a proc that accepts a pair and generates a new pair from the first value of the pair and the pair itself.

(with-arrow conduit
   (def extract-state-value (a-all
                              (a-arr first)
                              (a-arr identity))))

Then a proc to set the first value of a pair to 0 while incrementing the second value.

(with-arrow conduit
   (def inc-reset-state (a-par
                          (a-arr (constantly 0))
                          conduit-inc)))

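Next, a proc that passes a value to inc-reset-state if the selection value is 3 and passes it through unchanged otherwise. It is then composed with extract-state-value so that the state value serves as the selection value.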

(with-arrow conduit
   (def inc-on-3 (a-select
                   3 inc-reset-state
                   '_ (a-arr identity)))

   (def inc-on-3-with-state (a-comp
                              extract-state-value
                              inc-on-3)))

Now, a proc that extracts the first value of a pair and increments it.

(with-arrow conduit
   (def update-state (a-comp (a-arr first)
                             conduit-inc)))

And finally, the proc that puts it all together using a-loop. Starting with an initial state of 1, it pairs each input value with the current state, extracts a copy of the state value to be used in the selection and, every third time, increments the input and resets the state, producing a state/value pair. This pair is then split: update-state increments the state and loops it back for the next input, while (a-arr second) puts the new output value on the output stream.

(with-arrow conduit
   (def inc-every-third (a-comp
                          (a-loop
                            inc-on-3-with-state
                            1
                            update-state)
                          (a-arr second))))
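Tracing the pieces by hand, the state cycles through 1, 2, 3, so every third input value is incremented. The sketch below checks that reading with plain functions. A hypothetical `loop-proc` model of a-loop drives the same state machine, and a stateless, index-based version serves as a reference of the kind that could back the unit tests recommended for a-loop. All names here (`loop-proc`, `inc-on-3*`, `inc-every-third*`, `inc-every-third-ref`) are illustrative and are not Conduit code.

```clojure
;; Hypothetical model of a-loop (not Conduit's implementation): f gets a
;; [state input] pair, its result is emitted, and loopback turns that
;; result into the state for the next input.
(defn loop-proc [f init loopback]
  (fn [xs]
    (letfn [(step [state xs]
              (lazy-seq
                (when-let [s (seq xs)]
                  (let [out (f [state (first s)])]
                    (cons out (step (loopback out) (rest s)))))))]
      (step init xs))))

;; Same state machine as inc-every-third: when the state is 3, reset it
;; to 0 and increment the value; otherwise pass the pair through.
(defn inc-on-3* [[state x]]
  (if (= state 3) [0 (inc x)] [state x]))

;; Loop back (inc state), then keep only the value, like (a-arr second).
(def inc-every-third*
  (comp (partial map second)
        (loop-proc inc-on-3* 1 (fn [[state _]] (inc state)))))

;; Stateless reference: increment the values at positions 2, 5, 8, ...
(defn inc-every-third-ref [xs]
  (map-indexed (fn [i x] (if (= 2 (mod i 3)) (inc x) x)) xs))

(inc-every-third* [1 2 3 4 5 6])     ;; => (1 2 4 4 5 7)
(inc-every-third-ref [1 2 3 4 5 6])  ;; => (1 2 4 4 5 7)
```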

The a-loop is the most complicated operator and the most cumbersome to work with. However, with careful decomposition and practice, coupled with sufficient unit tests, it can be used effectively.

Conduit and EIP

As stated above, there is a close similarity between the operators in Conduit and the patterns enumerated in 'Enterprise Integration Patterns'. The Conduit operators are the primitives from which many EIP patterns can easily be built. Pipelines can be built using the a-comp operator. Parallel pipelines that are synchronized can be built with a-par. The scatter/gather pattern is built using a-all. Routers can be built from a-select. And accumulators can be built using a-loop.

Wrapping up

This tutorial has introduced the core of the Conduit library and the concepts behind it. By thinking in this paradigm, much of the drudgery of programming distributed apps can be eliminated.

Copyright 2010 by Jim Duey