Skip to content

Load Spark Table

flypipe.datasource.spark.Spark

Creates a flypipe node that loads a Spark table

Parameters:

| Name | Type | Description |
|------|------|-------------|
| `table` | `str` | Name of the Spark table to load. |

Returns:

| Type | Description |
|------|-------------|
| `Node` | Datasource node that loads the Spark table. |

Usage:

@node(
    ...
    dependencies = [
        Spark("your_spark_table_name_here").select("column1", "column2",...).alias("any_alias_df")
    ]
    ...
)
def my_transformation(any_alias_df):
    return any_alias_df

Source code in flypipe/datasource/spark.py
def Spark(table):
    """Build a flypipe datasource node that reads the Spark table ``table``.

    The node is declared with ``spark_context=True`` and loads its data via
    ``spark.table(table)`` on the session it receives.

    Args:
        table (str): name of the Spark table to load.

    Returns:
        Node: datasource node (``NodeType.DATASOURCE``) whose key is
        ``"spark.<table>"`` with every character outside ``[0-9a-zA-Z]``
        replaced by ``"_"``.

    Raises:
        ValueError: at run time, if the node is executed without a Spark
            session.

    Usage:
    ``` py
    @node(
        type="pyspark",
        dependencies=[
            Spark("your_spark_table_name_here")
            .select("column1", "column2")
            .alias("any_alias_df")
        ],
    )
    def my_transformation(any_alias_df):
        return any_alias_df
    ```
    """

    @node(
        type="pyspark",
        description=f"Spark datasource on table {table}",
        tags=["datasource"],
        spark_context=True,
    )
    def spark_datasource(spark):
        # Happy path first: read straight from the supplied session.
        if spark is not None:
            return spark.table(table)
        # No session was handed to the node, so there is nothing to read from.
        raise ValueError("Please provide a spark session, i.e. node.run(spark)")

    # Rename the wrapped function after the table it loads.
    spark_datasource.function.__name__ = table

    # Sanitize "spark.<table>" into a key made only of [0-9a-zA-Z_]:
    # dots and any other non-alphanumeric characters become underscores.
    sanitized_key = re.sub(r"[^\da-zA-Z]", "_", f"spark.{table}")
    spark_datasource.key = sanitized_key
    spark_datasource.node_type = NodeType.DATASOURCE

    return spark_datasource