PySpark¶
This node runs any given PySpark code. The input dataframe is passed into the function myfn as a parameter.
Input¶
The input dataframe is passed to the function myfn as the inDF parameter.
Output¶
The output dataframe is the value returned by myfn.
Type¶
pyspark
Class¶
fire.nodes.etl.NodePySpark
Fields¶
| Name | Title | Description |
|---|---|---|
| code | PySpark | PySpark code to be run. Input dataframe : "inDF", SparkContext : "sc", SQLContext : "sqlContext", Output/Result dataframe should be registered as a temporary table - df.registerTempTable("outDF") |
| schema | InferSchema | |
| outputColNames | Column Names | New Output Columns of the SQL |
| outputColTypes | Column Types | Data Type of the Output Columns |
| outputColFormats | Column Formats | Format of the Output Columns |
Details¶
Pyspark Details¶
This node receives an input PySpark dataframe in a function called myfn.
The PySpark/Python code processes it and returns one computed PySpark dataframe.
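As a minimal sketch of this contract, assuming the five-parameter signature used in the examples in this document, the function below drops duplicate rows and returns the result. Type annotations are left out so the snippet needs no imports. The choice of dropDuplicates is only illustrative; any transformation that returns a dataframe would fit.

```python
def myfn(spark, workflowContext, id, inDF, cust_dict):
    # inDF is the input PySpark dataframe handed to the node.
    # The dataframe returned here becomes the node's output.
    outDF = inDF.dropDuplicates()
    return outDF
```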
Examples¶
Pyspark Examples¶
Input Schema: id, price, lotsize, bedrooms, bathrms, stories, driveway, recroom, fullbase, gashw, airco, garagepl, prefarea
Add the house_type column¶
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from fire.workflowcontext import *

def myfn(spark: SparkSession, workflowContext: WorkflowContext, id: int, inDF: DataFrame, cust_dict):
    # UDF that labels a house by its bedroom count
    house_type_udf = udf(lambda bedrooms: "big house" if int(bedrooms) > 2 else "small house", StringType())
    # Keep only the columns needed for the output
    filtered_df = inDF.select("id", "price", "lotsize", "bedrooms")
    # Add the derived house_type column
    outDF = filtered_df.withColumn("house_type", house_type_udf(filtered_df.bedrooms))
    return outDF
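The lambda in this example calls int(bedrooms) directly, which raises an error if a row holds a missing or non-numeric value. The sketch below shows one possible null-safe version of the same rule as a plain Python function. The name classify_house is hypothetical and not part of the node or of PySpark; it is written without Spark so it can be checked locally.

```python
from typing import Optional, Union


def classify_house(bedrooms: Union[int, str, None]) -> Optional[str]:
    """Return "big house" for more than two bedrooms, else "small house".

    Returns None when the value is missing or cannot be converted by int(),
    so a single bad row does not fail the whole job.
    """
    if bedrooms is None:
        return None
    try:
        count = int(bedrooms)
    except (TypeError, ValueError):
        return None
    return "big house" if count > 2 else "small house"


# Quick local check without a Spark session
labels = [classify_house(value) for value in (4, "2", None, "n/a")]
print(labels)  # ['big house', 'small house', None, None]
```

Inside myfn, the function could then be wrapped with udf(classify_house, StringType()) in place of the lambda.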
Using pandas dataframe¶
from pyspark.sql import DataFrame, SparkSession
from fire.workflowcontext import *

def myfn(spark: SparkSession, workflowContext: WorkflowContext, id: int, inDF: DataFrame, cust_dict):
    # Convert the Spark DataFrame to a Pandas DataFrame
    pdf = inDF.toPandas()
    # Display a message on the Executions page
    workflowContext.outStr(id, "Outputting Pandas Dataframe")
    # Display the dataframe on the Executions page
    workflowContext.outPandasDataframe(id, "Pandas DataFrame", pdf, 10)
    # Create a Spark DataFrame from a Pandas DataFrame
    df = spark.createDataFrame(pdf)
    return df
Numpy 2d array to pandas dataframe & pandas dataframe to spark dataframe¶
from pyspark.sql import DataFrame, SparkSession
import numpy as np
import pandas as pd
from fire.workflowcontext import *

def myfn(spark: SparkSession, workflowContext: WorkflowContext, id: int, inDF: DataFrame, cust_dict):
    # Create the numpy 2d array
    example_array = np.array([[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12]])
    # Convert to Pandas Dataframe
    pandas_dataframe = pd.DataFrame(example_array, columns=['a', 'b', 'c', 'd'])
    # Convert Pandas Dataframe to Spark Dataframe
    spark_dataframe = spark.createDataFrame(pandas_dataframe)
    return spark_dataframe
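The NumPy-to-pandas step in this example only works when the number of column names matches the array's second dimension. The sketch below isolates that step and checks the shape first, so a mismatch is reported with a clear message before anything is handed to spark.createDataFrame. The helper name array_to_pandas is hypothetical, and the snippet assumes numpy and pandas are installed.

```python
import numpy as np
import pandas as pd


def array_to_pandas(array, columns):
    """Build a pandas DataFrame from a 2d array after validating its shape."""
    array = np.asarray(array)
    if array.ndim != 2:
        raise ValueError(f"expected a 2d array, got {array.ndim} dimension(s)")
    if array.shape[1] != len(columns):
        raise ValueError(
            f"array has {array.shape[1]} columns but {len(columns)} names were given"
        )
    return pd.DataFrame(array, columns=list(columns))


# Same data as the example above
example_array = np.array([[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12]])
pandas_dataframe = array_to_pandas(example_array, ["a", "b", "c", "d"])
```

The resulting pandas_dataframe can be passed to spark.createDataFrame exactly as in the example.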