Tutorial

In this tutorial we'll gradually build up the code that appears in the introduction, and explore some other features along the way.

The following code will traverse your entire filesystem, and print information about every directory and the files in it. Be ready to interrupt it!

A very simple pipe

import os
from liquidata import pipe, source, sink

pipe(source << os.walk('/'), sink(print))
  • We created a pipe which uses the iterable os.walk('/') as its source. The following three variations are synonymous (spelled out in full after this list):

    • source << iterable
    • iterable >> source
    • source(iterable)
  • The items in the source are sent down the pipeline

  • In this case the pipeline is very short: it contains only a single sink. sink:

    • feeds any items it receives into the (presumably side-effecting) function it contains
    • stops the items from propagating further downstream
  • os.walk('/') traverses the entire filesystem: information about each directory in the filesystem is sent down the pipe and printed out. Your screen is flooded with information.
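
Spelled out in full, each of the three spellings constructs the same pipe:

pipe(source << os.walk('/'), sink(print))
pipe(os.walk('/') >> source, sink(print))
pipe(source(os.walk('/')),   sink(print))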

Let's try to remove the need to interrupt the pipeline manually.

Limiting the number of items

import os
from liquidata import pipe, source, take, sink

pipe(source << os.walk('/'), take(3), sink(print))
  • We have introduced a new component into the pipe: take(3). Its purpose is to limit the number of items flowing through the pipe.

  • Only the first three items coming out of the source are printed to the screen.

  • But the pipeline does not terminate, because the entire source is being pushed through the pipe. take only limits how many of them get through.

  • If you do not interrupt this, it will eventually terminate after the source is exhausted (your whole filesystem has been traversed).

Push vs pull

  • liquidata pushes the data through the pipeline using coroutines. A perfectly valid alternative choice would be to pull the data through the pipe using generators.

  • Pushing makes it easy to split the stream into separate, independent branches leading to distinct sinks (or outputs); pulling makes it easier to join independent sources (or inputs). The former made more sense in the context in which liquidata was originally needed.

  • liquidata will support pulling in the future.

  • If the data were being pulled, the previous example would terminate as soon as the first 3 items from the source had been processed.

  • In a pull-architecture, take has to send an 'I am closed, you won't get any more data from me' message downstream. Consequently, downstream will stop asking for more. In a push-architecture, take has two choices:

    • Keep rejecting all subsequent items it receives, until upstream stops sending data.

    • Send a 'please stop sending data' message upstream. In this case, any other branches on the network will also immediately stop receiving any data. This is not always desirable.
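
To make the contrast concrete, here is a minimal sketch in plain Python (not liquidata code; pull_take, push_take and printer are hypothetical names):

# Pull: the consumer drives; take simply stops asking upstream for more.
def pull_take(n, upstream):
    for _, item in zip(range(n), upstream):
        yield item

list(pull_take(3, iter(range(10**9))))  # [0, 1, 2]; a 4th item is never requested

# Push: the producer drives; this take lets n items through, then silently
# rejects everything upstream keeps sending.
def printer():
    while True:
        print((yield))

def push_take(n, downstream):
    count = 0
    while True:
        item = yield
        if count < n:
            count += 1
            downstream.send(item)

p = printer();       next(p)  # prime the coroutines
t = push_take(3, p); next(t)
for i in range(10):
    t.send(i)                 # prints 0, 1, 2; items 3..9 are rejected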

Early termination of push

We can instruct take to close down the whole network, as soon as it has let through all the items it needs, by passing in close_all=True:

import os
from liquidata import pipe, source, take, sink

pipe(source << os.walk('/'), take(3, close_all=True), sink(print))
  • The first three items are printed to the screen, and the pipeline terminates immediately.

  • All liquidata components with the ability to reject all subsequent items should have a close_all option.

Synchronized branches and compound flow

The items generated by os.walk are 3-tuples, containing:

  • A filesystem path
  • A list of directories found on that path
  • A list of files found on that path

Let's give these components names within our pipeline:

import os
from liquidata import pipe, source, name, take, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    take(3, close_all=True),
    sink(print))
  • The new component is name.path.dirs.files.

  • It specifies three names: path, dirs and files.

  • Therefore it expects to receive an iterable containing three items (in this case, the 3-tuple coming out of os.walk).

  • It creates a namespace containing those three names, bound to the corresponding items in the tuple.

  • The resulting namespace continues down the pipe.

  • The components of such namespaces can be thought of as parallel, synchronized, named branches.

  • liquidata also supports unsynchronized branches. We'll meet these later.

  • name.single_name wraps the whole item it receives in a namespace. By contrast, name.more.than.one unpacks the item it receives and wraps its components in a namespace.
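
Conceptually (a sketch of the behaviour, not liquidata's implementation), name.path.dirs.files does something like this to each item:

from types import SimpleNamespace

def name_path_dirs_files(item):
    path, dirs, files = item  # expects a 3-element iterable
    return SimpleNamespace(path=path, dirs=dirs, files=files)

ns = name_path_dirs_files(('/', ['bin', 'usr'], ['vmlinuz']))
ns.path   # '/'
ns.files  # ['vmlinuz']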

Picking components from compound flow

import os
from liquidata import pipe, source, name, get, take, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    take(3, close_all=True),
    sink(print))
  • The new component is get.files. It

    • expects a compound item (Namespace) from upstream
    • selects the specified component (files, in this example)
    • sends it downstream.
  • Consequently, this program prints three lists:

    • one for each of the first three directories visited by os.walk('/')
    • each containing the names of the files in that directory

join

import os
from liquidata import pipe, source, name, get, take, join, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    take(3, close_all=True),
    join,
    sink(print))
  • The new component is join.

  • It expects iterables and sends their items downstream, one by one (compare itertools.chain.from_iterable; see the sketch after this list).

  • Consequently, this program prints the names of the files found in the first 3 directories visited by os.walk('/').

  • This is exactly the same information as in the previous example, except that, rather than being grouped in 3 lists, the names appear in one long chain.
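
In plain Python terms, join flattens the stream much as itertools.chain.from_iterable flattens nested iterables:

from itertools import chain

lists_of_names = [['a.py', 'b.py'], ['c.py']]
list(chain.from_iterable(lists_of_names))  # ['a.py', 'b.py', 'c.py']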

Try swapping the take and the join components:

import os
from liquidata import pipe, source, name, get, take, join, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    join,
    take(3, close_all=True),
    sink(print))
  • This time only 3 names are printed.

  • Previously, all the names corresponding to the first three directories were printed.

  • join plays a role similar to inner loops.

  • flat(fn) is an alternative spelling of fn, join

import os
from liquidata import pipe, source, name, get, flat, take, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    flat(get.files),
    take(3, close_all=True),
    sink(print))
  • flat(get.files) replaced get.files, join from the previous example.

  • The behaviour doesn't change.

filters

import os
from liquidata import pipe, source, name, get, take, join, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    join,
    { lambda x: x.endswith('.py') },
    take(3, close_all=True),
    sink(print))
  • The new component is { lambda x: x.endswith('.py') }

  • Braces ({ }) create filters, with the contained function being used as the predicate (compare Python's builtin filter; see the sketch after this list).

  • In this example, we only let through filenames which end in .py.

  • We close the pipeline after seeing the first three of those.
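
Outside any pipe, the brace component corresponds to Python's builtin filter:

names = ['setup.py', 'README.md', 'main.py']
list(filter(lambda x: x.endswith('.py'), names))  # ['setup.py', 'main.py']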

reducing lambda noise: use

The purpose of lambda in the last example is to bind the second argument of a function (in this example, str.endswith(<string>, <ending>)), while leaving the first argument unbound. This situation arises rather frequently, and lambda often adds a lot of syntactic noise.

import os
from liquidata import pipe, source, name, get, take, join, use, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    join,
    { use(str.endswith, '.py') },
    take(3, close_all=True),
    sink(print))
  • The new component is use(str.endswith, '.py') which replaces lambda x: x.endswith('.py').

  • The following two lines are synonymous:

    lambda x: fn(x, A2)
          use(fn,   A2)
    
  • More generally:

    lambda x: fn(x, A2, A3, K1=v1, K2=v2)
          use(fn,   A2, A3, K1=v1, K2=v2)
    
  • In other words:

    lambda x: fn(x, *args, **kwds)
          use(fn,   *args, **kwds)
    

key-filters

Imagine we were interested in filenames whose length is less than 5.

import os
from liquidata import pipe, source, name, get, take, join, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    join,
    { lambda x: x < 5 : len },
    take(3, close_all=True),
    sink(print))
  • The new component is { lambda x: x < 5 : len }.

  • The important new feature within that component is : len.

  • We have already seen the use of braces ({ predicate }) as filters.

  • This is a key-filter: { predicate : key }.

  • The predicate does not look at the item itself, but at key(item).

  • If the predicate is satisfied, the item itself (rather than key(item)) continues downstream.
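
In plain Python terms, the key-filter composes the key with the predicate, while letting the original item through:

names = ['a.py', 'main.py', 'x']
[x for x in names if len(x) < 5]  # ['a.py', 'x']: the predicate sees len(x), but x itself survives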

reducing lambda noise: arg

import os
from liquidata import pipe, source, name, get, take, join, arg, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    join,
    { arg < 5 : len },
    take(3, close_all=True),
    sink(print))
  • The new component is arg < 5 which replaces lambda x: x < 5

  • arg is a means of creating anonymous functions by binding one operand of an operator. It works for most Python operators. (A toy sketch of the idea follows this list.)

  • from liquidata import arg as _ might be tempting ... but is it wise?
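
A toy illustration of the idea behind arg (liquidata's real implementation covers most operators, not just <):

class _Arg:
    def __lt__(self, other):
        return lambda x: x < other  # bind the right operand, leave the left free

toy_arg = _Arg()
shorter_than_5 = toy_arg < 5
shorter_than_5(4)  # True
shorter_than_5(7)  # False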

Adding new synchronized branches: put

import os
from liquidata import pipe, source, name, put, get, take, join, arg, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    (lambda ns: len(ns.files)) >> put.Nfiles,
    take(3, close_all=True),
    sink(print))
  • The new component is (lambda ns: len(ns.files)) >> put.Nfiles

    • lambda ns: len(ns.files) accepts a namespace, and returns the length of one of the values in the namespace.

    • >> put.Nfiles adds the returned value as a new entry in the namespace, under the name Nfiles.

  • Before this point, there were three synchronized branches in the stream (path, dirs, files). After this point there are four (the aforementioned three, and Nfiles).

get *

The last example can be written more conveniently with get *:

import os
from liquidata import pipe, source, name, get, put, take, join, arg, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files * len >> put.Nfiles,
    take(3, close_all=True),
    sink(print))

get.files * len replaced (lambda ns: len(ns.files)).

The following three variants are synonymous:

(lambda ns: len(   ns.files)) >> put.Nfiles
            len * get.files   >> put.Nfiles
get.files * len               >> put.Nfiles
  • get without the * is essentially a shorthand for operator.attrgetter:

    attrgetter('a')
    get        .a
    

    are synonymous, as are

    attrgetter('a', 'b', 'c')
    get        .a   .b   .c
    
  • get can also play the role of operator.itemgetter:

    itemgetter(key_or_index)
    get       [key_or_index]
    

    are synonymous.

    Unfortunately, Python's __getitem__ syntax throws a spanner in the works, because it cannot distinguish between

    obj[ a, b ]
    obj[(a, b)]
    

    so

           get[ a,b ]
    itemgetter( a,b ) # would like this
    itemgetter((a,b)) # but get this
    
  • When combined with *, get not only extracts values from a compound object, but also unpacks them into a function call, much like the * in Python's standard fn(*args) syntax.

    It is worth noting the following similarities and differences between get ... , and get ... *:

    get.a , fn  # fn(ns.a)
    get.a * fn  # same as above
    
    get.a.b , fn  # fn((ns.a, ns.b))
    get.a.b * fn  # fn( ns.a, ns.b )
    

    The comma does not unpack the tuple: the whole tuple is passed as a single argument to fn. In contrast, the star unpacks the tuple and fn receives its contents as separate arguments.

  • get * also has an important interaction with put. Consider

    get.a * fn >> put.b  # ns.b = fn(ns.a)
    get.a , fn >> put.b  # Error!
    

    The first case is parsed as (get.a * fn) >> put.b. Consequently put attempts to inject b into the item received by (get.a * fn): that item must be a namespace if get.a is to work. The b will be placed in that namespace.

    The second case is parsed as get.a, (fn >> put.b). Consequently put attempts to inject b into the item received by fn, which has already been unpacked by get.a, and will therefore NOT be a namespace.

Armed with this knowledge, let's return to the code we were developing.

Putting get * and >> put to work

Recall that:

  • We have a compound flow containing three synchronized branches: path, dirs and files.

  • Alternatively we can say that the items flowing through the pipe are namespaces, each containing the names path, dirs and files.

  • path is a single value; files is a list.

  • We have managed to create a stream of (unqualified) filenames which end in .py.

The code looked like this:

import os
from liquidata import pipe, source, name, get, take, join, use, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    join,
    { use(str.endswith, '.py') },
    take(3, close_all=True),
    sink(print))
  • We would like to look at the contents of the .py files.

  • In order to open the files, we will need to know their fully qualified names.

  • os.path.join(path, filename) can create these for us, as long as we can feed it a path and a filename.

  • Before get.files the stream contained the path we need, but the filename was not obviously accessible. After get.files, join, the stream contains only filenames: path is no longer available.

  • The solution is to add the filenames to the compound stream, rather than replacing it. This can be done with put.

import os
from liquidata import pipe, source, name, get, put, take, join, use, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename,
    take(3, close_all=True),
    sink(print))
  • The first change to the code is

    get.files ,  join, { use(str.endswith, '.py') }                    # old
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename   # new
    

    The old version:

    • picks files out of the namespace
    • throws away the namespace
    • turns a stream of lists of filenames into a stream of individual filenames
    • discards non-.py filenames

    The new version:

    • feeds files into a sub-pipe
    • the sub-pipe:
      • turns a stream of lists of filenames into a stream of individual filenames
      • discards non-.py filenames
    • for each .py file coming out of the sub-pipe:
      • adds the filename to the namespace
      • sends the augmented namespace downstream
  • The second change is the addition of get.path.filename, which extracts the two values that interest us from the namespace.

Consequently, the code prints out the first three .py filenames, in a tuple together with the paths to the directories that contain them.

We can use * to unpack these tuples into separate arguments when calling os.path.join:

import os
from liquidata import pipe, source, name, get, put, take, join, use, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename * os.path.join,
    take(3, close_all=True),
    sink(print))
  • The new addition is * os.path.join

  • The code now prints the fully qualified names of the first three .py files.

Analysing the file contents

import os
from keyword import iskeyword
from liquidata import pipe, source, name, get, put, take, join, use, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename * os.path.join,
    open,
    join,
    use(str.split, '#', maxsplit=1),
    get[0],
    str.split,
    join,
    { iskeyword },
    take(30, close_all=True),
    sink(print))

There are quite a few new additions, but they should all be familiar by now (a plain-Python sketch of the same processing follows this list):

  • open (a Python builtin) turns each filename into an iterator over the lines in the file.

  • join turns the stream of file iterators into a continuous stream of lines in all the files.

  • use(str.split, '#', maxsplit=1) splits each line at the first #, yielding a [<code>, <comment>] pair (or just [<code>] if the line contains no comment).

  • get[0] picks the code, and throws away the comment.

  • str.split turns the code on any line into a sequence of tokens. [This tokenizer is much too naive for real uses, but it will do for the purposes of our example.]

  • join gives a continuous stream of tokens on all lines in all files.

  • { iskeyword } filters out non-keywords.

  • Changing take's first argument from 3 to 30 allows us to see a bit more output.
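
For comparison, here is a plain-Python sketch of the same per-file processing (keywords_in is a hypothetical helper; filenames is assumed to be an iterable of fully qualified .py paths):

from keyword import iskeyword

def keywords_in(filenames):
    for filename in filenames:
        for line in open(filename):
            code = line.split('#', maxsplit=1)[0]  # discard any comment
            for token in code.split():             # naive tokenizer
                if iskeyword(token):
                    yield token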

Reusable pipes

Our pipe starts with source << .... This means that data are pushed through the pipe as soon as it is defined. It is possible to define a pipe for reuse, decoupling it from the data that will flow through it.

import os
from keyword import iskeyword
from liquidata import pipe, source, name, get, put, take, join, use, sink

fn = pipe(
    name.path.dirs.files,
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename * os.path.join,
    open,
    join,
    use(str.split, '#', maxsplit=1),
    get[0],
    str.split,
    join,
    { iskeyword },
    take(30, close_all=True),
    sink(print))

fn(os.walk('/'))
  • source has been removed from the pipe

  • pipe now returns a function

  • That function accepts an iterable argument, and pushes it through the pipe.

  • The function can be called multiple times, with different inputs.
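
For example, the same pipe can process two different trees in turn:

fn(os.walk('/bin'))
fn(os.walk('/tmp'))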

Pipe functions

TODO pipe(...)

Composable pipes

Pipes are pipe components themselves:

import os
from keyword import iskeyword
from liquidata import pipe, source, name, get, put, take, join, use, sink

hard_work = (
    name.path.dirs.files,
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename * os.path.join,
    open,
    join,
    use(str.split, '#', maxsplit=1),
    get[0],
    str.split,
    join,
    { iskeyword })

fn = pipe(hard_work, take(5, close_all=True), sink(print))
fn(os.walk('/'))

pipe(source << os.walk('/'), hard_work, take(8, close_all=True), sink(print))
  • The bulk of our pipe was extracted into a tuple called hard_work.

  • hard_work was used as a component in different pipes.

Side-effects vs. results

So far, we have been printing out the values that reach the end of the pipe with sink(print).

  • sink stops items from proceeding downstream. So it makes no sense for sink to appear anywhere other than the end of a pipe.

  • sink passes all items that reach it from upstream, to the function it contains (print in all our examples so far). The idea is that the function should perform some side-effect with the values it receives.

Let's try removing the sink:

pipe(source << os.walk('/'), hard_work, take(8, close_all=True))

Any items reaching the end of the pipe are collected into a list, which is returned as the result of the pipe.

Different strategies for collecting results

from operator import add
from liquidata import pipe, source, out

pipe(source << range(10), out(min))        #    0
pipe(source << range(10), out(max))        #    9
pipe(source << range(10), out(add))        #   45
pipe(source << range(10), out(add, 1000))  # 1045
  • The default behaviour of collecting all items that reach the end of the pipe into a list, can be overridden with out.

  • out accepts a binary function, which will be used to fold or reduce the items into a single value.

  • out accepts an optional default or initial value, just like functools.reduce.
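
The fold behaves just like functools.reduce:

from functools import reduce
from operator import add

reduce(add, range(10))        # 45,   like out(add)
reduce(add, range(10), 1000)  # 1045, like out(add, 1000)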

Alternatively:

from liquidata import pipe, source, out, into

pipe(source << range(5), out(into(set)))     # {0, 1, 2, 3, 4}
pipe(source << range(5), out(into(sorted)))  # [0, 1, 2, 3, 4]
pipe(source << range(5), out(into(min)))     #  0
pipe(source << range(5), out(into(max)))     #              4
  • out(into(...)) accepts a consumer of iterables.

  • Therefore, the default behaviour is equivalent to out(into(list)).
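
In other words, out(into(f)) hands the whole stream, as a single iterable, to f:

set(range(5))     # {0, 1, 2, 3, 4}, like out(into(set))
sorted(range(5))  # [0, 1, 2, 3, 4], like out(into(sorted))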

Named outputs

pipe(source << range(5), out.X           ).X  # [0, 1, 2, 3, 4]
pipe(source << range(5), out.X     (add) ).X  # 10
pipe(source << range(5), out.X(into(max))).X  # 4
  • outs can be associated with a name: X in these examples.

  • Naming outs causes the pipeline to return its results wrapped in a namespace: note the .X after the pipeline.

  • This isn't very useful unless there are multiple outs in a single network, and that won't make sense until we discover independent branches.

Independent branches

result = pipe(source << range(10), [ out.smallest(into(min)) ], out.biggest(into(max)))
assert result.biggest  == 9
assert result.smallest == 0
  • Square brackets ([ ]) create independent branches.

  • Independent branches are pipes which receive all items reaching their position in the pipeline, without affecting what is seen downstream.

  • Independent branches, just like other pipes, can end in sinks or outs. By default, just like other pipes, they end in out(into(list)).

  • Anonymous outs are bound to the name return in the returned namespace. As return is a keyword, you will need to use getattr to access it (see the sketch after this list). [This is likely to change in future versions.]

  • If multiple outs in the same network have the same name, that name will be bound to a tuple of all the results with that name. [This is likely to change in future versions].

  • This network returned two results, grouped together in a namespace and bound to the names biggest and smallest.
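
For instance, accessing an anonymous out alongside a named one (a sketch, assuming the behaviour described above):

result = pipe(source << range(10), [ out(into(min)) ], out.biggest(into(max)))
getattr(result, 'return')  # 0, the anonymous branch result
result.biggest             # 9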

Multiple directories, and progress reporting

Let's return to the code we're developing, and make a few more changes:

import os
from keyword import iskeyword
from liquidata import pipe, name, get, put, take, join, use, sink

fn = pipe(
    [ sink(lambda d: print(f'Processing input directory {d}')) ],
    os.walk,
    join,
    name.path.dirs.files,
    [ get.path, sink(lambda d: print(f'Processing discovered directory {d}')) ],
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename * os.path.join,
    [ sink(lambda f: print(f'Processing file {f}')) ],
    open,
    join,
    use(str.split, '#', maxsplit=1),
    get[0],
    str.split,
    join,
    { iskeyword },
    take(30, close_all=True),
    sink(print))

fn(['/bin', '/usr/bin'])

Firstly, observe that

  • os.walk, join has been spliced into the pipe (in positions 2 and 3).

  • fn now receives a list of directories, rather than os.walk(<single-directory>).

  • Each directory in the input list will be traversed by os.walk, and the results will be joined together into a single stream.

Secondly, there are three other new components:

  • [ sink(lambda d: print(f'Processing input directory {d}')) ]
  • [ get.path, sink(lambda d: print(f'Processing discovered directory {d}')) ]
  • [ sink(lambda f: print(f'Processing file {f}')) ]

These are all independent branches, which report on what flows through the network at different points.

Consequently, as Python keywords are printed, we will be informed whenever

  • we start processing the next directory in the input list

  • os.walk enters a new directory

  • we start processing a new .py file

spy

Spy hasn't been implemented yet in the current version of liquidata, but the following shortcuts should appear soon:

[ sink(<side-effect>) ]
   spy(<side-effect>)
[ out.X(<binary-function>) ]
  spy.X(<binary-function>)
[ out.X(into(<iterator-consumer>)) ]
  spy.X(into(<iterator-consumer>))

Frequency analysis

We're almost there. Let's calculate how many times each keyword appears:

import os
from keyword     import iskeyword
from collections import Counter
from liquidata   import pipe, name, get, put, join, use, out, into

fn = pipe(
    os.walk,
    join,
    name.path.dirs.files,
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename * os.path.join,
    open,
    join,
    use(str.split, '#', maxsplit=1),
    get[0],
    str.split,
    join,
    { iskeyword },
    out(into(Counter)))
  • We removed

    • The branches which printed the progress report
    • The take which limited the data that flooded our screen
  • We replaced the final sink(print) with out(into(Counter))

  • collections.Counter (from Python's standard library) consumes an iterable (so it can be used with into) and associates each value with the number of times it was seen.
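
For example:

from collections import Counter
Counter(['def', 'if', 'def', 'for'])  # Counter({'def': 2, 'if': 1, 'for': 1})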

Abstraction

As it stands, the code throws all the details in our faces. Let's cut it up into smaller pieces and give them suggestive names, providing a higher-level overview of what the code is doing:

import os
from keyword     import iskeyword
from collections import Counter
from liquidata   import pipe, name, get, put, join, use, out, into

all_files         = os.walk, join, name.path.dirs.files
find_python_files = get.files * (join, { use(str.endswith, '.py') }) >> put.filename
file_contents     = get.path.filename * os.path.join, open, join
discard_comments  = use(str.split, '#', maxsplit=1), get[0]
find_keywords     = str.split, join, { iskeyword }

fn = pipe(
    all_files,
    find_python_files,
    file_contents,
    discard_comments,
    find_keywords,
    out(into(Counter)))

And we're done!