# Tutorial
In this tutorial we'll gradually build up the code that appears in the introduction, and explore some other features along the way.
The following code will traverse your entire filesystem, and print information about every directory and the files in it. Be ready to interrupt it!
## A very simple pipe
```python
import os
from liquidata import pipe, source, sink

pipe(source << os.walk('/'), sink(print))
```
- We created a pipe which uses the iterable `os.walk('/')` as its source. The following 3 variations are synonymous:

  ```python
  source << iterable
  iterable >> source
  source(iterable)
  ```

- The items in the source are sent down the pipeline.
- In this case the pipeline is very short: it contains only a single `sink`. `sink`:
  - feeds any items it receives into the (presumably side-effecting) function it contains
  - stops the items from propagating further downstream
- `os.walk('/')` traverses the entire filesystem: information about each directory in the filesystem is sent down the pipe and printed out. Your screen is flooded with information.

Let's try to remove the need to interrupt the pipeline manually.
## Limiting the number of items
```python
import os
from liquidata import pipe, source, take, sink

pipe(source << os.walk('/'), take(3), sink(print))
```
- We have introduced a new component into the pipe: `take(3)`. Its purpose is to limit the number of items flowing through the pipe.
- Only the first three items coming out of the source are printed to the screen.
- But the pipeline does not terminate, because the entire source is being pushed through the pipe: `take` only limits how many of the items get through.
- If you do not interrupt this, it will eventually terminate after the source is exhausted (your whole filesystem has been traversed).
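For intuition about the contrast with pull-based processing (discussed in the next section), note that the standard library's `itertools.islice` stops consuming its source as soon as enough items have been taken. This is plain Python, not liquidata code:

```python
from itertools import islice

def noisy_source():
    """Yield integers, recording how many were actually produced."""
    for i in range(1_000_000):
        noisy_source.produced = i + 1
        yield i

noisy_source.produced = 0

# Pulling: islice stops asking the generator for more after 3 items.
first_three = list(islice(noisy_source(), 3))
print(first_three)            # [0, 1, 2]
print(noisy_source.produced)  # 3 -- the source was never exhausted
```

In a pull architecture the source is only consumed on demand, so taking 3 items leaves the remaining million untouched.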
## Push vs pull
- `liquidata` pushes the data through the pipeline using coroutines. A perfectly valid alternative choice would be to pull the data through the pipe using generators.
- Pushing makes it easy to split the stream into separate, independent branches leading to distinct sinks (or outputs); pulling makes it easier to join independent sources (or inputs). The former made more sense in the context in which `liquidata` was originally needed.
- `liquidata` will support pulling in the future.
- If the data were being pulled, the previous example would terminate as soon as the first 3 items from the source had been processed.
- In a pull-architecture, `take` has to send an 'I am closed, you won't get any more data from me' message downstream. Consequently, downstream will stop asking for more. In a push-architecture, `take` has two choices:
  - Keep rejecting all subsequent items it receives, until upstream stops sending data.
  - Send a 'please stop sending data' message upstream. In this case, any other branches on the network will also immediately stop receiving any data. This is not always desirable.
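The first of these choices can be sketched with raw Python coroutines. This is an illustration of the idea, not liquidata's actual implementation:

```python
def sink_to(lst):
    """A push-style sink: items sent in are appended to lst."""
    def coro():
        while True:
            lst.append((yield))
    c = coro()
    next(c)  # prime the coroutine
    return c

def take(n, downstream):
    """Push-style take: forwards the first n items, silently drops the rest."""
    def coro():
        seen = 0
        while True:
            item = yield
            if seen < n:
                seen += 1
                downstream.send(item)
    c = coro()
    next(c)
    return c

result = []
pipeline = take(3, sink_to(result))
for item in range(100):   # the whole source is still pushed through...
    pipeline.send(item)
print(result)             # [0, 1, 2] ...but only 3 items got through
```

Note that the loop runs all 100 iterations: upstream keeps pushing, and `take` simply discards everything after the third item.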
## Early termination of push

We can instruct `take` to close down the whole network, as soon as it has let through all the items it needs, by passing in `close_all=True`:
```python
import os
from liquidata import pipe, source, take, sink

pipe(source << os.walk('/'), take(3, close_all=True), sink(print))
```
- The first three items are printed to the screen, and the pipeline terminates immediately.
- All `liquidata` components with the ability to reject all subsequent items should have a `close_all` option.
## Synchronized branches and compound flow

The items generated by `os.walk` are 3-tuples, containing:

- A filesystem path
- A list of directories found on that path
- A list of files found on that path

Let's give these components names within our pipeline:
```python
import os
from liquidata import pipe, source, name, take, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    take(3, close_all=True),
    sink(print))
```
- The new component is `name.path.dirs.files`.
- It specifies three names: `path`, `dirs` and `files`.
- Therefore it expects to receive an iterable containing three items (in this case, the 3-tuple coming out of `os.walk`).
- It creates a namespace containing those three names, bound to the corresponding items in the tuple.
- The resulting namespace continues down the pipe.
- The components of such namespaces can be thought of as parallel, synchronized, named branches.
- `liquidata` also supports unsynchronized branches. We'll meet these later.
- `name.single_name` wraps the whole item it receives in a namespace. By contrast, `name.more.than.one` unpacks the item it receives and wraps its components in a namespace.
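The rough semantics of `name.path.dirs.files` can be modelled with `types.SimpleNamespace`. This is an illustration only, not what liquidata does internally:

```python
from types import SimpleNamespace

def name_path_dirs_files(item):
    """Unpack a 3-element iterable into a namespace with three names,
    mimicking what name.path.dirs.files does to each item."""
    path, dirs, files = item
    return SimpleNamespace(path=path, dirs=dirs, files=files)

# A sample item shaped like the 3-tuples produced by os.walk
ns = name_path_dirs_files(('/tmp', ['a'], ['b.py', 'c.txt']))
print(ns.path)   # /tmp
print(ns.files)  # ['b.py', 'c.txt']
```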
## Picking components from compound flow
```python
import os
from liquidata import pipe, source, name, get, take, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    take(3, close_all=True),
    sink(print))
```
- The new component is `get.files`. It
  - expects a compound item (namespace) from upstream
  - selects the specified component (`files`, in this example)
  - sends it downstream.
- Consequently, this program prints three lists:
  - one for each of the first three directories visited by `os.walk('/')`
  - each containing the names of the files in that directory
## join
```python
import os
from liquidata import pipe, source, name, get, take, join, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    take(3, close_all=True),
    join,
    sink(print))
```
- The new component is `join`.
- It expects iterables and sends their items downstream, one by one.
- Consequently, this program prints the names of the files found in the first 3 directories visited by `os.walk('/')`.
- This is exactly the same information as in the previous example, except that, rather than being grouped in 3 lists, the names appear in one long chain.
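For intuition, `join` behaves much like the standard library's `itertools.chain.from_iterable`, flattening a stream of iterables into a stream of their items:

```python
from itertools import chain

lists_of_files = [['a.py', 'b.txt'], ['c.py'], []]

# join's behaviour, approximately: flatten a stream of iterables
flattened = list(chain.from_iterable(lists_of_files))
print(flattened)  # ['a.py', 'b.txt', 'c.py']
```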
Try swapping the `take` and the `join` components:
```python
import os
from liquidata import pipe, source, name, get, take, join, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    join,
    take(3, close_all=True),
    sink(print))
```
- This time only 3 names are printed.
- Previously, all the names corresponding to the first three directories were printed.
- `join` plays a role similar to inner loops.
- `flat(fn)` is an alternative spelling of `fn, join`:
```python
import os
from liquidata import pipe, source, name, get, take, flat, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    flat(get.files),
    take(3, close_all=True),
    sink(print))
```
- `flat(get.files)` replaces `get.files, join` from the previous example.
- The behaviour doesn't change.
## filters
```python
import os
from liquidata import pipe, source, name, get, take, join, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    join,
    { lambda x: x.endswith('.py') },
    take(3, close_all=True),
    sink(print))
```
- The new component is `{ lambda x: x.endswith('.py') }`.
- Braces (`{ }`) create filters, with the contained function being used as the predicate.
- In this example, we only let through filenames which end in `.py`.
- We close the pipeline after seeing the first three of those.
## reducing `lambda` noise: `use`

The purpose of `lambda` in the last example is to bind the second argument of a function (in this example, `str.endswith(<string>, <ending>)`), while leaving the first argument unbound. This situation arises rather frequently, and `lambda` often adds a lot of syntactic noise.
```python
import os
from liquidata import pipe, source, name, get, take, join, use, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    join,
    { use(str.endswith, '.py') },
    take(3, close_all=True),
    sink(print))
```
- The new component is `use(str.endswith, '.py')`, which replaces `lambda x: x.endswith('.py')`.
- The following two lines are synonymous:

  ```python
  lambda x: fn(x, A2)
  use(fn, A2)
  ```

- More generally:

  ```python
  lambda x: fn(x, A2, A3, K1=v1, K2=v2)
  use(fn, A2, A3, K1=v1, K2=v2)
  ```

- In other words:

  ```python
  lambda x: fn(x, *args, **kwds)
  use(fn, *args, **kwds)
  ```
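The equivalence above can be captured in a few lines of plain Python. This is a model of `use`'s semantics, not liquidata's implementation:

```python
def use(fn, *args, **kwds):
    """Bind all arguments after the first, leaving the first free
    (a plain-Python sketch of liquidata's use)."""
    return lambda x: fn(x, *args, **kwds)

ends_with_py = use(str.endswith, '.py')
print(ends_with_py('script.py'))  # True
print(ends_with_py('notes.txt'))  # False
```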
## key-filters

Imagine we were interested in filenames whose length is less than 5.
```python
import os
from liquidata import pipe, source, name, get, take, join, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    join,
    { lambda x: x < 5 : len },
    take(3, close_all=True),
    sink(print))
```
- The new component is `{ lambda x: x < 5 : len }`.
- The important new feature within that component is `: len`.
- We have already seen the use of braces (`{ predicate }`) as filters.
- This is a key-filter: `{ predicate : key }`.
- The predicate does not look at the item itself, but at `key(item)`.
- If the predicate is satisfied, the item itself (rather than `key(item)`) continues downstream.
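The key-filter semantics can be sketched in plain Python (an illustration, not liquidata internals):

```python
def key_filter(predicate, key):
    """Semantics of { predicate : key }: test key(item), but keep the
    item itself when the predicate is satisfied."""
    return lambda item: predicate(key(item))

short = key_filter(lambda n: n < 5, len)
names = ['a.py', 'somewhat_long_name.py', 'b.c']
print([f for f in names if short(f)])  # ['a.py', 'b.c']
```

Note that the filtered list contains the original filenames, not their lengths.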
## reducing `lambda` noise: `arg`
```python
import os
from liquidata import pipe, source, name, get, take, join, arg, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    join,
    { arg < 5 : len },
    take(3, close_all=True),
    sink(print))
```
- The new component is `arg < 5`, which replaces `lambda x: x < 5`.
- `arg` is a means of creating anonymous functions by binding one operand of an operator. It works for most Python operators.
- `from liquidata import arg as _` might be tempting ... but is it wise?
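A toy version of the idea behind `arg` shows how binding one operand of an operator can yield a one-argument function. This sketch supports only two operators and is not liquidata's implementation:

```python
class Arg:
    """Toy arg: comparing against this object returns a one-argument
    function that performs the comparison on its input."""
    def __lt__(self, other): return lambda x: x < other
    def __gt__(self, other): return lambda x: x > other

arg = Arg()
less_than_5 = arg < 5   # Python calls Arg.__lt__, which returns a lambda
print(less_than_5(3))   # True
print(less_than_5(7))   # False
```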
## Adding new synchronized branches: put
```python
import os
from liquidata import pipe, source, name, put, get, take, join, arg, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    (lambda ns: len(ns.files)) >> put.Nfiles,
    take(3, close_all=True),
    sink(print))
```
- The new component is `(lambda ns: len(ns.files)) >> put.Nfiles`.
- `lambda ns: len(ns.files)` accepts a namespace, and returns the length of one of the values in the namespace.
- `>> put.Nfiles` adds the returned value as a new entry in the namespace, under the name `Nfiles`.
- Before this point, there were three synchronized branches in the stream (`path`, `dirs`, `files`). After this point there are four (the aforementioned three, and `Nfiles`).
## get *

The last example can be written more conveniently with `get *`:
```python
import os
from liquidata import pipe, source, name, get, put, take, join, arg, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files * len >> put.Nfiles,
    take(3, close_all=True),
    sink(print))
```
`get.files * len` replaces `(lambda ns: len(ns.files))`. The following three variants are synonymous:

```python
(lambda ns: len(ns.files)) >> put.Nfiles
len * get.files            >> put.Nfiles
get.files * len            >> put.Nfiles
```
- `get` without the `*` is essentially a shorthand for `operator.attrgetter`:

  ```python
  attrgetter('a')
  get.a
  ```

  are synonymous, as are

  ```python
  attrgetter('a', 'b', 'c')
  get.a.b.c
  ```

- `get` can also play the role of `operator.itemgetter`:

  ```python
  itemgetter(key_or_index)
  get[key_or_index]
  ```

  are synonymous.
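For reference, the standard-library functions that `get` mirrors behave like this:

```python
from operator import attrgetter, itemgetter
from types import SimpleNamespace

ns = SimpleNamespace(a=1, b=2, c=3)
print(attrgetter('a')(ns))            # 1
print(attrgetter('a', 'b', 'c')(ns))  # (1, 2, 3)

row = {'name': 'pipe.py', 'size': 123}
print(itemgetter('name')(row))        # pipe.py
```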
Unfortunately, Python's `__getitem__` syntax throws a spanner in the works, because it cannot distinguish between

```python
obj[ a, b ]
obj[(a, b)]
```

so

```python
get[ a,b ]
itemgetter( a,b )   # would like this
itemgetter((a,b))   # but get this
```
- When combined with `*`, `get` not only extracts values from a compound object, but also unpacks them into a function call, much like the `*` in Python's standard `fn(*args)` syntax. It is worth noting the following similarities and differences between `get ... ,` and `get ... *`:

  ```python
  get.a   , fn  # fn(ns.a)
  get.a   * fn  # same as above

  get.a.b , fn  # fn((ns.a, ns.b))
  get.a.b * fn  # fn( ns.a, ns.b )
  ```

  The comma does not unpack the tuple: the whole tuple is passed as a single argument to `fn`. In contrast, the star unpacks the tuple and `fn` receives its contents as separate arguments.

- `get *` also has an important interaction with `put`. Consider

  ```python
  get.a * fn >> put.b  # ns.b = fn(ns.a)
  get.a , fn >> put.b  # Error!
  ```

  The first case is parsed as `(get.a * fn) >> put.b`. Consequently `put` attempts to inject `b` into the item received by `(get.a * fn)`: that item must be a namespace, if `get.a` is to work. The `b` will be placed in that namespace.

  The second case is parsed as `get.a, (fn >> put.b)`. Consequently `put` attempts to inject `b` into the item received by `fn`, which has already been unpacked by `get.a`, and will therefore NOT be a namespace.

Armed with this knowledge, let's return to the code we were developing.
## Putting `get *` and `>> put` to work

Recall that:
- We have a compound flow containing three synchronized branches: `path`, `dirs` and `files`.
- Alternatively we can say that the items flowing through the pipe are namespaces, each containing the names `path`, `dirs` and `files`.
- `path` is a single value; `files` is a list.
- We have managed to create a stream of (unqualified) filenames which end in `.py`.

The code looked like this:
```python
import os
from liquidata import pipe, source, name, get, take, join, use, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    join,
    { use(str.endswith, '.py') },
    take(3, close_all=True),
    sink(print))
```
- We would like to look at the contents of the `.py` files.
- In order to open the files, we will need to know their fully qualified names.
- `os.path.join(path, filename)` can create these for us, as long as we can feed it a path and a filename.
- Before `get.files` the stream contained the `path` we need, but the filename was not obviously accessible. After `get.files, join`, the stream contains only filenames: `path` is no longer available.
- The solution is to add the filenames to the compound stream, rather than replacing it. This can be done with `put`.
```python
import os
from liquidata import pipe, source, name, get, put, take, join, use, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename,
    take(3, close_all=True),
    sink(print))
```
- The first change to the code is

  ```python
  get.files , join, { use(str.endswith, '.py') }                    # old
  get.files * (join, { use(str.endswith, '.py') }) >> put.filename  # new
  ```

  The old version:

  - picks `files` out of the namespace
  - throws away the namespace
  - turns a stream of lists of filenames into a stream of individual filenames
  - discards non-`.py` filenames

  The new version:

  - feeds `files` into a sub-pipe
  - the sub-pipe:
    - turns a stream of lists of filenames into a stream of individual filenames
    - discards non-`.py` filenames
  - for each `.py` file coming out of the sub-pipe:
    - adds the filename to the namespace
    - sends the augmented namespace downstream

- The second change is the addition of `get.path.filename`, which extracts the two values that interest us from the namespace.
Consequently, the code prints out the first three `.py` filenames, in a tuple together with the paths to the directories that contain them.

We can use `*` to unpack these tuples into separate arguments when calling `os.path.join`:
```python
import os
from liquidata import pipe, source, name, get, put, take, join, use, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename * os.path.join,
    take(3, close_all=True),
    sink(print))
```
- The new addition is `* os.path.join`.
- The code now prints the fully qualified names of the first three `.py` files.
## Analysing the file contents
```python
import os
from keyword import iskeyword
from liquidata import pipe, source, name, get, put, take, join, use, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename * os.path.join,
    open,
    join,
    use(str.split, '#', maxsplit=1),
    get[0],
    str.split,
    join,
    { iskeyword },
    take(30, close_all=True),
    sink(print))
```
There are quite a few new additions, but they should all be familiar by now:

- `open` (a Python builtin) turns each filename into an iterator over the lines in the file.
- `join` turns the stream of file iterators into a continuous stream of lines in all the files.
- `use(str.split, '#', maxsplit=1)` splits each line into `[<code>, <comment>]` pairs.
- `get[0]` picks the code, and throws away the comment.
- `str.split` turns the code on any line into a sequence of tokens. [This tokenizer is much too naive for real uses, but it will do for the purposes of our example.]
- `join` gives a continuous stream of tokens on all lines in all files.
- `{ iskeyword }` filters out non-keywords.
- Changing `take`'s first argument from `3` to `30` allows us to see a bit more output.
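The per-line processing steps can be checked in isolation with plain Python, outside any pipe:

```python
from keyword import iskeyword

line = "for x in xs:  # loop over all keywords"

code, *_ = line.split('#', maxsplit=1)  # discard the comment
tokens = code.split()                   # (naive) tokenization
keywords = [t for t in tokens if iskeyword(t)]
print(keywords)  # ['for', 'in']
```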
## Reusable pipes

Our pipe starts with `source << ...`. This means that data are pushed through the pipe as soon as it is defined. It is possible to define a pipe for reuse, decoupling it from the data that will flow through it.
```python
import os
from keyword import iskeyword
from liquidata import pipe, source, name, get, put, take, join, use, sink

fn = pipe(
    name.path.dirs.files,
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename * os.path.join,
    open,
    join,
    use(str.split, '#', maxsplit=1),
    get[0],
    str.split,
    join,
    { iskeyword },
    take(30, close_all=True),
    sink(print))

fn(os.walk('/'))
```
- `source` has been removed from the pipe.
- `pipe` now returns a function.
- That function accepts an iterable argument, and pushes it through the pipe.
- The function can be called multiple times, with different inputs.
## Pipe functions

TODO `pipe(...)`
## Composable pipes

Pipes are pipe components themselves:
```python
import os
from keyword import iskeyword
from liquidata import pipe, source, name, get, put, take, join, use, sink

hard_work = (
    name.path.dirs.files,
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename * os.path.join,
    open,
    join,
    use(str.split, '#', maxsplit=1),
    get[0],
    str.split,
    join,
    { iskeyword })

fn = pipe(hard_work, take(5, close_all=True), sink(print))
fn(os.walk('/'))

pipe(source << os.walk('/'), hard_work, take(8, close_all=True), sink(print))
```
- The bulk of our pipe was extracted into a tuple called `hard_work`.
- `hard_work` was used as a component in different pipes.
## Side-effects vs. results

So far, we have been printing out the values that reach the end of the pipe with `sink(print)`.

- `sink` stops items from proceeding downstream. So it makes no sense for `sink` to appear anywhere other than at the end of a pipe.
- `sink` passes all items that reach it from upstream to the function it contains (`print` in all our examples so far). The idea is that the function should perform some side-effect with the values it receives.

Let's try removing the sink:

```python
pipe(source << os.walk('/'), hard_work, take(8, close_all=True))
```

Any items reaching the end of the pipe are collected into a list, which is returned as the result of the pipe.
## Different strategies for collecting results
```python
from operator import add

pipe(source << range(10), out(min))       # 0
pipe(source << range(10), out(max))       # 9
pipe(source << range(10), out(add))       # 45
pipe(source << range(10), out(add, 1000)) # 1045
```
- The default behaviour of collecting all items that reach the end of the pipe into a list can be overridden with `out`.
- `out` accepts a binary function, which will be used to fold or reduce the items into a single value.
- `out` accepts an optional default or initial value, just like `functools.reduce`.
Alternatively:

```python
pipe(source << range(5), out(into(set)))    # {0, 1, 2, 3, 4}
pipe(source << range(5), out(into(sorted))) # [0, 1, 2, 3, 4]
pipe(source << range(5), out(into(min)))    # 0
pipe(source << range(5), out(into(max)))    # 4
```
- `out(into(...))` accepts a consumer of iterables.
- Therefore, the default behaviour is equivalent to `out(into(list))`.
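The two collection strategies correspond to familiar standard-library idioms, which may help clarify the distinction:

```python
from functools import reduce
from operator import add

# out(add) corresponds to a fold/reduce over the stream...
print(reduce(add, range(10)))        # 45
print(reduce(add, range(10), 1000))  # 1045

# ...whereas out(into(consumer)) hands the whole stream to a consumer
print(set(range(5)))  # {0, 1, 2, 3, 4}
print(max(range(5)))  # 4
```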
## Named outputs
```python
pipe(source << range(5), out.X            ).X  # [0, 1, 2, 3, 4]
pipe(source << range(5), out.X(add)       ).X  # 10
pipe(source << range(5), out.X(into(max)) ).X  # 4
```
- `out`s can be associated with a name: `X` in these examples.
- Naming `out`s causes the pipeline to return its results wrapped in a namespace: note the `.X` after the pipeline.
- This isn't very useful unless there are multiple `out`s in a single network, and that won't make sense until we discover independent branches.
## Independent branches
```python
result = pipe(source << range(10), [ out.smallest(into(min)) ], out.biggest(into(max)))
assert result.biggest  == 9
assert result.smallest == 0
```
- Square brackets (`[ ]`) create independent branches.
- Independent branches are pipes which receive all items reaching their position in the pipeline, without affecting what is seen downstream.
- Independent branches, just like other pipes, can end in `sink`s or `out`s. By default, just like other pipes, they end in `out(into(list))`.
- Anonymous `out`s are bound to the name `return` in the returned namespace. As `return` is a keyword, you will need to use `getattr` to access it. [This is likely to change in future versions.]
- If multiple `out`s in the same network have the same name, that name will be bound to a tuple of all the results with that name. [This is likely to change in future versions.]
- This network returned two results, grouped together in a namespace, and bound to the names `biggest` and `smallest`.
## Multiple directories, and progress reporting

Let's return to the code we're developing, and make a few more changes:
```python
import os
from keyword import iskeyword
from liquidata import pipe, name, get, put, take, join, use, sink

fn = pipe(
    [ sink(lambda d: print(f'Processing input directory {d}')) ],
    os.walk,
    join,
    name.path.dirs.files,
    [ get.path, sink(lambda d: print(f'Processing discovered directory {d}')) ],
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename * os.path.join,
    [ sink(lambda f: print(f'Processing file {f}')) ],
    open,
    join,
    use(str.split, '#', maxsplit=1),
    get[0],
    str.split,
    join,
    { iskeyword },
    take(30, close_all=True),
    sink(print))

fn(['/bin', '/usr/bin'])
```
Firstly, observe that

- `os.walk, join` has been spliced into the pipe (in positions 2 and 3).
- `fn` now receives a list of directories, rather than `os.walk(<single-directory>)`.
- Each directory in the input list will be traversed by `os.walk`, and the results will be `join`ed together into a single stream.

Secondly, there are three other new components:

```python
[ sink(lambda d: print(f'Processing input directory {d}')) ]
[ get.path, sink(lambda d: print(f'Processing discovered directory {d}')) ]
[ sink(lambda f: print(f'Processing file {f}')) ]
```

These are all independent branches, which report on what flows through the network at different points.
Consequently, as Python keywords are printed, we will be informed whenever

- we start processing the next directory in the input list
- `os.walk` enters a new directory
- we start processing a new `.py` file
## spy

Spy hasn't been implemented yet in the current version of `liquidata`, but the following shortcuts should appear soon:

```python
[ sink(<side-effect>) ]                spy(<side-effect>)
[ out.X(<binary-function>) ]           spy.X(<binary-function>)
[ out.X(into(<iterator-consumer>)) ]   spy.X(into(<iterator-consumer>))
```
## Frequency analysis

We're almost there. Let's calculate how many times each keyword appears:
```python
import os
from keyword import iskeyword
from collections import Counter
from liquidata import pipe, name, get, put, join, use, out, into

fn = pipe(
    os.walk,
    join,
    name.path.dirs.files,
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename * os.path.join,
    open,
    join,
    use(str.split, '#', maxsplit=1),
    get[0],
    str.split,
    join,
    { iskeyword },
    out(into(Counter)))
```
- We removed
  - the branches which printed the progress report
  - the `take` which limited the data that flooded our screen
- We replaced the final `sink(print)` with `out(into(Counter))`.
- `collections.Counter` (from Python's standard library) consumes an iterable (so it can be used with `into`) and associates each value with the number of times it was seen.
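Here is `Counter` doing exactly that job on a small hand-made stream of keywords:

```python
from collections import Counter

keywords = ['for', 'in', 'for', 'if', 'for', 'in']
counts = Counter(keywords)
print(counts['for'])  # 3
print(counts['in'])   # 2
```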
## Abstraction

As it stands, the code throws all the details in our face. Let's cut it up into smaller portions and give them suggestive names, to give a higher-level overview of what the code is doing:
```python
import os
from keyword import iskeyword
from collections import Counter
from liquidata import pipe, name, get, put, join, use, out, into

all_files         = os.walk, join, name.path.dirs.files
find_python_files = get.files * (join, { use(str.endswith, '.py') }) >> put.filename
file_contents     = get.path.filename * os.path.join, open, join
discard_comments  = use(str.split, '#', maxsplit=1), get[0]
find_keywords     = str.split, join, { iskeyword }

fn = pipe(
    all_files,
    find_python_files,
    file_contents,
    discard_comments,
    find_keywords,
    out(into(Counter)))
```
And we're done!