Databricks
Below is a very simple transformation pipeline that shows how Flypipe might be used. Given the names of various fruits, we do some minor cleaning of the data and add two columns: color and category.
Install flypipe
In [6]:
Copied!
%pip install flypipe
%pip install flypipe
Requirement already satisfied: flypipe in /usr/local/lib/python3.9/site-packages (4.3.2) Requirement already satisfied: Jinja2>=3.1 in /usr/local/lib/python3.9/site-packages (from flypipe) (3.1.6) Requirement already satisfied: networkx>=3.1 in /usr/local/lib/python3.9/site-packages (from flypipe) (3.2.1) Requirement already satisfied: tabulate>=0.9 in /usr/local/lib/python3.9/site-packages (from flypipe) (0.9.0) Requirement already satisfied: sparkleframe>=0.2 in /usr/local/lib/python3.9/site-packages (from flypipe) (0.3.2) Requirement already satisfied: MarkupSafe>=2.0 in /usr/local/lib/python3.9/site-packages (from Jinja2>=3.1->flypipe) (3.0.2) Requirement already satisfied: polars>=1.30 in /usr/local/lib/python3.9/site-packages (from sparkleframe>=0.2->flypipe) (1.30.0) WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager, possibly rendering your system unusable. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv. Use the --root-user-action option if you know what you are doing and want to suppress this warning. Note: you may need to restart the kernel to use updated packages.
Create a temporary view representing a table
In [7]:
Copied!
# Raw input: one string column, `_fruit`, holding upper-case fruit names.
# The `clean` node renames and lower-cases this column later on.
df = spark.createDataFrame(
    data=[("ORANGE",), ("WATERMELON",), ("LEMON",)],
    schema=("_fruit",),
)
# Expose the rows as the temp view "table", which the graph reads via
# Spark("table"), and show them in the notebook.
df.createOrReplaceTempView("table")
display(df)
# Raw input: one string column, `_fruit`, holding upper-case fruit names.
# The `clean` node renames and lower-cases this column later on.
df = spark.createDataFrame(
    data=[("ORANGE",), ("WATERMELON",), ("LEMON",)],
    schema=("_fruit",),
)
# Expose the rows as the temp view "table", which the graph reads via
# Spark("table"), and show them in the notebook.
df.createOrReplaceTempView("table")
display(df)
_fruit |
---|
ORANGE |
WATERMELON |
LEMON |
Create a graph
In [8]:
Copied!
from flypipe import node
from flypipe.datasource.spark import Spark
from flypipe.schema import Schema, Column
from flypipe.schema.types import String
import pyspark.sql.functions as F
@node(
    type="pyspark",
    dependencies=[Spark("table").select("_fruit").alias("df")],
    output=Schema(
        Column("fruit", String(), "fruit name"),
    ),
)
def clean(df):
    """Normalise the raw fruit names read from the ``table`` view.

    Renames ``_fruit`` to ``fruit`` and lower-cases its values, so the
    downstream lookups (which use lower-case keys) can match them.
    """
    renamed = df.withColumnRenamed("_fruit", "fruit")
    return renamed.withColumn("fruit", F.lower(F.col("fruit")))
@node(
    type="pyspark",
    dependencies=[
        clean.select("fruit").alias("df")
    ],
    output=Schema(
        Column("fruit", String(), "fruit name"),
        Column("color", String(), "color of the fruit"),
    )
)
def color(df):
    """Add a ``color`` column looked up from the cleaned ``fruit`` name.

    Fruits that are not in the lookup table get a null color. The previous
    ``DataFrame.replace`` approach copied the fruit name into ``color`` and
    only rewrote known values, so an unknown fruit such as "kiwi" ended up
    with the color "kiwi".
    """
    fruit_colors = {
        "blackberry": "black",
        "strawberry": "red",
        "orange": "orange",
        "watermelon": "red",
        "lemon": "yellow",
        "plum": "purple",
    }
    fruit = F.col("fruit")
    # Each `when` is null unless its key matches. Keys are distinct, so at
    # most one branch is non-null, and coalesce picks it (or yields null).
    color_expr = F.coalesce(
        *[F.when(fruit == name, value) for name, value in fruit_colors.items()]
    )
    return df.withColumn("color", color_expr)
@node(
    type="pyspark",
    dependencies=[
        clean.select("fruit").alias("df")
    ],
    output=Schema(
        Column("fruit", String(), "fruit name"),
        Column("category", String(), "category of the fruit"),
    )
)
def category(df):
    """Add a ``category`` column looked up from the cleaned ``fruit`` name.

    Fruits that are not in the lookup table get a null category. The
    previous ``DataFrame.replace`` approach copied the fruit name into
    ``category`` and only rewrote known values, so an unknown fruit such as
    "kiwi" ended up in the category "kiwi".
    """
    fruit_categories = {
        "blackberry": "berry",
        "strawberry": "berry",
        "orange": "citrus",
        "watermelon": "misc",
        "lemon": "citrus",
        "plum": "stonefruit",
    }
    fruit = F.col("fruit")
    # Each `when` is null unless its key matches. Keys are distinct, so at
    # most one branch is non-null, and coalesce picks it (or yields null).
    category_expr = F.coalesce(
        *[F.when(fruit == name, value) for name, value in fruit_categories.items()]
    )
    return df.withColumn("category", category_expr)
@node(
    type="pyspark",
    dependencies=[
        color.select("fruit", "color"),
        category.select("fruit", "category")
    ],
    output=Schema(
        # Same column as upstream `clean`/`color`/`category`: it holds the
        # fruit's name, not a description.
        Column("fruit", String(), "fruit name"),
        Column("color", String(), "color of the fruit"),
        Column("category", String(), "category of the fruit"),
    )
)
def fruits(color, category):
    """Join the ``color`` and ``category`` node outputs on ``fruit``.

    A left join keeps every row coming from ``color``, even when a fruit has
    no matching row in ``category``.
    """
    return color.join(category, on="fruit", how="left")
from flypipe import node
from flypipe.datasource.spark import Spark
from flypipe.schema import Schema, Column
from flypipe.schema.types import String
import pyspark.sql.functions as F
@node(
    type="pyspark",
    dependencies=[Spark("table").select("_fruit").alias("df")],
    output=Schema(
        Column("fruit", String(), "fruit name"),
    ),
)
def clean(df):
    """Normalise the raw fruit names read from the ``table`` view.

    Renames ``_fruit`` to ``fruit`` and lower-cases its values, so the
    downstream lookups (which use lower-case keys) can match them.
    """
    renamed = df.withColumnRenamed("_fruit", "fruit")
    return renamed.withColumn("fruit", F.lower(F.col("fruit")))
@node(
    type="pyspark",
    dependencies=[
        clean.select("fruit").alias("df")
    ],
    output=Schema(
        Column("fruit", String(), "fruit name"),
        Column("color", String(), "color of the fruit"),
    )
)
def color(df):
    """Add a ``color`` column looked up from the cleaned ``fruit`` name.

    Fruits that are not in the lookup table get a null color. The previous
    ``DataFrame.replace`` approach copied the fruit name into ``color`` and
    only rewrote known values, so an unknown fruit such as "kiwi" ended up
    with the color "kiwi".
    """
    fruit_colors = {
        "blackberry": "black",
        "strawberry": "red",
        "orange": "orange",
        "watermelon": "red",
        "lemon": "yellow",
        "plum": "purple",
    }
    fruit = F.col("fruit")
    # Each `when` is null unless its key matches. Keys are distinct, so at
    # most one branch is non-null, and coalesce picks it (or yields null).
    color_expr = F.coalesce(
        *[F.when(fruit == name, value) for name, value in fruit_colors.items()]
    )
    return df.withColumn("color", color_expr)
@node(
    type="pyspark",
    dependencies=[
        clean.select("fruit").alias("df")
    ],
    output=Schema(
        Column("fruit", String(), "fruit name"),
        Column("category", String(), "category of the fruit"),
    )
)
def category(df):
    """Add a ``category`` column looked up from the cleaned ``fruit`` name.

    Fruits that are not in the lookup table get a null category. The
    previous ``DataFrame.replace`` approach copied the fruit name into
    ``category`` and only rewrote known values, so an unknown fruit such as
    "kiwi" ended up in the category "kiwi".
    """
    fruit_categories = {
        "blackberry": "berry",
        "strawberry": "berry",
        "orange": "citrus",
        "watermelon": "misc",
        "lemon": "citrus",
        "plum": "stonefruit",
    }
    fruit = F.col("fruit")
    # Each `when` is null unless its key matches. Keys are distinct, so at
    # most one branch is non-null, and coalesce picks it (or yields null).
    category_expr = F.coalesce(
        *[F.when(fruit == name, value) for name, value in fruit_categories.items()]
    )
    return df.withColumn("category", category_expr)
@node(
    type="pyspark",
    dependencies=[
        color.select("fruit", "color"),
        category.select("fruit", "category")
    ],
    output=Schema(
        # Same column as upstream `clean`/`color`/`category`: it holds the
        # fruit's name, not a description.
        Column("fruit", String(), "fruit name"),
        Column("color", String(), "color of the fruit"),
        Column("category", String(), "category of the fruit"),
    )
)
def fruits(color, category):
    """Join the ``color`` and ``category`` node outputs on ``fruit``.

    A left join keeps every row coming from ``color``, even when a fruit has
    no matching row in ``category``.
    """
    return color.join(category, on="fruit", how="left")
Execution Graph
In [9]:
Copied!
# Render the HTML view of the `fruits` node's execution graph in the notebook.
graph_html = fruits.html()
displayHTML(graph_html)
# Render the HTML view of the `fruits` node's execution graph in the notebook.
graph_html = fruits.html()
displayHTML(graph_html)
Out[9]: