Spark Window Functions

The objective of this article is to understand PySpark window functions. The blog will do a comparative study of PySpark window functions and the analytical functions of a relational database system, Oracle Database.


Spark window functions operate on a group of rows (a frame or partition) and return a single value for every input row. To perform an operation on a group, we first need to partition the data using Window.partitionBy(); for the row number and rank functions we additionally need to order the partitioned data using the orderBy() clause.

Connect to Spark

import pyspark
from pyspark.sql import SparkSession
print('modules imported')
spark=SparkSession.builder.appName('Spark_window_functions').getOrCreate()

Load Dataset

emp_df=spark.read.csv(r'emp.csv',header=True,inferSchema=True) 
emp_df.show(10)
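
If you want to follow along without the file, here is a small stand-in DataFrame I am adding for illustration; it only carries the columns the examples below rely on (EMPNO, ENAME, SAL, DEPTNO are assumed column names, modeled on the classic Oracle EMP table):

# Optional stand-in for emp.csv (illustrative data, not from the original post)
data=[(7839,'KING',5000,10),(7782,'CLARK',2450,10),(7934,'MILLER',1300,10),
      (7788,'SCOTT',3000,20),(7902,'FORD',3000,20),(7566,'JONES',2975,20),(7369,'SMITH',800,20),
      (7698,'BLAKE',2850,30),(7499,'ALLEN',1600,30),(7521,'WARD',1250,30),(7900,'JAMES',950,30)]
emp_df=spark.createDataFrame(data,['EMPNO','ENAME','SAL','DEPTNO'])
emp_df.show()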


Import necessary Libraries
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, rank, dense_rank
from pyspark.sql import functions as f

Get the top N salaried employees department-wise using row_number(), rank(), dense_rank()

row_number()

win_func=Window.partitionBy(emp_df['DEPTNO']).orderBy(emp_df['SAL'].desc())
emp_df.withColumn('rank',row_number().over(win_func)).show()

row_number() numbers the records sequentially within each partition, in exactly the same way Oracle's row_number() works.

Now, check for the top 3 earners department-wise.
emp_df.withColumn('rank',row_number().over(win_func)).filter('rank<=3').show()

Let's write the SQL query to validate the data returned by PySpark row_number().
select * from (
  select e.*, row_number() over (partition by deptno order by sal desc) rank
  from emp e) rk
where rank <= 3
order by deptno;
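
One caveat worth adding (not from the original post): when two employees in the same department share a salary, row_number() still assigns them distinct, arbitrary positions. Adding a tie-breaker column to orderBy() makes the numbering deterministic; the sketch below assumes the dataset has an EMPNO column.

# Deterministic row numbering: break salary ties by EMPNO (assumed column)
win_det=Window.partitionBy('DEPTNO').orderBy(col('SAL').desc(),col('EMPNO'))
emp_df.withColumn('rank',row_number().over(win_det)).filter('rank<=3').show()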

rank()
win_func=Window.partitionBy(emp_df['DEPTNO']).orderBy(emp_df['SAL'].desc())
emp_df.withColumn('rank',rank().over(win_func)).show()
Now, check for the third-highest earners department-wise.
emp_df.withColumn('rank',rank().over(win_func)).filter('rank=3').show()
PySpark's rank() works in the same fashion as Oracle's rank(): rows with equal values receive the same rank, and the following rank is skipped.

Let's write the SQL query to validate the data returned by PySpark rank().
select * from (
  select e.*, rank() over (partition by deptno order by sal desc) rank
  from emp e) rk
where rank = 3
order by deptno;
The PySpark output and the SQL output match, so the result is validated.
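
A side note I am adding: if the goal is literally the top three earners per department, the filter would be 'rank<=3' rather than 'rank=3', and with rank() that can return more than three rows for a department when salaries tie.

# Top-3 earners per department with rank(); ties may yield extra rows
emp_df.withColumn('rank',rank().over(win_func)).filter('rank<=3').show()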

dense_rank()
win_func=Window.partitionBy(emp_df['DEPTNO']).orderBy(emp_df['SAL'].desc())
emp_df.withColumn('rank',dense_rank().over(win_func)).show()


Now, check for the third-highest earners department-wise.
emp_df.withColumn('rank',dense_rank().over(win_func)).filter('rank=3').show()

Let's write the SQL query to validate the data returned by PySpark dense_rank().
select * from (
  select e.*, dense_rank() over (partition by deptno order by sal desc) rank
  from emp e) rk
where rank = 3
order by deptno;
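
To make the difference between the two ranking functions concrete, here is a small comparison I am adding (not part of the original post): rank() leaves gaps after ties, while dense_rank() does not. In the classic Oracle EMP sample data (and in the stand-in DataFrame above), two employees in department 20 share a salary, so their rank column jumps from 1 to 3 while dense_rank continues with 2.

# rank() vs dense_rank() side by side on the same window
emp_df.withColumn('rank',rank().over(win_func)) \
      .withColumn('dense_rank',dense_rank().over(win_func)).show()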


The PySpark output and the SQL output are the same, so the result is validated successfully.
The idea behind choosing this use case is that data engineers often work with relational datasets alongside other data, and budding data engineers work mostly with relational data, so this comparative study should help both groups understand window functions.

That's all for PySpark window functions. In an upcoming blog I will share some more interesting use cases of window functions.

Please subscribe and follow me on Blogspot for upcoming content. A word of appreciation always helps keep up the spirit, and a healthy knowledge network helps us grow.

Github link: https://github.com/viv07/PythonDataEngg
Share the Knowledge.




