Pandas
Below is a very simple transformation pipeline that shows how Flypipe might be used. Given the names of various fruits, we do some minor cleaning of the data and add two columns: color and category.
In [1]:
Copied!
from flypipe import node
from flypipe.schema import Schema, Column
from flypipe.schema.types import String
import pandas as pd
# Create the Graph
@node(
    type="pandas",
    output=Schema(
        Column("_fruit", String(), "_fruit"),
    ),
)
def table():
    """Return the raw input frame: one ``_fruit`` column of upper-case fruit names."""
    raw_names = ['ORANGE', 'WATERMELON', 'LEMON']
    return pd.DataFrame({'_fruit': raw_names})
@node(
    type="pandas",
    dependencies=[
        table.select("_fruit").alias("df"),
    ],
    output=Schema(
        Column("fruit", String(), "fruit name"),
    ),
)
def clean(df):
    """Rename ``_fruit`` to ``fruit`` and lower-case every value in that column."""
    return (
        df.rename(columns={'_fruit': 'fruit'})
        # The lambda sees the already-renamed frame, so 'fruit' exists here.
        .assign(fruit=lambda renamed: renamed['fruit'].str.lower())
    )
@node(
    type="pandas",
    dependencies=[
        clean.select("fruit").alias("df"),
    ],
    output=Schema(
        Column("fruit", String(), "fruit name"),
        Column("color", String(), "color of the fruit"),
    ),
)
def color(df):
    """Add a ``color`` column derived from the ``fruit`` column.

    Each fruit name is looked up in a fixed name-to-color mapping. A name
    that is not a key of the mapping is left unchanged, so its ``color``
    value is the fruit name itself.

    Args:
        df: frame with a ``fruit`` column (the selected output of ``clean``).

    Returns:
        A new frame holding the input columns plus ``color``.
    """
    fruit_to_color = {
        "blackberry": "black",
        "strawberry": "red",
        "orange": "orange",
        "watermelon": "red",
        "lemon": "yellow",
        "plum": "purple",
    }
    # assign() returns a new frame, so the frame handed in by the dependency
    # is never written to (no in-place mutation, no SettingWithCopyWarning).
    return df.assign(color=df['fruit'].replace(fruit_to_color))
@node(
    type="pandas",
    dependencies=[
        clean.select("fruit").alias("df"),
    ],
    output=Schema(
        Column("fruit", String(), "fruit name"),
        Column("category", String(), "category of the fruit"),
    ),
)
def category(df):
    """Add a ``category`` column derived from the ``fruit`` column.

    Each fruit name is looked up in a fixed name-to-category mapping. A name
    that is not a key of the mapping is left unchanged, so its ``category``
    value is the fruit name itself.

    Args:
        df: frame with a ``fruit`` column (the selected output of ``clean``).

    Returns:
        A new frame holding the input columns plus ``category``.
    """
    fruit_to_category = {
        "blackberry": "berry",
        "strawberry": "berry",
        "orange": "citrus",
        "watermelon": "misc",
        "lemon": "citrus",
        "plum": "stonefruit",
    }
    # assign() returns a new frame, so the frame handed in by the dependency
    # is never written to (no in-place mutation, no SettingWithCopyWarning).
    return df.assign(category=df['fruit'].replace(fruit_to_category))
@node(
    type="pandas",
    dependencies=[
        color.select("fruit", "color"),
        category.select("fruit", "category"),
    ],
    output=Schema(
        Column("fruit", String(), "fruit name"),
        Column("color", String(), "color of the fruit"),
        Column("category", String(), "category of the fruit"),
    ),
)
def fruits(color, category):
    """Left-join the ``color`` frame with the ``category`` frame on ``fruit``."""
    # Inside this function the parameters shadow the `color` / `category`
    # nodes; here they are the frames passed in for those dependencies.
    left_frame, right_frame = color, category
    return pd.merge(left_frame, right_frame, how="left", on="fruit")
