# What is this?

`liquidata` is a Python Embedded Domain Specific Language (EDSL) which aims to encourage and facilitate

- increasing the signal-to-noise ratio in source code
- avoiding the use of strings to represent symbols in the API
- code reuse through composition of reusable and orthogonal components
- dataflow programming
- function composition
- lazy processing.
# Why would I want this?

## Dataflow networks

It can be helpful to think of your computations as flows through a network or graph of components. For example:

```
    candidates
        |
   quick_screen
        |
  expensive_screen -------.
        |                   \
   can dance ?           can sing ?
        |                    |
    hop test             pitch test
        |                    |
    skip test           rhythm test
        |                    |
    jump test                |
        |                    |
   sum scores            sum scores
        |                    |
score above 210 ?   score above 140 ?
        |                    |
  output dancers      output singers
```

The aim of `liquidata` is to allow you to express the idea laid out in the graph above, in code that reflects the structure of the graph. A `liquidata` implementation of the graph might look something like this:
```python
select_candidates = pipe(
    { quick_screening },
    { expensive_screening },
    [ { can_sing },
      test_pitch,
      test_rhythm,
      sum_scores.pitch.rhythm,
      { score_above(140) },
      out.singers
    ],
    { can_dance },
    test_hop,
    test_skip,
    test_jump,
    sum_scores.hop.skip.jump,
    { score_above(210) },
    out.dancers)

selected = select_candidates(candidates)

# Do something with the results
send_to_singer_committee(selected.singers)
send_to_dancer_committee(selected.dancers)
```
## Function composition

If you feel that the signal is drowned out by the noise in code written like this

```python
for name in filenames:
    file_ = open(name)
    for line in file_:
        for word in line.split():
            print(word)
```

and that the intent is clearer in code presented like this

```python
pipe(source << filenames, open, join, str.split, join, sink(print))
```

then you might find `liquidata` interesting.
## Still with me?

That was a trivial example. Let's have a look at something a little more involved.

If you are perfectly happy reading and writing code like this

```python
def keyword_frequency_loop(directories):
    counter = Counter()
    for directory in directories:
        for (path, dirs, files) in os.walk(directory):
            for filename in files:
                if not filename.endswith('.py'):
                    continue
                for line in open(os.path.join(path, filename)):
                    for name in line.split('#', maxsplit=1)[0].split():
                        if iskeyword(name):
                            counter[name] += 1
    return counter
```

then `liquidata` is probably not for you.
But if the last example leaves you wanting to extract the core meaning from the noise, and you feel that this

```python
all_files         = os.walk, JOIN, NAME.path.dirs.files
pick_python_files = GET.files * (JOIN, { use(str.endswith, '.py') }) >> PUT.filename
file_contents     = GET.path.filename * os.path.join, open, JOIN
ignore_comments   = use(str.split, '#', maxsplit=1), GET[0]
pick_keywords     = str.split, JOIN, { iskeyword }

keyword_frequency_pipe = pipe(
    all_files,
    pick_python_files,
    file_contents,
    ignore_comments,
    pick_keywords,
    OUT(INTO(Counter)))
```

is a step in the right direction, and if you feel that abstraction should be as easy as extracting subsequences from this prototype

```python
keyword_frequency_pipe = pipe(
    os.walk, JOIN,
    NAME.path.dirs.files,
    GET.files * (JOIN, { use(str.endswith, '.py') }) >> PUT.filename,
    GET.path.filename * os.path.join,
    open, JOIN,
    use(str.split, '#', maxsplit=1),
    GET[0],
    str.split, JOIN,
    { iskeyword },
    OUT(INTO(Counter)))
```

then you might want to read on.
# Running these samples

- `select_candidates` is an outline of the solution which omits details. As such, it is not executable.
- `keyword_frequency_loop` and both versions of `keyword_frequency_pipe` are complete, executable examples.

To run `keyword_frequency_loop`, you will need these imports:

```python
import os
from keyword import iskeyword
from collections import Counter
```

To run either version of `keyword_frequency_pipe` you will additionally need to get `liquidata`, and import it thus:

```python
from liquidata import pipe, name as NAME, get as GET, put as PUT, join as JOIN, out as OUT, into as INTO, use
```

(The `liquidata` components were uppercased in order to highlight them in the example.)
# Installation

Currently there are two options:

- Pip: `pip install liquidata`.
- Just grab the source: for now, the implementation lives in a single, dependency-free file.
# Comparison to mPyPl

`mPyPl` is a project with certain similarities to `liquidata`.

A major architectural difference is that `mPyPl` uses generators to *pull* data through the pipeline, while `liquidata` uses coroutines to *push* the data through the pipeline. This is because `liquidata` was designed to allow easy bifurcation of flows into independent, unsynchronized branches. (`liquidata` will probably also support pull-pipelines in the future.) Both `mPyPl` and `liquidata` support synchronized, named branches by sending compound objects with named components through the flow, but the two packages' approaches to managing these names are markedly different.

Here we compare and contrast the APIs provided by the two packages. This example appears in the quickstart for `mPyPl`:
```python
import mPyPl as mp

images = (
    mp.get_files('images', ext='.jpg')
    | mp.as_field('filename')
    | mp.apply('filename', 'image', lambda x: imread(x))
    | mp.apply('filename', 'date', get_date)
    | mp.apply(['image', 'date'], 'result', lambda x: imprint(x[0], x[1]))
    | mp.select_field('result')
    | mp.as_list)
```

Here is its translation into `liquidata`:

```python
from liquidata import pipe, source, name, get, put

images = pipe(
    get_files(...) >> source, name.filename,
    imread   * get.filename >> put.image,
    get_date * get.filename >> put.date,
    imprint  * get.image.date)
```
Observations:

- `liquidata` highlights the high-level information about what happens in the pipeline: `get_files`, `imread`, `get_date`, `imprint`. In contrast, `mPyPl` buries it in the noise.
- `liquidata` avoids the use of strings as symbols.
- `mPyPl` provides a specific `get_files` utility; `liquidata` can work with any iterable source of files, but providing such sources is outside the scope of `liquidata`'s goals.
- `mp.as_field('filename')` is equivalent to `name.filename`.
- `mp.apply` serves three purposes:
  - mapping a function over the stream data
  - selecting arguments from the compound flow items
  - placing the result back in the compound flow items

  In contrast, `liquidata` separates these concerns:
  - mapping is done by default: no need to ask for it
  - `get` selects arguments
  - `put` places results
- `mp.apply(['image', 'date'], 'result', lambda x: imprint(x[0], x[1]))`
  - creates an argument tuple containing `image` and `date`
  - uses a `lambda` to unpack the argument tuple into the call to `imprint`
  - puts the result back in the compound flow under the name `result`

  In contrast, in `imprint * get.image.date`
  - `get.image.date` creates an argument tuple
  - `*` unpacks the argument tuple into the call to `imprint`
  - the lack of `put` causes the result to continue downstream on its own: the other items in the compound flow are no longer needed!
- `mp.select_field('result')` translates to `get.result` in `liquidata`. It extracts the interesting item from the compound flow. In the `liquidata` version this step is not needed, because it was done implicitly in the previous step: by avoiding the use of `>> put.result`, the result continued down the pipe on its own, rather than being placed in the compound object along with everything else. That is to say, `imprint * get.image.date` is equivalent to `imprint * get.image.date >> put.result, get.result`.
- `mp.as_list` collects the results in a list. The equivalent (which would be written `out(into(list))`) is missing from the `liquidata` version, because it's the default.
- `out(into(...))` is far more general than `mp.as_list`, as it will work with any callable that consumes iterables, such as `set`, `tuple`, `min`, `max`, `sum`, `sorted`, `collections.Counter`, ... including any and all that will be written in the future.
# Whirlwind tour of basic features

```python
from liquidata import pipe, sink

fn = pipe(
    [ sink(print) ],
    { str.isalpha },
    str.upper)

fn(dir())
```
- `pipe` accepts an arbitrary number of pipe components.
- `pipe` returns a function (callable).
- The function created by `pipe` accepts an iterable argument, and pushes its elements through the pipeline.
- Square brackets (`[ ]`) create independent branches: the same data are sent both into the branch and downstream.
- `sink` feeds items into a (presumably) side-effectful function (`print` in this example), and prevents them from going any further downstream.
- Braces (`{ }`) are filters: they should contain a predicate (a function whose return value is interpreted as a boolean; `str.isalpha` in this example), and will prevent any items which don't satisfy the predicate from progressing further down the pipe.
- Unadorned functions are mappings: they accept incoming items, and the values they return are sent downstream.
- Any items that reach the end of a pipe are, by default, collected into a list and returned.

Consequently, in the above example:

- The names in the global scope (`dir()`) are fed into the pipeline, one by one.
- Each incoming item is printed out.
- Items containing non-alphabetic characters are filtered out.
- All remaining items are uppercased ...
- ... and returned in a list. (A concrete run is sketched below.)
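For concreteness, here is what a run might look like under the semantics just described (a sketch with a hypothetical input list, not output from an actual session):

```python
from liquidata import pipe, sink

fn = pipe(
    [ sink(print) ],    # independent branch: print every incoming item
    { str.isalpha },    # filter: keep purely alphabetic strings
    str.upper)          # map: upper-case the survivors

fn(['abc', 'a1', 'de'])
# prints abc, a1 and de (everything reaches the branch),
# and returns ['ABC', 'DE'] ('a1' fails the filter).
```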
```python
from operator import add
from liquidata import source, pipe, out, arg, into

pipe(
    source << dir(),
    [ out.incoming ],
    { str.isalpha },
    { arg > 5 : len },
    [ str.upper, out.big ],
    [ len, [ out.ADD(add) ], out.SUM(into(sum)) ],
    str.lower,
    out.small)
```
- Rather than using `pipe` to create a reusable function, we feed data into the pipeline directly, by including the source in the pipeline. The following three variations have the same meaning:
  - `source << dir()`
  - `dir() >> source`
  - `source(dir())`
- Rather than using a side-effect (`print`) to inspect what is passing through the pipe at various points, in this example we use named outputs. `out` is a sink which collects values so that they can be returned from the pipe.
  - The presence of multiple `out`s in the graph causes the pipeline to return a namespace, rather than a single result.
  - If a pipe (or branch; branches are just pipes) does not explicitly end in a `sink` or `out`, then it is implicitly capped with an anonymous `out`.
  - `out.incoming` will collect all items into a list, and arrange for the list to be bound to the name `incoming` in the namespace that is returned by the pipe as a whole.
  - `out.ADD(add)` uses a binary function (`add`) to fold or reduce all the values it receives into a single one, which will be placed in the returned namespace under the specified name, `ADD`.
  - `out.SUM(into(sum))` feeds the items it receives into a callable which consumes iterables (in this case `sum`, but keep in mind that there are very many ready-made options here: `set`, `min`/`max`, `collections.Counter`, `', '.join`, etc.). The result will be bound in the returned namespace under the name `SUM`.
  - `arg` provides a concise syntax for very simple anonymous functions: ones consisting of the application of an operator and a constant to the function's argument. In this example we have `arg > 5`, which is equivalent to `lambda arg: arg > 5`.
  - Braces containing colons are key-filters, `{ arg > 5 : len }` in this example. The predicate appears before the colon but, rather than being applied to the item in the pipe, it is applied to the result the key function (specified after the colon) returns when given the item. If the verdict is positive, the original item (rather than the value returned by the key function) progresses downstream. In this example, strings are coming through the pipe, and strings continue past the filter, but only those whose `len` is greater than `5`.
  - Branches can be nested: `[ len, [ out.ADD(add) ], out.SUM(into(sum)) ]`.
  - The list of items reaching the end of the main pipe, unlike in the first example, is not anonymous: it is bound to the name `small` in the returned namespace.
- The result returned by the pipeline is a namespace containing 5 names.
# Synchronized branches, compound flow and name management

TODO
# Tutorial

In this tutorial we'll gradually build up the code that appears in the introduction, and explore some other features along the way.

The following code will traverse your entire filesystem, and print information about every directory and the files in it. Be ready to interrupt it!

## A very simple pipe

```python
import os
from liquidata import pipe, source, sink

pipe(source << os.walk('/'), sink(print))
```
- We created a pipe which uses the iterable `os.walk('/')` as its source. The following three variations are synonymous:
  - `source << iterable`
  - `iterable >> source`
  - `source(iterable)`
- The items in the source are sent down the pipeline.
- In this case the pipeline is very short: it contains only a single sink. `sink`:
  - feeds any items it receives into the (presumably side-effecting) function it contains
  - stops the items from propagating further downstream
- `os.walk('/')` traverses the entire filesystem: information about each directory in the filesystem is sent down the pipe and printed out. Your screen is flooded with information.

Let's try to remove the need to interrupt the pipeline manually.
## Limiting the number of items

```python
import os
from liquidata import pipe, source, take, sink

pipe(source << os.walk('/'), take(3), sink(print))
```

- We have introduced a new component into the pipe: `take(3)`. Its purpose is to limit the number of items flowing through the pipe.
- Only the first three items coming out of the source are printed to the screen.
- But the pipeline does not terminate, because the entire source is being pushed through the pipe. `take` only limits how many of its items get through.
- If you do not interrupt this, it will eventually terminate after the source is exhausted (your whole filesystem has been traversed).
## Push vs pull

- `liquidata` pushes the data through the pipeline using coroutines. A perfectly valid alternative choice would be to pull the data through the pipe using generators.
- Pushing makes it easy to split the stream into separate, independent branches leading to distinct sinks (or outputs); pulling makes it easier to join independent sources (or inputs). The former made more sense in the context in which `liquidata` was originally needed.
- `liquidata` will support pulling in the future.
- If the data were being pulled, the previous example would terminate as soon as the first 3 items from the source had been processed.
- In a pull-architecture, `take` has to send an 'I am closed, you won't get any more data from me' message downstream. Consequently, downstream will stop asking for more. In a push-architecture, `take` has two choices (the first of which is sketched below):
  1. Keep rejecting all subsequent items it receives, until upstream stops sending data.
  2. Send a 'please stop sending data' message upstream. In this case, any other branches on the network will also immediately stop receiving any data. This is not always desirable.
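To make option 1 concrete, here is a minimal, hand-rolled sketch of a push-style `take` built on a raw Python coroutine. This is illustrative only, not `liquidata`'s implementation:

```python
def take(n, downstream):
    """Forward the first n items pushed in; silently reject the rest.
    Upstream keeps sending until its own source is exhausted."""
    def coro():
        seen = 0
        while True:
            item = yield          # wait for upstream to push an item
            if seen < n:
                seen += 1
                downstream.send(item)

    c = coro()
    next(c)                       # prime the coroutine
    return c
```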
## Early termination of push

We can instruct `take` to close down the whole network as soon as it has let through all the items it needs, by passing in `close_all=True`:

```python
import os
from liquidata import pipe, source, take, sink

pipe(source << os.walk('/'), take(3, close_all=True), sink(print))
```

- The first three items are printed to the screen, and the pipeline terminates immediately.
- All `liquidata` components with the ability to reject all subsequent items should have a `close_all` option.
## Synchronized branches and compound flow

The items generated by `os.walk` are 3-tuples, containing:

- a filesystem path
- a list of directories found on that path
- a list of files found on that path

Let's give these components names within our pipeline:

```python
import os
from liquidata import pipe, source, name, take, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    take(3, close_all=True),
    sink(print))
```

- The new component is `name.path.dirs.files`.
- It specifies three names: `path`, `dirs` and `files`.
- Therefore it expects to receive an iterable containing three items (in this case, the 3-tuple coming out of `os.walk`).
- It creates a namespace containing those three names, bound to the corresponding items in the tuple.
- The resulting namespace continues down the pipe.
- The components of such namespaces can be thought of as parallel, synchronized, named branches.
- `liquidata` also supports unsynchronized branches. We'll meet these later.
- `name.single_name` wraps the whole item it receives in a namespace. By contrast, `name.more.than.one` unpacks the item it receives and wraps its components in a namespace, as illustrated below.
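A quick illustration of the difference (a sketch; the exact printed representation of the namespaces is not specified here):

```python
from liquidata import pipe, source, name, sink

pipe(source << [(1, 2)], name.pair, sink(print))  # one name: the whole tuple is wrapped
pipe(source << [(1, 2)], name.a.b,  sink(print))  # two names: the tuple is unpacked
```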
## Picking components from compound flow

```python
import os
from liquidata import pipe, source, name, get, take, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    take(3, close_all=True),
    sink(print))
```

- The new component is `get.files`. It
  - expects a compound item (namespace) from upstream
  - selects the specified component (`files`, in this example)
  - sends it downstream.
- Consequently, this program prints three lists:
  - one for each of the first three directories visited by `os.walk('/')`
  - each containing the names of the files in that directory
## join

```python
import os
from liquidata import pipe, source, name, get, take, join, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    take(3, close_all=True),
    join,
    sink(print))
```

- The new component is `join`.
- It expects iterables, and sends their items downstream, one by one (a plain-Python analogue is shown below).
- Consequently, this program prints the names of the files found in the first 3 directories visited by `os.walk('/')`.
- This is exactly the same information as in the previous example, except that, rather than being grouped in 3 lists, the names appear in one long chain.
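In plain Python terms, `join` plays the role of `itertools.chain.from_iterable` applied to the stream:

```python
from itertools import chain

list(chain.from_iterable([[1, 2], [3, 4], [5]]))  # [1, 2, 3, 4, 5]
```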
Try swapping the `take` and the `join` components:

```python
import os
from liquidata import pipe, source, name, get, take, join, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    join,
    take(3, close_all=True),
    sink(print))
```

- This time only 3 names are printed.
- Previously, all the names corresponding to the first three directories were printed.
- `join` plays a role similar to inner loops.
- `flat(fn)` is an alternative spelling of `fn, join`:
```python
import os
from liquidata import pipe, source, name, get, take, flat, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    flat(get.files),
    take(3, close_all=True),
    sink(print))
```

- `flat(get.files)` replaces `get.files, join` from the previous example.
- The behaviour doesn't change.
## filters

```python
import os
from liquidata import pipe, source, name, get, take, join, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    join,
    { lambda x: x.endswith('.py') },
    take(3, close_all=True),
    sink(print))
```

- The new component is `{ lambda x: x.endswith('.py') }`.
- Braces (`{ }`) create filters, with the contained function being used as the predicate.
- In this example, we only let through filenames which end in `.py`.
- We close the pipeline after seeing the first three of those.
## reducing lambda noise: use

The purpose of the `lambda` in the last example is to bind the second argument of a function (in this example, `str.endswith(<string>, <ending>)`), while leaving the first argument unbound. This situation arises rather frequently, and `lambda` often adds a lot of syntactic noise.

```python
import os
from liquidata import pipe, source, name, get, take, join, use, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    join,
    { use(str.endswith, '.py') },
    take(3, close_all=True),
    sink(print))
```

- The new component is `use(str.endswith, '.py')`, which replaces `lambda x: x.endswith('.py')`; a standalone check appears below.
- The following two lines are synonymous:

  ```python
  lambda x: fn(x, A2)
  use(fn, A2)
  ```

- More generally:

  ```python
  lambda x: fn(x, A2, A3, K1=v1, K2=v2)
  use(fn, A2, A3, K1=v1, K2=v2)
  ```

- In other words:

  ```python
  lambda x: fn(x, *args, **kwds)
  use(fn, *args, **kwds)
  ```
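Assuming `use` returns a plain callable (as the equivalences above suggest), it can be tried out in isolation:

```python
from liquidata import use

ends_py = use(str.endswith, '.py')
ends_py('setup.py')    # True
ends_py('README.md')   # False
```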
## key-filters

Imagine we were interested in filenames whose length is less than 5.

```python
import os
from liquidata import pipe, source, name, get, take, join, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    join,
    { lambda x: x < 5 : len },
    take(3, close_all=True),
    sink(print))
```

- The new component is `{ lambda x: x < 5 : len }`.
- The important new feature within that component is `: len`.
- We have already seen the use of braces (`{ predicate }`) as filters.
- This is a key-filter: `{ predicate : key }`.
- The predicate does not look at the item itself, but at `key(item)`.
- If the predicate is satisfied, the item itself (rather than `key(item)`) continues downstream.
## reducing lambda noise: arg

```python
import os
from liquidata import pipe, source, name, get, take, join, arg, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    join,
    { arg < 5 : len },
    take(3, close_all=True),
    sink(print))
```

- The new component is `arg < 5`, which replaces `lambda x: x < 5`.
- `arg` is a means of creating anonymous functions by binding one operand of an operator. It works for most Python operators. (A standalone sketch appears below.)
- `from liquidata import arg as _` might be tempting ... but is it wise?
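A minimal sketch of `arg` outside the filesystem example, relying only on the behaviour described above (the default `out` collects the survivors into a list):

```python
from liquidata import pipe, source, arg

pipe(source << [3, 7, 2, 9], { arg < 5 })  # [3, 2]
```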
## Adding new synchronized branches: put

```python
import os
from liquidata import pipe, source, name, put, get, take, join, arg, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    (lambda ns: len(ns.files)) >> put.Nfiles,
    take(3, close_all=True),
    sink(print))
```

- The new component is `(lambda ns: len(ns.files)) >> put.Nfiles`.
  - `lambda ns: len(ns.files)` accepts a namespace, and returns the length of one of the values in the namespace.
  - `>> put.Nfiles` adds the returned value as a new entry in the namespace, under the name `Nfiles`.
- Before this point, there were three synchronized branches in the stream (`path`, `dirs`, `files`). After this point there are four (the aforementioned three, and `Nfiles`).
## get *

The last example can be written more conveniently with `get *`:

```python
import os
from liquidata import pipe, source, name, get, put, take, join, arg, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files * len >> put.Nfiles,
    take(3, close_all=True),
    sink(print))
```

`get.files * len` replaced `(lambda ns: len(ns.files))`. The following three variants are synonymous:

```python
(lambda ns: len(ns.files)) >> put.Nfiles
len * get.files            >> put.Nfiles
get.files * len            >> put.Nfiles
```
- `get` without the `*` is essentially a shorthand for `operator.attrgetter`:

  ```python
  attrgetter('a')
  get       .a
  ```

  are synonymous, as are

  ```python
  attrgetter('a', 'b', 'c')
  get       .a   .b   .c
  ```

- `get` can also play the role of `operator.itemgetter`:

  ```python
  itemgetter(key_or_index)
  get       [key_or_index]
  ```

  are synonymous. Unfortunately, Python's `__getitem__` syntax throws a spanner in the works, because it cannot distinguish between

  ```python
  obj[ a, b ]
  obj[(a, b)]
  ```

  so

  ```python
  get[a, b]
  itemgetter( a, b )   # would like this
  itemgetter((a, b))   # but get this
  ```

- When combined with `*`, `get` not only extracts values from a compound object, but also unpacks them into a function call, much like the `*` in Python's standard `fn(*args)` syntax. It is worth noting the following similarities and differences between `get ... ,` and `get ... *`:

  ```python
  get.a   , fn   # fn(ns.a)
  get.a   * fn   # same as above

  get.a.b , fn   # fn((ns.a, ns.b))
  get.a.b * fn   # fn( ns.a, ns.b )
  ```

  The comma does not unpack the tuple: the whole tuple is passed as a single argument to `fn`. In contrast, the star unpacks the tuple and `fn` receives its contents as separate arguments.

- `get *` also has an important interaction with `put`. Consider

  ```python
  get.a * fn >> put.b   # ns.b = fn(ns.a)
  get.a , fn >> put.b   # Error!
  ```

  The first case is parsed as `(get.a * fn) >> put.b`. Consequently `put` attempts to inject `b` into the item received by `(get.a * fn)`: that item must be a namespace, if `get.a` is to work. The `b` will be placed in that namespace.

  The second case is parsed as `get.a, (fn >> put.b)`. Consequently `put` attempts to inject `b` into the item received by `fn`, which has already been unpacked by `get.a`, and will therefore NOT be a namespace.
Armed with this knowledge, let's return to the code we were developing.

## Putting get * and >> put to work

Recall that:

- We have a compound flow containing three synchronized branches: `path`, `dirs` and `files`.
- Alternatively, we can say that the items flowing through the pipe are namespaces, each containing the names `path`, `dirs` and `files`.
- `path` is a single value; `files` is a list.
- We have managed to create a stream of (unqualified) filenames which end in `.py`.
The code looked like this:

```python
import os
from liquidata import pipe, source, name, get, take, join, use, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files,
    join,
    { use(str.endswith, '.py') },
    take(3, close_all=True),
    sink(print))
```

- We would like to look at the contents of the `.py` files.
- In order to open the files, we will need to know their fully qualified names.
- `os.path.join(path, filename)` can create these for us, as long as we can feed it a path and a filename.
- Before `get.files`, the stream contained the `path` we need, but the filename was not obviously accessible. After `get.files, join`, the stream contains only filenames: `path` is no longer available.
- The solution is to add the filenames to the compound stream, rather than replacing it. This can be done with `put`.

```python
import os
from liquidata import pipe, source, name, get, put, take, join, use, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename,
    take(3, close_all=True),
    sink(print))
```
- The first change to the code is

  ```python
  get.files , join, { use(str.endswith, '.py') }                   # old
  get.files * (join, { use(str.endswith, '.py') }) >> put.filename # new
  ```

  The old version:
  - picks `files` out of the namespace
  - throws away the namespace
  - turns a stream of lists of filenames into a stream of individual filenames
  - discards non-`.py` filenames

  The new version:
  - feeds `files` into a sub-pipe
  - the sub-pipe:
    - turns a stream of lists of filenames into a stream of individual filenames
    - discards non-`.py` filenames
  - for each `.py` file coming out of the sub-pipe:
    - adds the filename to the namespace
    - sends the augmented namespace downstream
- The second change is the addition of `get.path.filename`, which extracts the two values that interest us from the namespace.

Consequently, the code prints out the first three `.py` filenames, each in a tuple together with the path to the directory that contains it.
We can use `*` to unpack these tuples into separate arguments when calling `os.path.join`:

```python
import os
from liquidata import pipe, source, name, get, put, take, join, use, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename * os.path.join,
    take(3, close_all=True),
    sink(print))
```

- The new addition is `* os.path.join`.
- The code now prints the fully qualified names of the first three `.py` files.
## Analysing the file contents

```python
import os
from keyword import iskeyword
from liquidata import pipe, source, name, get, put, take, join, use, sink

pipe(
    source << os.walk('/'),
    name.path.dirs.files,
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename * os.path.join,
    open,
    join,
    use(str.split, '#', maxsplit=1),
    get[0],
    str.split,
    join,
    { iskeyword },
    take(30, close_all=True),
    sink(print))
```

There are quite a few new additions, but they should all be familiar by now:

- `open` (a Python builtin) turns each filename into an iterator over the lines in the file.
- `join` turns the stream of file iterators into a continuous stream of lines in all the files.
- `use(str.split, '#', maxsplit=1)` splits each line into a `[<code>, <comment>]` pair.
- `get[0]` picks the code, and throws away the comment.
- `str.split` turns the code on any line into a sequence of tokens. (This tokenizer is much too naive for real uses, but it will do for the purposes of our example.)
- `join` gives a continuous stream of tokens on all lines in all files.
- `{ iskeyword }` filters out non-keywords.
- Changing `take`'s first argument from `3` to `30` allows us to see a bit more output.
## Reusable pipes

Our pipe starts with `source << ...`. This means that data are pushed through the pipe as soon as it is defined. It is possible to define a pipe for reuse, decoupling it from the data that will flow through it.

```python
import os
from keyword import iskeyword
from liquidata import pipe, name, get, put, take, join, use, sink

fn = pipe(
    name.path.dirs.files,
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename * os.path.join,
    open,
    join,
    use(str.split, '#', maxsplit=1),
    get[0],
    str.split,
    join,
    { iskeyword },
    take(30, close_all=True),
    sink(print))

fn(os.walk('/'))
```

- `source` has been removed from the pipe.
- `pipe` now returns a function.
- That function accepts an iterable argument, and pushes its elements through the pipe.
- The function can be called multiple times, with different inputs.
## Pipe functions

TODO `pipe(...)`
## Composable pipes

Pipes are pipe components themselves:

```python
import os
from keyword import iskeyword
from liquidata import pipe, source, name, get, put, take, join, use, sink

hard_work = (
    name.path.dirs.files,
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename * os.path.join,
    open,
    join,
    use(str.split, '#', maxsplit=1),
    get[0],
    str.split,
    join,
    { iskeyword })

fn = pipe(hard_work, take(5, close_all=True), sink(print))
fn(os.walk('/'))

pipe(source << os.walk('/'), hard_work, take(8, close_all=True), sink(print))
```

- The bulk of our pipe was extracted into a tuple called `hard_work`.
- `hard_work` was used as a component in different pipes.
## Side-effects vs. results

So far, we have been printing out the values that reach the end of the pipe with `sink(print)`.

- `sink` stops items from proceeding downstream, so it makes no sense for `sink` to appear anywhere other than at the end of a pipe.
- `sink` passes all items that reach it from upstream to the function it contains (`print` in all our examples so far). The idea is that the function should perform some side-effect with the values it receives.

Let's try removing the sink:

```python
pipe(source << os.walk('/'), hard_work, take(8, close_all=True))
```

Any items reaching the end of the pipe are collected into a list, which is returned as the result of the pipe.
## Different strategies for collecting results

```python
from operator import add

pipe(source << range(10), out(min))       # 0
pipe(source << range(10), out(max))       # 9
pipe(source << range(10), out(add))       # 45
pipe(source << range(10), out(add, 1000)) # 1045
```

- The default behaviour of collecting all items that reach the end of the pipe into a list can be overridden with `out`.
- `out` accepts a binary function, which will be used to fold or reduce the items into a single value.
- `out` accepts an optional default or initial value, just like `functools.reduce`.

Alternatively:

```python
pipe(source << range(5), out(into(set)))    # {0, 1, 2, 3, 4}
pipe(source << range(5), out(into(sorted))) # [0, 1, 2, 3, 4]
pipe(source << range(5), out(into(min)))    # 0
pipe(source << range(5), out(into(max)))    # 4
```

- `out(into(...))` accepts a consumer of iterables.
- Therefore, the default behaviour is equivalent to `out(into(list))`, as the sketch below shows.
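A quick sketch of that equivalence:

```python
pipe(source << range(3), str)                   # ['0', '1', '2'] (implicit default out)
pipe(source << range(3), str, out(into(list)))  # ['0', '1', '2'] (the same, written explicitly)
```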
## Named outputs

```python
pipe(source << range(5), out.X           ).X # [0, 1, 2, 3, 4]
pipe(source << range(5), out.X(add)      ).X # 10
pipe(source << range(5), out.X(into(max))).X # 4
```

- `out`s can be associated with a name: `X` in these examples.
- Naming `out`s causes the pipeline to return its results wrapped in a namespace: note the `.X` after the pipeline.
- This isn't very useful unless there are multiple `out`s in a single network, and that won't make sense until we discover independent branches.
## Independent branches

```python
result = pipe(source << range(10), [ out.smallest(into(min)) ], out.biggest(into(max)))
assert result.biggest  == 9
assert result.smallest == 0
```

- Square brackets (`[ ]`) create independent branches.
- Independent branches are pipes which receive all items reaching their position in the pipeline, without affecting what is seen downstream.
- Independent branches, just like other pipes, can end in `sink`s or `out`s. By default, just like other pipes, they end in `out(into(list))`.
- Anonymous `out`s are bound to the name `return` in the returned namespace. As `return` is a keyword, you will need to use `getattr` to access it, as sketched below. [This is likely to change in future versions.]
- If multiple `out`s in the same network have the same name, that name will be bound to a tuple of all the results with that name. [This is likely to change in future versions.]
- This network returned two results, grouped together in a namespace, and bound to the names `biggest` and `smallest`.
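A sketch of accessing an anonymous `out` through its `return` binding, relying on the behaviours described above (a named `out` in a branch plus the implicit anonymous `out` at the end of the main pipe):

```python
result = pipe(source << range(5), [ out.copy ], str)
result.copy                # [0, 1, 2, 3, 4], from the named out in the branch
getattr(result, 'return')  # ['0', '1', '2', '3', '4'], from the implicit anonymous out
```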
## Multiple directories, and progress reporting

Let's return to the code we're developing, and make a few more changes:

```python
import os
from keyword import iskeyword
from liquidata import pipe, name, get, put, take, join, use, sink

fn = pipe(
    [ sink(lambda d: print(f'Processing input directory {d}')) ],
    os.walk,
    join,
    name.path.dirs.files,
    [ get.path, sink(lambda d: print(f'Processing discovered directory {d}')) ],
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename * os.path.join,
    [ sink(lambda f: print(f'Processing file {f}')) ],
    open,
    join,
    use(str.split, '#', maxsplit=1),
    get[0],
    str.split,
    join,
    { iskeyword },
    take(30, close_all=True),
    sink(print))

fn(['/bin', '/usr/bin'])
```
Firstly, observe that

- `os.walk, join` has been spliced into the pipe (in positions 2 and 3).
- `fn` now receives a list of directories, rather than `os.walk(<single-directory>)`.
- Each directory in the input list will be traversed by `os.walk`, and the results will be `join`ed together into a single stream.

Secondly, there are three other new components:

```python
[ sink(lambda d: print(f'Processing input directory {d}')) ]
[ get.path, sink(lambda d: print(f'Processing discovered directory {d}')) ]
[ sink(lambda f: print(f'Processing file {f}')) ]
```

These are all independent branches, which report on what flows through the network at different points. Consequently, as Python keywords are printed, we will be informed whenever

- we start processing the next directory in the input list
- `os.walk` enters a new directory
- we start processing a new `.py` file
## spy

Spy hasn't been implemented yet in the current version of `liquidata`, but the following shortcuts should appear soon:

```python
[ sink(<side-effect>) ]                  spy(<side-effect>)
[ out.X(<binary-function>) ]             spy.X(<binary-function>)
[ out.X(into(<iterator-consumer>)) ]     spy.X(into(<iterator-consumer>))
```
## Frequency analysis

We're almost there. Let's calculate how many times each keyword appears:

```python
import os
from keyword import iskeyword
from collections import Counter
from liquidata import pipe, name, get, put, join, use, out, into

fn = pipe(
    os.walk,
    join,
    name.path.dirs.files,
    get.files * (join, { use(str.endswith, '.py') }) >> put.filename,
    get.path.filename * os.path.join,
    open,
    join,
    use(str.split, '#', maxsplit=1),
    get[0],
    str.split,
    join,
    { iskeyword },
    out(into(Counter)))
```

- We removed
  - the branches which printed the progress report
  - the `take` which limited the data that flooded our screen
- We replaced the final `sink(print)` with `out(into(Counter))`.
- `collections.Counter` (from Python's standard library) consumes an iterable (so it can be used with `into`) and associates each value with the number of times it was seen, as the snippet below shows.
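`Counter`'s behaviour is easy to check on its own:

```python
from collections import Counter

Counter(['def', 'if', 'def', 'return'])  # Counter({'def': 2, 'if': 1, 'return': 1})
```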
## Abstraction

As it stands, the code throws all the details in our face. Let's cut it up into smaller portions and give them suggestive names, to give a higher-level overview of what the code is doing:

```python
import os
from keyword import iskeyword
from collections import Counter
from liquidata import pipe, name, get, put, join, use, out, into

all_files         = os.walk, join, name.path.dirs.files
find_python_files = get.files * (join, { use(str.endswith, '.py') }) >> put.filename
file_contents     = get.path.filename * os.path.join, open, join
discard_comments  = use(str.split, '#', maxsplit=1), get[0]
find_keywords     = str.split, join, { iskeyword }

fn = pipe(
    all_files,
    find_python_files,
    file_contents,
    discard_comments,
    find_keywords,
    out(into(Counter)))
```

And we're done!
# Why do we have the current architecture?

The current implementation uses coroutines to push data through the pipeline. This was motivated by the context in which the ancestor of `liquidata` was written, where a single input stream had to be split into multiple independent output streams feeding into separate sinks.

Let's take a step back and ask some questions about this choice: Do we need to push? Do we need coroutines? Why? When? What are the consequences? What alternatives are there?

## Function composition

Let's consider a very simple pipeline, consisting of a linear sequence of maps:

```python
pipe(f, g, h)
```

This could be implemented using any of

- coroutines
- generators
- asyncio
- function composition

Function composition is the simplest, so why bother with the others?
## Changing stream length

Let's throw in a filter or a join:

```python
pipe(f, { p }, g, h)
pipe(f, join, g, h)
```

Function composition no longer cuts the mustard, because there is no longer a 1-to-1 correspondence between items in the input and output streams: something is needed to shrink or expand the stream.
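For comparison, here is how those two pipes might be written with standard generator machinery (`map`, `filter` and `chain.from_iterable` doing the shrinking and expanding); the functions are placeholders invented for the illustration:

```python
from itertools import chain

src = [1, 2, 3]
f = lambda x: x + 1
g = lambda x: x * 2
h = str
p = lambda x: x > 2

# pipe(f, { p }, g, h): the filter shrinks the stream
list(map(h, map(g, filter(p, map(f, src)))))             # ['6', '8']

# pipe(f2, join, g, h): join expands the stream
f2 = lambda x: [x, x]
list(map(h, map(g, chain.from_iterable(map(f2, src)))))  # ['2', '2', '4', '4', '6', '6']
```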
## Stream bifurcation

A different complication: branches.

```python
pipe(f, [x, y], g, h)
```

It's difficult to see how function composition or generators could deal with this.
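With push-style coroutines, by contrast, a branch point is trivial: each item received is simply sent on to more than one target. A minimal sketch of the idea (not `liquidata`'s actual code):

```python
def fanout(*targets):
    """Push-style branch point: forward every incoming item to each target."""
    def coro():
        while True:
            item = yield
            for t in targets:
                t.send(item)

    c = coro()
    next(c)   # prime the coroutine
    return c
```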
## Joining streams

That last pipe describes a graph that looks something like this:

```
             x -- y
            /
source -- f
            \
             g -- h
```

How about

```
sourceA -- a
            \
             g --- h
            /
sourceB -- b
```

Generators can deal with this easily (recall that `map` accepts multiple iterables):

```python
map(h, map(g, map(a, sourceA),
              map(b, sourceB)))
```

but it's not obvious how this would work for function composition or coroutines.
## liquidata syntax for multiple sources

What would the `liquidata` syntax for this look like? Ideally we'd have another kind of bracket, but we've exhausted the possibilities that Python offers: `()`, `[]`, `{}`. Let's imagine that `<>` were valid syntactic brackets; then we could have:

```python
pipe(b, <sourceA, a>, g, h)  # join two sources
pipe(b, [a, out.A],   g, h)  # branch out into two sinks
```

Working with syntax available in Python, how about:

```python
pipe(b, source(sourceA, a), g, h)
```

Recall that the following are already synonymous in `liquidata`:

```python
pipe(f)(data)
pipe(source << data, f)
pipe(data >> source, f)
pipe(source(data), f)
```

so the following could work:

```python
pipe(source << sourceB, b, (source << sourceA, a), g, h)
```

`liquidata` used to have an input-variable syntax (called slots) in an earlier prototype. If something like it were resurrected, we could write something along the lines of

```python
fn = pipe(source << slot.B, b, (source << slot.A, a), g, h)
fn(A=sourceA, B=sourceB)
```
## Synchronization

In `liquidata`, `[]`-branches are called independent because there is absolutely no synchronization between them (in contrast with named branches, which are based on namespaces flowing through the pipe and managed with `name`, `get` and `put`). Once the stream has split, each branch can change the length of its stream without the others knowing or caring about it.

We would need to think about the synchronization consequences for multiple independent input streams. I guess that the answer is: it's up to the user to make sure something sensible happens. Essentially, the user has the same freedoms and responsibilities as when joining multiple sources in classically-written generator networks:

```python
map(h, map(g, map(<filter or join>, sourceA),
              map(b, sourceB)))
```
## close_all

Consider the following in current `liquidata`:

```python
pipe(source << itertools.count(), take(3))
pipe(source << itertools.count(), take(3, close_all=True))
```

Because of the push-architecture, the first never returns. In a pull-architecture the issue doesn't arise.
## So what?

I would like to remove the universal reliance on coroutines, with two main goals:

- Enabling things that were impossible before.
- Simplifying portions of the code which don't need coroutines.