Problem Statement:
Sometimes you receive files with a header and sometimes without one. How will you handle it?
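from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Predefined schema the output must follow
schema = StructType([
    StructField("ID", IntegerType(), True),   # ID column with IntegerType
    StructField("Name", StringType(), True)   # Name column with StringType
])

# First attempt: treat the first row as a header and infer the column types
# (`spark` is the notebook's SparkSession and `path` points to the input file)
df = spark.read.option("header", "true").option("inferSchema", "true").csv(f"{path}")

if df.schema == schema:
    # Header present and the inferred types match: write the data as-is
    df.write.mode("overwrite").save("Files/output/")
else:
    # Otherwise assume there is no header: re-read with the predefined schema
    df = spark.read.schema(schema).csv(f"{path}")
    df.write.mode("overwrite").save("Files/output/")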
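The code above runs schema inference on every file and may read the file twice. A lighter alternative is to inspect only the first line and compare it with the expected column names before reading. The sketch below shows that check in plain Python with the standard `csv` module rather than Spark. `looks_like_header` and `EXPECTED_COLUMNS` are hypothetical names. In a notebook, the first line could be fetched with something like `spark.read.text(path).first()[0]`, assuming a single input file.

```python
import csv
import io

# Hypothetical expected column names, matching the predefined schema (ID, Name)
EXPECTED_COLUMNS = ["ID", "Name"]


def looks_like_header(first_line: str, expected=EXPECTED_COLUMNS) -> bool:
    """Return True if the first CSV line holds the expected column names.

    The comparison ignores surrounding whitespace and letter case.
    """
    # csv.reader handles quoted fields such as '"ID","Name"'
    row = next(csv.reader(io.StringIO(first_line)), [])
    normalized = [cell.strip().lower() for cell in row]
    return normalized == [name.lower() for name in expected]


print(looks_like_header("ID,Name"))   # True
print(looks_like_header("1,Alice"))   # False
```

If the check returns True, the file could be read with `.option("header", "true")` plus the predefined schema. Otherwise it could be read with the schema alone. The case-insensitive match is a design choice you may want to tighten.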
- Define Predefined Schema
schema = StructType([...]): Defines a schema with two columns:
  - ID: Integer type, nullable.
  - Name: String type, nullable.
- Read the CSV File
df = spark.read.option("header", "true").option("inferSchema", "true").csv(f"{path}"): Reads the CSV file located at path with two options:
  - header="true": Treats the first row as the header (column names).
  - inferSchema="true": Automatically infers the column data types. This requires an extra pass over the data.
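The effect of `header="true"` on a file without a header can be illustrated without Spark. The plain-Python imitation below takes the first row as column names and the rest as data. It is not Spark's implementation, and `read_with_header` and the sample strings are hypothetical. It shows why a headerless file ends up with column names such as `1` and `Alice`, and why its first record is lost to the header.

```python
import csv
import io


def read_with_header(text: str):
    """Imitate header="true": first row -> column names, remaining rows -> data."""
    rows = list(csv.reader(io.StringIO(text)))
    if not rows:
        return [], []
    return rows[0], rows[1:]


cols, data = read_with_header("ID,Name\n1,Alice\n2,Bob\n")
print(cols, data)   # ['ID', 'Name'] [['1', 'Alice'], ['2', 'Bob']]

cols, data = read_with_header("1,Alice\n2,Bob\n")
print(cols, data)   # ['1', 'Alice'] [['2', 'Bob']]
```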
- Compare Schema
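if df.schema == schema: Compares the inferred schema of the DataFrame (df.schema) with the predefined schema (schema). The comparison covers column names, data types and nullability. A headerless file therefore fails it: with header="true", Spark takes the column names from the first data row.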
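The comparison can fail for two different reasons: the column names are wrong (no header), or an inferred type differs from the predefined one. The plain-Python model below makes both cases visible. Its `infer_type` is a deliberate simplification of `inferSchema`. It is assumed to return `int` for whole numbers in the 32-bit range, `bigint` for larger whole numbers and `string` otherwise, whereas Spark also infers doubles, timestamps and more. Nullability is left out because every column is nullable on both sides. `EXPECTED`, `infer_type` and `infer_schema` are hypothetical names.

```python
import csv
import io

INT_MIN, INT_MAX = -(2**31), 2**31 - 1

# Hypothetical model of the predefined schema: (column name, type name)
EXPECTED = [("ID", "int"), ("Name", "string")]


def infer_type(values):
    """Simplified stand-in for inferSchema on a single column."""
    if not values:
        return "string"
    try:
        numbers = [int(v) for v in values]
    except ValueError:
        return "string"
    if all(INT_MIN <= n <= INT_MAX for n in numbers):
        return "int"
    return "bigint"


def infer_schema(text):
    """Imitate header="true" plus inferSchema="true" on a small CSV string."""
    rows = list(csv.reader(io.StringIO(text)))
    if not rows:
        return []
    header, data = rows[0], rows[1:]
    # Collect each column's values, skipping rows that are too short
    columns = [[row[i] for row in data if i < len(row)] for i in range(len(header))]
    return [(name, infer_type(col)) for name, col in zip(header, columns)]


samples = {
    "with header": "ID,Name\n1,Alice\n2,Bob\n",
    "without header": "1,Alice\n2,Bob\n",
    "with header, large IDs": "ID,Name\n3000000000,Alice\n",
}
for label, text in samples.items():
    inferred = infer_schema(text)
    print(f"{label}: {inferred} matches={inferred == EXPECTED}")
```

The third sample has a header but still fails the check. IDs above 2,147,483,647 do not fit in IntegerType, so Spark would infer LongType (`bigint`) for them.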
- Case 1: Schema Matches
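df.write.mode("overwrite").save("Files/output/"): Saves the DataFrame to Files/output/ in overwrite mode, replacing any existing data. Because no format is given, Spark writes in its default data source format (the spark.sql.sources.default setting, which is Parquet unless the platform changes it).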
- Case 2: Schema Doesn't Match
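df = spark.read.schema(schema).csv(f"{path}"): Re-reads the CSV file, explicitly applying the predefined schema. The header option is not set here and defaults to false, so the first line is treated as data rather than as column names.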
df.write.mode("overwrite").save("Files/output/"): Saves the corrected DataFrame to Files/output/.
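Reading with an explicit schema and no header option treats every line as data. That is correct for a headerless file. However, the else branch can also be reached for a file that does have a header, for example because an inferred type differs from the predefined one. In that case the header line is parsed as a record too. In Spark's default PERMISSIVE mode, fields that cannot be converted are set to null. The plain-Python sketch below imitates that behaviour in a simplified way. `apply_schema` and `SCHEMA` are hypothetical, and real Spark also offers other modes such as DROPMALFORMED and FAILFAST.

```python
import csv
import io

# Hypothetical model of the predefined schema: (column name, converter)
SCHEMA = [("ID", int), ("Name", str)]


def apply_schema(text, schema=SCHEMA):
    """Parse every line as data; fields that fail conversion become None.

    A simplified imitation of Spark's PERMISSIVE mode, not its implementation.
    """
    records = []
    for row in csv.reader(io.StringIO(text)):
        # Missing trailing fields are treated as null
        padded = row + [None] * (len(schema) - len(row))
        record = {}
        for (name, convert), raw in zip(schema, padded):
            if raw is None:
                record[name] = None
                continue
            try:
                record[name] = convert(raw)
            except ValueError:
                record[name] = None  # malformed field -> null
        records.append(record)
    return records


# Headerless file: every line is a valid record
print(apply_schema("1,Alice\n2,Bob\n"))
# [{'ID': 1, 'Name': 'Alice'}, {'ID': 2, 'Name': 'Bob'}]

# File with a header: the header line becomes a record with a null ID
print(apply_schema("ID,Name\n1,Alice\n"))
# [{'ID': None, 'Name': 'Name'}, {'ID': 1, 'Name': 'Alice'}]
```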
- Purpose of the Code
- Ensures the schema of the output DataFrame matches the predefined schema by:
  - Checking the schema.
  - Re-reading and correcting the schema if necessary.
- Key Details
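  - overwrite: Replaces existing data in the output path.
  - Schema enforcement: Keeps the output schema aligned with the predefined one. Note that any mismatch takes the else branch, including a type difference in a file that does have a header. In that case the header line is read as a data row, and in Spark's default PERMISSIVE mode its ID value ("ID", which cannot be parsed as an integer) becomes null.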
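A missing header and a type difference both lead to the same else branch, so it can help to tell them apart before re-reading. The sketch below does that on a simplified (name, type) model of the two schemas in plain Python. `classify` and `EXPECTED` are hypothetical. With real DataFrames, the name lists would come from `df.schema.fieldNames()` or `df.columns`.

```python
# Hypothetical model of the predefined schema: (column name, type name)
EXPECTED = [("ID", "int"), ("Name", "string")]


def classify(inferred, expected=EXPECTED):
    """Explain why an inferred schema differs from the expected one."""
    if inferred == expected:
        return "match"
    inferred_names = [name for name, _ in inferred]
    expected_names = [name for name, _ in expected]
    if inferred_names != expected_names:
        return "names_differ"     # most likely no header: names came from the first data row
    return "type_mismatch"        # header present, but a type was inferred differently


print(classify([("ID", "int"), ("Name", "string")]))      # match
print(classify([("1", "int"), ("Alice", "string")]))      # names_differ
print(classify([("ID", "bigint"), ("Name", "string")]))   # type_mismatch
```

Only `names_differ` suggests a missing header. For `type_mismatch`, two options avoid turning the header line into a data row: re-reading with header="true" plus the predefined schema, or failing loudly. Which one fits depends on the pipeline.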