Problem Statement:
Given the DataFrame below, turn each NAME into a column and the countries into row values, exploding into multiple rows when a name has more than one country for the same ID and date.
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import *
l = [(1, 'Gaga', 'India', "2022-01-11"), (1, 'Katy', 'UK', "2022-01-11"),
     (1, 'Bey', 'Europe', "2022-01-11"), (2, 'Gaga', None, "2022-10-11"),
     (2, 'Katy', 'India', "2022-10-11"), (2, 'Bey', 'US', "2022-02-15"),
     (3, 'Gaga', 'Europe', "2022-10-11"), (3, 'Katy', 'US', "2022-10-11"),
     (3, 'Bey', None, "2022-02-15"), (1, 'Gaga', 'US', "2022-01-11"),
     (3, 'Katy', 'Switz', "2022-02-15")]
df = spark.createDataFrame(l,['ID','NAME','COUNTRY', 'Date_part'])
df.show()
+---+----+-------+----------+
| ID|NAME|COUNTRY| Date_part|
+---+----+-------+----------+
|  1|Gaga|  India|2022-01-11|
|  1|Katy|     UK|2022-01-11|
|  1| Bey| Europe|2022-01-11|
|  2|Gaga|   NULL|2022-10-11|
|  2|Katy|  India|2022-10-11|
|  2| Bey|     US|2022-02-15|
|  3|Gaga| Europe|2022-10-11|
|  3|Katy|     US|2022-10-11|
|  3| Bey|   NULL|2022-02-15|
|  1|Gaga|     US|2022-01-11|
|  3|Katy|  Switz|2022-02-15|
+---+----+-------+----------+
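The approach: pivot NAME into columns while collecting each group's countries into arrays, zip those arrays element-wise into structs, explode the zipped array back into rows, and finally flatten the struct fields into columns.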
df_1 = df.groupBy("ID","Date_part").pivot("NAME").agg(collect_list("COUNTRY"))
df_1.show(truncate=False)
+---+----------+--------+-----------+-------+
|ID |Date_part |Bey     |Gaga       |Katy   |
+---+----------+--------+-----------+-------+
|1  |2022-01-11|[Europe]|[India, US]|[UK]   |
|2  |2022-02-15|[US]    |[]         |[]     |
|2  |2022-10-11|[]      |[]         |[India]|
|3  |2022-02-15|[]      |[]         |[Switz]|
|3  |2022-10-11|[]      |[Europe]   |[US]   |
+---+----------+--------+-----------+-------+
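collect_list gathers every non-NULL country of a group into an array: that is why Gaga has [India, US] for ID 1 (two input rows) and an empty array [] wherever the group's only input COUNTRY was NULL.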
df_2 = df_1.withColumn("new",arrays_zip("Bey","Gaga","Katy"))
df_2.show(truncate=False)
+---+----------+--------+-----------+-------+---------------------------------------+
|ID |Date_part |Bey     |Gaga       |Katy   |new                                    |
+---+----------+--------+-----------+-------+---------------------------------------+
|1  |2022-01-11|[Europe]|[India, US]|[UK]   |[{Europe, India, UK}, {NULL, US, NULL}]|
|2  |2022-02-15|[US]    |[]         |[]     |[{US, NULL, NULL}]                     |
|2  |2022-10-11|[]      |[]         |[India]|[{NULL, NULL, India}]                  |
|3  |2022-02-15|[]      |[]         |[Switz]|[{NULL, NULL, Switz}]                  |
|3  |2022-10-11|[]      |[Europe]   |[US]   |[{NULL, Europe, US}]                   |
+---+----------+--------+-----------+-------+---------------------------------------+
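arrays_zip pairs the arrays positionally and pads the shorter ones with NULL instead of truncating to the shortest. A minimal sketch (with a hypothetical two-column DataFrame) to see the padding:
demo = spark.createDataFrame([(["a", "b"], ["x"])], ["c1", "c2"])
demo.select(arrays_zip("c1", "c2").alias("z")).show(truncate=False)
# -> [{a, x}, {b, NULL}]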
df_3 = df_2.withColumn("new1",explode("new")).drop("new")
df_3.show()
+---+----------+--------+-----------+-------+-------------------+
| ID| Date_part|     Bey|       Gaga|   Katy|               new1|
+---+----------+--------+-----------+-------+-------------------+
|  1|2022-01-11|[Europe]|[India, US]|   [UK]|{Europe, India, UK}|
|  1|2022-01-11|[Europe]|[India, US]|   [UK]|   {NULL, US, NULL}|
|  2|2022-02-15|    [US]|         []|     []|   {US, NULL, NULL}|
|  2|2022-10-11|      []|         []|[India]|{NULL, NULL, India}|
|  3|2022-02-15|      []|         []|[Switz]|{NULL, NULL, Switz}|
|  3|2022-10-11|      []|   [Europe]|   [US]| {NULL, Europe, US}|
+---+----------+--------+-----------+-------+-------------------+
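Note that explode drops any row whose array is empty or NULL. That cannot happen here, since every group contributes at least one zipped struct, but if it could, explode_outer would keep such a group as a single row with NULL in the exploded column:
# Hypothetical variant that preserves groups with an empty zipped array:
df_3_keep = df_2.withColumn("new1", explode_outer("new")).drop("new")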
df_4 = df_3.select("ID","Date_part","new1.Bey","new1.Gaga","new1.Katy")
df_4.show()
+---+----------+------+------+-----+
| ID| Date_part|   Bey|  Gaga| Katy|
+---+----------+------+------+-----+
|  1|2022-01-11|Europe| India|   UK|
|  1|2022-01-11|  NULL|    US| NULL|
|  2|2022-02-15|    US|  NULL| NULL|
|  2|2022-10-11|  NULL|  NULL|India|
|  3|2022-02-15|  NULL|  NULL|Switz|
|  3|2022-10-11|  NULL|Europe|   US|
+---+----------+------+------+-----+
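Selecting "new1.Bey" pulls the nested field out of the struct and names the resulting column just Bey. Instead of listing every field by hand, the whole struct can also be expanded with new1.* (assuming its field names do not collide with existing columns), which already hints at the parameterized version below:
df_3.select("ID", "Date_part", "new1.*").show()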
Parameterizing, in case there are many columns:
Reusing the same imports and DataFrame df from the first section:
df_1 = df.groupBy("ID","Date_part").pivot("NAME").agg(collect_list("COUNTRY"))
df_1.show(truncate=False)
+---+----------+--------+-----------+-------+
|ID |Date_part |Bey     |Gaga       |Katy   |
+---+----------+--------+-----------+-------+
|1  |2022-01-11|[Europe]|[India, US]|[UK]   |
|2  |2022-10-11|[]      |[]         |[India]|
|2  |2022-02-15|[US]    |[]         |[]     |
|3  |2022-10-11|[]      |[Europe]   |[US]   |
|3  |2022-02-15|[]      |[]         |[Switz]|
+---+----------+--------+-----------+-------+
df_2 = df_1.withColumn("new",arrays_zip(*df_1.columns[2:]))
df_2.show(truncate=False)
+---+----------+--------+-----------+-------+---------------------------------------+
|ID |Date_part |Bey     |Gaga       |Katy   |new                                    |
+---+----------+--------+-----------+-------+---------------------------------------+
|1  |2022-01-11|[Europe]|[India, US]|[UK]   |[{Europe, India, UK}, {NULL, US, NULL}]|
|2  |2022-10-11|[]      |[]         |[India]|[{NULL, NULL, India}]                  |
|2  |2022-02-15|[US]    |[]         |[]     |[{US, NULL, NULL}]                     |
|3  |2022-10-11|[]      |[Europe]   |[US]   |[{NULL, Europe, US}]                   |
|3  |2022-02-15|[]      |[]         |[Switz]|[{NULL, NULL, Switz}]                  |
+---+----------+--------+-----------+-------+---------------------------------------+
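df_1.columns[2:] relies on the two grouping columns coming first in df_1, which is what groupBy("ID","Date_part").pivot("NAME") produces. A slightly more defensive sketch (assuming the grouping columns are known by name) filters instead of slicing:
# Hypothetical variant: pick the pivoted columns by name, not by position
group_cols = ["ID", "Date_part"]
value_cols = [c for c in df_1.columns if c not in group_cols]
df_2 = df_1.withColumn("new", arrays_zip(*value_cols))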
df_3 = df_2.withColumn("new",explode("new")).drop(*df_1.columns[2:])
df_3.show()
+---+----------+-------------------+
| ID| Date_part|                new|
+---+----------+-------------------+
|  1|2022-01-11|{Europe, India, UK}|
|  1|2022-01-11|   {NULL, US, NULL}|
|  2|2022-02-15|   {US, NULL, NULL}|
|  2|2022-10-11|{NULL, NULL, India}|
|  3|2022-02-15|{NULL, NULL, Switz}|
|  3|2022-10-11| {NULL, Europe, US}|
+---+----------+-------------------+
The bit that does the parameterization:
exploded_columns_list = df_1.columns[2:]
for i in range(len(exploded_columns_list)):
    exploded_columns_list[i] = "new." + exploded_columns_list[i]
print(exploded_columns_list)
['new.Bey', 'new.Gaga', 'new.Katy']
The * operator unpacks a list or tuple into individual positional arguments, so a whole list of column names can be passed to select() as if each name were typed out separately.
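For example:
cols = ["ID", "Date_part"]
df_3.select(*cols)   # same as df_3.select("ID", "Date_part")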
df_4 = df_3.select(*df_1.columns[:2],*exploded_columns_list)
df_4.show()
+---+----------+------+------+-----+
| ID| Date_part|   Bey|  Gaga| Katy|
+---+----------+------+------+-----+
|  1|2022-01-11|Europe| India|   UK|
|  1|2022-01-11|  NULL|    US| NULL|
|  2|2022-02-15|    US|  NULL| NULL|
|  2|2022-10-11|  NULL|  NULL|India|
|  3|2022-02-15|  NULL|  NULL|Switz|
|  3|2022-10-11|  NULL|Europe|   US|
+---+----------+------+------+-----+
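Putting it all together, the whole flow can be wrapped in a reusable helper. A minimal sketch (the function name and parameters are hypothetical; it assumes the group columns exist in df and that no pivoted value is itself named "new"):
def pivot_explode(df, group_cols, pivot_col, value_col):
    # Pivot: one array of collected values per pivoted column
    pivoted = df.groupBy(*group_cols).pivot(pivot_col).agg(collect_list(value_col))
    value_cols = [c for c in pivoted.columns if c not in group_cols]
    # Zip the arrays element-wise, then explode one struct per output row
    zipped = pivoted.withColumn("new", arrays_zip(*value_cols)).drop(*value_cols)
    exploded = zipped.withColumn("new", explode("new"))
    # Flatten the struct fields back into top-level columns
    return exploded.select(*group_cols, *["new." + c for c in value_cols])

df_4 = pivot_explode(df, ["ID", "Date_part"], "NAME", "COUNTRY")
df_4.show()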