Tutorial
In this tutorial we'll gradually build up the code that appears in the introduction, and explore some other features along the way.
The following code will traverse your entire filesystem, and print information about every directory and the files in it. Be ready to interrupt it!
A very simple pipe
import os
from liquidata import pipe, source, sink
pipe(source << os.walk('/'), sink(print))
- We created a pipe which uses the iterable os.walk('/') as its source. The following 3 variations are synonymous:
  source << iterable
  iterable >> source
  source(iterable)
- The items in the source are sent down the pipeline.
- In this case the pipeline is very short: it contains only a single sink.
- sink:
  - feeds any items it receives into the (presumably side-effecting) function it contains
  - stops the items from propagating further downstream
- os.walk('/') traverses the entire filesystem: information about each directory in the filesystem is sent down the pipe and printed out. Your screen is flooded with information.
Let's try to remove the need to interrupt the pipeline manually.
Limiting the number of items
import os
from liquidata import pipe, source, take, sink
pipe(source << os.walk('/'), take(3), sink(print))
- We have introduced a new component into the pipe: take(3). Its purpose is to limit the number of items flowing through the pipe.
- Only the first three items coming out of the source are printed to the screen.
- But the pipeline does not terminate, because the entire source is being pushed through the pipe. take only limits how many of them get through.
- If you do not interrupt this, it will eventually terminate after the source is exhausted (your whole filesystem has been traversed).
Push vs pull
- liquidata pushes the data through the pipeline using coroutines. A perfectly valid alternative choice would be to pull the data through the pipe using generators.
- Pushing makes it easy to split the stream into separate, independent branches leading to distinct sinks (or outputs); pulling makes it easier to join independent sources (or inputs). The former made more sense in the context in which liquidata was originally needed.
- liquidata will support pulling in the future.
- If the data were being pulled, the previous example would terminate as soon as the first 3 items from the source had been processed.
- In a pull-architecture, take has to send an 'I am closed, you won't get any more data from me' message downstream. Consequently, downstream will stop asking for more. In a push-architecture, take has two choices:
  - Keep rejecting all subsequent items it receives, until upstream stops sending data.
  - Send a 'please stop sending data' message upstream. In this case, any other branches on the network will also immediately stop receiving any data. This is not always desirable.
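The difference between pulling and pushing can be sketched in plain Python, without liquidata. This is only an illustration of the two architectures, not liquidata's implementation: the names counting_source, push_sink and push_take are made up for the example, and the push version implements the first of the two choices above (keep rejecting items).

```python
from itertools import islice

def counting_source(n, log):
    """Yield 0..n-1, recording every item that is actually produced."""
    for i in range(n):
        log.append(i)
        yield i

# Pull: the consumer asks for items, so the source is only advanced
# three times before islice stops asking.
pulled_log = []
pulled = list(islice(counting_source(10, pulled_log), 3))

# Push: the source drives the loop and sends every item into a coroutine.
def push_sink(results):
    """Coroutine which stores everything it receives."""
    while True:
        results.append((yield))

def push_take(n, downstream):
    """Coroutine which forwards the first n items and rejects the rest."""
    seen = 0
    while True:
        item = yield
        if seen < n:
            seen += 1
            downstream.send(item)
        # after n items: keep receiving, but let nothing through

pushed_log, pushed = [], []
sink_coroutine = push_sink(pushed)
next(sink_coroutine)                       # prime the coroutine
take_coroutine = push_take(3, sink_coroutine)
next(take_coroutine)                       # prime the coroutine
for item in counting_source(10, pushed_log):
    take_coroutine.send(item)

print(pulled, len(pulled_log))   # [0, 1, 2] 3
print(pushed, len(pushed_log))   # [0, 1, 2] 10
```

Both versions let the same three items through, but only the pulling version stops the source early: the pushing version still produces all ten items.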
Early termination of push
We can instruct take to close down the whole network, as soon as it has let
through all the items it needs, by passing in close_all=True:
import os
from liquidata import pipe, source, take, sink
pipe(source << os.walk('/'), take(3, close_all=True), sink(print))
- The first three items are printed to the screen, and the pipeline terminates immediately.
- All liquidata components with the ability to reject all subsequent items should have a close_all option.
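One way to picture a 'please stop sending data' message in a push architecture is an exception which travels from the coroutine back to the loop that feeds it. The following plain-Python sketch makes that assumption purely for illustration; StopPushing, push_take_closing and push are hypothetical names, and nothing here claims to be how liquidata implements close_all.

```python
class StopPushing(Exception):
    """Hypothetical signal meaning 'please stop sending data'."""

def push_take_closing(n, results):
    """Coroutine which accepts n items, then asks upstream to stop."""
    for _ in range(n):
        results.append((yield))
    raise StopPushing

def push(iterable, coroutine, log):
    """Send items into the coroutine until it asks us to stop."""
    try:
        next(coroutine)              # prime the coroutine
        for item in iterable:
            log.append(item)         # record what the source produced
            coroutine.send(item)
    except StopPushing:
        pass                         # the whole network shuts down here

produced, received = [], []
push(range(10), push_take_closing(3, received), produced)
print(received, produced)   # [0, 1, 2] [0, 1, 2]
```

The source is abandoned after its third item, which is also why any other branch fed by the same loop would stop receiving data at that moment.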
Synchronized branches and compound flow
The items generated by os.walk are 3-tuples, containing:
- A filesystem path
- A list of directories found on that path
- A list of files found on that path
Let's give these components names within our pipeline
import os
from liquidata import pipe, source, name, take, sink
pipe(
source << os.walk('/'),
name.path.dirs.files,
take(3, close_all=True),
sink(print))
- The new component is name.path.dirs.files.
- It specifies three names: path, dirs and files.
- Therefore it expects to receive an iterable containing three items (in this case, the 3-tuple coming out of os.walk).
- It creates a namespace containing those three names, bound to the corresponding items in the tuple.
- The resulting namespace continues down the pipe.
- The components of such namespaces can be thought of as parallel, synchronized, named branches.
- liquidata also supports unsynchronized branches. We'll meet these later.
- name.single_name wraps the whole item it receives in a namespace. By contrast, name.more.than.one unpacks the item it receives and wraps its components in a namespace.
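The wrapping and unpacking behaviour described above can be imitated in plain Python. In this sketch name_sketch is a hypothetical stand-in for name (written with string arguments rather than attribute access), and types.SimpleNamespace stands in for liquidata's own namespace type; it illustrates the idea only.

```python
from types import SimpleNamespace

def name_sketch(*names):
    """Hypothetical stand-in: name_sketch('a', 'b') plays the role of name.a.b."""
    if len(names) == 1:
        # A single name wraps the whole item.
        return lambda item: SimpleNamespace(**{names[0]: item})

    def bind(item):
        # Several names: unpack the item and bind its pieces in order.
        values = tuple(item)
        if len(values) != len(names):
            raise ValueError(f'expected {len(names)} items, got {len(values)}')
        return SimpleNamespace(**dict(zip(names, values)))
    return bind

# An item shaped like the 3-tuples produced by os.walk
walk_item = ('/tmp', ['sub'], ['a.py', 'b.txt'])

unpacked = name_sketch('path', 'dirs', 'files')(walk_item)
wrapped = name_sketch('item')(walk_item)

print(unpacked.path, unpacked.files)   # /tmp ['a.py', 'b.txt']
print(wrapped.item == walk_item)       # True
```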
Picking components from compound flow
import os
from liquidata import pipe, source, name, get, take, sink
pipe(
source << os.walk('/'),
name.path.dirs.files,
get.files,
take(3, close_all=True),
sink(print))
- The new component is get.files. It
  - expects a compound item (Namespace) from upstream
  - selects the specified component (files, in this example)
  - sends it downstream.
- Consequently, this program prints three lists:
  - one for each of the first three directories visited by os.walk('/')
  - each containing the names of the files in that directory
join
import os
from liquidata import pipe, source, name, get, take, join, sink
pipe(
source << os.walk('/'),
name.path.dirs.files,
get.files,
take(3, close_all=True),
join,
sink(print))
- The new component is join.
- It expects iterables and sends their items downstream, one by one.
- Consequently, this program prints the names of the files found in the first 3 directories visited by os.walk('/').
- This is exactly the same information as in the previous example, except that, rather than being grouped in 3 lists, the names appear in one long chain.
Try swapping the take and the join components:
import os
from liquidata import pipe, source, name, get, take, join, sink
pipe(
source << os.walk('/'),
name.path.dirs.files,
get.files,
join,
take(3, close_all=True),
sink(print))
- This time only 3 names are printed.
- Previously, all the names corresponding to the first three directories were printed.
- join plays a role similar to inner loops.
- flat(fn) is an alternative spelling of fn, join:
import os
from liquidata import pipe, source, name, get, flat, take, sink
pipe(
source << os.walk('/'),
name.path.dirs.files,
flat(get.files),
take(3, close_all=True),
sink(print))
- flat(get.files) replaced get.files, join from the previous example.
- The behaviour doesn't change.
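If the effect of swapping take and join is not obvious, it may help to see the same two orderings written with the standard library. This sketch uses itertools (which pulls rather than pushes) and a made-up list of file lists, so it only illustrates the ordering, not liquidata itself.

```python
from itertools import chain, islice

# Made-up stand-in for the lists of filenames coming out of get.files
file_lists = [['a.py', 'b.txt'], ['c.py'], ['d.py', 'e.md', 'f.py'], ['g.py']]

# take(3) then join: keep the first three lists, then flatten them
take_then_join = list(chain.from_iterable(islice(file_lists, 3)))

# join then take(3): flatten everything, then keep the first three names
join_then_take = list(islice(chain.from_iterable(file_lists), 3))

def join_sketch(iterables):
    """join written as an explicit inner loop."""
    for iterable in iterables:
        for item in iterable:
            yield item

print(take_then_join)   # ['a.py', 'b.txt', 'c.py', 'd.py', 'e.md', 'f.py']
print(join_then_take)   # ['a.py', 'b.txt', 'c.py']
```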
filters
import os
from liquidata import pipe, source, name, get, take, join, sink
pipe(
source << os.walk('/'),
name.path.dirs.files,
get.files,
join,
{ lambda x: x.endswith('.py') },
take(3, close_all=True),
sink(print))
- The new component is { lambda x: x.endswith('.py') }.
- Braces ({ }) create filters, with the contained function being used as the predicate.
- In this example, we only let through filenames which end in .py.
- We close the pipeline after seeing the first three of those.
reducing lambda noise: use
The purpose of lambda in the last example is to bind the second argument of a
function (in this example, str.endswith(<string>, <ending>)), while leaving
the first argument unbound. This situation arises rather frequently, and
lambda often adds a lot of syntactic noise.
import os
from liquidata import pipe, source, name, get, take, join, use, sink
pipe(
source << os.walk('/'),
name.path.dirs.files,
get.files,
join,
{ use(str.endswith, '.py') },
take(3, close_all=True),
sink(print))
- The new component is use(str.endswith, '.py') which replaces lambda x: x.endswith('.py').
- The following two lines are synonymous:
  lambda x: fn(x, A2)
  use(fn, A2)
- More generally:
  lambda x: fn(x, A2, A3, K1=v1, K2=v2)
  use(fn, A2, A3, K1=v1, K2=v2)
- In other words:
  lambda x: fn(x, *args, **kwds)
  use(fn, *args, **kwds)
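The equivalence above is easy to try out in plain Python. use_sketch below is a hypothetical helper written straight from the 'in other words' line; it is not liquidata's source code.

```python
from functools import partial

def use_sketch(fn, *args, **kwds):
    """Leave the first argument of fn unbound; bind all the others."""
    return lambda x: fn(x, *args, **kwds)

ends_with_py = use_sketch(str.endswith, '.py')
split_comment = use_sketch(str.split, '#', maxsplit=1)

print(ends_with_py('setup.py'))             # True
print(ends_with_py('README.md'))            # False
print(split_comment('x = 1 # one # two'))   # ['x = 1 ', ' one # two']

# functools.partial binds the leading arguments instead, so here it would
# ask whether '.py' ends with 'setup.py':
print(partial(str.endswith, '.py')('setup.py'))   # False
```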
key-filters
Imagine we were interested in filenames whose length is less than 5.
import os
from liquidata import pipe, source, name, get, take, join, sink
pipe(
source << os.walk('/'),
name.path.dirs.files,
get.files,
join,
{ lambda x: x < 5 : len },
take(3, close_all=True),
sink(print))
- The new component is { lambda x: x < 5 : len }.
- The important new feature within that component is : len.
- We have already seen the use of braces ({ predicate }) as filters.
- This is a key-filter: { predicate : key }.
- The predicate does not look at the item itself, but at key(item).
- If the predicate is satisfied, the item itself (rather than key(item)) continues downstream.
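As far as Python's own syntax is concerned, { predicate : key } is simply a dict literal with one entry. The sketch below takes such a literal apart and applies the rule described above by hand; key_filter_sketch is a hypothetical helper and the filenames are made up, so treat it as an illustration of the semantics rather than of liquidata's internals.

```python
def key_filter_sketch(predicate, key, items):
    """Let an item through if predicate(key(item)) holds; pass the item itself on."""
    for item in items:
        if predicate(key(item)):
            yield item

# An ordinary one-entry dict: the predicate is the key, len is the value
literal = { (lambda n: n < 5) : len }
predicate, key = next(iter(literal.items()))

names = ['a.py', 'setup.py', 'x', 'README.md', 'ls']
short_names = list(key_filter_sketch(predicate, key, names))

print(short_names)   # ['a.py', 'x', 'ls']
```

Note that the output contains the names themselves, not their lengths.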
reducing lambda noise: arg
import os
from liquidata import pipe, source, name, get, take, join, arg, sink
pipe(
source << os.walk('/'),
name.path.dirs.files,
get.files,
join,
{ arg < 5 : len },
take(3, close_all=True),
sink(print))
- The new component is arg < 5 which replaces lambda x: x < 5.
- arg is a means of creating anonymous functions by binding one operand of an operator. It works for most Python operators.
- from liquidata import arg as _ might be tempting ... but is it wise?
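A placeholder like this can be built with operator overloading. The class below is a minimal sketch covering only a few operators; ArgSketch and arg_sketch are hypothetical names, and this is a guess at the general technique rather than a description of how liquidata's arg is written.

```python
class ArgSketch:
    """Hypothetical placeholder: applying an operator to it returns a function."""

    def __lt__(self, other):
        return lambda x: x < other

    def __gt__(self, other):
        return lambda x: x > other

    def __add__(self, other):
        return lambda x: x + other

    def __radd__(self, other):
        # Called for `other + placeholder`, when other cannot handle the addition
        return lambda x: other + x

arg_sketch = ArgSketch()

less_than_5 = arg_sketch < 5
add_one = arg_sketch + 1
label = 'file: ' + arg_sketch

print(less_than_5(3), less_than_5(7))   # True False
print(add_one(41))                      # 42
print(label('setup.py'))                # file: setup.py
```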
Adding new synchronized branches: put
import os
from liquidata import pipe, source, name, put, get, take, join, arg, sink
pipe(
source << os.walk('/'),
name.path.dirs.files,
(lambda ns: len(ns.files)) >> put.Nfiles,
take(3, close_all=True),
sink(print))
- The new component is (lambda ns: len(ns.files)) >> put.Nfiles
  - lambda ns: len(ns.files) accepts a namespace, and returns the length of one of the values in the namespace.
  - >> put.Nfiles adds the returned value as a new entry in the namespace, under the name Nfiles.
- Before this point, there were three synchronized branches in the stream (path, dirs, files). After this point there are four (the aforementioned three, and Nfiles).
get *
The last example can be written more conveniently with get *
import os
from liquidata import pipe, source, name, get, put, take, join, arg, sink
pipe(
source << os.walk('/'),
name.path.dirs.files,
get.files * len >> put.Nfiles,
take(3, close_all=True),
sink(print))
get.files * len replaced (lambda ns: len(ns.files))
The following three variants are synonymous
(lambda ns: len( ns.files)) >> put.Nfiles
len * get.files >> put.Nfiles
get.files * len >> put.Nfiles
- get without the * is essentially a shorthand for operator.attrgetter:
  attrgetter('a')
  get       .a
  are synonymous, as are
  attrgetter('a', 'b', 'c')
  get       .a   .b   .c
- get can also play the role of operator.itemgetter:
  itemgetter(key_or_index)
  get       [key_or_index]
  are synonymous.
  Unfortunately, Python's __getitem__ syntax throws a spanner in the works, because it cannot distinguish between
  obj[ a, b ]
  obj[(a, b)]
  so
  get[ a,b ]
  itemgetter( a,b )   # would like this
  itemgetter((a,b))   # but get this
- When combined with *, get not only extracts values from a compound object, but also unpacks them into a function call, much like the * in Python's standard fn(*args) syntax.
  It is worth noting the following similarities and differences between get ... , and get ... *:
  get.a   , fn   # fn(ns.a)
  get.a   * fn   # same as above
  get.a.b , fn   # fn((ns.a, ns.b))
  get.a.b * fn   # fn( ns.a, ns.b )
  The comma does not unpack the tuple: the whole tuple is passed as a single argument to fn. In contrast, the star unpacks the tuple and fn receives its contents as separate arguments.
- get * also has an important interaction with put. Consider
  get.a * fn >> put.b   # ns.b = (fn(ns.a))
  get.a , fn >> put.b   # Error!
  The first case is parsed as (get.a * fn) >> put.b. Consequently put attempts to inject b into the item received by (get.a * fn): that item must be a namespace, if get.a is to work. The b will be placed in that namespace.
  The second case is parsed as get.a, (fn >> put.b). Consequently put attempts to inject b into the item received by fn, which has already been unpacked by get.a, and will therefore NOT be a namespace.
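The standard-library behaviour that get is compared with can be checked directly. The sketch below uses only operator and types from the standard library; ShowKey and fn are throwaway names for the example, and the comments relating lines to get are analogies, not liquidata code.

```python
from operator import attrgetter, itemgetter
from types import SimpleNamespace

ns = SimpleNamespace(a=1, b=2, c=3)

single = attrgetter('a')(ns)               # like get.a     -> 1
several = attrgetter('a', 'b', 'c')(ns)    # like get.a.b.c -> (1, 2, 3)

class ShowKey:
    """Return whatever __getitem__ receives, to expose the ambiguity."""
    def __getitem__(self, key):
        return key

# obj[a, b] and obj[(a, b)] both hand the tuple (a, b) to __getitem__
bare, parenthesized = ShowKey()[1, 2], ShowKey()[(1, 2)]

two_items = itemgetter(1, 2)('abcd')                    # two separate indices
one_item = itemgetter((1, 2))({(1, 2): 'tuple key'})    # one key which is a tuple

def fn(*args):
    """Report exactly which arguments were received."""
    return args

pair = attrgetter('a', 'b')(ns)
passed_whole = fn(pair)        # like get.a.b , fn : one argument, a tuple
passed_unpacked = fn(*pair)    # like get.a.b * fn : two arguments

print(single, several)                 # 1 (1, 2, 3)
print(bare == parenthesized)           # True
print(two_items, one_item)             # ('b', 'c') tuple key
print(passed_whole, passed_unpacked)   # ((1, 2),) (1, 2)
```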
Armed with this knowledge, let's return to the code we were developing.
Putting get * and >> put to work
Recall that:
- We have a compound flow containing three synchronized branches: path, dirs and files.
- Alternatively we can say that the items flowing through the pipe are namespaces, each containing the names path, dirs and files.
- path is a single value; files is a list.
- We have managed to create a stream of (unqualified) filenames which end in .py.
The code looked like this:
import os
from liquidata import pipe, source, name, get, take, join, use, sink
pipe(
source << os.walk('/'),
name.path.dirs.files,
get.files,
join,
{ use(str.endswith, '.py') },
take(3, close_all=True),
sink(print))
- We would like to look at the contents of the .py files.
- In order to open the files, we will need to know their fully qualified names.
- os.path.join(path, filename) can create these for us, as long as we can feed it a path and a filename.
- Before get.files the stream contained the path we need, but the filename was not obviously accessible. After get.files, join, the stream contains only filenames: path is no longer available.
- The solution is to add the filenames to the compound stream, rather than replacing it. This can be done with put.
import os
from liquidata import pipe, source, name, get, put, take, join, use, sink
pipe(
source << os.walk('/'),
name.path.dirs.files,
get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
get.path.filename,
take(3, close_all=True),
sink(print))
- The first change to the code is
  get.files , join, { use(str.endswith, '.py') }                      # old
  get.files * (join, { use(str.endswith, '.py') }) >> put.filename   # new
  The old version:
  - picks files out of the namespace
  - throws away the namespace
  - turns a stream of lists of filenames into a stream of individual filenames
  - discards non-.py filenames
  The new version:
  - feeds files into a sub-pipe
  - the sub-pipe:
    - turns a stream of lists of filenames into a stream of individual filenames
    - discards non-.py filenames
  - for each .py file coming out of the sub-pipe:
    - adds the filename to the namespace
    - sends the augmented namespace downstream
- The second change is the addition of get.path.filename, which extracts the two values that interest us from the namespace.
Consequently, the code prints out the first three .py filenames, in a tuple
together with the paths to the directories that contain them.
We can use * to unpack these tuples into separate arguments when calling os.path.join:
import os
from liquidata import pipe, source, name, get, put, take, join, use, sink
pipe(
source << os.walk('/'),
name.path.dirs.files,
get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
get.path.filename * os.path.join,
take(3, close_all=True),
sink(print))
- The new addition is * os.path.join.
- The code now prints the fully qualified names of the first three .py files.
Analysing the file contents
import os
from keyword import iskeyword
from liquidata import pipe, source, name, get, put, take, join, use, sink
pipe(
source << os.walk('/'),
name.path.dirs.files,
get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
get.path.filename * os.path.join,
open,
join,
use(str.split, '#', maxsplit=1),
get[0],
str.split,
join,
{ iskeyword },
take(30, close_all=True),
sink(print))
There are quite a few new additions, but they should all be familiar by now:
- open (a Python builtin) turns each filename into an iterator over the lines in the file.
- join turns the stream of file iterators into a continuous stream of lines in all the files.
- use(str.split, '#', maxsplit=1) splits each line into [<code>, <comment>] pairs.
- get[0] picks the code, and throws away the comment.
- str.split turns the code on any line into a sequence of tokens. [This tokenizer is much too naive for real uses, but it will do for the purposes of our example.]
- join gives a continuous stream of tokens on all lines in all files.
- { iskeyword } filters out non-keywords.
- Changing take's first argument from 3 to 30 allows us to see a bit more output.
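To see what the new stages do, we can apply the same standard-library functions by hand to a single, made-up line of source code. No liquidata is involved here; each variable corresponds to what one stage would send downstream for that line.

```python
from keyword import iskeyword

line = "x = 1 if y else 2  # pick a value"

code_and_comment = str.split(line, '#', maxsplit=1)   # use(str.split, '#', maxsplit=1)
code = code_and_comment[0]                            # get[0]
tokens = str.split(code)                              # str.split
keywords = [t for t in tokens if iskeyword(t)]        # { iskeyword }

print(code_and_comment)   # ['x = 1 if y else 2  ', ' pick a value']
print(tokens)             # ['x', '=', '1', 'if', 'y', 'else', '2']
print(keywords)           # ['if', 'else']

# A line without a comment gives a one-element list, so [0] is still safe
print(str.split('return x', '#', maxsplit=1))   # ['return x']
```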
Reusable pipes
Our pipe starts with source << .... This means that data are pushed through
the pipe as soon as it is defined. It is possible to define a pipe for reuse,
decoupling it from the data that will flow through it.
import os
from keyword import iskeyword
from liquidata import pipe, source, name, get, put, take, join, use, sink
fn = pipe(
name.path.dirs.files,
get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
get.path.filename * os.path.join,
open,
join,
use(str.split, '#', maxsplit=1),
get[0],
str.split,
join,
{ iskeyword },
take(30, close_all=True),
sink(print))
fn(os.walk('/'))
- source has been removed from the pipe.
- pipe now returns a function.
- That function accepts an iterable argument, and pushes it through the pipe.
- The function can be called multiple times, with different inputs.
Pipe functions
TODO pipe(...)
Composable pipes
Pipes are pipe components themselves:
import os
from keyword import iskeyword
from liquidata import pipe, source, name, get, put, take, join, use, sink
hard_work = (
name.path.dirs.files,
get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
get.path.filename * os.path.join,
open,
join,
use(str.split, '#', maxsplit=1),
get[0],
str.split,
join,
{ iskeyword })
fn = pipe(hard_work, take(5, close_all=True), sink(print))
fn(os.walk('/'))
pipe(source << os.walk('/'), hard_work, take(8, close_all=True), sink(print))
- The bulk of our pipe was extracted into a tuple called hard_work.
- hard_work was used as a component in different pipes.
Side-effects vs. results
So far, we have been printing out the values that reach the end of the pipe with sink(print).
- sink stops items from proceeding downstream. So it makes no sense for sink to appear anywhere other than the end of a pipe.
- sink passes all items that reach it from upstream to the function it contains (print in all our examples so far). The idea is that the function should perform some side-effect with the values it receives.
Let's try removing the sink:
pipe(source << os.walk('/'), hard_work, take(8, close_all=True))
Any items reaching the end of the pipe are collected into a list, which is returned as the result of the pipe.
Different strategies for collecting results
from operator import add
from liquidata import pipe, source, out, into
pipe(source << range(10), out(min)) # 0
pipe(source << range(10), out(max)) # 9
pipe(source << range(10), out(add)) # 45
pipe(source << range(10), out(add, 1000)) # 1045
- The default behaviour of collecting all items that reach the end of the pipe into a list can be overridden with out.
- out accepts a binary function, which will be used to fold or reduce the items into a single value.
- out accepts an optional default or initial value, just like functools.reduce.
Alternatively:
pipe(source << range(5), out(into(set))) # {0, 1, 2, 3, 4}
pipe(source << range(5), out(into(sorted))) # [0, 1, 2, 3, 4]
pipe(source << range(5), out(into(min))) # 0
pipe(source << range(5), out(into(max))) # 4
- out(into(...)) accepts a consumer of iterables.
- Therefore, the default behaviour is equivalent to out(into(list)).
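The two collecting strategies correspond to two familiar ways of consuming an iterable in plain Python: folding it with a binary function, or handing the whole iterable to a consumer. The sketch below uses only the standard library and is meant as an analogy for out(fn) and out(into(fn)), not as their implementation.

```python
from functools import reduce
from operator import add

# Folding with a binary function, like out(min), out(add), out(add, 1000)
folded_min = reduce(min, range(10))
folded_sum = reduce(add, range(10))
folded_sum_from_1000 = reduce(add, range(10), 1000)

# Handing the whole iterable to a consumer, like out(into(set)), out(into(max))
as_set = set(range(5))
as_list = list(range(5))
largest = max(range(5))

print(folded_min, folded_sum, folded_sum_from_1000)   # 0 45 1045
print(as_set, as_list, largest)                       # {0, 1, 2, 3, 4} [0, 1, 2, 3, 4] 4
```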
Named outputs
pipe(source << range(5), out.X ).X # [0, 1, 2, 3, 4]
pipe(source << range(5), out.X (add) ).X # 10
pipe(source << range(5), out.X(into(max))).X # 4
- outs can be associated with a name: X in these examples.
- Naming outs causes the pipeline to return its results wrapped in a namespace: note the .X after the pipeline.
- This isn't very useful unless there are multiple outs in a single network, and that won't make sense until we discover independent branches.
Independent branches
result = pipe(source << range(10), [ out.smallest(into(min)) ], out.biggest(into(max)))
assert result.biggest == 9
assert result.smallest == 0
- Square brackets ([ ]) create independent branches.
- Independent branches are pipes which receive all items reaching their position in the pipeline, without affecting what is seen downstream.
- Independent branches, just like other pipes, can end in sinks or outs. By default, just like other pipes, they end in out(into(list)).
- Anonymous outs are bound to the name return in the returned namespace. As return is a keyword, you will need to use getattr to access it. [This is likely to change in future versions.]
- If multiple outs in the same network have the same name, that name will be bound to a tuple of all the results with that name. [This is likely to change in future versions.]
- This network returned two results, grouped together in a namespace, and bound to the names biggest and smallest.
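The idea of a branch which sees every item without disturbing the main stream can be imitated in plain Python. In this sketch broadcast and Fold are hypothetical helpers, and types.SimpleNamespace stands in for the returned namespace; it shows the data flow only and makes no claim about liquidata's internals.

```python
from types import SimpleNamespace

class Fold:
    """Hypothetical helper: reduce the items it is called with, one by one."""
    _empty = object()

    def __init__(self, fn):
        self.fn = fn
        self.value = Fold._empty

    def __call__(self, item):
        first = self.value is Fold._empty
        self.value = item if first else self.fn(self.value, item)

def broadcast(items, *branches):
    """Show every item to each side branch, then pass it on unchanged."""
    for item in items:
        for branch in branches:
            branch(item)
        yield item

smallest = Fold(min)                                   # the side branch
main_stream = list(broadcast(range(10), smallest))     # the main pipe carries on
result = SimpleNamespace(smallest=smallest.value, biggest=max(main_stream))

print(result.smallest, result.biggest)   # 0 9
```

The main stream still contains all ten items after the branch has looked at them, which is the sense in which the branch is independent.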
Multiple directories, and progress reporting
Let's return to the code we're developing, and make a few more changes:
import os
from keyword import iskeyword
from liquidata import pipe, name, get, put, take, join, use, sink
fn = pipe(
[ sink(lambda d: print(f'Processing input directory {d}')) ],
os.walk,
join,
name.path.dirs.files,
[ get.path, sink(lambda d: print(f'Processing discovered directory {d}')) ],
get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
get.path.filename * os.path.join,
[ sink(lambda f: print(f'Processing file {f}')) ],
open,
join,
use(str.split, '#', maxsplit=1),
get[0],
str.split,
join,
{ iskeyword },
take(30, close_all=True),
sink(print))
fn(['/bin', '/usr/bin'])
Firstly, observe that
- os.walk, join has been spliced into the pipe (in positions 2 and 3).
- fn now receives a list of directories, rather than os.walk(<single-directory>).
- Each directory in the input list will be traversed by os.walk, and the results will be joined together into a single stream.
Secondly, there are three other new components:
[ sink(lambda d: print(f'Processing input directory {d}')) ]
[ get.path, sink(lambda d: print(f'Processing discovered directory {d}')) ]
[ sink(lambda f: print(f'Processing file {f}')) ]
These are all independent branches, which report on what flows through the network at different points.
Consequently, as Python keywords are printed, we will be informed whenever
- we start processing the next directory in the input list
- os.walk enters a new directory
- we start processing a new .py file
spy
Spy hasn't been implemented yet in the current version of liquidata, but the
following shortcuts should appear soon:
[ sink(<side-effect>) ]
spy(<side-effect>)
[ out.X(<binary-function>) ]
spy.X(<binary-function>)
[ out.X(into(<iterator-consumer>)) ]
spy.X(into(<iterator-consumer>))
Frequency analysis
We're almost there. Let's calculate how many times each keyword appears:
import os
from keyword import iskeyword
from collections import Counter
from liquidata import pipe, name, get, put, join, use, out, into
fn = pipe(
os.walk,
join,
name.path.dirs.files,
get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
get.path.filename * os.path.join,
open,
join,
use(str.split, '#', maxsplit=1),
get[0],
str.split,
join,
{ iskeyword },
out(into(Counter)))
- We removed
  - The branches which printed the progress report
  - The take which limited the data that flooded our screen
- We replaced the final sink(print) with out(into(Counter)).
- collections.Counter (from Python's standard library) consumes an iterable (so it can be used with into) and associates each value with the number of times it was seen.
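Here is what Counter does with a small, made-up stream of tokens once the non-keywords have been filtered out. This uses only the standard library and mirrors the last two stages of the pipe by hand.

```python
from collections import Counter
from keyword import iskeyword

tokens = ['if', 'x', 'for', 'if', 'y', 'in', 'if']

# { iskeyword } followed by out(into(Counter)), written by hand
counts = Counter(token for token in tokens if iskeyword(token))

print(counts['if'], counts['for'], counts['in'])   # 3 1 1
print(counts.most_common(1))                       # [('if', 3)]
print(counts['while'])                             # 0: unseen values count as zero
```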
Abstraction
As it stands, the code throws all the details into our face. Let's cut it up into smaller portions and give them suggestive names, to give a higher-level overview of what the code is doing:
import os
from keyword import iskeyword
from collections import Counter
from liquidata import pipe, name, get, put, join, use, out, into
all_files = os.walk, join, name.path.dirs.files,
find_python_files = get.files * (join, { use(str.endswith, '.py') }) >> put.filename
file_contents = get.path.filename * os.path.join, open, join
discard_comments = use(str.split, '#', maxsplit=1), get[0]
find_keywords = str.split, join, { iskeyword }
fn = pipe(
all_files,
find_python_files,
file_contents,
discard_comments,
find_keywords,
out(into(Counter)))
And we're done!