Low-Latency Transformations with Sparkleframe
Apache Spark is designed for distributed, large-scale data processing, but it is not optimized for low-latency use cases. There are scenarios, however, where you need to quickly re-compute certain data—for example, regenerating features for a machine learning model in real time or near-real time.
For such cases, you can leverage Sparkleframe.
What Is Sparkleframe?
Sparkleframe is an experimental backend for Flypipe that maps PySpark transformations to Polars DataFrame operations. Polars is a high-performance, multi-threaded DataFrame library written in Rust, and is significantly faster than Spark for small to medium-scale data in local execution.
By activating Sparkleframe, your existing Flypipe nodes defined with type="pyspark" will execute using Polars under the hood, without any changes to your transformation logic.
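For intuition, the withColumn transformation used in the example below maps onto an ordinary Polars expression. A rough, illustrative equivalent of what ends up running (not Sparkleframe's actual internals) would be:

import polars as pl

df = pl.DataFrame({"a": [1]})
# roughly what a PySpark withColumn("a", col("a") + 10) boils down to in Polars
df = df.with_columns((pl.col("a") + 10).alias("a"))
print(df)  # single row with a = 11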
Basic Example: Spark vs Sparkleframe
Below is a simple example using Flypipe and a Spark node:
from time import time
from flypipe import node
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.getOrCreate()
@node(
    type="pyspark",
    spark_context=True,
)
def my_node(spark):
    return spark.createDataFrame([{"a": 1}])


@node(
    type="pyspark",
    dependencies=[my_node.alias("df")]
)
def add_10(df):
    return df.withColumn("a", col("a") + 10)

start = time()
df = add_10.run(spark)
print(df.toPandas())
print(f"\nType of dataframe returned {type(df)}")
print(f"===> Time taken: {round((time() - start)*1000, 2)} milliseconds")
    a
0  11

Type of dataframe returned <class 'pyspark.sql.dataframe.DataFrame'>
===> Time taken: 1909.74 milliseconds
Enabling Sparkleframe
To switch to the Sparkleframe backend and drastically reduce execution time, just activate it at the beginning of your script:
from sparkleframe.activate import activate
activate()
Run the same code again, and the transformation will execute using Polars instead of Spark:
from time import time
from flypipe import node
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.getOrCreate()
@node(
    type="pyspark",
    spark_context=True,
)
def my_node(spark):
    return spark.createDataFrame([{"a": 1}])


@node(
    type="pyspark",
    dependencies=[my_node.alias("df")]
)
def add_10(df):
    return df.withColumn("a", col("a") + 10)

start = time()
df = add_10.run(spark)
print(df.toPandas())
print(f"\nType of dataframe returned {type(df)}")
print(f"===> Time taken: {round((time() - start)*1000, 2)} milliseconds")
    a
0  11

Type of dataframe returned <class 'sparkleframe.polarsdf.dataframe.DataFrame'>
===> Time taken: 33.05 milliseconds
Notes:
- Sparkleframe is still under development, and not all PySpark operations are currently supported.
- If you encounter any transformation that is not implemented, please open an issue on GitHub so it can be prioritized.
- Sparkleframe is especially useful for unit testing, feature prototyping, or serving small pipelines in microservices; see the unit-test sketch below.
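As a sketch of the unit-testing use case, a test for the add_10 node from the example above might look like this (pytest is assumed, and the my_pipeline module name is hypothetical):

from pyspark.sql import SparkSession
from sparkleframe.activate import activate

activate()  # activate Sparkleframe at the start, as recommended above

from my_pipeline import add_10  # hypothetical module containing the node defined earlier


def test_add_10():
    spark = SparkSession.builder.getOrCreate()
    df = add_10.run(spark)
    result = df.toPandas()
    assert result["a"].tolist() == [11]

Because the node executes on Polars rather than a Spark cluster, such a test avoids JVM startup and finishes in milliseconds rather than seconds, as the timings above suggest.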
You can learn more about the design motivation behind Sparkleframe in this discussion thread.