# Node Function Examples
A node function allows you to create nodes programmatically.
It is useful when you need nodes to behave in different ways, depending on specific conditions.
In [1]:
Copied!
# Build a small two-row demo dataframe and register it as a temp view
# so later nodes can read it through the Spark datasource.
fruit_rows = [
    ("LEMON", "Yellow"),
    ("LIME", "Green"),
]
df = spark.createDataFrame(data=fruit_rows, schema=["fruit", "color"])
df.createOrReplaceTempView("fruits_table")
display(df)
# Same demo data as above, registered under the "fruits_table" view.
df = spark.createDataFrame(
    data=[("LEMON", "Yellow"), ("LIME", "Green")],
    schema=["fruit", "color"],
)
df.createOrReplaceTempView("fruits_table")
display(df)
| fruit | color  |
|-------|--------|
| LEMON | Yellow |
| LIME  | Green  |
## Dynamic transformations
Suppose you want to lower-case the columns of the dataframe above. However, this dataframe could contain hundreds of columns, so you only want to apply transformations to the columns requested by the graph.
In this example, if a node imports this dataframe and selects only the column `fruit`, then only the raw column `fruit` should be queried and have the lower-case transformation applied.
In [2]:
Copied!
from flypipe import node
from flypipe import node_function
from flypipe.datasource.spark import Spark
from flypipe.schema import Schema, Column
from flypipe.schema.types import String
import pyspark.sql.functions as F
@node_function(
    requested_columns=True,
    node_dependencies=[
        Spark("fruits_table"),
    ],
)
def fruits_function(requested_columns):
    """Build, at graph-resolution time, a node that lower-cases columns.

    flypipe injects ``requested_columns`` with the columns that downstream
    nodes actually selected, so the generated node only queries and
    transforms those columns — nothing else from the source table.
    """
    print(f"Raw columns queried: {requested_columns}")

    @node(
        type="pyspark",
        dependencies=[
            # Restrict the datasource read to the requested columns only.
            Spark("fruits_table").select(requested_columns),
        ],
        output=Schema(
            [Column(col, String(), col) for col in requested_columns]
        ),
    )
    def lower(fruits_table):
        # Lower-case each requested column in turn, leaving others untouched.
        for col in requested_columns:
            print(f"lower case column `{col}`")
            fruits_table = fruits_table.withColumn(col, F.lower(col))
        return fruits_table

    return lower
from flypipe import node
from flypipe import node_function
from flypipe.datasource.spark import Spark
from flypipe.schema import Schema, Column
from flypipe.schema.types import String
import pyspark.sql.functions as F
@node_function(
    requested_columns=True,
    node_dependencies=[Spark("fruits_table")],
)
def fruits_function(requested_columns):
    """Node function returning a pyspark node that lower-cases only the
    columns the graph actually requested."""
    print(f"Raw columns queried: {requested_columns}")

    # One output column per requested column; name doubles as description.
    output_schema = Schema(
        [Column(col, String(), col) for col in requested_columns]
    )

    @node(
        type="pyspark",
        dependencies=[Spark("fruits_table").select(requested_columns)],
        output=output_schema,
    )
    def lower(fruits_table):
        for col in requested_columns:
            print(f"lower case column `{col}`")
            fruits_table = fruits_table.withColumn(col, F.lower(col))
        return fruits_table

    return lower
## Selecting `fruit`
In [3]:
Copied!
@node(
    type="pyspark",
    dependencies=[fruits_function.select("fruit")],
)
def my_fruits(fruits_function):
    # Identity node: only the selected `fruit` column reaches this point,
    # already lower-cased by the generated node.
    return fruits_function


df = my_fruits.run(spark)
display(df)
displayHTML(my_fruits.html())
@node(type="pyspark", dependencies=[fruits_function.select("fruit")])
def my_fruits(fruits_function):
    """Pass through the `fruit` column produced by the node function."""
    return fruits_function


df = my_fruits.run(spark)
display(df)
displayHTML(my_fruits.html())
Raw columns queried: ['fruit'] lower case column `fruit`
| fruit |
|-------|
| lemon |
| lime  |
Raw columns queried: ['fruit']
Out[3]: