Problem Statement

Sometimes you receive files with a header and sometimes without one. How do you handle both cases?

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Expected schema: an integer ID and a string Name, both nullable
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Name", StringType(), True)
])

# First pass: assume a header is present and let Spark infer the types
df = spark.read.option("header", "true").option("inferSchema", "true").csv(f"{path}")

if df.schema == schema:
    # Inferred schema matches the expected one: write the data as-is
    df.write.mode("overwrite").save("Files/output/")
else:
    # Mismatch: treat the file as headerless and re-read it with the
    # predefined schema (the header option defaults to "false")
    df = spark.read.schema(schema).csv(f"{path}")
    df.write.mode("overwrite").save("Files/output/")

  1. Define the Predefined Schema
    • schema = StructType([...]):
      • Defines a schema with two columns:
        • ID: Integer type, nullable.
        • Name: String type, nullable.

  2. Read the CSV File
    • df = spark.read.option("header", "true").option("inferSchema", "true").csv(f"{path}"):
      • Reads a CSV file located at path.
      • Options:
        • header="true": Treats the first row as the header.
        • inferSchema="true": Automatically infers column data types.
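Note what happens when a headerless file is read with header="true": the first data row is silently consumed as column names. The effect can be illustrated with plain Python's csv module (the sample rows are hypothetical):

```python
import csv
import io

# Hypothetical headerless sample: two data rows and no header line
headerless = "1,Alice\n2,Bob\n"

# Treating the first row as a header (what header="true" does) consumes
# the data row (1, Alice) as column names
rows = list(csv.reader(io.StringIO(headerless)))
assumed_header, data = rows[0], rows[1:]
print(assumed_header)  # ['1', 'Alice'] -- a data row was lost to the "header"
print(data)            # [['2', 'Bob']]
```

This lost row, and the string-typed "ID" column it produces, is exactly why the inferred schema fails the comparison below.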

  3. Compare Schemas
    • if df.schema == schema:
      • Compares the inferred schema of the DataFrame (df.schema) with the predefined schema (schema).

  4. Case 1: Schema Matches
    • df.write.mode("overwrite").save("Files/output/"):
      • Saves the DataFrame to Files/output/ in overwrite mode, replacing any existing data.

  5. Case 2: Schema Doesn't Match
    • df = spark.read.schema(schema).csv(f"{path}"):
      • Re-reads the CSV file with the predefined schema applied explicitly (header defaults to "false", so the first row is treated as data).
    • df.write.mode("overwrite").save("Files/output/"):
      • Saves the corrected DataFrame to Files/output/.

  6. Purpose of the Code
    • Ensures the schema of the output DataFrame matches the predefined schema by:
      • Checking the schema.
      • Re-reading and correcting the schema if necessary.

  7. Key Details
    • overwrite: Replaces existing data in the output path.
    • Schema enforcement: Prevents unexpected schema variations in the output.
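An alternative to the read-compare-re-read pattern is to peek at the file's first row and decide up front whether it is a header. A minimal sketch of the heuristic, assuming the first column should hold the integer ID (the function name and sample rows are hypothetical):

```python
import csv
import io

def looks_like_header(first_row):
    """Heuristic: the schema expects an integer ID in the first column,
    so a non-numeric first value suggests the row is a header."""
    try:
        int(first_row[0])
        return False  # numeric -> first row is data, file is headerless
    except ValueError:
        return True   # non-numeric -> likely a header row

# Hypothetical samples
with_header = "ID,Name\n1,Alice\n"
without_header = "1,Alice\n2,Bob\n"

print(looks_like_header(next(csv.reader(io.StringIO(with_header)))))     # True
print(looks_like_header(next(csv.reader(io.StringIO(without_header)))))  # False
```

The result can then drive a single Spark read with header set to "true" or "false", avoiding the second pass over the file.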