Data Abstraction¶

Objective:

Learn how to define a project's data-source requirements independently of the actual data, achieving portability while maintaining reproducibility.

Principles:

  1. Projects are decoupled from physical data sources through their schemas.
  2. The runtime platform resolves the project's data requirements using one of its configured feeds, provided it is capable of doing so.
  3. This tutorial workspace is provided with three dummy schemas (see the local dummycatalog.py module): Foo, Bar, and Baz.
  4. This tutorial platform is configured with a feed capable of resolving data for the first two of these schemas.

Schema Catalog¶

Schema catalogs act as shared dataset references between projects (ML solutions) and platforms. They are regular Python modules containing schema instances defined using the ForML Schema API.

[Diagram: schema catalogs decoupling projects from platforms]

We are going to use the following dummy catalog throughout this tutorial:

In [1]:
from IPython import display
display.Code('dummycatalog.py')
Out[1]:
"""
This is a FooBarBaz schema catalog used throughout this tutorial.
"""
from forml.io import dsl


class Foo(dsl.Schema):
    """Foo dataset schema."""

    Timestamp = dsl.Field(dsl.Timestamp())
    Label = dsl.Field(dsl.Integer())
    Level = dsl.Field(dsl.String())
    Value = dsl.Field(dsl.Float())
    Bar = dsl.Field(dsl.Integer())


class Bar(dsl.Schema):
    """Bar dataset schema."""

    Key = dsl.Field(dsl.Integer())
    Length = dsl.Field(dsl.Float())
    Color = dsl.Field(dsl.String())


class Baz(dsl.Schema):
    """Baz dataset schema.

    Note: To demonstrate an unavailable dataset, this schema is deliberately
          not resolved by any of the feeds configured within this tutorial
          platform.
    """

    Key = dsl.Field(dsl.Integer())
    Name = dsl.Field(dsl.String())
    Dob = dsl.Field(dsl.Date())

Basic Data Loading¶

  • The term Artifact used below represents a handle to a particular ML project
  • Minimal ForML artifact = Source component + Pipeline component
  • Artifact can be launched in interactive mode using its .launcher property

Let's create and launch an example of a minimal artifact:

In [2]:
from forml import project
from forml.pipeline import payload
from dummycatalog import Foo, Bar, Baz

STATEMENT = Foo   # Minimal query statement referring to the dummy catalog
PIPELINE = payload.ToPandas()  # Minimal pipeline - a single operator
PROJECT = project.Source.query(STATEMENT).bind(PIPELINE)
PROJECT.launcher.apply()  # Triggering the launcher
INFO: 2023-05-30 23:32:47,003: lazy: Loading Foo
Out[2]:
Timestamp Label Level Value Bar
0 2021-05-05 03:12:19 1 Alpha 0.26 1
1 2021-05-11 11:27:50 0 Tango 0.94 3
2 2021-05-11 17:35:27 0 Zulu 0.57 4
3 2021-05-06 19:49:43 0 Uniform 0.69 2
4 2021-05-12 08:53:35 0 Xray 0.83 5
5 2021-05-12 22:06:04 0 Victor 0.61 6
6 2021-05-07 13:17:43 1 Echo 0.12 1
7 2021-05-13 16:25:18 0 Whiskey 0.78 3
8 2021-05-13 06:31:58 0 November 0.92 4
9 2021-05-08 15:48:20 0 Yankee 0.68 5
10 2021-05-14 19:56:01 1 Charlie 0.35 2
11 2021-05-14 04:03:32 0 Mike 0.54 6
12 2021-05-09 01:18:13 1 Bravo 0.07 1
13 2021-05-15 19:24:46 0 Romeo 0.58 3
14 2021-05-15 21:31:22 0 Oscar 0.84 4
15 2021-05-16 23:48:57 0 Quebec 0.74 5
16 2021-05-16 00:56:39 1 Foxtrot 0.45 2
17 2021-05-10 16:06:06 0 Papa 0.59 6
18 2021-05-17 14:17:43 1 Delta 0.33 1
19 2021-05-17 06:52:51 0 Siera 0.72 6

Advanced Query DSL¶

The data requirements can be refined at a finer granularity using the Query DSL syntax:

Column Projection¶

Extend the basic STATEMENT to select just the Foo.Level, Foo.Value and Foo.Timestamp columns.

Hints:

  • Use the Foo.select() method
  • Schema fields are referenced using the syntax of Schema.FieldName
In [3]:
STATEMENT = Foo.select(Foo.Level, Foo.Value, Foo.Timestamp)

project.Source.query(STATEMENT).bind(PIPELINE).launcher.apply()
INFO: 2023-05-30 23:32:49,136: lazy: Loading Foo
Out[3]:
Level Value Timestamp
0 Alpha 0.26 2021-05-05 03:12:19
1 Tango 0.94 2021-05-11 11:27:50
2 Zulu 0.57 2021-05-11 17:35:27
3 Uniform 0.69 2021-05-06 19:49:43
4 Xray 0.83 2021-05-12 08:53:35
5 Victor 0.61 2021-05-12 22:06:04
6 Echo 0.12 2021-05-07 13:17:43
7 Whiskey 0.78 2021-05-13 16:25:18
8 November 0.92 2021-05-13 06:31:58
9 Yankee 0.68 2021-05-08 15:48:20
10 Charlie 0.35 2021-05-14 19:56:01
11 Mike 0.54 2021-05-14 04:03:32
12 Bravo 0.07 2021-05-09 01:18:13
13 Romeo 0.58 2021-05-15 19:24:46
14 Oscar 0.84 2021-05-15 21:31:22
15 Quebec 0.74 2021-05-16 23:48:57
16 Foxtrot 0.45 2021-05-16 00:56:39
17 Papa 0.59 2021-05-10 16:06:06
18 Delta 0.33 2021-05-17 14:17:43
19 Siera 0.72 2021-05-17 06:52:51

Row Ordering¶

Extend the basic STATEMENT to order the records by Timestamp.

Hints:

  • Use the Foo.orderby() method
In [4]:
STATEMENT = Foo.orderby(Foo.Timestamp)

project.Source.query(STATEMENT).bind(PIPELINE).launcher.apply()
INFO: 2023-05-30 23:32:51,303: lazy: Loading Foo
Out[4]:
Timestamp Label Level Value Bar
0 2021-05-05 03:12:19 1 Alpha 0.26 1
1 2021-05-06 19:49:43 0 Uniform 0.69 2
2 2021-05-07 13:17:43 1 Echo 0.12 1
3 2021-05-08 15:48:20 0 Yankee 0.68 5
4 2021-05-09 01:18:13 1 Bravo 0.07 1
5 2021-05-10 16:06:06 0 Papa 0.59 6
6 2021-05-11 11:27:50 0 Tango 0.94 3
7 2021-05-11 17:35:27 0 Zulu 0.57 4
8 2021-05-12 08:53:35 0 Xray 0.83 5
9 2021-05-12 22:06:04 0 Victor 0.61 6
10 2021-05-13 06:31:58 0 November 0.92 4
11 2021-05-13 16:25:18 0 Whiskey 0.78 3
12 2021-05-14 04:03:32 0 Mike 0.54 6
13 2021-05-14 19:56:01 1 Charlie 0.35 2
14 2021-05-15 19:24:46 0 Romeo 0.58 3
15 2021-05-15 21:31:22 0 Oscar 0.84 4
16 2021-05-16 00:56:39 1 Foxtrot 0.45 2
17 2021-05-16 23:48:57 0 Quebec 0.74 5
18 2021-05-17 06:52:51 0 Siera 0.72 6
19 2021-05-17 14:17:43 1 Delta 0.33 1

Row Count Limitation¶

Extend the statement to limit the result to just 5 records.

Hints:

  • Use the Foo.limit() method
In [5]:
STATEMENT = Foo.limit(5)

project.Source.query(STATEMENT).bind(PIPELINE).launcher.apply()
INFO: 2023-05-30 23:32:53,464: lazy: Loading Foo
Out[5]:
Timestamp Label Level Value Bar
0 2021-05-05 03:12:19 1 Alpha 0.26 1
1 2021-05-11 11:27:50 0 Tango 0.94 3
2 2021-05-11 17:35:27 0 Zulu 0.57 4
3 2021-05-06 19:49:43 0 Uniform 0.69 2
4 2021-05-12 08:53:35 0 Xray 0.83 5

Row Filtering¶

Extend the statement to keep only rows with a Timestamp after 2021-05-13:

Hints:

  • Use the Foo.where() method
  • Native Python operators and literals (e.g. integers, strings, but also datetime instances) can be used directly on Schema fields to compose expressions
In [6]:
from datetime import datetime

STATEMENT = Foo.where(Foo.Timestamp > datetime(2021, 5, 13))

project.Source.query(STATEMENT).bind(PIPELINE).launcher.apply()
INFO: 2023-05-30 23:32:55,616: lazy: Loading Foo
Out[6]:
Timestamp Label Level Value Bar
0 2021-05-13 16:25:18 0 Whiskey 0.78 3
1 2021-05-13 06:31:58 0 November 0.92 4
2 2021-05-14 19:56:01 1 Charlie 0.35 2
3 2021-05-14 04:03:32 0 Mike 0.54 6
4 2021-05-15 19:24:46 0 Romeo 0.58 3
5 2021-05-15 21:31:22 0 Oscar 0.84 4
6 2021-05-16 23:48:57 0 Quebec 0.74 5
7 2021-05-16 00:56:39 1 Foxtrot 0.45 2
8 2021-05-17 14:17:43 1 Delta 0.33 1
9 2021-05-17 06:52:51 0 Siera 0.72 6

Multiple Datasets Joining¶

Extend the basic statement to join the Foo schema with Bar using the Foo.Bar == Bar.Key condition.

Hints:

  • Use the Foo.inner_join() method
In [7]:
STATEMENT = Foo.inner_join(Bar, Foo.Bar == Bar.Key)

project.Source.query(STATEMENT).bind(PIPELINE).launcher.apply()
INFO: 2023-05-30 23:32:57,803: lazy: Loading Bar
INFO: 2023-05-30 23:32:57,806: lazy: Loading Foo
Out[7]:
Timestamp Label Level Value Bar Key Length Color
0 2021-05-05 03:12:19 1 Alpha 0.26 1 1 0.69 red
1 2021-05-11 11:27:50 0 Tango 0.94 3 3 0.58 yellow
2 2021-05-11 17:35:27 0 Zulu 0.57 4 4 0.53 green
3 2021-05-06 19:49:43 0 Uniform 0.69 2 2 0.61 orange
4 2021-05-12 08:53:35 0 Xray 0.83 5 5 0.47 blue
5 2021-05-12 22:06:04 0 Victor 0.61 6 6 0.41 violet
6 2021-05-07 13:17:43 1 Echo 0.12 1 1 0.69 red
7 2021-05-13 16:25:18 0 Whiskey 0.78 3 3 0.58 yellow
8 2021-05-13 06:31:58 0 November 0.92 4 4 0.53 green
9 2021-05-08 15:48:20 0 Yankee 0.68 5 5 0.47 blue
10 2021-05-14 19:56:01 1 Charlie 0.35 2 2 0.61 orange
11 2021-05-14 04:03:32 0 Mike 0.54 6 6 0.41 violet
12 2021-05-09 01:18:13 1 Bravo 0.07 1 1 0.69 red
13 2021-05-15 19:24:46 0 Romeo 0.58 3 3 0.58 yellow
14 2021-05-15 21:31:22 0 Oscar 0.84 4 4 0.53 green
15 2021-05-16 23:48:57 0 Quebec 0.74 5 5 0.47 blue
16 2021-05-16 00:56:39 1 Foxtrot 0.45 2 2 0.61 orange
17 2021-05-10 16:06:06 0 Papa 0.59 6 6 0.41 violet
18 2021-05-17 14:17:43 1 Delta 0.33 1 1 0.69 red
19 2021-05-17 06:52:51 0 Siera 0.72 6 6 0.41 violet

Exercise¶

Formulate a single DSL query to:

  • Join Foo & Bar using the Foo.Bar == Bar.Key condition
  • Return only rows where Foo.Value > 0.5
  • Order result by Foo.Level
  • Select just the Bar.Length, Foo.Value and Bar.Color columns
In [8]:
STATEMENT = (
    Foo.inner_join(Bar, Foo.Bar == Bar.Key)
    .select(Bar.Length, Foo.Value, Bar.Color)
    .where(Foo.Value > 0.5)
    .orderby(Foo.Level)
)

project.Source.query(STATEMENT).bind(PIPELINE).launcher.apply()
INFO: 2023-05-30 23:33:00,015: lazy: Loading Bar
INFO: 2023-05-30 23:33:00,018: lazy: Loading Foo
Out[8]:
Length Value Color
0 0.41 0.54 violet
1 0.53 0.92 green
2 0.53 0.84 green
3 0.41 0.59 violet
4 0.47 0.74 blue
5 0.58 0.58 yellow
6 0.41 0.72 violet
7 0.58 0.94 yellow
8 0.61 0.69 orange
9 0.41 0.61 violet
10 0.58 0.78 yellow
11 0.47 0.83 blue
12 0.47 0.68 blue
13 0.53 0.57 green

Unavailable Datasets¶

Resolving project data requirements based on references to cataloged schemas may fail if the runtime platform cannot provide the particular dataset:

In [9]:
STATEMENT = Baz

try:
    project.Source.query(STATEMENT).bind(PIPELINE).launcher.apply()
except Exception as err:
    print(err)
None of the 2 available feeds provide all of the required sources