PySpark Daily December Summary II

Continuing where we left off in the last post, I'll be exploring PySpark on a daily basis, just to get more used to it. Here I will be posting summaries that cover roughly 10 days' worth of the posts I make on Kaggle, which works out to three posts a month.

10/12/2023

❯❯❯ SQL like functions (ORDER BY)

Ordering a DataFrame by a column using orderBy, in ascending order with f.col(...).asc() or in descending order with f.col(...).desc()

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create a DataFrame
data = [("Alice", 25), 
        ("Bob", 30), 
        ("Charlie", 35)]
df = spark.createDataFrame(data, ["name", "age"])
df.show()

+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+
# Order DataFrame by age in descending order
df.orderBy(f.col('age').desc()).show()

+-------+---+
|   name|age|
+-------+---+
|Charlie| 35|
|    Bob| 30|
|  Alice| 25|
+-------+---+

If we want to sort by two columns, which is useful when the first column contains repeated values, we pass both columns to orderBy:

# Create a DataFrame
data = [("Alice", 25, 180), 
        ("Bob", 25, 150), 
        ("Charlie", 35, 167)]
df = spark.createDataFrame(data, ["name", "age","height"])
df.show()
# Order DataFrame by age (descending), then by height (ascending) to break ties
df.orderBy(f.col('age').desc(),f.col('height').asc()).show()

+-------+---+------+
|   name|age|height|
+-------+---+------+
|Charlie| 35|   167|
|    Bob| 25|   150|
|  Alice| 25|   180|
+-------+---+------+

11/12/2023

❯❯❯ SQL like functions (JOIN)

Joining dataframes is of course an important part of data analysis:

  • Using pyspark.sql, we can join dataframes with the SQL notation shown on 01/12/2023; however, PySpark dataframes also have their own method for joining tables, join()
  • As with the pandas merge notation df1.merge(df2), we can join dataframes using a similar notation, df1.join(df2, on=..., how=...)

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a Spark session
spark = SparkSession.builder.getOrCreate()

# Create sample data for dataset1 (January)
dataset1 = spark.createDataFrame([
    ("2021-01-01", 10),
    ("2021-01-02", 20),
    ("2021-01-03", 30),
    ("2021-01-04", 70)
], ["date", "value1"])

# Create sample data for dataset2 (January, with 2021-01-05 instead of 2021-01-04)
dataset2 = spark.createDataFrame([
    ("2021-01-01", 40),
    ("2021-01-02", 50),
    ("2021-01-03", 60),
    ("2021-01-05", 70)
], ["date", "value2"])

# Show the two dataframes before joining
dataset1.show()
dataset2.show()

+----------+------+
|      date|value1|
+----------+------+
|2021-01-01|    10|
|2021-01-02|    20|
|2021-01-03|    30|
|2021-01-04|    70|
+----------+------+

+----------+------+
|      date|value2|
+----------+------+
|2021-01-01|    40|
|2021-01-02|    50|
|2021-01-03|    60|
|2021-01-05|    70|
+----------+------+

If we choose an inner join, we are left with only three rows: the dates that appear in both tables

dataset1.join(dataset2,on='date',how='inner').show()

+----------+------+------+
|      date|value1|value2|
+----------+------+------+
|2021-01-01|    10|    40|
|2021-01-02|    20|    50|
|2021-01-03|    30|    60|
+----------+------+------+

LEFT JOIN keeps every row of the left table; where the right table has no matching date, its columns are filled with NULL

dataset1.join(dataset2,on='date',how='left').show()

+----------+------+------+
|      date|value1|value2|
+----------+------+------+
|2021-01-01|    10|    40|
|2021-01-02|    20|    50|
|2021-01-03|    30|    60|
|2021-01-04|    70|  NULL|
+----------+------+------+

RIGHT JOIN works the same way, but keeps every row of the right table

dataset1.join(dataset2,on='date',how='right').show()

+----------+------+------+
|      date|value1|value2|
+----------+------+------+
|2021-01-01|    10|    40|
|2021-01-02|    20|    50|
|2021-01-03|    30|    60|
|2021-01-05|  NULL|    70|
+----------+------+------+

And FULL OUTER JOIN keeps the rows of both tables:

dataset1.join(dataset2,on='date',how='outer').show()

+----------+------+------+
|      date|value1|value2|
+----------+------+------+
|2021-01-01|    10|    40|
|2021-01-02|    20|    50|
|2021-01-03|    30|    60|
|2021-01-04|    70|  NULL|
|2021-01-05|  NULL|    70|
+----------+------+------+

12/12/2023

❯❯❯ PySpark UDF (Standard UDF)

PySpark UDFs are custom functions that can be created and applied to DataFrame columns in PySpark. They allow users to perform custom computations or transformations on DataFrame data by defining their own functions and applying them to specific columns.

Here is a UDF that takes a single column value. The main thing to note is that we need to declare the return type of the function:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()

# Create a DataFrame
data = [("Alice", 25), 
        ("Bob", 30), 
        ("Charlie", 35)]
df = spark.createDataFrame(data, ["name", "age"])

# UDF function which accepts one column & multiplies it by itself
def square(num):
    return num * num

# Wrap the Python function as a UDF with an integer return type
square_udf = udf(square, IntegerType())

# Apply the UDF to a DataFrame column
new_df = df.withColumn("square", square_udf(df["age"]))
new_df.show()

+-------+---+------+
|   name|age|square|
+-------+---+------+
|  Alice| 25|   625|
|    Bob| 30|   900|
|Charlie| 35|  1225|
+-------+---+------+

We can also use more than one column of data

def add(num1,num2):
    return num1 + num2

add_udf = udf(add,IntegerType())

df = df.withColumn("added2",add_udf(df['age'],df['age']))
df.show()

+-------+---+------+
|   name|age|added2|
+-------+---+------+
|  Alice| 25|    50|
|    Bob| 30|    60|
|Charlie| 35|    70|
+-------+---+------+

Lambda functions can also be used instead of standard python functions

add_udf = udf(lambda x,y : x + y,IntegerType())
df = df.withColumn("added3",add_udf(df['age'],df['age']))
df.show()

+-------+---+------+------+
|   name|age|added2|added3|
+-------+---+------+------+
|  Alice| 25|    50|    50|
|    Bob| 30|    60|    60|
|Charlie| 35|    70|    70|
+-------+---+------+------+

13/12/2023

❯❯❯ PySpark UDF to create features for modeling

UDFs can be used like apply on pandas dataframes, allowing custom logic to be applied to column values.

In this example we'll create a new feature (new_column) from the row values of other columns and use it as a feature input to a LinearRegression model. To do this, we list it in the inputCols of the previously covered VectorAssembler.

Suppose we have two features x1, x2 and we want to add an extra feature computed from the values in these two columns. To do this we create custom_udf, defining the operation and the output type DoubleType.

To add a new column with the new value, we simply use withColumn and call the UDF with the column values as input.

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Create a Spark session
spark = SparkSession.builder.getOrCreate()

# Sample data
data = [(1, 2, 5), 
        (4, 1, 10), 
        (7, 3, 15)]
df = spark.createDataFrame(data, ["x1", "x2", "y"])

# Define a UDF to perform a custom operation
def custom_operation(x1,x2):
    return x1 * 2.0 + x2

custom_udf = udf(custom_operation, DoubleType())

# Apply the UDF to create a new column
df = df.withColumn("new_column", custom_udf(df["x1"], df["x2"]))
df.show()

# Assemble features into a vector
assembler = VectorAssembler(inputCols=["x1","new_column"], outputCol="features")
df = assembler.transform(df)
df.show()

# Train a linear regression model
lr = LinearRegression(featuresCol="features", labelCol="y")
model = lr.fit(df)
spark.stop()

14/12/2023

❯❯❯ PySpark UDF (Pandas UDF)

PySpark offers another type of UDF, the pandas UDF, created with pandas_udf, which is imported from pyspark.sql.functions just like udf. Unlike udf, we need to specify which type of pandas UDF we are using (functionType); one of these types is PandasUDFType.SCALAR.

PandasUDFType.SCALAR is a constant in PySpark that identifies a scalar pandas UDF

  • A scalar pandas UDF takes one or more columns as input and returns a single column as output
  • Each input column arrives as a pandas Series holding a batch of rows, and the function must return a Series of the same length, so the logic is vectorised rather than applied one row at a time

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.getOrCreate()

# Create a sample DataFrame
data = [("alice", 25), 
        ("bob", 30), 
        ("charlie", 35)]

df = spark.createDataFrame(data, ["Name", "Age"])
df.show()

+-------+---+
|   Name|Age|
+-------+---+
|  alice| 25|
|    bob| 30|
|charlie| 35|
+-------+---+

We can define the UDF with the @pandas_udf decorator. Here we add some simple functionality that capitalises the input string column, in a function called capitalise_name

from pyspark.sql.functions import pandas_udf, PandasUDFType

# Define a pandas UDF
# return type & function type

@pandas_udf(returnType="string", functionType=PandasUDFType.SCALAR)
def capitalise_name(name):
    return name.str.capitalize()

# Add a new column by applying the pandas UDF to the DataFrame
df = df.withColumn("Capitalised", capitalise_name(df["Name"]))
df.show()

+-------+---+-----------+
|   Name|Age|Capitalised|
+-------+---+-----------+
|  alice| 25|      Alice|
|    bob| 30|        Bob|
|charlie| 35|    Charlie|
+-------+---+-----------+

Thank you for reading!

Any questions or comments about the above post can be addressed on the mldsai-info channel, to me directly (shtrauss2), on shtrausslearning, or simply below!