MultiInputPySpark

This node runs user-supplied PySpark code. The input dataframes are passed in the variable inDFs. The output dataframe is passed back by registering it as a temporary table.

Input

The input dataframes are passed as a list in the variable inDFs: the first input is inDFs[0], the second is inDFs[1].

Output

The output dataframe is passed back by registering it as a temporary table.

Type

pyspark2inputs

Class

fire.nodes.code.NodeMultiInputPySpark

Fields

Name | Title | Description
code | PySpark | PySpark code to be run. Input dataframes: "inDFs", SparkContext: "sc", SQLContext: "sqlContext". The output/result dataframe should be registered as a temporary table: df.registerTempTable("outDF")
schema | InferSchema |
outputColNames | Column Names for the CSV | New Output Columns of the SQL
outputColTypes | Column Types for the CSV | Data Type of the Output Columns
outputColFormats | Column Formats for the CSV | Format of the Output Columns

Details

PySpark Details

This node passes its input PySpark dataframes to a function called myfn, as the list inDFs.

The PySpark/Python code processes them and returns a single computed PySpark dataframe.

Examples

PySpark Examples

Input schemas of the dataframes.

Input schema of the first dataframe: id, price, lotsize, bedrooms, bathrms, stories, driveway, recroom, fullbase, gashw, airco, garagepl, prefarea

Input schema of the second dataframe: id, price, lotsize, bedrooms, bathrms, stories, driveway, recroom, fullbase, gashw, airco, garagepl, prefarea

Join the two dataframes on the id column

from typing import List

from pyspark.sql import *
from pyspark.sql.functions import *

from fire.workflowcontext import *


def myfn(spark: SparkSession, workflowContext: WorkflowContext, id: int, inDFs: List[DataFrame], cust_dict: dict):
    # Get the first dataframe
    df1 = inDFs[0]

    # Get the second dataframe
    df2 = inDFs[1]

    # Join the two dataframes on the shared id column.
    # Both inputs have the same non-key columns, so the result contains
    # duplicate column names (price, lotsize, ...); rename them before
    # selecting those columns downstream.
    outdf = df1.join(df2, ['id'])

    return outdf