Pipeline Enhancements¶

Let's go through one more development iteration...

Model Ensembling¶

Instead of relying on just the plain LogisticRegression used in our base model pipeline, we can combine multiple different classifiers using the stacked-ensembling technique to further improve the performance. ForML already comes with an operator implementing this concept, so let's try it out.
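
In stacking, each base model produces out-of-fold predictions on the training data, and those predictions become the input features of a final (meta) model. Here is a minimal sketch of the idea in plain scikit-learn, using made-up data and independent of the ForML operator we are about to apply:

import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold, cross_val_predict

X, y = make_classification(n_samples=500, random_state=42)
cv = StratifiedKFold(n_splits=2, shuffle=True, random_state=42)

# Out-of-fold probability predictions of each base model...
meta_features = np.column_stack([
    cross_val_predict(model, X, y, cv=cv, method="predict_proba")[:, 1]
    for model in (
        GradientBoostingClassifier(random_state=42),
        RandomForestClassifier(random_state=42),
    )
])
# ...become the training features of the final meta-model.
meta_model = LogisticRegression().fit(meta_features, y)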

Adding the Ensemble¶

Add a basic model ensemble of two classifiers with just two-fold cross-validation to avazuctr/pipeline.py:

  1. Open the avazuctr/pipeline.py component.
  2. Add all the required imports:
from sklearn import model_selection

from forml import project
from forml.pipeline import ensemble, wrap

with wrap.importer():
    from category_encoders import TargetEncoder
    from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
    from sklearn.linear_model import LogisticRegression
    from sklearn.preprocessing import MinMaxScaler
  3. Update it with the code below using the ensemble of GradientBoostingClassifier and RandomForestClassifier:
CATEGORICAL_COLUMNS = ...  # Keep original

STACK = ensemble.FullStack(
    GradientBoostingClassifier(random_state=42),
    RandomForestClassifier(random_state=42),
    crossvalidator=model_selection.StratifiedKFold(n_splits=2, shuffle=True, random_state=42),
)

PIPELINE = (
    TargetEncoder(cols=CATEGORICAL_COLUMNS)
    >> MinMaxScaler()
    >> STACK
    >> LogisticRegression(max_iter=1000, random_state=42)
)

# Registering the pipeline
project.setup(PIPELINE)
  4. SAVE THE avazuctr/pipeline.py FILE!

Evaluating the Change¶

Let's now run the project evaluation to see whether this change was worth it:

In [2]:
! forml project eval
running eval
0.3911373233919557

Excellent, this is an improvement!

In [3]:
! git add avazuctr/pipeline.py

Reviewing the Ensembling Task Graph¶

Visualizing the ensembling task graph can help to understand the principle: the crossvalidator splits the training data into folds, each base model gets trained and applied per fold, and the out-of-fold predictions are merged into the training inputs of the downstream LogisticRegression:

In [4]:
! forml project train -R graphviz
running train

forml-solution-avazuctr/forml.dot.svg: Ensembling Task Graph

Balancing the Target Classes¶

As noticed during the exploration, the target variable is highly imbalanced (417,963 samples in the negative class vs. only 82,037 in the positive). This might be biasing the model towards the majority class.

Let's plug in the Balancer we implemented previously and see whether it brings any improvement.
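
For a sense of scale, those counts amount to roughly a 5:1 skew, as a quick back-of-the-envelope check confirms:

negative, positive = 417_963, 82_037
print(f"negative share: {negative / (negative + positive):.1%}")  # ~83.6%
print(f"imbalance ratio: {negative / positive:.1f}:1")            # ~5.1:1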

Adding the Balancer¶

Edit the pyproject.toml and add the new imbalanced-learn==0.10.1 dependency:

  1. Open the pyproject.toml.
  2. Update it with the config below, adding the new imbalanced-learn==0.10.1 dependency:
[project]
name = "forml-solution-avazuctr"
version = "0.1"
dependencies = [
    "category-encoders==2.6.0",
    "forml==0.93",
    "imbalanced-learn==0.10.1 ",
    "openschema==0.7",
    "pandas==2.0.1",
    "scikit-learn==1.2.2"
]

[tool.forml]
package = "avazuctr"
  3. SAVE THE pyproject.toml FILE!
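
If your environment doesn't pick up the project dependencies automatically, the new package can also be installed directly (assuming pip manages this environment):

! pip install imbalanced-learn==0.10.1
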
In [5]:
! git add pyproject.toml

Now, add the Balancer implementation to the avazuctr/pipeline.py:

  1. Open the avazuctr/pipeline.py component.
  2. Add all the required imports:
import typing

from imblearn import over_sampling
from sklearn import model_selection

from forml import flow, project
from forml.pipeline import ensemble, wrap

with wrap.importer():
    from category_encoders import TargetEncoder
    from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
    from sklearn.linear_model import LogisticRegression
    from sklearn.preprocessing import MinMaxScaler
  3. Add the OverSampler actor (a quick standalone check of the underlying sampler follows after this list):
@wrap.Actor.apply
def OverSampler(
    features, labels, *, random_state: typing.Optional[int] = None
):
    """Stateless actor with two input and two output ports oversampling the features/labels of the minority class."""
    return over_sampling.RandomOverSampler(
        random_state=random_state
    ).fit_resample(features, labels)
  4. Add the Balancer operator implementation:
class Balancer(flow.Operator):
    """Balancer operator inserting the provided sampler into the ``train`` & ``label`` paths."""

    def __init__(
        self,
        sampler: flow.Builder = OverSampler.builder(random_state=42),
    ):
        self._sampler = sampler

    def compose(self, scope: flow.Composable) -> flow.Trunk:
        left = scope.expand()
        # Worker node around the sampler actor with two input and two output ports.
        sampler = flow.Worker(self._sampler, 2, 2)
        # Route the upstream train features into the sampler's first input port.
        sampler[0].subscribe(left.train.publisher)
        new_features = flow.Future()
        new_features[0].subscribe(sampler[0])
        # Route the upstream labels into the sampler's second input port.
        sampler[1].subscribe(left.label.publisher)
        new_labels = flow.Future()
        new_labels[0].subscribe(sampler[1])
        # Extend the train and label segments with the resampled outputs
        # while leaving the apply path untouched.
        return left.use(
            train=left.train.extend(tail=new_features),
            label=left.label.extend(tail=new_labels),
        )
  5. Insert the Balancer into the pipeline right after the TargetEncoder:
CATEGORICAL_COLUMNS = ... # Keep original
STACK = ... # Keep original

PIPELINE = (
    TargetEncoder(cols=CATEGORICAL_COLUMNS)
    >> Balancer()    # Inserting the Balancer
    >> MinMaxScaler()
    >> STACK
    >> LogisticRegression(warm_start=True, max_iter=1000, random_state=42)
)

# Registering the pipeline
project.setup(PIPELINE)
  6. SAVE THE avazuctr/pipeline.py FILE!
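
Before evaluating the full pipeline, here is the promised standalone look at what the underlying imbalanced-learn sampler does, using a tiny made-up dataset:

import pandas as pd
from imblearn import over_sampling

features = pd.DataFrame({"feature": [1, 1, 1, 0]})
labels = pd.Series([1, 1, 1, 0], name="click")

# The minority class (0) gets randomly duplicated up to the majority count.
resampled, rebalanced = over_sampling.RandomOverSampler(
    random_state=42
).fit_resample(features, labels)
print(rebalanced.value_counts())  # 1: 3, 0: 3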

Evaluating the Change¶

Let's quickly confirm the data is now balanced:

In [6]:
from forml import project
from avazuctr import pipeline
PROJECT = project.open(path=".", package="avazuctr")
PROJECT.components.source.bind(
    pipeline.Balancer()
).launcher.train().labels.value_counts()
Out[6]:
click
0    417919
1    417919
Name: count, dtype: int64

Good, let's kick off the evaluation:

In [7]:
! forml project eval
running eval
0.38634029551291765

That's another improvement!

In [8]:
! git add avazuctr/pipeline.py

Reviewing the Final Task Graph¶

To visualize the final task graph:

In [9]:
! forml project train -R graphviz
running train

forml-solution-avazuctr/forml.dot.svg: Final Task Graph

Adding the Balancer Unit Test¶

Let's also add the previously implemented Balancer unit test to the project tests:

In [10]:
! touch tests/test_pipeline.py

Edit the created test_pipeline.py and implement the unit test:

  1. Open the test_pipeline.py.
  2. Update it with the code below providing the TestBalancer unit test implementation (the case trains the operator on an imbalanced 2:1 sample and expects the minority row to be duplicated):
from forml import testing
from avazuctr import pipeline

class TestBalancer(testing.operator(pipeline.Balancer)):
    """Balancer unit tests."""

    default_oversample = (
        testing.Case()
        .train([[1], [1], [0]], [1, 1, 0])
        .returns([[1], [1], [0], [0]], labels=[1, 1, 0, 0])
    )
  3. SAVE THE test_pipeline.py FILE!
In [11]:
! git add tests/test_pipeline.py

Let's trigger the project tests:

In [12]:
! forml project test 2>&1 | tail -n 20
    result = self.execute(*args)
  File "/usr/local/lib/python3.10/site-packages/forml/flow/_code/target/user.py", line 196, in execute
    return self.action(self.builder(), *args)
  File "/usr/local/lib/python3.10/site-packages/forml/flow/_code/target/user.py", line 150, in __call__
    result = actor.apply(*args)
  File "/usr/local/lib/python3.10/site-packages/forml/pipeline/wrap/_actor.py", line 166, in apply
    return self.Apply(*features, **self._kwargs)
  File "/opt/forml/workspace/3-solution/forml-solution-avazuctr/avazuctr/source.py", line 11, in TimeExtractor
    assert "hour" in features.columns, "Missing column: hour"
AssertionError: Missing column: hour
ok
test_valid_extraction (tests.test_source.TestTimeExtractor)
Test of Valid Extraction ... ok
test_default_oversample (tests.test_pipeline.TestBalancer)
Test of Default Oversample ... ok

----------------------------------------------------------------------
Ran 3 tests in 5.571s

OK