The objective of this article is to understand PySpark window functions. The blog does a comparative study of PySpark window functions and the analytical functions of a relational database system, Oracle Database.
Spark window functions operate on a group of rows (called a frame or partition) and return a single value for every input row. To perform an operation on a group, we first partition the data using Window.partitionBy(); for the row-number and rank functions we additionally order the rows within each partition using the orderBy() clause.
Connect to Spark
import pyspark
from pyspark.sql import SparkSession
print('modules imported')
spark=SparkSession.builder.appName('Spark_window_functions').getOrCreate()
Load Dataset
emp_df=spark.read.csv(r'emp.csv',header=True,inferSchema=True)
emp_df.show(10)
Import necessary Libraries
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, rank, dense_rank
from pyspark.sql import functions as f
Get Top N Salaried employees Deptwise using row_number(), rank(), dense_rank()
row_number()
win_func=Window.partitionBy(emp_df['DEPTNO']).orderBy(emp_df['SAL'].desc())
emp_df.withColumn('rank',row_number().over(win_func)).show()
row_number() ranks the records sequentially within each partition, exactly the way Oracle's row_number() works.
Now, check for top 3 earners department wise.
emp_df.withColumn('rank',row_number().over(win_func)).filter('rank<=3').show()
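The same top-N-per-group logic can be sketched in plain Python (no Spark required), which makes the semantics easy to inspect. The (deptno, ename, sal) rows below are hypothetical sample data, not the actual emp.csv contents:

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical (deptno, ename, sal) rows -- not the real emp.csv data
emp = [
    (10, 'KING', 5000), (10, 'CLARK', 2450), (10, 'MILLER', 1300),
    (20, 'SCOTT', 3000), (20, 'FORD', 3000), (20, 'JONES', 2975),
    (20, 'ADAMS', 1100), (30, 'BLAKE', 2850), (30, 'ALLEN', 1600),
]

def top_n_per_dept(rows, n=3):
    """Mimic row_number() over (partition by deptno order by sal desc),
    then filter rank <= n, appending the rank to each row."""
    result = []
    rows = sorted(rows, key=itemgetter(0))  # groupby needs contiguous keys
    for dept, grp in groupby(rows, key=itemgetter(0)):
        ranked = sorted(grp, key=itemgetter(2), reverse=True)  # order by sal desc
        for rank, row in enumerate(ranked[:n], start=1):       # sequential row_number
            result.append(row + (rank,))
    return result

for row in top_n_per_dept(emp):
    print(row)
```

Department 10 and 20 each contribute three rows, department 30 only two, since it has only two employees in this sample.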
Let's write the SQL query to validate the data returned by PySpark row_number().
select * from (
select e.*, row_number() over(partition by deptno order by sal desc) rank from emp e) rk
where rank<=3
order by deptno;
rank()
win_func=Window.partitionBy(emp_df['DEPTNO']).orderBy(emp_df['SAL'].desc())
emp_df.withColumn('rank',rank().over(win_func)).show()
Now, check for top 3 earners department wise.
emp_df.withColumn('rank',rank().over(win_func)).filter('rank<=3').show()
PySpark's rank() also works in a similar fashion to Oracle's rank().
Let's write the SQL query to validate the data returned by Pyspark rank().
select * from (
select e.*, rank() over(partition by deptno order by sal desc) rank from emp e) rk
where rank<=3
order by deptno;
The PySpark output and the SQL output match, so the output is validated.
dense_rank()
win_func=Window.partitionBy(emp_df['DEPTNO']).orderBy(emp_df['SAL'].desc())
emp_df.withColumn('rank',dense_rank().over(win_func)).show()
Now, check for top 3 earners department wise.
emp_df.withColumn('rank',dense_rank().over(win_func)).filter('rank<=3').show()
Let's write the SQL query to validate the data returned by PySpark dense_rank().
select * from (
select e.*, dense_rank() over(partition by deptno order by sal desc) rank from emp e) rk
where rank<=3
order by deptno;
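To make the difference between the three ranking functions concrete, here is a plain-Python sketch (assumed sample salaries, not the actual emp.csv data) that reproduces their semantics on a tie. On duplicate salaries, row_number() keeps counting, rank() gives equal rows the same rank and then skips ahead, and dense_rank() gives equal rows the same rank without skipping:

```python
def rankings(sals):
    """For each salary (ordered descending), return a tuple of
    (sal, row_number, rank, dense_rank), reproducing the window-function semantics."""
    sals = sorted(sals, reverse=True)  # order by sal desc
    out = []
    dense = 0
    prev = None
    rnk = 0
    for i, s in enumerate(sals, start=1):
        if s != prev:
            dense += 1   # dense_rank increments by 1 on each new value
            rnk = i      # rank jumps to the row position, skipping past ties
        out.append((s, i, rnk, dense))
        prev = s
    return out

# 3000 appears twice: row_number -> 1,2,3,4; rank -> 1,1,3,4; dense_rank -> 1,1,2,3
for row in rankings([3000, 3000, 2975, 1100]):
    print(row)
```

This is why a top-3 filter can return more than three rows with rank() or dense_rank() when salaries tie, but never with row_number().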
The PySpark output and the SQL output are the same, so the output is validated successfully. The idea behind choosing this use case is that Data Engineers often work on relational datasets alongside other work, and budding Data Engineers work on relational data even more, so a comparative study like this can help both groups understand window functions.
That's all for PySpark window functions. In an upcoming blog I will share some more interesting use cases of window functions.
Please subscribe and follow me on Blogspot for upcoming content. A word of appreciation always helps to keep up the spirit, and a healthy knowledge network helps us grow.
Github link: https://github.com/viv07/PythonDataEngg