ML Graphs (with mlflow)¶
Flypipe allows creation of graphs to train ML models and run predictions.
As an example, we build the following graphs to train, evaluate and predict.
Training Graph
- data: loads sklearn iris dataset into a dataframe
- split: splits the data into train/test data
- fit_scale: fit and scale the data using sklearn Standard Scaler
- train_svm_model: trains a sklearn SVM model on train data and returns the prediction
- evaluate: calculates evaluation metrics
Prediction Graph
- scale: scales the data using the scaler fitted in the node
fit_scale
- predict: loads the SVM model trained in the node
train_svm_model
and does the predictions
Governance
- graph: dummy node used to plot all related graphs to the model
In this section, we use mlflow to save and load ML artifacts.
Training Graph¶
Data¶
In [2]:
Copied!
from flypipe import node
from flypipe.schema import Schema, Column
from flypipe.schema.types import Float, Integer
import pandas as pd
from sklearn import datasets
@node(
    type="pandas",
    description="Load Iris dataset",
    tags=["data"],
    output=Schema(
        Column("sepal_length", Float(), "sepal length"),
        Column("sepal_width", Float(), "sepal width"),
        Column("petal_length", Float(), "petal length"),
        Column("petal_width", Float(), "petal width"),
        Column("target", Integer(), "0: Setosa, 1: Versicolour, and 2: Virginica"),
    ),
)
def data():
    """Load the sklearn Iris dataset into a pandas DataFrame.

    Returns:
        A DataFrame whose four feature columns are columns 0-3 of
        ``iris.data`` (in order) and whose ``target`` column is
        ``iris.target``.
    """
    bunch = datasets.load_iris()
    feature_names = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
    # Column i of the feature matrix becomes the i-th named feature.
    columns = {
        name: bunch.data[:, position]
        for position, name in enumerate(feature_names)
    }
    columns["target"] = bunch.target
    return pd.DataFrame(data=columns)


# Execute the node; the returned DataFrame is shown as the cell output.
data.run()
from flypipe import node
from flypipe.schema import Schema, Column
from flypipe.schema.types import Float, Integer
import pandas as pd
from sklearn import datasets
@node(
    type="pandas",
    description="Load Iris dataset",
    tags=["data"],
    output=Schema(
        Column("sepal_length", Float(), "sepal length"),
        Column("sepal_width", Float(), "sepal width"),
        Column("petal_length", Float(), "petal length"),
        Column("petal_width", Float(), "petal width"),
        Column("target", Integer(), "0: Setosa, 1: Versicolour, and 2: Virginica"),
    ),
)
def data():
    """Load the sklearn Iris dataset into a pandas DataFrame.

    Returns:
        A DataFrame whose four feature columns are columns 0-3 of
        ``iris.data`` (in order) and whose ``target`` column is
        ``iris.target``.
    """
    bunch = datasets.load_iris()
    feature_names = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
    # Column i of the feature matrix becomes the i-th named feature.
    columns = {
        name: bunch.data[:, position]
        for position, name in enumerate(feature_names)
    }
    columns["target"] = bunch.target
    return pd.DataFrame(data=columns)


