Task Dependency Principles¶

Objective:

Learning how ML workflows can be represented using acyclic task graphs, how to construct them, and why they are essential for portability.

Principles:

  1. Workflows are represented as acyclic graphs modeling the dependencies between their tasks.
  2. The actual task graph can be displayed by selecting the graphviz runner.
  3. Individual tasks are implemented using stateless or stateful actors.
  4. Both the train and apply modes of the same pipeline are represented using tightly coupled but distinct task graphs.
  5. Operators are reusable components that facilitate the construction of a task graph topology.
  6. Operator composition is a non-linear process combining two operators to produce a more complex task graph topology.
  7. Task graphs can be executed using different runners.

Workflow Paradigm¶

[figure: task graph]

  • Conventional programming involves directly coding the processing logic using the constructs of the given language.
  • Workflow-based programming adds an extra layer on top, breaking the logic into steps with defined dependencies and data exchanges.
  • Such a workflow is represented using a directed acyclic graph (DAG).
  • This allows for introspection, scheduling, scalability, reusability, and portability (a toy sketch follows below).
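
To make the workflow paradigm concrete, here is a minimal, ForML-independent sketch of the idea (plain Python; all names are illustrative and not part of any ForML API): tasks are vertices of a DAG and get executed in dependency order.

# Toy workflow: each task is a function plus the names of its upstream dependencies.
TASKS = {
    'load': (lambda: [3, 1, 2], []),
    'sort': (lambda data: sorted(data), ['load']),
    'head': (lambda data: data[0], ['sort']),
}

def execute(dag: dict) -> dict:
    """Run the tasks in topological (dependency) order; assumes the graph is acyclic."""
    results = {}
    pending = dict(dag)
    while pending:  # a cycle would loop forever - real engines validate acyclicity upfront
        for name, (task, deps) in list(pending.items()):
            if all(dep in results for dep in deps):  # all upstream outputs available
                results[name] = task(*(results[dep] for dep in deps))
                del pending[name]
    return results

execute(TASKS)  # {'load': [3, 1, 2], 'sort': [1, 2, 3], 'head': 1}

Because the logic is declared as data (the graph) rather than as hardwired calls, an engine can inspect it, schedule independent branches in parallel, or hand it over to a different executor - which is exactly what the runners below demonstrate.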

Runner Portability¶

Runner portability is the general ability to transcode DAGs at runtime so they can be launched using different executors.

Let's demonstrate this capability by executing the following artifact using three possible runners:

In [1]:
from forml import project
from forml.pipeline import payload
from dummycatalog import Foo

SOURCE = project.Source.query(Foo)
PIPELINE = payload.ToPandas()
PROJECT = SOURCE.bind(PIPELINE)

Dask Runner¶

Dask is configured as the default runner on this tutorial platform, so it gets engaged simply like this:

In [2]:
PROJECT.launcher.apply()
Out[2]:
Timestamp Label Level Value Bar
0 2021-05-05 03:12:19 1 Alpha 0.26 1
1 2021-05-11 11:27:50 0 Tango 0.94 3
2 2021-05-11 17:35:27 0 Zulu 0.57 4
3 2021-05-06 19:49:43 0 Uniform 0.69 2
4 2021-05-12 08:53:35 0 Xray 0.83 5
5 2021-05-12 22:06:04 0 Victor 0.61 6
6 2021-05-07 13:17:43 1 Echo 0.12 1
7 2021-05-13 16:25:18 0 Whiskey 0.78 3
8 2021-05-13 06:31:58 0 November 0.92 4
9 2021-05-08 15:48:20 0 Yankee 0.68 5
10 2021-05-14 19:56:01 1 Charlie 0.35 2
11 2021-05-14 04:03:32 0 Mike 0.54 6
12 2021-05-09 01:18:13 1 Bravo 0.07 1
13 2021-05-15 19:24:46 0 Romeo 0.58 3
14 2021-05-15 21:31:22 0 Oscar 0.84 4
15 2021-05-16 23:48:57 0 Quebec 0.74 5
16 2021-05-16 00:56:39 1 Foxtrot 0.45 2
17 2021-05-10 16:06:06 0 Papa 0.59 6
18 2021-05-17 14:17:43 1 Delta 0.33 1
19 2021-05-17 06:52:51 0 Siera 0.72 6

Spark Runner¶

Apache Spark can be explicitly selected to execute the workflow.

(Hint: if you are quick enough, you can visit the Spark Dashboard at http://localhost:4040/ while the workflow is running)

In [3]:
PROJECT.launcher(runner='spark').apply()
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/30 23:37:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Out[3]:
Timestamp Label Level Value Bar
0 2021-05-05 03:12:19 1 Alpha 0.26 1
1 2021-05-11 11:27:50 0 Tango 0.94 3
2 2021-05-11 17:35:27 0 Zulu 0.57 4
3 2021-05-06 19:49:43 0 Uniform 0.69 2
4 2021-05-12 08:53:35 0 Xray 0.83 5
5 2021-05-12 22:06:04 0 Victor 0.61 6
6 2021-05-07 13:17:43 1 Echo 0.12 1
7 2021-05-13 16:25:18 0 Whiskey 0.78 3
8 2021-05-13 06:31:58 0 November 0.92 4
9 2021-05-08 15:48:20 0 Yankee 0.68 5
10 2021-05-14 19:56:01 1 Charlie 0.35 2
11 2021-05-14 04:03:32 0 Mike 0.54 6
12 2021-05-09 01:18:13 1 Bravo 0.07 1
13 2021-05-15 19:24:46 0 Romeo 0.58 3
14 2021-05-15 21:31:22 0 Oscar 0.84 4
15 2021-05-16 23:48:57 0 Quebec 0.74 5
16 2021-05-16 00:56:39 1 Foxtrot 0.45 2
17 2021-05-10 16:06:06 0 Papa 0.59 6
18 2021-05-17 14:17:43 1 Delta 0.33 1
19 2021-05-17 06:52:51 0 Siera 0.72 6

Graphviz Runner¶

The Graphviz (pseudo)runner displays the given task graph visually instead of actually executing it:

In [4]:
PROJECT.launcher(runner='graphviz').apply()
Out[4]:
[graphviz: RowDriver(Reader(con=LazyReaderBackend), Statement(Foo)).apply → ToPandas.apply → Captor(Value).apply]

This shows our workflow consists of three tasks:

  • The initial feed Reader task wrapping our SOURCE statement.
  • Our actual ToPandas task as the only provided PIPELINE step.
  • Finally, the system-generated Captor task injected by the interactive launcher.

Task Actors¶

Actors represent the units of work within the workflow - the vertices in the task graph.

They are interconnected using input and output ports.

[figure: actor ports]

ForML supports a couple of different ways to implement actors. For the sake of this tutorial, we will stick with the high-level wrapping decorators (a class-based alternative is sketched below).
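
As an aside, actors can alternatively be implemented natively as classes. The sketch below is only indicative - the flow.Actor generic base and the get_params/set_params hooks are assumptions based on the ForML documentation, so consult it for the exact signatures:

import pandas
from forml import flow

class UpperActor(flow.Actor[pandas.DataFrame, None, pandas.Series]):  # assumed generic base
    """Native (class-based) stateless actor upper-casing a selected column."""

    def __init__(self, column: str):
        self._column = column

    def apply(self, data: pandas.DataFrame) -> pandas.Series:
        # the apply mode simply transforms the input
        return data[self._column].str.upper()

    def get_params(self) -> dict:  # hyperparameter introspection (assumed hook)
        return {'column': self._column}

    def set_params(self, column: str) -> None:  # hyperparameter update (assumed hook)
        self._column = column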

Stateless Actor¶

A stateless actor represents a task whose output depends purely on its input.

Let's implement a simple stateless actor called LowerActor that extracts and lower-cases a specific column of a Pandas DataFrame:

In [5]:
import pandas
from forml.pipeline import wrap

@wrap.Actor.apply
def LowerActor(data: pandas.DataFrame, *, column: str) -> pandas.Series:
    # equivalently: data[column].str.lower()
    return data[column].apply(lambda v: v.lower())

This actor has a single input port (the data parameter) and a single output port (the return value). The column argument is a hyperparameter. Let's test the actor:

In [6]:
df = pandas.DataFrame({'greetings': ['Hello', 'Hola', 'Ola', 'Ciao']})

lower_actor = LowerActor(column='greetings')
lower_actor.apply(df)
Out[6]:
0    hello
1     hola
2      ola
3     ciao
Name: greetings, dtype: object

Exercise: Write an actor returning a selected column as the ASCII code of its lower-cased first letter¶

Let's call this actor OrdActor and, similarly to the LowerActor, parametrize it with the target column name.

Hint: You can use the ord() function to turn a character into its ASCII code.

In [7]:
@wrap.Actor.apply
def OrdActor(data: pandas.DataFrame, *, column: str) -> pandas.Series:
    return data[column].apply(lambda v: ord(v[0].lower()))

To test this actor:

In [8]:
ord_actor = OrdActor(column='greetings')
ord_actor.apply(df)
Out[8]:
0    104
1    104
2    111
3     99
Name: greetings, dtype: int64

Stateful Actor¶

Stateful actors are more complex - their output is based not just on the input but also on their inner state, which they acquire during a separate phase called the train mode. For the normal operation, we then use the term apply mode.

Let's implement an actor called CenterActor that returns a selected column with its mean value removed:

In [9]:
import typing

@wrap.Actor.train  # starting with the actor train mode
def CenterActor(
    state: typing.Optional[float],  # previous state
    data: pandas.DataFrame,         # input data points
    labels: pandas.Series,          # target labels
    *,
    column: str                     # hyperparameter
) -> float:                         # new state
    return data[column].mean()


@CenterActor.apply  # finishing the CenterActor apply mode
def CenterActor(
    state: float, data: pandas.DataFrame, *, column: str
) -> pandas.DataFrame:
    return data[column] - state

The implementation has two steps:

  1. In the train mode, the actor has three input ports: the previous state, the source data, and the target labels. It has a single output port producing the new state acquired during the training.
  2. In the apply mode, the actor gets the recently trained state along with the input data from which it removes the mean.

Let's now test this actor:

In [10]:
df = pandas.DataFrame({'rating': [0.3, 0.1, 0.7, 0.6, 0.4]})

center_actor = CenterActor(column='rating')
center_actor.train(df, None)  # train mode
center_actor.apply(df)        # apply mode
Out[10]:
0   -0.12
1   -0.32
2    0.28
3    0.18
4   -0.02
Name: rating, dtype: float64

Operators¶

Operators are high-level workflow entities. While actors implement the tasks, operators deal with their wiring - the actual topology of the task graph.

Let's create a traditional mapper operator using our previously implemented actor OrdActor. There are a number of different ways to implement ForML operators; for simplicity, we are again going to use the available wrapping decorators:

In [11]:
Ord = wrap.Operator.mapper(OrdActor)

We can now build a true workflow using this operator as our PIPELINE:

In [12]:
FEATURES = Foo.select(Foo.Level, Foo.Value)
SOURCE = (
    project.Source.query(FEATURES, labels=Foo.Label)
    >> payload.ToPandas()
)
PIPELINE = Ord(column="Level")
SOURCE.bind(PIPELINE).launcher(runner="graphviz").apply()
Out[12]:
[graphviz: RowDriver(Reader(con=LazyReaderBackend), Statement(Foo[Foo.Level, Foo.Value])).apply → ToPandas.apply → OrdActor(column='Level').apply → Captor(Value).apply]

The OrdActor is now part of the processing flow.
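
Instead of just drawing the graph, the very same binding can also be executed for real by falling back to the default (Dask) runner; output omitted here:

# execute the same binding using the default runner
SOURCE.bind(PIPELINE).launcher.apply()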

Exercise: Use decorators to implement a mapper scaling values of a selected column to the [0, 1] range¶

In [13]:
@wrap.Actor.train
def MinMax(
    state: typing.Optional[tuple[float, float]],
    data: pandas.DataFrame,
    labels: pandas.Series,
    *,
    column: str
) -> tuple[float, float]:  # the state is the tuple (min, max - min)
    min_ = data[column].min()
    return min_, data[column].max() - min_


@wrap.Operator.mapper  # this turns it into an Operator
@MinMax.apply
def MinMax(
    state: tuple[float, float], data: pandas.DataFrame, *, column: str
) -> pandas.DataFrame:
    data[column] = (data[column] - state[0]) / state[1]
    return data

Let's confirm this can be composed as a PIPELINE component:

In [14]:
PIPELINE = MinMax(column='Value')

SOURCE.bind(PIPELINE).launcher(runner='graphviz').apply()
Out[14]:
[graphviz: RowDriver(Reader(con=LazyReaderBackend), Statement(Foo[Foo.Level, Foo.Value])).apply → ToPandas.apply → MinMax(column='Value').setstate.apply (state fed by a Loader) → Captor(Value).apply]

Note the extra Loader task providing the state for the MinMax actor in the apply mode.

Train-Apply Workflow Duality¶

Each operator carries a definition of its task graph composition for both the train and apply modes.

For example, the Mapper operator plugs into the flow channels (trunk) as follows:

[figure: Mapper operator composition]

So far, we have only been launching the workflow in the apply mode. Let's see how different a task graph gets produced for the same workflow in the train mode.

In [15]:
SOURCE.bind(PIPELINE).launcher(runner='graphviz').train()
Out[15]:
[graphviz rendering of the train-mode task graph: the TableDriver output is split by a Slicer into features and labels (each converted via ToPandas), MinMax(column='Value').setstate.train produces a new state (persisted via Dumper → Committer) feeding its setstate.apply counterpart, and the scaled result is captured by Captor(Value).train]

Using the Dask runner, we can use the same launcher to execute the train-mode workflow immediately followed by the apply mode and see the MinMax actor doing its job (Value gets scaled between 0 and 1):

In [16]:
launcher = SOURCE.bind(PIPELINE).launcher(runner='dask')
launcher.train()
launcher.apply()
INFO: 2023-05-30 23:37:24,198: lazy: Loading Foo
INFO: 2023-05-30 23:37:26,360: lazy: Loading Foo
Out[16]:
Level Value
0 Alpha 0.218391
1 Tango 1.000000
2 Zulu 0.574713
3 Uniform 0.712644
4 Xray 0.873563
5 Victor 0.620690
6 Echo 0.057471
7 Whiskey 0.816092
8 November 0.977011
9 Yankee 0.701149
10 Charlie 0.321839
11 Mike 0.540230
12 Bravo 0.000000
13 Romeo 0.586207
14 Oscar 0.885057
15 Quebec 0.770115
16 Foxtrot 0.436782
17 Papa 0.597701
18 Delta 0.298851
19 Siera 0.747126

Library Operators¶

Operators often lend themselves to reuse. Collections of useful operators can be maintained in the form of internal or public libraries. Let's employ the payload.MapReduce operator provided upstream together with our previously implemented OrdActor and CenterActor actors:

In [17]:
PIPELINE = payload.MapReduce(
    OrdActor.builder(column="Level"), CenterActor.builder(column="Value")
) >> MinMax(column="Level")

SOURCE.bind(PIPELINE).launcher(runner="graphviz").train()
Out[17]:
[graphviz rendering of the train-mode task graph: the OrdActor and CenterActor branches process the features in parallel, PandasConcat(axis='columns') merges their outputs, and the result feeds the MinMax train/apply tasks, with the trained states persisted via Dumpers and a Committer and restored via Loaders]

We can again switch to the Dask runner to execute the pipeline (in both modes) and see the processing result. This time the MinMax scaler is applied to the Level column transformed by the OrdActor, while in parallel the CenterActor removes the mean from the Value column:

In [18]:
launcher = SOURCE.bind(PIPELINE).launcher(runner='dask')
launcher.train()
launcher.apply()
Out[18]:
Level Value
0 0.00 -0.3205
1 0.76 0.3595
2 1.00 -0.0105
3 0.80 0.1095
4 0.92 0.2495
5 0.84 0.0295
6 0.16 -0.4605
7 0.88 0.1995
8 0.52 0.3395
9 0.96 0.0995
10 0.08 -0.2305
11 0.48 -0.0405
12 0.04 -0.5105
13 0.68 -0.0005
14 0.56 0.2595
15 0.64 0.1595
16 0.20 -0.1305
17 0.60 0.0095
18 0.12 -0.2505
19 0.72 0.1395

Implementing Custom Operator¶

Let's implement a custom Balancer operator inserting a 2x2 actor into the train and label paths.

[figure: Balancer operator composition]

  • The actor plugs into the train and label channels using its two input and two output ports.
  • The apply channel is unaffected (balancing is only relevant to the train mode).

Before focusing on the operator itself, let's start with a (stateless) oversampling actor based simply on the Imbalanced-learn library:

In [19]:
from imblearn import over_sampling

@wrap.Actor.apply
def OverSampler(
    features: pandas.DataFrame,
    labels: pandas.Series,
    *,
    random_state: typing.Optional[int] = None
):
    """Stateless actor with two input and two output ports for
    oversampling the features/labels of the minor class.
    """
    return over_sampling.RandomOverSampler(
        random_state=random_state
    ).fit_resample(features, labels)

A quick test to see an imbalanced input transformed into a balanced output:

In [20]:
OverSampler(random_state=42).apply([[1], [0], [1]], [1, 0, 1])
Out[20]:
([[1], [0], [1], [0]], [1, 0, 1, 0])

A raw operator inherits from flow.Operator and implements the compose method:

In [21]:
from forml import flow

class Balancer(flow.Operator):
    """Balancer operator inserting the provided sampler into
    the ``train`` & ``label`` paths.
    """

    def __init__(
        self,
        sampler: flow.Builder = OverSampler.builder(random_state=42),
    ):
        self._sampler = sampler

    def compose(self, scope: flow.Composable) -> flow.Trunk:
        left = scope.expand()
        sampler = flow.Worker(self._sampler, 2, 2)  # 2x2 node
        sampler[0].subscribe(left.train.publisher)  # train -> in0
        new_features = flow.Future()
        new_features[0].subscribe(sampler[0])       # out0 -> newtrain
        sampler[1].subscribe(left.label.publisher)  # label -> in1
        new_labels = flow.Future()
        new_labels[0].subscribe(sampler[1])         # out1 -> newlabel
        return left.use(
            train=left.train.extend(tail=new_features),
            label=left.label.extend(tail=new_labels),
        )

Visualizing a pipeline with this Balancer operator produces:

In [22]:
PIPELINE = Balancer()
SOURCE.bind(PIPELINE).launcher(runner='graphviz').train()
Out[22]:
[graphviz rendering of the train-mode task graph: the sliced features and labels (each converted via ToPandas) feed the two input ports of OverSampler(random_state=42).apply, whose two outputs are split by Getters into the resampled features and labels consumed by Captor(Value).train]

A quick check of the label counts when engaging our Balancer confirms they are now balanced:

In [23]:
SOURCE.bind(PIPELINE).launcher.train().labels.value_counts()
Out[23]:
Label
1    14
0    14
Name: count, dtype: int64
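
For comparison, running the same query through a pass-through pipeline (no Balancer) should reveal the original imbalance of the labels; a quick check with the output omitted:

# baseline label counts without the Balancer in the flow
SOURCE.bind(payload.ToPandas()).launcher.train().labels.value_counts()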

Operator Unit Testing¶

ForML provides an elegant operator unit testing facility:

In [24]:
from forml import testing

class TestBalancer(testing.operator(Balancer)):

    default_oversample = (
        testing.Case()
        .train([[1], [1], [0]], [1, 1, 0])                   # input
        .returns([[1], [1], [0], [0]], labels=[1, 1, 0, 0])  # assert
    )

Tests are normally launched from the CLI, so the following is not usually needed:

In [25]:
import unittest
suite = unittest.TestSuite()
suite.addTest(TestBalancer('test_default_oversample'))
unittest.TextTestRunner().run(suite)
.
----------------------------------------------------------------------
Ran 1 test in 1.942s

OK
Out[25]:
<unittest.runner.TextTestResult run=1 errors=0 failures=0>
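
Additional scenarios can exercise non-default operator parameters. A hedged sketch, assuming the testing.Case constructor forwards its arguments to the operator's __init__ (here a custom sampler builder); with a single minority row, the RandomOverSampler deterministically appends one extra copy:

class TestCustomBalancer(testing.operator(Balancer)):

    custom_oversample = (
        testing.Case(OverSampler.builder(random_state=0))    # init args (assumed forwarding)
        .train([[1], [0], [0]], [1, 0, 0])                   # imbalanced input
        .returns([[1], [0], [0], [1]], labels=[1, 0, 0, 1])  # expected balanced output
    )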

Auto-wrapping 3rd-party Components¶

To simplify the integration of existing third-party algorithms (e.g. transformers/estimators), ForML can transparently turn them into operators right upon import:

In [26]:
with wrap.importer():
    from sklearn.linear_model import LogisticRegression

isinstance(LogisticRegression(), flow.Operator)
Out[26]:
True

Pipeline¶

Let's now put it all together and compose a full pipeline from the different transforming operators defined previously and a final classifier:

In [27]:
PIPELINE = (
    Balancer()
    >> payload.MapReduce(
        OrdActor.builder(column="Level"),
        CenterActor.builder(column="Value"),
    )
    >> MinMax(column="Level")
    >> LogisticRegression(random_state=42)
)

Exercise: Compare the final train/apply task graphs¶

In [28]:
SOURCE.bind(PIPELINE).launcher(runner="graphviz").train()
Out[28]:
[graphviz rendering of the final train-mode task graph: the OverSampler resamples the sliced features/labels, the OrdActor/CenterActor branches merge through PandasConcat into the MinMax scaler, whose output trains the LogisticRegression classifier; all three trained states are persisted via Dumpers and a Committer]
In [29]:
SOURCE.bind(PIPELINE).launcher(runner="graphviz").apply()
Out[29]:
[graphviz rendering of the final apply-mode task graph: RowDriver → ToPandas → OrdActor.apply plus CenterActor.setstate.apply → PandasConcat.apply → MinMax.setstate.apply → LogisticRegression.setstate.apply → Captor(Value).apply, with Loaders supplying the three persisted states]
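
Finally, the fully composed pipeline can be trained and then applied just like the simpler ones before (a sketch mirroring the earlier Dask runs; the output would be the LogisticRegression predictions for each row):

launcher = SOURCE.bind(PIPELINE).launcher
launcher.train()  # fit the stateful transformers and the classifier
launcher.apply()  # produce predictions using the persisted states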