Working Example
In [1]:
Copied!
from datetime import datetime

import pandas as pd
from flypipe import node


@node(type="pandas")
def raw_sales():
    """Source node: a small, fixed table of sales used by every example below.

    Returns:
        pandas DataFrame with columns ``product``, ``price`` and
        ``datetime_sale`` (three rows, one per sale).
    """
    return pd.DataFrame(data={
        "product": ["apple", "banana", "orange"],
        "price": [5.33, 1.2, 7.5],
        "datetime_sale": [
            datetime(2025, 1, 1, 10, 55, 32),
            datetime(2025, 1, 3, 13, 15, 22),
            datetime(2025, 1, 4, 1, 5, 1),
        ],
    })


df = raw_sales.run()
display(df)
| | product | price | datetime_sale |
---|---|---|---|
0 | apple | 5.33 | 2025-01-01 10:55:32 |
1 | banana | 1.20 | 2025-01-03 13:15:22 |
2 | orange | 7.50 | 2025-01-04 01:05:01 |
Preprocess function
In [2]:
Copied!
def cdc_changes(df):
    """Preprocess function simulating change-data-capture (CDC).

    Keeps only the rows whose ``datetime_sale`` is at or after the fixed
    cut-off 2025-01-03 00:00:00, printing the cut-off before filtering.

    Args:
        df: DataFrame with a ``datetime_sale`` column.

    Returns:
        The filtered DataFrame; original index labels are preserved.
    """
    sales_from_datetime = datetime(2025, 1, 3, 0, 0, 0)
    print(f"==> Getting cdc_changes from {sales_from_datetime}")
    return df[df['datetime_sale'] >= sales_from_datetime]
In [3]:
Copied!
@node(
    type="pandas",
    dependencies=[
        # cdc_changes is applied to raw_sales' output before it reaches this
        # node; the result is passed in as the `df_raw` argument.
        raw_sales.preprocess(cdc_changes).alias("df_raw")
    ]
)
def sales(df_raw):
    """Pass-through node exposing raw_sales after the cdc_changes preprocess."""
    return df_raw


df = sales.run()
display(df)
==> Getting cdc_changes from 2025-01-03 00:00:00
| | product | price | datetime_sale |
---|---|---|---|
1 | banana | 1.2 | 2025-01-03 13:15:22 |
2 | orange | 7.5 | 2025-01-04 01:05:01 |
Disabling preprocessing
All node dependencies
In [4]:
Copied!
from flypipe.mode import PreprocessMode


@node(
    type="pandas",
    dependencies=[
        raw_sales.preprocess(cdc_changes).alias("df_raw")
    ]
)
def sales(df_raw):
    """Same node as before: depends on raw_sales preprocessed by cdc_changes."""
    return df_raw


# PreprocessMode.DISABLE turns off preprocessing for every dependency in this
# run, so all raw rows reach `sales` unfiltered.
df = sales.run(preprocess=PreprocessMode.DISABLE)
display(df)
| | product | price | datetime_sale |
---|---|---|---|
0 | apple | 5.33 | 2025-01-01 10:55:32 |
1 | banana | 1.20 | 2025-01-03 13:15:22 |
2 | orange | 7.50 | 2025-01-04 01:05:01 |
Specific node dependencies
In [5]:
Copied!
from flypipe.mode import PreprocessMode


@node(
    type="pandas",
    dependencies=[
        raw_sales.preprocess(cdc_changes).alias("df_raw")
    ]
)
def sales(df_raw):
    """Same node as before: depends on raw_sales preprocessed by cdc_changes."""
    return df_raw


df = sales.run(preprocess={
    # node: {node_dependency: PreprocessMode.DISABLE}
    # Here: skip the preprocess of raw_sales only where it feeds `sales`.
    sales: {raw_sales: PreprocessMode.DISABLE}
})
display(df)
| | product | price | datetime_sale |
---|---|---|---|
0 | apple | 5.33 | 2025-01-01 10:55:32 |
1 | banana | 1.20 | 2025-01-03 13:15:22 |
2 | orange | 7.50 | 2025-01-04 01:05:01 |
Enabling preprocessing for all dependencies by default
In [6]:
Copied!
import os
from flypipe.config import config_context


@node(
    type="pandas",
    dependencies=[
        # No .preprocess() on this dependency, so the configured global
        # default preprocess function is applied to it.
        raw_sales.alias("df_raw")
    ]
)
def other_sales(df_raw):
    """Pass-through node whose dependency declares no preprocess function."""
    return df_raw


# config_context is used here only to show how a global preprocess works;
# in production, set these options through environment variables instead.
# NOTE(review): `preprocess_function.global_preprocess` is not defined in this
# notebook; it must be importable from the kernel's working directory.
with config_context(
    default_dependencies_preprocess_module="preprocess_function",
    default_dependencies_preprocess_function="global_preprocess"
):
    df = other_sales.run()
display(df)
==> Global Preprocess
| | product | price | datetime_sale |
---|---|---|---|
1 | banana | 1.2 | 2025-01-03 13:15:22 |
2 | orange | 7.5 | 2025-01-04 01:05:01 |
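As you can see below, even with a global default preprocess configured, flypipe
still uses `cdc_changes` (the preprocess function declared explicitly on the dependency)
to preprocess the dependency of the `sales`
node.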
In [7]:
Copied!
import os
from flypipe.config import config_context

# Same global default as the previous cell, but this time run `sales`, whose
# dependency declares cdc_changes explicitly. The output shows cdc_changes is
# used, not the global preprocess. (`other_sales` from the previous cell is
# not needed here, so it is not redefined.)
with config_context(
    default_dependencies_preprocess_module="preprocess_function",
    default_dependencies_preprocess_function="global_preprocess"
):
    df = sales.run()
display(df)
==> Getting cdc_changes from 2025-01-03 00:00:00
| | product | price | datetime_sale |
---|---|---|---|
1 | banana | 1.2 | 2025-01-03 13:15:22 |
2 | orange | 7.5 | 2025-01-04 01:05:01 |
Chaining preprocessing functions
In [8]:
Copied!
def preprocess_1(df):
    """Keep only sales at or after 2025-01-03 00:00:00 (prints a trace line first).

    Args:
        df: DataFrame with a ``datetime_sale`` column.

    Returns:
        The filtered DataFrame; original index labels are preserved.
    """
    datetime_sales = datetime(2025, 1, 3, 0, 0, 0)
    print(f"==> Applying preprocess_1 (filter datime_sale from `{datetime_sales}`)")
    return df[df['datetime_sale'] >= datetime_sales]


def preprocess_2(df):
    """Keep only sales at or after 2025-01-04 00:00:00 (prints a trace line first).

    Args:
        df: DataFrame with a ``datetime_sale`` column.

    Returns:
        The filtered DataFrame; original index labels are preserved.
    """
    datetime_sales = datetime(2025, 1, 4, 0, 0, 0)
    print(f"==> Applying preprocess_2 (filter datime_sale from `{datetime_sales}`)")
    return df[df['datetime_sale'] >= datetime_sales]
In [9]:
Copied!
@node(
    type="pandas",
    dependencies=[
        # Preprocess functions run in the order given:
        # preprocess_1 first, then preprocess_2 on its result.
        raw_sales.preprocess(preprocess_1, preprocess_2).alias("df_raw")
    ]
)
def chaining(df_raw):
    """Pass-through node for raw_sales after both chained preprocess functions."""
    return df_raw


df = chaining.run()
display(df)
==> Applying preprocess_1 (filter datime_sale from `2025-01-03 00:00:00`) ==> Applying preprocess_2 (filter datime_sale from `2025-01-04 00:00:00`)
| | product | price | datetime_sale |
---|---|---|---|
2 | orange | 7.5 | 2025-01-04 01:05:01 |
Reversing the order of the preprocess functions reverses the order in which they are called:
In [10]:
Copied!
@node(
    type="pandas",
    dependencies=[
        # Same functions in reverse order: preprocess_2 runs first, then
        # preprocess_1 on its result.
        raw_sales.preprocess(preprocess_2, preprocess_1).alias("df_raw")
    ]
)
def chaining(df_raw):
    """Pass-through node for raw_sales after both chained preprocess functions."""
    return df_raw


df = chaining.run()
display(df)
==> Applying preprocess_2 (filter datime_sale from `2025-01-04 00:00:00`) ==> Applying preprocess_1 (filter datime_sale from `2025-01-03 00:00:00`)
| | product | price | datetime_sale |
---|---|---|---|
2 | orange | 7.5 | 2025-01-04 01:05:01 |