# Execute the node; the returned DataFrame is shown as the cell output.
data.run()
Out[2]:
sepal_length | sepal_width | petal_length | petal_width | target | |
---|---|---|---|---|---|
0 | 5.1 | 3.5 | 1.4 | 0.2 | 0 |
1 | 4.9 | 3.0 | 1.4 | 0.2 | 0 |
2 | 4.7 | 3.2 | 1.3 | 0.2 | 0 |
3 | 4.6 | 3.1 | 1.5 | 0.2 | 0 |
4 | 5.0 | 3.6 | 1.4 | 0.2 | 0 |
... | ... | ... | ... | ... | ... |
145 | 6.7 | 3.0 | 5.2 | 2.3 | 2 |
146 | 6.3 | 2.5 | 5.0 | 1.9 | 2 |
147 | 6.5 | 3.0 | 5.2 | 2.0 | 2 |
148 | 6.2 | 3.4 | 5.4 | 2.3 | 2 |
149 | 5.9 | 3.0 | 5.1 | 1.8 | 2 |
150 rows × 5 columns
Split data as train (70%) and test (30%)¶
In [3]:
Copied!
from flypipe import node
from flypipe.schema import Schema, Column
from flypipe.schema.types import Float, String
from sklearn.model_selection import train_test_split
@node(
    type="pandas",
    description="Split train (70%) and test (30%) data",
    tags=["data", "split"],
    dependencies=[
        data.select(
            "sepal_length",
            "sepal_width",
            "petal_length",
            "petal_width",
            "target",
        )
    ],
    output=Schema(
        Column("data_type", String(), "train (70%), test (30%)"),
        data.output.get("sepal_length"),
        data.output.get("sepal_width"),
        data.output.get("petal_length"),
        data.output.get("petal_width"),
        data.output.get("target"),
    ),
)
def split(data):
    """Label each row of the Iris data as ``train`` (70%) or ``test`` (30%).

    Args:
        data: DataFrame holding the four feature columns and ``target``.

    Returns:
        A new DataFrame with the train rows followed by the test rows. Each
        row keeps its original index and gains a ``data_type`` column set to
        ``"train"`` or ``"test"``. The input frame is left untouched.
    """
    X_cols = [
        "sepal_length",
        "sepal_width",
        "petal_length",
        "petal_width",
    ]
    y_col = "target"

    # A fixed random_state keeps the split reproducible between runs.
    X_train, X_test, y_train, y_test = train_test_split(
        data[X_cols],
        data[y_col],
        test_size=0.3,
        random_state=1,
    )

    # assign() builds new frames instead of writing into the frames returned
    # by train_test_split, which avoids SettingWithCopyWarning. The target
    # Series is aligned on the preserved row index.
    train = X_train.assign(data_type="train", target=y_train)
    test = X_test.assign(data_type="test", target=y_test)
    return pd.concat([train, test])


# Running the node with no arguments also executes its upstream `data` node.
df = split.run()
display(df)
from flypipe import node
from flypipe.schema import Schema, Column
from flypipe.schema.types import Float, String
from sklearn.model_selection import train_test_split
@node(
    type="pandas",
    description="Split train (70%) and test (30%) data",
    tags=["data", "split"],
    dependencies=[
        data.select(
            "sepal_length",
            "sepal_width",
            "petal_length",
            "petal_width",
            "target",
        )
    ],
    output=Schema(
        Column("data_type", String(), "train (70%), test (30%)"),
        data.output.get("sepal_length"),
        data.output.get("sepal_width"),
        data.output.get("petal_length"),
        data.output.get("petal_width"),
        data.output.get("target"),
    ),
)
def split(data):
    """Label each row of the Iris data as ``train`` (70%) or ``test`` (30%).

    Args:
        data: DataFrame holding the four feature columns and ``target``.

    Returns:
        A new DataFrame with the train rows followed by the test rows. Each
        row keeps its original index and gains a ``data_type`` column set to
        ``"train"`` or ``"test"``. The input frame is left untouched.
    """
    X_cols = [
        "sepal_length",
        "sepal_width",
        "petal_length",
        "petal_width",
    ]
    y_col = "target"

    # A fixed random_state keeps the split reproducible between runs.
    X_train, X_test, y_train, y_test = train_test_split(
        data[X_cols],
        data[y_col],
        test_size=0.3,
        random_state=1,
    )

    # assign() builds new frames instead of writing into the frames returned
    # by train_test_split, which avoids SettingWithCopyWarning. The target
    # Series is aligned on the preserved row index.
    train = X_train.assign(data_type="train", target=y_train)
    test = X_test.assign(data_type="test", target=y_test)
    return pd.concat([train, test])


# Running the node with no arguments also executes its upstream `data` node.
df = split.run()
display(df)
data_type | sepal_length | sepal_width | petal_length | petal_width | target | |
---|---|---|---|---|---|---|
118 | train | 7.7 | 2.6 | 6.9 | 2.3 | 2 |
18 | train | 5.7 | 3.8 | 1.7 | 0.3 | 0 |
4 | train | 5.0 | 3.6 | 1.4 | 0.2 | 0 |
45 | train | 4.8 | 3.0 | 1.4 | 0.3 | 0 |
59 | train | 5.2 | 2.7 | 3.9 | 1.4 | 1 |
... | ... | ... | ... | ... | ... | ... |
112 | test | 6.8 | 3.0 | 5.5 | 2.1 | 2 |
17 | test | 5.1 | 3.5 | 1.4 | 0.3 | 0 |
119 | test | 6.0 | 2.2 | 5.0 | 1.5 | 2 |
103 | test | 6.3 | 2.9 | 5.6 | 1.8 | 2 |
58 | test | 6.6 | 2.9 | 4.6 | 1.3 | 1 |
150 rows × 6 columns
Fit and Scale¶
In [4]:
Copied!
import os
# Base directory where this notebook stores locally pickled artifacts.
ARTIFACT_LOCATION = "/data/tmp/artifacts/"
# exist_ok=True makes re-running this cell harmless if the directory exists.
os.makedirs(ARTIFACT_LOCATION, exist_ok=True)
import os
# Base directory where this notebook stores locally pickled artifacts.
ARTIFACT_LOCATION = "/data/tmp/artifacts/"
# exist_ok=True makes re-running this cell harmless if the directory exists.
os.makedirs(ARTIFACT_LOCATION, exist_ok=True)
In [5]:
Copied!
import os
import pickle
import mlflow
from flypipe import node
from flypipe.schema import Schema, Column
from flypipe.schema.types import Float, String
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
@node(
    type="pandas",
    description="Fits a standard scaler",
    tags=["data", "train", "scaler"],
    dependencies=[
        split.select(
            "data_type",
            "sepal_length",
            "sepal_width",
            "petal_length",
            "petal_width",
            "target",
        )
    ],
    output=Schema(
        Column("data_type", String(), "train (70%), test (30%)"),
        split.output.get("sepal_length"),
        split.output.get("sepal_width"),
        split.output.get("petal_length"),
        split.output.get("petal_width"),
        split.output.get("target"),
    ),
)
def fit_scale(split):
    """Fit a StandardScaler on the train rows and scale every row with it.

    When an mlflow run is active, the fitted scaler is also pickled to
    ``<ARTIFACT_LOCATION>/<run_id>/model/scaler.pkl``.

    Args:
        split: DataFrame with ``data_type``, the four feature columns and
            ``target``.

    Returns:
        The same DataFrame with the four feature columns replaced by their
        scaled values.
    """
    X_cols = [
        "sepal_length",
        "sepal_width",
        "petal_length",
        "petal_width",
    ]

    # Fit on train rows only; the test rows are merely transformed below.
    is_train = split["data_type"] == "train"
    scaler = StandardScaler().fit(split.loc[is_train, X_cols])

    active_run = mlflow.active_run()
    if active_run:
        # os.path.join is correct whether or not ARTIFACT_LOCATION ends with
        # a path separator.
        artifact_path = os.path.join(
            ARTIFACT_LOCATION, active_run.info.run_id, "model"
        )
        os.makedirs(artifact_path, exist_ok=True)
        # The context manager closes (and flushes) the pickle file.
        with open(os.path.join(artifact_path, "scaler.pkl"), "wb") as scaler_file:
            pickle.dump(scaler, scaler_file)

    split[X_cols] = scaler.transform(split[X_cols])
    return split


