Problem Statement:

Find the click-through rate (CTR) of each ad. Round the CTR to 2 decimal places. Order the result by CTR in descending order, breaking ties by ad_id in ascending order.

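CTR = Clicked / (Clicked + Viewed), where Clicked and Viewed are the number of rows for that ad with the corresponding action. 'Ignored' rows are not counted, and an ad with no clicks or views has a CTR of 0.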

Pasted image 20241224225204.png

# Ads table layout: each row records one user's action on one ad.
# Every column is declared nullable.
ads_columns = [
    ('AD_ID', IntegerType()),
    ('USER_ID', IntegerType()),
    ('ACTION', StringType()),
]
schema = StructType(
    [StructField(name, dtype, True) for name, dtype in ads_columns]
)

# Sample rows as (AD_ID, USER_ID, ACTION) tuples.
data = [
    (1, 1, 'Clicked'),
    (2, 2, 'Clicked'),
    (3, 3, 'Viewed'),
    (5, 5, 'Ignored'),
    (1, 7, 'Ignored'),
    (2, 7, 'Viewed'),
    (3, 5, 'Clicked'),
    (1, 4, 'Viewed'),
    (2, 11, 'Viewed'),
    (1, 2, 'Clicked'),
]

# Build the DataFrame from the rows and schema above, then print it.
df = spark.createDataFrame(data, schema)
df.show()
+-----+-------+-------+
|AD_ID|USER_ID| ACTION|
+-----+-------+-------+
|    1|      1|Clicked|
|    2|      2|Clicked|
|    3|      3| Viewed|
|    5|      5|Ignored|
|    1|      7|Ignored|
|    2|      7| Viewed|
|    3|      5|Clicked|
|    1|      4| Viewed|
|    2|     11| Viewed|
|    1|      2|Clicked|
+-----+-------+-------+

orderBy_Multiple_Columns
When_Otherwise
fillna()


# Approach 1: conditional aggregation, then orderBy on multiple columns.
# NOTE(review): sum / round / when / col are expected to be the
# pyspark.sql.functions versions, not the Python builtins - the imports are
# not shown in these notes, confirm before running.

# Count clicks and views per ad. Rows with any other ACTION (e.g. 'Ignored')
# add 0 to both counters, so they never influence the ratio.
ad_counts = df.groupBy("AD_ID").agg(
    sum(when(col("ACTION") == "Clicked", 1).otherwise(0)).alias("Click_Count"),
    sum(when(col("ACTION") == "Viewed", 1).otherwise(0)).alias("View_Count"),
)

# Ctr = clicks / (clicks + views), rounded to 2 decimals.
# The denominator is 0 for an ad that was only ever ignored. Guard it
# explicitly instead of dividing and back-filling NULLs afterwards: the guard
# gives 0.0 for such ads without depending on how the session treats a zero
# divisor, and it cannot overwrite NULLs in any other column.
impressions = col("Click_Count") + col("View_Count")
df_grouped = (
    ad_counts
    .withColumn(
        "Ctr",
        when(impressions > 0, round(col("Click_Count") / impressions, 2))
        .otherwise(0.0),
    )
    # Highest Ctr first; ties are broken by the smaller AD_ID.
    .orderBy(col("Ctr").desc(), col("AD_ID").asc())
)

df_res = df_grouped.select("AD_ID", "Ctr")
df_res.show()
+-----+----+
|AD_ID| Ctr|
+-----+----+
|    1|0.67|
|    3| 0.5|
|    2|0.33|
|    5| 0.0|
+-----+----+

Using a window function (rank)

# Approach 2: same aggregation, but the ordering is expressed as a rank
# computed by a window function.
# NOTE(review): sum / round / when / col / rank / Window are expected to come
# from pyspark.sql.functions and pyspark.sql - the imports are not shown in
# these notes, confirm before running.

# Count clicks and views per ad; other actions (e.g. 'Ignored') count as 0.
ad_counts = df.groupBy("AD_ID").agg(
    sum(when(col("ACTION") == "Clicked", 1).otherwise(0)).alias("Click_Count"),
    sum(when(col("ACTION") == "Viewed", 1).otherwise(0)).alias("View_Count"),
)

# Ctr = clicks / (clicks + views), rounded to 2 decimals. Ads with no clicks
# and no views get 0.0 through an explicit guard rather than a division by
# zero followed by a blanket fillna.
impressions = col("Click_Count") + col("View_Count")
df_grouped = ad_counts.withColumn(
    "Ctr",
    when(impressions > 0, round(col("Click_Count") / impressions, 2))
    .otherwise(0.0),
)

# The window has no partitionBy, so the rank is global across all ads.
# NOTE: Spark evaluates an unpartitioned window in a single partition, which
# is fine for this small table but does not scale to large inputs.
Window_spec = Window.orderBy(col("Ctr").desc(), col("AD_ID").asc())

# rank 1 = highest Ctr; equal Ctr values are ordered by ascending AD_ID.
Ranked_df = df_grouped.withColumn("rank", rank().over(Window_spec))
Ranked_df.show()
+-----+-----------+----------+----+----+
|AD_ID|Click_Count|View_Count| Ctr|rank|
+-----+-----------+----------+----+----+
|    1|          2|         1|0.67|   1|
|    3|          1|         1| 0.5|   2|
|    2|          1|         2|0.33|   3|
|    5|          0|         0| 0.0|   4|
+-----+-----------+----------+----+----+
# The rank column encodes the required order (Ctr descending, AD_ID
# ascending). Sort on it explicitly before dropping it: a DataFrame's row
# order is only guaranteed directly after an orderBy.
result_df = Ranked_df.orderBy("rank").select("AD_ID", "Ctr")
result_df.show()
# Approach 3: the same calculation in Spark SQL.
# Expose the DataFrame to SQL under the name "Ads".
df.createOrReplaceTempView("Ads")

# The CTE counts clicks and views per ad; the outer query derives the ratio.
# NULLIF turns a zero denominator (an ad with no clicks and no views) into
# NULL, so the division yields NULL instead of depending on how the session
# handles division by zero; COALESCE then maps that NULL to 0.
# String literals use single quotes, the standard SQL form.
result = spark.sql("""
WITH CTE AS (
    SELECT
        AD_ID,
        SUM(CASE WHEN ACTION = 'Clicked' THEN 1 ELSE 0 END) AS Click_Cnt,
        SUM(CASE WHEN ACTION = 'Viewed' THEN 1 ELSE 0 END) AS View_Cnt
    FROM Ads
    GROUP BY AD_ID
)

SELECT
    AD_ID,
    COALESCE(ROUND(Click_Cnt / NULLIF(Click_Cnt + View_Cnt, 0), 2), 0) AS Ctr
FROM CTE
ORDER BY Ctr DESC, AD_ID ASC
""")

result.show()