MultiInputPySpark
This node runs arbitrary PySpark code on multiple input dataframes. The input dataframes are passed as a list in the variable inDFs. The output dataframe is passed back by registering it as a temporary table.
Input
The input dataframes are passed as a list in the variable inDFs: inDFs[0] is the first input, inDFs[1] the second, and so on.
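Because inDFs is an ordinary Python list, code can check how many inputs arrived before indexing into it, instead of failing later with an IndexError. The sketch below uses a hypothetical helper, `unpack_inputs`, which is not part of the node's API; it relies only on inDFs supporting `len()` and indexing.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def unpack_inputs(inDFs: Sequence[T], expected: int) -> List[T]:
    """Hypothetical helper: return the inputs as a list, failing fast on a count mismatch."""
    if len(inDFs) != expected:
        raise ValueError(
            f"expected {expected} input dataframes, got {len(inDFs)}"
        )
    return list(inDFs)


# Inside myfn one might write: df1, df2 = unpack_inputs(inDFs, expected=2)
```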
Output
The output dataframe is passed back by registering it as a temporary table.
Type
pyspark2inputs
Class
fire.nodes.code.NodeMultiInputPySpark
Fields
| Name | Title | Description |
|---|---|---|
| code | PySpark | PySpark code to be run. Input dataframes: "inDFs", SparkContext: "sc", SQLContext: "sqlContext". The output/result dataframe should be registered as a temporary table: df.registerTempTable("outDF") |
| schema | InferSchema | |
| outputColNames | Column Names for the CSV | New output columns of the SQL |
| outputColTypes | Column Types for the CSV | Data type of the output columns |
| outputColFormats | Column Formats for the CSV | Format of the output columns |
Details
PySpark Details
This node passes the input PySpark dataframes to a function called myfn.
The Python code in myfn processes them and returns a single computed PySpark dataframe.
Examples
PySpark Examples
Both input dataframes have the same schema:
Input schema of the first dataframe: id, price, lotsize, bedrooms, bathrms, stories, driveway, recroom, fullbase, gashw, airco, garagepl, prefarea
Input schema of the second dataframe: id, price, lotsize, bedrooms, bathrms, stories, driveway, recroom, fullbase, gashw, airco, garagepl, prefarea
Join the two dataframes on the id column
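```python
from typing import List

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
from fire.workflowcontext import *


def myfn(spark: SparkSession, workflowContext: WorkflowContext, id: int,
         inDFs: List[DataFrame], cust_dict: dict) -> DataFrame:
    # Get the first input dataframe
    df1 = inDFs[0]
    # Get the second input dataframe
    df2 = inDFs[1]
    # Inner-join the two dataframes on the shared "id" column.
    # Both inputs have the same schema, so every non-key column
    # (price, lotsize, ...) appears twice in the result.
    outdf = df1.join(df2, ['id'])
    return outdf
```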