from flypipe import node
from flypipe.schema import Schema, Column
from flypipe.schema.types import String
import pandas as pd
# Create the Graph
@node(
    type="pandas",
    output=Schema(
        Column("_fruit", String(), "_fruit"),
    ),
)
def table():
    """Return the raw input frame: one ``_fruit`` column of upper-case fruit names."""
    raw_names = ['ORANGE', 'WATERMELON', 'LEMON']
    return pd.DataFrame({'_fruit': raw_names})
@node(
    type="pandas",
    dependencies=[
        table.select("_fruit").alias("df"),
    ],
    output=Schema(
        Column("fruit", String(), "fruit name"),
    ),
)
def clean(df):
    """Rename ``_fruit`` to ``fruit`` and lower-case every value in that column."""
    return (
        df.rename(columns={'_fruit': 'fruit'})
        # The lambda sees the already-renamed frame, so 'fruit' exists here.
        .assign(fruit=lambda renamed: renamed['fruit'].str.lower())
    )
@node(
    type="pandas",
    dependencies=[
        clean.select("fruit").alias("df"),
    ],
    output=Schema(
        Column("fruit", String(), "fruit name"),
        Column("color", String(), "color of the fruit"),
    ),
)
def color(df):
    """Add a ``color`` column derived from the ``fruit`` column.

    Each fruit name is looked up in a fixed name-to-color mapping. A name
    that is not a key of the mapping is left unchanged, so its ``color``
    value is the fruit name itself.

    Args:
        df: frame with a ``fruit`` column (the selected output of ``clean``).

    Returns:
        A new frame holding the input columns plus ``color``.
    """
    fruit_to_color = {
        "blackberry": "black",
        "strawberry": "red",
        "orange": "orange",
        "watermelon": "red",
        "lemon": "yellow",
        "plum": "purple",
    }
    # assign() returns a new frame, so the frame handed in by the dependency
    # is never written to (no in-place mutation, no SettingWithCopyWarning).
    return df.assign(color=df['fruit'].replace(fruit_to_color))
@node(
    type="pandas",
    dependencies=[
        clean.select("fruit").alias("df"),
    ],
    output=Schema(
        Column("fruit", String(), "fruit name"),
        Column("category", String(), "category of the fruit"),
    ),
)
def category(df):
    """Add a ``category`` column derived from the ``fruit`` column.

    Each fruit name is looked up in a fixed name-to-category mapping. A name
    that is not a key of the mapping is left unchanged, so its ``category``
    value is the fruit name itself.

    Args:
        df: frame with a ``fruit`` column (the selected output of ``clean``).

    Returns:
        A new frame holding the input columns plus ``category``.
    """
    fruit_to_category = {
        "blackberry": "berry",
        "strawberry": "berry",
        "orange": "citrus",
        "watermelon": "misc",
        "lemon": "citrus",
        "plum": "stonefruit",
    }
    # assign() returns a new frame, so the frame handed in by the dependency
    # is never written to (no in-place mutation, no SettingWithCopyWarning).
    return df.assign(category=df['fruit'].replace(fruit_to_category))
@node(
    type="pandas",
    dependencies=[
        color.select("fruit", "color"),
        category.select("fruit", "category"),
    ],
    output=Schema(
        Column("fruit", String(), "fruit name"),
        Column("color", String(), "color of the fruit"),
        Column("category", String(), "category of the fruit"),
    ),
)
def fruits(color, category):
    """Left-join the ``color`` frame with the ``category`` frame on ``fruit``."""
    # Inside this function the parameters shadow the `color` / `category`
    # nodes; here they are the frames passed in for those dependencies.
    left_frame, right_frame = color, category
    return pd.merge(left_frame, right_frame, how="left", on="fruit")
In [2]:
Copied!
# Build the HTML for the `fruits` node via its html() method and hand it to displayHTML.
# NOTE(review): displayHTML is neither defined nor imported in this file — presumably a
# notebook-provided helper (e.g. Databricks); confirm before running elsewhere.
displayHTML(fruits.html())
# NOTE(review): identical call repeated — looks like a page-extraction duplicate; confirm before removing.
displayHTML(fruits.html())
Out[2]: