Examples¶
pandas
node¶
In [1]:
Copied!
from flypipe.node import node
from flypipe.schema import Schema, Column
from flypipe.schema.types import String
import pandas as pd


@node(
    type="pandas",
    description="Outputs a dataframe with fruit names",
    tags=["fruit"],
    output=Schema([
        Column("fruit", String(), "name of the fruit"),
    ])
)
def fruits():
    """Build the source dataframe: a single `fruit` column holding two fruit names.

    Takes no dependencies, so it can be run on its own without a Spark session.
    """
    return pd.DataFrame(data={"fruit": ["mango", "lemon"]})


# Last expression of the cell, so the resulting dataframe is shown as the cell output.
fruits.run()
Out[1]:
| | fruit |
|---|---|
| 0 | mango |
| 1 | lemon |
pandas_on_spark
node¶
In [2]:
Copied!
from flypipe.node import node
from flypipe.schema import Schema, Column
from flypipe.schema.types import String
import pandas as pd


@node(
    type="pandas_on_spark",
    description="Outputs a pandas-on-Spark dataframe with each fruit and its flavour",
    tags=["flavour", "pandas_on_spark"],
    dependencies=[
        # Request only the `fruit` column of the `fruits` node; the alias makes it
        # arrive as the function argument `df`.
        fruits.select("fruit").alias("df")
    ],
    output=Schema(
        # Reuse the `fruit` column definition declared on the `fruits` node.
        fruits.output.get("fruit"),
        Column("flavour", String(), "fruit flavour")
    )
)
def flavour_pandas_spark(df):
    """Add a `flavour` column derived from `fruit`.

    Args:
        df: dataframe from the `fruits` dependency, containing a `fruit` column.

    Returns:
        A dataframe with the original `fruit` column plus `flavour`.
    """
    flavours = {'mango': 'sweet', 'lemon': 'citric'}
    # `assign` returns a new frame, so the dependency's dataframe is not mutated.
    # The new column starts as a copy of `fruit`; `replace` then maps values in
    # the `flavour` column only.
    return df.assign(flavour=df['fruit']).replace({'flavour': flavours})


# NOTE(review): `spark` is not defined anywhere in this notebook — presumably a
# SparkSession supplied by the runtime environment; confirm before running.
flavour_pandas_spark.run(spark=spark)
Out[2]:
| | fruit | flavour |
|---|---|---|
| 0 | mango | sweet |
| 1 | lemon | citric |
pyspark
node¶
In [3]:
Copied!
from flypipe.node import node
from flypipe.schema import Schema, Column
from flypipe.schema.types import String
import pyspark.sql.functions as F


@node(
    type="pyspark",
    description="Outputs a PySpark dataframe with each fruit and its flavour",
    tags=["flavour", "pyspark"],
    dependencies=[
        # Request only the `fruit` column of the `fruits` node; the alias makes it
        # arrive as the function argument `df`.
        fruits.select("fruit").alias("df")
    ],
    output=Schema(
        # Reuse the `fruit` column definition declared on the `fruits` node.
        fruits.output.get("fruit"),
        Column("flavour", String(), "fruit flavour")
    )
)
def flavour_pyspark(df):
    """Add a `flavour` column derived from `fruit`.

    Args:
        df: dataframe from the `fruits` dependency, containing a `fruit` column.

    Returns:
        A dataframe with the original `fruit` column plus `flavour`.
    """
    # Named `fruit_names` rather than `fruits` so the local list does not shadow
    # the `fruits` node referenced in the decorator above.
    fruit_names = ['mango', 'lemon']
    flavour_names = ['sweet', 'citric']

    # Start `flavour` as a copy of `fruit`, then substitute each fruit name with the
    # flavour at the same position, restricted to the `flavour` column.
    with_flavour = df.withColumn('flavour', F.col('fruit'))
    return with_flavour.replace(fruit_names, flavour_names, 'flavour')


# NOTE(review): `spark` is not defined anywhere in this notebook — presumably a
# SparkSession supplied by the runtime environment; confirm before running.
flavour_pyspark.run(spark=spark)
Out[3]:
fruit | flavour |
---|---|
mango | sweet |
lemon | citric |
spark_sql
node¶
In [4]:
Copied!
from flypipe.node import node


@node(
    type='spark_sql',
    # No alias is given, so the dependency is received under the argument name `fruits`.
    dependencies=[fruits.select('fruit')]
)
def flavour_spark_sql(fruits):
    """Return the SQL query that derives `flavour` from `fruit`.

    Args:
        fruits: value interpolated into the FROM clause of the query —
            presumably the name of a table/view exposing the `fruits`
            dependency; confirm against the Flypipe spark_sql documentation.

    Returns:
        The query text. `mango` maps to `sweet`; every other fruit falls through
        to the ELSE branch and is labelled `citric`.
    """
    return f"""
SELECT fruit, CASE WHEN fruit='mango' THEN 'sweet' ELSE 'citric' END as flavour from {fruits}
"""


# NOTE(review): `spark` is not defined anywhere in this notebook — presumably a
# SparkSession supplied by the runtime environment; confirm before running.
flavour_spark_sql.run(spark=spark)
Out[4]:
fruit | flavour |
---|---|
mango | sweet |
lemon | citric |
dependencies¶
In [5]:
Copied!
from flypipe.node import node
from flypipe.schema import Schema, Column
from flypipe.schema.types import String
import pandas as pd


@node(
    type="pandas",
    description="Only outputs a pandas dataframe",
    tags=["flavour", "pandas"],
    dependencies=[
        # Request only the `fruit` column of the `fruits` node; the alias makes it
        # arrive as the function argument `df`.
        fruits.select("fruit").alias("df")
    ],
    output=Schema(
        # Reuse the `fruit` column definition declared on the `fruits` node.
        fruits.output.get("fruit"),
        Column("flavour", String(), "fruit flavour")
    )
)
def flavour_pandas(df):
    """Add a `flavour` column derived from `fruit`.

    Args:
        df: dataframe from the `fruits` dependency, containing a `fruit` column.

    Returns:
        A dataframe with the original `fruit` column plus `flavour`.
    """
    flavours = {'mango': 'sweet', 'lemon': 'citric'}
    # `assign` returns a new frame, so the dependency's dataframe is not mutated.
    # The new column starts as a copy of `fruit`; `replace` then maps values in
    # the `flavour` column only.
    return df.assign(flavour=df['fruit']).replace({'flavour': flavours})


# Both this node and its `fruits` dependency are pandas nodes, so no Spark session is passed.
flavour_pandas.run()
Out[5]:
| | fruit | flavour |
|---|---|---|
| 0 | mango | sweet |
| 1 | lemon | citric |