PySpark

This node runs any given PySpark code. The input dataframe is passed into the function myfn as a parameter.

Input

The input dataframe is passed to the function myfn as the inDF parameter.

Output

The output dataframe is returned from the function myfn.

Type

pyspark

Class

fire.nodes.etl.NodePySpark

Fields

Name | Title | Description
code | PySpark | PySpark code to be run. Input dataframe: "inDF", SparkContext: "sc", SQLContext: "sqlContext". The output/result dataframe should be registered as a temporary table: df.registerTempTable("outDF")
schema | InferSchema |
outputColNames | Column Names | New Output Columns of the SQL
outputColTypes | Column Types | Data Type of the Output Columns
outputColFormats | Column Formats | Format of the Output Columns

Details

PySpark Details

This node receives an input PySpark dataframe in a function called myfn.

The PySpark/Python code processes it and returns one computed PySpark dataframe.
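As a minimal sketch of this contract, the function below returns its input unchanged. The parameter list is copied from the examples on this page. The pass-through body, the quoted type hints, the explicit WorkflowContext import, and the TYPE_CHECKING guard (used so the snippet also loads outside Fire, where pyspark and fire.workflowcontext may not be installed) are assumptions made for illustration, not documented behavior of the node.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Imported for type hints only; never executed at run time.
    # Assumption: WorkflowContext is exported by fire.workflowcontext.
    from pyspark.sql import DataFrame, SparkSession
    from fire.workflowcontext import WorkflowContext


def myfn(spark: "SparkSession", workflowContext: "WorkflowContext", id: int,
         inDF: "DataFrame", cust_dict):
    # Pass-through: the node's output is whatever dataframe is returned here.
    outDF = inDF
    return outDF
```

Any transformation would go between receiving inDF and returning outDF, as the examples on this page show.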

Examples

PySpark Examples

Input Schema: id, price, lotsize, bedrooms, bathrms, stories, driveway, recroom, fullbase, gashw, airco, garagepl, prefarea

Add the house_type column

from pyspark.sql.types import StringType
from pyspark.sql.functions import *
from pyspark.sql import *
from fire.workflowcontext import *

def myfn(spark: SparkSession, workflowContext: WorkflowContext, id: int, inDF: DataFrame, cust_dict):
    # Label each house by its bedroom count
    house_type_udf = udf(lambda bedrooms: "big house" if int(bedrooms) > 2 else "small house", StringType())
    # Keep only the columns needed for the output
    filter_df = inDF.select("id", "price", "lotsize", "bedrooms")
    # Add the house_type column
    outDF = filter_df.withColumn("house_type", house_type_udf(filter_df.bedrooms))
    return outDF
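Spark passes SQL NULL values to a Python UDF as None, and int(None) raises a TypeError, so the lambda above would fail on any row with a missing bedrooms value. The following is a minimal sketch of a null-safe version of the same rule, written as a plain Python function so it can be checked without a Spark session. The name house_type is hypothetical, and returning None for a missing value is an assumption about the desired behavior.

```python
def house_type(bedrooms):
    """Classify a house by bedroom count (same rule as the lambda above)."""
    # Assumption: a missing bedroom count should yield a null house_type.
    if bedrooms is None:
        return None
    return "big house" if int(bedrooms) > 2 else "small house"
```

Inside myfn, the function could then be wrapped with udf(house_type, StringType()) in place of the lambda.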

Using pandas dataframe

from pyspark.sql.types import StringType
from pyspark.sql.functions import *
from pyspark.sql import *
from fire.workflowcontext import *

def myfn(spark: SparkSession, workflowContext: WorkflowContext, id: int, inDF: DataFrame, cust_dict):
    # Convert the Spark DataFrame to a Pandas DataFrame
    pdf = inDF.select("*").toPandas()
    # Display a message on the Executions page
    workflowContext.outStr(id, "Outputting Pandas Dataframe")
    # Display the dataframe on the Executions page
    workflowContext.outPandasDataframe(id, "Pandas DataFrame", pdf, 10)
    # Create a Spark DataFrame from a Pandas DataFrame
    df = spark.createDataFrame(pdf)
    return df

Numpy 2d array to pandas dataframe & pandas dataframe to spark dataframe

from pyspark.sql.types import StringType
from pyspark.sql.functions import *
from pyspark.sql import *
import numpy as np
import pandas as pd
from fire.workflowcontext import *

def myfn(spark: SparkSession, workflowContext: WorkflowContext, id: int, inDF: DataFrame, cust_dict):
    # Create the numpy 2d array
    example_array = np.array([[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12]])
    # Convert to Pandas Dataframe
    pandas_dataframe = pd.DataFrame(example_array, columns=['a', 'b', 'c', 'd'])
    # Convert Pandas Dataframe to Spark Dataframe
    spark_dataframe = spark.createDataFrame(pandas_dataframe)
    return spark_dataframe