# Running the node with no arguments also executes its upstream nodes.
df = fit_scale.run()
display(df.head(10))
import os
import pickle
import mlflow
from flypipe import node
from flypipe.schema import Schema, Column
from flypipe.schema.types import Float, String
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
@node(
    type="pandas",
    description="Fits a standard scaler",
    tags=["data", "train", "scaler"],
    dependencies=[
        split.select(
            "data_type",
            "sepal_length",
            "sepal_width",
            "petal_length",
            "petal_width",
            "target",
        )
    ],
    output=Schema(
        Column("data_type", String(), "train (70%), test (30%)"),
        split.output.get("sepal_length"),
        split.output.get("sepal_width"),
        split.output.get("petal_length"),
        split.output.get("petal_width"),
        split.output.get("target"),
    ),
)
def fit_scale(split):
    """Fit a StandardScaler on the train rows and scale every row with it.

    When an mlflow run is active, the fitted scaler is also pickled to
    ``<ARTIFACT_LOCATION>/<run_id>/model/scaler.pkl``.

    Args:
        split: DataFrame with ``data_type``, the four feature columns and
            ``target``.

    Returns:
        The same DataFrame with the four feature columns replaced by their
        scaled values.
    """
    X_cols = [
        "sepal_length",
        "sepal_width",
        "petal_length",
        "petal_width",
    ]

    # Fit on train rows only; the test rows are merely transformed below.
    is_train = split["data_type"] == "train"
    scaler = StandardScaler().fit(split.loc[is_train, X_cols])

    active_run = mlflow.active_run()
    if active_run:
        # os.path.join is correct whether or not ARTIFACT_LOCATION ends with
        # a path separator.
        artifact_path = os.path.join(
            ARTIFACT_LOCATION, active_run.info.run_id, "model"
        )
        os.makedirs(artifact_path, exist_ok=True)
        # The context manager closes (and flushes) the pickle file.
        with open(os.path.join(artifact_path, "scaler.pkl"), "wb") as scaler_file:
            pickle.dump(scaler, scaler_file)

    split[X_cols] = scaler.transform(split[X_cols])
    return split


# Running the node with no arguments also executes its upstream nodes.
df = fit_scale.run()
display(df.head(10))
data_type | sepal_length | sepal_width | petal_length | petal_width | target | |
---|---|---|---|---|---|---|
118 | train | 2.260502 | -1.050897 | 1.776229 | 1.423710 | 2 |
18 | train | -0.118974 | 1.827647 | -1.144919 | -1.142634 | 0 |
4 | train | -0.951790 | 1.347889 | -1.313447 | -1.270951 | 0 |
45 | train | -1.189738 | -0.091382 | -1.313447 | -1.142634 | 0 |
59 | train | -0.713843 | -0.811018 | 0.090951 | 0.268855 | 1 |
39 | train | -0.832816 | 0.868132 | -1.257271 | -1.270951 | 0 |
36 | train | -0.356921 | 1.108011 | -1.369623 | -1.270951 | 0 |
117 | train | 2.260502 | 1.827647 | 1.663877 | 1.295393 | 2 |
139 | train | 1.308712 | 0.148496 | 0.933590 | 1.167075 | 2 |
107 | train | 1.784607 | -0.331261 | 1.439174 | 0.782124 | 2 |
Train SVM Model¶
In [6]:
Copied!
from flypipe import node
from flypipe.schema import Schema, Column
from flypipe.schema.types import Float, String, Integer
from sklearn.model_selection import train_test_split
from sklearn import svm
from mlflow.models.signature import infer_signature
@node(
    type="pandas",
    description="Model training using SVM",
    tags=["model", "svm"],
    dependencies=[
        fit_scale.select(
            "data_type",
            "sepal_length",
            "sepal_width",
            "petal_length",
            "petal_width",
            "target",
        ).alias("df")
    ],
    output=Schema(
        Column("data_type", String(), "train (70%), test (30%)"),
        fit_scale.output.get("sepal_length"),
        fit_scale.output.get("sepal_width"),
        fit_scale.output.get("petal_length"),
        fit_scale.output.get("petal_width"),
        fit_scale.output.get("target"),
        Column("prediction", Integer(), "prediction"),
    ),
)
def train_svm_model(df):
    """Train an sklearn SVC on the train rows and predict for every row.

    When an mlflow run is active, the fitted model is logged under the
    artifact path ``"model"`` with an inferred signature and a five-row
    input example.

    Args:
        df: Output of ``fit_scale`` with ``data_type``, the four feature
            columns and ``target``.

    Returns:
        ``df`` with an added ``prediction`` column covering train and test
        rows alike.
    """
    feature_cols = [
        "sepal_length",
        "sepal_width",
        "petal_length",
        "petal_width",
    ]

    train_rows = df[df["data_type"] == "train"]
    train_labels = train_rows["target"]
    train_features = train_rows[feature_cols]

    classifier = svm.SVC()
    classifier.fit(train_features, train_labels)

    if mlflow.active_run():
        mlflow.sklearn.log_model(
            classifier,
            "model",
            signature=infer_signature(train_features, train_labels),
            input_example=train_features.head(5),
        )

    df["prediction"] = classifier.predict(df[feature_cols])
    return df


# Running the node with no arguments also executes its upstream nodes.
df = train_svm_model.run()
display(df)
from flypipe import node
from flypipe.schema import Schema, Column
from flypipe.schema.types import Float, String, Integer
from sklearn.model_selection import train_test_split
from sklearn import svm
from mlflow.models.signature import infer_signature
@node(
    type="pandas",
    description="Model training using SVM",
    tags=["model", "svm"],
    dependencies=[
        fit_scale.select(
            "data_type",
            "sepal_length",
            "sepal_width",
            "petal_length",
            "petal_width",
            "target",
        ).alias("df")
    ],
    output=Schema(
        Column("data_type", String(), "train (70%), test (30%)"),
        fit_scale.output.get("sepal_length"),
        fit_scale.output.get("sepal_width"),
        fit_scale.output.get("petal_length"),
        fit_scale.output.get("petal_width"),
        fit_scale.output.get("target"),
        Column("prediction", Integer(), "prediction"),
    ),
)
def train_svm_model(df):
    """Train an sklearn SVC on the train rows and predict for every row.

    When an mlflow run is active, the fitted model is logged under the
    artifact path ``"model"`` with an inferred signature and a five-row
    input example.

    Args:
        df: Output of ``fit_scale`` with ``data_type``, the four feature
            columns and ``target``.

    Returns:
        ``df`` with an added ``prediction`` column covering train and test
        rows alike.
    """
    feature_cols = [
        "sepal_length",
        "sepal_width",
        "petal_length",
        "petal_width",
    ]

    train_rows = df[df["data_type"] == "train"]
    train_labels = train_rows["target"]
    train_features = train_rows[feature_cols]

    classifier = svm.SVC()
    classifier.fit(train_features, train_labels)

    if mlflow.active_run():
        mlflow.sklearn.log_model(
            classifier,
            "model",
            signature=infer_signature(train_features, train_labels),
            input_example=train_features.head(5),
        )

    df["prediction"] = classifier.predict(df[feature_cols])
    return df


