What is this?

liquidata is a Python Embedded Domain Specific Language (EDSL) which aims to encourage and facilitate

  • increasing the signal-to-noise ratio in source code

  • avoiding using strings to represent symbols in the API

  • code reuse through composition of reusable and orthogonal components

  • dataflow programming

  • function composition

  • lazy processing.

Why would I want this?

Dataflow networks

It can be helpful to think of your computations as flows through a network or graph of components. For example

candidates
    |
quick_screen
    |
expensive_screen -------.
    |                    \
can dance ?           can sing ?
    |                     |
hop test              pitch test
    |                     |
skip test             rhythm test
    |                     |
jump test                 |
    |                     |
sum scores            sum scores
    |                     |
score above 210 ?     score above 140 ?
    |                     |
output dancers        output singers

The aim of liquidata is to allow you to express the idea laid out in the graph above, in code that reflects the structure of the graph. A liquidata implementation of the graph might look something like this:

select_candidates = pipe(
    { quick_screening },
    { expensive_screening },
    [ { can_sing },
      test_pitch,
      test_rhythm,
      sum_scores.pitch.rhythm,
      { score_above(140) },
      out.singers
    ],
    { can_dance },
    test_hop,
    test_skip,
    test_jump,
    sum_scores.hop.skip.jump,
    { score_above(210) },
    out.dancers)

selected = select_candidates(candidates)

# Do something with the results
send_to_singer_committee(selected.singers)
send_to_dancer_committee(selected.dancers)

Function composition

If you feel that the signal is drowned out by the noise in code written like this

for name in filenames:
    with open(name) as file_:
        for line in file_:
            for word in line.split():
                print(word)

and that the intent is clearer in code presented like this

pipe(source << filenames, open, join, str.split, join, sink(print))

then you might find liquidata interesting.
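
For comparison, here is the same flow written with standard generators (a sketch; it assumes filenames is an iterable of text-file names, and glosses over closing the files):

    from itertools import chain

    lines = chain.from_iterable(map(open, filenames))   # open, join
    words = chain.from_iterable(map(str.split, lines))  # str.split, join
    for word in words:                                  # sink(print)
        print(word)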

Still with me?

That was a trivial example. Let's have a look at something a little more involved.

If you are perfectly happy reading and writing code like this

    def keyword_frequency_loop(directories):
        counter = Counter()
        for directory in directories:
            for (path, dirs, files) in os.walk(directory):
                for filename in files:
                    if not filename.endswith('.py'):
                        continue
                    for line in open(os.path.join(path, filename)):
                        for name in line.split('#', maxsplit=1)[0].split():
                            if iskeyword(name):
                                counter[name] += 1
        return counter

then liquidata is probably not for you.

But if the last example leaves you wanting to extract the core meaning from the noise, and you feel that this

    all_files         = os.walk, JOIN, NAME.path.dirs.files
    pick_python_files = GET.files * (JOIN, { use(str.endswith, '.py') }) >> PUT.filename
    file_contents     = GET.path.filename * os.path.join, open, JOIN
    ignore_comments   = use(str.split, '#', maxsplit=1), GET[0]
    pick_keywords     = str.split, JOIN, { iskeyword }

    keyword_frequency_pipe = pipe(
        all_files,
        pick_python_files,
        file_contents,
        ignore_comments,
        pick_keywords,
        OUT(INTO(Counter)))

is a step in the right direction, and if you feel that abstraction should be as easy as getting the above version by extracting subsequences from this prototype

    keyword_frequency_pipe = pipe(
        os.walk, JOIN,
        NAME.path.dirs.files,
        GET.files * (JOIN, { use(str.endswith, '.py') }) >> PUT.filename,
        GET.path.filename * os.path.join,
        open, JOIN,
        use(str.split, '#', maxsplit=1),
        GET[0],
        str.split, JOIN,
        { iskeyword },
        OUT(INTO(Counter)))

then you might want to read on.

Running these samples

  • select_candidates is an outline of the solution, which omits details. As such, it is not executable.

  • keyword_frequency_loop and both versions of keyword_frequency_pipe are complete, executable examples.

To run keyword_frequency_loop, you will need these imports:

    import os
    from keyword     import iskeyword
    from collections import Counter

To run (either version of) keyword_frequency_pipe you will additionally need to get liquidata, and import it thus:

    from liquidata import pipe, name as NAME, get as GET, put as PUT, join as JOIN, out as OUT, into as INTO, use

(The liquidata components were uppercased in order to highlight them in the example.)

Installation

Currently there are two options:

  1. Pip: pip install liquidata.

  2. Just grab the source. For now, the implementation lives in a single, dependency-free file.

Comparison to mPyPl

mPyPl is a project with certain similarities to liquidata.

A major architectural difference is that mPyPl uses generators to pull data through the pipeline, while liquidata uses coroutines to push the data through the pipeline. This is because liquidata was designed to allow easy bifurcation of flows into independent unsynchronized branches. (liquidata will probably also support pull-pipelines in the future.) Both mPyPl and liquidata support synchronized, named branches by sending compound objects with named components through the flow. mPyPl and liquidata take markedly different approaches to managing these names.

Here we compare and contrast the APIs provided by the two packages.

This example appears in the quickstart for mPyPl:

import mPyPl as mp

images = (
  mp.get_files('images',ext='.jpg')
  | mp.as_field('filename')
  | mp.apply('filename','image', lambda x: imread(x))
  | mp.apply('filename','date', get_date)
  | mp.apply(['image','date'],'result',lambda x: imprint(x[0],x[1]))
  | mp.select_field('result')
  | mp.as_list)

Here is its translation into liquidata

from liquidata import pipe, source, name, get, put

images = pipe(
  get_files(...) >> source,  name.filename,
  imread   * get.filename >> put.image,
  get_date * get.filename >> put.date,
  imprint  * get.image.date)

Observations:

  • liquidata highlights the high-level information about what happens in the pipeline: get_files, imread, get_date, imprint. In contrast, mPyPl buries it in the noise.

  • liquidata avoids the use of strings as symbols.

  • mPyPl provides a specific get_files utility; liquidata can work with any iterable source of files, but providing such sources is outside of the scope of liquidata's goals.

  • mp.as_field('filename') is equivalent to name.filename

  • mp.apply serves three purposes:

    • mapping a function over the stream data
    • selecting arguments from the compound flow items
    • placing the result back in the compound flow items

    In contrast liquidata separates these concerns

    • mapping is done by default: no need to ask for it
    • get selects arguments
    • put places results
  • mp.apply(['image', 'date'], 'result', lambda x: imprint(x[0],x[1]))

    • creates an argument tuple containing image and date
    • uses a lambda to unpack the argument tuple into the call to imprint
    • puts the result back in the compound flow under the name result

    In contrast, in imprint * get.image.date

    • get.image.date creates an argument tuple
    • * unpacks the argument tuple into the call to imprint
    • The lack of put causes the result to continue downstream on its own: the other items in the compound flow are no longer needed!
  • mp.select_field('result') translates to get.result in liquidata. It extracts the interesting item from the compound flow. In the liquidata version this step is not needed, because it was done implicitly in the previous step: by avoiding the use of >> put.result, the result continued down the pipe on its own, rather than being placed in the compound object along with everything else. That is to say

    imprint * get.image.date
    

    is equivalent to

    imprint * get.image.date >> put.result,
    get.result
    
  • mp.as_list collects the results in a list. The equivalent (which would be written out(into(list))) is missing from the liquidata version, because it's the default.

  • out(into(...)) is far more general than mp.as_list, as it will work with any callable that consumes iterables, such as set, tuple, min, max, sum, sorted, collections.Counter, ... including any and all that will be written in the future.
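
For example, any of these standard-library consumers slots straight in (pipe, source, out and into are the liquidata components introduced throughout this README):

    from collections import Counter
    from liquidata import pipe, source, out, into

    data = ['a', 'b', 'a', 'c']
    pipe(source << data, out(into(set)))      # {'a', 'b', 'c'}
    pipe(source << data, out(into(sorted)))   # ['a', 'a', 'b', 'c']
    pipe(source << data, out(into(Counter)))  # Counter({'a': 2, 'b': 1, 'c': 1})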

Whirlwind tour of basic features

from liquidata import pipe, sink

fn = pipe(
    [ sink(print) ],
    { str.isalpha },
    str.upper)

fn(dir())
  • pipe accepts an arbitrary number of pipe components.

  • pipe returns a function (callable).

  • The function created by pipe accepts an iterable argument, and pushes its elements through the pipeline.

  • Square brackets ([ ]) create independent branches: the same data are sent both into the branch and downstream.

  • sink feeds items into a (presumably) side-effectful function (print in this example), and prevents them from going any further downstream.

  • Braces ({ }) are filters: they should contain a predicate (a function whose return value is interpreted as a boolean; str.isalpha, in this example), and will prevent any items which don't satisfy the predicate from progressing further down the pipe.

  • Unadorned functions are mappings: they accept incoming items, and the values they return are sent downstream.

  • Any items that reach the end of a pipe are, by default, collected into a list and returned.

Consequently, in the above example:

  • The names in the global scope (dir()) are fed into the pipeline, one by one.

  • Each incoming item is printed out.

  • Items containing non-alphabetic characters are filtered out.

  • All remaining items are uppercased ...

  • ... and returned in a list.

from operator import add
from liquidata import source, pipe, out, arg, into

pipe(
    source << dir(),
    [ out.incoming ],
    { str.isalpha },
    { arg > 5 : len },
    [ str.upper, out.big ],
    [ len, [ out.ADD(add) ], out.SUM(into(sum)) ],
    str.lower,
    out.small)
  • Rather than using pipe to create a reusable function, we feed data into the pipeline directly, by including the source in the pipeline. The following three variations have the same meaning:

    • source << dir()
    • dir() >> source
    • source(dir())
  • Rather than using a side-effect (print) to inspect what is passing through the pipe at various points, in this example we used named outputs. out is a sink which collects values so that they can be returned from the pipe.

    • The presence of multiple outs in the graph causes the pipeline to return a namespace, rather than a single result.

    • If a pipe (or branch; branches are just pipes) does not explicitly end in a sink or out, then it is implicitly capped with an anonymous out.

    • out.incoming will collect all items into a list, and arrange for the list to be bound to the name incoming in the namespace that is returned by the pipe as a whole.

    • out.ADD(add) uses a binary function (add) to fold or reduce all the values it receives into a single one, which will be placed in the returned namespace under the specified name, ADD.

    • out.SUM(into(sum)) feeds the items it receives into a callable which consumes iterables (in this case sum, but keep in mind that there are very many ready-made options here: set, min/max, collections.Counter, ', '.join, etc.). The result will be bound in the returned namespace under the name SUM.

    • arg provides a concise syntax for very simple anonymous functions: ones consisting of the application of an operator and a constant to the function's argument. In this example, we have arg > 5. This is equivalent to lambda arg: arg > 5.

    • Braces containing colons are key-filters. In this example, { arg > 5 : len }. The predicate appears before the colon, but, rather than being applied to the item in the pipe, it is applied to the result of calling the key function (specified after the colon) on the item.

      If the verdict is positive, the original item (rather than the value returned by the key function) progresses downstream.

      In this example, strings are coming through the pipe, and strings continue past the filter, but only those whose len is greater than 5.

    • Branches can be nested: [ len, [ out.ADD(add) ], out.SUM(into(sum)) ].

    • The list of items reaching the end of the main pipe, unlike in the first example, is not anonymous: it is bound to the name small in the returned namespace.

  • The result returned by the pipeline is a namespace containing 5 names: incoming, big, ADD, SUM and small.

Synchronized branches, compound flow and name management

TODO

Tutorial

In this tutorial we'll gradually build up the code that appears in the introduction, and explore some other features along the way.

The following code will traverse your entire filesystem, and print information about every directory and the files in it. Be ready to interrupt it!

A very simple pipe

import os
from liquidata import pipe, source, sink

pipe(source << os.walk('/'), sink(print))
  • We created a pipe which uses the iterable os.walk('/') as its source. The following 3 variations are synonymous:

    • source << iterable
    • iterable >> source
    • source(iterable)
  • The items in the source are sent down the pipeline

  • In this case the pipeline is very short: it contains only a single sink. sink:

    • feeds any items it receives into the (presumably side-effecting) function it contains
    • stops the items from propagating further downstream
  • os.walk('/') traverses the entire filesystem: information about each directory in the filesystem is sent down the pipe and printed out. Your screen is flooded with information.
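
In effect, this first pipe is a point-free spelling of the plain loop:

    import os

    for item in os.walk('/'):
        print(item)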

Let's try to remove the need to interrupt the pipeline manually.

Limiting the number of items

import os
from liquidata import pipe, source, take, sink

pipe(source << os.walk('/'), take(3), sink(print))
  • We have introduced a new component into the pipe: take(3). Its purpose is to limit the number of items flowing through the pipe.

  • Only the first three items coming out of the source are printed to the screen.

  • But the pipeline does not terminate at that point, because the entire source is still being pushed through the pipe: take only limits how many items get through.

  • If you do not interrupt this, it will eventually terminate after the source is exhausted (your whole filesystem has been traversed).

Push vs pull

  • liquidata pushes the data through the pipeline using coroutines. A perfectly valid alternative choice would be to pull the data through the pipe using generators.

  • Pushing makes it easy to split the stream into separate, independent branches leading to distinct sinks (or outputs); pulling makes it easier to join independent sources (or inputs). The former made more sense in the context in which liquidata was originally needed.

  • liquidata will support pulling in the future.

  • If the data were being pulled, the previous example would terminate as soon as the first 3 items from the source had been processed.

  • In a pull-architecture, take has to send an 'I am closed, you won't get any more data from me' message downstream. Consequently, downstream will stop asking for more. In a push-architecture, take has two choices:

    • Keep rejecting all subsequent items it receives, until upstream stops sending data.

    • Send a 'please stop sending data' message upstream. In this case, any other branches on the network will also immediately stop receiving any data. This is not always desirable.
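
To make the contrast concrete, here is a toy sketch of take in both architectures, in plain Python (illustrative only; this is not liquidata's implementation):

    from itertools import count, islice

    # Pull: a generator-based take terminates naturally, because
    # downstream simply stops asking for more items.
    assert list(islice(count(), 3)) == [0, 1, 2]

    # Push: a coroutine-based take keeps receiving items, and can
    # only reject the excess (the first of the two choices above).
    def push_take(n, downstream):
        def coroutine():
            seen = 0
            while True:
                item = yield          # wait for upstream to push an item
                if seen < n:          # let the first n items through ...
                    seen += 1
                    downstream(item)  # ... and reject everything else
        c = coroutine()
        next(c)                       # prime the coroutine
        return c

    results = []
    target = push_take(3, results.append)
    for item in range(1000):          # upstream keeps pushing regardless
        target.send(item)
    assert results == [0, 1, 2]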

Early termination of push

We can instruct take to close down the whole network, as soon as it has let through all the items it needs, by passing in close_all=True:

import os
from liquidata import pipe, source, take, sink

pipe(source << os.walk('/'), take(3, close_all=True), sink(print))
  • The first three items are printed to the screen, and the pipeline terminates immediately.

  • All liquidata components with the ability to reject all subsequent items should have a close_all option.

Synchronized branches and compound flow

The items generated by os.walk are 3-tuples, containing:

  • A filesystem path
  • A list of directories found on that path
  • A list of files found on that path

Let's give these components names within our pipeline

import os
from liquidata import pipe, source, name, take, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    take(3, close_all=True),
    sink(print))
  • The new component is name.path.dirs.files.

  • It specifies three names: path, dirs and files.

  • Therefore it expects to receive an iterable containing three items (in this case, the 3-tuple coming out of os.walk)

  • It creates a namespace containing those three names, bound to the corresponding items in the tuple.

  • The resulting namespace continues down the pipe.

  • The components of such namespaces can be thought of as parallel, synchronized, named branches.

  • liquidata also supports unsynchronized branches. We'll meet these later.

  • name.single_name wraps the whole item it receives in a namespace. By contrast, name.more.than.one unpacks the item it receives and wraps its components in a namespace.
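
The effect of name.path.dirs.files can be pictured with types.SimpleNamespace (a sketch of the idea; liquidata's namespace type may differ in detail):

    import os
    from types import SimpleNamespace

    for path, dirs, files in os.walk('/'):
        ns = SimpleNamespace(path=path, dirs=dirs, files=files)
        print(ns.path, len(ns.files))
        break  # just the first item, to avoid flooding the screen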

Picking components from compound flow

import os
from liquidata import pipe, source, name, get, take, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    take(3, close_all=True),
    sink(print))
  • The new component is get.files. It

    • expects a compound item (Namespace) from upstream
    • selects the specified component (files, in this example)
    • sends it downstream.
  • Consequently, this program prints three lists:

    • one for each of the first three directories visited by os.walk('/')
    • each containing the names of the files in that directory

join

import os
from liquidata import pipe, source, name, get, take, join, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    take(3, close_all=True),
    join,
    sink(print))
  • The new component is join.

  • It expects iterables and sends their items downstream, one by one.

  • Consequently, this program prints the names of the files found in the first 3 directories visited by os.walk('/').

  • This is exactly the same information as in the previous example, except that, rather than being grouped in 3 lists, the names appear in one long chain.
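
In pull terms, join corresponds to itertools.chain.from_iterable: given a stream of iterables, it yields their elements one by one:

    from itertools import chain

    lists_of_files = [['a.py', 'b.txt'], ['c.py']]
    assert list(chain.from_iterable(lists_of_files)) == ['a.py', 'b.txt', 'c.py']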

Try swapping the take and the join components:

import os
from liquidata import pipe, source, name, get, take, join, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    join,
    take(3, close_all=True),
    sink(print))
  • This time only 3 names are printed

  • Previously, all the names corresponding to the first three directories were printed.

  • join plays a role similar to inner loops.

  • flat(fn) is an alternative spelling of fn, join

import os
from liquidata import pipe, source, name, get, flat, take, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    flat(get.files),
    take(3, close_all=True),
    sink(print))
  • flat(get.files) replaced get.files, join from the previous example.

  • The behaviour doesn't change.

filters

import os
from liquidata import pipe, source, name, get, take, join, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    join,
    { lambda x: x.endswith('.py') },
    take(3, close_all=True),
    sink(print))
  • The new component is { lambda x: x.endswith('.py') }

  • Braces ({ }) create filters, with the contained function being used as the predicate.

  • In this example, we only let through filenames which end in .py.

  • We close the pipeline after seeing the first three of those.

reducing lambda noise: use

The purpose of lambda in the last example is to bind the second argument of a function (in this example, str.endswith(<string>, <ending>)), while leaving the first argument unbound. This situation arises rather frequently, and lambda often adds a lot of syntactic noise.

import os
from liquidata import pipe, source, name, get, take, join, use, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    join,
    { use(str.endswith, '.py') },
    take(3, close_all=True),
    sink(print))
  • The new component is use(str.endswith, '.py') which replaces lambda x: x.endswith('.py').

  • The following two lines are synonymous:

    lambda x: fn(x, A2)
          use(fn,   A2)
    
  • More generally:

    lambda x: fn(x, A2, A3, K1=v1, K2=v2)
          use(fn,   A2, A3, K1=v1, K2=v2)
    
  • In other words:

    lambda x: fn(x, *args, **kwds)
          use(fn,   *args, **kwds)
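
A minimal sketch of such a helper in plain Python (not necessarily liquidata's exact implementation):

    def use(fn, *args, **kwds):
        "Bind every argument of fn except the first."
        return lambda x: fn(x, *args, **kwds)

    endswith_py = use(str.endswith, '.py')
    assert     endswith_py('script.py')
    assert not endswith_py('notes.txt')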
    

key-filters

Imagine we were interested in filenames whose length is less than 5.

import os
from liquidata import pipe, source, name, get, take, join, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    join,
    { lambda x: x < 5 : len },
    take(3, close_all=True),
    sink(print))
  • The new component is { lambda x: x < 5 : len }.

  • The important new feature within that component is : len.

  • We have already seen the use of braces ({ predicate }) as filters.

  • This is a key-filter: { predicate : key }.

  • The predicate does not look at the item itself, but at key(item).

  • If the predicate is satisfied, the item itself (rather than key(item)) continues downstream.
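
In pull terms, a key-filter is simply (a sketch):

    def key_filter(predicate, key, items):
        "Keep the items for which predicate(key(item)) holds."
        return (item for item in items if predicate(key(item)))

    names = ['a.py', 'somefile.py', 'x']
    assert list(key_filter(lambda n: n < 5, len, names)) == ['a.py', 'x']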

reducing lambda noise: arg

import os
from liquidata import pipe, source, name, get, take, join, arg, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    join,
    { arg < 5 : len },
    take(3, close_all=True),
    sink(print))
  • The new component is arg < 5 which replaces lambda x: x < 5

  • arg is a means of creating anonymous functions by binding one operand of an operator. It works for most Python operators.

  • from liquidata import arg as _ might be tempting ... but is it wise?
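
The trick behind arg can be illustrated with a toy class that overloads operators to return functions (liquidata's real arg covers most Python operators; this sketch covers just two):

    class Arg:
        def __lt__(self, other): return lambda x: x < other
        def __gt__(self, other): return lambda x: x > other

    arg = Arg()
    shorter_than_5 = arg < 5
    assert     shorter_than_5(3)
    assert not shorter_than_5(7)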

Adding new synchronized branches: put

import os
from liquidata import pipe, source, name, put, get, take, join, arg, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    (lambda ns: len(ns.files)) >> put.Nfiles,
    take(3, close_all=True),
    sink(print))
  • The new component is (lambda ns: len(ns.files)) >> put.Nfiles

    • lambda ns: len(ns.files) accepts a namespace, and returns the length of one of the values in the namespace.

    • >> put.Nfiles adds the returned value as a new entry in the namespace, under the name Nfiles.

  • Before this point, there were three synchronized branches in the stream (path, dirs, files). After this point there are four (the aforementioned three, and Nfiles).

get *

The last example can be written more conveniently with get *

import os
from liquidata import pipe, source, name, get, put, take, join, arg, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files * len >> put.Nfiles,
    take(3, close_all=True),
    sink(print))

get.files * len replaced (lambda ns: len(ns.files))

The following three variants are synonymous

(lambda ns: len(   ns.files)) >> put.Nfiles
            len * get.files   >> put.Nfiles
get.files * len               >> put.Nfiles
  • get without the * is essentially a shorthand for operator.attrgetter:

    attrgetter('a')
    get        .a
    

    are synonymous, as are

    attrgetter('a', 'b', 'c')
    get        .a   .b   .c
    
  • get can also play the role of operator.itemgetter:

    itemgetter(key_or_index)
    get       [key_or_index]
    

    are synonymous.

    Unfortunately, Python's __getitem__ syntax throws a spanner in the works, because it cannot distinguish between

    obj[ a, b ]
    obj[(a, b)]
    

    so

           get[ a,b ]
    itemgetter( a,b ) # would like this
    itemgetter((a,b)) # but get this
    
  • When combined with *, get not only extracts values from a compound object, but also unpacks them into a function call, much like the * in Python's standard fn(*args) syntax.

    It is worth noting the following similarities and differences between get ... , and get ... *:

    get.a , fn  # fn(ns.a)
    get.a * fn  # same as above
    
    get.a.b , fn  # fn((ns.a, ns.b))
    get.a.b * fn  # fn( ns.a, ns.b )
    

    The comma does not unpack the tuple: the whole tuple is passed as a single argument to fn. In contrast, the star unpacks the tuple and fn receives its contents as separate arguments.

  • get * also has an important interaction with put. Consider

    get.a * fn >> put.b  # ns.b = fn(ns.a)
    get.a , fn >> put.b  # Error!
    

    The first case is parsed as (get.a * fn) >> put.b. Consequently put attempts to inject b into the item received by (get.a * fn): that item must be a namespace, if get.a is to work. The b will be placed in that namespace.

    The second case is parsed as get.a, (fn >> put.b). Consequently put attempts to inject b into the item received by fn, which has already been unpacked by get.a, and will therefore NOT be a namespace.
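
The attrgetter/itemgetter equivalences listed above can be checked in plain Python:

    from operator import attrgetter, itemgetter
    from types import SimpleNamespace

    ns = SimpleNamespace(a=1, b=2, c=3)
    assert attrgetter('a')(ns)            == 1          # get.a
    assert attrgetter('a', 'b', 'c')(ns)  == (1, 2, 3)  # get.a.b.c
    assert itemgetter(0)(['zero', 'one']) == 'zero'     # get[0]

    # get.a.b * fn unpacks the tuple into the call, like fn(*args):
    fn = lambda x, y: (x, y)
    assert fn(*attrgetter('a', 'b')(ns)) == (1, 2)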

Armed with this knowledge, let's return to the code we were developing.

Putting get * and >> put to work

Recall that:

  • We have a compound flow containing three synchronized branches: path, dirs and files.

  • Alternatively we can say that the items flowing through the pipe are namespaces, each containing the names path, dirs and files.

  • path is a single value; files is a list.

  • We have managed to create a stream of (unqualified) filenames which end in .py.

The code looked like this:

import os
from liquidata import pipe, source, name, get, take, join, use, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    join,
    { use(str.endswith, '.py') },
    take(3, close_all=True),
    sink(print))
  • We would like to look at the contents of the .py files.

  • In order to open the files, we will need to know their fully qualified names.

  • os.path.join(path, filename) can create these for us, as long as we can feed it a path and a filename.

  • Before get.files the stream contained the path we need, but the filename was not obviously accessible. After get.files, join, the stream contains only filenames: path is no longer available.

  • The solution is to add the filenames to the compound stream, rather than replacing it. This can be done with put.

import os
from liquidata import pipe, source, name, get, put, take, join, use, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename,
    take(3, close_all=True),
    sink(print))
  • The first change to the code is

    get.files ,  join, { use(str.endswith, '.py') }                    # old
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename   # new
    

    The old version:

    • picks files out of the namespace
    • throws away the namespace
    • turns a stream of lists of filenames into a stream of individual filenames
    • discards non-.py filenames

    The new version:

    • feeds files into a sub-pipe
    • the sub-pipe:
      • turns a stream of lists of filenames into a stream of individual filenames
      • discards non-.py filenames
    • for each .py file coming out of the sub-pipe:
      • adds the filename to the namespace
      • sends the augmented namespace downstream
  • The second change is the addition of get.path.filename, which extracts the two values that interest us from the namespace.

Consequently, the code prints out the first three .py filenames, in a tuple together with the paths to the directories that contain them.

We can use * to unpack these tuples into separate arguments when calling os.path.join:

import os
from liquidata import pipe, source, name, get, put, take, join, use, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename * os.path.join,
    take(3, close_all=True),
    sink(print))
  • The new addition is * os.path.join

  • The code now prints the fully qualified names of the first three .py files.

Analysing the file contents

import os
from keyword import iskeyword
from liquidata import pipe, source, name, get, put, take, join, use, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename * os.path.join,
    open,
    join,
    use(str.split, '#', maxsplit=1),
    get[0],
    str.split,
    join,
    { iskeyword },
    take(30, close_all=True),
    sink(print))

There are quite a few new additions, but they should all be familiar by now:

  • open (a Python builtin) turns each filename into an iterator over the lines in the file.

  • join turns the stream of file iterators into a continuous stream of lines in all the files.

  • use(str.split, '#', maxsplit=1) splits each line into [<code>, <comment>] pairs.

  • get[0] picks the code, and throws away the comment.

  • str.split turns the code on any line into a sequence of tokens. [This tokenizer is much too naive for real uses, but it will do for the purposes of our example.]

  • join gives a continuous stream of tokens on all lines in all files.

  • { iskeyword } filters out non-keywords.

  • Changing take's first argument from 3 to 30 allows us to see a bit more output.

Reusable pipes

Our pipe starts with source << .... This means that data are pushed through the pipe as soon as it is defined. It is possible to define a pipe for reuse, decoupling it from the data that will flow through it.

import os
from keyword import iskeyword
from liquidata import pipe, source, name, get, put, take, join, use, sink

fn = pipe(
    name.path.dirs.files,
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename * os.path.join,
    open,
    join,
    use(str.split, '#', maxsplit=1),
    get[0],
    str.split,
    join,
    { iskeyword },
    take(30, close_all=True),
    sink(print))

fn(os.walk('/'))
  • source has been removed from the pipe

  • pipe now returns a function

  • That function accepts an iterable argument, and pushes it through the pipe.

  • The function can be called multiple times, with different inputs.
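
For example (assuming these directories exist on your system):

    fn(os.walk('/usr'))
    fn(os.walk('/tmp'))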

Pipe functions

TODO pipe(...)

Composable pipes

Pipes are pipe components themselves:

import os
from keyword import iskeyword
from liquidata import pipe, source, name, get, put, take, join, use, sink

hard_work = (
    name.path.dirs.files,
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename * os.path.join,
    open,
    join,
    use(str.split, '#', maxsplit=1),
    get[0],
    str.split,
    join,
    { iskeyword })

fn = pipe(hard_work, take(5, close_all=True), sink(print))
fn(os.walk('/'))

pipe(source << os.walk('/'), hard_work, take(8, close_all=True), sink(print))
  • The bulk of our pipe was extracted into a tuple called hard_work.

  • hard_work was used as a component in different pipes

Side-effects vs. results

So far, we have been printing out the values that reach the end of the pipe with sink(print).

  • sink stops items from proceeding downstream. So it makes no sense for sink to appear anywhere other than the end of a pipe.

  • sink passes all items that reach it from upstream to the function it contains (print in all our examples so far). The idea is that the function should perform some side-effect with the values it receives.

Let's try removing the sink:

pipe(source << os.walk('/'), hard_work, take(8, close_all=True))

Any items reaching the end of the pipe are collected into a list, which is returned as the result of the pipe.

Different strategies for collecting results

from operator import add

pipe(source << range(10), out(min))        #    0
pipe(source << range(10), out(max))        #    9
pipe(source << range(10), out(add))        #   45
pipe(source << range(10), out(add, 1000))  # 1045
  • The default behaviour of collecting all items that reach the end of the pipe into a list can be overridden with out.

  • out accepts a binary function, which will be used to fold or reduce the items into a single value.

  • out accepts an optional default or initial value, just like functools.reduce.
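
The fold behaviour mirrors functools.reduce exactly:

    from functools import reduce
    from operator import add

    assert reduce(add, range(10))       == 45    # like out(add)
    assert reduce(add, range(10), 1000) == 1045  # like out(add, 1000)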

Alternatively:

pipe(source << range(5), out(into(set)))     # {0, 1, 2, 3, 4}
pipe(source << range(5), out(into(sorted)))  # [0, 1, 2, 3, 4]
pipe(source << range(5), out(into(min)))     #  0
pipe(source << range(5), out(into(max)))     #              4
  • out(into(...)) accepts a consumer of iterables.

  • Therefore, the default behaviour is equivalent to out(into(list)).

Named outputs

pipe(source << range(5), out.X           ).X  # [0, 1, 2, 3, 4]
pipe(source << range(5), out.X     (add) ).X  # 10
pipe(source << range(5), out.X(into(max))).X  # 4
  • outs can be associated with a name: X in these examples.

  • Naming outs causes the pipeline to return its results wrapped in a namespace: note the .X after the pipeline.

  • This isn't very useful unless there are multiple outs in a single network, and that won't make sense until we discover independent branches.

Independent branches

result = pipe(source << range(10), [ out.smallest(into(min)) ], out.biggest(into(max)))
assert result.biggest  == 9
assert result.smallest == 0
  • Square brackets ([ ]) create independent branches.

  • Independent branches are pipes which receive all items reaching their position in the pipeline, without affecting what is seen downstream.

  • Independent branches, just like other pipes, can end in sinks or outs. By default, just like other pipes, they end in out(into(list)).

  • Anonymous outs are bound to the name return in the returned namespace. As return is a keyword, you will need to use getattr to access it. [This is likely to change in future versions.]

  • If multiple outs in the same network have the same name, that name will be bound to a tuple of all the results with that name. [This is likely to change in future versions].

  • This network returned two results, grouped together in a namespace and bound to the names biggest and smallest.

Multiple directories, and progress reporting

Let's return to the code we're developing, and make a few more changes:

import os
from keyword import iskeyword
from liquidata import pipe, name, get, put, take, join, use, sink

fn = pipe(
    [ sink(lambda d: print(f'Processing input directory {d}')) ],
    os.walk,
    join,
    name.path.dirs.files,
    [ get.path, sink(lambda d: print(f'Processing discovered directory {d}')) ],
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename * os.path.join,
    [ sink(lambda f: print(f'Processing file {f}')) ],
    open,
    join,
    use(str.split, '#', maxsplit=1),
    get[0],
    str.split,
    join,
    { iskeyword },
    take(30, close_all=True),
    sink(print))

fn(['/bin', '/usr/bin'])

Firstly, observe that

  • os.walk, join has been spliced into the pipe (in positions 2 and 3).

  • fn now receives a list of directories, rather than os.walk(<single-directory>).

  • Each directory in the input list will be traversed by os.walk, and the results will be joined together into a single stream.

Secondly, there are three other new components:

  • [ sink(lambda d: print(f'Processing input directory {d}')) ]
  • [ get.path, sink(lambda d: print(f'Processing discovered directory {d}')) ]
  • [ sink(lambda f: print(f'Processing file {f}')) ]

These are all independent branches, which report on what flows through the network at different points.

Consequently, as python keywords are printed, we will be informed whenever

  • we start processing the next directory in the input list

  • os.walk enters a new directory

  • we start processing a new .py file

spy

Spy hasn't been implemented yet in the current version of liquidata, but the following shortcuts should appear soon:

[ sink(<side-effect>) ]
   spy(<side-effect>)
[ out.X(<binary-function>) ]
  spy.X(<binary-function>)
[ out.X(into(<iterator-consumer>)) ]
  spy.X(into(<iterator-consumer>))

Frequency analysis

We're almost there. Let's calculate how many times each keyword appears:

import os
from keyword     import iskeyword
from collections import Counter
from liquidata   import pipe, name, get, put, join, use, out, into

fn = pipe(
    os.walk,
    join,
    name.path.dirs.files,
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename * os.path.join,
    open,
    join,
    use(str.split, '#', maxsplit=1),
    get[0],
    str.split,
    join,
    { iskeyword },
    out(into(Counter)))
  • We removed

    • The branches which printed the progress report
    • The take which limited the data that flooded our screen
  • We replaced the final sink(print) with out(into(Counter))

  • collections.Counter (from Python's standard library) consumes an iterable (so it can be used with into) and associates each value with the number of times it was seen.
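
Recall how Counter consumes any iterable directly:

    from collections import Counter

    assert Counter(['def', 'if', 'def']) == Counter({'def': 2, 'if': 1})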

Abstraction

As it stands, the code throws all the details in our faces. Let's cut it into smaller pieces and give them suggestive names, providing a higher-level overview of what the code is doing:

import os
from keyword     import iskeyword
from collections import Counter
from liquidata   import pipe, name, get, put, join, use, out, into

all_files         = os.walk, join, name.path.dirs.files,
find_python_files = get.files * (join, { use(str.endswith, '.py') }) >> put.filename
file_contents     = get.path.filename * os.path.join, open, join
discard_comments  = use(str.split, '#', maxsplit=1), get[0]
find_keywords     = str.split, join, { iskeyword }

fn = pipe(
    all_files,
    find_python_files,
    file_contents,
    discard_comments,
    find_keywords,
    out(into(Counter)))

And we're done!

Why do we have the current architecture?

The current implementation uses coroutines to push data through the pipeline.

This was motivated by the context in which the ancestor of liquidata was written, where a single input stream was required to be split into multiple independent output streams fed into separate sinks.

Let's take a step back and ask some questions about this choice: Do we need to push? Do we need coroutines? Why? When? What are the consequences? What alternatives are there?

Function composition

Let's consider a very simple pipeline, consisting of a linear sequence of maps:

pipe(f, g, h)

This could be implemented using any of

  • coroutines
  • generators
  • asyncio
  • function composition

Function composition is the simplest, so why bother with the others?
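
For the linear all-maps case, composition really is enough. A minimal sketch:

    from functools import reduce

    def compose(*fns):
        "Compose unary functions left to right: compose(f, g)(x) == g(f(x))."
        return lambda x: reduce(lambda acc, fn: fn(acc), fns, x)

    f = lambda x: x + 1
    g = lambda x: x * 2
    h = str

    # Mapping the composition over a stream behaves like pipe(f, g, h):
    assert list(map(compose(f, g, h), range(3))) == ['2', '4', '6']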

Changing stream length

Let's throw in a filter or a join:

pipe(f, { p }, g, h)
pipe(f, join, g, h)

Function composition no longer cuts the mustard, because there is no longer a 1-to-1 correspondence between items in the input and output streams: something is needed to shrink or expand the stream.
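
Generators, in contrast, handle both cases in pull style: filter shrinks the stream, and itertools.chain.from_iterable (a pull analogue of join) expands it:

    from itertools import chain

    f, g, h = (lambda x: x), (lambda x: x), str
    p = lambda x: x % 2 == 0

    # pipe(f, { p }, g, h): the filter shrinks the stream
    assert list(map(h, map(g, filter(p, map(f, range(5)))))) == ['0', '2', '4']

    # pipe(f2, join, g, h): joining expands it (f2 produces iterables)
    f2 = lambda x: [x] * 2
    assert list(map(h, map(g, chain.from_iterable(map(f2, range(2)))))) == ['0', '0', '1', '1']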

Stream bifurcation

A different complication, branches:

pipe(f, [x, y], g, h)

It's difficult to see how function composition and generators could deal with this.
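
Coroutines, however, handle bifurcation naturally: a push-based component can forward each item to several targets. A toy sketch (again, not liquidata's actual implementation):

    def consumer(effect):
        "A sink-like coroutine applying a side-effect to pushed items."
        def coro():
            while True:
                effect((yield))
        c = coro()
        next(c)  # prime the coroutine
        return c

    def broadcast(*targets):
        "Push every incoming item into each target coroutine."
        def coro():
            while True:
                item = yield
                for t in targets:
                    t.send(item)
        c = coro()
        next(c)
        return c

    xs, ys = [], []
    fork = broadcast(consumer(xs.append), consumer(ys.append))
    for item in range(3):
        fork.send(item)
    assert xs == ys == [0, 1, 2]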

Joining streams

That last pipe describes a graph that looks something like this:

             x -- y
           /
source -- f
           \
             g -- h

How about

sourceA -- a
            \
             g --- h
            /
sourceB -- b

Generators can deal with this easily:

map(h, map(g, map(a, sourceA),
              map(b, sourceB)))

but it's not obvious how this would work for function composition or coroutines.

liquidata syntax for multiple sources

What would the liquidata syntax for this look like?

Ideally we'd have another kind of bracket, but we've exhausted the possibilities that Python offers: (), [], {}. If <> were valid syntactic brackets, we could write:

pipe(b, <sourceA, a>, g, h)  # join two sources
pipe(b, [a, out.A],   g, h)  # branch out into two sinks

Working with syntax available in Python, how about:

pipe(b, source(sourceA, a), g, h)

Recall that the following are already synonymous in liquidata

pipe(f)(data)
pipe(source << data, f)
pipe(data >> source, f)
pipe(source(data), f)

so the following could work

pipe(source << sourceB, b, (source << sourceA , a), g, h)

liquidata used to have an input-variable syntax (called slots) in its earlier prototype. If something like it were resurrected, we could write something along the lines of

fn = pipe(source << slot.B, b, (source << slot.A, a), g, h)
fn(A=sourceA, B=sourceB)

Synchronization

In liquidata []-branches are called independent, because there is absolutely no synchronization between them (in contrast with named branches which are based on namespaces flowing through the pipe and managed with name, get and put). Once the stream has split, the branches can change the length of the stream without the others knowing or caring about it.

We would need to think about the synchronization consequences for multiple independent input streams. I guess that the answer is: it's up to the user to make sure something sensible happens. Essentially, the user has the same freedoms and responsibilities as when joining multiple sources in classically-written generator networks:

map(h, map(g, map(<filter or join>, sourceA),
              map(b,                sourceB)))

close_all

Consider the following in current liquidata

pipe(source << itertools.count(), take(3))
pipe(source << itertools.count(), take(3, close_all=True))

Because of the push-architecture, the first never returns. In a pull architecture the issue wouldn't arise.

So what?

I would like to remove the universal reliance on coroutines, with two main goals

  • Enabling things that were impossible before.

  • Simplifying portions of the code which don't need coroutines.