Why do we have the current architecture?

The current implementation uses coroutines to push data through the pipeline.

This was motivated by the context in which the ancestor of liquidata was written, where a single input stream had to be split into multiple independent output streams feeding separate sinks.

Let's take a step back and ask some questions about this choice: Do we need to push? Do we need coroutines? Why? When? What are the consequences? What alternatives are there?

Function composition

Let's consider a very simple pipeline, consisting of a linear sequence of maps:

pipe(f, g, h)

This could be implemented using any of

  • coroutines
  • generators
  • asyncio
  • function composition

Function composition is the simplest, so why bother with the others?
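To make the comparison concrete, here is a sketch of the same linear pipeline written once with function composition and once with generators. The helper names are invented for illustration; this is not liquidata's implementation.

from functools import reduce

def compose_pipe(*fns):
    # pipe(f, g, h) applied to one item is h(g(f(item)))
    def composed(item):
        return reduce(lambda acc, fn: fn(acc), fns, item)
    return lambda stream: map(composed, stream)

def generator_pipe(*fns):
    # The same thing as a chain of lazy map generators.
    def run(stream):
        for fn in fns:
            stream = map(fn, stream)
        return stream
    return run

f = lambda x: x + 1
g = lambda x: x * 2
h = str

assert list(compose_pipe(f, g, h)(range(3))) == list(generator_pipe(f, g, h)(range(3)))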

Changing stream length

Let's throw in a filter or a join:

pipe(f, { p }, g, h)
pipe(f, join, g, h)

Function composition no longer cuts the mustard, because there is no longer a 1-to-1 correspondence between items in the input and output streams: something is needed to shrink or expand the stream.
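To see why, here is a sketch (invented helper names, nothing to do with liquidata's internals) of what a filter and a join have to do: a filter emits zero or one output items per input item, a join emits many, so the stage must decide how many items to yield rather than returning exactly one value per call.

def filtered(pred, stream):    # { p }: each input item yields 0 or 1 output items
    return (x for x in stream if pred(x))

def joined(stream):            # join: each input item (an iterable) yields many output items
    return (x for xs in stream for x in xs)

data = [[1, 2], [3], [], [4, 5, 6]]
assert list(filtered(lambda xs: len(xs) > 1, data)) == [[1, 2], [4, 5, 6]]
assert list(joined(data)) == [1, 2, 3, 4, 5, 6]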

Stream bifurcation

A different complication, branches:

pipe(f, [x, y], g, h)

It's difficult to see how function composition and generators could deal with this.
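Push-based coroutines, on the other hand, handle bifurcation naturally: the branching stage just sends every incoming item to more than one downstream target. A minimal sketch (heavily simplified, with invented names, not liquidata's actual coroutines):

def coroutine(fn):
    def start(*args, **kwargs):
        co = fn(*args, **kwargs)
        next(co)                 # prime the coroutine
        return co
    return start

@coroutine
def sink(result):
    while True:
        result.append((yield))

@coroutine
def mapper(fn, target):
    while True:
        target.send(fn((yield)))

@coroutine
def branch(*targets):
    while True:
        item = yield
        for target in targets:
            target.send(item)

left, right = [], []
top = branch(mapper(str, sink(left)), mapper(lambda x: -x, sink(right)))
for item in range(3):
    top.send(item)

assert left == ['0', '1', '2'] and right == [0, -1, -2]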

Joining streams

That last pipe describes a graph that looks something like this:

             x -- y
           /
source -- f
           \
             g -- h

How about

sourceA -- a
            \
             g --- h
            /
sourceB -- b

Generators can deal with this easily:

map(h, map(g, map(a, sourceA),
              map(b, sourceB)))

but it's not obvious how this would work for function composition or coroutines.
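For concreteness, here is a runnable version of that shape (the functions are placeholders): Python's map accepts several iterables and pulls one item from each per output item, so the joining stage is simply a function of two arguments.

sourceA = [1, 2, 3]
sourceB = [10, 20, 30]

a = lambda x: x + 1
b = lambda x: x * 2
g = lambda x, y: x + y     # the joining stage receives one item from each source
h = str

result = map(h, map(g, map(a, sourceA),
                       map(b, sourceB)))
assert list(result) == ['22', '43', '64']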

liquidata syntax for multiple sources

What would the liquidata syntax for this look like?

Ideally we'd have another kind of bracket, but we've exhausted the possibilities that Python offers: (), [], {}. If <> were a valid pair of syntactic brackets, we could have:

pipe(b, <sourceA, a>, g, h)  # join two sources
pipe(b, [a, out.A],   g, h)  # branch out into two sinks

Working with syntax available in Python, how about:

pipe(b, source(sourceA, a), g, h)

Recall that the following are already synonymous in liquidata

pipe(f)(data)
pipe(source << data, f)
pipe(data >> source, f)
pipe(source(data), f)

so the following could work

pipe(source << sourceB, b, (source << sourceA , a), g, h)

liquidata used to have an input-variable syntax (called slots) in an earlier prototype. If something like it were resurrected, we could write something along the lines of

fn = pipe(source << slot.B, b, (source << slot.A, a), g, h)
fn(A=sourceA, B=sourceB)

Synchronization

In liquidata, []-branches are called independent because there is absolutely no synchronization between them (in contrast with named branches, which are based on namespaces flowing through the pipe and managed with name, get and put). Once the stream has split, each branch can change the length of its stream without the others knowing or caring about it.

We would need to think about the synchronization consequences for multiple independent input streams. The answer is probably: it's up to the user to make sure something sensible happens. Essentially, the user has the same freedoms and responsibilities as when joining multiple sources in classically-written generator networks:

map(h, map(g, map(<filter or join>, sourceA),
              map(b,                sourceB)))
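To make the user's responsibility concrete (placeholder functions and ordinary generators, not liquidata syntax): once one source has been filtered, a zip-like join silently pairs up whatever items remain and stops at the shorter stream, whether or not those pairings mean anything.

sourceA = range(6)
sourceB = range(6)

evens   = (x for x in sourceA if x % 2 == 0)   # the filter shrinks this stream
doubled = map(lambda x: x * 2, sourceB)

paired = list(map(lambda x, y: (x, y), evens, doubled))
assert paired == [(0, 0), (2, 2), (4, 4)]      # only 3 pairs: the shorter stream wins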

close_all

Consider the following in current liquidata

pipe(source << itertools.count(), take(3))
pipe(source << itertools.count(), take(3, close_all=True))

Because of the push architecture, the first never returns: take(3) stops forwarding items, but nothing tells the infinite source to stop pushing unless close_all shuts down the whole pipeline. In a pull architecture the issue doesn't arise.
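As a sketch of the pull-side behaviour (plain generators, not liquidata): the consumer simply stops asking for items, so the infinite source is never asked for a fourth one and nothing upstream needs to be told to close.

import itertools

def take(n, stream):
    return itertools.islice(stream, n)

assert list(take(3, itertools.count())) == [0, 1, 2]   # returns immediately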

So what?

I would like to remove the universal reliance on coroutines, with two main goals:

  • Enabling things that were impossible before.

  • Simplifying portions of the code which don't need coroutines.