How it works¶
Some nodes represent tables, or are expensive to re-compute on every run. In these cases you can cache or persist the node's output in whatever way suits you.
To enable caching for a specific node, you have to create a class that tells Flypipe how to read the cached data, how to write it, and how to check whether the cached or persisted data already exists.
Example:

import os

import pandas as pd

from flypipe.cache import Cache


class MyCustomPersistence(Cache):
    def __init__(self, csv_path_name: str):
        self.csv_path_name = csv_path_name

    def read(self, spark):
        """
        Reads the persisted/cached data into a dataframe
        """
        return pd.read_csv(self.csv_path_name)

    def write(self, spark, df):
        """
        Cache or persist the data
        """
        df.to_csv(self.csv_path_name, index=False)

    def exists(self, spark):
        """
        Check if the data has been cached or persisted.
        """
        return os.path.exists(self.csv_path_name)
Having defined your cache/persistence class, you can start marking nodes to be cached, for instance:
@node(
    ...
    cache=MyCustomPersistence("data.csv"),
    ...
)
def t0():
    ...
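With the cache attached, the first run executes the node and persists its output, while subsequent runs read it back. A minimal sketch, assuming t0 is a pandas node invoked through the standard run entry point:

# First run: exists() returns False, so the node body executes
# and write() persists data.csv
t0.run()

# Second run: exists() returns True, so read() returns the cached
# dataframe and the node body is skipped
t0.run()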
Cache/Persistence workflow¶
For every node that has a cache set up, Flypipe does the following (sketched in code below):

- Node has a cache?
    - Yes -> does the cache exist (runs the exists method)?
        - Yes -> runs the read method and returns the dataframe.
        - No -> runs the node, collects the output dataframe and runs the write method.
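The decision flow boils down to a few lines. A minimal sketch, illustrative only and not Flypipe's actual internals (compute_node is a hypothetical stand-in for whatever executes the node's function):

def run_with_cache(node, spark):
    if node.cache is None:
        return compute_node(node)          # no cache: just run the node
    if node.cache.exists(spark):
        return node.cache.read(spark)      # cache hit: read and return
    df = compute_node(node)                # cache miss: run the node...
    node.cache.write(spark, df)            # ...and persist the output
    return df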
Example 1: CSV persistence¶
In [1]:
import os

import pandas as pd

from flypipe.cache import Cache


class SaveAsCSV(Cache):
    def __init__(self, csv_path_name: str):
        self.csv_path_name = csv_path_name

    def read(self, spark):
        print(f"Reading CSV `{self.csv_path_name}`...")
        return pd.read_csv(self.csv_path_name)

    def write(self, spark, df):
        print(f"Writing CSV `{self.csv_path_name}`...")
        df.to_csv(self.csv_path_name, index=False)

    def exists(self, spark):
        csv_exists = os.path.exists(self.csv_path_name)
        print(f"CSV `{self.csv_path_name}` exists?", csv_exists)
        return csv_exists
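This implementation ignores the spark argument, so it can be sanity-checked on its own before wiring it into a node (the path below is just an example):

cache = SaveAsCSV("/tmp/example.csv")
print(cache.exists(None))                                      # False: nothing written yet
cache.write(None, pd.DataFrame({"id": [1], "sales": [9.5]}))   # writes the CSV
print(cache.exists(None))                                      # True
print(cache.read(None))                                        # reads the CSV back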
Example 2: Spark persistence¶
In [35]:
from typing import List

from flypipe.cache import Cache


class SparkTable(Cache):
    def __init__(
        self,
        table_name: str,
        schema: str,
        merge_keys: List[str] = None,
        partition_columns: List[str] = None,
        schema_location: str = None,
    ):
        self.table_name = table_name
        self.schema = schema
        self.merge_keys = merge_keys
        self.partition_columns = partition_columns
        self.schema_location = schema_location

    @property
    def table(self):
        return f"{self.schema}.{self.table_name}"

    def read(self, spark):
        return spark.table(self.table)

    def write(self, spark, df):
        # Create the database if it does not exist yet
        if not spark.catalog.databaseExists(self.schema):
            print(f"Creating database `{self.schema}`")
            location = f"LOCATION '{self.schema_location}'" if self.schema_location else ""
            spark.sql(f"CREATE DATABASE IF NOT EXISTS {self.schema} {location}")

        if not spark.catalog.tableExists(self.table_name, self.schema):
            # Table does not exist yet: create it
            print(f"Creating table `{self.table}`")
            writer = df.write.format("delta").mode("overwrite").option("mergeSchema", "true")
            if self.partition_columns:
                writer = writer.partitionBy(*self.partition_columns)
            writer.saveAsTable(self.table)
        else:
            # Table already exists: merge the new data into it on the merge keys
            print(f"Merging into table `{self.table}`")
            df.createOrReplaceTempView("updates")
            keys = " AND ".join([f"s.{col} = t.{col}" for col in self.merge_keys])
            merge_query = f"""
                MERGE INTO {self.table} t
                USING updates s
                ON {keys}
                WHEN MATCHED THEN UPDATE SET *
                WHEN NOT MATCHED THEN INSERT *
            """
            spark.sql(merge_query)

    def exists(self, spark):
        table_exists = spark.catalog.tableExists(self.table_name, self.schema)
        print(f"Table {self.table} exists?", table_exists)
        return table_exists
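For illustration, with merge_keys=["id"] and the table tmp.my_table (the configuration used later on this page), the merge_query string built in write renders as:

MERGE INTO tmp.my_table t
USING updates s
ON s.id = t.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *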
Execution Graph¶
Cleaning the environment¶
In [36]:
import os

# Remove the CSV cache left over from any previous run
file_path = "/tmp/data.csv"

if os.path.isfile(file_path):
    os.remove(file_path)
    print(f"File '{file_path}' has been deleted.")
else:
    print(f"File '{file_path}' does not exist.")

print("Dropping table tmp.my_table")
spark.sql("drop table if exists tmp.my_table")
File '/tmp/data.csv' has been deleted.
Dropping table tmp.my_table
In [37]:
import pandas as pd

import pyspark.sql.functions as F

from flypipe import node


@node(
    type="pandas",
    cache=SaveAsCSV("/tmp/data.csv"),
)
def csv_cache():
    return pd.DataFrame(data={"id": [1, 2], "sales": [100.0, 34.1]})


@node(
    type="pyspark",
    cache=SparkTable("my_table", "tmp", merge_keys=["id"], schema_location="/tmp"),
    dependencies=[csv_cache.select("id", "sales").alias("df")],
)
def spark_cache(df):
    return df.withColumn("above_50", F.col("sales") > 50.0)


@node(
    type="pyspark",
    dependencies=[
        spark_cache.select("id", "sales", "above_50").alias("df"),
    ],
)
def t0(df):
    return df
1st run¶
When no cache exists, all cache nodes will be active.
In [38]:
displayHTML(t0.html(spark))
Table tmp.my_table exists? False
CSV `/tmp/data.csv` exists? False
Out[38]: