Objective:
Learn how to define a project's data-source requirements independently of the actual data, achieving portability while maintaining reproducibility.
Principles:
Schema catalogs act as mutual dataset references between projects (ML solutions) and platforms. They are regular Python modules with schema instances defined using the ForML Schema API.
We are going to use the following dummy catalog, containing the Foo, Bar, and Baz schemas, throughout this tutorial:
from IPython import display
display.Code('dummycatalog.py')
"""
This is a FooBarBaz schema catalog used throughout this tutorial.
"""
from forml.io import dsl
class Foo(dsl.Schema):
"""Foo dataset schema."""
Timestamp = dsl.Field(dsl.Timestamp())
Label = dsl.Field(dsl.Integer())
Level = dsl.Field(dsl.String())
Value = dsl.Field(dsl.Float())
Bar = dsl.Field(dsl.Integer())
class Bar(dsl.Schema):
"""Bar dataset schema."""
Key = dsl.Field(dsl.Integer())
Length = dsl.Field(dsl.Float())
Color = dsl.Field(dsl.String())
class Baz(dsl.Schema):
"""Baz dataset schema.
Note: To demonstrate an unavailable dataset, this schema is deliberately
not resolved by any of the feeds configured within this tutorial
platform.
"""
Key = dsl.Field(dsl.Integer())
Name = dsl.Field(dsl.String())
Dob = dsl.Field(dsl.Date())
Let's create a minimal example of an artifact and launch it using its .launcher property:
from forml import project
from forml.pipeline import payload
from dummycatalog import Foo, Bar, Baz
STATEMENT = Foo  # Minimal query statement referring to dummycatalog
PIPELINE = payload.ToPandas() # Minimal pipeline - a single operator
PROJECT = project.Source.query(STATEMENT).bind(PIPELINE)
PROJECT.launcher.apply() # Triggering the launcher
INFO: 2023-05-30 23:32:47,003: lazy: Loading Foo
 | Timestamp | Label | Level | Value | Bar |
---|---|---|---|---|---|
0 | 2021-05-05 03:12:19 | 1 | Alpha | 0.26 | 1 |
1 | 2021-05-11 11:27:50 | 0 | Tango | 0.94 | 3 |
2 | 2021-05-11 17:35:27 | 0 | Zulu | 0.57 | 4 |
3 | 2021-05-06 19:49:43 | 0 | Uniform | 0.69 | 2 |
4 | 2021-05-12 08:53:35 | 0 | Xray | 0.83 | 5 |
5 | 2021-05-12 22:06:04 | 0 | Victor | 0.61 | 6 |
6 | 2021-05-07 13:17:43 | 1 | Echo | 0.12 | 1 |
7 | 2021-05-13 16:25:18 | 0 | Whiskey | 0.78 | 3 |
8 | 2021-05-13 06:31:58 | 0 | November | 0.92 | 4 |
9 | 2021-05-08 15:48:20 | 0 | Yankee | 0.68 | 5 |
10 | 2021-05-14 19:56:01 | 1 | Charlie | 0.35 | 2 |
11 | 2021-05-14 04:03:32 | 0 | Mike | 0.54 | 6 |
12 | 2021-05-09 01:18:13 | 1 | Bravo | 0.07 | 1 |
13 | 2021-05-15 19:24:46 | 0 | Romeo | 0.58 | 3 |
14 | 2021-05-15 21:31:22 | 0 | Oscar | 0.84 | 4 |
15 | 2021-05-16 23:48:57 | 0 | Quebec | 0.74 | 5 |
16 | 2021-05-16 00:56:39 | 1 | Foxtrot | 0.45 | 2 |
17 | 2021-05-10 16:06:06 | 0 | Papa | 0.59 | 6 |
18 | 2021-05-17 14:17:43 | 1 | Delta | 0.33 | 1 |
19 | 2021-05-17 06:52:51 | 0 | Siera | 0.72 | 6 |
The data requirements can be refined more granularly using the Query DSL Syntax:
Extend the basic STATEMENT to select just the Foo.Level, Foo.Value, and Foo.Timestamp columns.
Hints:
Fields are referenced as Schema.FieldName
STATEMENT = Foo.select(Foo.Level, Foo.Value, Foo.Timestamp)
project.Source.query(STATEMENT).bind(PIPELINE).launcher.apply()
INFO: 2023-05-30 23:32:49,136: lazy: Loading Foo
 | Level | Value | Timestamp |
---|---|---|---|
0 | Alpha | 0.26 | 2021-05-05 03:12:19 |
1 | Tango | 0.94 | 2021-05-11 11:27:50 |
2 | Zulu | 0.57 | 2021-05-11 17:35:27 |
3 | Uniform | 0.69 | 2021-05-06 19:49:43 |
4 | Xray | 0.83 | 2021-05-12 08:53:35 |
5 | Victor | 0.61 | 2021-05-12 22:06:04 |
6 | Echo | 0.12 | 2021-05-07 13:17:43 |
7 | Whiskey | 0.78 | 2021-05-13 16:25:18 |
8 | November | 0.92 | 2021-05-13 06:31:58 |
9 | Yankee | 0.68 | 2021-05-08 15:48:20 |
10 | Charlie | 0.35 | 2021-05-14 19:56:01 |
11 | Mike | 0.54 | 2021-05-14 04:03:32 |
12 | Bravo | 0.07 | 2021-05-09 01:18:13 |
13 | Romeo | 0.58 | 2021-05-15 19:24:46 |
14 | Oscar | 0.84 | 2021-05-15 21:31:22 |
15 | Quebec | 0.74 | 2021-05-16 23:48:57 |
16 | Foxtrot | 0.45 | 2021-05-16 00:56:39 |
17 | Papa | 0.59 | 2021-05-10 16:06:06 |
18 | Delta | 0.33 | 2021-05-17 14:17:43 |
19 | Siera | 0.72 | 2021-05-17 06:52:51 |
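For intuition only, the DSL projection above is conceptually similar to a plain pandas column selection. The sample frame below is a made-up stand-in for the Foo dataset, not the actual tutorial feed:

```python
import pandas as pd

# Hypothetical sample of the Foo dataset (not the real feed).
foo = pd.DataFrame(
    {
        "Timestamp": pd.to_datetime(["2021-05-05 03:12:19", "2021-05-11 11:27:50"]),
        "Label": [1, 0],
        "Level": ["Alpha", "Tango"],
        "Value": [0.26, 0.94],
        "Bar": [1, 3],
    }
)

# Foo.select(Foo.Level, Foo.Value, Foo.Timestamp) ~ a column projection:
selected = foo[["Level", "Value", "Timestamp"]]
print(list(selected.columns))  # ['Level', 'Value', 'Timestamp']
```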
Extend the basic STATEMENT to order the records by Timestamp.
Hints:
STATEMENT = Foo.orderby(Foo.Timestamp)
project.Source.query(STATEMENT).bind(PIPELINE).launcher.apply()
INFO: 2023-05-30 23:32:51,303: lazy: Loading Foo
 | Timestamp | Label | Level | Value | Bar |
---|---|---|---|---|---|
0 | 2021-05-05 03:12:19 | 1 | Alpha | 0.26 | 1 |
1 | 2021-05-06 19:49:43 | 0 | Uniform | 0.69 | 2 |
2 | 2021-05-07 13:17:43 | 1 | Echo | 0.12 | 1 |
3 | 2021-05-08 15:48:20 | 0 | Yankee | 0.68 | 5 |
4 | 2021-05-09 01:18:13 | 1 | Bravo | 0.07 | 1 |
5 | 2021-05-10 16:06:06 | 0 | Papa | 0.59 | 6 |
6 | 2021-05-11 11:27:50 | 0 | Tango | 0.94 | 3 |
7 | 2021-05-11 17:35:27 | 0 | Zulu | 0.57 | 4 |
8 | 2021-05-12 08:53:35 | 0 | Xray | 0.83 | 5 |
9 | 2021-05-12 22:06:04 | 0 | Victor | 0.61 | 6 |
10 | 2021-05-13 06:31:58 | 0 | November | 0.92 | 4 |
11 | 2021-05-13 16:25:18 | 0 | Whiskey | 0.78 | 3 |
12 | 2021-05-14 04:03:32 | 0 | Mike | 0.54 | 6 |
13 | 2021-05-14 19:56:01 | 1 | Charlie | 0.35 | 2 |
14 | 2021-05-15 19:24:46 | 0 | Romeo | 0.58 | 3 |
15 | 2021-05-15 21:31:22 | 0 | Oscar | 0.84 | 4 |
16 | 2021-05-16 00:56:39 | 1 | Foxtrot | 0.45 | 2 |
17 | 2021-05-16 23:48:57 | 0 | Quebec | 0.74 | 5 |
18 | 2021-05-17 06:52:51 | 0 | Siera | 0.72 | 6 |
19 | 2021-05-17 14:17:43 | 1 | Delta | 0.33 | 1 |
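For intuition only, Foo.orderby(Foo.Timestamp) behaves like sorting a pandas frame by that column (hypothetical sample data, not the tutorial feed):

```python
import pandas as pd

# Hypothetical sample of the Foo dataset, out of order on purpose.
foo = pd.DataFrame(
    {
        "Timestamp": pd.to_datetime(
            ["2021-05-11 11:27:50", "2021-05-05 03:12:19", "2021-05-07 13:17:43"]
        ),
        "Level": ["Tango", "Alpha", "Echo"],
    }
)

# Foo.orderby(Foo.Timestamp) ~ sorting by the Timestamp column:
ordered = foo.sort_values("Timestamp").reset_index(drop=True)
print(ordered["Level"].tolist())  # ['Alpha', 'Echo', 'Tango']
```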
Extend the statement to limit the result just to 5 records.
Hints:
STATEMENT = Foo.limit(5)
project.Source.query(STATEMENT).bind(PIPELINE).launcher.apply()
INFO: 2023-05-30 23:32:53,464: lazy: Loading Foo
 | Timestamp | Label | Level | Value | Bar |
---|---|---|---|---|---|
0 | 2021-05-05 03:12:19 | 1 | Alpha | 0.26 | 1 |
1 | 2021-05-11 11:27:50 | 0 | Tango | 0.94 | 3 |
2 | 2021-05-11 17:35:27 | 0 | Zulu | 0.57 | 4 |
3 | 2021-05-06 19:49:43 | 0 | Uniform | 0.69 | 2 |
4 | 2021-05-12 08:53:35 | 0 | Xray | 0.83 | 5 |
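For intuition only, Foo.limit(5) corresponds to taking the first five records, much like pandas' head() (hypothetical sample data):

```python
import pandas as pd

# Hypothetical sample of the Foo dataset with more than five records.
foo = pd.DataFrame({"Value": [0.26, 0.94, 0.57, 0.69, 0.83, 0.61, 0.12]})

# Foo.limit(5) ~ keeping only the first five rows:
limited = foo.head(5)
print(len(limited))  # 5
```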
Extend the statement to filter just the rows with Timestamp after 2021-05-13:
Hints:
Native Python values (such as datetime instances) can be used directly on Schema fields to compose expressions
from datetime import datetime
STATEMENT = Foo.where(Foo.Timestamp > datetime(2021, 5, 13))
project.Source.query(STATEMENT).bind(PIPELINE).launcher.apply()
INFO: 2023-05-30 23:32:55,616: lazy: Loading Foo
 | Timestamp | Label | Level | Value | Bar |
---|---|---|---|---|---|
0 | 2021-05-13 16:25:18 | 0 | Whiskey | 0.78 | 3 |
1 | 2021-05-13 06:31:58 | 0 | November | 0.92 | 4 |
2 | 2021-05-14 19:56:01 | 1 | Charlie | 0.35 | 2 |
3 | 2021-05-14 04:03:32 | 0 | Mike | 0.54 | 6 |
4 | 2021-05-15 19:24:46 | 0 | Romeo | 0.58 | 3 |
5 | 2021-05-15 21:31:22 | 0 | Oscar | 0.84 | 4 |
6 | 2021-05-16 23:48:57 | 0 | Quebec | 0.74 | 5 |
7 | 2021-05-16 00:56:39 | 1 | Foxtrot | 0.45 | 2 |
8 | 2021-05-17 14:17:43 | 1 | Delta | 0.33 | 1 |
9 | 2021-05-17 06:52:51 | 0 | Siera | 0.72 | 6 |
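For intuition only, the Foo.where(...) predicate maps to a boolean mask in pandas; native datetime values compare directly against the column (hypothetical sample data):

```python
from datetime import datetime

import pandas as pd

# Hypothetical sample of the Foo dataset spanning the cut-off date.
foo = pd.DataFrame(
    {
        "Timestamp": pd.to_datetime(
            ["2021-05-05 03:12:19", "2021-05-13 16:25:18", "2021-05-17 14:17:43"]
        ),
        "Level": ["Alpha", "Whiskey", "Delta"],
    }
)

# Foo.where(Foo.Timestamp > datetime(2021, 5, 13)) ~ a boolean row mask:
filtered = foo[foo["Timestamp"] > datetime(2021, 5, 13)]
print(filtered["Level"].tolist())  # ['Whiskey', 'Delta']
```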
Extend the basic statement to join the Foo schema with Bar using the Foo.Bar == Bar.Key condition.
Hints:
STATEMENT = Foo.inner_join(Bar, Foo.Bar == Bar.Key)
project.Source.query(STATEMENT).bind(PIPELINE).launcher.apply()
INFO: 2023-05-30 23:32:57,803: lazy: Loading Bar
INFO: 2023-05-30 23:32:57,806: lazy: Loading Foo
 | Timestamp | Label | Level | Value | Bar | Key | Length | Color |
---|---|---|---|---|---|---|---|---|
0 | 2021-05-05 03:12:19 | 1 | Alpha | 0.26 | 1 | 1 | 0.69 | red |
1 | 2021-05-11 11:27:50 | 0 | Tango | 0.94 | 3 | 3 | 0.58 | yellow |
2 | 2021-05-11 17:35:27 | 0 | Zulu | 0.57 | 4 | 4 | 0.53 | green |
3 | 2021-05-06 19:49:43 | 0 | Uniform | 0.69 | 2 | 2 | 0.61 | orange |
4 | 2021-05-12 08:53:35 | 0 | Xray | 0.83 | 5 | 5 | 0.47 | blue |
5 | 2021-05-12 22:06:04 | 0 | Victor | 0.61 | 6 | 6 | 0.41 | violet |
6 | 2021-05-07 13:17:43 | 1 | Echo | 0.12 | 1 | 1 | 0.69 | red |
7 | 2021-05-13 16:25:18 | 0 | Whiskey | 0.78 | 3 | 3 | 0.58 | yellow |
8 | 2021-05-13 06:31:58 | 0 | November | 0.92 | 4 | 4 | 0.53 | green |
9 | 2021-05-08 15:48:20 | 0 | Yankee | 0.68 | 5 | 5 | 0.47 | blue |
10 | 2021-05-14 19:56:01 | 1 | Charlie | 0.35 | 2 | 2 | 0.61 | orange |
11 | 2021-05-14 04:03:32 | 0 | Mike | 0.54 | 6 | 6 | 0.41 | violet |
12 | 2021-05-09 01:18:13 | 1 | Bravo | 0.07 | 1 | 1 | 0.69 | red |
13 | 2021-05-15 19:24:46 | 0 | Romeo | 0.58 | 3 | 3 | 0.58 | yellow |
14 | 2021-05-15 21:31:22 | 0 | Oscar | 0.84 | 4 | 4 | 0.53 | green |
15 | 2021-05-16 23:48:57 | 0 | Quebec | 0.74 | 5 | 5 | 0.47 | blue |
16 | 2021-05-16 00:56:39 | 1 | Foxtrot | 0.45 | 2 | 2 | 0.61 | orange |
17 | 2021-05-10 16:06:06 | 0 | Papa | 0.59 | 6 | 6 | 0.41 | violet |
18 | 2021-05-17 14:17:43 | 1 | Delta | 0.33 | 1 | 1 | 0.69 | red |
19 | 2021-05-17 06:52:51 | 0 | Siera | 0.72 | 6 | 6 | 0.41 | violet |
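For intuition only, Foo.inner_join(Bar, Foo.Bar == Bar.Key) is analogous to a pandas inner merge on those columns (hypothetical sample data; unmatched rows drop out, as with Bar == 9 below):

```python
import pandas as pd

# Hypothetical samples of the Foo and Bar datasets.
foo = pd.DataFrame({"Level": ["Alpha", "Tango", "Zulu"], "Bar": [1, 3, 9]})
bar = pd.DataFrame({"Key": [1, 3], "Color": ["red", "yellow"]})

# Foo.inner_join(Bar, Foo.Bar == Bar.Key) ~ an inner merge on Bar/Key:
joined = foo.merge(bar, how="inner", left_on="Bar", right_on="Key")
print(joined["Color"].tolist())  # ['red', 'yellow']
```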
Formulate a single DSL query to:
inner-join Foo with Bar using the Foo.Bar == Bar.Key condition
filter for rows with Foo.Value > 0.5
order the records by Foo.Level
select just the Bar.Length, Foo.Value, and Bar.Color columns
STATEMENT = (
Foo.inner_join(Bar, Foo.Bar == Bar.Key)
.select(Bar.Length, Foo.Value, Bar.Color)
.where(Foo.Value > 0.5)
.orderby(Foo.Level)
)
project.Source.query(STATEMENT).bind(PIPELINE).launcher.apply()
INFO: 2023-05-30 23:33:00,015: lazy: Loading Bar
INFO: 2023-05-30 23:33:00,018: lazy: Loading Foo
 | Length | Value | Color |
---|---|---|---|
0 | 0.41 | 0.54 | violet |
1 | 0.53 | 0.92 | green |
2 | 0.53 | 0.84 | green |
3 | 0.41 | 0.59 | violet |
4 | 0.47 | 0.74 | blue |
5 | 0.58 | 0.58 | yellow |
6 | 0.41 | 0.72 | violet |
7 | 0.58 | 0.94 | yellow |
8 | 0.61 | 0.69 | orange |
9 | 0.41 | 0.61 | violet |
10 | 0.58 | 0.78 | yellow |
11 | 0.47 | 0.83 | blue |
12 | 0.47 | 0.68 | blue |
13 | 0.53 | 0.57 | green |
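For intuition only, the chained DSL statement above composes join, filter, ordering, and projection; the same composition expressed in pandas (hypothetical sample data):

```python
import pandas as pd

# Hypothetical samples of the Foo and Bar datasets.
foo = pd.DataFrame(
    {"Level": ["Tango", "Alpha", "Mike"], "Value": [0.94, 0.26, 0.54], "Bar": [3, 1, 6]}
)
bar = pd.DataFrame(
    {"Key": [1, 3, 6], "Length": [0.69, 0.58, 0.41], "Color": ["red", "yellow", "violet"]}
)

# Join on Bar/Key, filter Value > 0.5, order by Level, project three columns:
result = (
    foo.merge(bar, how="inner", left_on="Bar", right_on="Key")
    .query("Value > 0.5")
    .sort_values("Level")[["Length", "Value", "Color"]]
)
print(result["Color"].tolist())  # ['violet', 'yellow']
```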
Resolving project data requirements based on references to cataloged schemas can fail at runtime if the platform is unable to provide the particular dataset:
STATEMENT = Baz
try:
project.Source.query(STATEMENT).bind(PIPELINE).launcher.apply()
except Exception as err:
print(err)
None of the 2 available feeds provide all of the required sources