# Running the node with no arguments also executes its upstream nodes.
df = train_svm_model.run()
display(df)
data_type | sepal_length | sepal_width | petal_length | petal_width | target | prediction | |
---|---|---|---|---|---|---|---|
118 | train | 2.260502 | -1.050897 | 1.776229 | 1.423710 | 2 | 2 |
18 | train | -0.118974 | 1.827647 | -1.144919 | -1.142634 | 0 | 0 |
4 | train | -0.951790 | 1.347889 | -1.313447 | -1.270951 | 0 | 0 |
45 | train | -1.189738 | -0.091382 | -1.313447 | -1.142634 | 0 | 0 |
59 | train | -0.713843 | -0.811018 | 0.090951 | 0.268855 | 1 | 1 |
... | ... | ... | ... | ... | ... | ... | ... |
112 | test | 1.189738 | -0.091382 | 0.989766 | 1.167075 | 2 | 2 |
17 | test | -0.832816 | 1.108011 | -1.313447 | -1.142634 | 0 | 0 |
119 | test | 0.237948 | -2.010411 | 0.708887 | 0.397172 | 2 | 1 |
103 | test | 0.594869 | -0.331261 | 1.045942 | 0.782124 | 2 | 2 |
58 | test | 0.951790 | -0.331261 | 0.484183 | 0.140538 | 1 | 1 |
150 rows × 7 columns
Evaluate¶
In [7]:
Copied!
from flypipe import node
from flypipe.schema import Schema, Column
from flypipe.schema.types import Float, String, Integer
from sklearn.model_selection import train_test_split
from sklearn import svm
from mlflow.models.signature import infer_signature
from sklearn.metrics import f1_score
@node(
    type="pandas",
    description="Evaluates the model predictions using the macro F1 score",
    tags=["model", "svm"],
    dependencies=[
        train_svm_model.select(
            "data_type",
            "target",
            "prediction",
        ).alias("df")
    ],
    output=Schema(
        Column("data_type", String(), "all, train or test"),
        Column("metric", String(), "score metric"),
        Column("value", Float(), "value of the metric"),
    ),
)
def evaluate(df):
    """Compute the macro-averaged F1 score on all, train and test rows.

    Args:
        df: Output of ``train_svm_model`` with ``data_type``, ``target`` and
            ``prediction`` columns.

    Returns:
        A three-row DataFrame with columns ``data_type`` (``"all"``,
        ``"train"``, ``"test"``, in that order), ``metric`` (always
        ``"f1_score macro"``) and ``value`` (the score).
    """
    # Dict insertion order fixes the row order of the result.
    subsets = {
        "all": df,
        "train": df[df["data_type"] == "train"],
        "test": df[df["data_type"] == "test"],
    }
    rows = [
        [
            name,
            "f1_score macro",
            f1_score(subset["target"], subset["prediction"], average="macro"),
        ]
        for name, subset in subsets.items()
    ]
    return pd.DataFrame(rows, columns=["data_type", "metric", "value"])


# Running the node with no arguments also executes its upstream nodes.
df = evaluate.run()
display(df)
# Render the graph that leads to this node as HTML.
displayHTML(evaluate.html())
from flypipe import node
from flypipe.schema import Schema, Column
from flypipe.schema.types import Float, String, Integer
from sklearn.model_selection import train_test_split
from sklearn import svm
from mlflow.models.signature import infer_signature
from sklearn.metrics import f1_score
@node(
    type="pandas",
    description="Evaluates the model predictions using the macro F1 score",
    tags=["model", "svm"],
    dependencies=[
        train_svm_model.select(
            "data_type",
            "target",
            "prediction",
        ).alias("df")
    ],
    output=Schema(
        Column("data_type", String(), "all, train or test"),
        Column("metric", String(), "score metric"),
        Column("value", Float(), "value of the metric"),
    ),
)
def evaluate(df):
    """Compute the macro-averaged F1 score on all, train and test rows.

    Args:
        df: Output of ``train_svm_model`` with ``data_type``, ``target`` and
            ``prediction`` columns.

    Returns:
        A three-row DataFrame with columns ``data_type`` (``"all"``,
        ``"train"``, ``"test"``, in that order), ``metric`` (always
        ``"f1_score macro"``) and ``value`` (the score).
    """
    # Dict insertion order fixes the row order of the result.
    subsets = {
        "all": df,
        "train": df[df["data_type"] == "train"],
        "test": df[df["data_type"] == "test"],
    }
    rows = [
        [
            name,
            "f1_score macro",
            f1_score(subset["target"], subset["prediction"], average="macro"),
        ]
        for name, subset in subsets.items()
    ]
    return pd.DataFrame(rows, columns=["data_type", "metric", "value"])


# Running the node with no arguments also executes its upstream nodes.
df = evaluate.run()
display(df)
# Render the graph that leads to this node as HTML.
displayHTML(evaluate.html())
data_type | metric | value | |
---|---|---|---|
0 | all | f1_score macro | 0.973323 |
1 | train | f1_score macro | 0.980475 |
2 | test | f1_score macro | 0.955840 |
Out[7]: