Objective:
Learn how ML workflows can be represented using directed acyclic task graphs (DAGs), how to construct them, and why they are essential for portability.
Principles:
A general ability to transcode the task DAGs at runtime so they can be launched using different executors (runners), such as the graphviz runner.
Let's demonstrate this capability by executing the following artifact using three possible runners:
from forml import project
from forml.pipeline import payload
from dummycatalog import Foo

SOURCE = project.Source.query(Foo)  # data source based on a query of the Foo table
PIPELINE = payload.ToPandas()       # trivial pipeline converting the payload to Pandas
PROJECT = SOURCE.bind(PIPELINE)     # artifact binding the source and the pipeline
PROJECT.launcher.apply()            # launching the apply mode using the default runner
 | Timestamp | Label | Level | Value | Bar |
---|---|---|---|---|---|
0 | 2021-05-05 03:12:19 | 1 | Alpha | 0.26 | 1 |
1 | 2021-05-11 11:27:50 | 0 | Tango | 0.94 | 3 |
2 | 2021-05-11 17:35:27 | 0 | Zulu | 0.57 | 4 |
3 | 2021-05-06 19:49:43 | 0 | Uniform | 0.69 | 2 |
4 | 2021-05-12 08:53:35 | 0 | Xray | 0.83 | 5 |
5 | 2021-05-12 22:06:04 | 0 | Victor | 0.61 | 6 |
6 | 2021-05-07 13:17:43 | 1 | Echo | 0.12 | 1 |
7 | 2021-05-13 16:25:18 | 0 | Whiskey | 0.78 | 3 |
8 | 2021-05-13 06:31:58 | 0 | November | 0.92 | 4 |
9 | 2021-05-08 15:48:20 | 0 | Yankee | 0.68 | 5 |
10 | 2021-05-14 19:56:01 | 1 | Charlie | 0.35 | 2 |
11 | 2021-05-14 04:03:32 | 0 | Mike | 0.54 | 6 |
12 | 2021-05-09 01:18:13 | 1 | Bravo | 0.07 | 1 |
13 | 2021-05-15 19:24:46 | 0 | Romeo | 0.58 | 3 |
14 | 2021-05-15 21:31:22 | 0 | Oscar | 0.84 | 4 |
15 | 2021-05-16 23:48:57 | 0 | Quebec | 0.74 | 5 |
16 | 2021-05-16 00:56:39 | 1 | Foxtrot | 0.45 | 2 |
17 | 2021-05-10 16:06:06 | 0 | Papa | 0.59 | 6 |
18 | 2021-05-17 14:17:43 | 1 | Delta | 0.33 | 1 |
19 | 2021-05-17 06:52:51 | 0 | Siera | 0.72 | 6 |
Apache Spark can be explicitly selected to execute the workflow.
(Hint: if you are quick enough, you can visit the Spark Dashboard at http://localhost:4040/ while the workflow is running)
PROJECT.launcher(runner='spark').apply()
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/30 23:37:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 | Timestamp | Label | Level | Value | Bar |
---|---|---|---|---|---|
0 | 2021-05-05 03:12:19 | 1 | Alpha | 0.26 | 1 |
1 | 2021-05-11 11:27:50 | 0 | Tango | 0.94 | 3 |
2 | 2021-05-11 17:35:27 | 0 | Zulu | 0.57 | 4 |
3 | 2021-05-06 19:49:43 | 0 | Uniform | 0.69 | 2 |
4 | 2021-05-12 08:53:35 | 0 | Xray | 0.83 | 5 |
5 | 2021-05-12 22:06:04 | 0 | Victor | 0.61 | 6 |
6 | 2021-05-07 13:17:43 | 1 | Echo | 0.12 | 1 |
7 | 2021-05-13 16:25:18 | 0 | Whiskey | 0.78 | 3 |
8 | 2021-05-13 06:31:58 | 0 | November | 0.92 | 4 |
9 | 2021-05-08 15:48:20 | 0 | Yankee | 0.68 | 5 |
10 | 2021-05-14 19:56:01 | 1 | Charlie | 0.35 | 2 |
11 | 2021-05-14 04:03:32 | 0 | Mike | 0.54 | 6 |
12 | 2021-05-09 01:18:13 | 1 | Bravo | 0.07 | 1 |
13 | 2021-05-15 19:24:46 | 0 | Romeo | 0.58 | 3 |
14 | 2021-05-15 21:31:22 | 0 | Oscar | 0.84 | 4 |
15 | 2021-05-16 23:48:57 | 0 | Quebec | 0.74 | 5 |
16 | 2021-05-16 00:56:39 | 1 | Foxtrot | 0.45 | 2 |
17 | 2021-05-10 16:06:06 | 0 | Papa | 0.59 | 6 |
18 | 2021-05-17 14:17:43 | 1 | Delta | 0.33 | 1 |
19 | 2021-05-17 06:52:51 | 0 | Siera | 0.72 | 6 |
PROJECT.launcher(runner='graphviz').apply()
This shows our workflow consists of three tasks:
* the Reader task wrapping our SOURCE statement,
* the ToPandas task as the only provided PIPELINE step,
* the Captor task injected by the interactive launcher.

Actors represent the units of work within the workflow - the vertices in the task graph. They are interconnected using input and output ports.
ForML supports a couple of different ways to implement actors. For the sake of this tutorial, we will stick with the high-level concept of wrapping decorators.
A stateless actor represents a task whose output depends purely on its input. Let's implement a simple stateless actor called LowerActor that extracts and lower-cases a specific column of a Pandas DataFrame:
import pandas
from forml.pipeline import wrap

@wrap.Actor.apply
def LowerActor(data: pandas.DataFrame, *, column: str) -> pandas.Series:
    # equivalent to data[column].str.lower()
    return data[column].apply(lambda v: v.lower())
This actor has a single input port (the data parameter) and a single output port (the return value). The column argument here is a hyperparameter.
Let's test this actor:
df = pandas.DataFrame({'greetings': ['Hello', 'Hola', 'Ola', 'Ciao']})
lower_actor = LowerActor(column='greetings')
lower_actor.apply(df)
0    hello
1     hola
2      ola
3     ciao
Name: greetings, dtype: object
Now let's implement another stateless actor transforming the selected column into the ASCII codes of the first (lower-cased) characters of its values. Let's call this actor OrdActor and, similarly to the LowerActor, parametrize it with the target column name.
Hint: You can use the ord() function to turn a character into its ASCII code.
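For example, in plain Python (just an illustration, not part of the workflow):

ord('Hello'[0].lower())  # 104 - the ASCII code of 'h'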
@wrap.Actor.apply
def OrdActor(data: pandas.DataFrame, *, column: str) -> pandas.Series:
    return data[column].apply(lambda v: ord(v[0].lower()))
To test this actor:
ord_actor = OrdActor(column='greetings')
ord_actor.apply(df)
0    104
1    104
2    111
3     99
Name: greetings, dtype: int64
Stateful actors are more complex: their output depends not just on the input but also on their inner state, which they acquire during a separate phase called the train mode. The normal operation is then referred to as the apply mode.
Let's implement an actor called CenterActor that returns the selected column with its mean value removed:
import typing

@wrap.Actor.train  # starting with the actor train mode
def CenterActor(
    state: typing.Optional[float],  # previous state
    data: pandas.DataFrame,  # input data points
    labels: pandas.Series,  # target labels
    *,
    column: str,  # hyperparameter
) -> float:  # new state
    return data[column].mean()

@CenterActor.apply  # finishing with the CenterActor apply mode
def CenterActor(
    state: float, data: pandas.DataFrame, *, column: str
) -> pandas.Series:
    return data[column] - state
The implementation has two steps: the train-mode function is defined first using the @wrap.Actor.train decorator, and the apply-mode function is then attached to it using the @CenterActor.apply decorator.
Let's now test this actor:
df = pandas.DataFrame({'rating': [0.3, 0.1, 0.7, 0.6, 0.4]})
center_actor = CenterActor(column='rating')
center_actor.train(df, None) # train mode
center_actor.apply(df) # apply mode
0   -0.12
1   -0.32
2    0.28
3    0.18
4   -0.02
Name: rating, dtype: float64
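Under the hood, the two modes amount to the following plain-Pandas equivalent (shown purely for illustration):

state = df['rating'].mean()      # train mode: acquire the state (here 0.42)
centered = df['rating'] - state  # apply mode: use the state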
Operators are high-level workflow entities. While actors implement the tasks, operators deal with their wiring - the actual topology of the task graph.
Let's create a traditional mapper operator using our previously implemented OrdActor actor. There are a number of different ways to implement ForML operators; for simplicity, we are again going to use the available wrapping decorators:
Ord = wrap.Operator.mapper(OrdActor)
We can now build a true workflow using this operator as our PIPELINE:
FEATURES = Foo.select(Foo.Level, Foo.Value)

SOURCE = (
    project.Source.query(FEATURES, labels=Foo.Label)
    >> payload.ToPandas()
)
PIPELINE = Ord(column="Level")

SOURCE.bind(PIPELINE).launcher(runner="graphviz").apply()
The OrdActor is now part of the processing flow.
The wrapping decorators can even be stacked on top of each other, turning an actor implementation directly into an operator. Let's use this approach to define a MinMax scaling operator:
@wrap.Actor.train
def MinMax(
    state: typing.Optional[tuple[float, float]],
    data: pandas.DataFrame,
    labels: pandas.Series,
    *,
    column: str,
) -> tuple[float, float]:  # the state is a tuple of (min, max - min)
    min_ = data[column].min()
    return min_, data[column].max() - min_

@wrap.Operator.mapper  # this turns the actor into an operator
@MinMax.apply
def MinMax(
    state: tuple[float, float], data: pandas.DataFrame, *, column: str
) -> pandas.DataFrame:
    data[column] = (data[column] - state[0]) / state[1]
    return data
Let's confirm this can be composed as a PIPELINE component:
PIPELINE = MinMax(column='Value')
SOURCE.bind(PIPELINE).launcher(runner='graphviz').apply()
Note the extra Loader task providing the state for the MinMax actor in the apply mode.
Each operator carries a definition of its task graph composition for both the train and apply modes. For example, the Mapper operator plugs into the flow channels (trunk) as follows:
So far, we have only been launching the workflow in the apply mode. Let's see how different a task graph gets produced for the same workflow in the train mode.
SOURCE.bind(PIPELINE).launcher(runner='graphviz').train()
With the Dask runner, we can use the same launcher to execute the train mode immediately followed by the apply mode, letting us see the MinMax actor doing its job (Value gets scaled between 0 and 1):
launcher = SOURCE.bind(PIPELINE).launcher(runner='dask')
launcher.train()
launcher.apply()
INFO: 2023-05-30 23:37:24,198: lazy: Loading Foo
INFO: 2023-05-30 23:37:26,360: lazy: Loading Foo
 | Level | Value |
---|---|---|
0 | Alpha | 0.218391 |
1 | Tango | 1.000000 |
2 | Zulu | 0.574713 |
3 | Uniform | 0.712644 |
4 | Xray | 0.873563 |
5 | Victor | 0.620690 |
6 | Echo | 0.057471 |
7 | Whiskey | 0.816092 |
8 | November | 0.977011 |
9 | Yankee | 0.701149 |
10 | Charlie | 0.321839 |
11 | Mike | 0.540230 |
12 | Bravo | 0.000000 |
13 | Romeo | 0.586207 |
14 | Oscar | 0.885057 |
15 | Quebec | 0.770115 |
16 | Foxtrot | 0.436782 |
17 | Papa | 0.597701 |
18 | Delta | 0.298851 |
19 | Siera | 0.747126 |
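We can hand-check the first row against the raw data shown earlier, where the Value column minimum is 0.07 (Bravo) and the maximum is 0.94 (Tango):

(0.26 - 0.07) / (0.94 - 0.07)  # Alpha -> 0.218391...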
Operators are often well suited for reuse. Collections of useful operators can be maintained in the form of internal or public libraries. Let's employ the upstream payload.MapReduce operator together with our previously implemented OrdActor and CenterActor actors:
PIPELINE = payload.MapReduce(
    OrdActor.builder(column="Level"), CenterActor.builder(column="Value")
) >> MinMax(column="Level")

SOURCE.bind(PIPELINE).launcher(runner="graphviz").train()
We can again switch to the Dask runner to execute the pipeline (in both modes) and see the processing result; this time the MinMax scaler is applied to the Level column transformed using the OrdActor, in parallel with the CenterActor removing the mean from the Value column:
launcher = SOURCE.bind(PIPELINE).launcher(runner='dask')
launcher.train()
launcher.apply()
 | Level | Value |
---|---|---|
0 | 0.00 | -0.3205 |
1 | 0.76 | 0.3595 |
2 | 1.00 | -0.0105 |
3 | 0.80 | 0.1095 |
4 | 0.92 | 0.2495 |
5 | 0.84 | 0.0295 |
6 | 0.16 | -0.4605 |
7 | 0.88 | 0.1995 |
8 | 0.52 | 0.3395 |
9 | 0.96 | 0.0995 |
10 | 0.08 | -0.2305 |
11 | 0.48 | -0.0405 |
12 | 0.04 | -0.5105 |
13 | 0.68 | -0.0005 |
14 | 0.56 | 0.2595 |
15 | 0.64 | 0.1595 |
16 | 0.20 | -0.1305 |
17 | 0.60 | 0.0095 |
18 | 0.12 | -0.2505 |
19 | 0.72 | 0.1395 |
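Again, the first row is easy to verify by hand: the Level of Alpha becomes ord('a') (the column minimum, with ord('z') of Zulu as the maximum), while the mean of the original Value column works out to 0.5805:

(ord('a') - ord('a')) / (ord('z') - ord('a'))  # Level of Alpha -> 0.00
0.26 - 0.5805                                  # Value of Alpha -> -0.3205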
Let's implement a custom Balancer operator inserting a 2x2 actor (two input and two output ports) into the train and label paths. Before focusing on the operator itself, we start with a stateless oversampling actor based on the Imbalanced-learn library:
from imblearn import over_sampling

@wrap.Actor.apply
def OverSampler(
    features: pandas.DataFrame,
    labels: pandas.Series,
    *,
    random_state: typing.Optional[int] = None,
):
    """Stateless actor with two input and two output ports for
    oversampling the features/labels of the minor class.
    """
    return over_sampling.RandomOverSampler(
        random_state=random_state
    ).fit_resample(features, labels)
A quick test to see an imbalanced input transformed into a balanced output:
OverSampler(random_state=42).apply([[1], [0], [1]], [1, 0, 1])
([[1], [0], [1], [0]], [1, 0, 1, 0])
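The same actor naturally handles the Pandas structures flowing through the pipeline; a quick sketch (the column name x is arbitrary, and the DataFrame/Series round-trip relies on Imbalanced-learn preserving Pandas types):

features = pandas.DataFrame({'x': [1, 0, 1]})
labels = pandas.Series([1, 0, 1])
OverSampler(random_state=42).apply(features, labels)  # balanced (DataFrame, Series) pair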
A raw operator inherits from flow.Operator and implements the compose method:
from forml import flow

class Balancer(flow.Operator):
    """Balancer operator inserting the provided sampler into
    the ``train`` & ``label`` paths.
    """

    def __init__(
        self,
        sampler: flow.Builder = OverSampler.builder(random_state=42),
    ):
        self._sampler = sampler

    def compose(self, scope: flow.Composable) -> flow.Trunk:
        left = scope.expand()
        sampler = flow.Worker(self._sampler, 2, 2)  # 2x2 node
        sampler[0].subscribe(left.train.publisher)  # train -> in0
        new_features = flow.Future()
        new_features[0].subscribe(sampler[0])  # out0 -> new train
        sampler[1].subscribe(left.label.publisher)  # label -> in1
        new_labels = flow.Future()
        new_labels[0].subscribe(sampler[1])  # out1 -> new label
        return left.use(
            train=left.train.extend(tail=new_features),
            label=left.label.extend(tail=new_labels),
        )
Visualizing a pipeline with this Balancer operator produces:
PIPELINE = Balancer()
SOURCE.bind(PIPELINE).launcher(runner='graphviz').train()
A quick check of the label counts when engaging our Balancer confirms they are now balanced:
SOURCE.bind(PIPELINE).launcher.train().labels.value_counts()
Label
1    14
0    14
Name: count, dtype: int64
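For comparison, running the same check without the Balancer - e.g. with the plain payload.ToPandas() pipeline from the beginning - should reveal the original imbalance visible in the sample above (14 zeros vs 6 ones); a sketch:

SOURCE.bind(payload.ToPandas()).launcher.train().labels.value_counts()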
ForML provides an elegant operator unit testing facility:
from forml import testing

class TestBalancer(testing.operator(Balancer)):
    default_oversample = (
        testing.Case()
        .train([[1], [1], [0]], [1, 1, 0])  # input
        .returns([[1], [1], [0], [0]], labels=[1, 1, 0, 0])  # assertion
    )
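The facility supports any number of cases per operator. For instance, a hypothetical extra scenario asserting that an already balanced input passes through unchanged (assuming RandomOverSampler leaves balanced data intact) might look like:

class TestBalancerExtra(testing.operator(Balancer)):
    # hypothetical case: balanced input is expected to pass through unchanged
    already_balanced = (
        testing.Case()
        .train([[1], [0]], [1, 0])
        .returns([[1], [0]], labels=[1, 0])
    )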
Tests are normally launched from the CLI, so the following explicit invocation is shown here only for demonstration:
import unittest
suite = unittest.TestSuite()
suite.addTest(TestBalancer('test_default_oversample'))
unittest.TextTestRunner().run(suite)
.
----------------------------------------------------------------------
Ran 1 test in 1.942s

OK
<unittest.runner.TextTestResult run=1 errors=0 failures=0>
To simplify the integration of existing third-party algorithms (e.g. transformers/estimators), ForML can transparently turn them into operators right upon import:
with wrap.importer():
    from sklearn.linear_model import LogisticRegression

isinstance(LogisticRegression(), flow.Operator)
True
Let's now put all of this together and compose a full pipeline from the transforming operators defined previously plus the final classifier:
PIPELINE = (
    Balancer()
    >> payload.MapReduce(
        OrdActor.builder(column="Level"),
        CenterActor.builder(column="Value"),
    )
    >> MinMax(column="Level")
    >> LogisticRegression(random_state=42)
)

SOURCE.bind(PIPELINE).launcher(runner="graphviz").train()
SOURCE.bind(PIPELINE).launcher(runner="graphviz").apply()