Node Function

flypipe.node_function.node_function

Decorator factory that wraps the given function in a NodeFunction instance.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| requested_columns | bool, optional | If True, the list of columns that successor nodes request from the node function is passed to the function as the named argument requested_columns. | False |
| node_dependencies | List[Node or NodeFunction], optional | List of external nodes that the node function depends on. Any node returned by the node function (an internal node) may depend only on other internal nodes or on nodes listed in node_dependencies. | |
| output | Schema, optional | Defines the output schema of the node. | None |
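The dependency rule for node_dependencies can be illustrated with a toy check. This is only a sketch of the rule as described on this page, not flypipe's implementation: `SketchNode` and `check_internal_dependencies` are hypothetical names, and comparing nodes by identity is an assumption made for the example.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass(eq=False)
class SketchNode:
    """Hypothetical stand-in for a node: a name plus the nodes it depends on."""

    name: str
    dependencies: List["SketchNode"] = field(default_factory=list)


def check_internal_dependencies(
    internal_nodes: List[SketchNode], node_dependencies: List[SketchNode]
) -> None:
    """Raise ValueError if an internal node depends on an undeclared external node."""
    # Assumption: nodes are matched by object identity.
    allowed = {id(n) for n in internal_nodes} | {id(n) for n in node_dependencies}
    for node in internal_nodes:
        for dep in node.dependencies:
            if id(dep) not in allowed:
                raise ValueError(
                    f"{node.name} depends on {dep.name}, which is neither an "
                    "internal node nor declared in node_dependencies"
                )


table = SketchNode("table")              # declared external dependency
other = SketchNode("other_table")        # external node that is never declared
internal_1 = SketchNode("internal_node_1", [table])
internal_2 = SketchNode("internal_node_2", [internal_1])
rogue = SketchNode("rogue", [other])     # depends on the undeclared node

# Valid: every dependency is internal or declared, so nothing is raised.
check_internal_dependencies([internal_1, internal_2], [table])
```

Calling `check_internal_dependencies([rogue], [table])` raises `ValueError`, because `other_table` is neither internal nor declared.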

Returns:

| Type | Description |
| --- | --- |
| List[Node] | A list of the nodes created internally |

Examples

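# Syntax
# Import paths below are assumed from the flypipe package layout
from flypipe import node, node_function
from flypipe.datasource.spark import Spark
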
@node_function(
    requested_columns=True,
    node_dependencies=[
        Spark("table")
    ]
)
def my_node_function(requested_columns):

    @node(
        type="pandas",
        dependencies=[
            Spark("table").select(requested_columns).alias("df")
        ]
    )
    def internal_node_1(df):
        return df

    @node(
        type="pandas",
        dependencies=[
            internal_node_1.alias("df")
        ]
    )
    def internal_node_2(df):
        return df

    return internal_node_1, internal_node_2 # <-- ALL INTERNAL NODES CREATED MUST BE RETURNED
Source code in flypipe/node_function.py
def node_function(*args, **kwargs):
    """
    Decorator factory that returns the given function wrapped inside a NodeFunction class

    Parameters:
        requested_columns (bool,optional): If `True`, the list of columns that successor nodes request from the node function is passed to the function as the named argument `requested_columns`. Defaults to `False`.
        node_dependencies (List[Node or NodeFunction],optional): List of external nodes that the node function depends on. Any node returned by the node function (an internal node) may depend only on other internal nodes or on nodes listed in `node_dependencies`.
        output (Schema,optional): Defines the output schema of the node. Defaults to `None`.

    Returns:
        (List[Node]): a list of nodes created internally

    Raises:
        ValueError
            If any internal node is of type NodeFunction, or if any internal node has a dependency that is neither
            another internal node nor declared in node_dependencies

    # Examples

    ``` py
    # Syntax
    @node_function(
        requested_columns=True,
        node_dependencies=[
            Spark("table")
        ]
    )
    def my_node_function(requested_columns):

        @node(
            type="pandas",
            dependencies=[
                Spark("table").select(requested_columns).alias("df")
            ]
        )
        def internal_node_1(df):
            return df

        @node(
            type="pandas",
            dependencies=[
                internal_node_1.alias("df")
            ]
        )
        def internal_node_2(df):
            return df

        return internal_node_1, internal_node_2 # <-- ALL INTERNAL NODES CREATED MUST BE RETURNED
    ```

    """

    def decorator(func):
        return NodeFunction(func, *args, **kwargs)

    return decorator
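
The decorator-factory pattern used above can be tried in isolation. The sketch below mirrors the structure of the listing but uses a hypothetical stand-in class, `SketchNodeFunction`, that only stores its inputs; the real NodeFunction does more than this, so treat it as an illustration of the pattern rather than of flypipe's behaviour.

```python
from typing import Any, Callable


class SketchNodeFunction:
    """Hypothetical stand-in for NodeFunction; it only stores what it receives."""

    def __init__(self, function: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        self.function = function
        self.args = args
        self.kwargs = kwargs


def sketch_node_function(*args: Any, **kwargs: Any) -> Callable[[Callable[..., Any]], SketchNodeFunction]:
    """Decorator factory: capture the options now, wrap the function later."""

    def decorator(func: Callable[..., Any]) -> SketchNodeFunction:
        # The options captured by the factory are forwarded to the wrapper class.
        return SketchNodeFunction(func, *args, **kwargs)

    return decorator


@sketch_node_function(requested_columns=True)
def my_node_function(requested_columns):
    return list(requested_columns)


# After decoration the name refers to the wrapper object, not the plain function.
print(type(my_node_function).__name__)
print(my_node_function.kwargs)
```

Because the factory is called with arguments first, the parentheses are needed even when no options are passed: `@sketch_node_function()` rather than `@sketch_node_function`.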