Problem Statement: -

Given the DataFrame below, pivot and explode it so that each NAME value becomes a column and the corresponding COUNTRY values become the rows.

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import *

l = [(1,'Gaga','India',"2022-01-11"),(1,'Katy','UK', "2022-01-11"),(1,'Bey','Europe', "2022-01-11"),(2,'Gaga',None, "2022-10-11"),(2,'Katy','India', "2022-10-11"),(2,'Bey','US',"2022-02-15"),(3,'Gaga','Europe', "2022-10-11"),
(3,'Katy','US',"2022-10-11"),(3,'Bey',None,"2022-02-15"), (1, 'Gaga','US',"2022-01-11"),(3, 'Katy', 'Switz',"2022-02-15") ]

df = spark.createDataFrame(l,['ID','NAME','COUNTRY', 'Date_part'])
df.show()
+---+----+-------+----------+
| ID|NAME|COUNTRY| Date_part|
+---+----+-------+----------+
|  1|Gaga|  India|2022-01-11|
|  1|Katy|     UK|2022-01-11|
|  1| Bey| Europe|2022-01-11|
|  2|Gaga|   NULL|2022-10-11|
|  2|Katy|  India|2022-10-11|
|  2| Bey|     US|2022-02-15|
|  3|Gaga| Europe|2022-10-11|
|  3|Katy|     US|2022-10-11|
|  3| Bey|   NULL|2022-02-15|
|  1|Gaga|     US|2022-01-11|
|  3|Katy|  Switz|2022-02-15|
+---+----+-------+----------+

collect_list()
pivot()

df_1 = df.groupBy("ID","Date_part").pivot("NAME").agg(collect_list("COUNTRY"))

df_1.show(truncate=False)
+---+----------+--------+-----------+-------+
|ID |Date_part |Bey     |Gaga       |Katy   |
+---+----------+--------+-----------+-------+
|1  |2022-01-11|[Europe]|[India, US]|[UK]   |
|2  |2022-02-15|[US]    |[]         |[]     |
|2  |2022-10-11|[]      |[]         |[India]|
|3  |2022-02-15|[]      |[]         |[Switz]|
|3  |2022-10-11|[]      |[Europe]   |[US]   |
+---+----------+--------+-----------+-------+
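A note on this step: pivot("NAME") first runs an extra job to discover the distinct NAME values before it can build the columns. If the set of names is known up front, passing it explicitly skips that scan; a minimal sketch, assuming the three names in this dataset:

# Assumed, fixed list of pivot values; avoids the extra distinct pass over NAME
known_names = ["Bey", "Gaga", "Katy"]
df_1 = df.groupBy("ID", "Date_part") \
         .pivot("NAME", known_names) \
         .agg(collect_list("COUNTRY"))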

arrays_zip()

df_2 = df_1.withColumn("new",arrays_zip("Bey","Gaga","Katy"))
df_2.show(truncate=False)
+---+----------+--------+-----------+-------+---------------------------------------+
|ID |Date_part |Bey     |Gaga       |Katy   |new                                    |
+---+----------+--------+-----------+-------+---------------------------------------+
|1  |2022-01-11|[Europe]|[India, US]|[UK]   |[{Europe, India, UK}, {NULL, US, NULL}]|
|2  |2022-02-15|[US]    |[]         |[]     |[{US, NULL, NULL}]                     |
|2  |2022-10-11|[]      |[]         |[India]|[{NULL, NULL, India}]                  |
|3  |2022-02-15|[]      |[]         |[Switz]|[{NULL, NULL, Switz}]                  |
|3  |2022-10-11|[]      |[Europe]   |[US]   |[{NULL, Europe, US}]                   |
+---+----------+--------+-----------+-------+---------------------------------------+
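arrays_zip() pairs elements by position and pads the shorter arrays with nulls, which is why ID 1 gets a second struct of {NULL, US, NULL}. A small standalone check of that padding behaviour, using a throwaway one-row DataFrame:

# arrays of length 2 and 1: the shorter one is padded with null,
# so the second struct comes back as {b, NULL}
spark.range(1).select(
    arrays_zip(array(lit("a"), lit("b")), array(lit(1))).alias("z")
).show(truncate=False)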

explode()

df_3 = df_2.withColumn("new1",explode("new")).drop("new")
df_3.show()
+---+----------+--------+-----------+-------+-------------------+
| ID| Date_part|     Bey|       Gaga|   Katy|               new1|
+---+----------+--------+-----------+-------+-------------------+
|  1|2022-01-11|[Europe]|[India, US]|   [UK]|{Europe, India, UK}|
|  1|2022-01-11|[Europe]|[India, US]|   [UK]|   {NULL, US, NULL}|
|  2|2022-02-15|    [US]|         []|     []|   {US, NULL, NULL}|
|  2|2022-10-11|      []|         []|[India]|{NULL, NULL, India}|
|  3|2022-02-15|      []|         []|[Switz]|{NULL, NULL, Switz}|
|  3|2022-10-11|      []|   [Europe]|   [US]| {NULL, Europe, US}|
+---+----------+--------+-----------+-------+-------------------+
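One caveat worth noting: explode() drops rows whose array is null or empty. Here every zipped array has at least one element, so nothing is lost, but if a group could end up with an empty "new" array, explode_outer() is the safer choice. A hedged sketch (df_3_safe is just an illustrative name):

# explode_outer keeps rows even when "new" is empty or null, emitting a null struct
df_3_safe = df_2.withColumn("new1", explode_outer("new")).drop("new")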
df_4 = df_3.select("ID","Date_part","new1.Bey","new1.Gaga","new1.Katy")
df_4.show()
+---+----------+------+------+-----+
| ID| Date_part|   Bey|  Gaga| Katy|
+---+----------+------+------+-----+
|  1|2022-01-11|Europe| India|   UK|
|  1|2022-01-11|  NULL|    US| NULL|
|  2|2022-02-15|    US|  NULL| NULL|
|  2|2022-10-11|  NULL|  NULL|India|
|  3|2022-02-15|  NULL|  NULL|Switz|
|  3|2022-10-11|  NULL|Europe|   US|
+---+----------+------+------+-----+

Parameterizing, in case there are many columns: -

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import *

l = [(1,'Gaga','India',"2022-01-11"),(1,'Katy','UK', "2022-01-11"),(1,'Bey','Europe', "2022-01-11"),(2,'Gaga',None, "2022-10-11"),(2,'Katy','India', "2022-10-11"),(2,'Bey','US',"2022-02-15"),(3,'Gaga','Europe', "2022-10-11"),
(3,'Katy','US',"2022-10-11"),(3,'Bey',None,"2022-02-15"), (1, 'Gaga','US',"2022-01-11"),(3, 'Katy', 'Switz',"2022-02-15") ]

df = spark.createDataFrame(l,['ID','NAME','COUNTRY', 'Date_part'])
df.show()
+---+----+-------+----------+
| ID|NAME|COUNTRY| Date_part|
+---+----+-------+----------+
|  1|Gaga|  India|2022-01-11|
|  1|Katy|     UK|2022-01-11|
|  1| Bey| Europe|2022-01-11|
|  2|Gaga|   NULL|2022-10-11|
|  2|Katy|  India|2022-10-11|
|  2| Bey|     US|2022-02-15|
|  3|Gaga| Europe|2022-10-11|
|  3|Katy|     US|2022-10-11|
|  3| Bey|   NULL|2022-02-15|
|  1|Gaga|     US|2022-01-11|
|  3|Katy|  Switz|2022-02-15|
+---+----+-------+----------+
df_1 = df.groupBy("ID","Date_part").pivot("NAME").agg(collect_list("COUNTRY"))
df_1.show(truncate=False)
+---+----------+--------+-----------+-------+
|ID |Date_part |Bey     |Gaga       |Katy   |
+---+----------+--------+-----------+-------+
|1  |2022-01-11|[Europe]|[India, US]|[UK]   |
|2  |2022-10-11|[]      |[]         |[India]|
|2  |2022-02-15|[US]    |[]         |[]     |
|3  |2022-10-11|[]      |[Europe]   |[US]   |
|3  |2022-02-15|[]      |[]         |[Switz]|
+---+----------+--------+-----------+-------+
df_2 = df_1.withColumn("new",arrays_zip(*df_1.columns[2:]))
df_2.show(truncate=False)
+---+----------+--------+-----------+-------+---------------------------------------+
|ID |Date_part |Bey     |Gaga       |Katy   |new                                    |
+---+----------+--------+-----------+-------+---------------------------------------+
|1  |2022-01-11|[Europe]|[India, US]|[UK]   |[{Europe, India, UK}, {NULL, US, NULL}]|
|2  |2022-10-11|[]      |[]         |[India]|[{NULL, NULL, India}]                  |
|2  |2022-02-15|[US]    |[]         |[]     |[{US, NULL, NULL}]                     |
|3  |2022-10-11|[]      |[Europe]   |[US]   |[{NULL, Europe, US}]                   |
|3  |2022-02-15|[]      |[]         |[Switz]|[{NULL, NULL, Switz}]                  |
+---+----------+--------+-----------+-------+---------------------------------------+
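Note that df_1.columns[2:] relies on the two grouping columns coming first, which holds because groupBy().pivot() places the grouping columns ahead of the pivoted ones. A slightly more explicit variant, assuming only that the grouping keys are known by name:

group_cols = ["ID", "Date_part"]
# everything that is not a grouping key was created by pivot("NAME")
pivot_cols = [c for c in df_1.columns if c not in group_cols]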
df_3 = df_2.withColumn("new",explode("new")).drop(*df_1.columns[2:])
df_3.show()
+---+----------+-------------------+
| ID| Date_part|                new|
+---+----------+-------------------+
|  1|2022-01-11|{Europe, India, UK}|
|  1|2022-01-11|   {NULL, US, NULL}|
|  2|2022-02-15|   {US, NULL, NULL}|
|  2|2022-10-11|{NULL, NULL, India}|
|  3|2022-02-15|{NULL, NULL, Switz}|
|  3|2022-10-11| {NULL, Europe, US}|
+---+----------+-------------------+

The part that handles the parameterization: -

exploded_columns_list = ["new." + c for c in df_1.columns[2:]]

print(exploded_columns_list)
['new.Bey', 'new.Gaga', 'new.Katy']

The * operator unpacks a list or tuple into individual positional arguments, so select(*exploded_columns_list) behaves as if each prefixed column name had been written out by hand.
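As a quick illustration with the grouping columns already at hand:

cols = ["ID", "Date_part"]
df_1.select(*cols)               # unpacked: identical to the call below
df_1.select("ID", "Date_part")   # column names written out explicitly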

df_4 = df_3.select(*df_1.columns[:2],*exploded_columns_list)
df_4.show()
+---+----------+------+------+-----+
| ID| Date_part|   Bey|  Gaga| Katy|
+---+----------+------+------+-----+
|  1|2022-01-11|Europe| India|   UK|
|  1|2022-01-11|  NULL|    US| NULL|
|  2|2022-02-15|    US|  NULL| NULL|
|  2|2022-10-11|  NULL|  NULL|India|
|  3|2022-02-15|  NULL|  NULL|Switz|
|  3|2022-10-11|  NULL|Europe|   US|
+---+----------+------+------+-----+
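An alternative that avoids building the prefixed column list at all: selecting "new.*" expands every field of the struct into its own top-level column, which stays correct no matter how many names show up in the pivot. A minimal sketch with the same DataFrames (df_4_alt is just an illustrative name):

# "new.*" expands all struct fields (Bey, Gaga, Katy, ...) into separate columns
df_4_alt = df_3.select("ID", "Date_part", "new.*")
df_4_alt.show()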